32
from bzrlib.lazy_import import lazy_import
33
lazy_import(globals(), """
31
34
from bzrlib import (
36
from bzrlib.smart.protocol import (
38
SmartClientRequestProtocolOne,
39
SmartServerRequestProtocolOne,
40
SmartServerRequestProtocolTwo,
40
from bzrlib.smart import protocol
42
41
from bzrlib.transport import ssh
45
class SmartServerStreamMedium(object):
45
# We must not read any more than 64k at a time so we don't risk "no buffer
46
# space available" errors on some platforms. Windows in particular is likely
47
# to give error 10053 or 10055 if we read more than 64k from a socket.
48
_MAX_READ_SIZE = 64 * 1024
51
def _get_protocol_factory_for_bytes(bytes):
    """Determine the right protocol factory for 'bytes'.

    The start of 'bytes' is compared against the known protocol version
    markers.  When a marker matches, the corresponding factory is selected and
    the marker is stripped from the returned bytes.  When no marker matches,
    the request is treated as protocol version one and the bytes are returned
    untouched.

    The bytes should have at least one newline byte (i.e. be a whole line),
    otherwise it's possible that a request will be incorrectly identified as
    version one.

    Typical use would be::

        factory, unused_bytes = _get_protocol_factory_for_bytes(bytes)
        server_protocol = factory(transport, write_func, root_client_path)
        server_protocol.accept_bytes(unused_bytes)

    :param bytes: a str of bytes of the start of the request.
    :returns: 2-tuple of (protocol_factory, unused_bytes).  protocol_factory is
        a callable that takes three args: transport, write_func,
        root_client_path.  unused_bytes are any bytes that were not part of a
        protocol version marker.
    """
    if bytes.startswith(protocol.MESSAGE_VERSION_THREE):
        protocol_factory = protocol.build_server_protocol_three
        bytes = bytes[len(protocol.MESSAGE_VERSION_THREE):]
    elif bytes.startswith(protocol.REQUEST_VERSION_TWO):
        protocol_factory = protocol.SmartServerRequestProtocolTwo
        bytes = bytes[len(protocol.REQUEST_VERSION_TWO):]
    else:
        # No version marker at all: fall back to protocol one, which has no
        # prefix to strip.
        protocol_factory = protocol.SmartServerRequestProtocolOne
    return protocol_factory, bytes
83
def _get_line(read_bytes_func):
84
"""Read bytes using read_bytes_func until a newline byte.
86
This isn't particularly efficient, so should only be used when the
87
expected size of the line is quite short.
89
:returns: a tuple of two strs: (line, excess)
93
while newline_pos == -1:
94
new_bytes = read_bytes_func(1)
97
# Ran out of bytes before receiving a complete line.
99
newline_pos = bytes.find('\n')
100
line = bytes[:newline_pos+1]
101
excess = bytes[newline_pos+1:]
105
class SmartMedium(object):
    """Base class for smart protocol media, both client- and server-side."""

    def __init__(self):
        # Bytes that were read but not consumed; handed back by the next
        # read_bytes call.  None means nothing is pending.
        self._push_back_buffer = None

    def _push_back(self, bytes):
        """Return unused bytes to the medium, because they belong to the next
        request(s).

        This sets the _push_back_buffer to the given bytes.

        :param bytes: the unused bytes.  An empty string is ignored.
        :raises AssertionError: if bytes are already pending in the buffer.
        """
        if self._push_back_buffer is not None:
            raise AssertionError(
                "_push_back called when self._push_back_buffer is %r"
                % (self._push_back_buffer,))
        if bytes == '':
            # Never store '': _get_push_back_buffer treats an empty buffer as
            # an error because it can be confused with EOF.
            return
        self._push_back_buffer = bytes

    def _get_push_back_buffer(self):
        """Return the pending pushed-back bytes and clear the buffer.

        :raises AssertionError: if the buffer holds the empty string.
        """
        if self._push_back_buffer == '':
            raise AssertionError(
                '%s._push_back_buffer should never be the empty string, '
                'which can be confused with EOF' % (self,))
        bytes = self._push_back_buffer
        self._push_back_buffer = None
        return bytes

    def read_bytes(self, desired_count):
        """Read some bytes from this medium.

        Pushed-back bytes, if any, are returned first and in full, regardless
        of desired_count.  Otherwise the request is capped at _MAX_READ_SIZE
        and delegated to _read_bytes.

        :returns: some bytes, possibly more or less than the number requested
            in 'desired_count' depending on the medium.
        """
        if self._push_back_buffer is not None:
            return self._get_push_back_buffer()
        bytes_to_read = min(desired_count, _MAX_READ_SIZE)
        return self._read_bytes(bytes_to_read)

    def _read_bytes(self, count):
        """Read up to 'count' bytes from the underlying medium.

        Subclasses must override this.
        """
        raise NotImplementedError(self._read_bytes)

    def _get_line(self):
        """Read bytes from this medium until a newline byte.

        This isn't particularly efficient, so should only be used when the
        expected size of the line is quite short.  Anything read past the
        newline is pushed back for the next read.

        :returns: the line as returned by the module-level _get_line helper.
        """
        line, excess = _get_line(self.read_bytes)
        self._push_back(excess)
        return line
161
class SmartServerStreamMedium(SmartMedium):
46
162
"""Handles smart commands coming over a stream.
48
164
The stream may be a pipe connected to sshd, or a tcp socket, or an
69
185
self.backing_transport = backing_transport
70
186
self.root_client_path = root_client_path
71
187
self.finished = False
72
self._push_back_buffer = None
74
def _push_back(self, bytes):
75
"""Return unused bytes to the medium, because they belong to the next
78
This sets the _push_back_buffer to the given bytes.
80
if self._push_back_buffer is not None:
82
"_push_back called when self._push_back_buffer is %r"
83
% (self._push_back_buffer,))
86
self._push_back_buffer = bytes
88
def _get_push_back_buffer(self):
89
if self._push_back_buffer == '':
91
'%s._push_back_buffer should never be the empty string, '
92
'which can be confused with EOF' % (self,))
93
bytes = self._push_back_buffer
94
self._push_back_buffer = None
188
SmartMedium.__init__(self)
98
191
"""Serve requests until the client disconnects."""
144
232
"""Called when an unhandled exception from the protocol occurs."""
145
233
raise NotImplementedError(self.terminate_due_to_error)
147
def _get_bytes(self, desired_count):
235
def _read_bytes(self, desired_count):
148
236
"""Get some bytes from the medium.
150
238
:param desired_count: number of bytes we want to read.
152
raise NotImplementedError(self._get_bytes)
155
"""Read bytes from this request's response until a newline byte.
157
This isn't particularly efficient, so should only be used when the
158
expected size of the line is quite short.
160
:returns: a string of bytes ending in a newline (byte 0x0A).
164
while newline_pos == -1:
165
new_bytes = self._get_bytes(1)
168
# Ran out of bytes before receiving a complete line.
170
newline_pos = bytes.find('\n')
171
line = bytes[:newline_pos+1]
172
self._push_back(bytes[newline_pos+1:])
240
raise NotImplementedError(self._read_bytes)
176
243
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
189
256
def _serve_one_request_unguarded(self, protocol):
190
257
while protocol.next_read_size():
191
bytes = self._get_bytes(4096)
258
# We can safely try to read large chunks. If there is less data
259
# than _MAX_READ_SIZE ready, the socket will just return a short
260
# read immediately rather than block.
261
bytes = self.read_bytes(_MAX_READ_SIZE)
193
263
self.finished = True
195
265
protocol.accept_bytes(bytes)
197
self._push_back(protocol.excess_buffer)
267
self._push_back(protocol.unused_data)
199
def _get_bytes(self, desired_count):
200
if self._push_back_buffer is not None:
201
return self._get_push_back_buffer()
269
def _read_bytes(self, desired_count):
202
270
# We ignore the desired_count because on sockets it's more efficient to
204
return self.socket.recv(4096)
271
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
272
return self.socket.recv(_MAX_READ_SIZE)
206
274
def terminate_due_to_error(self):
207
"""Called when an unhandled exception from the protocol occurs."""
208
275
# TODO: This should log to a server log file, but no such thing
209
276
# exists yet. Andrew Bennetts 2006-09-29.
210
277
self.socket.close()
371
439
return self._read_bytes(count)
373
441
def _read_bytes(self, count):
374
"""Helper for read_bytes.
442
"""Helper for SmartClientMediumRequest.read_bytes.
376
444
read_bytes checks the state of the request to determine if bytes
377
445
should be read. After that it hands off to _read_bytes to do the
448
By default this forwards to self._medium.read_bytes because we are
449
operating on the medium's stream.
380
raise NotImplementedError(self._read_bytes)
451
return self._medium.read_bytes(count)
382
453
def read_line(self):
383
"""Read bytes from this request's response until a newline byte.
454
line = self._read_line()
455
if not line.endswith('\n'):
456
# end of file encountered reading from server
457
raise errors.ConnectionReset(
458
"please check connectivity and permissions",
459
"(and try -Dhpss if further diagnosis is required)")
462
def _read_line(self):
463
"""Helper for SmartClientMediumRequest.read_line.
385
This isn't particularly efficient, so should only be used when the
386
expected size of the line is quite short.
388
:returns: a string of bytes ending in a newline (byte 0x0A).
465
By default this forwards to self._medium._get_line because we are
466
operating on the medium's stream.
390
# XXX: this duplicates SmartClientRequestProtocolOne._recv_tuple
392
while not line or line[-1] != '\n':
393
new_char = self.read_bytes(1)
396
# end of file encountered reading from server
397
raise errors.ConnectionReset(
398
"please check connectivity and permissions",
399
"(and try -Dhpss if further diagnosis is required)")
403
class SmartClientMedium(object):
468
return self._medium._get_line()
471
class SmartClientMedium(SmartMedium):
404
472
"""Smart client is a medium for sending smart protocol requests over."""
474
def __init__(self, base):
407
475
super(SmartClientMedium, self).__init__()
408
477
self._protocol_version_error = None
409
478
self._protocol_version = None
479
self._done_hello = False
480
# Be optimistic: we assume the remote end can accept new remote
481
# requests until we get an error saying otherwise.
482
# _remote_version_is_before tracks the bzr version the remote side
483
# can be based on what we've seen so far.
484
self._remote_version_is_before = None
486
def _is_remote_before(self, version_tuple):
487
"""Is it possible the remote side supports RPCs for a given version?
491
needed_version = (1, 2)
492
if medium._is_remote_before(needed_version):
493
fallback_to_pre_1_2_rpc()
497
except UnknownSmartMethod:
498
medium._remember_remote_is_before(needed_version)
499
fallback_to_pre_1_2_rpc()
501
:seealso: _remember_remote_is_before
503
if self._remote_version_is_before is None:
504
# So far, the remote side seems to support everything
506
return version_tuple >= self._remote_version_is_before
508
def _remember_remote_is_before(self, version_tuple):
509
"""Tell this medium that the remote side is older the given version.
511
:seealso: _is_remote_before
513
if (self._remote_version_is_before is not None and
514
version_tuple > self._remote_version_is_before):
515
raise AssertionError(
516
"_remember_remote_is_before(%r) called, but "
517
"_remember_remote_is_before(%r) was called previously."
518
% (version_tuple, self._remote_version_is_before))
519
self._remote_version_is_before = version_tuple
411
521
def protocol_version(self):
412
"""Find out the best protocol version to use."""
522
"""Find out if 'hello' smart request works."""
413
523
if self._protocol_version_error is not None:
414
524
raise self._protocol_version_error
415
if self._protocol_version is None:
525
if not self._done_hello:
417
527
medium_request = self.get_request()
418
528
# Send a 'hello' request in protocol version one, for maximum
419
529
# backwards compatibility.
420
client_protocol = SmartClientRequestProtocolOne(medium_request)
421
self._protocol_version = client_protocol.query_version()
530
client_protocol = protocol.SmartClientRequestProtocolOne(medium_request)
531
client_protocol.query_version()
532
self._done_hello = True
422
533
except errors.SmartProtocolError, e:
423
534
# Cache the error, just like we would cache a successful
425
536
self._protocol_version_error = e
427
return self._protocol_version
540
def should_probe(self):
541
"""Should RemoteBzrDirFormat.probe_transport send a smart request on
544
Some transports are unambiguously smart-only; there's no need to check
545
if the transport is able to carry smart requests, because that's all
546
it is for. In those cases, this method should return False.
548
But some HTTP transports can sometimes fail to carry smart requests,
549
but still be usable for accessing remote bzrdirs via plain file
550
accesses. So for those transports, their media should return True here
551
so that RemoteBzrDirFormat can determine if it is appropriate for that
429
556
def disconnect(self):
430
557
"""If this medium maintains a persistent connection, close it.