source: trunk/grails-app/services/nl/tno/massSequencing/ClassificationService.groovy @ 74

Last change on this file since 74 was 74, checked in by robert@…, 8 years ago
  • Several bugfixes
  • Added an extra step in the worker process for importing data
File size: 18.9 KB
Line 
1package nl.tno.massSequencing
2
3import java.io.BufferedWriter;
4import java.io.File;
5import java.io.OutputStream;
6import java.io.Writer;
7import java.util.ArrayList;
8import java.util.List;
9import org.codehaus.groovy.grails.commons.ConfigurationHolder
10import org.hibernate.StatelessSession
11import org.hibernate.Transaction
12import org.hibernate.engine.SessionImplementor
13import java.util.zip.*
14import nl.tno.massSequencing.classification.*
15
16class ClassificationService implements nl.tno.massSequencing.imports.Importer {
17        def fileService
18        def csvService
19        def sessionFactory
20
21        static transactional = false
22
23        /**
24         * Determines whether a file can be processed.
25         * @param filetype      Filetype of the file
26         * @see FileService.determineFileType()
27         * @return
28         */
29        public boolean canParseFileType( String filetype ) {
30                switch( filetype ) {
31                        case "groups":
32                        case "taxonomy":
33                                return true;
34                        default:
35                                return false;
36                }
37        }
38
39        /**
40         * Parses the given GROUPS or TAXONOMY file
41         *
42         * @param file                  File to parse
43         * @param filetype              Type of the given file
44         * @param onProgress    Closure to execute when progress indicators should be updated.
45         *                                              Has 2 parameters: numFilesProcessed and numBytesProcessed that indicate the number
46         *                                              of files and bytes that have been processed in this file (so the first parameter should
47         *                                              only be 1 when the file is finished)
48         *
49         * @return                              List structure. Examples:
50         *
51         *   [ success: true, filename: 'abc.fasta', type: 'fasta', numSequences: 200 ]
52         *   [ success: true, filename: 'abc.qual', type: 'qual', numSequences: 200, avgQuality: 36 ]
53         *   [ success: false, filename: 'abc.txt', type: 'txt', message: 'Filetype could not be parsed.' ]
54         */
55        public Map parseFile( File file, String filetype, Closure onProgress ) {
56                switch( filetype ) {
57                        case "groups":
58                        case "taxonomy":
59                                def fileInfo = countLines( file, onProgress );
60                                return [ success: true, filename: file.getName(), type: filetype, numLines: fileInfo.numLines, filesize: fileInfo.fileSize ]
61                        default:
62                                onProgress( file.length(), 0 );
63                                return [ success: false, type: filetype, message: 'Filetype could not be parsed.' ]
64                }
65        }
66
67        /**
68         * Parses the given GROUPS file
69         * @param file                  File to parse
70         * @param onProgress    Closure to execute when progress indicators should be updated.
71         *                                              Has 2 parameters: numFilesProcessed and numBytesProcessed that indicate the number
72         *                                              of files and bytes that have been processed in this file (so the first parameter should
73         *                                              only be 1 when the file is finished)
74         * @return                              List structure. Examples:
75         *
76         *   [ success: true, filename: 'abc.fasta', type: 'fasta', numSequences: 200 ]
77         *   [ success: false, filename: 'def.fasta', type: 'fasta', message: 'File is not a valid FASTA file' ]
78         */
79        protected Map countLines( File file, Closure onProgress ) {
80
81                long startTime = System.nanoTime();
82                log.trace "Start counting lines for " + file.getName()
83
84                // Count the number of lines. The real processing is done later on, because
85                // the GROUPS and TAXONOMY files must be handled together
86                long numLines = 0;
87                long bytesProcessed = 0;
88
89                file.eachLine { line ->
90                        numLines++;
91
92                        // Update progress every time 100kB is processed
93                        bytesProcessed += line.size();
94                        if( bytesProcessed > 100000 ) {
95                                onProgress( bytesProcessed, 0 );
96                                bytesProcessed = 0;
97                        }
98                }
99
100                // Update progress and say we're finished
101                onProgress( bytesProcessed, 0 );
102
103                log.trace "Finished counting lines for " + file.getName() + ": " + ( System.nanoTime() - startTime ) / 1000000L
104
105                return [ numLines: numLines, fileSize: file.size() ];
106        }
107
108        /**
109         * Removes all classification sequence objects for a given SequenceData object
110         * @param sequenceData 
111         * @return      True if the deletion has succeeded
112         */
113        public boolean removeClassificationForSequenceData( SequenceData sequenceData ) {
114                if( sequenceData ) {
115                        Sequence.executeUpdate( "DELETE FROM Sequence s WHERE s.sequenceData = ?", [sequenceData] );
116                        return true;
117                } else {
118                        return false;
119                }
120        }
121
122        /**
123         * Store the classification from the given files
124         * @param groupsFile    Filename
125         * @param taxonomyFile
126         * @return
127         */
128        public Map storeClassification( String taxonomyFilename, SequenceData sequenceData, Closure onProgress ) {
129                def hibernateSession = sessionFactory.getCurrentSession();
130                hibernateSession.flush();
131                hibernateSession.clear();
132               
133                File taxonomyFile = fileService.get( taxonomyFilename );
134
135                if( !taxonomyFile ) {
136                        return [ success: false, type: 'taxonomy', filename: groupsFilename, message: 'Taxonomy file doesn\'t exist.' ]
137                }
138               
139                // Open files for reading and create temporary variables
140                def taxonomyReader = taxonomyFile.newReader();
141
142                def parts, sequenceName, classification, sampleName, taxa, taxon, line, groupsLine
143
144                def taxonRegex = /"?([^"(]+)"?(\(\d+\))?/
145                def startLevel = 1    // The first level that is present in the file. Sometimes, the root element (level 0) is not mentioned in the file, in that case, this value should be set to 1
146                def unclassified = "unclassified"
147
148                def i = 0;
149                def start = System.currentTimeMillis();
150                def lapTime = start;
151
152                // Create a stateless session in order to speed up inserts
153                StatelessSession statelessSession = sessionFactory.openStatelessSession();
154               
155                // For each line in the taxonomy file, read the sequence and save it
156                def discardedSequences = 0;
157                def importedSequences = 0;
158                long bytesProcessed = 0;
159               
160                try {
161                        while( ( line = taxonomyReader.readLine() ) ) {
162                                bytesProcessed += line.size();
163       
164                                // Find the taxon for this line
165                                parts = line.tokenize( "\t" );
166       
167                                if( parts.size() != 2 ) {
168                                        // Skip this line because it is incorrect
169                                        continue;
170                                }
171       
172                                sequenceName = parts[ 0 ];
173                                classification = parts[ 1 ];
174       
175                                // Split classification to check whether the taxon already exists
176                                taxa = classification.tokenize( ';' ).collect { it.replaceAll( taxonRegex, '$1' ) }.findAll { it && it != unclassified }
177                               
178                                // Find taxon or create one if needed
179                                taxon = Taxon.findOrCreateTaxonByPath( taxa, startLevel );
180       
181                                if( taxon ) {
182                                        // Create a new sequence record
183                                        def s = new Sequence()
184                                        s.name = sequenceName
185                                        s.classification = taxon
186                                        s.sequenceData = sequenceData
187       
188                                        statelessSession.insert(s);
189                                } else {
190                                        log.trace( "" + i + " Sequence " + sequenceName + " not imported because it is unclassified." )
191                                }
192       
193                                if( i % 80 == 0 ) {
194                                        onProgress( bytesProcessed );
195                                        bytesProcessed = 0;
196                                }
197       
198                                importedSequences++;
199       
200                                i++;
201                        }
202                       
203                } catch( Exception e ) {
204                        throw e;
205                } finally {
206                        // Flush the last bit of inserts. Unfortunately, this is not automatically done when jdbc batch_size > 1
207                        // See also http://opensource.atlassian.com/projects/hibernate/browse/HHH-4042
208                        SessionImplementor sessionImpl = (SessionImplementor) statelessSession;
209                        sessionImpl.getBatcher().executeBatch();
210               
211                        // Close the session and the reader
212                        statelessSession.close();
213                        taxonomyReader.close();
214                }
215
216                onProgress( bytesProcessed );
217
218                return [ filename: taxonomyFilename, type: 'taxonomy', success: true, importedSequences: importedSequences, discardedSequences: discardedSequences ]
219        }
220
221        /**
222         * Updates the Classification table with data computed from the raw sequences for the given assay sample
223         * @param assaySample
224         */
225        public void updateClassificationForAssaySample( AssaySample assaySample ) {
226                if( !assaySample )
227                        return;
228               
229                // Clear hibernate session for otherwise the session will be out of sync with the
230                // database
231                def hibernateSession = sessionFactory.getCurrentSession();
232                hibernateSession.flush();
233                hibernateSession.clear();
234
235                // Remove all Classifications for the given assaySample
236                Classification.executeUpdate( "DELETE FROM Classification WHERE assaySample = :assaySample", [ 'assaySample': assaySample ] );
237
238                // Update classification table with data from sequences
239
240                // Unfortunately, Hibernate (through HQL) can't handle bulk insertion if a table has a
241                // composite id (Classification). See http://opensource.atlassian.com/projects/hibernate/browse/HHH-3434
242                // For that reason we execute plain SQL
243                def connection = sessionFactory.currentSession.connection()
244                def statement = connection.createStatement();
245
246                // This statement searches within the sequences for every taxon that is mentioned (as a leaf-node in the tree)
247                // and inserts statistics about those taxons into the classification table.
248                //
249                // However, if we have the following sequences:
250                //              sequence1               Bacteria -> Firmicutes -> Clostridia -> (unclassified)
251                //              sequence2               Bacteria -> (unclassified)
252                //
253                // The statement will insert values for
254                //              Clostridia: 1
255                //              Bacteria: 2
256                //
257                // and nothing for firmicutes. Of course, these values can be computed at any time, since
258                // the taxonomy tree is known. Since these values are used for searching and exporting (e.g. show all samples
259                // where firmicutes are present), it is essential that the extra values are also added. This is done in the second statement
260
261                // Insert statistics for all sequences and (leaf) taxa
262                statement.executeUpdate( connection.nativeSQL( """
263                        INSERT INTO     classification (assay_sample_id, taxon_id, unclassified, total)
264                        SELECT          sa.id, s.classification_id, count( * ),
265                                                (
266                                                        SELECT          count( * )
267                                                        FROM            sequence s2
268                                                        LEFT JOIN       taxonomy t2 ON s2.classification_id = t2.id
269                                                        LEFT JOIN       sequence_data sd2 ON s2.sequence_data_id = sd2.id
270                                                        LEFT JOIN       assay_sample sa2 ON sd2.sample_id = sa2.id
271                                                        WHERE           sa2.id = sa.id AND t2.lft >= t.lft AND t2.rgt <= t.rgt
272                                                )
273                        FROM            sequence s
274                        LEFT JOIN       taxonomy t ON s.classification_id = t.id
275                        LEFT JOIN       sequence_data sd ON s.sequence_data_id = sd.id
276                        LEFT JOIN       assay_sample sa ON sd.sample_id = sa.id
277                        WHERE           sa.id = """ + assaySample.id + """
278                        GROUP BY        sa.id, s.classification_id, t.lft, t.rgt
279           """ ) );
280
281                // Update the table with missing values
282                statement.executeUpdate( connection.nativeSQL( """
283                INSERT INTO     classification (assay_sample_id, taxon_id, unclassified, total)
284                SELECT * FROM (
285                        SELECT          a.id as assay_sample_id, t.id as taxon_id, 0 as unclassified,
286                                                (
287                                                        SELECT          sum( c.unclassified )
288                                                        FROM            classification c
289                                                        LEFT JOIN       taxonomy t3 ON c.taxon_id = t3.id
290                                                        WHERE           c.assay_sample_id = a.id
291                                                                AND     t3.lft >= t.lft
292                                                                AND     t3.rgt <= t.rgt
293                                                ) AS total
294                        FROM            taxonomy t, assay_sample a
295                        WHERE           a.id = """ + assaySample.id + """ 
296                        AND             t.id NOT IN (
297                                                        SELECT          t2.id
298                                                        FROM            classification c
299                                                        LEFT JOIN       taxonomy t2 ON c.taxon_id = t2.id
300                                                        WHERE           c.assay_sample_id = a.id
301                                                )
302                ) missingClassifications WHERE missingClassifications.total IS NOT NULL
303          """ ) );
304       
305        }
306
307        /**
308         * Updates the Classification table with data computed from the raw sequences for the given assay sample
309         * @param assaySample
310         */
311        public void updateClassificationForAssaySamples( List assaySamples, Closure onProgress = null ) {
312                if( !assaySamples )
313                        return;
314
315                       
316                assaySamples.each {
317                        updateClassificationForAssaySample( it );
318                       
319                        if( onProgress )
320                                onProgress( 1 );
321                }
322        }
323
324        /**
325         * Find a file that matches the parameters and is not already stored
326         * @param inputFiles
327         * @param type
328         * @param numLines
329         * @param alreadyStored
330         * @return
331         */
332        protected Map findMatchingClassificationFile( def inputFiles, String type, long numLines, def alreadyStored ) {
333                return inputFiles.find { it.type == type && it.numLines == numLines && !alreadyStored?.contains( it.filename ) };
334        }
335       
336        /**
337         * Retrieves a list of classifications for the given assaysamples
338         *
339         * @param assaySamples  List of assaysamples
340         * @param prefix                (optional) prefix that is prepended to the taxon name. e.g. '  ' can be used to indent every new level
341         * @return                              List with classification data
342         *
343         * The list is returned like:
344         *
345         * 0    0               root                                    12      0       15      16
346         * 1    0.1             Bacteria                                7               15      13
347         * 2    0.1.1   Firmicutes                              6                       11
348         * 2    0.1.2   Unclassified bacteria   1               15      2
349         * 1    0.2             Unclassified                    5                       3
350         *
351         * Where the columns are:
352         *      1.      Level number of the taxon on that line
353         *      2.      rank ID: identifier for the given line. The identifier is unique per output file, but can be reused in new output files.
354         *      3.      Taxon name
355         *      4.      Every column shows the number of sequences for that taxon in a an assaysample. The first column matches the first assaySample given etc.
356         */
357        def retrieveClassifications( def assaySamples, String prefix = "" ) {
358                // Structure to store the data in
359                def data = [];
360
361                // Retrieve classification information for all assaySamples
362                def classifications = Classification.executeQuery( "FROM Classification c LEFT JOIN c.taxon t LEFT JOIN c.assaySample a WHERE a IN (:assaySamples) ORDER BY t.lft, a.sample.name", [ "assaySamples": assaySamples ] );
363
364                // Find the maximum level present in this classification list
365                def levels = Classification.determineMinAndMaxLevels( assaySamples );
366                def maxLevel = levels[ 0 ]
367                def minLevel = levels[ 1 ]
368
369                // Determine the starting prefix and the starting counter for the rankId
370                def rankGlue = ".";
371                def startCounter = 1
372                def rootId = 0;
373                def rankPrefix = [:]; rankPrefix [ minLevel ] = rootId;
374                def rankCounter = [:]; rankCounter[ 1 ] = startCounter;
375
376                // Closure to retrieve a new id for the given level
377                def rankId = { level ->
378                        def rank = rankPrefix[ level ] + rankGlue + rankCounter[ level ];
379                        rankCounter[ level ]++;
380                        rankPrefix[ ( level + 1 ) ] = rank
381                        rankCounter[ ( level + 1 ) ] = startCounter
382                       
383                        return rank;
384                }
385               
386                def currentLine = [];
387                def hasValues = false;
388               
389                def currentTaxonId = -1;
390                def currentLevel = -1;
391               
392                def unClassifiedLine = [];
393                def unClassifiedLines = [:]
394                def hasUnClassified = false;
395               
396                def numAssaySamples = assaySamples.size();
397               
398                // Determine index of assaysample in list
399                def assaySampleIndex = [:]
400                for( def i = 0; i < assaySamples.size(); i++ ) {
401                        assaySampleIndex[ assaySamples[ i ].id ] = i;
402                }
403               
404                // Append classifications entry to the list with details about the numbers of unclassified sequences
405                //  AND a.id IN (:assaySampleIds)
406                def extraClassifications = Classification.executeQuery( "SELECT a.id, SUM( c.unclassified ), (SELECT SUM(sd.numSequences) FROM SequenceData sd WHERE sd.sample = a) FROM AssaySample a, Classification c WHERE c.assaySample = a AND  a IN (:assaySamples) GROUP BY a.id", [ "assaySamples": assaySamples ] );
407                def rootLine = [ minLevel - 1, rootId, "root" ] + [ 0 ] * assaySamples.size();
408                extraClassifications.each {
409                        // Add a classification in order to show the 'unclassified' values
410                        classifications << [
411                                [ 'total': it[ 2 ], 'unclassified': it[ 2 ] - it[ 1 ] ],
412                                ['id':0,'level': minLevel - 1, 'name': ""],
413                                [ 'id': it[ 0 ] ]
414                        ]
415                       
416                        // Create a root line with total numbers of sequences for the given sample
417                        rootLine[ assaySampleIndex[ it[ 0 ] ] + 3 ] = it[ 2 ] ?: 0;
418                }
419                data << rootLine;
420               
421                // Adding a line to ensure that the totals will be shown
422                classifications << [ null, ['id':-1,'level':-1, 'name': "unclassified"], [ 'id': 0 ] ]
423               
424                classifications.each { line ->
425                        def classification = line[ 0 ];
426                        def taxon = line[ 1 ];
427                        def assaySample = line[ 2 ];
428                       
429                        if( taxon.id != currentTaxonId ) {
430                                // Start with a new taxon.
431                                // Save the unclassified values for the previous level. These have to be shown it the subtree
432                                // underneath the previous taxon is finished.
433                                if( hasUnClassified )
434                                        unClassifiedLines[ currentLevel ] = unClassifiedLine
435                               
436                                // If a line was generated, add it to the list
437                                if( currentLine && hasValues && currentLevel >= minLevel )
438                                        data << currentLine;
439                                       
440                                // First see whether a unclassified line must be put in place
441                                // by checking whether a unClassifiedLine for the next level is given
442                                if( taxon.level <= currentLevel ) {
443                                        // Check which 'unclassified' lines must be shown. Every line of a subtree that is being closed
444                                        // should be shown. 'unclassified' at the highest level should be shown first
445                                        currentLevel.downto( taxon.level ) { lvl ->
446                                                if( unClassifiedLines[ lvl ] ) {
447                                                        // Each unclassified line should be repeated up to the highest level, in order
448                                                        // to have the same number of sequences at every level.
449                                                        def ucl = unClassifiedLines.remove( lvl );
450                                                        lvl.upto( maxLevel - 1 ) { unclassifiedLevel ->
451                                                                def name = ( prefix * ( unclassifiedLevel + 1 ) ) + ucl[ 2 ]
452                                                                def newLine = [ unclassifiedLevel + 1 ] + rankId( unclassifiedLevel + 1 ) + name + ucl[3..-1];
453                                                                data << newLine;
454                                                        }
455                                                }
456                                        }
457                                }
458
459                                // Determine the rankID for the given taxon. It is composed of the parentTaxon with a counter
460                                def rank;
461                               
462                                if( taxon.level >= minLevel ) {
463                                        rank = rankId( taxon.level )
464                                }
465                               
466                                // Create a new line, because we arrived at a new taxon
467                                currentLine = [ taxon.level, rank, ( prefix * Math.max( 0, taxon.level ) ) + taxon.name ]
468                                hasValues = false;
469                               
470                                // Create a corresponding 'unclassified' line. This line will be saved for the end of the subtree
471                                // The rank for the unclassified line is determined later on, because it depends on the subtree
472                                unClassifiedLine = [ taxon.level + 1, "", "Unclassified (" + taxon.level + ") " + taxon.name  ]
473                               
474                                // Create as many entries in the list as there are samples
475                                numAssaySamples.times {
476                                        currentLine << ""
477                                        unClassifiedLine << ""
478                                }
479                               
480                                // Update taxonId and level
481                                currentTaxonId = taxon.id;
482                                currentLevel = taxon.level;
483                                hasUnClassified = false;
484                        }
485                       
486                        // Find index of this assaySample in the list, and save the classification total and unclassified value in the correct column
487                        def index = assaySampleIndex[ assaySample.id ];
488                        if( index != null ) {
489                                currentLine[ index + 3 ] = classification.total ?: "";
490                                unClassifiedLine[ index + 3 ] = classification.unclassified ?: "";
491                       
492                                // Set flag whether this line has to be included or not
493                                if( classification.total > 0 )
494                                        hasValues = true;
495                               
496                                // Set flag whether unclassified values have been found for this taxon. These values
497                                // are only saved for levels < maxLevel, since at the highest level, all sequences are
498                                // unclassified at a higher level
499                                if( classification.unclassified > 0 && taxon.level < maxLevel )
500                                        hasUnClassified = true;
501                        }
502                }
503               
504                return data;
505        }
506       
507        /**
508         * Exports all known data about the classifications of these samples to an excel file
509         * @param assaySamples  Assaysamples to export information about
510         * @param stream                Outputstream to write the data to
511         * @return
512         */
513        def exportClassifications( def assaySamples, OutputStream stream ) {
514                if( assaySamples == null )
515                        assaySamples = []
516
517                // Create csv file; sheetIndex is ignored
518                def sheetIndex = 0;
519
520                // Create headerrow
521                def headers = [ "level", "rankId", "taxon" ];
522                def ids = [];
523                assaySamples.each { 
524                        headers << it.sample.name 
525                        ids << it.id
526                }
527               
528                def data = retrieveClassifications( assaySamples );
529               
530                // Create an excel sheet
531                def wb = csvService.create();
532
533                // Put the headers on the first row
534                csvService.writeHeader( wb, headers, sheetIndex );
535                csvService.writeData( wb, data, sheetIndex, 1 );
536
537                // Write the data to the output stream
538                csvService.output( wb, stream );
539
540                return true;
541        }
542       
543}
Note: See TracBrowser for help on using the repository browser.