~ubuntu-branches/debian/jessie/python-eventlet/jessie

« back to all changes in this revision

Viewing changes to eventlet/green/zmq.py

  • Committer: Bazaar Package Importer
  • Author(s): Stefano Rivera
  • Date: 2011-06-02 16:18:16 UTC
  • mfrom: (1.1.4 upstream)
  • Revision ID: james.westby@ubuntu.com-20110602161816-c888ncsqx70pfvfu
Tags: 0.9.15-1
* New upstream release.
  - Drop all patches, accepted upstream.
* Correct DEP3 headers (first line of Description is the subject)
* Bump Standards-Version to 3.9.2, no changes needed.
* Drop Breaks: ${python:Breaks}, no longer used by dh_python2.
* debian/copyright: Update to DEP5 Format r174.
* Restore doc/modules/zmq.rst and the Build-Depends (BD) on Sphinx 1.0.
* reuseaddr.patch: The logic for deciding whether to use SO_REUSEADDR was
  inverted.
* retry-on-timeout.patch: If an operation times out, try one last time.
  (LP: #771512)

Show diffs side-by-side

added

removed

Lines of Context:
2
2
"""
3
3
__zmq__ = __import__('zmq')
4
4
from eventlet import sleep
5
 
from eventlet.hubs import trampoline, get_hub
 
5
from eventlet.hubs import trampoline, _threadlocal
 
6
from eventlet.patcher import slurp_properties
6
7
 
7
8
__patched__ = ['Context', 'Socket']
8
 
globals().update(dict([(var, getattr(__zmq__, var))
9
 
                       for var in __zmq__.__all__
10
 
                       if not (var.startswith('__')
11
 
                            or
12
 
                              var in __patched__)
13
 
                       ]))
14
 
 
15
 
 
16
 
def get_hub_name_from_instance(hub):
17
 
    """Get the string name the eventlet uses to refer to hub
18
 
 
19
 
    :param hub: An eventlet hub
20
 
    """
21
 
    return hub.__class__.__module__.rsplit('.',1)[-1]
 
9
slurp_properties(__zmq__, globals(), ignore=__patched__)
 
10
 
22
11
 
23
12
def Context(io_threads=1):
24
13
    """Factory function replacement for :class:`zmq.core.context.Context`
31
20
    instance per thread. This is due to the way :class:`zmq.core.poll.Poller`
32
21
    works
33
22
    """
34
 
    hub = get_hub()
35
 
    hub_name = get_hub_name_from_instance(hub)
36
 
    if hub_name != 'zeromq':
37
 
        raise RuntimeError("Hub must be 'zeromq', got '%s'" % hub_name)
38
 
    return hub.get_context(io_threads)
 
23
    try:
 
24
        return _threadlocal.context
 
25
    except AttributeError:
 
26
        _threadlocal.context = _Context(io_threads)
 
27
        return _threadlocal.context
39
28
 
40
29
class _Context(__zmq__.Context):
41
30
    """Internal subclass of :class:`zmq.core.context.Context`
68
57
    ``zmq.EAGAIN`` (retry) error is raised
69
58
    """
70
59
 
71
 
 
72
 
    def _send_message(self, msg, flags=0):
73
 
        flags |= __zmq__.NOBLOCK
74
 
        while True:
75
 
            try:
76
 
                super(Socket, self)._send_message(msg, flags)
77
 
                return
78
 
            except __zmq__.ZMQError, e:
79
 
                if e.errno != EAGAIN:
80
 
                    raise
81
 
            trampoline(self, write=True)
82
 
 
83
 
    def _send_copy(self, msg, flags=0):
84
 
        flags |= __zmq__.NOBLOCK
85
 
        while True:
86
 
            try:
87
 
                super(Socket, self)._send_copy(msg, flags)
88
 
                return
89
 
            except __zmq__.ZMQError, e:
90
 
                if e.errno != EAGAIN:
91
 
                    raise
92
 
            trampoline(self, write=True)
93
 
 
94
 
    def _recv_message(self, flags=0, track=False):
95
 
 
96
 
        flags |= __zmq__.NOBLOCK
97
 
        while True:
98
 
            try:
99
 
                m = super(Socket, self)._recv_message(flags, track)
100
 
                if m is not None:
101
 
                    return m
102
 
            except __zmq__.ZMQError, e:
103
 
                if e.errno != EAGAIN:
104
 
                    raise
105
 
            trampoline(self, read=True)
106
 
 
107
 
    def _recv_copy(self, flags=0):
108
 
        flags |= __zmq__.NOBLOCK
109
 
        while True:
110
 
            try:
111
 
                m = super(Socket, self)._recv_copy(flags)
112
 
                if m is not None:
113
 
                    return m
114
 
            except __zmq__.ZMQError, e:
115
 
                if e.errno != EAGAIN:
116
 
                    raise
117
 
            trampoline(self, read=True)
118
 
 
 
60
    def _sock_wait(self, read=False, write=False):
 
61
        """
 
62
        First checks if there are events in the socket, to avoid
 
63
        edge trigger problems with race conditions.  Then if there
 
64
        are none it will trampoline and when coming back check
 
65
        for the events.
 
66
        """
 
67
        events = self.getsockopt(__zmq__.EVENTS)
 
68
 
 
69
        if read and (events & __zmq__.POLLIN):
 
70
            return events
 
71
        elif write and (events & __zmq__.POLLOUT):
 
72
            return events
 
73
        else:
 
74
            # ONLY trampoline on read events for the zmq FD
 
75
            trampoline(self.getsockopt(__zmq__.FD), read=True)
 
76
            return self.getsockopt(__zmq__.EVENTS)
 
77
 
 
78
    def send(self, msg, flags=0, copy=True, track=False):
 
79
        """
 
80
        Override this instead of the internal _send_* methods 
 
81
        since those change and it's not clear when/how they're
 
82
        called in real code.
 
83
        """
 
84
        if flags & __zmq__.NOBLOCK:
 
85
            super(Socket, self).send(msg, flags=flags, track=track, copy=copy)
 
86
            return
 
87
 
 
88
        flags |= __zmq__.NOBLOCK
 
89
 
 
90
        while True:
 
91
            try:
 
92
                self._sock_wait(write=True)
 
93
                super(Socket, self).send(msg, flags=flags, track=track,
 
94
                                         copy=copy)
 
95
                return
 
96
            except __zmq__.ZMQError, e:
 
97
                if e.errno != EAGAIN:
 
98
                    raise
 
99
 
 
100
    def recv(self, flags=0, copy=True, track=False):
 
101
        """
 
102
        Override this instead of the internal _recv_* methods 
 
103
        since those change and it's not clear when/how they're
 
104
        called in real code.
 
105
        """
 
106
        if flags & __zmq__.NOBLOCK:
 
107
            return super(Socket, self).recv(flags=flags, track=track, copy=copy)
 
108
 
 
109
        flags |= __zmq__.NOBLOCK
 
110
 
 
111
        while True:
 
112
            try:
 
113
                self._sock_wait(read=True)
 
114
                m = super(Socket, self).recv(flags=flags, track=track, copy=copy)
 
115
                if m is not None:
 
116
                    return m
 
117
            except __zmq__.ZMQError, e:
 
118
                if e.errno != EAGAIN:
 
119
                    raise
119
120
 
120
121