~ubuntu-branches/ubuntu/precise/pyzmq/precise

« back to all changes in this revision

Viewing changes to zmq/devices/basedevice.py

  • Committer: Bazaar Package Importer
  • Author(s): Piotr Ożarowski
  • Date: 2011-02-15 09:08:36 UTC
  • mfrom: (2.1.2 experimental)
  • Revision ID: james.westby@ubuntu.com-20110215090836-phh4slym1g6muucn
Tags: 2.0.10.1-2
* Team upload.
* Upload to unstable
* Add Breaks: ${python:Breaks}

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""Classes for running 0MQ Devices in the background.
 
2
 
 
3
Authors
 
4
-------
 
5
* MinRK
 
6
* Brian Granger
 
7
"""
 
8
 
 
9
#
 
10
#    Copyright (c) 2010 Min Ragan-Kelley, Brian Granger
 
11
#
 
12
#    This file is part of pyzmq.
 
13
#
 
14
#    pyzmq is free software; you can redistribute it and/or modify it under
 
15
#    the terms of the Lesser GNU General Public License as published by
 
16
#    the Free Software Foundation; either version 3 of the License, or
 
17
#    (at your option) any later version.
 
18
#
 
19
#    pyzmq is distributed in the hope that it will be useful,
 
20
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
 
21
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
22
#    Lesser GNU General Public License for more details.
 
23
#
 
24
#    You should have received a copy of the Lesser GNU General Public License
 
25
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
26
#
 
27
 
 
28
#-----------------------------------------------------------------------------
 
29
# Imports
 
30
#-----------------------------------------------------------------------------
 
31
 
 
32
import time
 
33
from threading import Thread
 
34
try:
 
35
    from multiprocessing import Process
 
36
except ImportError:
 
37
    Process = None
 
38
 
 
39
from zmq.core import device, Context
 
40
 
 
41
#-----------------------------------------------------------------------------
 
42
# Classes
 
43
#-----------------------------------------------------------------------------
 
44
 
 
45
class Device:
 
46
    """A Threadsafe 0MQ Device.
 
47
    
 
48
    *Warning* as with most 'threadsafe' Python objects, this is only
 
49
    threadsafe as long as you do not use private methods or attributes.
 
50
    Private names are prefixed with '_', such as 'self._setup_socket()'.
 
51
    
 
52
    For thread safety, you do not pass Sockets to this, but rather Socket
 
53
    types::
 
54
 
 
55
        Device(device_type, in_socket_type, out_socket_type)
 
56
 
 
57
    For instance::
 
58
 
 
59
    dev = Device(zmq.QUEUE, zmq.XREQ, zmq.XREP)
 
60
 
 
61
    Similar to zmq.device, but socket types instead of sockets themselves are
 
62
    passed, and the sockets are created in the work thread, to avoid issues
 
63
    with thread safety. As a result, additional bind_{in|out} and
 
64
    connect_{in|out} methods and setsockopt_{in|out} allow users to specify
 
65
    connections for the sockets.
 
66
    
 
67
    Parameters
 
68
    ----------
 
69
    device_type : int
 
70
        The 0MQ Device type
 
71
    {in|out}_type : int
 
72
        zmq socket types, to be passed later to context.socket(). e.g.
 
73
        zmq.PUB, zmq.SUB, zmq.REQ. If out_type is < 0, then in_socket is used
 
74
        for both in_socket and out_socket.
 
75
        
 
76
    Methods
 
77
    -------
 
78
    bind_{in_out}(iface)
 
79
        passthrough for {in|out}_socket.bind(iface), to be called in the thread
 
80
    connect_{in_out}(iface)
 
81
        passthrough for {in|out}_socket.connect(iface), to be called in the
 
82
        thread
 
83
    setsockopt_{in_out}(opt,value)
 
84
        passthrough for {in|out}_socket.setsockopt(opt, value), to be called in
 
85
        the thread
 
86
    
 
87
    Attributes
 
88
    ----------
 
89
    daemon: int
 
90
        sets whether the thread should be run as a daemon
 
91
        Default is true, because if it is false, the thread will not
 
92
        exit unless it is killed
 
93
    """
 
94
 
 
95
    def __init__(self, device_type, in_type, out_type):
 
96
        self.device_type = device_type
 
97
        self.in_type = in_type
 
98
        self.out_type = out_type
 
99
        self._in_binds = list()
 
100
        self._in_connects = list()
 
101
        self._in_sockopts = list()
 
102
        self._out_binds = list()
 
103
        self._out_connects = list()
 
104
        self._out_sockopts = list()
 
105
        self.daemon = True
 
106
        self.done = False
 
107
    
 
108
    def bind_in(self, addr):
 
109
        """Enqueue ZMQ address for binding on in_socket.
 
110
 
 
111
        See ``zmq.Socket.bind`` for details.
 
112
        """
 
113
        self._in_binds.append(addr)
 
114
    
 
115
    def connect_in(self, addr):
 
116
        """Enqueue ZMQ address for connecting on in_socket.
 
117
 
 
118
        See ``zmq.Socket.connect`` for details.
 
119
        """
 
120
        self._in_connects.append(addr)
 
121
    
 
122
    def setsockopt_in(self, opt, value):
 
123
        """Enqueue setsockopt(opt, value) for in_socket
 
124
 
 
125
        See ``zmq.Socket.setsockopt`` for details.
 
126
        """
 
127
        self._in_sockopts.append((opt, value))
 
128
    
 
129
    def bind_out(self, iface):
 
130
        """Enqueue ZMQ address for binding on out_socket.
 
131
 
 
132
        See ``zmq.Socket.bind`` for details.
 
133
        """
 
134
        self._out_binds.append(iface)
 
135
    
 
136
    def connect_out(self, iface):
 
137
        """Enqueue ZMQ address for connecting on out_socket.
 
138
 
 
139
        See ``zmq.Socket.connect`` for details.
 
140
        """
 
141
        self._out_connects.append(iface)
 
142
    
 
143
    def setsockopt_out(self, opt, value):
 
144
        """Enqueue setsockopt(opt, value) for out_socket
 
145
 
 
146
        See ``zmq.Socket.setsockopt`` for details.
 
147
        """
 
148
        self._out_sockopts.append((opt, value))
 
149
    
 
150
    def _setup_sockets(self):
 
151
        ctx = Context()
 
152
        self._context = ctx
 
153
        
 
154
        # create the sockets
 
155
        ins = ctx.socket(self.in_type)
 
156
        if self.out_type < 0:
 
157
            outs = ins
 
158
        else:
 
159
            outs = ctx.socket(self.out_type)
 
160
        
 
161
        # set sockopts (must be done first, in case of zmq.IDENTITY)
 
162
        for opt,value in self._in_sockopts:
 
163
            ins.setsockopt(opt, value)
 
164
        for opt,value in self._out_sockopts:
 
165
            outs.setsockopt(opt, value)
 
166
        
 
167
        for iface in self._in_binds:
 
168
            ins.bind(iface)
 
169
        for iface in self._out_binds:
 
170
            outs.bind(iface)
 
171
        
 
172
        for iface in self._in_connects:
 
173
            ins.connect(iface)
 
174
        for iface in self._out_connects:
 
175
            outs.connect(iface)
 
176
        
 
177
        return ins,outs
 
178
    
 
179
    def run(self):
 
180
        """The runner method.
 
181
 
 
182
        Do not call me directly, instead call ``self.start()``, just like a
 
183
        Thread.
 
184
        """
 
185
        ins,outs = self._setup_sockets()
 
186
        rc = device(self.device_type, ins, outs)
 
187
        self.done = True
 
188
        return rc
 
189
    
 
190
    def start(self):
 
191
        """Start the device. Override me in subclass for other launchers."""
 
192
        return self.run()
 
193
 
 
194
    def join(self,timeout=None):
 
195
        tic = time.time()
 
196
        toc = tic
 
197
        while not self.done and not (timeout is not None and toc-tic > timeout):
 
198
            time.sleep(.001)
 
199
            toc = time.time()
 
200
 
 
201
 
 
202
class BackgroundDevice(Device):
 
203
    """Base class for launching Devices in background processes and threads."""
 
204
 
 
205
    launcher=None
 
206
    launch_class=None
 
207
 
 
208
    def start(self):
 
209
        self.launcher = self.launch_class(target=self.run)
 
210
        self.launcher.daemon = self.daemon
 
211
        return self.launcher.start()
 
212
 
 
213
    def join(self, timeout=None):
 
214
        return self.launcher.join(timeout=timeout)
 
215
 
 
216
 
 
217
class ThreadDevice(BackgroundDevice):
 
218
    """A Device that will be run in a background Thread.
 
219
 
 
220
    See `Device` for details.
 
221
    """
 
222
    launch_class=Thread
 
223
 
 
224
class ProcessDevice(BackgroundDevice):
 
225
    """A Device that will be run in a background Process.
 
226
 
 
227
    See `Device` for details.
 
228
    """
 
229
    launch_class=Process
 
230
 
 
231
 
 
232
__all__ = [ 'Device', 'ThreadDevice']
 
233
if Process is not None:
 
234
    __all__.append('ProcessDevice')