47
46
from bzrlib.smart import client, protocol, request, vfs
48
47
from bzrlib.transport import ssh
50
#usually already imported, and getting IllegalScoperReplacer on it here.
51
49
from bzrlib import osutils
53
# We must not read any more than 64k at a time so we don't risk "no buffer
54
# space available" errors on some platforms. Windows in particular is likely
55
# to give error 10053 or 10055 if we read more than 64k from a socket.
56
_MAX_READ_SIZE = 64 * 1024
51
# Throughout this module buffer size parameters are either limited to be at
52
# most _MAX_READ_SIZE, or are ignored and _MAX_READ_SIZE is used instead.
53
# For this module's purposes, MAX_SOCKET_CHUNK is a reasonable size for reads
54
# from non-sockets as well.
55
_MAX_READ_SIZE = osutils.MAX_SOCKET_CHUNK
59
57
def _get_protocol_factory_for_bytes(bytes):
60
58
"""Determine the right protocol factory for 'bytes'.
276
274
def _serve_one_request_unguarded(self, protocol):
277
275
while protocol.next_read_size():
278
276
# We can safely try to read large chunks. If there is less data
279
# than _MAX_READ_SIZE ready, the socket wil just return a short
280
# read immediately rather than block.
281
bytes = self.read_bytes(_MAX_READ_SIZE)
277
# than MAX_SOCKET_CHUNK ready, the socket will just return a
278
# short read immediately rather than block.
279
bytes = self.read_bytes(osutils.MAX_SOCKET_CHUNK)
283
281
self.finished = True
287
285
self._push_back(protocol.unused_data)
289
287
def _read_bytes(self, desired_count):
290
return _read_bytes_from_socket(
291
self.socket.recv, desired_count, self._report_activity)
288
return osutils.read_bytes_from_socket(
289
self.socket, self._report_activity)
293
291
def terminate_due_to_error(self):
294
292
# TODO: This should log to a server log file, but no such thing
295
293
# exists yet. Andrew Bennetts 2006-09-29.
296
osutils.until_no_eintr(self.socket.close)
297
295
self.finished = True
299
297
def _write_out(self, bytes):
334
332
bytes_to_read = protocol.next_read_size()
335
333
if bytes_to_read == 0:
336
334
# Finished serving this request.
337
osutils.until_no_eintr(self._out.flush)
339
337
bytes = self.read_bytes(bytes_to_read)
341
339
# Connection has been closed.
342
340
self.finished = True
343
osutils.until_no_eintr(self._out.flush)
345
343
protocol.accept_bytes(bytes)
347
345
def _read_bytes(self, desired_count):
348
return osutils.until_no_eintr(self._in.read, desired_count)
346
return self._in.read(desired_count)
350
348
def terminate_due_to_error(self):
351
349
# TODO: This should log to a server log file, but no such thing
352
350
# exists yet. Andrew Bennetts 2006-09-29.
353
osutils.until_no_eintr(self._out.close)
354
352
self.finished = True
356
354
def _write_out(self, bytes):
357
osutils.until_no_eintr(self._out.write, bytes)
355
self._out.write(bytes)
360
358
class SmartClientMediumRequest(object):
496
494
class _DebugCounter(object):
497
495
"""An object that counts the HPSS calls made to each client medium.
499
When a medium is garbage-collected, or failing that when atexit functions
500
are run, the total number of calls made on that medium are reported via
497
When a medium is garbage-collected, or failing that when
498
bzrlib.global_state exits, the total number of calls made on that medium
499
are reported via trace.note.
504
502
def __init__(self):
505
503
self.counts = weakref.WeakKeyDictionary()
506
504
client._SmartClient.hooks.install_named_hook(
507
505
'call', self.increment_call_count, 'hpss call counter')
508
atexit.register(self.flush_all)
506
bzrlib.global_state.cleanups.add_cleanup(self.flush_all)
510
508
def track(self, medium):
511
509
"""Start tracking calls made to a medium.
727
725
def _accept_bytes(self, bytes):
728
726
"""See SmartClientStreamMedium.accept_bytes."""
729
osutils.until_no_eintr(self._writeable_pipe.write, bytes)
727
self._writeable_pipe.write(bytes)
730
728
self._report_activity(len(bytes), 'write')
732
730
def _flush(self):
733
731
"""See SmartClientStreamMedium._flush()."""
734
osutils.until_no_eintr(self._writeable_pipe.flush)
732
self._writeable_pipe.flush()
736
734
def _read_bytes(self, count):
737
735
"""See SmartClientStreamMedium._read_bytes."""
738
bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
736
bytes_to_read = min(count, _MAX_READ_SIZE)
737
bytes = self._readable_pipe.read(bytes_to_read)
739
738
self._report_activity(len(bytes), 'read')
742
class SSHParams(object):
743
"""A set of parameters for starting a remote bzr via SSH."""
745
def __init__(self, host, port=None, username=None, password=None,
746
bzr_remote_path='bzr'):
749
self.username = username
750
self.password = password
751
self.bzr_remote_path = bzr_remote_path
743
754
class SmartSSHClientMedium(SmartClientStreamMedium):
744
"""A client medium using SSH."""
755
"""A client medium using SSH.
757
It delegates IO to a SmartSimplePipesClientMedium or
758
SmartClientAlreadyConnectedSocketMedium (depending on platform).
746
def __init__(self, host, port=None, username=None, password=None,
747
base=None, vendor=None, bzr_remote_path=None):
761
def __init__(self, base, ssh_params, vendor=None):
748
762
"""Creates a client that will connect on the first use.
764
:param ssh_params: An SSHParams instance.
750
765
:param vendor: An optional override for the ssh vendor to use. See
751
766
bzrlib.transport.ssh for details on ssh vendors.
753
self._connected = False
755
self._password = password
757
self._username = username
768
self._real_medium = None
769
self._ssh_params = ssh_params
758
770
# for the benefit of progress making a short description of this
760
772
self._scheme = 'bzr+ssh'
762
774
# _DebugCounter so we have to store all the values used in our repr
763
775
# method before calling the super init.
764
776
SmartClientStreamMedium.__init__(self, base)
765
self._read_from = None
777
self._vendor = vendor
766
778
self._ssh_connection = None
767
self._vendor = vendor
768
self._write_to = None
769
self._bzr_remote_path = bzr_remote_path
771
780
def __repr__(self):
772
if self._port is None:
781
if self._ssh_params.port is None:
775
maybe_port = ':%s' % self._port
784
maybe_port = ':%s' % self._ssh_params.port
776
785
return "%s(%s://%s@%s%s/)" % (
777
786
self.__class__.__name__,
788
self._ssh_params.username,
789
self._ssh_params.host,
783
792
def _accept_bytes(self, bytes):
784
793
"""See SmartClientStreamMedium.accept_bytes."""
785
794
self._ensure_connection()
786
osutils.until_no_eintr(self._write_to.write, bytes)
787
self._report_activity(len(bytes), 'write')
795
self._real_medium.accept_bytes(bytes)
789
797
def disconnect(self):
790
798
"""See SmartClientMedium.disconnect()."""
791
if not self._connected:
793
osutils.until_no_eintr(self._read_from.close)
794
osutils.until_no_eintr(self._write_to.close)
795
self._ssh_connection.close()
796
self._connected = False
799
if self._real_medium is not None:
800
self._real_medium.disconnect()
801
self._real_medium = None
802
if self._ssh_connection is not None:
803
self._ssh_connection.close()
804
self._ssh_connection = None
798
806
def _ensure_connection(self):
799
807
"""Connect this medium if not already connected."""
808
if self._real_medium is not None:
802
810
if self._vendor is None:
803
811
vendor = ssh._get_ssh_vendor()
805
813
vendor = self._vendor
806
self._ssh_connection = vendor.connect_ssh(self._username,
807
self._password, self._host, self._port,
808
command=[self._bzr_remote_path, 'serve', '--inet',
814
self._ssh_connection = vendor.connect_ssh(self._ssh_params.username,
815
self._ssh_params.password, self._ssh_params.host,
816
self._ssh_params.port,
817
command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
809
818
'--directory=/', '--allow-writes'])
810
self._read_from, self._write_to = \
811
self._ssh_connection.get_filelike_channels()
812
self._connected = True
819
io_kind, io_object = self._ssh_connection.get_sock_or_pipes()
820
if io_kind == 'socket':
821
self._real_medium = SmartClientAlreadyConnectedSocketMedium(
822
self.base, io_object)
823
elif io_kind == 'pipes':
824
read_from, write_to = io_object
825
self._real_medium = SmartSimplePipesClientMedium(
826
read_from, write_to, self.base)
828
raise AssertionError(
829
"Unexpected io_kind %r from %r"
830
% (io_kind, self._ssh_connection))
814
832
def _flush(self):
815
833
"""See SmartClientStreamMedium._flush()."""
816
self._write_to.flush()
834
self._real_medium._flush()
818
836
def _read_bytes(self, count):
819
837
"""See SmartClientStreamMedium.read_bytes."""
820
if not self._connected:
838
if self._real_medium is None:
821
839
raise errors.MediumNotConnected(self)
822
bytes_to_read = min(count, _MAX_READ_SIZE)
823
bytes = osutils.until_no_eintr(self._read_from.read, bytes_to_read)
824
self._report_activity(len(bytes), 'read')
840
return self._real_medium.read_bytes(count)
828
843
# Port 4155 is the default port for bzr://, registered with IANA.
830
845
BZR_DEFAULT_PORT = 4155
833
class SmartTCPClientMedium(SmartClientStreamMedium):
834
"""A client medium using TCP."""
848
class SmartClientSocketMedium(SmartClientStreamMedium):
849
"""A client medium using a socket.
851
This class isn't usable directly. Use one of its subclasses instead.
854
def __init__(self, base):
855
SmartClientStreamMedium.__init__(self, base)
857
self._connected = False
859
def _accept_bytes(self, bytes):
860
"""See SmartClientMedium.accept_bytes."""
861
self._ensure_connection()
862
osutils.send_all(self._socket, bytes, self._report_activity)
864
def _ensure_connection(self):
865
"""Connect this medium if not already connected."""
866
raise NotImplementedError(self._ensure_connection)
869
"""See SmartClientStreamMedium._flush().
871
For sockets we do no flushing. For TCP sockets we may want to turn off
872
TCP_NODELAY and add a means to do a flush, but that can be done in the
876
def _read_bytes(self, count):
877
"""See SmartClientMedium.read_bytes."""
878
if not self._connected:
879
raise errors.MediumNotConnected(self)
880
return osutils.read_bytes_from_socket(
881
self._socket, self._report_activity)
883
def disconnect(self):
884
"""See SmartClientMedium.disconnect()."""
885
if not self._connected:
889
self._connected = False
892
class SmartTCPClientMedium(SmartClientSocketMedium):
893
"""A client medium that creates a TCP connection."""
836
895
def __init__(self, host, port, base):
837
896
"""Creates a client that will connect on the first use."""
838
SmartClientStreamMedium.__init__(self, base)
839
self._connected = False
897
SmartClientSocketMedium.__init__(self, base)
840
898
self._host = host
841
899
self._port = port
844
def _accept_bytes(self, bytes):
845
"""See SmartClientMedium.accept_bytes."""
846
self._ensure_connection()
847
osutils.send_all(self._socket, bytes, self._report_activity)
849
def disconnect(self):
850
"""See SmartClientMedium.disconnect()."""
851
if not self._connected:
853
osutils.until_no_eintr(self._socket.close)
855
self._connected = False
857
901
def _ensure_connection(self):
858
902
"""Connect this medium if not already connected."""
893
937
(self._host, port, err_msg))
894
938
self._connected = True
897
"""See SmartClientStreamMedium._flush().
899
For TCP we do no flushing. We may want to turn off TCP_NODELAY and
900
add a means to do a flush, but that can be done in the future.
903
def _read_bytes(self, count):
904
"""See SmartClientMedium.read_bytes."""
905
if not self._connected:
906
raise errors.MediumNotConnected(self)
907
return _read_bytes_from_socket(
908
self._socket.recv, count, self._report_activity)
941
class SmartClientAlreadyConnectedSocketMedium(SmartClientSocketMedium):
942
"""A client medium for an already connected socket.
944
Note that this class will assume it "owns" the socket, so it will close it
945
when its disconnect method is called.
948
def __init__(self, base, sock):
949
SmartClientSocketMedium.__init__(self, base)
951
self._connected = True
953
def _ensure_connection(self):
954
# Already connected, by definition! So nothing to do.
911
958
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
948
995
self._medium._flush()
951
def _read_bytes_from_socket(sock, desired_count, report_activity):
952
# We ignore the desired_count because on sockets it's more efficient to
953
# read large chunks (of _MAX_READ_SIZE bytes) at a time.
955
bytes = osutils.until_no_eintr(sock, _MAX_READ_SIZE)
956
except socket.error, e:
957
if len(e.args) and e.args[0] in (errno.ECONNRESET, 10054):
958
# The connection was closed by the other side. Callers expect an
959
# empty string to signal end-of-stream.
964
report_activity(len(bytes), 'read')