source: trunk/grails-app/controllers/nl/tno/massSequencing/files/ImportController.groovy @ 74

Last change on this file since 74 was 74, checked in by robert@…, 8 years ago
  • Several bugfixes
  • Added an extra step in the worker process for importing data
File size: 20.1 KB
Line 
1package nl.tno.massSequencing.files
2
3import org.codehaus.groovy.grails.commons.ConfigurationHolder
4import org.hibernate.SessionFactory
5import grails.converters.*;
6import nl.tno.massSequencing.*
7
8class ImportController {
9        def fileService
10        def fastaService
11        def importService
12        def classificationService
13        def sessionFactory
14        def workerService
15       
16        /**************************************************************************
17         *
18         * Methods for handling uploaded sequence, quality and classification files
19         *
20         *************************************************************************/
21
22        /**
23         * Shows a screen to indicate that files are being parsed
24         */
25        def parseUploadedFiles = {
26                def entityType = params.entityType
27                def entityId = params.id
28               
29                // Check whether files are given
30                def names = [] + params.list( 'sequencefiles' )
31
32                if( !names ) {
33                        flash.message = "No files uploaded for processing"
34                        if( params.entityType && params.id)
35                                redirect( controller: params.entityType, action: 'show', 'id': params.id)
36                        else
37                                redirect( url: "" )
38                               
39                        return
40                }
41               
42                // Check for total size of the files in order to be able
43                // to show a progress bar
44                long filesize = 0;
45                names.each {
46                        filesize += fileService.get( it )?.length()
47                }
48
49                // Create a unique process identifier
50                String processId = workerService.initProcess( session, "Parsing files", 1, filesize ); 
51                                       
52                session.process[ processId ].filenames = names;
53                session.process[ processId ].entityId = entityId;
54                session.process[ processId ].entityType = entityType
55               
56                // Retrieve worker URL
57                def finishUrl = createLink( controller: "import", action: 'parseUploadResult', params: [ processId: processId ] ).toString();
58                def returnUrl = createLink( controller: entityType, action: "show", id: entityId ).toString();
59               
60                def url = workerService.startProcess( session, processId, finishUrl, returnUrl )
61               
62                //
63                // Initiate work
64                //
65               
66                /* Parses uploaded files, discards files we can not handle
67                *
68                * [
69                *               success: [
70                *                       [filename: 'abc.fasta', type: FASTA, numSequences: 190]
71                *                       [filename: 'cde.fasta', type: FASTA, numSequences: 140]
72                *                       [filename: 'abc.qual', type: QUAL, numSequences: 190, avgQuality: 38]
73                *                       [filename: 'cde.qual', type: QUAL, numSequences: 140, avgQuality: 29]
74                *               ],
75                *               failure: [
76                *                       [filename: 'testing.doc', message: 'Type not recognized']
77                *               ]
78                * ]
79                *
80                * The second parameter is a callback function to update progress indicators
81                */
82           def httpSession = session;
83           def onProgress = { progress, total ->
84                   // Update progress
85                   httpSession.progress[ processId ].stepTotal = total;
86                   httpSession.progress[ processId ].stepProgress = progress;
87           }
88           def newStep = { total, description ->
89                   // Start a new step
90                   httpSession.progress[ processId ].stepTotal = total;
91                   httpSession.progress[ processId ].stepProgress = 0;
92                   
93                   httpSession.progress[ processId ].stepDescription = description;
94                   httpSession.progress[ processId ].stepNum++;
95           }
96
97           // Perform the actual computations asynchronously
98           runAsync {
99                   def entity
100                   
101                   // Determine entity and assaysamples
102                   switch( httpSession.process[ processId ].entityType ) {
103                           case "run":
104                                   entity = getRun( httpSession.process[ processId ].entityId );
105                                   break;
106                           case "assay":
107                                   entity = getAssay( httpSession.process[ processId ].entityId );
108                                   break;
109                           default:
110                                   httpSession.progress[ processId ].error = true;
111                                   httpSession.progress[ processId ].finished = true;
112                                   return;
113                   }
114                   
115                   if (!entity) {
116                           httpSession.progress[ processId ].error = true;
117                           httpSession.progress[ processId ].finished = true;
118                           return;
119                   }
120   
121                   def assaySamples = entity.assaySamples.findAll { it.assay.study.canWrite( httpSession.user ) };
122                   
123                   def parsedFiles = importService.parseFiles( names, onProgress, [progress: 0, total: httpSession.progress[ processId ].stepTotal ], newStep );
124                   
125                   // Determine excel matches from the uploaded files
126                   parsedFiles.success = fastaService.inferExcelMatches( parsedFiles.success );
127                   
128                   // Match files with samples in the database
129                   def matchedFiles = fastaService.matchFiles( parsedFiles.success, assaySamples );
130   
131                   // Sort files on filename
132                   matchedFiles.sort { a,b -> a.fasta?.originalfilename <=> b.fasta?.originalfilename }
133   
134                   // Retrieve all files that have not been matched
135                   def notMatchedFiles = parsedFiles.success.findAll {
136                           switch( it.type ) {
137                                   case "fasta":
138                                           return !matchedFiles*.fasta*.filename.contains( it.filename );
139                                   case "qual":
140                                           return !matchedFiles*.feasibleQuals.flatten().filename.contains( it.filename );
141                                   case "taxonomy":
142                                           return !matchedFiles*.feasibleClassifications.flatten().filename.contains( it.filename );
143                           }
144                           return false;
145                   }
146                   
147                   // Saved file matches in session to use them later on
148                   httpSession.process[ processId ].processedFiles = [ parsed: parsedFiles,  matched: matchedFiles, notMatched: notMatchedFiles ];
149                   
150                   // Check whether quality, classification or logfiles have been added
151                   def types = [ "fasta", "qual", "taxonomy", "logfile" ];
152                   def typesExist = [:]
153                   types.each { type -> typesExist[ type ] = parsedFiles.success.any { it.type == type } }
154                   
155                   httpSession.process[ processId ].fileTypes = typesExist
156                   
157                   // Tell the frontend we are finished
158                   httpSession.progress[ processId ].finished = true;
159           }
160               
161                redirect( url: url );
162        }
163       
164        /**
165         * Show result of processing uploaded files (step 1)
166         */
167        def parseUploadResult = {
168                def processId = params.processId;
169                // load study with id specified by param.id
170                def entity
171               
172                switch( session.process[ processId ].entityType ) {
173                        case "run":
174                                entity = getRun( session.process[ processId ].entityId )
175                                break;
176                        case "assay":
177                                entity = getAssay( session.process[ processId ].entityId )
178                                break;
179                        default:
180                                response.setStatus( 404, "No entity found" );
181                                render "";
182                                return;
183                }
184               
185                def assaySamples = entity.assaySamples.findAll { it.assay.study.canWrite( session.user ) };
186               
187                if (!entity) {
188                        response.setStatus( 404, flash.error )
189                        render "";
190                        return
191                }
192               
193                if( !session.process[ processId ].processedFiles ) {
194                        flash.error = "Processing of files failed. Maybe the session timed out."
195                        redirect( controller: params.entityType, action: 'show', 'id': params.id)
196                        return
197                }
198               
199                // Find matching sequenceData objects for taxonomyfiles that have not been matched
200                def notMatchedFiles = session.process[ processId ].processedFiles.notMatched;
201                def extraClassifications = notMatchedFiles.findAll { it.type == "taxonomy" }
202                extraClassifications.collect {
203                        // Find all sequence files that have the correct number of sequences and are in the list of assaySamples
204                        it[ 'feasibleSequenceData' ] = SequenceData.findAllByNumSequences( it.numLines ).findAll { assaySamples.contains( it.sample ) }
205                        return it
206                }
207               
208                [       entityType: session.process[ processId ].entityType, processId: processId, entity: entity, 
209                        parsedFiles: session.process[ processId ].processedFiles.parsed, 
210                        matchedFiles: session.process[ processId ].processedFiles.matched, 
211                        remainingClassificationFiles: extraClassifications,
212                        existingTypes: session.process[ processId ].fileTypes,
213                        selectedRun: params.selectedRun ]
214        }
215
216        /**
217         * Returns from the upload wizard without saving the data. The uploaded files are removed
218         */
219        def returnWithoutSaving = {
220                def processId = params.processId;
221                def entityType = session.process[ processId ].entityType;
222                def entityId = session.process[ processId ].entityId;
223               
224                // Delete all uploaded files from disk
225                session.process[ processId ]?.processedFiles?.parsed?.success?.each {
226                        fileService.delete( it.filename );
227                }
228               
229                // Clear process from session
230                workerService.clearProcess( session, processId );
231
232                // Redirect to the correct controller           
233                switch( entityType ) {
234                        case "run":
235                        case "assay":
236                                redirect( controller: entityType, action: "show", id: entityId );
237                                return;
238                        default:
239                                response.setStatus( 404, "No entity found" );
240                                render "";
241                                return;
242                }
243               
244               
245        }
246       
247        /**
248         * Shows a screen with the progress of saving matched files
249         */
250        def saveMatchedFiles = {
251                def processId = params.processId
252
253                def entityType = session.process[ processId ].entityType
254                def entityId = session.process[ processId ].entityId
255               
256                session.process[ processId ].matchedFiles = params.file
257                session.process[ processId ].matchedRemainingClassification = params.remainingClassification
258               
259                // Check for total size of the classification files in order to be able
260                // to show a progress bar. The handling of classification files is orders
261                // of magnitude bigger than the rest, so we only show progress of those files
262                long filesize = 0;
263
264                // Loop through all files. Those are the numeric elements in the 'files' array
265                def digitRE = ~/^\d+$/;
266                params.file.findAll { it.key.matches( digitRE ) }.each { file ->
267                        def filevalue = file.value;
268                       
269                        // Check if the file is selected
270                        if( filevalue.include == "on" ) {
271                                if( fileService.fileExists( filevalue.fasta ) ) {
272                                        // Also save classification data for this file, if it is present
273                                        if( filevalue.classification ) {
274                                                filesize += fileService.get( filevalue.classification )?.size()
275                                        }
276                                }
277                        }
278                }
279                params.remainingClassification.findAll { it.key.matches( digitRE ) }.each { file ->
280                        def filevalue = file.value;
281                       
282                        // Check if the file is selected
283                        if( filevalue.include == "on" ) {
284                                if( fileService.fileExists( filevalue.filename ) ) {
285                                        // Also save classification data for this file, if it is present
286                                        filesize += fileService.get( filevalue.filename )?.size()
287                                }
288                        }
289                }
290
291                // Clear old process, but save useful data
292                def processInfo = session.process[ processId ]
293                workerService.clearProcess( session, processId );
294               
295                // Create a new unique process identifier
296                processId = workerService.initProcess( session, "Store sequence data and classification", 2, filesize );
297               
298                session.process[ processId ] = processInfo;
299               
300                // Retrieve worker URL
301                def finishUrl = createLink( controller: "import", action: 'saveMatchedResult', params: [ processId: processId ] ).toString();
302                def returnUrl = createLink( controller: entityType, action: "show", entityId ).toString();
303               
304                def url = workerService.startProcess( session, processId, finishUrl, returnUrl )
305               
306                //
307                // Initiate work
308                //
309                // Check whether files are given
310                def files = session.process[ processId ].matchedFiles
311                def remainingClassification = session.process[ processId ].matchedRemainingClassification;
312               
313                if( !files && !remainingClassification ) {
314                        flash.message = "No files were selected for import."
315                        redirect( controller: session.process[ processId ].entityType, action: 'show', 'id': session.process[ processId ].entityId)
316                        return
317                }
318
319                File permanentDir = fileService.absolutePath( ConfigurationHolder.config.massSequencing.fileDir )
320               
321                // This closure enables keeping track of the progress
322                def httpSession = session;
323                def onProgress = { progress ->
324                        // Update progress
325                        httpSession.progress[ processId ].stepProgress += progress;
326                }
327               
328                // Run the computations asynchronously, since it takes a lot of time
329                runAsync {
330                        // Loop through all FASTA files. Those are the numeric elements in the 'files' array
331                        def fastaReturn = saveMatchedFastaFiles( files, httpSession.process[ processId ]?.processedFiles, onProgress );
332                        def classificationReturn = saveRemainingClassificationFiles( remainingClassification, onProgress );
333                       
334                        // Update classification (summary) for updated samples
335                        def samplesClassified = [] + fastaReturn.samplesClassified + classificationReturn.samplesClassified;
336                        def uniqueSamples = samplesClassified.findAll { it }.unique();
337
338                        // Now all classification files have been parsed, start a new step. This might take a while, so
339                        // the progress should be shown.
340                        workerService.nextStep( httpSession, processId, "Updating classification statistics in database", uniqueSamples.size() );
341                        classificationService.updateClassificationForAssaySamples( uniqueSamples, onProgress )
342                       
343                        def returnStructure = [
344                                numSequenceFiles: fastaReturn.numSequenceFiles,
345                                numQualFiles: fastaReturn.numQualFiles,
346                                numClassificationFiles: fastaReturn.numClassificationFiles,
347                                numLogFiles: fastaReturn.numLogFiles,
348                                numExtraClassificationFiles: classificationReturn.numExtraClassifications,
349                                numTotal: fastaReturn.numSequenceFiles + classificationReturn.numExtraClassifications,
350                                errors: [] + fastaReturn.errors + classificationReturn.errors
351                        ]
352                       
353                        // Return all files that have not been moved
354                        httpSession.process[ processId ]?.processedFiles?.parsed?.success?.each {
355                                fileService.delete( it.filename );
356                        }
357                       
358                        httpSession.process[ processId ].result = returnStructure;
359                       
360                        // Tell the frontend we are finished
361                        httpSession.progress[ processId ].finished = true;
362       
363                }
364               
365                redirect( url: url );
366        }
367       
368        def saveMatchedFastaFiles( def files, processedFiles, Closure onProgress ) {
369                int numSuccesful = 0;
370                int numQualFiles = 0;
371                int numClassificationFiles = 0;
372                int numLogFiles = 0;
373                def samplesClassified = [];
374                def errors = [];
375
376                def digitRE = ~/^\d+$/;
377                files.findAll { it.key.matches( digitRE ) }.each { file ->
378                        def filevalue = file.value;
379                       
380                        // Check if the file is selected
381                        if( filevalue.include == "on" ) {
382                                if( fileService.fileExists( filevalue.fasta ) ) {
383                                        try {
384                                                def permanent = fastaService.savePermanent( filevalue.fasta, filevalue.qual, filevalue.logfile, processedFiles );
385                                               
386                                                // Save the data into the database
387                                                SequenceData sd = new SequenceData();
388                                               
389                                                sd.sequenceFile = permanent.fasta
390                                                sd.qualityFile = permanent.qual
391                                                sd.logFile = permanent.logfile
392                                                sd.numSequences = permanent.numSequences
393                                                sd.averageQuality = permanent.avgQuality
394                                                       
395                                                def sample = AssaySample.get( filevalue.assaySample );
396                                                if( sample ) {
397                                                        sample.addToSequenceData( sd );
398                                                       
399                                                        AssaySample.recalculateNumSequences( sample );
400                                                }
401                                               
402                                                if( !sd.validate() ) {
403                                                        errors << "an error occurred while saving " + filevalue.fasta + ": validation of SequenceData failed.";
404                                                } else {
405                                                        sd.save(flush:true);
406                                                       
407                                                        // Also save classification data for this file, if it is present
408                                                        if( filevalue.classification ) {
409                                                                classificationService.storeClassification( filevalue.classification, sd, onProgress )
410                                                                samplesClassified << sample
411                                                               
412                                                                numClassificationFiles++;
413                                                        }
414                                                       
415                                                        if( sd.qualityFile )
416                                                                numQualFiles++;
417                                                       
418                                                        if( sd.logFile )
419                                                                numLogFiles++;
420                                                       
421                                                        numSuccesful++;
422                                                }
423                                        } catch( Exception e ) {
424                                                e.printStackTrace();
425                                                errors << "an error occurred while saving " + filevalue.fasta + ": " + e.getMessage()
426                                        }
427                                }
428                        } else {
429                                // File doesn't need to be included in the system. Delete it also from disk
430                                fileService.delete( filevalue.fasta );
431                        }
432                }
433               
434                return [ numSequenceFiles: numSuccesful, numQualFiles: numQualFiles, numClassificationFiles: numClassificationFiles, numLogFiles: numLogFiles, errors: errors, samplesClassified: samplesClassified.unique() ]
435        }
436       
437        def saveRemainingClassificationFiles( def files, Closure onProgress ) {
438                def digitRE = ~/^\d+$/;
439                def errors = [];
440                def samplesClassified = [];
441                def numSuccesful = 0;
442               
443                files.findAll { it.key.matches( digitRE ) }.each { file ->
444                        def filevalue = file.value;
445                       
446                        // Check if the file is selected
447                        if( filevalue.include == "on" ) {
448                                if( fileService.fileExists( filevalue.filename ) ) {
449                                        def sequenceDataId = filevalue.sequenceData;
450                                        try {
451                                                if( sequenceDataId.toString().isLong() ) {
452                                                        // Retrieve sequenceData and sample now, because the session will be cleared during import
453                                                        def sequenceData = SequenceData.get( sequenceDataId.toString().toLong() );
454                                                        def sample = sequenceData.sample;
455                                                       
456                                                        if( sequenceData ) {
457                                                                classificationService.removeClassificationForSequenceData( sequenceData );
458                                                                classificationService.storeClassification( filevalue.filename, sequenceData, onProgress )
459                                                                samplesClassified << sample;
460                                                        }
461       
462                                                        numSuccesful++;
463                                                } else {
464                                                        errors << "a wrong ID is entered for classification file " + filevalue.filename;
465                                                }
466                                        } catch( Exception e ) {
467                                                e.printStackTrace();
468                                                errors << "an error occurred while saving " + filevalue.filename + ": " + e.getMessage()
469                                        }
470                                }
471                        }
472                       
473                        // File doesn't need to be included in the system. Delete it from disk.
474                        fileService.delete( filevalue.filename );
475                }
476               
477                return [ numExtraClassifications: numSuccesful, errors: errors, samplesClassified: samplesClassified.unique()  ]
478               
479        }
480
481        /**
482         * Redirects the user back to the start screen with a message about how things went
483         */
484        def saveMatchedResult = {
485                def processId = params.processId
486               
487                def result = session.process[ processId ].result 
488
489                // Return a message to the user
490                if( result.numTotal == 0 ) {
491                       
492                        if( result.errors.size() > 0 ) {
493                                flash.error = "None of the files were imported, because "
494                                result.errors.each {
495                                        flash.error += "<br />- " + it
496                                }
497                        } else {
498                                flash.message = "None of the files were imported, because none of the files were selected for import."
499                        }
500                } else {
501                        flash.message = ""     
502                        if( result.numSequenceFiles == 1 ) {
503                                flash.message += result.numSequenceFiles + " sequence file has been added to the system"
504                        } else if( result.numSequenceFiles > 1 ) {
505                                flash.message += result.numSequenceFiles + " sequence files have been added to the system"
506                        }
507                       
508                        if( result.numQualFiles > 0 || result.numClassificationFiles > 0 || result.numLogFiles > 0 ) {
509                                flash.message += ", with";
510                        }
511                       
512                        if( result.numQualFiles == 1 ) {
513                                flash.message += " 1 quality file"
514                        } else if( result.numQualFiles > 1 ) {
515                                flash.message += " " + result.numQualFiles + " quality files"
516                        }
517                       
518                        if( result.numQualFiles > 0 && ( result.numClassificationFiles > 0 || result.numLogFiles > 0 ) ) {
519                                flash.message += " and";
520                        }
521                       
522                        if( result.numClassificationFiles == 1 ) {
523                                flash.message += " 1 classification file"
524                        } else if( result.numClassificationFiles > 1 ) {
525                                flash.message += " " + result.numClassificationFiles + " classification files"
526                        }
527
528                        if( ( result.numQualFiles > 0 || result.numClassificationFiles > 0 ) && result.numLogFiles > 0 ) {
529                                flash.message += " and";
530                        }
531                       
532                        if( result.numLogFiles == 1 ) {
533                                flash.message += " 1 log file"
534                        } else if( result.numLogFiles > 1 ) {
535                                flash.message += " " + result.numLogFiles + " log files"
536                        }
537                                               
538                        if( flash.message ) 
539                                flash.message += "."
540
541                        if( result.numExtraClassificationFiles == 1 ) {
542                                flash.message += result.numExtraClassificationFiles + " additional classification file has been read. ";
543                        } else if( result.numExtraClassificationFiles > 1 ) {
544                                flash.message += result.numExtraClassificationFiles + " additional classification files have been read. ";
545                        }
546
547                        if( result.errors.size() > 0 ) {
548                                flash.error = "However, " + result.errors.size() + " errors occurred during import: "
549                                result.errors.each {
550                                        flash.error += "<br />- " + it
551                                }
552                        }
553                }
554
555                // Determine where to redirect the user to
556                def entityType = session.process[ processId ].entityType;
557                def entityId = session.process[ processId ].entityId;
558                               
559                // Clear session
560                workerService.clearProcess( session, processId );
561               
562                // Redirect user
563                redirect( controller: entityType, action: "show", id: entityId )
564        }
565
566        protected Assay getAssay(def assayId) {
567                // load assay with id specified by param.id
568                def assay
569                try {
570                        assay = Assay.get(assayId as Long)
571                } catch( Exception e ) {
572                        flash.error = "Incorrect id given: " + assayId
573                        return null
574                }
575
576                if (!assay) {
577                        flash.error = "No assay found with id: " + assayId
578                        return null
579                }
580               
581                if (!assay.study.canRead( session.user ) ) {
582                        flash.error = "You don't have the right authorizaton to access assay " + assay.name
583                        return null
584                }
585               
586                return assay
587        }
588       
589        protected Run getRun(def runId) {
590                // load run with id specified by param.id
591                def run
592                try {
593                        run = Run.get(runId as Long)
594                } catch( Exception e ) {
595                        flash.error = "Incorrect id given: " + runId
596                        return null
597                }
598
599                if (!run) {
600                        flash.error = "No run found with id: " + runId
601                        return null
602                }
603
604                return run
605        }
606}
Note: See TracBrowser for help on using the repository browser.