150
150
self._requests = reqs
151
151
self._responder = responder
152
152
self._client_version = None
153
self._sent_response = False
155
154
def received_header(self, headers):
156
155
self._client_version = headers['client_version']
157
156
self._lookup_request(headers['request'])
158
self._check_send_response()
160
158
def _lookup_request(self, request_name):
161
159
factory = self._requests.get(request_name)
162
160
if factory is None:
163
161
raise errors.UnknownRequest(request_name)
164
self._request = factory(self._request_state)
162
self._responder.request_name = request_name
163
self._request = factory(self._request_state, self._responder)
166
165
def received_args(self, kwargs):
167
166
self._request.handle_args(**kwargs)
168
self._check_send_response()
168
def received_stream_entry(self, entry):
169
self._request.handle_stream_entry(entry)
170
171
def received_end(self):
171
172
self._request.handle_end()
172
self._check_send_response()
173
if self._request.response is None:
173
if not self._responder._started:
174
174
raise errors.BadProtocolStream("Client sent end-of-message,"
175
175
" but the Request did not generate a response."
176
176
" for Request: %s" % (self._request,))
178
def _check_send_response(self):
179
if self._request.response is None or self._sent_response:
181
self._sent_response = True
182
self._responder.send_response(self._request.response)
177
self._responder._finish_response()
185
179
class Responder(object):
186
"""Encoder responses from the server back to the client."""
180
"""Encode responses from the server back to the client."""
188
182
def __init__(self, conn):
189
"""Turn an RPCResponse into bytes-on-the-wire."""
190
183
self._conn = conn
191
184
self._out_buffer = buffers.BufferedWriter(self._write_to_client,
193
186
self._encoder = protocol.ProtocolEncoderV1(self._out_buffer.write)
187
self._started = False
188
self.request_name = ''
195
190
def _write_to_client(self, content):
196
191
self._conn.sendall(content)
198
def send_response(self, response):
199
"""Send a RPCResponse back to the caller."""
193
def _start_response(self):
200
197
self._out_buffer.write(protocol.PROTOCOL_HEADER_V1)
201
198
response_header = compat.OrderedDict([
202
199
('server_version', _u1db_version),
203
('request', response.request_name),
204
('status', response.status),
200
('request', self.request_name),
206
202
self._encoder.encode_dict('h', response_header)
207
if response.response_kwargs:
208
self._encoder.encode_dict('a', response.response_kwargs)
204
# have a way to transmit an error
206
def send_response(self, **kwargs):
207
"""send/finalize response."""
208
self._start_response()
210
self._encoder.encode_dict('a', kwargs)
212
def stream_entry(self, entry):
213
"send stream entry as part of the response."
214
self._start_response()
215
self._encoder.encode_dict('x', entry)
217
def _finish_response(self):
209
218
self._encoder.encode_end()
210
219
self._out_buffer.flush()