~ubuntu-branches/ubuntu/wily/pyzmq/wily

« back to all changes in this revision

Viewing changes to zmq/green/core.py

  • Committer: Package Import Robot
  • Author(s): Julian Taylor
  • Date: 2013-02-24 19:23:15 UTC
  • mfrom: (1.2.1) (9 sid)
  • mto: This revision was merged to the branch mainline in revision 10.
  • Revision ID: package-import@ubuntu.com-20130224192315-qhmwp3m3ymk8r60d
Tags: 2.2.0.1-1
* New upstream release
* relicense debian packaging to LGPL-3
* update watch file to use github directly
  thanks to Bart Martens for the file
* add autopkgtests
* drop obsolete DM-Upload-Allowed
* bump standard to 3.9.4, no changes required

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#-----------------------------------------------------------------------------
 
2
#  Copyright (c) 2011-2012 Travis Cline
 
3
#
 
4
#  This file is part of pyzmq
 
5
#  It is adapted from upstream project zeromq_gevent under the New BSD License
 
6
#
 
7
#  Distributed under the terms of the New BSD License.  The full license is in
 
8
#  the file COPYING.BSD, distributed as part of this software.
 
9
#-----------------------------------------------------------------------------
 
10
 
 
11
"""This module wraps the :class:`Socket` and :class:`Context` found in :mod:`pyzmq <zmq>` to be non blocking
 
12
"""
 
13
 
 
14
from __future__ import print_function
 
15
 
 
16
import sys
 
17
import time
 
18
 
 
19
import zmq
 
20
 
 
21
from zmq import Context as _original_Context
 
22
from zmq import Socket as _original_Socket
 
23
 
 
24
import gevent
 
25
from gevent.event import AsyncResult
 
26
from gevent.hub import get_hub
 
27
 
 
28
 
 
29
def _stop(evt):
 
30
    """simple wrapper for stopping an Event, allowing for method rename in gevent 1.0"""
 
31
    try:
 
32
        evt.stop()
 
33
    except AttributeError as e:
 
34
        # gevent<1.0 compat
 
35
        evt.cancel()
 
36
 
 
37
class _Socket(_original_Socket):
 
38
    """Green version of :class:`zmq.core.socket.Socket`
 
39
 
 
40
    The following methods are overridden:
 
41
 
 
42
        * send
 
43
        * recv
 
44
 
 
45
    To ensure that the ``zmq.NOBLOCK`` flag is set and that sending or recieving
 
46
    is deferred to the hub if a ``zmq.EAGAIN`` (retry) error is raised.
 
47
    
 
48
    The `__state_changed` method is triggered when the zmq.FD for the socket is
 
49
    marked as readable and triggers the necessary read and write events (which
 
50
    are waited for in the recv and send methods).
 
51
 
 
52
    Some double underscore prefixes are used to minimize pollution of
 
53
    :class:`zmq.core.socket.Socket`'s namespace.
 
54
    """
 
55
    
 
56
    def __init__(self, context, socket_type):
 
57
        self.__in_send_multipart = False
 
58
        self.__in_recv_multipart = False
 
59
        self.__setup_events()
 
60
 
 
61
    def __del__(self):
 
62
        self.close()
 
63
 
 
64
    def close(self, linger=None):
 
65
        super(_Socket, self).close(linger)
 
66
        self.__cleanup_events()
 
67
 
 
68
    def __cleanup_events(self):
 
69
        # close the _state_event event, keeps the number of active file descriptors down
 
70
        if getattr(self, '_state_event', None):
 
71
            _stop(self._state_event)
 
72
            self._state_event = None
 
73
        # if the socket has entered a close state resume any waiting greenlets
 
74
        self.__writable.set()
 
75
        self.__readable.set()
 
76
 
 
77
    def __setup_events(self):
 
78
        self.__readable = AsyncResult()
 
79
        self.__writable = AsyncResult()
 
80
        self.__readable.set()
 
81
        self.__writable.set()
 
82
        
 
83
        try:
 
84
            self._state_event = get_hub().loop.io(self.getsockopt(zmq.FD), 1) # read state watcher
 
85
            self._state_event.start(self.__state_changed)
 
86
        except AttributeError:
 
87
            # for gevent<1.0 compatibility
 
88
            from gevent.core import read_event
 
89
            self._state_event = read_event(self.getsockopt(zmq.FD), self.__state_changed, persist=True)
 
90
 
 
91
    def __state_changed(self, event=None, _evtype=None):
 
92
        if self.closed:
 
93
            self.__cleanup_events()
 
94
            return
 
95
        try:
 
96
            # avoid triggering __state_changed from inside __state_changed
 
97
            events = super(_Socket, self).getsockopt(zmq.EVENTS)
 
98
        except zmq.ZMQError as exc:
 
99
            self.__writable.set_exception(exc)
 
100
            self.__readable.set_exception(exc)
 
101
        else:
 
102
            if events & zmq.POLLOUT:
 
103
                self.__writable.set()
 
104
            if events & zmq.POLLIN:
 
105
                self.__readable.set()
 
106
 
 
107
    def _wait_write(self):
 
108
        assert self.__writable.ready(), "Only one greenlet can be waiting on this event"
 
109
        self.__writable = AsyncResult()
 
110
        # timeout is because libzmq cannot be trusted to properly signal a new send event:
 
111
        # this is effectively a maximum poll interval of 1s
 
112
        tic = time.time()
 
113
        timeout = gevent.Timeout(seconds=1)
 
114
        try:
 
115
            timeout.start()
 
116
            self.__writable.get(block=True)
 
117
        except gevent.Timeout as t:
 
118
            if t is not timeout:
 
119
                raise
 
120
            toc = time.time()
 
121
            # gevent bug: get can raise timeout even on clean return
 
122
            # don't display zmq bug warning for gevent bug (this is getting ridiculous)
 
123
            if toc-tic > 0.9 and self.getsockopt(zmq.EVENTS) & zmq.POLLOUT:
 
124
                print("BUG: gevent missed a libzmq send event on %i!" % self.FD, file=sys.stderr)
 
125
        finally:
 
126
            self.__writable.set()
 
127
            timeout.cancel()
 
128
 
 
129
    def _wait_read(self):
 
130
        assert self.__readable.ready(), "Only one greenlet can be waiting on this event"
 
131
        self.__readable = AsyncResult()
 
132
        # timeout is because libzmq cannot always be trusted to play nice with libevent.
 
133
        # I can only confirm that this actually happens for send, but lets be symmetrical
 
134
        # with our dirty hacks.
 
135
        # this is effectively a maximum poll interval of 1s
 
136
        tic = time.time()
 
137
        timeout = gevent.Timeout(seconds=1)
 
138
        try:
 
139
            timeout.start()
 
140
            self.__readable.get(block=True)
 
141
        except gevent.Timeout as t:
 
142
            if t is not timeout:
 
143
                raise
 
144
            toc = time.time()
 
145
            # gevent bug: get can raise timeout even on clean return
 
146
            # don't display zmq bug warning for gevent bug (this is getting ridiculous)
 
147
            if toc-tic > 0.9 and self.getsockopt(zmq.EVENTS) & zmq.POLLIN:
 
148
                print("BUG: gevent missed a libzmq recv event on %i!" % self.FD, file=sys.stderr)
 
149
        finally:
 
150
            self.__readable.set()
 
151
            timeout.cancel()
 
152
 
 
153
    def send(self, data, flags=0, copy=True, track=False):
 
154
        """send, which will only block current greenlet
 
155
        
 
156
        state_changed always fires exactly once (success or fail) at the
 
157
        end of this method.
 
158
        """
 
159
        
 
160
        # if we're given the NOBLOCK flag act as normal and let the EAGAIN get raised
 
161
        if flags & zmq.NOBLOCK:
 
162
            try:
 
163
                msg = super(_Socket, self).send(data, flags, copy, track)
 
164
            finally:
 
165
                if not self.__in_send_multipart:
 
166
                    self.__state_changed()
 
167
            return msg
 
168
        # ensure the zmq.NOBLOCK flag is part of flags
 
169
        flags |= zmq.NOBLOCK
 
170
        while True: # Attempt to complete this operation indefinitely, blocking the current greenlet
 
171
            try:
 
172
                # attempt the actual call
 
173
                msg = super(_Socket, self).send(data, flags, copy, track)
 
174
            except zmq.ZMQError as e:
 
175
                # if the raised ZMQError is not EAGAIN, reraise
 
176
                if e.errno != zmq.EAGAIN:
 
177
                    if not self.__in_send_multipart:
 
178
                        self.__state_changed()
 
179
                    raise
 
180
            else:
 
181
                if not self.__in_send_multipart:
 
182
                    self.__state_changed()
 
183
                return msg
 
184
            # defer to the event loop until we're notified the socket is writable
 
185
            self._wait_write()
 
186
 
 
187
    def recv(self, flags=0, copy=True, track=False):
 
188
        """recv, which will only block current greenlet
 
189
        
 
190
        state_changed always fires exactly once (success or fail) at the
 
191
        end of this method.
 
192
        """
 
193
        if flags & zmq.NOBLOCK:
 
194
            try:
 
195
                msg = super(_Socket, self).recv(flags, copy, track)
 
196
            finally:
 
197
                if not self.__in_recv_multipart:
 
198
                    self.__state_changed()
 
199
            return msg
 
200
        
 
201
        flags |= zmq.NOBLOCK
 
202
        while True:
 
203
            try:
 
204
                msg = super(_Socket, self).recv(flags, copy, track)
 
205
            except zmq.ZMQError as e:
 
206
                if e.errno != zmq.EAGAIN:
 
207
                    if not self.__in_recv_multipart:
 
208
                        self.__state_changed()
 
209
                    raise
 
210
            else:
 
211
                if not self.__in_recv_multipart:
 
212
                    self.__state_changed()
 
213
                return msg
 
214
            self._wait_read()
 
215
    
 
216
    def send_multipart(self, *args, **kwargs):
 
217
        """wrap send_multipart to prevent state_changed on each partial send"""
 
218
        self.__in_send_multipart = True
 
219
        try:
 
220
            msg = super(_Socket, self).send_multipart(*args, **kwargs)
 
221
        finally:
 
222
            self.__in_send_multipart = False
 
223
            self.__state_changed()
 
224
        return msg
 
225
    
 
226
    def recv_multipart(self, *args, **kwargs):
 
227
        """wrap recv_multipart to prevent state_changed on each partial recv"""
 
228
        self.__in_recv_multipart = True
 
229
        try:
 
230
            msg = super(_Socket, self).recv_multipart(*args, **kwargs)
 
231
        finally:
 
232
            self.__in_recv_multipart = False
 
233
            self.__state_changed()
 
234
        return msg
 
235
    
 
236
    def getsockopt(self, opt):
 
237
        """trigger state_changed on getsockopt(EVENTS)"""
 
238
        optval = super(_Socket, self).getsockopt(opt)
 
239
        if opt == zmq.EVENTS:
 
240
            self.__state_changed()
 
241
        return optval
 
242
 
 
243
 
 
244
class _Context(_original_Context):
 
245
    """Replacement for :class:`zmq.core.context.Context`
 
246
 
 
247
    Ensures that the greened Socket above is used in calls to `socket`.
 
248
    """
 
249
    _socket_class = _Socket