30
30
from twisted.python import log
32
32
from ubuntuone.storageprotocol import (
33
protocol_pb2, request, sharersp, volumes, delta
33
protocol_pb2, request, sharersp, volumes, delta)
36
35
log_debug = partial(log.msg, loglevel=logging.DEBUG)
38
38
class StorageClient(request.RequestHandler):
39
39
"""A Basic Storage Protocol client."""
110
110
object when completed.
113
request = oauth.OAuthRequest.from_consumer_and_token(
113
req = oauth.OAuthRequest.from_consumer_and_token(
114
114
oauth_consumer=consumer,
116
116
http_method="GET",
117
117
http_url="storage://server")
118
request.sign_request(
119
119
oauth.OAuthSignatureMethod_PLAINTEXT(), consumer, token)
121
121
# Make sure all parameter values are strings.
122
122
auth_parameters = dict(
123
(key, str(value)) for key, value in request.parameters.iteritems())
123
(key, str(value)) for key, value in req.parameters.iteritems())
124
124
p = Authenticate(self, auth_parameters)
126
126
return p.deferred
383
383
raise TypeError('callback for share_change must be callable')
386
385
def notify_share_change(self, notify_share):
387
386
"""Call the current changed share callback, if any, with the
419
418
raise TypeError('callback for share_answer must be callable')
422
420
def notify_share_answer(self, msg):
423
421
"""Call the current share answer callback, if any, with the info.
596
594
def processMessage(self, message):
597
595
"""Process messages."""
598
# pylint: disable-msg=W0201
599
596
# pylint: disable=W0201
600
597
if message.type == protocol_pb2.Message.NODE_ATTR:
601
598
if self.node_attr_callback is not None:
904
900
@ivar new_generation: the generation that the volume is at now
906
# pylint: disable-msg=C0111
907
902
# pylint: disable=C0111
909
904
def __init__(self, protocol, share, node_id):
1036
1031
request.Request.__init__(self, protocol)
1037
1032
self.query_message = qm = protocol_pb2.Message()
1038
qm.id = 0 # just to have something in the field when calculating size
1033
qm.id = 0 # just to have something in the field when calculating size
1039
1034
qm.type = protocol_pb2.Message.QUERY
1040
1035
self.response = []
1041
1036
self.overflow = None
1078
1073
class BytesMessageProducer(object):
1079
1074
"""Produce BYTES messages from a file."""
1081
def __init__(self, request, fh, offset):
1076
def __init__(self, req, fh, offset):
1082
1077
"""Create a BytesMessageProducer."""
1083
self.request = request
1084
1079
self.producing = False
1086
1081
self.offset = offset
1203
1198
node id of the created object.
1205
1200
@cvar create_message: must be overridden with the correct creation message
1207
@cvar response_message: must be overridden with the correct creation success
1208
message that will be received
1202
@cvar response_message: must be overridden with the correct creation
1203
success message that will be received
1210
1205
@ivar new_id: the id of the node that was created (available upon success)
1211
1206
@ivar new_parent_id: the parent id the node now exists under
1241
1236
def processMessage(self, message):
1242
1237
"""Handle messages."""
1243
# pylint: disable-msg=W0201
1244
1238
# pylint: disable=W0201
1245
1239
if message.type == self.create_response:
1246
1240
self.new_id = message.new.node
1287
1281
def processMessage(self, message):
1288
1282
"""Handle messages."""
1289
# pylint: disable-msg=W0201
1290
1283
# pylint: disable=W0201
1291
1284
if message.type == protocol_pb2.Message.PROTOCOL_VERSION:
1292
1285
self.other_protocol_version = message.protocol.version
1398
1390
def processMessage(self, message):
1399
1391
"""Process the answer from the server."""
1400
# pylint: disable-msg=W0201
1401
1392
# pylint: disable=W0201
1402
1393
if message.type == protocol_pb2.Message.FREE_SPACE_INFO:
1403
1394
self.free_bytes = message.free_space_info.free_bytes
1421
1411
def processMessage(self, message):
1422
1412
"""Process the answer from the server."""
1423
# pylint: disable-msg=W0201
1424
1413
# pylint: disable=W0201
1425
1414
if message.type == protocol_pb2.Message.ACCOUNT_INFO:
1426
1415
self.purchased_bytes = message.account_info.purchased_bytes
1559
1549
def __init__(self, throttling_enabled=False,
1560
1550
read_limit=None, write_limit=None):
1561
1551
"""Create the instance."""
1562
self._readLimit = None # max bytes we should read per second
1563
self._writeLimit = None # max bytes we should write per second
1552
self._readLimit = None # max bytes we should read per second
1553
self._writeLimit = None # max bytes we should write per second
1564
1554
self._throttling_reads = False
1565
1555
self._throttling_writes = False
1566
1556
self._set_read_limit(read_limit)
1599
1589
if not self.valid_limit(limit):
1600
1590
raise ValueError('Read limit must be greater than 0.')
1601
1591
self._readLimit = limit
1602
# it's a property, pylint: disable-msg=W0212
1603
1592
# it's a property, pylint: disable=W0212
1604
1593
readLimit = property(lambda self: self._readLimit, _set_read_limit)
1605
1594
writeLimit = property(lambda self: self._writeLimit, _set_write_limit)
1606
# pylint: enable-msg=W0212
1607
1595
# pylint: enable=W0212
1609
1597
def callLater(self, period, func, *args, **kwargs):
1610
1598
"""Wrapper around L{reactor.callLater} for test purpose."""
1611
1599
return reactor.callLater(period, func, *args, **kwargs)
1613
def maybeCallLater(self, id, period, func):
1601
def maybeCallLater(self, call_id, period, func):
1614
1602
"""Maybe run callLater(period, func).
1616
1604
Only if we don't have a DelayedCall with the
1617
1605
specified id already running.
1619
delayed_call = getattr(self, id)
1607
delayed_call = getattr(self, call_id)
1620
1608
# check if we already have a DelayedCall running
1621
1609
if delayed_call is None or (not delayed_call.active() \
1622
1610
and delayed_call.cancelled):
1635
1623
self.readThisSecond += length
1636
1624
self.checkReadBandwidth()
1638
def _get_throttle_time(self, bytes, limit):
1639
"""Calculate the throttle_time for bytes and limit."""
1640
return (float(bytes) / limit) - 1.0
1626
def _get_throttle_time(self, data_bytes, limit):
1627
"""Calculate the throttle_time for data_bytes and limit."""
1628
return (float(data_bytes) / limit) - 1.0
1642
1630
def checkReadBandwidth(self):
1643
1631
"""Check if we've passed bandwidth limits."""
1676
1664
self.resetReadThisSecondID = self.callLater(1,
1677
1665
self._resetReadThisSecond)
1679
def _resetWrittenThisSecond (self):
1667
def _resetWrittenThisSecond(self):
1680
1668
"""Reset the counter named with 'name' every 1 second."""
1681
1669
self.writtenThisSecond = 0
1682
self.resetWriteThisSecondID = self.callLater(1,
1683
self._resetWrittenThisSecond)
1670
self.resetWriteThisSecondID = self.callLater(
1671
1, self._resetWrittenThisSecond)
1685
1673
def throttleReads(self):
1686
1674
"""Throttle reads on all protocols."""