Ignore:
Timestamp:
Jun 17, 2011, 1:54:56 PM (8 years ago)
Author:
robert@…
Message:
  • Installed templates (in order to extend session lifetime to 2 hours)
  • Implemented background worker to do work outside the HTTP request
File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/grails-app/controllers/nl/tno/massSequencing/files/ImportController.groovy

    r63 r70  
    1212        def classificationService
    1313        def sessionFactory
     14        def workerService
    1415       
    1516        /**************************************************************************
     
    2425        def parseUploadedFiles = {
    2526                def entityType = params.entityType
     27                def entityId = params.id
    2628               
    2729                // Check whether files are given
    28                 def names = params.list( 'sequencefiles' )
     30                def names = [] + params.list( 'sequencefiles' )
    2931
    3032                if( !names ) {
     
    3840                }
    3941               
    40                 // Create a unique process identifier
    41                 String processId = UUID.randomUUID().toString();
    42                                        
    43                 // Save filenames in session
    44                 if( !session.process )
    45                         session.process = [:]
    46                        
    47                 if( !session.process[ processId ] )
    48                         session.process[ processId ] = [:]
    49                          
    50                 session.process[ processId ].filenames = names;
    51                 session.process[ processId ].entityId = params.id;
    52                 session.process[ processId ].entityType = entityType
    53                
    5442                // Check for total size of the files in order to be able
    5543                // to show a progress bar
     
    5846                        filesize += fileService.get( it )?.length()
    5947                }
    60                
    61                 if( !session.progress )
    62                         session.progress = [:]
    63                
    64                 session.progress[ processId ] = [
    65                         stepNum: 1,
    66                         numSteps: 2,
    67                         stepDescription: 'Parsing files',       // Second step is Store classification
    68                        
    69                         stepProgress: 0,
    70                         stepTotal: filesize
    71                 ]
     48
     49                // Create a unique process identifier
     50                String processId = workerService.initProcess( session, "Parsing files", 2, filesize );
    7251                                       
    73                 render( view: 'showProcessScreen', model: [
    74                         processUrl: createLink( controller: "import", action: "processUploadedFiles" ),
    75                         processParameters: [ processId: processId, entityId: params.id, entityType: params.entityType ],
    76                         progressUrl: createLink( controller: "import", action: "getProgress", params: [ processId: processId ] ),
    77                         finishUrl: createLink( controller: "import", action: 'parseUploadResult', params: [ processId: processId, id: params.id, entityType: entityType] ),
    78                         errorUrl: createLink( controller: entityType, action: "show", id: params.id ),
    79                         entityId: params.id, entityType: params.entityType] );
    80         }
    81        
    82         /**
    83          * Processes uploaded files and tries to combine them with samples
    84          */
    85         def processUploadedFiles = {
    86                 def processId = params.processId
    87                 def entity
    88                
    89                 switch( params.entityType ) {
    90                         case "run":
    91                                 entity = getRun( params.entityId );
    92                                 break;
    93                         case "assay":
    94                                 entity = getAssay( params.entityId );
    95                                 break;
    96                         default:
    97                                 response.setStatus( 404, "No controller found" );
    98                                 render "";
    99                                 return;
    100                 }
    101                
    102                 def assaySamples = entity.assaySamples.findAll { it.assay.study.canWrite( session.user ) };
    103                
    104                 if (!entity) {
    105                         response.setStatus( 404, flash.error )
    106                         render "";
    107                         return
    108                 }
    109 
    110                 // Check whether files are given
    111                 def names = session.process[ processId ]?.filenames
    112 
    113                 if( !names ) {
    114                         println "Process ID: " + processId
    115                         session.process.each {
    116                                 println it.key + " = " + it.value;
    117                         }
    118                         response.setStatus( 500, "No files uploaded for processing" )
    119                         render "";
    120                         return
    121                 }
    122 
    123                 // If only 1 file is uploaded, it is given as String
    124                 ArrayList filenames = []
    125                 if( names instanceof String )
    126                         filenames << names
    127                 else
    128                         names.each { filenames << it }
    129 
     52                session.process[ processId ].filenames = names;
     53                session.process[ processId ].entityId = entityId;
     54                session.process[ processId ].entityType = entityType
     55               
     56                // Retrieve worker URL
     57                def finishUrl = createLink( controller: "import", action: 'parseUploadResult', params: [ processId: processId ] ).toString();
     58                def returnUrl = createLink( controller: entityType, action: "show", id: entityId ).toString();
     59               
     60                def url = workerService.startProcess( session, processId, finishUrl, returnUrl )
     61               
     62                //
     63                // Initiate work
     64                //
     65               
    13066                /* Parses uploaded files, discards files we can not handle
    131                  *
    132                  * [
    133                  *              success: [
    134                  *                      [filename: 'abc.fasta', type: FASTA, numSequences: 190]
    135                  *                      [filename: 'cde.fasta', type: FASTA, numSequences: 140]
    136                  *                      [filename: 'abc.qual', type: QUAL, numSequences: 190, avgQuality: 38]
    137                  *                      [filename: 'cde.qual', type: QUAL, numSequences: 140, avgQuality: 29]
    138                  *              ],
    139                  *              failure: [
    140                  *                      [filename: 'testing.doc', message: 'Type not recognized']
    141                  *              ]
    142                  * ]
    143                  *
    144                  * The second parameter is a callback function to update progress indicators
    145                  */
    146                 def httpSession = session;
    147                 def onProgress = { progress, total ->
    148                         // Update progress
    149                         httpSession.progress[ processId ].stepTotal = total;
    150                         httpSession.progress[ processId ].stepProgress = progress;
    151                 }
    152                 def newStep = { total, description ->
    153                         // Start a new step
    154                         httpSession.progress[ processId ].stepTotal = total;
    155                         httpSession.progress[ processId ].stepProgress = 0;
    156                        
    157                         httpSession.progress[ processId ].stepDescription = description;
    158                         httpSession.progress[ processId ].stepNum++;
    159                 }
    160 
    161                 def parsedFiles = importService.parseFiles( filenames, onProgress, [progress: 0, total: httpSession.progress[ processId ].stepTotal ], newStep );
    162                
    163                 // Determine excel matches from the uploaded files
    164                 parsedFiles.success = fastaService.inferExcelMatches( parsedFiles.success );
    165                
    166                 // Match files with samples in the database
    167                 def matchedFiles = fastaService.matchFiles( parsedFiles.success, assaySamples );
    168 
    169                 // Sort files on filename
    170                 matchedFiles.sort { a,b -> a.fasta?.originalfilename <=> b.fasta?.originalfilename }
    171 
    172                 // Retrieve all files that have not been matched
    173                 def notMatchedFiles = parsedFiles.success.findAll {
    174                         switch( it.type ) {
    175                                 case "fasta":
    176                                         return !matchedFiles*.fasta*.filename.contains( it.filename );
    177                                 case "qual":
    178                                         return !matchedFiles*.feasibleQuals.flatten().filename.contains( it.filename );
    179                                 case "taxonomy":
    180                                         return !matchedFiles*.feasibleClassifications.flatten().filename.contains( it.filename );
    181                         }
    182                         return false;
    183                 }
    184                
    185                 // Saved file matches in session to use them later on
    186                 session.process[ processId ].processedFiles = [ parsed: parsedFiles,  matched: matchedFiles, notMatched: notMatchedFiles ];
    187 
    188                 render ""
    189         }
    190 
    191         def getProgress = {
    192                 def processId = params.processId;
    193                 if( !processId || !session.progress?.getAt( processId ) ) {
    194                         response.setStatus( 500, "No progress information found" );
    195                         render ""
    196                         return
    197                 }
    198                
    199                 render session.progress[ processId ] as JSON
     67                *
     68                * [
     69                *               success: [
     70                *                       [filename: 'abc.fasta', type: FASTA, numSequences: 190]
     71                *                       [filename: 'cde.fasta', type: FASTA, numSequences: 140]
     72                *                       [filename: 'abc.qual', type: QUAL, numSequences: 190, avgQuality: 38]
     73                *                       [filename: 'cde.qual', type: QUAL, numSequences: 140, avgQuality: 29]
     74                *               ],
     75                *               failure: [
     76                *                       [filename: 'testing.doc', message: 'Type not recognized']
     77                *               ]
     78                * ]
     79                *
     80                * The second parameter is a callback function to update progress indicators
     81                */
     82           def httpSession = session;
     83           def onProgress = { progress, total ->
     84                   // Update progress
     85                   httpSession.progress[ processId ].stepTotal = total;
     86                   httpSession.progress[ processId ].stepProgress = progress;
     87           }
     88           def newStep = { total, description ->
     89                   // Start a new step
     90                   httpSession.progress[ processId ].stepTotal = total;
     91                   httpSession.progress[ processId ].stepProgress = 0;
     92                   
     93                   httpSession.progress[ processId ].stepDescription = description;
     94                   httpSession.progress[ processId ].stepNum++;
     95           }
     96
     97           // Perform the actual computations asynchronously
     98           runAsync {
     99                   def entity
     100                   
     101                   // Determine entity and assaysamples
     102                   switch( httpSession.process[ processId ].entityType ) {
     103                           case "run":
     104                                   entity = getRun( httpSession.process[ processId ].entityId );
     105                                   break;
     106                           case "assay":
     107                                   entity = getAssay( httpSession.process[ processId ].entityId );
     108                                   break;
     109                           default:
     110                                   httpSession.progress[ processId ].error = true;
     111                                   httpSession.progress[ processId ].finished = true;
     112                                   return;
     113                   }
     114                   
     115                   if (!entity) {
     116                           httpSession.progress[ processId ].error = true;
     117                           httpSession.progress[ processId ].finished = true;
     118                           return;
     119                   }
     120   
     121                   def assaySamples = entity.assaySamples.findAll { it.assay.study.canWrite( httpSession.user ) };
     122                   
     123                   def parsedFiles = importService.parseFiles( names, onProgress, [progress: 0, total: httpSession.progress[ processId ].stepTotal ], newStep );
     124                   
     125                   // Determine excel matches from the uploaded files
     126                   parsedFiles.success = fastaService.inferExcelMatches( parsedFiles.success );
     127                   
     128                   // Match files with samples in the database
     129                   def matchedFiles = fastaService.matchFiles( parsedFiles.success, assaySamples );
     130   
     131                   // Sort files on filename
     132                   matchedFiles.sort { a,b -> a.fasta?.originalfilename <=> b.fasta?.originalfilename }
     133   
     134                   // Retrieve all files that have not been matched
     135                   def notMatchedFiles = parsedFiles.success.findAll {
     136                           switch( it.type ) {
     137                                   case "fasta":
     138                                           return !matchedFiles*.fasta*.filename.contains( it.filename );
     139                                   case "qual":
     140                                           return !matchedFiles*.feasibleQuals.flatten().filename.contains( it.filename );
     141                                   case "taxonomy":
     142                                           return !matchedFiles*.feasibleClassifications.flatten().filename.contains( it.filename );
     143                           }
     144                           return false;
     145                   }
     146                   
     147                   // Saved file matches in session to use them later on
     148                   httpSession.process[ processId ].processedFiles = [ parsed: parsedFiles,  matched: matchedFiles, notMatched: notMatchedFiles ];
     149                   
     150                   // Tell the frontend we are finished
     151                   httpSession.progress[ processId ].finished = true;
     152           }
     153               
     154                redirect( url: url );
    200155        }
    201156       
     
    208163                def entity
    209164               
    210                 switch( params.entityType ) {
     165                switch( session.process[ processId ].entityType ) {
    211166                        case "run":
    212                                 entity = getRun( params.id )
     167                                entity = getRun( session.process[ processId ].entityId )
    213168                                break;
    214169                        case "assay":
    215                                 entity = getAssay( params.id )
     170                                entity = getAssay( session.process[ processId ].entityId )
    216171                                break;
    217172                        default:
     
    234189                        return
    235190                }
    236                
    237191               
    238192                // Find matching sequenceData objects for taxonomyfiles that have not been matched
     
    245199                }
    246200               
    247                 [       entityType: params.entityType, processId: processId, entity: entity, id: params.id,
     201                [       entityType: session.process[ processId ].entityType, processId: processId, entity: entity,
    248202                        parsedFiles: session.process[ processId ].processedFiles.parsed,
    249203                        matchedFiles: session.process[ processId ].processedFiles.matched,
     
    257211        def returnWithoutSaving = {
    258212                def processId = params.processId;
    259 
     213                def entityType = session.process[ processId ].entityType;
     214                def entityId = session.process[ processId ].entityId;
     215               
    260216                // Delete all uploaded files from disk
    261217                session.process[ processId ]?.processedFiles?.parsed?.success?.each {
    262218                        fileService.delete( it.filename );
    263219                }
     220               
     221                // Clear process from session
     222                workerService.clearProcess( session, processId );
    264223
    265224                // Redirect to the correct controller           
    266                 switch( params.entityType ) {
     225                switch( entityType ) {
    267226                        case "run":
    268227                        case "assay":
    269                                 redirect( controller: params.entityType, action: "show", id: params.id );
     228                                redirect( controller: entityType, action: "show", id: entityId );
    270229                                return;
    271230                        default:
     
    282241         */
    283242        def saveMatchedFiles = {
    284                 def entityType = params.entityType
    285243                def processId = params.processId
     244
     245                def entityType = session.process[ processId ].entityType
     246                def entityId = session.process[ processId ].entityId
    286247               
    287248                session.process[ processId ].matchedFiles = params.file
     
    320281                }
    321282
    322                 if( !session.progress )
    323                         session.progress = [:]
    324                        
    325                 session.progress[ processId ] = [
    326                         stepNum: 2,
    327                         numSteps: 2,
    328                         stepDescription: 'Store sequence data and classification',
    329                        
    330                         stepProgress: 0,
    331                         stepTotal: filesize
    332                 ]
    333                                        
    334                 render( view: 'showProcessScreen', model: [
    335                         processUrl: createLink( controller: "import", action: "processMatchedFiles" ),
    336                         processParameters: [ processId: processId, entityId: params.id, entityType: params.entityType ],
    337                         progressUrl: createLink( controller: "import", action: "getProgress", params: [ processId: processId ] ),
    338                         finishUrl: createLink( controller: "import", action: 'saveMatchedResult', params: [ processId: processId, id: params.id, entityType: entityType] ),
    339                         errorUrl: createLink( controller: entityType, action: "show", id: params.id ),
    340                         entityId: params.id, entityType: params.entityType] );
    341         }
    342        
    343         /**
    344          * Saves processed files to the database, based on the selections made by the user
    345          */
    346         def processMatchedFiles = {
    347                 // load entity with id specified by param.id
    348                 def processId = params.processId;
    349 
     283                // Clear old process, but save useful data
     284                def processInfo = session.process[ processId ]
     285                workerService.clearProcess( session, processId );
     286               
     287                // Create a new unique process identifier
     288                processId = workerService.initProcess( session, "Store sequence data and classification", 2, filesize );
     289               
     290                session.progress[ processId ].stepNum = 2;
     291                session.process[ processId ] = processInfo;
     292               
     293                // Retrieve worker URL
     294                def finishUrl = createLink( controller: "import", action: 'saveMatchedResult', params: [ processId: processId ] ).toString();
     295                def returnUrl = createLink( controller: entityType, action: "show", entityId ).toString();
     296               
     297                def url = workerService.startProcess( session, processId, finishUrl, returnUrl )
     298               
     299                //
     300                // Initiate work
     301                //
    350302                // Check whether files are given
    351                 def files = session.process[ processId ].matchedFiles 
     303                def files = session.process[ processId ].matchedFiles
    352304                def remainingClassification = session.process[ processId ].matchedRemainingClassification;
    353305               
    354306                if( !files && !remainingClassification ) {
    355307                        flash.message = "No files were selected for import."
    356                         redirect( controller: params.entityType, action: 'show', 'id': params.entityId)
     308                        redirect( controller: session.process[ processId ].entityType, action: 'show', 'id': session.process[ processId ].entityId)
    357309                        return
    358310                }
     
    367319                }
    368320               
    369                 // Loop through all FASTA files. Those are the numeric elements in the 'files' array
    370                 def fastaReturn = saveMatchedFastaFiles( files, session.process[ processId ]?.processedFiles, onProgress );
    371                 def classificationReturn = saveRemainingClassificationFiles( remainingClassification, onProgress );
    372                
    373                 // Update classification (summary) for updated samples
    374                 def samplesClassified = [] + fastaReturn.samplesClassified + classificationReturn.samplesClassified;
    375                 classificationService.updateClassificationForAssaySamples( samplesClassified.findAll { it }.unique() )
    376                
    377                 def returnStructure = [
    378                         numSequenceFiles: fastaReturn.numSequenceFiles,
    379                         numQualFiles: fastaReturn.numQualFiles,
    380                         numClassificationFiles: fastaReturn.numClassificationFiles,
    381                         numExtraClassificationFiles: classificationReturn.numExtraClassifications,
    382                         numTotal: fastaReturn.numSequenceFiles + classificationReturn.numExtraClassifications,
    383                         errors: [] + fastaReturn.errors + classificationReturn.errors
    384                 ]
    385 
    386                 // Return all files that have not been moved
    387                 session.process[ processId ]?.processedFiles?.parsed?.success?.each {
    388                         fileService.delete( it.filename );
    389                 }
    390                
    391                 session.process[ processId ].result = returnStructure;
    392                
    393                 response.contentType = "text/plain"
    394                 render "";
     321                // Run the computations asynchronously, since it takes a lot of time
     322                runAsync {
     323                        // Loop through all FASTA files. Those are the numeric elements in the 'files' array
     324                        def fastaReturn = saveMatchedFastaFiles( files, httpSession.process[ processId ]?.processedFiles, onProgress );
     325                        def classificationReturn = saveRemainingClassificationFiles( remainingClassification, onProgress );
     326                       
     327                        // Update classification (summary) for updated samples
     328                        def samplesClassified = [] + fastaReturn.samplesClassified + classificationReturn.samplesClassified;
     329                        classificationService.updateClassificationForAssaySamples( samplesClassified.findAll { it }.unique() )
     330                       
     331                        def returnStructure = [
     332                                numSequenceFiles: fastaReturn.numSequenceFiles,
     333                                numQualFiles: fastaReturn.numQualFiles,
     334                                numClassificationFiles: fastaReturn.numClassificationFiles,
     335                                numExtraClassificationFiles: classificationReturn.numExtraClassifications,
     336                                numTotal: fastaReturn.numSequenceFiles + classificationReturn.numExtraClassifications,
     337                                errors: [] + fastaReturn.errors + classificationReturn.errors
     338                        ]
     339                       
     340                        // Return all files that have not been moved
     341                        httpSession.process[ processId ]?.processedFiles?.parsed?.success?.each {
     342                                fileService.delete( it.filename );
     343                        }
     344                       
     345                        httpSession.process[ processId ].result = returnStructure;
     346                       
     347                        // Tell the frontend we are finished
     348                        httpSession.progress[ processId ].finished = true;
     349       
     350                }
     351               
     352                redirect( url: url );
    395353        }
    396354       
     
    566524                        }
    567525                }
    568                
     526
     527                // Determine where to redirect the user to
     528                def entityType = session.process[ processId ].entityType;
     529                def entityId = session.process[ processId ].entityId;
     530                               
    569531                // Clear session
    570                 session.process?.remove( processId );
    571                 session.progress?.remove( processId );
     532                workerService.clearProcess( session, processId );
    572533               
    573534                // Redirect user
    574                 redirect( controller: params.entityType, action: "show", id: params.id )
    575         }
    576        
    577         def deleteData = {
    578                 // load study with id specified by param.id
    579                 def sequenceData
    580                
    581                 try {
    582                         sequenceData = SequenceData.get(params.id as Long)
    583                 } catch( Exception e ) {}
    584 
    585                 if (!sequenceData) {
    586                         flash.error = "No sequencedata found with id: $params.id"
    587                         redirect( controller: 'study' )
    588                         return
    589                 }
    590 
    591                 def entityId
    592                 def entityType
    593                
    594                 switch( params.entityType ) {
    595                         case "run":
    596                                 entityId = sequenceData.sample.run?.id;
    597                                 entityType = "run"
    598                                 break;
    599                         case "assay":
    600                         default:
    601                                 entityType = "assay";
    602                                 entityId = sequenceData.sample.assay.id;
    603                                 break;
    604                 }
    605                  
    606                 def numFiles = sequenceData.numFiles();
    607                 def sample = sequenceData.sample;
    608                  
    609                 // Set flushmode to auto, since otherwise the sequencedata will
    610                 // not be removed
    611                 sessionFactory.getCurrentSession().setFlushMode( org.hibernate.FlushMode.AUTO );
    612                
    613                 sample.removeFromSequenceData( sequenceData );
    614                 sequenceData.delete(flush:true);
    615                 sample.resetStats();
    616                 sample.save();
    617                
    618                 flash.message = numFiles + " file" + (numFiles != 1 ? "s have" : " has" ) + " been deleted from this sample"
    619 
    620                 redirect( controller: entityType, action: 'show', id: entityId )
    621         }
    622        
     535                redirect( controller: entityType, action: "show", id: entityId )
     536        }
     537
    623538        protected Assay getAssay(def assayId) {
    624539                // load assay with id specified by param.id
Note: See TracChangeset for help on using the changeset viewer.