source: trunk/id-mapper/pmid2doi-mapper.py @ 7

Last change on this file since 7 was 7, checked in by rob.hooft@…, 6 years ago

some timing issues, and fix for different pmid_versions of same article

File size: 15.4 KB
Line 
1#!/usr/bin/env python
2#
3# This script maps the pmid to doi by querying the crossref API.
4#
5# For its data it connects to a MySQL database given in an ini file
6#
7# The input is a table containing a field of pmids
8# The output is a table of <pmid><doi>
9
10# Standard library imports
11import os
12import re
13import Queue
14import sys
15import threading
16import time
17
18# Project imports
19import crossreftitles
20
21# Where to find the configuration parameters
22configfn = 'mapper.ini'
23
24
class Config(object):
    """Typed accessors for the three sections of the mapper ini file."""

    def __init__(self):
        import ConfigParser
        self.parser = ConfigParser.SafeConfigParser()
        # Prefer the ini file in the current working directory; fall back
        # to the copy that lives next to this script.
        path = configfn
        if not os.path.exists(path):
            path = os.path.join(os.path.dirname(sys.argv[0]), configfn)
        self.parser.read(path)

    def mysql(self, key):
        """Return a [mysql] setting; 'port' is an int, the rest are strings."""
        value = self.parser.get('mysql', key)
        return int(value) if key == 'port' else value

    def crossref(self, key):
        """Return a [crossref] setting (all values are strings)."""
        return self.parser.get('crossref', key)

    def run(self, key):
        """Return a [run] setting; booleans for the flags, ints otherwise."""
        if key in ('create_table', 'debug'):
            return self.parser.getboolean('run', key)
        return int(self.parser.get('run', key))
54
# Open and read the configuration
config = Config()

debug = config.run('debug')
if debug:
    nthread = config.run('nr_thread_debug')
else:
    nthread = config.run('nr_thread')

# Name of the database that we are creating
ourdb = config.mysql('db_mapping')

# Fully qualified names of the input and output tables in our database
inputTable = ourdb + "." + config.mysql('table_unmapped')
outputTable = ourdb + "." + config.mysql('table_mapped')

# How many times a CrossRef API call should be retried if
# there are strange HTTP errors
nretry = config.run('nretry')

# How often to report about the intermediate results
# (every `reportinterval` records processed per thread)
reportinterval = config.run('reportinterval')

# How to recognize a DOI in the CrossRef results
DOIinPubMed = re.compile(r'<doi type="journal_article">(.+?)</doi>')
80
81
def opendb():
    """ Open a MySQL connection using the [mysql] section of the ini file."""
    import MySQLdb
    settings = {'host': config.mysql('host'),
                'port': config.mysql('port'),
                'user': config.mysql('username'),
                'passwd': config.mysql('password'),
                'db': config.mysql('db_mapping')}
    return MySQLdb.connect(**settings)
90
91
def log(message):
    """ Write a timestamped message to stderr and flush it immediately."""
    import datetime
    stamp = datetime.datetime.now()
    sys.stderr.write("%s: %s\n" % (stamp, message))
    sys.stderr.flush()
97
98
def logerr(errname):
    """ Report an encountered error and pause briefly to ease off the API."""
    message = "*** %s encountered" % errname
    log(message)
    time.sleep(1)
102
103
class RateLimit(object):
    """ Callable that enforces a minimum delay between successive calls.

        Each worker thread uses one instance to avoid hitting the CrossRef
        API faster than once per `mintime` seconds.
    """

    def __init__(self, mintime=1):
        # Time of the previous call; 0 means "never called yet", so the
        # very first call never sleeps.
        self.lasttime = 0
        self.mintime = mintime

    def __call__(self):
        """ Sleep just long enough so at least `mintime` seconds separate
            the previous call from returning out of this one.
        """
        t = time.time()
        wait = self.lasttime + self.mintime - t
        if wait > 0:
            # BUG FIX: the original slept mintime - (lasttime - t), i.e.
            # mintime PLUS the already-elapsed time, which overshoots the
            # intended delay instead of subtracting the elapsed time.
            time.sleep(wait)
        self.lasttime = time.time()
114
# Work queue feeding database rows to the ChildThread workers
q = Queue.Queue()
116
117
118# Our threads
119class ChildThread(threading.Thread):
120   # Override Thread's __init__ method to accept the parameters needed:
121    def __init__(self, thread_ID):
122        # Open a connection to the database, and a cursor.
123        self.connection = opendb()
124        self.cursor = self.connection.cursor()
125        self.cursor.execute('set autocommit=1;')
126        #
127        self.thread_ID = thread_ID
128        self.ratelimit = RateLimit()
129        self.cnt = 0
130        self.indexskip = 0
131        # Call the original constructor
132        threading.Thread.__init__(self)
133        # Do not wait for this thread to terminate if the program is ready
134        self.daemon = True
135
136    def run(self):
137        while True:
138            row = q.get()
139            try:
140                pmid = row[0]
141                assert pmid != 'NULL'
142                lookupReturned = False
143                doi = self.lookup(row)
144                lookupReturned = True
145                if doi:
146                    # We found one!
147                    log("Mapped %s to %s" % (pmid, doi))
148                    self.handleNewPair(pmid, doi)
149                else:
150                    self.markAsTried(pmid)
151            finally:
152                if not lookupReturned:
153                    sys.stderr.write("ERROR: Lookup fails %s\n" % str(row))
154                    self.tryAgainInAFewHours(pmid)
155                q.task_done()
156
157    def lookup(self, row):
158        if debug:
159            print "DBG> row:", row
160        if row is None:
161            return None
162        if self.cnt % reportinterval == 0:
163            log("Thread %2d : count=%d (skipped=%d)" % (self.thread_ID, self.cnt, self.indexskip))
164        self.cnt += 1
165
166        # Get the information about the pubmed article from the database row
167        aulast, journal, volume, issue, startpage, pubyear = unpackRow(row)
168
169        if journal.lower() == 'biochimica et biophysica acta':
170            # Biochimica et Biophysica acta (sic in pubmed) are actually several different journals
171            # that we need to try each in crossref. Luckily we can check via the crossref indexed
172            # titles document which volume occurs in which journal name, saving a lot of time.
173            jlist = crossreftitles.bbatitles(volume)
174            if len(jlist) == 0:
175                sys.stderr.write("ERROR: No BBA title for volume %s\n" % repr(row))
176                return None
177        elif journal.lower() == 'mutation research':
178            # Mutation Research (sic in pubmed) are actually several different journals
179            # that we need to try each in crossref. Luckily we can check via the crossref indexed
180            # titles document which volume occurs in which journal name, saving a lot of time.
181            jlist = crossreftitles.mrtitles(volume)
182            if len(jlist) == 0:
183                sys.stderr.write("ERROR: No MR title for volume %s\n" % repr(row))
184                return None
185        elif crossreftitles.lookup(journal) and not crossreftitles.lookup2(journal, pubyear):
186            # If the journal was found, but the year was not found....
187            if debug:
188                print "DBG> %s not indexed for %s" % (pubyear, journal)
189            self.indexskip += 1
190            return None
191        else:
192            # Only the "pubmed" name of the journal to be considered for crossref
193            jlist = [journal]
194        if debug:
195            print "DBG> journals:", jlist
196        for journal in jlist:
197            # Make sure we do not go too fast
198            self.ratelimit()
199
200            # Perform the lookup with crossref
201            dois = getDOI(aulast, journal, volume, issue, startpage, pubyear)
202            if dois:
203                return dois
204        return None
205
206    def handleNewPair(self, pmid, doi):
207        self.cursor.execute('set autocommit=0;')
208        try:
209            try:
210                # Insert the match in the match table
211                sqlinsert = ("INSERT INTO %s (pmid, doi)" % outputTable +
212                             " VALUES ('%s', '%s');" % (pmid, doi))
213                if debug:
214                    print "DBG> would have executed:", sqlinsert
215                else:
216                    self.cursor.execute(sqlinsert)
217       
218                # Remove the pmid from the table of unmatched entries
219                sqldelete = "DELETE FROM %s WHERE pmid = %s;" % (
220                    inputTable, str(pmid))
221                if debug:
222                    print "DBG> would have executed:", sqldelete
223                else:
224                    self.cursor.execute(sqldelete)
225            except:
226                print "ERROR: Got exception on moving the match"
227                self.connection.rollback()
228                raise
229            else:
230                self.connection.commit()
231        finally:
232            self.cursor.execute('set autocommit=1;')
233
234   
235    def markAsTried(self, pmid):
236        """ Mark the record with the given pubmed id as tried."""
237        if debug:
238            self.tryAgainInAFewHours(pmid)
239            return
240        sql = ("UPDATE %s " % inputTable +
241               "SET nexttrytime=(TO_SECONDS(NOW())+86400*7*(ntried+1))," +
242               "ntried=ntried+1 " +
243               "WHERE pmid=%s;" % pmid)
244        self.cursor.execute(sql)
245
246    def tryAgainInAFewHours(self, pmid):
247        """ Mark the record with the given pubmed id as tried, but only for a day and do not raise the try counter."""
248        sql = """UPDATE %s SET
249                 nexttrytime=(TO_SECONDS(NOW())+86400)
250                 WHERE pmid="%s";""" % (inputTable, pmid)
251        self.cursor.execute(sql)
252
253
def unpackRow(row):
    """ Split a medline citation row into the fields needed for a CrossRef
        query.

        `row` is (pmid, journal_title, volume, issue, medline_pgn, pub_date,
        last_name); any field other than pmid may be None and is then mapped
        to ''.  pub_date is a date-like object (only .year is used).

        Returns (aulast, journal, volume, issue, startpage, pubyear).
    """
    pmid = row[0]
    assert pmid != 'NULL'
    # Use the idiomatic identity test `is not None` instead of `!= None`.
    journal = row[1] if row[1] is not None else ''
    volume = row[2] if row[2] is not None else ''
    issue = row[3] if row[3] is not None else ''
    medline_pgn = row[4] if row[4] is not None else ''
    if ';' in medline_pgn:
        # Solve pages like: '1447; author reply 1447-8'
        medline_pgn = medline_pgn.split(";")[0]
    # A page range like '100-8' starts at page 100.
    startpage = medline_pgn.split("-")[0]
    pubyear = row[5].year if row[5] is not None else ''
    aulast = row[6] if row[6] is not None else ''
    return aulast, journal, volume, issue, startpage, pubyear
268
269def getDOI(aulast, journal, volume, issue, startpage, pubyear):
270    """ Find the article with the specified characteristics in the CrossRef
271        database.
272    """
273    import httplib
274    import urllib
275    import urllib2
276
277    # Get the user ID for the access to the CrossRef API
278    PID = config.crossref('PID')
279    URL = config.crossref('api_url')
280
281    # Encode the query to the API
282    params = urllib.urlencode({'aulast': aulast,
283                               'pid': PID,
284                               'noredirect': '',
285                               'title': journal,
286                               'volume': volume,
287                               'issue': issue,
288                               'spage': startpage,
289                               'date': pubyear})
290    if debug:
291        print "DBG> params=", params
292    for ntries in range(nretry):
293        try:
294            u = urllib2.urlopen(URL + params, data=None, timeout=20)
295        except httplib.BadStatusLine:
296            logerr("httplib.BadStatusLine params="+params)
297        except urllib2.HTTPError:
298            logerr("urllib2.HTTPError params="+params)
299        except urllib2.URLError:
300            logerr("urllib2.URLError params="+params)
301        else:
302            break
303    else:
304        # If it was still not successful, raise the exception again.
305        sys.stderr.write("ERROR> Failed request for %s\n" % params)
306        sys.stderr.flush()
307        raise
308
309    xml = u.read()
310    if debug and False:
311        print "DBG> xml from crossref:", xml
312
313    dois = DOIinPubMed.findall(xml)
314
315    # Only return the result if it is unique.
316    if len(dois) == 0:
317        return None
318    elif len(dois) == 1:
319        return dois[0]
320    else:
321        if debug:
322            print "DBG> Number of hits in PubMed:", len(dois)
323        log("Error! more than 1 DOI found")
324        return None
325
def _createTable(cursor):
    """ Create the (initially empty) input table of pmids still to be mapped.

        The table tracks per pmid how often it was tried (ntried) and the
        earliest time of the next attempt (nexttrytime, indexed).
    """
    sql = """CREATE TABLE %(inputTable)s (pmid int(11) NOT NULL DEFAULT '0' PRIMARY KEY,
                                                   ntried bigint(20) DEFAULT 0,
                                                   nexttrytime bigint(20) DEFAULT 0, 
                                                   KEY nexttrytime (nexttrytime))
                             engine=InnoDB;
                   """ % {"inputTable": inputTable}
    cursor.execute(sql)
335
def updateTable(cursor):
    """ Update the input table of pubmed articles that still do not have a DOI
        starting from the pubmed2013 database and the output table from earlier runs.
    """
    substitutions = {"inputTable": inputTable,
                     "medlineDB": config.mysql('db_medline'),
                     "outputTable": outputTable}
    # Add new records: every pmid in the medline citations that appears in
    # neither the already-mapped output table nor the pending input table.
    sql = """INSERT INTO %(inputTable)s 
        (SELECT c.pmid, 0 as ntried, 0 as nexttrytime
         FROM   %(medlineDB)s.medline_citation c
         WHERE  c.pmid not in (SELECT pmid from %(outputTable)s)
         AND    c.pmid_version=1
         AND    c.pmid not in (SELECT pmid from %(inputTable)s));
        """ % substitutions
    log("Adding new records to the table (expect this to take 15 minutes)...")
    cursor.execute(sql)
    log("...done.")
353
354
355def tryOne(pmid):
356    connectionDB = opendb()
357
358    connectionDB.set_character_set('utf8')
359    cursor = connectionDB.cursor()
360    cursor.execute('set autocommit=1;')
361
362    print "pmid =", pmid
363    sql = """SELECT DISTINCT c.pmid, c.journal_title, c.volume, c.issue, c.medline_pgn, c.pub_date, a.last_name
364             FROM %(inputTable)s, %(medlineDB)s.medline_citation c, %(medlineDB)s.medline_author a
365             WHERE c.pmid = a.pmid AND c.pmid = %(inputTable)s.pmid AND a.author_order = '0'
366             AND %(inputTable)s.pmid = %(pmid)s AND a.pmid_version = c.pmid_version ORDER BY c.pmid_version DESC
367             LIMIT 1;
368          """ % {"inputTable": inputTable, 
369                 "medlineDB": config.mysql('db_medline'),
370                 "pmid": pmid}
371    cursor.execute(sql)
372    row = cursor.fetchone()
373    print "row =", row
374    aulast, journal, volume, issue, startpage, pubyear = unpackRow(row)
375    print aulast, journal, volume, issue, startpage, pubyear
376    dois = getDOI(aulast, journal, volume, issue, startpage, pubyear)
377    print "dois =", dois
378
379    db = config.mysql('db_medline')
380    sql = ("SELECT article_title,issn from %s.medline_citation " % db
381           + "where pmid=%s limit 1;" % pmid)
382    cursor.execute(sql)
383    row = cursor.fetchone()
384    print "title:", row[0]
385    print "issn:", row[1]
386
387
388def main():
389    log("Mapping PMID to DOIs started")
390
391    connectionDB = opendb()
392
393    connectionDB.set_character_set('utf8')
394    cursor = connectionDB.cursor()
395    cursor.execute('set autocommit=1;')
396
397    if config.run('create_table'):
398        _createTable(cursor)
399
400    if config.run('update_table'):
401        updateTable(cursor)
402
403    # Startup statistics
404    sql = ("SELECT COUNT(*) FROM %s " +
405           "WHERE nexttrytime<TO_SECONDS(NOW());") % inputTable
406    cursor.execute(sql)
407    ntodo = cursor.fetchone()[0]
408    print "There are %d pmids that need to be mapped." % ntodo
409    sys.stdout.flush()
410
411    thread_ID = 0
412
413    while True:
414        # Start nthread parallel threads, each reading from "q"
415        while threading.active_count() < nthread + 1:
416            thread_ID += 1
417            ChildThread(thread_ID).start()
418        # Make sure all work that was queued so far is done.
419        q.join()
420        # Put the best new work in the queue
421        sql = """SELECT %(inputTable)s.pmid, c.journal_title, c.volume, c.issue,
422                        c.medline_pgn, c.pub_date, a.last_name
423                 FROM %(inputTable)s
424                 INNER JOIN %(medlineDB)s.medline_citation c ON c.pmid = %(inputTable)s.pmid
425                 LEFT JOIN %(medlineDB)s.medline_author a ON a.pmid = c.pmid AND a.author_order = '0'
426                 WHERE nexttrytime<TO_SECONDS(NOW())
427                 ORDER BY nexttrytime LIMIT 100;
428              """ % {"inputTable": inputTable, 
429                     "medlineDB": config.mysql('db_medline')}
430        # This can result in duplicates if there are multiple pmid_versions, but selecting on those
431        # makes the query very, very slow.
432        cursor.execute(sql)
433        row = cursor.fetchone()
434        if row is None:
435            # No more things that need to be done "before now"
436            log("Nothing to do. Waiting a bit.")
437            time.sleep(600)
438        while row:
439            q.put(row)
440            row = cursor.fetchone()
441
if __name__ == "__main__":
    if len(sys.argv) == 2:
        # Exactly one command-line argument: diagnostic lookup of that pmid
        tryOne(sys.argv[1])
    else:
        # No arguments: run the full threaded mapping loop
        main()
Note: See TracBrowser for help on using the repository browser.