"""Classes for running 0MQ Devices in the background."""
10
# Copyright (c) 2010 Min Ragan-Kelley, Brian Granger
12
# This file is part of pyzmq.
14
# pyzmq is free software; you can redistribute it and/or modify it under
15
# the terms of the Lesser GNU General Public License as published by
16
# the Free Software Foundation; either version 3 of the License, or
17
# (at your option) any later version.
19
# pyzmq is distributed in the hope that it will be useful,
20
# but WITHOUT ANY WARRANTY; without even the implied warranty of
21
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22
# Lesser GNU General Public License for more details.
24
# You should have received a copy of the Lesser GNU General Public License
25
# along with this program. If not, see <http://www.gnu.org/licenses/>.
28
#-----------------------------------------------------------------------------
30
#-----------------------------------------------------------------------------
33
import time
from multiprocessing import Process
from threading import Thread

from zmq.core import device, Context
41
#-----------------------------------------------------------------------------
43
#-----------------------------------------------------------------------------
class Device(object):
    """A Threadsafe 0MQ Device.

    *Warning* as with most 'threadsafe' Python objects, this is only
    threadsafe as long as you do not use private methods or attributes.
    Private names are prefixed with '_', such as 'self._setup_sockets()'.

    For thread safety, you do not pass Sockets to this, but rather Socket
    types::

        Device(device_type, in_socket_type, out_socket_type)

    For instance::

        dev = Device(zmq.QUEUE, zmq.XREQ, zmq.XREP)

    Similar to zmq.device, but socket types instead of sockets themselves are
    passed, and the sockets are created in the work thread, to avoid issues
    with thread safety. As a result, additional bind_{in|out} and
    connect_{in|out} methods and setsockopt_{in|out} allow users to specify
    connections for the sockets.

    Parameters
    ----------
    device_type
        The 0MQ device type, handed unchanged to ``zmq.core.device``.
    {in|out}_type
        zmq socket types, to be passed later to context.socket(). e.g.
        zmq.PUB, zmq.SUB, zmq.REQ. If out_type is < 0, then in_socket is used
        for both in_socket and out_socket.

    Methods
    -------
    bind_{in|out}(iface)
        passthrough for {in|out}_socket.bind(iface), to be called in the
        thread
    connect_{in|out}(iface)
        passthrough for {in|out}_socket.connect(iface), to be called in the
        thread
    setsockopt_{in|out}(opt, value)
        passthrough for {in|out}_socket.setsockopt(opt, value), to be called
        in the thread

    Attributes
    ----------
    daemon
        sets whether the thread should be run as a daemon
        Default is true, because if it is false, the thread will not
        exit unless it is killed
    done
        False until ``run()`` has finished; polled by ``join()``.
    """

    def __init__(self, device_type, in_type, out_type):
        self.device_type = device_type
        self.in_type = in_type
        self.out_type = out_type
        # Nothing touches a socket here: every request is only queued and
        # replayed by _setup_sockets() in whatever thread/process runs run().
        self._in_binds = []
        self._in_connects = []
        self._in_sockopts = []
        self._out_binds = []
        self._out_connects = []
        self._out_sockopts = []
        # Read by BackgroundDevice.start() and by join() respectively, so
        # both must exist before either is called.
        self.daemon = True
        self.done = False

    def bind_in(self, addr):
        """Enqueue ZMQ address for binding on in_socket.

        See ``zmq.Socket.bind`` for details.
        """
        self._in_binds.append(addr)

    def connect_in(self, addr):
        """Enqueue ZMQ address for connecting on in_socket.

        See ``zmq.Socket.connect`` for details.
        """
        self._in_connects.append(addr)

    def setsockopt_in(self, opt, value):
        """Enqueue setsockopt(opt, value) for in_socket

        See ``zmq.Socket.setsockopt`` for details.
        """
        self._in_sockopts.append((opt, value))

    def bind_out(self, iface):
        """Enqueue ZMQ address for binding on out_socket.

        See ``zmq.Socket.bind`` for details.
        """
        self._out_binds.append(iface)

    def connect_out(self, iface):
        """Enqueue ZMQ address for connecting on out_socket.

        See ``zmq.Socket.connect`` for details.
        """
        self._out_connects.append(iface)

    def setsockopt_out(self, opt, value):
        """Enqueue setsockopt(opt, value) for out_socket

        See ``zmq.Socket.setsockopt`` for details.
        """
        self._out_sockopts.append((opt, value))

    def _setup_sockets(self):
        """Create the sockets and replay every queued option/bind/connect.

        Returns
        -------
        (ins, outs)
            The in_socket and out_socket. They are the same object when
            ``out_type`` is negative.
        """
        ctx = Context()
        # Hold a reference so the context lives at least as long as the
        # device object that owns its sockets.
        self._context = ctx

        # create the sockets
        ins = ctx.socket(self.in_type)
        if self.out_type < 0:
            # negative out_type: one socket serves both directions
            outs = ins
        else:
            outs = ctx.socket(self.out_type)

        # set sockopts (must be done first, in case of zmq.IDENTITY)
        for opt, value in self._in_sockopts:
            ins.setsockopt(opt, value)
        for opt, value in self._out_sockopts:
            outs.setsockopt(opt, value)

        for iface in self._in_binds:
            ins.bind(iface)
        for iface in self._out_binds:
            outs.bind(iface)

        for iface in self._in_connects:
            ins.connect(iface)
        for iface in self._out_connects:
            outs.connect(iface)

        return ins, outs

    def run(self):
        """The runner method.

        Do not call me directly, instead call ``self.start()``, just like a
        Thread.

        Returns whatever ``zmq.core.device`` returns.
        """
        ins, outs = self._setup_sockets()
        try:
            return device(self.device_type, ins, outs)
        finally:
            # Flag completion even on error so join() cannot wait forever.
            self.done = True

    def start(self):
        """Start the device. Override me in subclass for other launchers."""
        return self.run()

    def join(self, timeout=None):
        """Wait until ``run()`` has finished or *timeout* seconds have passed.

        Parameters
        ----------
        timeout
            Maximum number of seconds to wait; ``None`` waits indefinitely.
        """
        began = time.time()
        while not self.done:
            if timeout is not None and time.time() - began > timeout:
                break
            # NOTE(review): the poll interval is a judgment call; short
            # enough to notice completion promptly without busy-waiting.
            time.sleep(0.001)
202
class BackgroundDevice(Device):
    """Base class for launching Devices in background processes and threads.

    Subclasses choose the launcher by setting ``launch_class``; it is called
    as ``launch_class(target=self.run)`` and the result must offer a
    ``daemon`` attribute plus ``start()`` and ``join(timeout=...)``.
    """

    # The object built by start(); stays None until start() has been called.
    launcher = None
    # Launcher factory; concrete subclasses override this.
    launch_class = None

    def start(self):
        """Run ``self.run`` in a fresh launcher and start it.

        Returns whatever the launcher's ``start()`` returns.
        """
        self.launcher = self.launch_class(target=self.run)
        self.launcher.daemon = self.daemon
        return self.launcher.start()

    def join(self, timeout=None):
        """Passthrough to ``self.launcher.join(timeout=timeout)``.

        Only meaningful after ``start()`` has created the launcher.
        """
        return self.launcher.join(timeout=timeout)
217
class ThreadDevice(BackgroundDevice):
    """A Device that will be run in a background Thread.

    See `Device` for details.
    """

    # start() instantiates this as launch_class(target=self.run)
    launch_class = Thread
224
class ProcessDevice(BackgroundDevice):
    """A Device that will be run in a background Process.

    See `Device` for details.
    """

    # start() instantiates this as launch_class(target=self.run)
    launch_class = Process
232
# ProcessDevice is only advertised when ``Process`` is not None.
__all__ = ['Device', 'ThreadDevice']
if Process is not None:
    __all__.append('ProcessDevice')