source: trunk/grails-app/services/nl/tno/massSequencing/ClassificationService.groovy @ 75

Last change on this file since 75 was 75, checked in by robert@…, 8 years ago

Last fixes and extra bugfix in import controller

File size: 19.6 KB
Line 
1package nl.tno.massSequencing
2
3import java.io.BufferedWriter;
4import java.io.File;
5import java.io.OutputStream;
6import java.io.Writer;
7import java.util.ArrayList;
8import java.util.List;
9import org.codehaus.groovy.grails.commons.ConfigurationHolder
10import org.hibernate.StatelessSession
11import org.hibernate.Transaction
12import org.hibernate.engine.SessionImplementor
13import java.util.zip.*
14import nl.tno.massSequencing.classification.*
15
16class ClassificationService implements nl.tno.massSequencing.imports.Importer {
17        def fileService
18        def csvService
19        def sessionFactory
20
21        static transactional = false
22
23        /**
24         * Determines whether a file can be processed.
25         * @param filetype      Filetype of the file
26         * @see FileService.determineFileType()
27         * @return
28         */
29        public boolean canParseFileType( String filetype ) {
30                switch( filetype ) {
31                        case "groups":
32                        case "taxonomy":
33                                return true;
34                        default:
35                                return false;
36                }
37        }
38
39        /**
40         * Parses the given GROUPS or TAXONOMY file
41         *
42         * @param file                  File to parse
43         * @param filetype              Type of the given file
44         * @param onProgress    Closure to execute when progress indicators should be updated.
45         *                                              Has 2 parameters: numFilesProcessed and numBytesProcessed that indicate the number
46         *                                              of files and bytes that have been processed in this file (so the first parameter should
47         *                                              only be 1 when the file is finished)
48         *
49         * @return                              List structure. Examples:
50         *
51         *   [ success: true, filename: 'abc.fasta', type: 'fasta', numSequences: 200 ]
52         *   [ success: true, filename: 'abc.qual', type: 'qual', numSequences: 200, avgQuality: 36 ]
53         *   [ success: false, filename: 'abc.txt', type: 'txt', message: 'Filetype could not be parsed.' ]
54         */
55        public Map parseFile( File file, String filetype, Closure onProgress ) {
56                switch( filetype ) {
57                        case "groups":
58                        case "taxonomy":
59                                def fileInfo = countLines( file, onProgress );
60                                return [ success: true, filename: file.getName(), type: filetype, numLines: fileInfo.numLines, filesize: fileInfo.fileSize ]
61                        default:
62                                onProgress( file.length(), 0 );
63                                return [ success: false, type: filetype, message: 'Filetype could not be parsed.' ]
64                }
65        }
66
67        /**
68         * Parses the given GROUPS file
69         * @param file                  File to parse
70         * @param onProgress    Closure to execute when progress indicators should be updated.
71         *                                              Has 2 parameters: numFilesProcessed and numBytesProcessed that indicate the number
72         *                                              of files and bytes that have been processed in this file (so the first parameter should
73         *                                              only be 1 when the file is finished)
74         * @return                              List structure. Examples:
75         *
76         *   [ success: true, filename: 'abc.fasta', type: 'fasta', numSequences: 200 ]
77         *   [ success: false, filename: 'def.fasta', type: 'fasta', message: 'File is not a valid FASTA file' ]
78         */
79        protected Map countLines( File file, Closure onProgress ) {
80
81                long startTime = System.nanoTime();
82                log.trace "Start counting lines for " + file.getName()
83
84                // Count the number of lines. The real processing is done later on, because
85                // the GROUPS and TAXONOMY files must be handled together
86                long numLines = 0;
87                long bytesProcessed = 0;
88
89                file.eachLine { line ->
90                        numLines++;
91
92                        // Update progress every time 100kB is processed
93                        bytesProcessed += line.size();
94                        if( bytesProcessed > 100000 ) {
95                                onProgress( bytesProcessed, 0 );
96                                bytesProcessed = 0;
97                        }
98                }
99
100                // Update progress and say we're finished
101                onProgress( bytesProcessed, 0 );
102
103                log.trace "Finished counting lines for " + file.getName() + ": " + ( System.nanoTime() - startTime ) / 1000000L
104
105                return [ numLines: numLines, fileSize: file.size() ];
106        }
107
108        /**
109         * Removes all classification sequence objects for a given SequenceData object
110         * @param sequenceData 
111         * @return      True if the deletion has succeeded
112         */
113        public boolean removeClassificationForSequenceData( SequenceData sequenceData ) {
114                if( sequenceData ) {
115                        Sequence.executeUpdate( "DELETE FROM Sequence s WHERE s.sequenceData = ?", [sequenceData] );
116                        return true;
117                } else {
118                        return false;
119                }
120        }
121
122        /**
123         * Store the classification from the given files
124         * @param groupsFile    Filename
125         * @param taxonomyFile
126         * @return
127         */
128        public Map storeClassification( String taxonomyFilename, SequenceData sequenceData, Closure onProgress ) {
129                def hibernateSession = sessionFactory.getCurrentSession();
130                hibernateSession.flush();
131                hibernateSession.clear();
132               
133                File taxonomyFile = fileService.get( taxonomyFilename );
134
135                if( !taxonomyFile ) {
136                        return [ success: false, type: 'taxonomy', filename: groupsFilename, message: 'Taxonomy file doesn\'t exist.' ]
137                }
138               
139                // Open files for reading and create temporary variables
140                def taxonomyReader = taxonomyFile.newReader();
141
142                def parts, sequenceName, classification, sampleName, taxa, taxon, line, groupsLine
143
144                def taxonRegex = /"?([^"(]+)"?(\(\d+\))?/
145                def startLevel = 1    // The first level that is present in the file. Sometimes, the root element (level 0) is not mentioned in the file, in that case, this value should be set to 1
146                def unclassified = "unclassified"
147
148                def i = 0;
149                def start = System.currentTimeMillis();
150
151                // Create a stateless session in order to speed up inserts
152                StatelessSession statelessSession = sessionFactory.openStatelessSession();
153                def connection = statelessSession.connection()
154                def statement
155               
156                // For each line in the taxonomy file, read the sequence and save it
157                def discardedSequences = 0;
158                def importedSequences = 0;
159                long bytesProcessed = 0;
160               
161                def lapTime = System.currentTimeMillis();
162
163                // Store a cache for taxa
164                def taxonCache = Taxon.emptyTaxonCache();
165               
166                try {
167                        while( ( line = taxonomyReader.readLine() ) ) {
168                                bytesProcessed += line.size();
169       
170                                // Find the taxon for this line
171                                parts = line.tokenize( "\t" );
172       
173                                if( parts.size() != 2 ) {
174                                        // Skip this line because it is incorrect
175                                        continue;
176                                }
177                               
178                                sequenceName = parts[ 0 ];
179                                classification = parts[ 1 ];
180       
181                                // Split classification to check whether the taxon already exists
182                                taxa = classification.tokenize( ';' ).collect { it.replaceAll( taxonRegex, '$1' ) }.findAll { it && it != unclassified }
183                               
184                                // Find taxon or create one if needed
185                                taxon = Taxon.findOrCreateTaxonByPath( taxa, startLevel, taxonCache );
186       
187                                if( taxon ) {
188                                        // Create a new sequence record
189                                        def s = new Sequence()
190                                        s.name = sequenceName
191                                        s.classification = taxon
192                                        s.sequenceData = sequenceData
193       
194                                        statelessSession.insert(s);
195                                       
196                                } else {
197                                        log.trace( "" + i + " Sequence " + sequenceName + " not imported because it is unclassified." )
198                                }
199                               
200                                if( i % 80 == 0 ) {
201                                        onProgress( bytesProcessed );
202                                        bytesProcessed = 0;
203                                }
204                               
205                                importedSequences++;
206       
207                                i++;
208                        }
209                       
210                } catch( Exception e ) {
211                        throw e;
212                } finally {
213                        // Flush the last bit of inserts. Unfortunately, this is not automatically done when jdbc batch_size > 1
214                        // See also http://opensource.atlassian.com/projects/hibernate/browse/HHH-4042
215                        SessionImplementor sessionImpl = (SessionImplementor) statelessSession;
216                        sessionImpl.getBatcher().executeBatch();
217               
218                        // Close the session and the reader
219                        statelessSession.close();
220                        taxonomyReader.close();
221                }
222               
223               
224                log.trace "Processing taxonomy file " + taxonomyFilename + " took: " + ( System.currentTimeMillis() - lapTime ) + "ms for " + importedSequences + " sequences. On average, that is " + ( ( System.currentTimeMillis() - lapTime ) / importedSequences ) + "ms per sequence."
225               
226                // Update progress
227                onProgress( bytesProcessed );
228
229                return [ filename: taxonomyFilename, type: 'taxonomy', success: true, importedSequences: importedSequences, discardedSequences: discardedSequences ]
230        }
231
232        /**
233         * Updates the Classification table with data computed from the raw sequences for the given assay sample
234         * @param assaySample
235         */
236        public void updateClassificationForAssaySample( AssaySample assaySample ) {
237                if( !assaySample )
238                        return;
239               
240                // Clear hibernate session for otherwise the session will be out of sync with the
241                // database
242                def hibernateSession = sessionFactory.getCurrentSession();
243                hibernateSession.flush();
244                hibernateSession.clear();
245
246                // Remove all Classifications for the given assaySample
247                Classification.executeUpdate( "DELETE FROM Classification WHERE assaySample = :assaySample", [ 'assaySample': assaySample ] );
248
249                // The process below might seem a bit complicated. If we just search the sequence table for COUNT(*) for each taxon,
250                // we would only insert statistics for the leaf nodes in the database.
251                //
252                // For example, if we have the following sequences:
253                //              sequence1               Bacteria -> Firmicutes -> Clostridia -> (unclassified)
254                //              sequence2               Bacteria -> (unclassified)
255                //
256                // The statement will insert values for
257                //              Clostridia: 1
258                //              Bacteria: 2
259                //
260                // and nothing for firmicutes. Of course, these values can be computed at any time, since
261                // the taxonomy tree is known. Since these values are used for searching and exporting (e.g. show all samples
262                // where firmicutes are present), it is essential that the extra values are also added.
263
264                // First find the number of sequences that have been classified as a specific taxon or a subtaxon of that taxon
265                def numTotals = Taxon.executeQuery( "SELECT t.id, count(*) FROM Taxon t, Sequence s LEFT JOIN s.classification t2 WHERE t2.lft >= t.lft AND t2.rgt <= t.rgt AND s.sequenceData.sample = :assaySample GROUP BY t.id ORDER BY t.id", [ "assaySample": assaySample ] );
266               
267                // Also find the number of sequences that have been classified as that specific taxon, but not further down (numUnClassified)
268                def numUnclassifieds = Sequence.executeQuery( "SELECT s.classification.id, count(*) FROM Sequence s WHERE s.sequenceData.sample = :assaySample GROUP BY s.classification.id ORDER BY s.classification.id", [ "assaySample": assaySample ]);
269
270                def unClassifiedPointer = 0;
271                def unClassifiedsSize = numUnclassifieds.size()
272               
273                // Combine the two lists. The numTotals list is the full list, and the numUnclassifieds list should be entered in it
274                def insertQuadriples = []       // assay_sample_id, taxon_id, unclassified, total
275                numTotals.each { lineTotal ->
276                        def line = [ assaySample.id, lineTotal[ 0 ], 0, lineTotal[ 1 ] ];
277                       
278                        // Check whether we have an entry in numUnclassifieds. If not, the number of unclassified sequences for this taxon is zero
279                        if( unClassifiedPointer < unClassifiedsSize && numUnclassifieds[ unClassifiedPointer ][ 0 ] == lineTotal[ 0 ] ) {
280                                line[ 2 ] = numUnclassifieds[ unClassifiedPointer ][ 1 ];
281                                unClassifiedPointer++;
282                        }
283                       
284                        insertQuadriples << line
285                }
286               
287                // Insert the quadriples into the database
288                if( insertQuadriples.size() > 0 ) {
289                        // Unfortunately, Hibernate (through HQL) can't handle bulk insertion if a table has a
290                        // composite id (Classification). See http://opensource.atlassian.com/projects/hibernate/browse/HHH-3434
291                        // For that reason we execute plain SQL
292                        def connection = sessionFactory.currentSession.connection()
293                        def statement = connection.createStatement();
294       
295                        // Insert statistics for all sequences and taxa. All values in the quadriples list are numbers,
296                        // so no escaping has to be done.
297                        def sql  = "INSERT INTO classification (assay_sample_id, taxon_id, unclassified, total) VALUES";
298                            sql += insertQuadriples.collect { "( " + it.join( ", " ) + " )" }.join( ", " );
299                               
300                        statement.executeUpdate( connection.nativeSQL( sql ) );
301                }
302        }
303
304        /**
305         * Updates the Classification table with data computed from the raw sequences for the given assay sample
306         * @param assaySample
307         */
308        public void updateClassificationForAssaySamples( List assaySamples, Closure onProgress = null ) {
309                if( !assaySamples )
310                        return;
311
312                       
313                assaySamples.each {
314                        updateClassificationForAssaySample( it );
315                       
316                        if( onProgress )
317                                onProgress( 1 );
318                }
319        }
320
321        /**
322         * Find a file that matches the parameters and is not already stored
323         * @param inputFiles
324         * @param type
325         * @param numLines
326         * @param alreadyStored
327         * @return
328         */
329        protected Map findMatchingClassificationFile( def inputFiles, String type, long numLines, def alreadyStored ) {
330                return inputFiles.find { it.type == type && it.numLines == numLines && !alreadyStored?.contains( it.filename ) };
331        }
332       
333        /**
334         * Retrieves a list of classifications for the given assaysamples
335         *
336         * @param assaySamples  List of assaysamples
337         * @param prefix                (optional) prefix that is prepended to the taxon name. e.g. '  ' can be used to indent every new level
338         * @return                              List with classification data
339         *
340         * The list is returned like:
341         *
342         * 0    0               root                                    12      0       15      16
343         * 1    0.1             Bacteria                                7               15      13
344         * 2    0.1.1   Firmicutes                              6                       11
345         * 2    0.1.2   Unclassified bacteria   1               15      2
346         * 1    0.2             Unclassified                    5                       3
347         *
348         * Where the columns are:
349         *      1.      Level number of the taxon on that line
350         *      2.      rank ID: identifier for the given line. The identifier is unique per output file, but can be reused in new output files.
351         *      3.      Taxon name
352         *      4.      Every column shows the number of sequences for that taxon in a an assaysample. The first column matches the first assaySample given etc.
353         */
354        def retrieveClassifications( def assaySamples, String prefix = "" ) {
355                // Structure to store the data in
356                def data = [];
357
358                // Retrieve classification information for all assaySamples
359                def classifications = Classification.executeQuery( "FROM Classification c LEFT JOIN c.taxon t LEFT JOIN c.assaySample a WHERE a IN (:assaySamples) ORDER BY t.lft, a.sample.name", [ "assaySamples": assaySamples ] );
360
361                // Find the maximum level present in this classification list
362                def levels = Classification.determineMinAndMaxLevels( assaySamples );
363                def maxLevel = levels[ 0 ]
364                def minLevel = levels[ 1 ]
365
366                // Determine the starting prefix and the starting counter for the rankId
367                def rankGlue = ".";
368                def startCounter = 1
369                def rootId = 0;
370                def rankPrefix = [:]; rankPrefix [ minLevel ] = rootId;
371                def rankCounter = [:]; rankCounter[ 1 ] = startCounter;
372
373                // Closure to retrieve a new id for the given level
374                def rankId = { level ->
375                        def rank = rankPrefix[ level ] + rankGlue + rankCounter[ level ];
376                        rankCounter[ level ]++;
377                        rankPrefix[ ( level + 1 ) ] = rank
378                        rankCounter[ ( level + 1 ) ] = startCounter
379                       
380                        return rank;
381                }
382               
383                def currentLine = [];
384                def hasValues = false;
385               
386                def currentTaxonId = -1;
387                def currentLevel = -1;
388               
389                def unClassifiedLine = [];
390                def unClassifiedLines = [:]
391                def hasUnClassified = false;
392               
393                def numAssaySamples = assaySamples.size();
394               
395                // Determine index of assaysample in list
396                def assaySampleIndex = [:]
397                for( def i = 0; i < assaySamples.size(); i++ ) {
398                        assaySampleIndex[ assaySamples[ i ].id ] = i;
399                }
400               
401                // Append classifications entry to the list with details about the numbers of unclassified sequences
402                //  AND a.id IN (:assaySampleIds)
403                def extraClassifications = Classification.executeQuery( "SELECT a.id, SUM( c.unclassified ), (SELECT SUM(sd.numSequences) FROM SequenceData sd WHERE sd.sample = a) FROM AssaySample a, Classification c WHERE c.assaySample = a AND  a IN (:assaySamples) GROUP BY a.id", [ "assaySamples": assaySamples ] );
404                def rootLine = [ minLevel - 1, rootId, "root" ] + [ 0 ] * assaySamples.size();
405                extraClassifications.each {
406                        // Add a classification in order to show the 'unclassified' values
407                        classifications << [
408                                [ 'total': it[ 2 ], 'unclassified': it[ 2 ] - it[ 1 ] ],
409                                ['id':0,'level': minLevel - 1, 'name': ""],
410                                [ 'id': it[ 0 ] ]
411                        ]
412                       
413                        // Create a root line with total numbers of sequences for the given sample
414                        rootLine[ assaySampleIndex[ it[ 0 ] ] + 3 ] = it[ 2 ] ?: 0;
415                }
416                data << rootLine;
417               
418                // Adding a line to ensure that the totals will be shown
419                classifications << [ null, ['id':-1,'level':-1, 'name': "unclassified"], [ 'id': 0 ] ]
420               
421                classifications.each { line ->
422                        def classification = line[ 0 ];
423                        def taxon = line[ 1 ];
424                        def assaySample = line[ 2 ];
425                       
426                        if( taxon.id != currentTaxonId ) {
427                                // Start with a new taxon.
428                                // Save the unclassified values for the previous level. These have to be shown it the subtree
429                                // underneath the previous taxon is finished.
430                                if( hasUnClassified )
431                                        unClassifiedLines[ currentLevel ] = unClassifiedLine
432                               
433                                // If a line was generated, add it to the list
434                                if( currentLine && hasValues && currentLevel >= minLevel )
435                                        data << currentLine;
436                                       
437                                // First see whether a unclassified line must be put in place
438                                // by checking whether a unClassifiedLine for the next level is given
439                                if( taxon.level <= currentLevel ) {
440                                        // Check which 'unclassified' lines must be shown. Every line of a subtree that is being closed
441                                        // should be shown. 'unclassified' at the highest level should be shown first
442                                        currentLevel.downto( taxon.level ) { lvl ->
443                                                if( unClassifiedLines[ lvl ] ) {
444                                                        // Each unclassified line should be repeated up to the highest level, in order
445                                                        // to have the same number of sequences at every level.
446                                                        def ucl = unClassifiedLines.remove( lvl );
447                                                        lvl.upto( maxLevel - 1 ) { unclassifiedLevel ->
448                                                                def name = ( prefix * ( unclassifiedLevel + 1 ) ) + ucl[ 2 ]
449                                                                def newLine = [ unclassifiedLevel + 1 ] + rankId( unclassifiedLevel + 1 ) + name + ucl[3..-1];
450                                                                data << newLine;
451                                                        }
452                                                }
453                                        }
454                                }
455
456                                // Determine the rankID for the given taxon. It is composed of the parentTaxon with a counter
457                                def rank;
458                               
459                                if( taxon.level >= minLevel ) {
460                                        rank = rankId( taxon.level )
461                                }
462                               
463                                // Create a new line, because we arrived at a new taxon
464                                currentLine = [ taxon.level, rank, ( prefix * Math.max( 0, taxon.level ) ) + taxon.name ]
465                                hasValues = false;
466                               
467                                // Create a corresponding 'unclassified' line. This line will be saved for the end of the subtree
468                                // The rank for the unclassified line is determined later on, because it depends on the subtree
469                                unClassifiedLine = [ taxon.level + 1, "", "Unclassified (" + taxon.level + ") " + taxon.name  ]
470                               
471                                // Create as many entries in the list as there are samples
472                                numAssaySamples.times {
473                                        currentLine << ""
474                                        unClassifiedLine << ""
475                                }
476                               
477                                // Update taxonId and level
478                                currentTaxonId = taxon.id;
479                                currentLevel = taxon.level;
480                                hasUnClassified = false;
481                        }
482                       
483                        // Find index of this assaySample in the list, and save the classification total and unclassified value in the correct column
484                        def index = assaySampleIndex[ assaySample.id ];
485                        if( index != null ) {
486                                currentLine[ index + 3 ] = classification.total ?: "";
487                                unClassifiedLine[ index + 3 ] = classification.unclassified ?: "";
488                       
489                                // Set flag whether this line has to be included or not
490                                if( classification.total > 0 )
491                                        hasValues = true;
492                               
493                                // Set flag whether unclassified values have been found for this taxon. These values
494                                // are only saved for levels < maxLevel, since at the highest level, all sequences are
495                                // unclassified at a higher level
496                                if( classification.unclassified > 0 && taxon.level < maxLevel )
497                                        hasUnClassified = true;
498                        }
499                }
500               
501                return data;
502        }
503       
504        /**
505         * Exports all known data about the classifications of these samples to an excel file
506         * @param assaySamples  Assaysamples to export information about
507         * @param stream                Outputstream to write the data to
508         * @return
509         */
510        def exportClassifications( def assaySamples, OutputStream stream ) {
511                if( assaySamples == null )
512                        assaySamples = []
513
514                // Create csv file; sheetIndex is ignored
515                def sheetIndex = 0;
516
517                // Create headerrow
518                def headers = [ "level", "rankId", "taxon" ];
519                def ids = [];
520                assaySamples.each { 
521                        headers << it.sample.name 
522                        ids << it.id
523                }
524               
525                def data = retrieveClassifications( assaySamples );
526               
527                // Create an excel sheet
528                def wb = csvService.create();
529
530                // Put the headers on the first row
531                csvService.writeHeader( wb, headers, sheetIndex );
532                csvService.writeData( wb, data, sheetIndex, 1 );
533
534                // Write the data to the output stream
535                csvService.output( wb, stream );
536
537                return true;
538        }
539       
540}
Note: See TracBrowser for help on using the repository browser.