source: warp2d/background_jobSubmit.php

Last change on this file was 2, checked in by ishtiaq.ahmad@…, 9 years ago

adding warp2d service first time

File size: 12.7 KB
Line 
1<?php
2require_once 'Console/Getopt.php'; 
3 
4// Define exit codes for errors 
5define('NO_ARGS',10); 
6define('INVALID_OPTION',11); 
7
8// Reading the incoming arguments - same as $argv 
9$args = Console_Getopt::readPHPArgv(); 
10 
11// Make sure we got them (for non CLI binaries) 
12if (PEAR::isError($args)) { 
13   fwrite(STDERR,$args->getMessage()."\n"); 
14   exit(NO_ARGS); 
15} 
16
17// Short options 
18$short_opts = 'j:d:k:';   
19
20// Long options 
21$long_opts = array( 
22   'jobid=', 
23   'dir=',
24   'key='
25   ); 
26 
27// Convert the arguments to options - check for the first argument 
28if ( realpath($_SERVER['argv'][0]) == __FILE__ ) { 
29   $options = Console_Getopt::getOpt($args,$short_opts,$long_opts); 
30} else { 
31   $options = Console_Getopt::getOpt2($args,$short_opts,$long_opts); 
32} 
33 
34// Check the options are valid 
35if (PEAR::isError($options)) { 
36   fwrite(STDERR,$options->getMessage()."\n"); 
37   exit(INVALID_OPTION); 
38} 
39$opts =$options[0];
40
41foreach ($opts as $o) {
42        switch ($o[0]) {
43                case "--jobid":
44                        $jobId = $o[1];
45                break;
46                case "--dir":
47                        $dir = $o[1];
48                break;
49                case "--key":
50                        $key = $o[1];
51                break;
52        }
53} 
54define('DIR',$dir); 
55define('JOB_ID',$jobId); 
56
57if( !empty($jobId) && !empty($dir) && !empty($key) && (((int) $jobId) == $jobId) && is_dir($dir) ) {
58        $time_start = microtime_float();
59        submitJob ($jobId,$dir,$key);
60        $time_end = microtime_float();
61        $time = $time_end - $time_start;
62        echo "This Funcation tacks $time seconds\n";
63        memReport();
64}else {
65        fwrite(STDERR,"Invalide command line arguments\n");
66        exit(NO_ARGS); 
67}
68
69
70function submitJob($jobId,$dir,$AuthKey) {
71        $jacketPath = $dir."/jobs/".$jobId;
72        $warp2DParams['AuthKey']=$AuthKey; // add key
73        $refPeakListFile = array();
74        $sampPeakListFile = array();
75       
76        $doc = new DOMDocument();
77        $doc->formatOutput = true;
78        $doc->preserveWhiteSpace = false;
79        $doc->load( $jacketPath .'/jacket.xml' );
80        $jackets = $doc->getElementsByTagName( "jacket" );
81        $fixParams = $jackets->item(0)->getElementsByTagName( "parameter" );
82        $bags = $jackets->item(0)->getElementsByTagName( "bag" );
83        foreach( $fixParams as $fix )
84        {
85                $paramName=$fix->getAttribute("name");
86                $paramValue=$fix->getAttribute("value");               
87                $warp2DParams[$paramName] = $paramValue;
88        }
89
90        foreach( $bags as $bag )
91        {
92                $name_attr = $bag->getAttribute('name');
93                $path_attr = $bag->getAttribute('path');
94                if (file_exists($jacketPath .'/'. $path_attr)) {
95                        $file_handle = fopen($jacketPath.'/'. $path_attr,"r");
96                        if ($file_handle) {
97                                while(!feof($file_handle)) {
98                                        $str = rtrim(fgets($file_handle));
99                                        if(!empty($str) && is_file($dir.'/'.$str)) {
100                                                if($name_attr == 'refBag')
101                                                        $refPeakListFile[]= $str;
102                                                else
103                                                        $sampPeakListFile[] = $str;                       
104                                        }
105                                }
106                          fclose($file_handle);
107                        }
108                }
109        }
110$dummyID =((int)0);
111$URLArray = array();
112        foreach( $refPeakListFile as $refFile ) {
113                foreach( $sampPeakListFile as $sampFile ) {
114                        $newJob = $doc->createElement( "job" );
115                        $newJobNode_attr = $doc->createAttribute('jacketId'); 
116                        $newJobNode_attr->appendChild($doc->createTextNode(1)); 
117                        $newJob->appendChild($newJobNode_attr);
118                        $statusNode = $doc->createElement( "status", "pending" ); 
119                        $newJob->appendChild( $statusNode );
120                        $pathNode = $doc->createElement( "resultPath", "results/" ); 
121                        $newJob->appendChild( $pathNode );
122                        $msgNode = $doc->createElement( "message", "Submited" ); 
123                        $newJob->appendChild( $msgNode );
124
125                        $varParamNode = $doc->createElement( "variableParameters" );
126
127                        $param1 = $doc->createElement( "parameter");
128                        $param1_attr_name = $doc->createAttribute("name");
129                        $param1_attr_name->appendChild($doc->createTextNode("RefPeakListFile"));
130                        $param1_attr_value = $doc->createAttribute( "value" );
131                        $param1_attr_value->appendChild($doc->createTextNode($refFile));
132                        $param1_attr_type = $doc->createAttribute( "type" );
133                        $param1_attr_type->appendChild($doc->createTextNode("java.io.File"));
134                        $param1->appendChild($param1_attr_name);
135                        $param1->appendChild($param1_attr_value);
136                        $param1->appendChild($param1_attr_type);
137                        $varParamNode->appendChild($param1);
138
139                        $param2 = $doc->createElement( "parameter");
140                        $param2_attr_name = $doc->createAttribute("name");
141                        $param2_attr_name->appendChild($doc->createTextNode("AlignPeakListFile"));
142                        $param2_attr_value = $doc->createAttribute( "value" );
143                        $param2_attr_value->appendChild($doc->createTextNode($sampFile));
144                        $param2_attr_type = $doc->createAttribute( "type" );
145                        $param2_attr_type->appendChild($doc->createTextNode("java.io.File"));
146                        $param2->appendChild($param2_attr_name);
147                        $param2->appendChild($param2_attr_value);
148                        $param2->appendChild($param2_attr_type);
149                        $varParamNode->appendChild($param2);
150
151                        $param3 = $doc->createElement( "parameter");
152                        $param3_attr_name = $doc->createAttribute("name");
153                        $param3_attr_name->appendChild($doc->createTextNode("OutputWarpedFileName"));
154                        $param3_attr_value = $doc->createAttribute( "value" );
155                        $refName = substr($refFile,strripos($refFile, '/')+1,-4);
156                        $sampName = substr($sampFile,strripos($sampFile, '/')+1,-4);
157                        $param3_attr_value->appendChild($doc->createTextNode("r_".$refName."_s_".$sampName));
158                        $param3_attr_type = $doc->createAttribute( "type" );
159                        $param3_attr_type->appendChild($doc->createTextNode("java.lang.String"));
160                        $param3->appendChild($param3_attr_name);
161                        $param3->appendChild($param3_attr_value);
162                        $param3->appendChild($param3_attr_type);
163                        $varParamNode->appendChild($param3);
164                        $newJob->appendChild( $varParamNode );
165                        $id =$dummyID;
166                        $URLArray[$id]= creatServiceCallingURL($refFile,$sampFile,"r_".$refName."_s_".$sampName,$warp2DParams);
167                        //$URLArray[$id]="http://localhost/daf/warp2d/ping.php?id=".$id;
168                        $idNode = $doc->createElement( "id", $id  ); 
169                        $newJob->appendChild( $idNode );
170                        $Root = $doc->getElementsByTagName( "jacketData" );
171                        $Root->item( 0 )->appendChild( $newJob );
172                        $doc->appendChild($Root->item( 0 ));
173                        $dummyID++;
174                }
175        }
176        unset($refPeakListFile);
177        unset($sampPeakListFile);
178        $doc->save($jacketPath.'/jacket.xml');
179        unset($doc);
180        // Run the service and get back the actual ID of job
181        //rolling_curl($URLArray, service_callback);
182        //$res =multithread_crawl($URLArray);
183        replaceIDs(multithread_crawl($URLArray),$jacketPath);
184}
185
186function creatServiceCallingURL($refPeakList, $samplePeakList, $outputFname, $params){
187        $ServiceURL= "http://localhost:8080/axis2/services/Warping/warp2D";
188        $fields = '';
189        $fields .='AuthKey=' . urlencode($params['AuthKey']) . '&';
190        $fields .='MZwidth=' . urlencode($params['MZwidth']) . '&';
191        $fields .='RTwidth=' . urlencode($params['RTwidth']) . '&';
192        $fields .='refPeakListFileName=' . urlencode($refPeakList) . '&';
193        $fields .='alignPeakListFileName=' . urlencode($samplePeakList) . '&';
194        $fields .='winSize=' . urlencode($params['winSize']) . '&';
195        $fields .='slack=' . urlencode($params['slack']) . '&';
196        $fields .='maxPeaksPerSegment=' . urlencode($params['MaxPeaksPerSegment']) . '&';
197        $fields .='NTimePoints=' . urlencode($params['NTimePoints']) . '&';
198        $fields .='sampShift=' . urlencode($params['timeShift']) . '&';
199        $fields .='outputWarpedFileName=' . urlencode($outputFname);
200return $ServiceURL.'?'.$fields;
201}
202
203function multithread_crawl($urls, $timeout=1600)
204{
205        $agent = "Mozilla/5.0 (X11; U; Linux x86_64; en-US; rv:1.9.2) Gecko/20100301 Ubuntu/8.04 (hardy) Firefox/3.6";
206        $http_headers = array (
207            'Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 
208            'Host: localhost:8080',
209            'Connection: keep-alive',
210            'Content-type: application/x-www-form-urlencoded;charset=UTF-8'
211        );
212        $mh = curl_multi_init();
213
214        foreach ($urls as $i => $url)
215        {
216                $conn[$i] = curl_init($url);
217                curl_setopt($conn[$i], CURLOPT_CONNECTTIMEOUT, 1600);
218                curl_setopt($conn[$i], CURLOPT_RETURNTRANSFER,true);
219                curl_setopt($conn[$i], CURLOPT_URL, $url);
220                curl_setopt($conn[$i], CURLOPT_USERAGENT, $agent);
221                curl_setopt($conn[$i], CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1);
222                curl_setopt($conn[$i], CURLOPT_HTTPHEADER,$http_headers);
223                curl_setopt($conn[$i], CURLOPT_MAXCONNECTS, 100);               
224                curl_setopt($conn[$i], CURLOPT_FOLLOWLOCATION, true);
225                curl_setopt($conn[$i], CURLOPT_TIMEOUT, $timeout);
226                curl_multi_add_handle ($mh, $conn[$i]);
227        }
228
229        do
230        {
231                $mrc = curl_multi_exec($mh, $active);
232        }while ($mrc == CURLM_CALL_MULTI_PERFORM);
233
234        while ($active and $mrc == CURLM_OK)
235        {
236                if (curl_multi_select($mh) != -1)
237                {
238                        do{
239                                $mrc = curl_multi_exec($mh, $active);
240                        }while ($mrc == CURLM_CALL_MULTI_PERFORM);
241                }
242        }
243
244        if ($mrc != CURLM_OK)
245        {
246                print "Curl multi read error $mrc\n";
247        }
248
249        $res = array();
250        $e = 0;
251        $resultDoc = new DOMDocument();
252
253        foreach ($urls as $i => $url)
254        {
255                if (($err = curl_error($conn[$i])) == '')
256                {
257                        //$res[$i]=curl_multi_getcontent($conn[$i]);
258                         $done_content=curl_multi_getcontent($conn[$i]);
259                        @$resultDoc->loadXML( $done_content );
260                        $jobIds = $resultDoc->getElementsByTagName( "return" );
261                        foreach( $jobIds as $jid )
262                        {
263                                $res[$i] = $jid->nodeValue;
264                        }
265
266                }
267                else
268                {
269                        echo "error: ".$url." (".$err.")\n";
270                }
271
272                curl_multi_remove_handle($mh,$conn[$i]);
273                curl_close($conn[$i]);
274        }
275
276        curl_multi_close($mh);
277        unset($resultDoc);
278        return $res;
279}
280
281function rolling_curl($urls, $callback, $custom_options = null) {
282
283        $doc = new DOMDocument();
284        $jacketPath = DIR."/jobs/".JOB_ID;
285        $doc->formatOutput = true;
286        $doc->preserveWhiteSpace = true;
287        $doc->load( $jacketPath .'/jacket.xml' );
288        $jobsNode = $doc->getElementsByTagName( "job" );
289
290$errUrl_arr = array();
291$jobIdMap = array();
292$threads = 10;
293
294// add additional curl options here
295    $std_options = array(CURLOPT_RETURNTRANSFER => true,
296    CURLOPT_FOLLOWLOCATION => true,
297    CURLOPT_MAXREDIRS => 5,
298    CURLOPT_TIMEOUT => 1600,
299    CURLOPT_HTTPGET => 1,
300    CURLOPT_CONNECTTIMEOUT =>1600);
301$options = ($custom_options) ? ($std_options + $custom_options) : $std_options;
302
303$mcurl = curl_multi_init();
304$threadsRunning = 0;
305$urls_id = 0;
306for(;;) {
307    // Fill up the slots
308    while ($threadsRunning < $threads && $urls_id < count($urls)) {
309        $ch = curl_init();
310        $options[CURLOPT_URL] = $urls[$urls_id++];
311        curl_setopt_array($ch,$options);
312        curl_multi_add_handle($mcurl, $ch);
313        $threadsRunning++;
314    }
315    // Check if done
316    if ($threadsRunning == 0 && $urls_id >= count($urls))
317        break;
318    // Let mcurl do it's thing
319    curl_multi_select($mcurl);
320    while(($mcRes = curl_multi_exec($mcurl, $mcActive)) == CURLM_CALL_MULTI_PERFORM); // usleep(100000);
321    if($mcRes != CURLM_OK) break;
322    while($done = curl_multi_info_read($mcurl)) {
323        $ch = $done['handle'];
324        $done_url = curl_getinfo($ch, CURLINFO_EFFECTIVE_URL); 
325        $done_content = curl_multi_getcontent($ch);
326        if(curl_errno($ch) == 0) {
327                $resultDoc = new DOMDocument();
328                @$resultDoc->loadXML( $done_content );
329                $jobIds = $resultDoc->getElementsByTagName( "return" );
330                foreach( $jobIds as $jid )
331                {
332                        $result = $jid->nodeValue;
333                }
334                unset($resultDoc);
335                $i = array_keys($urls,$done_url);
336                $callback($i[0],$result,$jobsNode);
337                //$jobIdMap[$done_url]= $result;
338        } else {
339           $errUrl_arr[]=$done_url;
340           echo "Link <a href='$done_url'>$done_url</a> failed: ".curl_error($ch)."<br>\n";
341           flush();
342        }
343        curl_multi_remove_handle($mcurl, $ch);
344        curl_close($ch);
345        $threadsRunning--;
346    }
347}
348curl_multi_close($mcurl);
349$doc->save($jacketPath.'/jacket.xml');
350unset($doc);
351//return $errUrl_arr;
352}
353
354function service_callback($tmpsubJobId,$subJobId,$jobs) {
355        foreach( $jobs as $job ) {
356                $tmp_jid = $job->getElementsByTagName( "id" );
357                if($tmp_jid->item( 0 )->nodeValue == $tmpsubJobId ) { // replace the id and break, no need to go further
358                        $tmp_jid->item( 0 )->nodeValue = $subJobId;
359                        break;
360                }
361        }       
362} 
363
364function replaceIDs($arr_Ids,$jacketPath){
365        $doc = new DOMDocument();
366        $doc->formatOutput = true;
367        $doc->preserveWhiteSpace = false;
368        $doc->load($jacketPath.'/jacket.xml');
369        $jobs = $doc->getElementsByTagName( "job" );
370        foreach( $jobs as $job ) {
371                $jobNode = $job->getElementsByTagName( "id" );
372                $jobNode->item( 0 )->nodeValue =$arr_Ids[(int)$jobNode->item( 0 )->nodeValue];
373        }
374
375        $doc->save($jacketPath.'/jacket.xml');
376        unset($jobs);
377        unset($doc);
378}
379function array_replaceIDs($base,$replacement)
380{
381        $counter =((int)0);
382        if(!is_array($base) || !is_array($replacement)) die('invalide arguments');
383        foreach($replacement as $key=>$value){
384                if($base[$counter]==$key);
385                        $base[$counter]=$value;
386                $counter++;
387        }
388        return $base;
389}
390
391function microtime_float()
392{
393    list($usec, $sec) = explode(" ", microtime());
394    return ((float)$usec + (float)$sec);
395}
396
397function memReport() {
398
399print "\n-------\n";
400print "Memory Usage PHP:       ". round(memory_get_usage() / 1024 ,2) ." kB";
401print "\n";
402$mem = `ps aux | grep php5 | head -1`;
403$mem = split(" +",$mem);
404
405print "Memory Usage Total VSZ: $mem[4] kB\n";
406print "Memory Usage Total RSS: $mem[5] kB\n";
407
408}
409
410?>
Note: See TracBrowser for help on using the repository browser.