1
"""Proxy classes for forwarding tornado handlers to be run in separate processes.

This module uses ZeroMQ/PyZMQ sockets (DEALER/ROUTER) to enable individual
Tornado handlers to be run in a separate backend process. Through the
usage of DEALER/ROUTER sockets, multiple backend processes for a given
handler can be started and requests will be load balanced among the backend
processes.
"""
14
#-----------------------------------------------------------------------------
15
# Copyright (c) 2012 Brian Granger, Min Ragan-Kelley
17
# This file is part of pyzmq
19
# Distributed under the terms of the New BSD License. The full license is in
20
# the file COPYING.BSD, distributed as part of this software.
21
#-----------------------------------------------------------------------------
23
#-----------------------------------------------------------------------------
25
#-----------------------------------------------------------------------------
32
import logging
import socket
import uuid

from tornado import web
from tornado import stack_context

import zmq
from zmq.eventloop.zmqstream import ZMQStream
from zmq.eventloop.ioloop import IOLoop, DelayedCallback
from zmq.utils import jsonapi

from .zmqweb import ZMQHTTPRequest
42
#-----------------------------------------------------------------------------
44
#-----------------------------------------------------------------------------
47
class ZMQApplicationProxy(object):
    """A proxy for a ZeroMQ based ZMQApplication that is using ZMQHTTPRequest.

    This class is a proxy for a backend that is running a
    ZMQApplication and MUST be used with the ZMQHTTPRequest class. This
    version sends the reply parts (each generated by RequestHandler.flush) as
    a single multipart message for low latency replies. See
    ZMQStreamingApplicationProxy, for a version that has higher latency, but
    which sends each reply part as a separate zmq message.
    """

    def __init__(self, loop=None, context=None):
        """Create the DEALER socket and register the reply callback.

        Parameters
        ----------
        loop : IOLoop, optional
            Event loop driving the stream; defaults to ``IOLoop.instance()``.
        context : zmq.Context, optional
            Context used to create the socket; defaults to
            ``zmq.Context.instance()``.
        """
        self.loop = loop if loop is not None else IOLoop.instance()
        self.context = context if context is not None else zmq.Context.instance()
        # msg_id -> (handler, timeout DelayedCallback or None) for every
        # request that is still waiting for its reply.
        self._callbacks = {}
        self.socket = self.context.socket(zmq.DEALER)
        self.stream = ZMQStream(self.socket, self.loop)
        self.stream.on_recv(self._handle_reply)
        # Endpoints this proxy has been connected/bound to, in call order.
        self.urls = []

    def connect(self, url):
        """Connect the service client to the proto://ip:port given in the url."""
        self.urls.append(url)
        self.socket.connect(url)

    def bind(self, url):
        """Bind the service client to the proto://ip:port given in the url."""
        self.urls.append(url)
        self.socket.bind(url)

    def send_request(self, request, args, kwargs, handler, timeout):
        """Send a request to the service.

        Parameters
        ----------
        request : tornado HTTP request
            The incoming request whose fields are forwarded to the backend.
        args, kwargs
            Positional and keyword arguments matched from the URL spec.
        handler : RequestHandler
            The frontend handler that will receive the backend's reply.
        timeout : int
            Timeout in milliseconds; a 504 is sent through ``handler`` if no
            reply arrives in time. 0 or a negative value disables the timeout.

        Returns
        -------
        bytes
            The id under which the pending request is tracked.
        """
        req = {
            'method': request.method,
            'uri': request.uri,
            'version': request.version,
            'headers': dict(request.headers),
            'remote_ip': request.remote_ip,
            'protocol': request.protocol,
            'host': request.host,
            'files': request.files,
            'arguments': request.arguments,
            'args': args,
            'kwargs': kwargs,
        }

        # bytes(uuid.uuid4()) only works on Python 2; encoding the string form
        # yields the same 36-character id on both Python 2 and 3.
        msg_id = str(uuid.uuid4()).encode('ascii')
        msg_list = [b'|', msg_id, jsonapi.dumps(req)]
        # NOTE(review): the raw body cannot travel inside the JSON header, so
        # it is sent as an optional trailing frame -- confirm that this matches
        # the backend's request parser.
        body = request.body
        if body:
            msg_list.append(body)
        logging.debug('Sending request: %r', msg_list)
        self.stream.send_multipart(msg_list)

        if timeout > 0:
            def _handle_timeout():
                # Forget the request first so the entry cannot leak if
                # send_error raises and a late reply is simply ignored.
                if self._callbacks.pop(msg_id, None) is None:
                    logging.error('Unexpected error removing callbacks')
                handler.send_error(504)  # Gateway timeout
            dc = DelayedCallback(_handle_timeout, timeout, self.loop)
            dc.start()
        else:
            dc = None
        self._callbacks[msg_id] = (handler, dc)
        return msg_id

    def _handle_reply(self, msg_list):
        """Write a complete multipart reply to its handler and finish it.

        ``msg_list`` is expected to be ``[b'|', msg_id, part, ...]``; anything
        else is logged and dropped. Replies for unknown ids (for example
        requests that already timed out) are ignored.
        """
        logging.debug('Handling reply: %r', msg_list)
        len_msg_list = len(msg_list)
        if len_msg_list < 3 or not msg_list[0] == b'|':
            logging.error('Unexpected reply in ZMQApplicationProxy._handle_reply')
            return
        msg_id = msg_list[1]
        replies = msg_list[2:]
        cb = self._callbacks.pop(msg_id, None)
        if cb is None:
            return
        handler, dc = cb
        if dc is not None:
            # The reply arrived in time, so cancel the pending 504.
            dc.stop()
        try:
            for reply in replies:
                handler.write(reply)
            # The backend has already processed the headers and they are
            # included in the above write calls, so we manually tell the
            # handler that the headers are already written.
            handler._headers_written = True
            # We set transforms to an empty list because the backend
            # has already applied all of the transforms.
            handler._transforms = []
            handler.finish()
        except Exception:
            logging.error('Unexpected error in ZMQApplicationProxy._handle_reply', exc_info=True)
142
class ZMQStreamingApplicationProxy(ZMQApplicationProxy):
    """A proxy for a ZeroMQ based ZMQApplication that is using ZMQStreamingHTTPRequest.

    This class is a proxy for a backend that is running a
    ZMQApplication and MUST be used with the ZMQStreamingHTTPRequest class.
    This version sends the reply parts (each generated by RequestHandler.flush)
    as separate zmq messages to enable streaming replies. See
    ZMQApplicationProxy, for a version that has lower latency, but which sends
    all reply parts as a single zmq message.
    """

    def _handle_reply(self, msg_list):
        """Dispatch one streamed reply message to its handler.

        ``msg_list`` is expected to be ``[b'|', msg_id, b'DATA', chunk]`` for
        a chunk of output or ``[b'|', msg_id, b'FINISH']`` to end the reply.
        Malformed messages are logged and dropped; messages for unknown ids
        are ignored.
        """
        logging.debug('Handling reply: %r', msg_list)
        len_msg_list = len(msg_list)
        if len_msg_list < 3 or not msg_list[0] == b'|':
            logging.error('Unexpected reply in ZMQStreamingApplicationProxy._handle_reply')
            return
        msg_id = msg_list[1]
        reply = msg_list[2]
        cb = self._callbacks.get(msg_id)
        if cb is None:
            return
        handler, dc = cb
        if reply == b'DATA' and len_msg_list == 4:
            if dc is not None:
                # Stop the timeout DelayedCallback and set it to None.
                dc.stop()
                self._callbacks[msg_id] = (handler, None)
            try:
                handler.write(msg_list[3])
                # The backend has already processed the headers and they are
                # included in the above write calls, so we manually tell the
                # handler that the headers are already written.
                handler._headers_written = True
                # We set transforms to an empty list because the backend
                # has already applied all of the transforms.
                handler._transforms = []
                handler.flush()
            except socket.error:
                # socket.error is raised if the client disconnects while
                # we are sending.
                pass
            except Exception:
                logging.error('Unexpected write error', exc_info=True)
        elif reply == b'FINISH':
            # We are done so we can get rid of the callbacks for this msg_id.
            self._callbacks.pop(msg_id)
            if dc is not None:
                # FINISH can arrive before any DATA; cancel the timeout so it
                # does not fire a 504 on a handler that is already finished.
                dc.stop()
            try:
                handler.finish()
            except socket.error:
                # socket.error is raised if the client disconnects while
                # we are sending.
                pass
            except Exception:
                logging.error('Unexpected finish error', exc_info=True)
198
class ZMQRequestHandlerProxy(web.RequestHandler):
    """A handler for use with a ZeroMQ backend service client."""

    SUPPORTED_METHODS = ("GET", "HEAD", "POST", "DELETE", "PUT", "OPTIONS")

    def initialize(self, proxy, timeout=0):
        """Initialize with a proxy and timeout.

        Parameters
        ----------
        proxy : ZMQApplicationProxy or ZMQStreamingApplicationProxy
            A proxy instance that will be used to send requests to a backend
            process.
        timeout : int
            The timeout, in milliseconds. If this timeout is reached
            before the backend's first reply, then a status code of 504 is
            sent to the browser to indicate a gateway/proxy timeout. Set to
            0 or a negative number to disable (infinite timeout).
        """
        # zmqweb Note: This method is empty in the base class.
        self.proxy = proxy
        self.timeout = timeout

    def _execute(self, transforms, *args, **kwargs):
        """Executes this request with the given output transforms."""
        # ZMQWEB NOTE: Transforms should be applied in the backend service so
        # we null any transforms passed in here. This may be a little too
        # silent, but there may be other handlers that do need the transforms.
        self._transforms = []
        # ZMQWEB NOTE: This following try/except block is taken from the base
        # class, but is modified to send the request to the proxy.
        try:
            if self.request.method not in self.SUPPORTED_METHODS:
                raise web.HTTPError(405)
            # ZMQWEB NOTE: We have removed the XSRF cookie handling from here
            # as it will be handled in the backend.
            self.prepare()
            if not self._finished:
                # ZMQWEB NOTE: Here is where we send the request to the proxy.
                # We don't decode args or kwargs as that will be done in the
                # backend.
                self.proxy.send_request(
                    self.request, args, kwargs, self, self.timeout
                )
        except Exception as exc:
            # ZMQWEB NOTE: We don't call the usual error handling logic
            # as that will be called by the backend process.
            logging.error('Unexpected error in _execute', exc_info=True)
            # A failure raised here means the request was not handed over to
            # the proxy successfully, so no backend reply will ever finish
            # this handler; answer now instead of leaving the client waiting.
            if not self._finished:
                if isinstance(exc, web.HTTPError):
                    status = exc.status_code
                else:
                    status = 500
                self.send_error(status)