10
# Copyright (c) 2010 Min Ragan-Kelley, Brian Granger
12
# This file is part of pyzmq.
14
# pyzmq is free software; you can redistribute it and/or modify it under
15
# the terms of the Lesser GNU General Public License as published by
16
# the Free Software Foundation; either version 3 of the License, or
17
# (at your option) any later version.
19
# pyzmq is distributed in the hope that it will be useful,
20
# but WITHOUT ANY WARRANTY; without even the implied warranty of
21
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22
# Lesser GNU General Public License for more details.
24
# You should have received a copy of the Lesser GNU General Public License
25
# along with this program. If not, see <http://www.gnu.org/licenses/>.
9
#-----------------------------------------------------------------------------
10
# Copyright (c) 2010-2012 Brian Granger, Min Ragan-Kelley
12
# This file is part of pyzmq
14
# Distributed under the terms of the New BSD License. The full license is in
15
# the file COPYING.BSD, distributed as part of this software.
16
#-----------------------------------------------------------------------------
28
18
#-----------------------------------------------------------------------------
48
38
*Warning* as with most 'threadsafe' Python objects, this is only
49
39
threadsafe as long as you do not use private methods or attributes.
50
Private names are prefixed with '_', such as 'self._setup_socket()'.
40
Private names are prefixed with '_', such as ``self._setup_socket()``.
52
42
For thread safety, you do not pass Sockets to this, but rather Socket
59
dev = Device(zmq.QUEUE, zmq.XREQ, zmq.XREP)
49
dev = Device(zmq.QUEUE, zmq.DEALER, zmq.ROUTER)
61
51
Similar to zmq.device, but socket types instead of sockets themselves are
62
52
passed, and the sockets are created in the work thread, to avoid issues
78
68
bind_{in_out}(iface)
79
passthrough for {in|out}_socket.bind(iface), to be called in the thread
69
passthrough for ``{in|out}_socket.bind(iface)``, to be called in the thread
80
70
connect_{in_out}(iface)
81
passthrough for {in|out}_socket.connect(iface), to be called in the
71
passthrough for ``{in|out}_socket.connect(iface)``, to be called in the
83
73
setsockopt_{in_out}(opt,value)
84
passthrough for {in|out}_socket.setsockopt(opt, value), to be called in
74
passthrough for ``{in|out}_socket.setsockopt(opt, value)``, to be called in
90
80
sets whether the thread should be run as a daemon
91
81
Default is true, because if it is false, the thread will not
92
82
exit unless it is killed
83
context_factory : callable (class attribute)
84
Function for creating the Context. This will be Context.intance
85
in ThreadDevices, and Context in ProcessDevices. The only reason
86
it is not instance() in ProcessDevices is that there may be a stale
87
Context instance already initialized, and the forked environment
88
should *never* try to use it.
91
context_factory = Context.instance
95
93
def __init__(self, device_type, in_type, out_type):
96
94
self.device_type = device_type
108
106
def bind_in(self, addr):
109
107
"""Enqueue ZMQ address for binding on in_socket.
111
See ``zmq.Socket.bind`` for details.
109
See zmq.Socket.bind for details.
113
111
self._in_binds.append(addr)
115
113
def connect_in(self, addr):
116
114
"""Enqueue ZMQ address for connecting on in_socket.
118
See ``zmq.Socket.connect`` for details.
116
See zmq.Socket.connect for details.
120
118
self._in_connects.append(addr)
122
120
def setsockopt_in(self, opt, value):
123
121
"""Enqueue setsockopt(opt, value) for in_socket
125
See ``zmq.Socket.setsockopt`` for details.
123
See zmq.Socket.setsockopt for details.
127
125
self._in_sockopts.append((opt, value))
129
127
def bind_out(self, iface):
130
128
"""Enqueue ZMQ address for binding on out_socket.
132
See ``zmq.Socket.bind`` for details.
130
See zmq.Socket.bind for details.
134
132
self._out_binds.append(iface)
136
134
def connect_out(self, iface):
137
135
"""Enqueue ZMQ address for connecting on out_socket.
139
See ``zmq.Socket.connect`` for details.
137
See zmq.Socket.connect for details.
141
139
self._out_connects.append(iface)
143
141
def setsockopt_out(self, opt, value):
144
142
"""Enqueue setsockopt(opt, value) for out_socket
146
See ``zmq.Socket.setsockopt`` for details.
144
See zmq.Socket.setsockopt for details.
148
146
self._out_sockopts.append((opt, value))
150
148
def _setup_sockets(self):
149
ctx = self.context_factory()
152
151
self._context = ctx
154
153
# create the sockets
192
191
return self.run()
194
193
def join(self,timeout=None):
194
"""wait for me to finish, like Thread.join.
196
Reimplemented appropriately by sublcasses."""
195
197
tic = time.time()
197
199
while not self.done and not (timeout is not None and toc-tic > timeout):
203
205
"""Base class for launching Devices in background processes and threads."""
209
self.launcher = self.launch_class(target=self.run)
211
self.launcher = self._launch_class(target=self.run)
210
212
self.launcher.daemon = self.daemon
211
213
return self.launcher.start()
217
219
class ThreadDevice(BackgroundDevice):
218
220
"""A Device that will be run in a background Thread.
220
See `Device` for details.
222
See Device for details.
224
226
class ProcessDevice(BackgroundDevice):
225
227
"""A Device that will be run in a background Process.
227
See `Device` for details.
229
See Device for details.
231
_launch_class=Process
232
context_factory = Context
232
235
__all__ = [ 'Device', 'ThreadDevice']