source: trunk/grails-app/services/nl/tno/massSequencing/ClassificationService.groovy @ 60

Last change on this file since 60 was 60, checked in by robert@…, 10 years ago
  • Improved speed by using even more ajax calls in pagination
  • Improved importing taxonomy files, changed mysql specific statements
  • Implemented taxonomy output
File size: 17.8 KB
Line 
1package nl.tno.massSequencing
2
3import java.io.BufferedWriter;
4import java.io.File;
5import java.io.OutputStream;
6import java.io.Writer;
7import java.util.ArrayList;
8import java.util.List;
9import org.codehaus.groovy.grails.commons.ConfigurationHolder
10import org.hibernate.StatelessSession
11import org.hibernate.Transaction
12import org.hibernate.engine.SessionImplementor
13import java.util.zip.*
14import nl.tno.massSequencing.classification.*
15
16class ClassificationService implements nl.tno.massSequencing.imports.Importer {
17        def fileService
18        def csvService
19        def sessionFactory
20
21        static transactional = false
22
23        /**
24         * Determines whether a file can be processed.
25         * @param filetype      Filetype of the file
26         * @see FileService.determineFileType()
27         * @return
28         */
29        public boolean canParseFileType( String filetype ) {
30                switch( filetype ) {
31                        case "groups":
32                        case "taxonomy":
33                                return true;
34                        default:
35                                return false;
36                }
37        }
38
39        /**
40         * Parses the given GROUPS or TAXONOMY file
41         *
42         * @param file                  File to parse
43         * @param filetype              Type of the given file
44         * @param onProgress    Closure to execute when progress indicators should be updated.
45         *                                              Has 2 parameters: numFilesProcessed and numBytesProcessed that indicate the number
46         *                                              of files and bytes that have been processed in this file (so the first parameter should
47         *                                              only be 1 when the file is finished)
48         *
49         * @return                              List structure. Examples:
50         *
51         *   [ success: true, filename: 'abc.fasta', type: 'fasta', numSequences: 200 ]
52         *   [ success: true, filename: 'abc.qual', type: 'qual', numSequences: 200, avgQuality: 36 ]
53         *   [ success: false, filename: 'abc.txt', type: 'txt', message: 'Filetype could not be parsed.' ]
54         */
55        public Map parseFile( File file, String filetype, Closure onProgress ) {
56                switch( filetype ) {
57                        case "groups":
58                        case "taxonomy":
59                                def fileInfo = countLines( file, onProgress );
60                                return [ success: true, filename: file.getName(), type: filetype, numLines: fileInfo.numLines, filesize: fileInfo.fileSize ]
61                        default:
62                                onProgress( file.length(), 0 );
63                                return [ success: false, type: filetype, message: 'Filetype could not be parsed.' ]
64                }
65        }
66
67        /**
68         * Parses the given GROUPS file
69         * @param file                  File to parse
70         * @param onProgress    Closure to execute when progress indicators should be updated.
71         *                                              Has 2 parameters: numFilesProcessed and numBytesProcessed that indicate the number
72         *                                              of files and bytes that have been processed in this file (so the first parameter should
73         *                                              only be 1 when the file is finished)
74         * @return                              List structure. Examples:
75         *
76         *   [ success: true, filename: 'abc.fasta', type: 'fasta', numSequences: 200 ]
77         *   [ success: false, filename: 'def.fasta', type: 'fasta', message: 'File is not a valid FASTA file' ]
78         */
79        protected Map countLines( File file, Closure onProgress ) {
80
81                long startTime = System.nanoTime();
82                log.trace "Start counting lines for " + file.getName()
83
84                // Count the number of lines. The real processing is done later on, because
85                // the GROUPS and TAXONOMY files must be handled together
86                long numLines = 0;
87                long bytesProcessed = 0;
88
89                file.eachLine { line ->
90                        numLines++;
91
92                        // Update progress every time 100kB is processed
93                        bytesProcessed += line.size();
94                        if( bytesProcessed > 100000 ) {
95                                onProgress( bytesProcessed, 0 );
96                                bytesProcessed = 0;
97                        }
98                }
99
100                // Update progress and say we're finished
101                onProgress( bytesProcessed, 0 );
102
103                log.trace "Finished counting lines for " + file.getName() + ": " + ( System.nanoTime() - startTime ) / 1000000L
104
105                return [ numLines: numLines, fileSize: file.size() ];
106        }
107
108        /**
109         * Removes all classification sequence objects for a given SequenceData object
110         * @param sequenceData 
111         * @return      True if the deletion has succeeded
112         */
113        public boolean removeClassificationForSequenceData( SequenceData sequenceData ) {
114                if( sequenceData ) {
115                        Sequence.executeUpdate( "DELETE FROM Sequence s WHERE s.sequenceData = ?", [sequenceData] );
116                        return true;
117                } else {
118                        return false;
119                }
120        }
121
122        /**
123         * Store the classification from the given files
124         * @param groupsFile    Filename
125         * @param taxonomyFile
126         * @return
127         */
128        public Map storeClassification( String taxonomyFilename, SequenceData sequenceData, Closure onProgress ) {
129                def hibernateSession = sessionFactory.getCurrentSession();
130                hibernateSession.flush();
131                hibernateSession.clear();
132               
133                File taxonomyFile = fileService.get( taxonomyFilename );
134
135                if( !taxonomyFile ) {
136                        return [ success: false, type: 'taxonomy', filename: groupsFilename, message: 'Taxonomy file doesn\'t exist.' ]
137                }
138               
139                // Open files for reading and create temporary variables
140                def taxonomyReader = taxonomyFile.newReader();
141
142                def parts, sequenceName, classification, sampleName, taxa, taxon, line, groupsLine
143
144                def taxonRegex = /"?([^"(]+)"?(\(\d+\))?/
145                def startLevel = 1    // The first level that is present in the file. Sometimes, the root element (level 0) is not mentioned in the file, in that case, this value should be set to 1
146                def unclassified = "unclassified"
147
148                def i = 0;
149                def start = System.currentTimeMillis();
150                def lapTime = start;
151
152                // Create a stateless session in order to speed up inserts
153                StatelessSession statelessSession = sessionFactory.openStatelessSession();
154               
155                // For each line in the taxonomy file, read the sequence and save it
156                def discardedSequences = 0;
157                def importedSequences = 0;
158                long bytesProcessed = 0;
159               
160                try {
161                        while( ( line = taxonomyReader.readLine() ) ) {
162                                bytesProcessed += line.size();
163       
164                                // Find the taxon for this line
165                                parts = line.tokenize( "\t" );
166       
167                                if( parts.size() != 2 ) {
168                                        // Skip this line because it is incorrect
169                                        continue;
170                                }
171       
172                                sequenceName = parts[ 0 ];
173                                classification = parts[ 1 ];
174       
175                                // Split classification to check whether the taxon already exists
176                                taxa = classification.tokenize( ';' ).collect { it.replaceAll( taxonRegex, '$1' ) }.findAll { it && it != unclassified }
177                               
178                                // Find taxon or create one if needed
179                                taxon = Taxon.findOrCreateTaxonByPath( taxa, startLevel );
180       
181                                if( taxon ) {
182                                        // Create a new sequence record
183                                        def s = new Sequence()
184                                        s.name = sequenceName
185                                        s.classification = taxon
186                                        s.sequenceData = sequenceData
187       
188                                        statelessSession.insert(s);
189                                } else {
190                                        log.trace( "" + i + " Sequence " + sequenceName + " not imported because it is unclassified." )
191                                }
192       
193                                if( i % 80 == 0 ) {
194                                        onProgress( bytesProcessed );
195                                        bytesProcessed = 0;
196                                }
197       
198                                importedSequences++;
199       
200                                i++;
201                        }
202                       
203                } catch( Exception e ) {
204                        throw e;
205                } finally {
206                        // Flush the last bit of inserts. Unfortunately, this is not automatically done when jdbc batch_size > 1
207                        // See also http://opensource.atlassian.com/projects/hibernate/browse/HHH-4042
208                        SessionImplementor sessionImpl = (SessionImplementor) statelessSession;
209                        sessionImpl.getBatcher().executeBatch();
210               
211                        // Close the session and the reader
212                        statelessSession.close();
213                        taxonomyReader.close();
214                }
215
216                onProgress( bytesProcessed );
217
218                return [ filename: taxonomyFilename, type: 'taxonomy', success: true, importedSequences: importedSequences, discardedSequences: discardedSequences ]
219        }
220
221        /**
222         * Updates the Classification table with data computed from the raw sequences for the given assay sample
223         * @param assaySample
224         */
225        public void updateClassificationForAssaySample( AssaySample assaySample ) {
226                if( !assaySample )
227                        return;
228                       
229                updateClassificationForAssaySamples( [ assaySample ] );
230        }
231
232        /**
233         * Updates the Classification table with data computed from the raw sequences for the given assay sample
234         * @param assaySample
235         */
236        public void updateClassificationForAssaySamples( List assaySamples ) {
237                if( !assaySamples )
238                        return;
239
240                // Clear hibernate session for otherwise the session will be out of sync with the
241                // database
242                def hibernateSession = sessionFactory.getCurrentSession();
243                hibernateSession.flush();
244                hibernateSession.clear();
245
246                // Remove all Classifications for the given assaySample
247                Classification.executeUpdate( "DELETE FROM Classification WHERE assaySample IN (:assaySamples)", [ 'assaySamples': assaySamples ] );
248
249                // Update classification table with data from sequences
250
251                // Unfortunately, Hibernate (through HQL) can't handle bulk insertion if a table has a
252                // composite id (Classification). See http://opensource.atlassian.com/projects/hibernate/browse/HHH-3434
253                // For that reason we execute plain SQL
254                def connection = sessionFactory.currentSession.connection()
255                def statement = connection.createStatement();
256
257                // This statements searches within the sequences for every taxon that is mentioned (as a leaf-node in the tree)
258                // and inserts statistics about those taxons into the classification table.
259                //
260                // However, if we have the following sequences:
261                //              sequence1               Bacteria -> Firmicutes -> Clostridia -> (unclassified)
262                //              sequence2               Bacteria -> (unclassified)
263                //
264                // The statement will insert values for
265                //              Clostridia: 1
266                //              Bacteria: 2
267                //
268                // and nothing for firmicutes. Of course, these values can be computed at any time, since
269                // the taxonomy tree is known. Since these values are used for searching and exporting (e.g. show all samples
270                // where firmicutes are present), it is essential that the extra values are also added. This is done in the second statement
271
272                // Insert statistics for all sequences and (leaf) taxa
273                statement.executeUpdate( connection.nativeSQL( """
274                        INSERT INTO     classification (assay_sample_id, taxon_id, unclassified, total)
275                        SELECT          sa.id, s.classification_id, count( * ),
276                                                (
277                                                        SELECT          count( * )
278                                                        FROM            sequence s2
279                                                        LEFT JOIN       taxonomy t2 ON s2.classification_id = t2.id
280                                                        LEFT JOIN       sequence_data sd2 ON s2.sequence_data_id = sd2.id
281                                                        LEFT JOIN       assay_sample sa2 ON sd2.sample_id = sa2.id
282                                                        WHERE           sa2.id = sa.id AND t2.lft >= t.lft AND t2.rgt <= t.rgt
283                                                )
284                        FROM            sequence s
285                        LEFT JOIN       taxonomy t ON s.classification_id = t.id
286                        LEFT JOIN       sequence_data sd ON s.sequence_data_id = sd.id
287                        LEFT JOIN       assay_sample sa ON sd.sample_id = sa.id
288                        WHERE           sa.id IN (""" + assaySamples.id.join( ", " ) + """ )
289                        GROUP BY        sa.id, s.classification_id, t.lft, t.rgt
290           """ ) );
291
292                // Update the table with missing values
293                statement.executeUpdate( connection.nativeSQL( """
294                INSERT INTO     classification (assay_sample_id, taxon_id, unclassified, total)
295                SELECT * FROM (
296                        SELECT          a.id as assay_sample_id, t.id as taxon_id, 0 as unclassified, 
297                                                (
298                                                        SELECT          sum( c.unclassified )
299                                                        FROM            classification c
300                                                        LEFT JOIN       taxonomy t3 ON c.taxon_id = t3.id
301                                                        WHERE           c.assay_sample_id = a.id
302                                                                AND     t3.lft >= t.lft
303                                                                AND     t3.rgt <= t.rgt
304                                                ) AS total
305                        FROM            taxonomy t, assay_sample a
306                        WHERE           a.id IN (""" + assaySamples.id.join( ", " ) + """ )
307                        AND             t.id NOT IN (
308                                                        SELECT          t2.id
309                                                        FROM            classification c
310                                                        LEFT JOIN       taxonomy t2 ON c.taxon_id = t2.id
311                                                        WHERE           c.assay_sample_id = a.id
312                                                )
313                ) missingClassifications WHERE missingClassifications.total IS NOT NULL
314          """ ) );
315        }
316
317        /**
318         * Find a file that matches the parameters and is not already stored
319         * @param inputFiles
320         * @param type
321         * @param numLines
322         * @param alreadyStored
323         * @return
324         */
325        protected Map findMatchingClassificationFile( def inputFiles, String type, long numLines, def alreadyStored ) {
326                return inputFiles.find { it.type == type && it.numLines == numLines && !alreadyStored?.contains( it.filename ) };
327        }
328
329        /**
330         * Exports all known data about the classifications of these samples to an excel file
331         * @param assaySamples  Assaysamples to export information about
332         * @param stream                        Outputstream to write the data to
333         * @return
334         */
335        def exportClassifications( def assaySamples, OutputStream stream ) {
336                if( assaySamples == null )
337                        assaySamples = []
338
339                // Create csv file; sheetIndex is ignored
340                def sheetIndex = 0;
341
342                // Create headerrow
343                def headers = [ "level", "rankId", "taxon" ];
344                def ids = [];
345                assaySamples.each {
346                        headers << it.sample.name
347                        ids << it.id
348                }
349               
350                // Structure to store the data in
351                def data = [];
352
353                // Retrieve classification information for all assaySamples
354                def classifications = Classification.executeQuery( "FROM Classification c LEFT JOIN c.taxon t LEFT JOIN c.assaySample a WHERE a IN (:assaySamples) ORDER BY t.lft, a.sample.name", [ "assaySamples": assaySamples ] );
355
356                // Find the maximum level present in this classification list
357                def levels = Classification.executeQuery( "SELECT MAX(t.level), MIN(t.level) FROM Classification c LEFT JOIN c.taxon t LEFT JOIN c.assaySample a WHERE a IN (:assaySamples)", [ "assaySamples": assaySamples ] )[ 0 ];
358                def maxLevel = levels[ 0 ]
359                def minLevel = levels[ 1 ]
360
361                // Determine the starting prefix and the starting counter for the rankId
362                def rankGlue = ".";
363                def startCounter = 1
364                def rootId = 0;
365                def rankPrefix = [:]; rankPrefix [ minLevel ] = rootId;
366                def rankCounter = [:]; rankCounter[ 1 ] = startCounter;
367
368                // Closure to retrieve a new id for the given level
369                def rankId = { level ->
370                        def rank = rankPrefix[ level ] + rankGlue + rankCounter[ level ];
371                        rankCounter[ level ]++;
372                        rankPrefix[ ( level + 1 ) ] = rank
373                        rankCounter[ ( level + 1 ) ] = startCounter
374                       
375                        return rank;
376                }
377               
378                def currentLine = [];
379                def hasValues = false;
380               
381                def currentTaxonId = -1;
382                def currentLevel = -1;
383               
384                def unClassifiedLine = [];
385                def unClassifiedLines = [:]
386                def hasUnClassified = false;
387               
388                def numAssaySamples = assaySamples.size();
389               
390                // Determine index of assaysample in list
391                def assaySampleIndex = [:]
392                for( def i = 0; i < assaySamples.size(); i++ ) {
393                        assaySampleIndex[ assaySamples[ i ].id ] = i;
394                }
395               
396                // Append classifications entry to the list with details about the numbers of unclassified sequences
397                //  AND a.id IN (:assaySampleIds)
398                def extraClassifications = Classification.executeQuery( "SELECT a.id, SUM( c.unclassified ), (SELECT SUM(sd.numSequences) FROM SequenceData sd WHERE sd.sample = a) FROM AssaySample a, Classification c WHERE c.assaySample = a AND  a IN (:assaySamples) GROUP BY a.id", [ "assaySamples": assaySamples ] );
399                def rootLine = [ minLevel - 1, rootId, "root" ]
400                extraClassifications.each {
401                        // Add a classification in order to show the 'unclassified' values
402                        classifications << [
403                                [ 'total': it[ 2 ], 'unclassified': it[ 2 ] - it[ 1 ] ],
404                                ['id':0,'level': minLevel - 1, 'name': ""],
405                                [ 'id': it[ 0 ] ]
406                        ]
407                       
408                        // Create a root line
409                        rootLine[ assaySampleIndex[ it[ 0 ] ] + 3 ] = it[ 2 ];
410                }
411                data << rootLine;
412               
413                // Adding a line to ensure that the totals will be shown
414                classifications << [ null, ['id':-1,'level':-1, 'name': "unclassified"], [ 'id': 0 ] ]
415               
416                classifications.each { line ->
417                        def classification = line[ 0 ];
418                        def taxon = line[ 1 ];
419                        def assaySample = line[ 2 ];
420                       
421                        if( taxon.id != currentTaxonId ) {
422                                // Start with a new taxon.
423                                // Save the unclassified values for the previous level. These have to be shown it the subtree
424                                // underneath the previous taxon is finished.
425                                if( hasUnClassified )
426                                        unClassifiedLines[ currentLevel ] = unClassifiedLine
427                               
428                                // If a line was generated, add it to the list
429                                if( currentLine && hasValues && currentLevel >= minLevel )
430                                        data << currentLine;
431                                       
432                                // First see whether a unclassified line must be put in place
433                                // by checking whether a unClassifiedLine for the next level is given
434                                if( taxon.level <= currentLevel ) {
435                                        // Check which 'unclassified' lines must be shown. Every line of a subtree that is being closed
436                                        // should be shown. 'unclassified' at the highest level should be shown first
437                                        currentLevel.downto( taxon.level ) { lvl ->
438                                                if( unClassifiedLines[ lvl ] ) {
439                                                        // Each unclassified line should be repeated up to the highest level, in order
440                                                        // to have the same number of sequences at every level.
441                                                        def ucl = unClassifiedLines.remove( lvl );
442                                                        lvl.upto( maxLevel - 1 ) { unclassifiedLevel ->
443                                                                def newLine = [ unclassifiedLevel + 1 ] + rankId( unclassifiedLevel + 1 ) + ucl[2..-1];
444                                                                data << newLine;
445                                                        }
446                                                }
447                                        }
448                                }
449
450                                // Determine the rankID for the given taxon. It is composed of the parentTaxon with a counter
451                                def rank;
452                               
453                                if( taxon.level >= minLevel ) {
454                                        rank = rankId( taxon.level )
455                                }
456                               
457                                // Create a new line, because we arrived at a new taxon
458                                currentLine = [ taxon.level, rank, taxon.name ]
459                                hasValues = false;
460                               
461                                // Create a corresponding 'unclassified' line. This line will be saved for the end of the subtree
462                                // The rank for the unclassified line is determined later on, because it depends on the subtree
463                                unClassifiedLine = [ taxon.level + 1, "", "Unclassified (" + taxon.level + ") " + taxon.name  ]
464                               
465                                // Create as many entries in the list as there are samples
466                                numAssaySamples.times {
467                                        currentLine << ""
468                                        unClassifiedLine << ""
469                                }
470                               
471                                // Update taxonId and level
472                                currentTaxonId = taxon.id;
473                                currentLevel = taxon.level;
474                                hasUnClassified = false;
475                        }
476                       
477                        // Find index of this assaySample in the list, and save the classification total and unclassified value in the correct column
478                        def index = assaySampleIndex[ assaySample.id ];
479                        if( index != null ) {
480                                currentLine[ index + 3 ] = classification.total ?: "";
481                                unClassifiedLine[ index + 3 ] = classification.unclassified ?: "";
482                       
483                                // Set flag whether this line has to be included or not
484                                if( classification.total > 0 )
485                                        hasValues = true;
486                               
487                                // Set flag whether unclassified values have been found for this taxon. These values
488                                // are only saved for levels < maxLevel, since at the highest level, all sequences are
489                                // unclassified at a higher level
490                                if( classification.unclassified > 0 && taxon.level < maxLevel )
491                                        hasUnClassified = true;
492                        }
493                }
494               
495                // Create an excel sheet
496                def wb = csvService.create();
497
498                // Put the headers on the first row
499                csvService.writeHeader( wb, headers, sheetIndex );
500                csvService.writeData( wb, data, sheetIndex, 1 );
501
502                // Write the data to the output stream
503                csvService.output( wb, stream );
504
505                return true;
506        }
507       
508}
Note: See TracBrowser for help on using the repository browser.