~ubuntuone-hackers/u1db/for-demo

« back to all changes in this revision

Viewing changes to u1db/remote/sync_server.py

Incorporate remote sync

Show diffs side-by-side

added added

removed removed

Lines of Context:
150
150
        self._requests = reqs
151
151
        self._responder = responder
152
152
        self._client_version = None
153
 
        self._sent_response = False
154
153
 
155
154
    def received_header(self, headers):
156
155
        self._client_version = headers['client_version']
157
156
        self._lookup_request(headers['request'])
158
 
        self._check_send_response()
159
157
 
160
158
    def _lookup_request(self, request_name):
161
159
        factory = self._requests.get(request_name)
162
160
        if factory is None:
163
161
            raise errors.UnknownRequest(request_name)
164
 
        self._request = factory(self._request_state)
 
162
        self._responder.request_name = request_name
 
163
        self._request = factory(self._request_state, self._responder)
165
164
 
166
165
    def received_args(self, kwargs):
167
166
        self._request.handle_args(**kwargs)
168
 
        self._check_send_response()
 
167
 
 
168
    def received_stream_entry(self, entry):
 
169
        self._request.handle_stream_entry(entry)
169
170
 
170
171
    def received_end(self):
171
172
        self._request.handle_end()
172
 
        self._check_send_response()
173
 
        if self._request.response is None:
 
173
        if not self._responder._started:
174
174
            raise errors.BadProtocolStream("Client sent end-of-message,"
175
175
                " but the Request did not generate a response."
176
176
                " for Request: %s" % (self._request,))
177
 
 
178
 
    def _check_send_response(self):
179
 
        if self._request.response is None or self._sent_response:
180
 
            return
181
 
        self._sent_response = True
182
 
        self._responder.send_response(self._request.response)
183
 
 
 
177
        self._responder._finish_response()
184
178
 
185
179
class Responder(object):
186
 
    """Encoder responses from the server back to the client."""
 
180
    """Encode responses from the server back to the client."""
187
181
 
188
182
    def __init__(self, conn):
189
 
        """Turn an RPCResponse into bytes-on-the-wire."""
190
183
        self._conn = conn
191
184
        self._out_buffer = buffers.BufferedWriter(self._write_to_client,
192
185
            BUFFER_SIZE)
193
186
        self._encoder = protocol.ProtocolEncoderV1(self._out_buffer.write)
 
187
        self._started = False
 
188
        self.request_name = ''
194
189
 
195
190
    def _write_to_client(self, content):
196
191
        self._conn.sendall(content)
197
192
 
198
 
    def send_response(self, response):
199
 
        """Send a RPCResponse back to the caller."""
 
193
    def _start_response(self):
 
194
        if self._started:
 
195
            return
 
196
        self._started = True
200
197
        self._out_buffer.write(protocol.PROTOCOL_HEADER_V1)
201
198
        response_header = compat.OrderedDict([
202
199
            ('server_version', _u1db_version),
203
 
            ('request', response.request_name),
204
 
            ('status', response.status),
205
 
            ])
 
200
            ('request', self.request_name),
 
201
        ])
206
202
        self._encoder.encode_dict('h', response_header)
207
 
        if response.response_kwargs:
208
 
            self._encoder.encode_dict('a', response.response_kwargs)
 
203
 
 
204
    # have a way to transmit an error
 
205
 
 
206
    def send_response(self, **kwargs):
 
207
        """send/finalize response."""
 
208
        self._start_response()
 
209
        if kwargs:
 
210
            self._encoder.encode_dict('a', kwargs)
 
211
 
 
212
    def stream_entry(self, entry):
 
213
        "send stream entry as part of the response."
 
214
        self._start_response()
 
215
        self._encoder.encode_dict('x', entry)
 
216
 
 
217
    def _finish_response(self):
209
218
        self._encoder.encode_end()
210
219
        self._out_buffer.flush()
211
 
 
212