~ubuntu-branches/ubuntu/precise/bittornado/precise

« back to all changes in this revision

Viewing changes to .pc/23_remove_UPnP_options.dpatch/BitTornado/launchmanycore.py

  • Committer: Barry Warsaw
  • Date: 2011-08-10 23:17:46 UTC
  • mfrom: (7.1.1 bittornado)
  • Revision ID: barry@python.org-20110810231746-5buiob6p54m266s8
Tags: 0.3.18-10ubuntu2
* switch to dh_python2 (LP: #788514)
  - install btmakemetafile.py and btcompletedir.py via pyinstall
  - add build depend on python-all
  - bump debhelper depend to 7 for dh_auto_install

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/env python
 
2
 
 
3
# Written by John Hoffman
 
4
# see LICENSE.txt for license information
 
5
 
 
6
from BitTornado import PSYCO
 
7
if PSYCO.psyco:
 
8
    try:
 
9
        import psyco
 
10
        assert psyco.__version__ >= 0x010100f0
 
11
        psyco.full()
 
12
    except:
 
13
        pass
 
14
 
 
15
from download_bt1 import BT1Download
 
16
from RawServer import RawServer, UPnP_ERROR
 
17
from RateLimiter import RateLimiter
 
18
from ServerPortHandler import MultiHandler
 
19
from parsedir import parsedir
 
20
from natpunch import UPnP_test
 
21
from random import seed
 
22
from socket import error as socketerror
 
23
from threading import Event
 
24
from sys import argv, exit
 
25
import sys, os
 
26
from clock import clock
 
27
from __init__ import createPeerID, mapbase64, version
 
28
from cStringIO import StringIO
 
29
from traceback import print_exc
 
30
 
 
31
try:
 
32
    True
 
33
except:
 
34
    True = 1
 
35
    False = 0
 
36
 
 
37
 
 
38
def fmttime(n):
 
39
    try:
 
40
        n = int(n)  # n may be None or too large
 
41
        assert n < 5184000  # 60 days
 
42
    except:
 
43
        return 'downloading'
 
44
    m, s = divmod(n, 60)
 
45
    h, m = divmod(m, 60)
 
46
    return '%d:%02d:%02d' % (h, m, s)
 
47
 
 
48
class SingleDownload:
 
49
    def __init__(self, controller, hash, response, config, myid):
 
50
        self.controller = controller
 
51
        self.hash = hash
 
52
        self.response = response
 
53
        self.config = config
 
54
        
 
55
        self.doneflag = Event()
 
56
        self.waiting = True
 
57
        self.checking = False
 
58
        self.working = False
 
59
        self.seed = False
 
60
        self.closed = False
 
61
 
 
62
        self.status_msg = ''
 
63
        self.status_err = ['']
 
64
        self.status_errtime = 0
 
65
        self.status_done = 0.0
 
66
 
 
67
        self.rawserver = controller.handler.newRawServer(hash, self.doneflag)
 
68
 
 
69
        d = BT1Download(self.display, self.finished, self.error,
 
70
                        controller.exchandler, self.doneflag, config, response,
 
71
                        hash, myid, self.rawserver, controller.listen_port)
 
72
        self.d = d
 
73
 
 
74
    def start(self):
 
75
        if not self.d.saveAs(self.saveAs):
 
76
            self._shutdown()
 
77
            return
 
78
        self._hashcheckfunc = self.d.initFiles()
 
79
        if not self._hashcheckfunc:
 
80
            self._shutdown()
 
81
            return
 
82
        self.controller.hashchecksched(self.hash)
 
83
 
 
84
 
 
85
    def saveAs(self, name, length, saveas, isdir):
 
86
        return self.controller.saveAs(self.hash, name, saveas, isdir)
 
87
 
 
88
    def hashcheck_start(self, donefunc):
 
89
        if self.is_dead():
 
90
            self._shutdown()
 
91
            return
 
92
        self.waiting = False
 
93
        self.checking = True
 
94
        self._hashcheckfunc(donefunc)
 
95
 
 
96
    def hashcheck_callback(self):
 
97
        self.checking = False
 
98
        if self.is_dead():
 
99
            self._shutdown()
 
100
            return
 
101
        if not self.d.startEngine(ratelimiter = self.controller.ratelimiter):
 
102
            self._shutdown()
 
103
            return
 
104
        self.d.startRerequester()
 
105
        self.statsfunc = self.d.startStats()
 
106
        self.rawserver.start_listening(self.d.getPortHandler())
 
107
        self.working = True
 
108
 
 
109
    def is_dead(self):
 
110
        return self.doneflag.isSet()
 
111
 
 
112
    def _shutdown(self):
 
113
        self.shutdown(False)
 
114
 
 
115
    def shutdown(self, quiet=True):
 
116
        if self.closed:
 
117
            return
 
118
        self.doneflag.set()
 
119
        self.rawserver.shutdown()
 
120
        if self.checking or self.working:
 
121
            self.d.shutdown()
 
122
        self.waiting = False
 
123
        self.checking = False
 
124
        self.working = False
 
125
        self.closed = True
 
126
        self.controller.was_stopped(self.hash)
 
127
        if not quiet:
 
128
            self.controller.died(self.hash)
 
129
            
 
130
 
 
131
    def display(self, activity = None, fractionDone = None):
 
132
        # really only used by StorageWrapper now
 
133
        if activity:
 
134
            self.status_msg = activity
 
135
        if fractionDone is not None:
 
136
            self.status_done = float(fractionDone)
 
137
 
 
138
    def finished(self):
 
139
        self.seed = True
 
140
 
 
141
    def error(self, msg):
 
142
        if self.doneflag.isSet():
 
143
            self._shutdown()
 
144
        self.status_err.append(msg)
 
145
        self.status_errtime = clock()
 
146
 
 
147
 
 
148
class LaunchMany:
 
149
    def __init__(self, config, Output):
 
150
        try:
 
151
            self.config = config
 
152
            self.Output = Output
 
153
 
 
154
            self.torrent_dir = config['torrent_dir']
 
155
            self.torrent_cache = {}
 
156
            self.file_cache = {}
 
157
            self.blocked_files = {}
 
158
            self.scan_period = config['parse_dir_interval']
 
159
            self.stats_period = config['display_interval']
 
160
 
 
161
            self.torrent_list = []
 
162
            self.downloads = {}
 
163
            self.counter = 0
 
164
            self.doneflag = Event()
 
165
 
 
166
            self.hashcheck_queue = []
 
167
            self.hashcheck_current = None
 
168
            
 
169
            self.rawserver = RawServer(self.doneflag, config['timeout_check_interval'],
 
170
                              config['timeout'], ipv6_enable = config['ipv6_enabled'],
 
171
                              failfunc = self.failed, errorfunc = self.exchandler)
 
172
            upnp_type = UPnP_test(config['upnp_nat_access'])
 
173
            while True:
 
174
                try:
 
175
                    self.listen_port = self.rawserver.find_and_bind(
 
176
                                    config['minport'], config['maxport'], config['bind'],
 
177
                                    ipv6_socket_style = config['ipv6_binds_v4'],
 
178
                                    upnp = upnp_type, randomizer = config['random_port'])
 
179
                    break
 
180
                except socketerror, e:
 
181
                    if upnp_type and e == UPnP_ERROR:
 
182
                        self.Output.message('WARNING: COULD NOT FORWARD VIA UPnP')
 
183
                        upnp_type = 0
 
184
                        continue
 
185
                    self.failed("Couldn't listen - " + str(e))
 
186
                    return
 
187
 
 
188
            self.ratelimiter = RateLimiter(self.rawserver.add_task,
 
189
                                           config['upload_unit_size'])
 
190
            self.ratelimiter.set_upload_rate(config['max_upload_rate'])
 
191
 
 
192
            self.handler = MultiHandler(self.rawserver, self.doneflag, config)
 
193
            seed(createPeerID())
 
194
            self.rawserver.add_task(self.scan, 0)
 
195
            self.rawserver.add_task(self.stats, 0)
 
196
 
 
197
            self.start()
 
198
        except:
 
199
            data = StringIO()
 
200
            print_exc(file = data)
 
201
            Output.exception(data.getvalue())
 
202
 
 
203
    def start(self):
 
204
        try:
 
205
            self.handler.listen_forever()
 
206
 
 
207
            self.hashcheck_queue = []
 
208
            for hash in self.torrent_list:
 
209
                self.Output.message('dropped "'+self.torrent_cache[hash]['path']+'"')
 
210
                self.downloads[hash].shutdown()
 
211
 
 
212
        except:
 
213
            data = StringIO()
 
214
            print_exc(file=data)
 
215
            self.Output.exception(data.getvalue())
 
216
 
 
217
        self.rawserver.shutdown()
 
218
 
 
219
    def scan(self):
 
220
        self.rawserver.add_task(self.scan, self.scan_period)
 
221
                                
 
222
        r = parsedir(self.torrent_dir, self.torrent_cache,
 
223
                     self.file_cache, self.blocked_files,
 
224
                     return_metainfo = True, errfunc = self.Output.message)
 
225
 
 
226
        ( self.torrent_cache, self.file_cache, self.blocked_files,
 
227
            added, removed ) = r
 
228
 
 
229
        for hash, data in removed.items():
 
230
            self.Output.message('dropped "'+data['path']+'"')
 
231
            self.remove(hash)
 
232
        for hash, data in added.items():
 
233
            self.Output.message('added "'+data['path']+'"')
 
234
            self.add(hash, data)
 
235
 
 
236
    def stats(self):            
 
237
        self.rawserver.add_task(self.stats, self.stats_period)
 
238
        data = []
 
239
        for hash in self.torrent_list:
 
240
            cache = self.torrent_cache[hash]
 
241
            if self.config['display_path']:
 
242
                name = cache['path']
 
243
            else:
 
244
                name = cache['name']
 
245
            size = cache['length']
 
246
            d = self.downloads[hash]
 
247
            progress = '0.0%'
 
248
            peers = 0
 
249
            seeds = 0
 
250
            seedsmsg = "S"
 
251
            dist = 0.0
 
252
            uprate = 0.0
 
253
            dnrate = 0.0
 
254
            upamt = 0
 
255
            dnamt = 0
 
256
            t = 0
 
257
            if d.is_dead():
 
258
                status = 'stopped'
 
259
            elif d.waiting:
 
260
                status = 'waiting for hash check'
 
261
            elif d.checking:
 
262
                status = d.status_msg
 
263
                progress = '%.1f%%' % (d.status_done*100)
 
264
            else:
 
265
                stats = d.statsfunc()
 
266
                s = stats['stats']
 
267
                if d.seed:
 
268
                    status = 'seeding'
 
269
                    progress = '100.0%'
 
270
                    seeds = s.numOldSeeds
 
271
                    seedsmsg = "s"
 
272
                    dist = s.numCopies
 
273
                else:
 
274
                    if s.numSeeds + s.numPeers:
 
275
                        t = stats['time']
 
276
                        if t == 0:  # unlikely
 
277
                            t = 0.01
 
278
                        status = fmttime(t)
 
279
                    else:
 
280
                        t = -1
 
281
                        status = 'connecting to peers'
 
282
                    progress = '%.1f%%' % (int(stats['frac']*1000)/10.0)
 
283
                    seeds = s.numSeeds
 
284
                    dist = s.numCopies2
 
285
                    dnrate = stats['down']
 
286
                peers = s.numPeers
 
287
                uprate = stats['up']
 
288
                upamt = s.upTotal
 
289
                dnamt = s.downTotal
 
290
                   
 
291
            if d.is_dead() or d.status_errtime+300 > clock():
 
292
                msg = d.status_err[-1]
 
293
            else:
 
294
                msg = ''
 
295
 
 
296
            data.append(( name, status, progress, peers, seeds, seedsmsg, dist,
 
297
                          uprate, dnrate, upamt, dnamt, size, t, msg ))
 
298
        stop = self.Output.display(data)
 
299
        if stop:
 
300
            self.doneflag.set()
 
301
 
 
302
    def remove(self, hash):
 
303
        self.torrent_list.remove(hash)
 
304
        self.downloads[hash].shutdown()
 
305
        del self.downloads[hash]
 
306
        
 
307
    def add(self, hash, data):
 
308
        c = self.counter
 
309
        self.counter += 1
 
310
        x = ''
 
311
        for i in xrange(3):
 
312
            x = mapbase64[c & 0x3F]+x
 
313
            c >>= 6
 
314
        peer_id = createPeerID(x)
 
315
        d = SingleDownload(self, hash, data['metainfo'], self.config, peer_id)
 
316
        self.torrent_list.append(hash)
 
317
        self.downloads[hash] = d
 
318
        d.start()
 
319
        return d
 
320
 
 
321
 
 
322
    def saveAs(self, hash, name, saveas, isdir):
 
323
        x = self.torrent_cache[hash]
 
324
        style = self.config['saveas_style']
 
325
        if style == 1 or style == 3:
 
326
            if saveas:
 
327
                saveas = os.path.join(saveas,x['file'][:-1-len(x['type'])])
 
328
            else:
 
329
                saveas = x['path'][:-1-len(x['type'])]
 
330
            if style == 3:
 
331
                if not os.path.isdir(saveas):
 
332
                    try:
 
333
                        os.mkdir(saveas)
 
334
                    except:
 
335
                        raise OSError("couldn't create directory for "+x['path']
 
336
                                      +" ("+saveas+")")
 
337
                if not isdir:
 
338
                    saveas = os.path.join(saveas, name)
 
339
        else:
 
340
            if saveas:
 
341
                saveas = os.path.join(saveas, name)
 
342
            else:
 
343
                saveas = os.path.join(os.path.split(x['path'])[0], name)
 
344
                
 
345
        if isdir and not os.path.isdir(saveas):
 
346
            try:
 
347
                os.mkdir(saveas)
 
348
            except:
 
349
                raise OSError("couldn't create directory for "+x['path']
 
350
                                      +" ("+saveas+")")
 
351
        return saveas
 
352
 
 
353
 
 
354
    def hashchecksched(self, hash = None):
 
355
        if hash:
 
356
            self.hashcheck_queue.append(hash)
 
357
            self.hashcheck_queue.sort(lambda x, y: cmp(self.downloads[x].d.datalength, self.downloads[y].d.datalength))
 
358
        if not self.hashcheck_current:
 
359
            self._hashcheck_start()
 
360
 
 
361
    def _hashcheck_start(self):
 
362
        self.hashcheck_current = self.hashcheck_queue.pop(0)
 
363
        self.downloads[self.hashcheck_current].hashcheck_start(self.hashcheck_callback)
 
364
 
 
365
    def hashcheck_callback(self):
 
366
        self.downloads[self.hashcheck_current].hashcheck_callback()
 
367
        if self.hashcheck_queue:
 
368
            self._hashcheck_start()
 
369
        else:
 
370
            self.hashcheck_current = None
 
371
 
 
372
    def died(self, hash):
 
373
        if self.torrent_cache.has_key(hash):
 
374
            self.Output.message('DIED: "'+self.torrent_cache[hash]['path']+'"')
 
375
        
 
376
    def was_stopped(self, hash):
 
377
        try:
 
378
            self.hashcheck_queue.remove(hash)
 
379
        except:
 
380
            pass
 
381
        if self.hashcheck_current == hash:
 
382
            self.hashcheck_current = None
 
383
            if self.hashcheck_queue:
 
384
                self._hashcheck_start()
 
385
 
 
386
    def failed(self, s):
 
387
        self.Output.message('FAILURE: '+s)
 
388
 
 
389
    def exchandler(self, s):
 
390
        self.Output.exception(s)