root/trunk/galaxy-dist/lib/divergence/lsgp_client.py

Revision 867, 9.0 KB (checked in by tim.te.beek@…, 17 months ago)

Change IOError into AssertionError when job overruns allotted time duration

Line 
#!/usr/bin/env python
'''
Module to talk to the SARA Life Science Grid Portal.

@author: tbeek
'''

from datetime import datetime
from poster.encode import multipart_encode, MultipartParam
from poster.streaminghttp import StreamingHTTPSHandler
import base64
import logging
import os
import shutil
import tarfile
import tempfile
import time
import urllib2

#Verified to work with: X-Portal-Version: 3603
HOSTNAME = 'apps.grid.sara.nl'
BASE_URL = 'https://' + HOSTNAME + '/'
URL_APPS = BASE_URL + 'applications/'  # application run submission endpoints
URL_DBS = BASE_URL + 'databases/'  # database upload endpoints
URL_JOBS = BASE_URL + 'jobstates/'  # job state overview, results & deletion

#URL opener with support for multipart file upload forms
URLLIB2_OPENER = urllib2.build_opener(StreamingHTTPSHandler)
29
30
def _load_lsg_credentials():
    """Read Life Science Grid Portal credentials from file and store in os.environ variables."""
    #Locate the configuration file within the package resources
    from divergence import resource_filename
    config_path = resource_filename(__name__, 'credentials/lsg-portal.cfg')

    #Seed the config file from its .sample template when it does not exist yet
    if not os.path.exists(config_path):
        shutil.copy(config_path + '.sample', config_path)
        logging.info('Copied .sample file to %s', config_path)

    logging.info('Credentials not found on path: Reading credentials from %s', config_path)

    #Read username & password from the [DEFAULT] section and expose them via the environment
    from ConfigParser import SafeConfigParser
    config = SafeConfigParser()
    config.read(config_path)
    defaults = config.defaults()
    os.environ['lsg_username'] = defaults['lsg_username']
    os.environ['lsg_password'] = defaults['lsg_password']
50
51
def submit_application_run(application, params, files):
    """
    Submit an application run with given parameters and files. Returns jobid of submitted run.
    @param application: the application part of the Life Science Grid Portal url
    @param params: dictionary mapping keys to values for use as parameters
    @param files: dictionary mapping keys to files for use as parameters
    """
    logging.info('Submitting %s run', application)
    logging.info('Parameters:\n%s', params)
    logging.info('Files:\n%s', files)

    #POST the run to the application endpoint; the response body is the job url
    url_job = send_request(URL_APPS + application, params=params, files=files)
    logging.info('Result will be at: %s', url_job)

    #The last path segment of the job url is the job identifier
    return url_job.split('/')[-1]
67
68
def retrieve_run_result(jobid, max_duration=None):
    """
    Retrieve results for jobid. Returns directory containing results.
    @param jobid: id of the job to wait for
    @param max_duration: the maximum number of seconds to wait for job completion
    """
    #Block until the job has left the Queued state (or vanished)
    wait_for_job(jobid, max_duration)
    logging.info('Finished waiting for job result %s', jobid)

    #Download the result archive and unpack it into a local directory
    directory = _save_job_result(jobid)
    logging.info('Saved job result %s to %s', jobid, directory)

    #Remove the remote copy now that we hold a local one
    send_request(URL_JOBS + jobid, method='DELETE')
    logging.info('Deleted remote job result %s', jobid)
    return directory
85
86
def run_application(application, params=None, files=None, max_duration=None):
    """
    Run application with provided parameters and files, and retrieve result directly.
    @param application: the application part of the Life Science Grid Portal url
    @param params: dictionary mapping keys to values for use as parameters
    @param files: dictionary mapping keys to files for use as parameters
    @param max_duration: the maximum number of seconds to wait for job completion
    """
    #Convenience wrapper: submit the run, then wait for & fetch its result
    jobid = submit_application_run(application, params, files)
    return retrieve_run_result(jobid, max_duration)
97
98
def upload_database(database, dbtype='formatdb', shared=False):
    """
    Upload a database file to the Life Science Grid Portal. Returns the url of the database file in the portal.
    @param database: database file
    @param dbtype: one of: FASTA, csbfa, formatdb
    @param shared: boolean to indicate whether this database should be shared with other users
    """
    #Build a unique URL using todays date
    today = datetime.today()
    # The trailing slash is key.. Time spent: ~2 hours
    version = str(today.date()) + '_' + str(today.time()).replace(':', '-') + '/'
    url_db_version = URL_DBS + 'www.odose.nl/' + version

    #Build up parameters for send_request
    params = {'type': dbtype,
              'shared': 1 if shared else 0}
    files = {'dbfile': database}

    #Upload
    content = send_request(url_db_version, params=params, files=files)
    #Pass content as a lazy logging argument instead of concatenating it into
    #the format string: a stray '%' in the response would break the log call
    logging.info('Uploaded %s database %s to: %s', dbtype, database, content)
    return content
121
122
def send_request(url, params=None, files=None, method=None):
    """
    Send a request to the SARA Life Science Grid Portal, using the provided arguments. Returns text content.
    @param url: url to request / submit to
    @param params: dictionary of parameters that should be POSTed to the url (optional)
    @param files: dictionary of files that should be POSTed to the url (optional)
    @param method: string HTTP method (optional: POST when params or files are provided, GET otherwise)
    """
    #Encode data: any params or files force a multipart POST body
    data = None
    headers = {}
    multipart_params = []
    if params:
        for key, value in params.iteritems():
            multipart_params.append(MultipartParam(key, value))
    if files:
        for key, value in files.iteritems():
            multipart_params.append(MultipartParam.from_file(key, value))
    if multipart_params:
        data, headers = multipart_encode(multipart_params)

    #Create request
    headers.update(Accept='text/plain')
    request = urllib2.Request(url=url, headers=headers, data=data)

    # Set method, which could be DELETE
    if method:
        request.get_method = lambda: method

    # Add authentication, explicitly not using the urllib2.HTTPBasicAuthHandler, as it caused frequent failures
    if 'lsg_username' not in os.environ or 'lsg_password' not in os.environ:
        _load_lsg_credentials()
    base64string = base64.encodestring(os.environ['lsg_username'] + ':' + os.environ['lsg_password']).replace('\n', '')
    request.add_header("Authorization", "Basic %s" % base64string)

    #Send request over opener and retrieve response
    try:
        response = URLLIB2_OPENER.open(request, timeout=180)
    except urllib2.HTTPError as err:
        #Log the failure through logging (not bare prints to stdout) so it
        #shows up alongside the rest of the module's log output
        logging.error('Request to %s failed: %s', url, err)
        for key in sorted(err.hdrs.keys()):
            logging.error('%s: %s', key, err.hdrs[key])
        #Bare raise preserves the original traceback, unlike 'raise err'
        raise

    #Retrieve content; close the response even when reading it fails
    try:
        content = response.read()
    finally:
        response.close()
    return content
172
173
def wait_for_job(jobid, max_duration=None):
    """
    Wait for a given job to either leave the Queued status, or disappear from the job states page completely.
    @param jobid: id of the job to wait for
    @param max_duration: the maximum number of seconds to wait for job completion
    @return: url of the job result
    @raise AssertionError: when the job overruns max_duration
    @raise IOError: when the job finished in state Error
    """
    duration = 0
    timetosleep = 30
    failures = 0
    jobstates = {}
    while True:
        try:
            #It would be a shame to lose a reference to all jobs, so we allow for more errors when retrieving jobstates
            statepage = send_request(URL_JOBS)
            failures = 0

            #Retrieve state for all jobs, and convert to dictionary for easier lookup
            #(first line of the tab-separated overview is a header, hence [1:])
            jobstates = dict(line.split('\t')[:2] for line in statepage.strip().split('\r\n')[1:])
            if jobid not in jobstates:
                logging.error('Life Science Grid Portal jobid %s not found in overview', jobid)
                break
            if jobstates[jobid] != 'Queued':
                break
        except urllib2.URLError as err:
            failures += 1
            #But after five consecutive failures we just plain give up
            if 5 <= failures:
                raise err

        # Raise an error when the maximum allotted time has arrived
        # (deliberately an assert: callers expect AssertionError on overrun)
        if max_duration:
            assert duration < max_duration, 'Job {1} overran allotted time: {0}{1}'.format(URL_JOBS, jobid)
            duration += timetosleep

        #If we're still here: Sleep for up to two minutes before trying again
        time.sleep(timetosleep)
        if timetosleep < 120:
            timetosleep += 10

    #Check job result; use .get so a job that vanished from the overview does
    #not raise KeyError here (the original indexed jobstates[jobid] directly)
    if jobstates.get(jobid) == 'Error':
        raise IOError('Job {1} in error: {0}{1}'.format(URL_JOBS, jobid))

    #Return url with job result
    return URL_JOBS + jobid
218
219
def _save_job_result(jobid):
    """
    Save the job result gzipped tarfile and extract its contents to a new temporary directory.
    @param jobid: id of the finished job whose result should be downloaded
    @return: path of the temporary directory containing the extracted files
    """
    #Retrieve the produced gzipped tarfile and write it to a tempdir
    content = send_request(URL_JOBS + jobid)
    tempdir = tempfile.mkdtemp('_' + jobid, 'lsgp_jobid_')
    outputfile = os.path.join(tempdir, 'out.tgz')
    with open(outputfile, mode='wb') as write_handle:
        write_handle.write(content)

    #Extract all files to tempdir; close the tarfile even if extraction fails
    #(the original leaked the handle when extractall raised)
    tar = tarfile.open(outputfile)
    try:
        tar.extractall(path=tempdir)
    finally:
        tar.close()

    #Remove the out.tgz file we created above
    os.remove(outputfile)

    return tempdir
241
242
243if __name__ == '__main__':
244    DIR = run_application('greeter/1.0', {'name': 'Tim'})
245    print DIR, os.listdir(DIR)
246    with open(os.path.join(DIR, os.listdir(DIR)[0])) as read_handle:
247        print read_handle.read()
Note: See TracBrowser for help on using the browser.