~eday/burrow/prototype-conversion

« back to all changes in this revision

Viewing changes to burrowd/frontend/wsgi.py

  • Committer: Eric Day
  • Date: 2011-03-17 23:42:41 UTC
  • Revision ID: eday@oddments.org-20110317234241-ult80xn9d1lon867
First chunk of code from prototype. Beyond the prototype, configuration, module loading, and log handling was added.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2011 OpenStack LLC.
 
2
#
 
3
# Licensed under the Apache License, Version 2.0 (the "License");
 
4
# you may not use this file except in compliance with the License.
 
5
# You may obtain a copy of the License at
 
6
#
 
7
#     http://www.apache.org/licenses/LICENSE-2.0
 
8
#
 
9
# Unless required by applicable law or agreed to in writing, software
 
10
# distributed under the License is distributed on an "AS IS" BASIS,
 
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
12
# See the License for the specific language governing permissions and
 
13
# limitations under the License.
 
14
 
 
15
'''WSGI frontend for the burrow server.'''
 
16
 
 
17
import json
 
18
import time
 
19
 
 
20
import eventlet
 
21
import eventlet.wsgi
 
22
import routes
 
23
import routes.middleware
 
24
import webob.dec
 
25
import webob.exc
 
26
 
 
27
import burrowd.frontend
 
28
 
 
29
# Default configuration values for this module.
 
30
DEFAULT_HOST = '0.0.0.0'
 
31
DEFAULT_PORT = 8080
 
32
DEFAULT_BACKLOG = 64
 
33
DEFAULT_SSL = False
 
34
DEFAULT_SSL_CERTFILE = 'example.pem'
 
35
DEFAULT_SSL_KEYFILE = 'example.key'
 
36
DEFAULT_THREAD_POOL_SIZE = 0
 
37
DEFAULT_TTL = 600
 
38
DEFAULT_HIDE = 0
 
39
 
 
40
 
 
41
def queue_exists(method):
 
42
    '''Decorator to ensure an account and queue exists. If the wait
 
43
    option is given, this will block until a message in the queue is
 
44
    ready or the timeout expires.'''
 
45
    def wrapper(self, req, account, queue, *args, **kwargs):
 
46
        wait = 0
 
47
        if 'wait' in req.params:
 
48
            wait = int(req.params['wait'])
 
49
            if wait > 0:
 
50
                wait += time.time()
 
51
        res = webob.exc.HTTPNotFound()
 
52
        while True:
 
53
            if self.backend.queue_exists(account, queue):
 
54
                res = method(self, req, account, queue, *args, **kwargs)
 
55
            if wait == 0 or res.status_int != 404:
 
56
                break
 
57
            now = time.time()
 
58
            if wait - now > 0:
 
59
                self.backend.wait(account, queue, wait - now)
 
60
            if wait < time.time():
 
61
                break
 
62
        return res
 
63
    return wrapper
 
64
 
 
65
 
 
66
class Frontend(burrowd.frontend.Frontend):
 
67
 
 
68
    def __init__(self, config, backend):
 
69
        super(Frontend, self).__init__(config, backend)
 
70
        self.default_ttl = self.config.get('default_ttl', DEFAULT_TTL)
 
71
        self.default_hide = self.config.get('default_hide', DEFAULT_HIDE)
 
72
        mapper = routes.Mapper()
 
73
        mapper.connect('/', action='root')
 
74
        mapper.connect('/{account}', action='account')
 
75
        mapper.connect('/{account}/{queue}', action='queue')
 
76
        mapper.connect('/{account}/{queue}/{message_id}', action='message')
 
77
        self._routes = routes.middleware.RoutesMiddleware(self._route, mapper)
 
78
 
 
79
    def run(self, thread_pool):
 
80
        '''Create the listening socket and start the thread that runs
 
81
        the WSGI server. This extra thread is needed since the WSGI
 
82
        server function blocks.'''
 
83
        host = self.config.get('host', DEFAULT_HOST)
 
84
        port = self.config.getint('port', DEFAULT_PORT)
 
85
        backlog = self.config.getint('backlog', DEFAULT_BACKLOG)
 
86
        socket = eventlet.listen((host, port), backlog=backlog)
 
87
        self.log.info(_('Listening on %s:%d') % (host, port))
 
88
        if self.config.getboolean('ssl', DEFAULT_SSL):
 
89
            certfile = self.config.get('ssl_certfile', DEFAULT_SSL_CERTFILE)
 
90
            keyfile = self.config.get('ssl_keyfile', DEFAULT_SSL_KEYFILE)
 
91
            socket = eventlet.green.ssl.wrap_socket(socket, certfile=certfile,
 
92
                keyfile=keyfile)
 
93
        thread_pool.spawn_n(self._run, socket, thread_pool)
 
94
 
 
95
    def _run(self, socket, thread_pool):
 
96
        '''Thread to run the WSGI server.'''
 
97
        thread_pool_size = self.config.getint('thread_pool_size',
 
98
            DEFAULT_THREAD_POOL_SIZE)
 
99
        log_format = '%(client_ip)s "%(request_line)s" %(status_code)s ' \
 
100
                     '%(body_length)s %(wall_seconds).6f'
 
101
        if thread_pool_size == 0:
 
102
            eventlet.wsgi.server(socket, self, log=WSGILog(self.log),
 
103
                log_format=log_format, custom_pool=thread_pool)
 
104
        else:
 
105
            eventlet.wsgi.server(socket, self, log=WSGILog(self.log),
 
106
                log_format=log_format, max_size=thread_pool_size)
 
107
 
 
108
    def __call__(self, *args, **kwargs):
 
109
        return self._routes(*args, **kwargs)
 
110
 
 
111
    @webob.dec.wsgify
 
112
    def _route(self, req):
 
113
        args = req.environ['wsgiorg.routing_args'][1]
 
114
        if not args:
 
115
            return webob.exc.HTTPNotFound()
 
116
        action = args.pop('action')
 
117
        method = getattr(self, '_%s_%s' % (req.method.lower(), action), False)
 
118
        if not method:
 
119
            return webob.exc.HTTPBadRequest()
 
120
        return method(req, **args)
 
121
 
 
122
    @webob.dec.wsgify
 
123
    def _delete_root(self, req):
 
124
        self.backend.delete_accounts()
 
125
        return webob.exc.HTTPNoContent()
 
126
 
 
127
    @webob.dec.wsgify
 
128
    def _get_root(self, req):
 
129
        accounts = self.backend.get_accounts()
 
130
        if len(accounts) == 0:
 
131
            return webob.exc.HTTPNotFound()
 
132
        return webob.exc.HTTPOk(body=json.dumps(accounts, indent=2))
 
133
 
 
134
    @webob.dec.wsgify
 
135
    def _delete_account(self, req, account):
 
136
        self.backend.delete_account(account)
 
137
        return webob.exc.HTTPNoContent()
 
138
 
 
139
    @webob.dec.wsgify
 
140
    def _get_account(self, req, account):
 
141
        queues = self.backend.get_queues(account)
 
142
        if len(queues) == 0:
 
143
            return webob.exc.HTTPNotFound()
 
144
        return webob.exc.HTTPOk(body=json.dumps(queues, indent=2))
 
145
 
 
146
    @webob.dec.wsgify
 
147
    @queue_exists
 
148
    def _delete_queue(self, req, account, queue):
 
149
        limit, marker, match_hidden = self._parse_filters(req)
 
150
        messages = self.backend.delete_messages(account, queue, limit, marker,
 
151
            match_hidden)
 
152
        return self._return_messages(req, account, queue, messages, 'none')
 
153
 
 
154
    @webob.dec.wsgify
 
155
    @queue_exists
 
156
    def _get_queue(self, req, account, queue):
 
157
        limit, marker, match_hidden = self._parse_filters(req)
 
158
        messages = self.backend.get_messages(account, queue, limit, marker,
 
159
            match_hidden)
 
160
        return self._return_messages(req, account, queue, messages, 'all')
 
161
 
 
162
    @webob.dec.wsgify
 
163
    @queue_exists
 
164
    def _post_queue(self, req, account, queue):
 
165
        limit, marker, match_hidden = self._parse_filters(req)
 
166
        ttl, hide = self._parse_metadata(req)
 
167
        messages = self.backend.update_messages(account, queue, limit, marker,
 
168
            match_hidden, ttl, hide)
 
169
        return self._return_messages(req, account, queue, messages, 'all')
 
170
 
 
171
    @webob.dec.wsgify
 
172
    @queue_exists
 
173
    def _delete_message(self, req, account, queue, message_id):
 
174
        message = self.backend.delete_message(account, queue, message_id)
 
175
        if message is None:
 
176
            return webob.exc.HTTPNotFound()
 
177
        return self._return_message(req, account, queue, message, 'none')
 
178
 
 
179
    @webob.dec.wsgify
 
180
    @queue_exists
 
181
    def _get_message(self, req, account, queue, message_id):
 
182
        message = self.backend.get_message(account, queue, message_id)
 
183
        if message is None:
 
184
            return webob.exc.HTTPNotFound()
 
185
        return self._return_message(req, account, queue, message, 'all')
 
186
 
 
187
    @webob.dec.wsgify
 
188
    @queue_exists
 
189
    def _post_message(self, req, account, queue, message_id):
 
190
        ttl, hide = self._parse_metadata(req)
 
191
        message = self.backend.update_message(account, queue, message_id, ttl,
 
192
                                              hide)
 
193
        if message is None:
 
194
            return webob.exc.HTTPNotFound()
 
195
        return self._return_message(req, account, queue, message, 'id')
 
196
 
 
197
    @webob.dec.wsgify
 
198
    def _put_message(self, req, account, queue, message_id):
 
199
        (ttl, hide) = self._parse_metadata(req, self.default_ttl,
 
200
                                           self.default_hide)
 
201
        if self.backend.put_message(account, queue, message_id, ttl, hide, \
 
202
                                    req.body):
 
203
            return webob.exc.HTTPCreated()
 
204
        return webob.exc.HTTPNoContent()
 
205
 
 
206
    def _filter_message(self, detail, message):
 
207
        if detail == 'id':
 
208
            return dict(id=message['id'])
 
209
        elif detail == 'metadata':
 
210
            message = message.copy()
 
211
            del message['body']
 
212
            return message
 
213
        elif detail == 'all':
 
214
            return message
 
215
        return None
 
216
 
 
217
    def _return_message(self, req, account, queue, message, detail):
 
218
        if 'detail' in req.params:
 
219
            detail = req.params['detail']
 
220
        message = self._filter_message(detail, message)
 
221
        if message is not None:
 
222
            body = {account: {queue: [message]}}
 
223
            return webob.exc.HTTPOk(body=json.dumps(body, indent=2))
 
224
        return webob.exc.HTTPNoContent()
 
225
 
 
226
    def _return_messages(self, req, account, queue, messages, detail):
 
227
        if len(messages) == 0:
 
228
            return webob.exc.HTTPNotFound()
 
229
        if 'detail' in req.params:
 
230
            detail = req.params['detail']
 
231
        filtered_messages = []
 
232
        for message in messages:
 
233
            message = self._filter_message(detail, message)
 
234
            if message is not None:
 
235
                filtered_messages.append(message)
 
236
        if len(filtered_messages) == 0:
 
237
            return webob.exc.HTTPNoContent()
 
238
        body = {account: {queue: filtered_messages}}
 
239
        return webob.exc.HTTPOk(body=json.dumps(body, indent=2))
 
240
 
 
241
    def _parse_filters(self, req):
 
242
        limit = None
 
243
        if 'limit' in req.params:
 
244
            limit = int(req.params['limit'])
 
245
        marker = None
 
246
        if 'marker' in req.params:
 
247
            marker = req.params['marker']
 
248
        match_hidden = False
 
249
        if 'hidden' in req.params and req.params['hidden'].lower() == 'true':
 
250
            match_hidden = True
 
251
        return limit, marker, match_hidden
 
252
 
 
253
    def _parse_metadata(self, req, default_ttl=None, default_hide=None):
 
254
        if 'ttl' in req.params:
 
255
            ttl = int(req.params['ttl'])
 
256
        else:
 
257
            ttl = default_ttl
 
258
        if ttl is not None and ttl > 0:
 
259
            ttl += int(time.time())
 
260
        if 'hide' in req.params:
 
261
            hide = int(req.params['hide'])
 
262
        else:
 
263
            hide = default_hide
 
264
        if hide is not None and hide > 0:
 
265
            hide += int(time.time())
 
266
        return ttl, hide
 
267
 
 
268
 
 
269
class WSGILog(object):
 
270
    '''Class for eventlet.wsgi.server to forward logging messages.'''
 
271
 
 
272
    def __init__(self, log):
 
273
        self.log = log
 
274
 
 
275
    def write(self, message):
 
276
        self.log.debug(message.rstrip())