~ubuntu-branches/debian/jessie/armory/jessie

« back to all changes in this revision

Viewing changes to BitTornado/launchmanycore.py

  • Committer: Package Import Robot
  • Author(s): Joseph Bisch
  • Date: 2014-10-07 10:22:45 UTC
  • Revision ID: package-import@ubuntu.com-20141007102245-2s3x3rhjxg689hek
Tags: upstream-0.92.3
ImportĀ upstreamĀ versionĀ 0.92.3

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/env python
 
2
 
 
3
# Written by John Hoffman
 
4
# see LICENSE.txt for license information
 
5
 
 
6
from BitTornado import PSYCO
 
7
if PSYCO.psyco:
 
8
    try:
 
9
        import psyco
 
10
        assert psyco.__version__ >= 0x010100f0
 
11
        psyco.full()
 
12
    except:
 
13
        pass
 
14
 
 
15
from download_bt1 import BT1Download
 
16
from RawServer import RawServer, UPnP_ERROR
 
17
from RateLimiter import RateLimiter
 
18
from ServerPortHandler import MultiHandler
 
19
from parsedir import parsedir
 
20
from natpunch import UPnP_test
 
21
from random import seed
 
22
from socket import error as socketerror
 
23
from threading import Event
 
24
from sys import argv, exit
 
25
import sys, os
 
26
from clock import clock
 
27
from __init__ import createPeerID, mapbase64, version
 
28
from cStringIO import StringIO
 
29
from traceback import print_exc
 
30
 
 
31
try:
 
32
    True
 
33
except:
 
34
    True = 1
 
35
    False = 0
 
36
 
 
37
 
 
38
def fmttime(n):
 
39
    try:
 
40
        n = int(n)  # n may be None or too large
 
41
        assert n < 5184000  # 60 days
 
42
    except:
 
43
        return 'downloading'
 
44
    m, s = divmod(n, 60)
 
45
    h, m = divmod(m, 60)
 
46
    return '%d:%02d:%02d' % (h, m, s)
 
47
 
 
48
class SingleDownload:
 
49
    def __init__(self, controller, hash, response, config, myid):
 
50
        self.controller = controller
 
51
        self.hash = hash
 
52
        self.response = response
 
53
        self.config = config
 
54
        
 
55
        self.doneflag = Event()
 
56
        self.waiting = True
 
57
        self.checking = False
 
58
        self.working = False
 
59
        self.seed = False
 
60
        self.closed = False
 
61
 
 
62
        self.status_msg = ''
 
63
        self.status_err = ['']
 
64
        self.status_errtime = 0
 
65
        self.status_done = 0.0
 
66
 
 
67
        self.rawserver = controller.handler.newRawServer(hash, self.doneflag)
 
68
 
 
69
        d = BT1Download(self.display, self.finished, self.error,
 
70
                        controller.exchandler, self.doneflag, config, response,
 
71
                        hash, myid, self.rawserver, controller.listen_port)
 
72
        self.d = d
 
73
 
 
74
    def start(self):
 
75
        if not self.d.saveAs(self.saveAs):
 
76
            self._shutdown()
 
77
            return
 
78
        self._hashcheckfunc = self.d.initFiles()
 
79
        if not self._hashcheckfunc:
 
80
            self._shutdown()
 
81
            return
 
82
        self.controller.hashchecksched(self.hash)
 
83
 
 
84
 
 
85
    def saveAs(self, name, length, saveas, isdir):
 
86
        return self.controller.saveAs(self.hash, name, saveas, isdir)
 
87
 
 
88
    def hashcheck_start(self, donefunc):
 
89
        if self.is_dead():
 
90
            self._shutdown()
 
91
            return
 
92
        self.waiting = False
 
93
        self.checking = True
 
94
        self._hashcheckfunc(donefunc)
 
95
 
 
96
    def hashcheck_callback(self):
 
97
        self.checking = False
 
98
        if self.is_dead():
 
99
            self._shutdown()
 
100
            return
 
101
        if not self.d.startEngine(ratelimiter = self.controller.ratelimiter):
 
102
            self._shutdown()
 
103
            return
 
104
        self.d.startRerequester()
 
105
        self.statsfunc = self.d.startStats()
 
106
        self.rawserver.start_listening(self.d.getPortHandler())
 
107
        self.working = True
 
108
 
 
109
    def is_dead(self):
 
110
        return self.doneflag.isSet()
 
111
 
 
112
    def _shutdown(self):
 
113
        self.shutdown(False)
 
114
 
 
115
    def shutdown(self, quiet=True):
 
116
        if self.closed:
 
117
            return
 
118
        self.doneflag.set()
 
119
        self.rawserver.shutdown()
 
120
        if self.checking or self.working:
 
121
            self.d.shutdown()
 
122
        self.waiting = False
 
123
        self.checking = False
 
124
        self.working = False
 
125
        self.closed = True
 
126
        self.controller.was_stopped(self.hash)
 
127
        if not quiet:
 
128
            self.controller.died(self.hash)
 
129
            
 
130
 
 
131
    def display(self, activity = None, fractionDone = None):
 
132
        # really only used by StorageWrapper now
 
133
        if activity:
 
134
            self.status_msg = activity
 
135
        if fractionDone is not None:
 
136
            self.status_done = float(fractionDone)
 
137
 
 
138
    def finished(self):
 
139
        self.seed = True
 
140
 
 
141
    def error(self, msg):
 
142
        if self.doneflag.isSet():
 
143
            self._shutdown()
 
144
        self.status_err.append(msg)
 
145
        self.status_errtime = clock()
 
146
 
 
147
 
 
148
class LaunchMany:
 
149
    def __init__(self, config, Output):
 
150
        try:
 
151
            self.config = config
 
152
            self.Output = Output
 
153
 
 
154
            self.torrent_dir = config['torrent_dir']
 
155
            self.torrent_cache = {}
 
156
            self.file_cache = {}
 
157
            self.blocked_files = {}
 
158
            self.scan_period = config['parse_dir_interval']
 
159
            self.stats_period = config['display_interval']
 
160
 
 
161
            self.torrent_list = []
 
162
            self.downloads = {}
 
163
            self.counter = 0
 
164
            self.doneflag = Event()
 
165
 
 
166
            self.hashcheck_queue = []
 
167
            self.hashcheck_current = None
 
168
            
 
169
            self.rawserver = RawServer(self.doneflag, config['timeout_check_interval'],
 
170
                              config['timeout'], ipv6_enable = config['ipv6_enabled'],
 
171
                              failfunc = self.failed, errorfunc = self.exchandler)
 
172
            upnp_type = UPnP_test(config['upnp_nat_access'])
 
173
            while True:
 
174
                try:
 
175
                    self.listen_port = self.rawserver.find_and_bind(
 
176
                                    config['minport'], config['maxport'], config['bind'],
 
177
                                    ipv6_socket_style = config['ipv6_binds_v4'],
 
178
                                    upnp = upnp_type, randomizer = config['random_port'])
 
179
                    break
 
180
                except socketerror, e:
 
181
                    if upnp_type and e == UPnP_ERROR:
 
182
                        self.Output.message('WARNING: COULD NOT FORWARD VIA UPnP')
 
183
                        upnp_type = 0
 
184
                        continue
 
185
                    self.failed("Couldn't listen - " + str(e))
 
186
                    return
 
187
 
 
188
            self.ratelimiter = RateLimiter(self.rawserver.add_task,
 
189
                                           config['upload_unit_size'])
 
190
            self.ratelimiter.set_upload_rate(config['max_upload_rate'])
 
191
 
 
192
            self.handler = MultiHandler(self.rawserver, self.doneflag)
 
193
            seed(createPeerID())
 
194
            self.rawserver.add_task(self.scan, 0)
 
195
            self.rawserver.add_task(self.stats, 0)
 
196
 
 
197
            self.handler.listen_forever()
 
198
 
 
199
            self.Output.message('shutting down')
 
200
            self.hashcheck_queue = []
 
201
            for hash in self.torrent_list:
 
202
                self.Output.message('dropped "'+self.torrent_cache[hash]['path']+'"')
 
203
                self.downloads[hash].shutdown()
 
204
            self.rawserver.shutdown()
 
205
 
 
206
        except:
 
207
            data = StringIO()
 
208
            print_exc(file = data)
 
209
            Output.exception(data.getvalue())
 
210
 
 
211
 
 
212
    def scan(self):
 
213
        self.rawserver.add_task(self.scan, self.scan_period)
 
214
                                
 
215
        r = parsedir(self.torrent_dir, self.torrent_cache,
 
216
                     self.file_cache, self.blocked_files,
 
217
                     return_metainfo = True, errfunc = self.Output.message)
 
218
 
 
219
        ( self.torrent_cache, self.file_cache, self.blocked_files,
 
220
            added, removed ) = r
 
221
 
 
222
        for hash, data in removed.items():
 
223
            self.Output.message('dropped "'+data['path']+'"')
 
224
            self.remove(hash)
 
225
        for hash, data in added.items():
 
226
            self.Output.message('added "'+data['path']+'"')
 
227
            self.add(hash, data)
 
228
 
 
229
    def stats(self):            
 
230
        self.rawserver.add_task(self.stats, self.stats_period)
 
231
        data = []
 
232
        for hash in self.torrent_list:
 
233
            cache = self.torrent_cache[hash]
 
234
            if self.config['display_path']:
 
235
                name = cache['path']
 
236
            else:
 
237
                name = cache['name']
 
238
            size = cache['length']
 
239
            d = self.downloads[hash]
 
240
            progress = '0.0%'
 
241
            peers = 0
 
242
            seeds = 0
 
243
            seedsmsg = "S"
 
244
            dist = 0.0
 
245
            uprate = 0.0
 
246
            dnrate = 0.0
 
247
            upamt = 0
 
248
            dnamt = 0
 
249
            t = 0
 
250
            if d.is_dead():
 
251
                status = 'stopped'
 
252
            elif d.waiting:
 
253
                status = 'waiting for hash check'
 
254
            elif d.checking:
 
255
                status = d.status_msg
 
256
                progress = '%.1f%%' % (d.status_done*100)
 
257
            else:
 
258
                stats = d.statsfunc()
 
259
                s = stats['stats']
 
260
                if d.seed:
 
261
                    status = 'seeding'
 
262
                    progress = '100.0%'
 
263
                    seeds = s.numOldSeeds
 
264
                    seedsmsg = "s"
 
265
                    dist = s.numCopies
 
266
                else:
 
267
                    if s.numSeeds + s.numPeers:
 
268
                        t = stats['time']
 
269
                        if t == 0:  # unlikely
 
270
                            t = 0.01
 
271
                        status = fmttime(t)
 
272
                    else:
 
273
                        t = -1
 
274
                        status = 'connecting to peers'
 
275
                    progress = '%.1f%%' % (int(stats['frac']*1000)/10.0)
 
276
                    seeds = s.numSeeds
 
277
                    dist = s.numCopies2
 
278
                    dnrate = stats['down']
 
279
                peers = s.numPeers
 
280
                uprate = stats['up']
 
281
                upamt = s.upTotal
 
282
                dnamt = s.downTotal
 
283
                   
 
284
            if d.is_dead() or d.status_errtime+300 > clock():
 
285
                msg = d.status_err[-1]
 
286
            else:
 
287
                msg = ''
 
288
 
 
289
            data.append(( name, status, progress, peers, seeds, seedsmsg, dist,
 
290
                          uprate, dnrate, upamt, dnamt, size, t, msg ))
 
291
        stop = self.Output.display(data)
 
292
        if stop:
 
293
            self.doneflag.set()
 
294
 
 
295
    def remove(self, hash):
 
296
        self.torrent_list.remove(hash)
 
297
        self.downloads[hash].shutdown()
 
298
        del self.downloads[hash]
 
299
        
 
300
    def add(self, hash, data):
 
301
        c = self.counter
 
302
        self.counter += 1
 
303
        x = ''
 
304
        for i in xrange(3):
 
305
            x = mapbase64[c & 0x3F]+x
 
306
            c >>= 6
 
307
        peer_id = createPeerID(x)
 
308
        d = SingleDownload(self, hash, data['metainfo'], self.config, peer_id)
 
309
        self.torrent_list.append(hash)
 
310
        self.downloads[hash] = d
 
311
        d.start()
 
312
 
 
313
 
 
314
    def saveAs(self, hash, name, saveas, isdir):
 
315
        x = self.torrent_cache[hash]
 
316
        style = self.config['saveas_style']
 
317
        if style == 1 or style == 3:
 
318
            if saveas:
 
319
                saveas = os.path.join(saveas,x['file'][:-1-len(x['type'])])
 
320
            else:
 
321
                saveas = x['path'][:-1-len(x['type'])]
 
322
            if style == 3:
 
323
                if not os.path.isdir(saveas):
 
324
                    try:
 
325
                        os.mkdir(saveas)
 
326
                    except:
 
327
                        raise OSError("couldn't create directory for "+x['path']
 
328
                                      +" ("+saveas+")")
 
329
                if not isdir:
 
330
                    saveas = os.path.join(saveas, name)
 
331
        else:
 
332
            if saveas:
 
333
                saveas = os.path.join(saveas, name)
 
334
            else:
 
335
                saveas = os.path.join(os.path.split(x['path'])[0], name)
 
336
                
 
337
        if isdir and not os.path.isdir(saveas):
 
338
            try:
 
339
                os.mkdir(saveas)
 
340
            except:
 
341
                raise OSError("couldn't create directory for "+x['path']
 
342
                                      +" ("+saveas+")")
 
343
        return saveas
 
344
 
 
345
 
 
346
    def hashchecksched(self, hash = None):
 
347
        if hash:
 
348
            self.hashcheck_queue.append(hash)
 
349
        if not self.hashcheck_current:
 
350
            self._hashcheck_start()
 
351
 
 
352
    def _hashcheck_start(self):
 
353
        self.hashcheck_current = self.hashcheck_queue.pop(0)
 
354
        self.downloads[self.hashcheck_current].hashcheck_start(self.hashcheck_callback)
 
355
 
 
356
    def hashcheck_callback(self):
 
357
        self.downloads[self.hashcheck_current].hashcheck_callback()
 
358
        if self.hashcheck_queue:
 
359
            self._hashcheck_start()
 
360
        else:
 
361
            self.hashcheck_current = None
 
362
 
 
363
    def died(self, hash):
 
364
        if self.torrent_cache.has_key(hash):
 
365
            self.Output.message('DIED: "'+self.torrent_cache[hash]['path']+'"')
 
366
        
 
367
    def was_stopped(self, hash):
 
368
        try:
 
369
            self.hashcheck_queue.remove(hash)
 
370
        except:
 
371
            pass
 
372
        if self.hashcheck_current == hash:
 
373
            self.hashcheck_current = None
 
374
            if self.hashcheck_queue:
 
375
                self._hashcheck_start()
 
376
 
 
377
    def failed(self, s):
 
378
        self.Output.message('FAILURE: '+s)
 
379
 
 
380
    def exchandler(self, s):
 
381
        self.Output.exception(s)