~ubuntu-branches/ubuntu/wily/pyzmq/wily

« back to all changes in this revision

Viewing changes to zmq/web/proxy.py

  • Committer: Package Import Robot
  • Author(s): Julian Taylor
  • Date: 2013-02-24 19:23:15 UTC
  • mfrom: (1.2.1) (9 sid)
  • mto: This revision was merged to the branch mainline in revision 10.
  • Revision ID: package-import@ubuntu.com-20130224192315-qhmwp3m3ymk8r60d
Tags: 2.2.0.1-1
* New upstream release
* relicense debian packaging to LGPL-3
* update watch file to use github directly
  thanks to Bart Martens for the file
* add autopkgtests
* drop obsolete DM-Upload-Allowed
* bump standard to 3.9.4, no changes required

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""Proxy classes for forwarding tornado handlers to be run in separate processes.
 
2
 
 
3
This module uses ZeroMQ/PyZMQ sockets (DEALER/ROUTER) to enable individual
 
4
Tornado handlers to be run in a separate backend process. Through the
 
5
usage of DEALER/ROUTER sockets, multiple backend processes for a given
 
6
handler can be started and requests will be load balanced among the backend
 
7
processes.
 
8
 
 
9
Authors:
 
10
 
 
11
* Brian Granger
 
12
"""
 
13
 
 
14
#-----------------------------------------------------------------------------
 
15
#  Copyright (c) 2012 Brian Granger, Min Ragan-Kelley
 
16
#
 
17
#  This file is part of pyzmq
 
18
#
 
19
#  Distributed under the terms of the New BSD License.  The full license is in
 
20
#  the file COPYING.BSD, distributed as part of this software.
 
21
#-----------------------------------------------------------------------------
 
22
 
 
23
#-----------------------------------------------------------------------------
 
24
# Imports
 
25
#-----------------------------------------------------------------------------
 
26
 
 
27
import logging
 
28
import socket
 
29
import time
 
30
import uuid
 
31
 
 
32
from tornado import web
 
33
from tornado import stack_context
 
34
 
 
35
import zmq
 
36
from zmq.eventloop.zmqstream import ZMQStream
 
37
from zmq.eventloop.ioloop import IOLoop, DelayedCallback
 
38
from zmq.utils import jsonapi
 
39
 
 
40
from .zmqweb import ZMQHTTPRequest
 
41
 
 
42
#-----------------------------------------------------------------------------
 
43
# Service client
 
44
#-----------------------------------------------------------------------------
 
45
 
 
46
 
 
47
class ZMQApplicationProxy(object):
 
48
    """A proxy for a ZeroMQ based ZMQApplication that is using ZMQHTTPRequest.
 
49
 
 
50
    This class is a proxy for a backend that is running a
 
51
    ZMQApplication and MUST be used with the ZMQHTTPRequest class. This
 
52
    version sends the reply parts (each generated by RequestHandler.flush) as
 
53
    a single multipart message for low latency replies. See
 
54
    ZMQStreamingApplicationProxy, for a version that has higher latency, but
 
55
    which sends each reply part as a separate zmq message.
 
56
    """
 
57
 
 
58
    def __init__(self, loop=None, context=None):
 
59
        self.loop = loop if loop is not None else IOLoop.instance()
 
60
        self.context = context if context is not None else zmq.Context.instance()
 
61
        self._callbacks = {}
 
62
        self.socket = self.context.socket(zmq.DEALER)
 
63
        self.stream = ZMQStream(self.socket, self.loop)
 
64
        self.stream.on_recv(self._handle_reply)
 
65
        self.urls = []
 
66
 
 
67
    def connect(self, url):
 
68
        """Connect the service client to the proto://ip:port given in the url."""
 
69
        self.urls.append(url)
 
70
        self.socket.connect(url)
 
71
 
 
72
    def bind(self, url):
 
73
        """Bind the service client to the proto://ip:port given in the url."""
 
74
        self.urls.append(url)
 
75
        self.socket.bind(url)
 
76
 
 
77
    def send_request(self, request, args, kwargs, handler, timeout):
 
78
        """Send a request to the service."""
 
79
        req = {}
 
80
        req['method'] = request.method
 
81
        req['uri'] = request.uri
 
82
        req['version'] = request.version
 
83
        req['headers'] = dict(request.headers)
 
84
        body = request.body
 
85
        req['remote_ip'] = request.remote_ip
 
86
        req['protocol'] = request.protocol
 
87
        req['host'] = request.host
 
88
        req['files'] = request.files
 
89
        req['arguments'] = request.arguments
 
90
        req['args'] = args
 
91
        req['kwargs'] = kwargs
 
92
 
 
93
        msg_id = bytes(uuid.uuid4())
 
94
        msg_list = [b'|', msg_id, jsonapi.dumps(req)]
 
95
        if body:
 
96
            msg_list.append(body)
 
97
        logging.debug('Sending request: %r', msg_list)
 
98
        self.stream.send_multipart(msg_list)
 
99
 
 
100
        if timeout > 0:
 
101
            def _handle_timeout():
 
102
                handler.send_error(504) # Gateway timeout
 
103
                try:
 
104
                    self._callbacks.pop(msg_id)
 
105
                except KeyError:
 
106
                    logging.error('Unexpected error removing callbacks')
 
107
            dc = DelayedCallback(_handle_timeout, timeout, self.loop)
 
108
            dc.start()
 
109
        else:
 
110
            dc = None
 
111
        self._callbacks[msg_id] = (handler, dc)
 
112
        return msg_id
 
113
 
 
114
    def _handle_reply(self, msg_list):
 
115
        logging.debug('Handling reply: %r', msg_list)
 
116
        len_msg_list = len(msg_list)
 
117
        if len_msg_list < 3 or not msg_list[0] == b'|':
 
118
            logging.error('Unexpected reply in ZMQApplicationProxy._handle_reply')
 
119
            return
 
120
        msg_id = msg_list[1]
 
121
        replies = msg_list[2:]
 
122
        cb = self._callbacks.pop(msg_id, None)
 
123
        if cb is not None:
 
124
            handler, dc = cb
 
125
            if dc is not None:
 
126
                dc.stop()
 
127
            try:
 
128
                for reply in replies:
 
129
                    handler.write(reply)
 
130
                # The backend has already processed the headers and they are
 
131
                # included in the above write calls, so we manually tell the
 
132
                # handler that the headers are already written.
 
133
                handler._headers_written = True
 
134
                # We set transforms to an empty list because the backend
 
135
                # has already applied all of the transforms.
 
136
                handler._transforms = []
 
137
                handler.finish()
 
138
            except:
 
139
                logging.error('Unexpected error in ZMQApplicationProxy._handle_reply', exc_info=True)
 
140
 
 
141
 
 
142
class ZMQStreamingApplicationProxy(ZMQApplicationProxy):
 
143
    """A proxy for a ZeroMQ based ZMQApplication that is using ZMQStreamingHTTPRequest.
 
144
 
 
145
    This class is a proxy for a backend that is running a
 
146
    ZMQApplication and MUST be used with the ZMQStreamingHTTPRequest class.
 
147
    This version sends the reply parts (each generated by RequestHandler.flush)
 
148
    as separate zmq messages to enable streaming replies. See
 
149
    ZMQApplicationProxy, for a version that has lower latency, but which sends
 
150
    all reply parts as a single zmq message.
 
151
    """
 
152
 
 
153
    def _handle_reply(self, msg_list):
 
154
        logging.debug('Handling reply: %r', msg_list)
 
155
        len_msg_list = len(msg_list)
 
156
        if len_msg_list < 3 or not msg_list[0] == b'|':
 
157
            logging.error('Unexpected reply in ZMQStreamingApplicationProxy._handle_reply')
 
158
            return
 
159
        msg_id = msg_list[1]
 
160
        reply = msg_list[2]
 
161
        cb = self._callbacks.get(msg_id)
 
162
        if cb is not None:
 
163
            handler, dc = cb
 
164
            if reply == b'DATA' and len_msg_list == 4:
 
165
                if dc is not None:
 
166
                    # Stop the timeout DelayedCallback and set it to None.
 
167
                    dc.stop()
 
168
                    self._callbacks[msg_id] = (handler, None)
 
169
                try:
 
170
                    handler.write(msg_list[3])
 
171
                    # The backend has already processed the headers and they are
 
172
                    # included in the above write calls, so we manually tell the
 
173
                    # handler that the headers are already written.
 
174
                    handler._headers_written = True
 
175
                    # We set transforms to an empty list because the backend
 
176
                    # has already applied all of the transforms.
 
177
                    handler._transforms = []
 
178
                    handler.flush()
 
179
                except socket.error:
 
180
                    # socket.error is raised if the client disconnects while
 
181
                    # we are sending.
 
182
                    pass
 
183
                except:
 
184
                    logging.error('Unexpected write error', exc_info=True)
 
185
            elif reply == b'FINISH':
 
186
                # We are done so we can get rid of the callbacks for this msg_id.
 
187
                self._callbacks.pop(msg_id)
 
188
                try:
 
189
                    handler.finish()
 
190
                except socket.error:
 
191
                    # socket.error is raised if the client disconnects while
 
192
                    # we are sending.
 
193
                    pass
 
194
                except:
 
195
                    logging.error('Unexpected finish error', exc_info=True)
 
196
 
 
197
 
 
198
class ZMQRequestHandlerProxy(web.RequestHandler):
 
199
    """A handler for use with a ZeroMQ backend service client."""
 
200
 
 
201
    SUPPORTED_METHODS = ("GET", "HEAD", "POST", "DELETE", "PUT", "OPTIONS")
 
202
 
 
203
    def initialize(self, proxy, timeout=0):
 
204
        """Initialize with a proxy and timeout.
 
205
 
 
206
        Parameters
 
207
        ----------
 
208
        proxy : ZMQApplicationProxy. ZMQStreamingApplicationProxy
 
209
            A proxy instance that will be used to send requests to a backend
 
210
            process.
 
211
        timeout : int
 
212
            The timeout, in milliseconds. If this timeout is reached
 
213
            before the backend's first reply, then the server is sent a
 
214
            status code of 504 to the browser to indicate a gateway/proxy
 
215
            timeout. Set to 0 or a negative number to disable (infinite 
 
216
            timeout).
 
217
        """
 
218
        # zmqweb Note: This method is empty in the base class.
 
219
        self.proxy = proxy
 
220
        self.timeout = timeout
 
221
 
 
222
    def _execute(self, transforms, *args, **kwargs):
 
223
        """Executes this request with the given output transforms."""
 
224
        # ZMQWEB NOTE: Transforms should be applied in the backend service so
 
225
        # we null any transforms passed in here. This may be a little too
 
226
        # silent, but there may be other handlers that do need the transforms.
 
227
        self._transforms = []
 
228
        # ZMQWEB NOTE: This following try/except block is taken from the base
 
229
        # class, but is modified to send the request to the proxy.
 
230
        try:
 
231
            if self.request.method not in self.SUPPORTED_METHODS:
 
232
                raise web.HTTPError(405)
 
233
            # ZMQWEB NOTE: We have removed the XSRF cookie handling from here
 
234
            # as it will be handled in the backend.
 
235
            self.prepare()
 
236
            if not self._finished:
 
237
                # ZMQWEB NOTE: Here is where we send the request to the proxy.
 
238
                # We don't decode args or kwargs as that will be done in the
 
239
                # backen.
 
240
                self.proxy.send_request(
 
241
                    self.request, args, kwargs, self, self.timeout
 
242
                )
 
243
        except Exception:
 
244
            # ZMQWEB NOTE: We don't call the usual error handling logic
 
245
            # as that will be called by the backend process.
 
246
            logging.error('Unexpected error in _execute', exc_info=True)
 
247