source: trunk/id-mapper/pmid2doi-mapper.py @ 6

Last change on this file since 6 was 6, checked in by rob.hooft@…, 6 years ago

first cut at the pmid2doi updater

File size: 15.2 KB
#!/usr/bin/env python
#
# This script maps PMIDs to DOIs by querying the CrossRef API.
#
# For its data it connects to a MySQL database described in an ini file.
#
# The input is a table containing a field of PMIDs.
# The output is a table of <pmid><doi> pairs.

# Standard library imports
import os
import re
import Queue
import sys
import threading
import time

# Project imports
import crossreftitles

# Where to find the configuration parameters
configfn = 'mapper.ini'


class Config(object):
    def __init__(self):
        import ConfigParser
        self.parser = ConfigParser.SafeConfigParser()
        if os.path.exists(configfn):
            self.parser.read(configfn)
        else:
            sourcefolder = os.path.dirname(sys.argv[0])
            self.parser.read(os.path.join(sourcefolder, configfn))

    def mysql(self, key):
        value = self.parser.get('mysql', key)
        # "port" is an integer, the rest are strings
        if key == 'port':
            return int(value)
        else:
            return value

    def crossref(self, key):
        # All variables are strings
        return self.parser.get('crossref', key)

    def run(self, key):
        # Some are booleans
        if key in ('create_table', 'debug'):
            return self.parser.getboolean('run', key)
        # All others are integers
        value = self.parser.get('run', key)
        return int(value)

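# A minimal sketch of what mapper.ini is expected to provide, based on the
# keys requested through the Config accessors in this module (all values
# below are placeholders, not a working configuration):
#
#   [mysql]
#   host = localhost
#   port = 3306
#   username = mapper
#   password = secret
#   db_mapping = mapping
#   db_medline = pubmed2013
#   table_unmapped = unmapped_pmids
#   table_mapped = pmid2doi
#
#   [crossref]
#   PID = you@example.org
#   api_url = https://example.org/openurl?
#
#   [run]
#   debug = false
#   create_table = false
#   update_table = 1
#   nr_thread = 4
#   nr_thread_debug = 1
#   nretry = 3
#   reportinterval = 100
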
# Open and read the configuration
config = Config()

debug = config.run('debug')
if debug:
    nthread = config.run('nr_thread_debug')
else:
    nthread = config.run('nr_thread')

# Name of the database that we are creating
ourdb = config.mysql('db_mapping')

# Names of the input and output tables in our database
inputTable = ourdb + "." + config.mysql('table_unmapped')
outputTable = ourdb + "." + config.mysql('table_mapped')

# How many times a CrossRef API call should be retried if
# there are strange HTTP errors
nretry = config.run('nretry')

# How often to report about the intermediate results
reportinterval = config.run('reportinterval')

# How to recognize a DOI in the CrossRef results
DOIinPubMed = re.compile(r'<doi type="journal_article">(.+?)</doi>')
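# For example, a response fragment like
#   <doi type="journal_article">10.1000/xyz123</doi>
# would yield the capture "10.1000/xyz123" (an illustrative DOI, not an
# actual lookup result).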


def opendb():
    """ Open a database connection."""
    import MySQLdb
    return MySQLdb.connect(host=config.mysql('host'),
                           port=config.mysql('port'),
                           user=config.mysql('username'),
                           passwd=config.mysql('password'),
                           db=config.mysql('db_mapping'))


def log(message):
    import datetime
    now = datetime.datetime.now()
    sys.stderr.write("%s: %s\n" % (now, message))


def logerr(errname):
    log("*** %s encountered" % errname)
    time.sleep(1)


class RateLimit(object):
    def __init__(self, mintime=1):
        self.lasttime = 0
        self.mintime = mintime

    def __call__(self):
        t = time.time()
        if t < self.lasttime + self.mintime:
            # Sleep just long enough so that successive calls are at least
            # mintime seconds apart
            time.sleep(self.lasttime + self.mintime - t)
        self.lasttime = time.time()

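# Each ChildThread below keeps its own RateLimit instance and calls it before
# every CrossRef request, so requests from a single worker thread are spaced
# at least mintime seconds apart.
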
q = Queue.Queue()


# Our threads
class ChildThread(threading.Thread):
    # Override Thread's __init__ method to accept the parameters needed:
    def __init__(self, thread_ID):
        # Open a connection to the database, and a cursor.
        self.connection = opendb()
        self.cursor = self.connection.cursor()
        self.cursor.execute('set autocommit=1;')
        #
        self.thread_ID = thread_ID
        self.ratelimit = RateLimit()
        self.cnt = 0
        self.indexskip = 0
        # Call the original constructor
        threading.Thread.__init__(self)
        # Do not wait for this thread to terminate when the main program exits
        self.daemon = True

    def run(self):
        while True:
            row = q.get()
            try:
                lookupReturned = False
                pmid = row[0]
                assert pmid != 'NULL'
                doi = self.lookup(row)
                lookupReturned = True
                if doi:
                    # We found one!
                    log("Mapped %s to %s" % (pmid, doi))
                    self.handleNewPair(pmid, doi)
                else:
                    self.markAsTried(pmid)
            finally:
                if not lookupReturned:
                    sys.stderr.write("ERROR: Lookup fails %s\n" % str(row))
                    self.tryAgainInAFewHours(pmid)
                q.task_done()

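    # Error handling note for run() above: there is no except clause, so if
    # lookup() raises, the finally block schedules a retry and marks the queue
    # item done, and the exception then ends this thread.  main() keeps
    # threading.active_count() topped up, so a replacement worker is started.
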
    def lookup(self, row):
        if debug:
            print "DBG> row:", row
        if row is None:
            return None
        if self.cnt % reportinterval == 0:
            log("Thread %2d : count=%d (skipped=%d)" % (self.thread_ID, self.cnt, self.indexskip))
        self.cnt += 1

        # Get the information about the pubmed article from the database row
        aulast, journal, volume, issue, startpage, pubyear = unpackRow(row)

        if journal.lower() == 'biochimica et biophysica acta':
            # Biochimica et Biophysica Acta (sic in pubmed) is actually several different journals,
            # each of which we need to try in crossref. Luckily we can check via the crossref indexed
            # titles document which volume occurs in which journal name, saving a lot of time.
            jlist = crossreftitles.bbatitles(volume)
            if len(jlist) == 0:
                sys.stderr.write("ERROR: No BBA title for volume %s\n" % repr(row))
                return None
        elif journal.lower() == 'mutation research':
            # Mutation Research (sic in pubmed) is actually several different journals,
            # each of which we need to try in crossref. Luckily we can check via the crossref indexed
            # titles document which volume occurs in which journal name, saving a lot of time.
            jlist = crossreftitles.mrtitles(volume)
            if len(jlist) == 0:
                sys.stderr.write("ERROR: No MR title for volume %s\n" % repr(row))
                return None
        elif crossreftitles.lookup(journal) and not crossreftitles.lookup2(journal, pubyear):
            # If the journal was found, but the year was not found....
            if debug:
                print "DBG> %s not indexed for %s" % (pubyear, journal)
            self.indexskip += 1
            return None
        else:
            # Only the "pubmed" name of the journal is considered for crossref
            jlist = [journal]
        if debug:
            print "DBG> journals:", jlist
        for journal in jlist:
            # Make sure we do not go too fast
            self.ratelimit()

            # Perform the lookup with crossref
            dois = getDOI(aulast, journal, volume, issue, startpage, pubyear)
            if dois:
                return dois
        return None

    def handleNewPair(self, pmid, doi):
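        """ Insert the (pmid, doi) pair into the output table and remove the
            pmid from the input table; both statements are committed as a
            single transaction so that a failure leaves the tables consistent.
        """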
        self.cursor.execute('set autocommit=0;')
        try:
            try:
                # Insert the match in the match table
                sqlinsert = ("INSERT INTO %s (pmid, doi)" % outputTable +
                             " VALUES ('%s', '%s');" % (pmid, doi))
                if debug:
                    print "DBG> would have executed:", sqlinsert
                else:
                    self.cursor.execute(sqlinsert)

                # Remove the pmid from the table of unmatched entries
                sqldelete = "DELETE FROM %s WHERE pmid = %s;" % (
                    inputTable, str(pmid))
                if debug:
                    print "DBG> would have executed:", sqldelete
                else:
                    self.cursor.execute(sqldelete)
            except:
                self.connection.rollback()
                raise
            else:
                self.connection.commit()
        finally:
            self.cursor.execute('set autocommit=1;')

    def markAsTried(self, pmid):
        """ Mark the record with the given pubmed id as tried."""
        if debug:
            self.tryAgainInAFewHours(pmid)
            return
        sql = ("UPDATE %s " % inputTable +
               "SET nexttrytime=(TO_SECONDS(NOW())+86400*7*(ntried+1))," +
               "ntried=ntried+1 " +
               "WHERE pmid=%s;" % pmid)
        self.cursor.execute(sql)

    def tryAgainInAFewHours(self, pmid):
        """ Mark the record with the given pubmed id as tried, but only for a
            few hours, and do not increase the try counter."""
        sql = """UPDATE %s SET
                 nexttrytime=(TO_SECONDS(NOW())+10800)
                 WHERE pmid="%s";""" % (inputTable, pmid)
        self.cursor.execute(sql)


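# A row, as selected in tryOne() and in the work queue query in main(), is
# expected to look like:
#   (pmid, journal_title, volume, issue, medline_pgn, pub_date, last_name)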
def unpackRow(row):
    pmid = row[0]
    assert pmid != 'NULL'
    journal = row[1] if row[1] is not None else ''
    volume = row[2] if row[2] is not None else ''
    issue = row[3] if row[3] is not None else ''
    medline_pgn = row[4] if row[4] is not None else ''
    if ';' in medline_pgn:
        # Handle page fields like: '1447; author reply 1447-8'
        medline_pgn = medline_pgn.split(";")[0]
    startpage = medline_pgn.split("-")[0]
    pubyear = row[5].year if row[5] is not None else ''
    aulast = row[6] if row[6] is not None else ''
    return aulast, journal, volume, issue, startpage, pubyear

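# The query parameters built below (aulast, title, volume, issue, spage, date)
# are the bibliographic fields sent to the CrossRef endpoint configured as
# api_url, with PID identifying the requesting account; this description is
# inferred from the parameter names and the configuration keys used here.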
def getDOI(aulast, journal, volume, issue, startpage, pubyear):
    """ Find the article with the specified characteristics in the CrossRef
        database.
    """
    import httplib
    import urllib
    import urllib2

    # Get the user ID for the access to the CrossRef API
    PID = config.crossref('PID')
    URL = config.crossref('api_url')

    # Encode the query to the API
    params = urllib.urlencode({'aulast': aulast,
                               'pid': PID,
                               'noredirect': '',
                               'title': journal,
                               'volume': volume,
                               'issue': issue,
                               'spage': startpage,
                               'date': pubyear})
    if debug:
        print "DBG> params=", params
    for ntries in range(nretry):
        try:
            u = urllib2.urlopen(URL + params, data=None, timeout=20)
        except httplib.BadStatusLine:
            logerr("httplib.BadStatusLine")
        except urllib2.HTTPError:
            logerr("urllib2.HTTPError")
        except urllib2.URLError:
            logerr("urllib2.URLError")
        else:
            break
    else:
        # If it was still not successful, raise the exception again.
        sys.stderr.write("ERROR> Failed request for %s\n" % params)
        sys.stderr.flush()
        raise

    xml = u.read()
    if debug and False:
        print "DBG> xml from crossref:", xml

    dois = DOIinPubMed.findall(xml)

    # Only return the result if it is unique.
    if len(dois) == 0:
        return None
    elif len(dois) == 1:
        return dois[0]
    else:
        if debug:
            print "DBG> Number of DOIs in the CrossRef result:", len(dois)
        log("Error! more than 1 DOI found")
        return None

def _createTable(cursor):
    """ Just create the table structure.
    """
    cursor.execute("""CREATE TABLE %(inputTable)s (pmid int(11) NOT NULL DEFAULT '0' PRIMARY KEY,
                                                   ntried bigint(20) DEFAULT 0,
                                                   nexttrytime bigint(20) DEFAULT 0,
                                                   KEY nexttrytime (nexttrytime))
                             engine=InnoDB;
                   """ % {"inputTable": inputTable})

def updateTable(cursor):
    """ Update the input table of pubmed articles that still do not have a DOI,
        starting from the pubmed2013 database and the output table from earlier runs.
    """
    # Add new records
    sql = """INSERT INTO %(inputTable)s
        (SELECT c.pmid, 0 as ntried, 0 as nexttrytime
         FROM   %(medlineDB)s.medline_citation c
         WHERE  c.pmid not in (SELECT pmid from %(outputTable)s)
         AND    c.pmid not in (SELECT pmid from %(inputTable)s));
        """ % {"inputTable": inputTable,
               "medlineDB": config.mysql('db_medline'),
               "outputTable": outputTable}
    log("Adding new records to the table (expect this to take 15 minutes)...")
    cursor.execute(sql)
    log("...done.")


def tryOne(pmid):
    connectionDB = opendb()

    connectionDB.set_character_set('utf8')
    cursor = connectionDB.cursor()
    cursor.execute('set autocommit=1;')

    print "pmid =", pmid
    sql = """SELECT DISTINCT c.pmid, c.journal_title, c.volume, c.issue, c.medline_pgn, c.pub_date, a.last_name
             FROM %(inputTable)s, %(medlineDB)s.medline_citation c, %(medlineDB)s.medline_author a
             WHERE c.pmid = a.pmid AND c.pmid = %(inputTable)s.pmid AND a.author_order = '0'
             AND %(inputTable)s.pmid = %(pmid)s AND a.pmid_version = c.pmid_version ORDER BY c.pmid_version DESC
             LIMIT 1;
          """ % {"inputTable": inputTable,
                 "medlineDB": config.mysql('db_medline'),
                 "pmid": pmid}
    cursor.execute(sql)
    row = cursor.fetchone()
    print "row =", row
    aulast, journal, volume, issue, startpage, pubyear = unpackRow(row)
    print aulast, journal, volume, issue, startpage, pubyear
    dois = getDOI(aulast, journal, volume, issue, startpage, pubyear)
    print "dois =", dois

    db = config.mysql('db_medline')
    sql = ("SELECT article_title,issn from %s.medline_citation " % db
           + "where pmid=%s limit 1;" % pmid)
    cursor.execute(sql)
    row = cursor.fetchone()
    print "title:", row[0]
    print "issn:", row[1]


def main():
    log("Mapping PMIDs to DOIs started")

    connectionDB = opendb()

    connectionDB.set_character_set('utf8')
    cursor = connectionDB.cursor()
    cursor.execute('set autocommit=1;')

    if config.run('create_table'):
        _createTable(cursor)

    if config.run('update_table'):
        updateTable(cursor)

    # Startup statistics
    sql = ("SELECT COUNT(*) FROM %s " +
           "WHERE nexttrytime<TO_SECONDS(NOW());") % inputTable
    cursor.execute(sql)
    ntodo = cursor.fetchone()[0]
    print "There are %d pmids that need to be mapped." % ntodo
    sys.stdout.flush()

    thread_ID = 0

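    # Producer/consumer loop: keep the pool of daemon worker threads topped
    # up to nthread, queue up to 100 PMIDs that are due for a retry, and wait
    # for the queue to drain before fetching the next batch.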
    while True:
        # Start nthread parallel threads, each reading from "q"
        while threading.active_count() < nthread + 1:
            thread_ID += 1
            ChildThread(thread_ID).start()
        # Make sure all work that was queued so far is done.
        q.join()
        # Put the best new work in the queue
        sql = """SELECT %(inputTable)s.pmid, c.journal_title, c.volume, c.issue,
                        c.medline_pgn, c.pub_date, a.last_name
                 FROM %(inputTable)s
                 INNER JOIN %(medlineDB)s.medline_citation c ON c.pmid = %(inputTable)s.pmid
                 LEFT JOIN %(medlineDB)s.medline_author a ON a.pmid = c.pmid AND a.author_order = '0'
                 WHERE nexttrytime<TO_SECONDS(NOW())
                 ORDER BY nexttrytime LIMIT 100;
              """ % {"inputTable": inputTable,
                     "medlineDB": config.mysql('db_medline')}
        # This can result in duplicates if there are multiple pmid_versions, but selecting on those
        # makes the query very, very slow.
        cursor.execute(sql)
        row = cursor.fetchone()
        if row is None:
            # No more things that need to be done "before now"
            log("Nothing to do. Waiting a bit.")
            time.sleep(600)
        while row:
            q.put(row)
            row = cursor.fetchone()

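# Invoked without arguments the script runs the full mapping loop; invoked
# with a single PMID argument it runs tryOne() for that article and prints
# the intermediate results.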
if __name__ == "__main__":
    if len(sys.argv) == 2:
        tryOne(sys.argv[1])
    else:
        main()