~ubuntu-branches/ubuntu/utopic/python-eventlet/utopic

« back to all changes in this revision

Viewing changes to eventlet/hubs/zeromq.py

  • Committer: Bazaar Package Importer
  • Author(s): Soren Hansen
  • Date: 2011-05-17 11:52:34 UTC
  • mfrom: (1.1.4 upstream) (4.1.1 sid)
  • Revision ID: james.westby@ubuntu.com-20110517115234-fcxnkhmr7kcvesdz
Tags: 0.9.15-0ubuntu1
* New upstream release.
  - Drop wrap-greenpipe.patch: Included upstream.
  - Drop disable-psycopg-patcher-test.patch: Included upstream.
* Merge packaging changes from Debian.
* Disable zmq (tests do not pass).

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
from eventlet import patcher
2
 
from eventlet.green import zmq
3
 
from eventlet.hubs import _threadlocal
4
 
from eventlet.hubs.hub import BaseHub, READ, WRITE, noop
5
 
from eventlet.support import clear_sys_exc_info
6
 
import sys
7
 
 
8
 
time = patcher.original('time')
9
 
select = patcher.original('select')
10
 
sleep = time.sleep
11
 
 
12
 
EXC_MASK = zmq.POLLERR
13
 
READ_MASK = zmq.POLLIN
14
 
WRITE_MASK = zmq.POLLOUT
15
 
 
16
 
class Hub(BaseHub):
17
 
    def __init__(self, clock=time.time):
18
 
        BaseHub.__init__(self, clock)
19
 
        self.poll = zmq.Poller()
20
 
 
21
 
    def get_context(self, io_threads=1):
22
 
        """zmq's Context must be unique within a hub
23
 
 
24
 
        The zeromq API documentation states:
25
 
        All zmq sockets passed to the zmq_poll() function must share the same
26
 
        zmq context and must belong to the thread calling zmq_poll()
27
 
 
28
 
        As zmq_poll is what's eventually being called then we need to insure
29
 
        that all sockets that are going to be passed to zmq_poll (via
30
 
        hub.do_poll) are in the same context
31
 
        """
32
 
        try:
33
 
            return _threadlocal.context
34
 
        except AttributeError:
35
 
            _threadlocal.context = zmq._Context(io_threads)
36
 
            return _threadlocal.context
37
 
 
38
 
    def add(self, evtype, fileno, cb):
39
 
        listener = super(Hub, self).add(evtype, fileno, cb)
40
 
        self.register(fileno, new=True)
41
 
        return listener
42
 
 
43
 
    def remove(self, listener):
44
 
        super(Hub, self).remove(listener)
45
 
        self.register(listener.fileno)
46
 
 
47
 
    def register(self, fileno, new=False):
48
 
        mask = 0
49
 
        if self.listeners[READ].get(fileno):
50
 
            mask |= READ_MASK
51
 
        if self.listeners[WRITE].get(fileno):
52
 
            mask |= WRITE_MASK
53
 
        if mask:
54
 
            self.poll.register(fileno, mask)
55
 
        else:
56
 
            self.poll.unregister(fileno)
57
 
 
58
 
    def remove_descriptor(self, fileno):
59
 
        super(Hub, self).remove_descriptor(fileno)
60
 
        try:
61
 
            self.poll.unregister(fileno)
62
 
        except (KeyError, ValueError, IOError, OSError):
63
 
            # raised if we try to remove a fileno that was
64
 
            # already removed/invalid
65
 
            pass
66
 
 
67
 
    def do_poll(self, seconds):
68
 
        # zmq.Poller.poll expects milliseconds
69
 
        return self.poll.poll(seconds * 1000.0)
70
 
 
71
 
    def wait(self, seconds=None):
72
 
        readers = self.listeners[READ]
73
 
        writers = self.listeners[WRITE]
74
 
 
75
 
        if not readers and not writers:
76
 
            if seconds:
77
 
                sleep(seconds)
78
 
            return
79
 
        try:
80
 
            presult = self.do_poll(seconds)
81
 
        except zmq.ZMQError, e:
82
 
            # In the poll hub this part exists to special case some exceptions
83
 
            # from socket. There may be some error numbers that wider use of
84
 
            # this hub will throw up as needing special treatment so leaving
85
 
            # this block and this comment as a remineder
86
 
            raise
87
 
        SYSTEM_EXCEPTIONS = self.SYSTEM_EXCEPTIONS
88
 
 
89
 
        if self.debug_blocking:
90
 
            self.block_detect_pre()
91
 
 
92
 
        for fileno, event in presult:
93
 
            try:
94
 
                if event & READ_MASK:
95
 
                    readers.get(fileno, noop).cb(fileno)
96
 
                if event & WRITE_MASK:
97
 
                    writers.get(fileno, noop).cb(fileno)
98
 
                if event & EXC_MASK:
99
 
                    # zmq.POLLERR is returned for any error condition in the
100
 
                    # underlying fd (as passed through to poll/epoll)
101
 
                    readers.get(fileno, noop).cb(fileno)
102
 
                    writers.get(fileno, noop).cb(fileno)
103
 
            except SYSTEM_EXCEPTIONS:
104
 
                raise
105
 
            except:
106
 
                self.squelch_exception(fileno, sys.exc_info())
107
 
                clear_sys_exc_info()
108
 
 
109
 
        if self.debug_blocking:
110
 
            self.block_detect_post()