~ubuntu-branches/ubuntu/karmic/tahoe-lafs/karmic

« back to all changes in this revision

Viewing changes to contrib/fuse/impl_b/pyfuse/greenhandler.py

  • Committer: Bazaar Package Importer
  • Author(s): Zooko O'Whielacronx (Hacker)
  • Date: 2009-09-24 00:00:05 UTC
  • Revision ID: james.westby@ubuntu.com-20090924000005-ixe2n4yngmk49ysz
Tags: upstream-1.5.0
ImportĀ upstreamĀ versionĀ 1.5.0

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import sys, os, Queue, atexit
 
2
 
 
3
dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
 
4
dir = os.path.join(dir, 'pypeers')
 
5
if dir not in sys.path:
 
6
    sys.path.append(dir)
 
7
del dir
 
8
 
 
9
from greensock import *
 
10
import threadchannel
 
11
 
 
12
 
 
13
def _read_from_kernel(handler):
 
14
    while True:
 
15
        msg = read(handler.fd, handler.MAX_READ)
 
16
        if not msg:
 
17
            print >> sys.stderr, "out-kernel connexion closed"
 
18
            break
 
19
        autogreenlet(handler.handle_message, msg)
 
20
 
 
21
def add_handler(handler):
 
22
    autogreenlet(_read_from_kernel, handler)
 
23
    atexit.register(handler.close)
 
24
 
 
25
# ____________________________________________________________
 
26
 
 
27
THREAD_QUEUE = None
 
28
 
 
29
def thread_runner(n):
 
30
    while True:
 
31
        #print 'thread runner %d waiting' % n
 
32
        operation, answer = THREAD_QUEUE.get()
 
33
        #print 'thread_runner %d: %r' % (n, operation)
 
34
        try:
 
35
            res = True, operation()
 
36
        except Exception:
 
37
            res = False, sys.exc_info()
 
38
        #print 'thread_runner %d: got %d bytes' % (n, len(res or ''))
 
39
        answer.send(res)
 
40
 
 
41
 
 
42
def start_bkgnd_thread():
 
43
    global THREAD_QUEUE, THREAD_LOCK
 
44
    import thread
 
45
    threadchannel.startup()
 
46
    THREAD_LOCK = thread.allocate_lock()
 
47
    THREAD_QUEUE = Queue.Queue()
 
48
    for i in range(4):
 
49
        thread.start_new_thread(thread_runner, (i,))
 
50
 
 
51
def wget(*args, **kwds):
 
52
    from wget import wget
 
53
 
 
54
    def operation():
 
55
        kwds['unlock'] = THREAD_LOCK
 
56
        THREAD_LOCK.acquire()
 
57
        try:
 
58
            return wget(*args, **kwds)
 
59
        finally:
 
60
            THREAD_LOCK.release()
 
61
 
 
62
    if THREAD_QUEUE is None:
 
63
        start_bkgnd_thread()
 
64
    answer = threadchannel.ThreadChannel()
 
65
    THREAD_QUEUE.put((operation, answer))
 
66
    ok, res = answer.receive()
 
67
    if not ok:
 
68
        typ, value, tb = res
 
69
        raise typ, value, tb
 
70
    #print 'wget returns %d bytes' % (len(res or ''),)
 
71
    return res