~grubng-dev/grubng/clients-python

« back to all changes in this revision

Viewing changes to grub.py

  • Committer: thindil
  • Date: 2009-09-29 10:11:52 UTC
  • Revision ID: thindil2@gmail.com-20090929101152-5vweaduucqrn037g
Initial import

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/env python
 
2
## Python Grub Client
 
3
## http://www.grub.org
 
4
## 
 
5
## Copyright (c) 2008 Giorgos Logiotatidis (seadog@sealabs.net)
 
6
##
 
7
## Grub.py is free software; you can redistribute it and/or modify
 
8
## it under the terms of the GNU General Public License as published by
 
9
## the Free Software Foundation; either version 3 of the License, or
 
10
## (at your option) any later version.
 
11
## 
 
12
## Grub.py is distributed in the hope that it will be useful,
 
13
## but WITHOUT ANY WARRANTY; without even the implied warranty of
 
14
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
15
## GNU General Public License for more details.
 
16
## 
 
17
## You should have received a copy of the GNU General Public License
 
18
## along with Grub.py; if not, write to the Free Software
 
19
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
 
20
##
 
21
 
 
22
import urllib2
 
23
import httplib
 
24
import time
 
25
import gzip
 
26
import socket
 
27
from threading import Thread
 
28
import thread
 
29
import sys
 
30
import os
 
31
import getopt
 
32
from cStringIO import StringIO
 
33
import zlib
 
34
 
 
35
NUM_CLIENTS = 5
 
36
NUM_UPLOADERS = 2
 
37
USERNAME = None
 
38
PASSWORD = None
 
39
VERBOSE = True
 
40
DEBUG = False
 
41
DISPATCHER = 'http://dispatch.grub.org/do/workunit/'
 
42
VERSION = '0.4.2'
 
43
 
 
44
 
 
45
# set socket timeout
 
46
socket.setdefaulttimeout(30)
 
47
 
 
48
workerlist = []
 
49
threadlock = thread.allocate_lock()
 
50
threadlock_fetch = thread.allocate_lock()
 
51
 
 
52
 
 
53
class Uploader(Thread):
 
54
    def __init__(self, name="Uploader"):
 
55
        Thread.__init__(self)
 
56
        self.setName(name)
 
57
        self._sleeptime = 10 
 
58
        self.LOOP = True
 
59
    
 
60
    def sleep(self, error, seconds):
 
61
        printer("%s Retrying in %s seconds" % (error, seconds), error=1)
 
62
        time.sleep(seconds)
 
63
    
 
64
    def run(self):
 
65
        while self.LOOP:           
 
66
            if os.path.exists("workunits.dat") == False:
 
67
                # nothing ready yet. Get some sleep and try again later
 
68
                time.sleep(self._sleeptime)
 
69
                continue
 
70
            
 
71
            threadlock.acquire()
 
72
            
 
73
            try:
 
74
                fp = open("workunits.dat", "rb")           
 
75
                workUnit = fp.readlines()
 
76
                fp.close()
 
77
                fp = open("workunits.dat", "wb")
 
78
                fp.writelines(workUnit[1:])
 
79
                fp.close()
 
80
            except IOError, e:
 
81
                printer("Workunit.dat open failed in %s: %s" % (self.getName(), e), debug=1)
 
82
            
 
83
            threadlock.release()
 
84
               
 
85
            # keep only the first line of the file
 
86
            try:
 
87
                workUnit = workUnit[0].strip()
 
88
            except IndexError:
 
89
                # seems there is no line
 
90
                # sleep some time and then retry
 
91
                time.sleep(self._sleeptime)
 
92
                continue
 
93
            
 
94
            if len(workUnit) > 0:
 
95
                try:
 
96
                    host, url, filename = workUnit.split()
 
97
                except ValueError:
 
98
                    # oops maybe wrong entry. lets ignore
 
99
                    continue
 
100
                    
 
101
                # compress the workunit
 
102
                try:
 
103
                    self.compressWorkUnit(filename)
 
104
                except IOError:
 
105
                    # hmm arc file maybe deleted. just move on
 
106
                    printer("CompressWorkUnit error %s:%s" % (self.getName(), e), debug=1)
 
107
                    continue
 
108
             
 
109
                for i in range(3):
 
110
                    try:
 
111
                        result = self.uploadWorkUnit(host, url, filename)
 
112
                    except (httplib.HTTPException, socket.error):
 
113
                        result = -500
 
114
 
 
115
                    if result == 1:
 
116
                        # everything went OK
 
117
                        break
 
118
                    elif result == -500:
 
119
                        # let's retry
 
120
                        self.sleep("Thread %s failed to upload arc file because of a server or network error." % self.getName(), seconds=self._sleeptime*i)
 
121
                    elif result == -401:
 
122
                        # skip cleaning by changing the filename
 
123
                        if DEBUG:
 
124
                            from shutil import copy
 
125
                            copy(filename, "%s.401" % filename)
 
126
                            copy("%s.wu" % filename, "%s.wu.401" % filename)
 
127
                        break
 
128
                       
 
129
                self.cleanup(filename)  
 
130
 
 
131
    def compressWorkUnit(self, filename):
 
132
        # gzip the data
 
133
        printer("Thread %s is packing" % self.getName())
 
134
        farc = open(filename, 'rb')
 
135
        f = gzip.open("%s.gz" % filename, 'wb')
 
136
        f.writelines(farc)
 
137
        f.close()
 
138
        farc.close()
 
139
        
 
140
            
 
141
    def cleanup(self, filename): 
 
142
        try:
 
143
            os.remove("%s" % filename)
 
144
            os.remove("%s.gz" % filename)
 
145
            if DEBUG:
 
146
                # remove workunit
 
147
                os.remove("%s.wu" % filename)
 
148
        except (OSError, TypeError), e:
 
149
            printer("Cleanup error %s:%s" % (self.getName(), e), debug=1)
 
150
            
 
151
 
 
152
    def join(self):
 
153
        self.LOOP=False
 
154
         
 
155
 
 
156
    def uploadWorkUnit(self, host, url, filename):
 
157
        # upload data
 
158
        filename = "%s.gz" % filename 
 
159
        start_stamp = int(time.time())
 
160
        printer("Thread %s is uploading" % self.getName())
 
161
        
 
162
        try:
 
163
            fsize = os.path.getsize(filename)
 
164
            f = open("%s" % filename, "rb")
 
165
        except IOError, e:
 
166
            # the file does not exist, return a 401 to ignore that
 
167
            printer("uploadWorkUnit error %s: %s" % (self.getName(), e), debug=1)
 
168
            return -401
 
169
 
 
170
        try:
 
171
            conn = httplib.HTTP(host)
 
172
            conn.putrequest('PUT', url)
 
173
            conn.putheader('Content-Length', str(fsize))
 
174
            conn.endheaders()
 
175
            conn.send(f.read())
 
176
            errcode, errmsg, headers = conn.getreply()
 
177
        except (socket.error), e:
 
178
            # ok socket timedout, let's retry
 
179
            printer("uploadWorkUnit error %s: %s" % (self.getName(), e), debug=1)
 
180
            return -500
 
181
 
 
182
        f.close()
 
183
 
 
184
        if errcode == 200:
 
185
            stop_stamp = int(time.time() - start_stamp)
 
186
            printer("Thread %s upload OK at %s Kb/s" % (self.getName(), str(int((fsize/1024)/stop_stamp)) ) )
 
187
            return 1
 
188
        else:
 
189
            printer("Thread %s upload FAILED, Code: %s Message: %s" % (self.getName(), errcode, errmsg), error=1)
 
190
        
 
191
            return -errcode
 
192
        return 0
 
193
 
 
194
class Crawler(Thread):
 
195
    def __init__(self, name):
 
196
        Thread.__init__(self)
 
197
        self.setName(name)
 
198
        self.wu = None
 
199
        self.filename = None
 
200
        self.arc = None
 
201
        self._sleeptime = 10
 
202
        self.LOOP = True
 
203
 
 
204
    def sleep(self, error):
 
205
        printer("%s Retrying in %s seconds" % (error, self._sleeptime), error=1)
 
206
        time.sleep(self._sleeptime)
 
207
        self._sleeptime *= 2
 
208
 
 
209
    def fetchWorkUnit(self):
 
210
        # make this function atomic so we don't hammer the dispatch server
 
211
        threadlock_fetch.acquire()
 
212
 
 
213
        # create an OpenerDirector with suppoer for Basic HTTP Authentication
 
214
        printer("Thread %s is fetching Work Unit" % self.getName())
 
215
 
 
216
 
 
217
        auth_handler = urllib2.HTTPBasicAuthHandler()
 
218
        auth_handler.add_password('localhost', 'dispatch.grub.org', USERNAME, PASSWORD)
 
219
 
 
220
        opener = urllib2.build_opener(auth_handler)
 
221
 
 
222
        try:
 
223
            self.wu = opener.open(DISPATCHER).readlines()
 
224
            #reset sleep time
 
225
            self._sleeptime = 10
 
226
        except (urllib2.URLError, socket.error), e:
 
227
            threadlock_fetch.release()
 
228
            self.sleep("Thread %s failed to fetch workunit. Maybe your username or password is incorrect." % self.getName())
 
229
            return False
 
230
 
 
231
        threadlock_fetch.release()
 
232
        return True
 
233
 
 
234
    def checkWorkUnit(self):
 
235
        """ Set self.filename and Return 1 if OK
 
236
            Return 0 otherwise
 
237
        """ 
 
238
        printer("Thread %s is checking Work Unit" % self.getName())
 
239
        for line in self.wu:
 
240
            if line[0:3] == "PUT":
 
241
                try:
 
242
                    line = line.split()[1].split('/')
 
243
                    self.filename = line[len(line)-1][:-3]
 
244
                    
 
245
                    if os.path.exists("%s.gz" % self.filename):
 
246
                        # oops file already exists!
 
247
                        # The server feed us previous stuff
 
248
                        # we should fetch another wu
 
249
                        printer("Oops, same file!")
 
250
                        return False
 
251
                    return True
 
252
                except:
 
253
                    return False
 
254
        return False
 
255
 
 
256
 
 
257
    def run(self):
 
258
        while self.LOOP:
 
259
            if self.fetchWorkUnit() == False or self.checkWorkUnit() == False:
 
260
                # oops, bad workunit, let's move to a new one
 
261
                continue
 
262
            
 
263
            # if debug save workunit
 
264
            if DEBUG:
 
265
                f = open("%s.wu" % self.filename, 'wb')
 
266
                f.writelines(self.wu)
 
267
                f.close()
 
268
 
 
269
            self.arc = open("%s" % self.filename, 'wb')
 
270
            self.arc.write("filedesc://dummy.arc.gz 0.0.0.0 %s text/plain 69\n" % time.strftime("%Y%m%d%H%M%S", time.localtime()))
 
271
            self.arc.write('1 0 grub.org\n')
 
272
            self.arc.write('URL IP-address Archive-date Content-type Archive-length\n\n')
 
273
 
 
274
            printer("Thread %s is crawling" % self.getName())
 
275
            url = ''
 
276
            host = ''
 
277
 
 
278
            wuIterator = iter(self.wu)
 
279
 
 
280
            while self.LOOP:
 
281
                line = wuIterator.next()
 
282
                while line.strip() == '':
 
283
                    try:
 
284
                        line = wuIterator.next()
 
285
                    except StopIteration:
 
286
                        return 0
 
287
 
 
288
                if line[0:3] == "PUT":
 
289
                    url = line.split()[1]
 
290
                    host = wuIterator.next()[6:].strip()
 
291
                    break
 
292
 
 
293
                data = ''
 
294
                body = ''
 
295
                host = ''
 
296
                url  = ''
 
297
 
 
298
                try:
 
299
                    url = line.split()[1]
 
300
                    host = wuIterator.next()[6:].strip()
 
301
                except StopIteration:
 
302
                    return 0
 
303
                except IndexError:
 
304
                    # lets hope it's just a wrong entry and continue
 
305
                    continue
 
306
 
 
307
 
 
308
                #python2.4 compatibility with try/finally
 
309
                try:
 
310
                    try:
 
311
   
 
312
                        headers = {
 
313
                            'Accept':'*/*',
 
314
                            'Accept-encoding':'gzip,deflate',
 
315
                            'Connection':'close',
 
316
                            }                        
 
317
 
 
318
                        while True:
 
319
                            header = wuIterator.next().strip()
 
320
                            if header == '':
 
321
                                break
 
322
                            
 
323
                            headerType, headerValue = header.split(':')
 
324
                            headerType = headerType.strip()
 
325
                            headerValue = headerValue.strip()
 
326
                            
 
327
                            headers[headerType] = headerValue
 
328
 
 
329
                        ip = socket.gethostbyname(host)
 
330
                        conn = httplib.HTTPConnection(host)
 
331
 
 
332
                        conn.request("GET", url, headers=headers)
 
333
                        response = conn.getresponse()
 
334
                     
 
335
                        if response.version == "10":
 
336
                            data += "HTTP/1.0 "
 
337
                        else:
 
338
                            data += "HTTP/1.1 "
 
339
                        data += "%s %s\r\n" % (response.status, response.reason)
 
340
 
 
341
                        for header in response.getheaders():
 
342
                            data += ("%s: %s\r\n" % (header[0], header[1]))
 
343
                        data += "\r\n"
 
344
 
 
345
                        body = self.readResponse(response)
 
346
                        if len(body) == 0:
 
347
                            # we got only headers
 
348
                            # printer("Headers only: --%s%s-- Status %s"% (host,url, response.status), error=1)
 
349
                            raise httplib.HTTPException
 
350
 
 
351
                        # check for compressed content
 
352
                        if response.getheader('content-encoding') in ('gzip', 'deflate'):
 
353
                            body = self.decompressData(body, response.getheader('content-encoding'))                   
 
354
 
 
355
                        responseType = self.getResponseType(response.msg.type)
 
356
 
 
357
                    except (httplib.HTTPException, ValueError, socket.error), e:
 
358
                        data = 'HTTP/1.0 500 Invalid URL\r\n\r\nInvalid URL'
 
359
                        body = ''
 
360
                        ip = "0.0.0.0"
 
361
                        responseType = 'application/x-grub-error'
 
362
                        #printer("Invalid URL %s%s %s" % (host,url,e), error=1)
 
363
 
 
364
                finally:
 
365
                    self.arc.writelines("http://%s%s %s %s %s %s\n" % (host, url, ip, time.strftime("%Y%m%d%H%M%S", time.localtime()), responseType, len(data) + len(body)))
 
366
                    self.arc.writelines(data)
 
367
                    self.arc.writelines(body)
 
368
                    self.arc.writelines("\n")
 
369
 
 
370
            # ok we finished the workunit
 
371
            self.arc.close()
 
372
            
 
373
 
 
374
            if self.LOOP==True:
 
375
                # everything runs normal
 
376
                self.writeDataFile(host, url)
 
377
                self.cleanup()
 
378
            else:
 
379
                # we have to exit immediatly
 
380
                self.cleanup(real=True)
 
381
        
 
382
        self.cleanup()
 
383
 
 
384
    def readResponse(self, r):
 
385
        data = ''
 
386
        while True:
 
387
            tmp = r.read(500)
 
388
            if len(tmp) == 0:
 
389
                break
 
390
            data += tmp
 
391
            
 
392
        return data
 
393
            
 
394
 
 
395
    def getResponseType(self, responseType):
 
396
        try:
 
397
            if not responseType:
 
398
                # broken http server. let's guess it's text/html
 
399
                return "text/html"
 
400
            # deal with other broken http servers. Some even send 'text/html text/html'
 
401
            return responseType.split()[0]
 
402
        except:
 
403
            raise httplib.HTTPException
 
404
 
 
405
 
 
406
    def decompressData(self, compressed_data, method):
 
407
        try:
 
408
            if method == 'gzip':
 
409
                compressed_data = StringIO(compressed_data)
 
410
                data = gzip.GzipFile(fileobj=compressed_data).read()
 
411
                compressed_data.close()
 
412
 
 
413
            elif method == 'deflate':
 
414
                try:
 
415
                    data = zlib.decompress(compressed_data)
 
416
                except zlib.error:
 
417
                    # server does not respect RFC 1950
 
418
                    # check http://www.velocityreviews.com/forums/t634601-deflate-with-urllib2.html
 
419
                    data = zlib.decompress(compressed_data, -zlib.MAX_WBITS)
 
420
        except Exception, e:
 
421
            printer("Decompress %s: %s" % (self.getName(), e), debug=1)
 
422
            raise ValueError
 
423
 
 
424
        return data
 
425
 
 
426
        
 
427
    def writeDataFile(self, host, url):
 
428
        if (len(host) == 0 or len(url) == 0):
 
429
            return
 
430
            
 
431
        threadlock.acquire()
 
432
        try:            
 
433
            fp = open("workunits.dat", "ab")
 
434
            fp.write("%s %s %s\n" % (host, url, self.filename))
 
435
            fp.close()
 
436
        except IOError, e:
 
437
            printer("WriteDataFile error %s: %s" % (self.getName(), e), debug=1)
 
438
        threadlock.release()
 
439
        
 
440
            
 
441
    def cleanup(self, real=False):     
 
442
        if real:
 
443
            try:
 
444
                os.remove(self.filename)
 
445
            except (OSError, TypeError), e:
 
446
                printer("Cleanup error %s:%s" % (self.getName(), e), debug=1)
 
447
       
 
448
            
 
449
        self.filename = None
 
450
        self.wu = None
 
451
 
 
452
 
 
453
    def join(self):
 
454
        self.LOOP=False
 
455
        self.cleanup(real=True)
 
456
 
 
457
def printer(msg, error=0, debug=0):
 
458
    p = False
 
459
    if (debug==1):
 
460
        if DEBUG:
 
461
            p = True
 
462
    elif (error==1):
 
463
        p = True
 
464
    elif (VERBOSE == True):
 
465
        p = True
 
466
            
 
467
    if p:
 
468
        print "[%s] %s" % (time.strftime("%a %b %d %H:%M:%S %Y"), msg)
 
469
 
 
470
    sys.stdout.flush()
 
471
 
 
472
def printUsage():
 
473
    global VERSION
 
474
    print "Grub Python Client version %s\n" % VERSION 
 
475
    print "Usage:" \
 
476
         "\tgrub.py [-v] [-t NUMBER] -u USERNAME -p PASSWORD\n" \
 
477
         "\n" \
 
478
         "\t -v             : Verbose (Default. Use quiet (-q) to turn off messages\n" \
 
479
         "\t -t, --threads  : Number of threads. Default 5\n" \
 
480
         "\t -u, --username : Grub.org username\n" \
 
481
         "\t -i, --info     : Advanced, prints client and OS imformation\n" \
 
482
         "\t -p, --password : Grub.org password\n" \
 
483
         "\t --debug        : Set debug mode. Failed arcs are not removed\n" \
 
484
         "\t --version      : Print version\n" \
 
485
         "\n" \
 
486
         "http://www.grub.org/\n"\
 
487
 
 
488
def printInfo():
 
489
    print "Grub Python Client version: %s" % VERSION
 
490
    print "Username: %s" % USERNAME
 
491
    print "Password set: %s" % bool(PASSWORD)
 
492
    print "Number of clients: %s" % NUM_CLIENTS
 
493
    print "Operating system: %s" % sys.platform
 
494
    print "Python version: %s" % sys.version
 
495
 
 
496
def main():
 
497
    global USERNAME, PASSWORD, VERBOSE, NUM_CLIENTS, DEBUG
 
498
 
 
499
    try:
 
500
        opts, args = getopt.getopt(sys.argv[1:], 'ihvqt:u:p:', ["username=", "password=", "version", "info", "debug", "help"])
 
501
    except getopt.GetoptError, err:
 
502
        print str(err)
 
503
        sys.exit(2)
 
504
 
 
505
    for o, a in opts:
 
506
        if o == "-v":
 
507
            printer("Verbose mode (-v) is now default. Use quiet (-q) to turn off messages")
 
508
        elif o == "-q":
 
509
            VERBOSE=False
 
510
        elif o in ("-t", "--threads"):
 
511
            try:
 
512
                NUM_CLIENTS = int(a)
 
513
            except ValueError:
 
514
                printer("Please give a valid thread number", error=1)
 
515
                sys.exit(1)
 
516
        elif o in ("-u", "--username"):
 
517
            USERNAME = a
 
518
        elif o in ("-p", "--password"):
 
519
            PASSWORD = a
 
520
        elif o in ("-h", "--help"):
 
521
            printUsage()
 
522
            sys.exit(0)
 
523
        elif o in ("-i", "--info"):
 
524
            printInfo()
 
525
            sys.exit(0)
 
526
        elif o in ("--debug"):
 
527
            DEBUG = True
 
528
        elif o in ("--version"):
 
529
            print "Grub Python Client, version %s" % VERSION
 
530
            sys.exit(0)
 
531
        else:
 
532
            assert False, "unhandled option"
 
533
 
 
534
    if USERNAME == None or PASSWORD == None:
 
535
        # try to read username and password from config file
 
536
        passwdfile = ""
 
537
        if os.path.exists(os.path.expanduser("~") + "/.grub/account"):
 
538
            passwdfile = os.path.expanduser("~") + "/.grub/account"
 
539
        elif os.path.exists("./account"):
 
540
            passwdfile = "./account"
 
541
        else:           
 
542
            # no username/password specified!
 
543
            printer("No username and/or password specified!", error=1)
 
544
            sys.exit()
 
545
 
 
546
        try:
 
547
            USERNAME, PASSWORD = open(passwdfile).readline().split(':')
 
548
        except:
 
549
            printer("Error reading username and password from %s" % passwdfile)
 
550
            sys.exit(1)
 
551
 
 
552
    for i in range(NUM_CLIENTS): 
 
553
        w = Crawler(i)
 
554
        workerlist.append(w)
 
555
        w.start()    
 
556
 
 
557
    for i in range(NUM_UPLOADERS):
 
558
        w = Uploader("Uploader-%s" % i)
 
559
        workerlist.append(w)
 
560
        w.start()
 
561
 
 
562
    try:
 
563
        sys.stdin.read()
 
564
    except (KeyboardInterrupt, SystemExit):
 
565
        print "Closing threads, please wait..."
 
566
        sys.stdout.flush()
 
567
 
 
568
        for worker in workerlist:
 
569
            worker.join()
 
570
        sys.exit()
 
571
 
 
572
 
 
573
 
 
574
if __name__ == "__main__":
 
575
    main()
 
576