~ubuntuone-control-tower/ubuntu/lucid/ubuntuone-storage-protocol/stable-1-2

« back to all changes in this revision

Viewing changes to ubuntuone/storageprotocol/client.py

  • Committer: Bazaar Package Importer
  • Author(s): Rodney Dawes
  • Date: 2009-12-07 17:00:00 UTC
  • mfrom: (1.1.7 upstream)
  • Revision ID: james.westby@ubuntu.com-20091207170000-9k5rnx1gz58s2izh
Tags: 1.1.0-0ubuntu1
New upstream release

Show diffs side-by-side

added

removed

Lines of Context:
24
24
from twisted.python import log
25
25
 
26
26
import uuid
27
 
import operator
28
27
from functools import partial
29
28
from itertools import chain
30
29
import logging
821
820
        self.producing = False
822
821
        self.fh = fh
823
822
        self.offset = offset
 
823
        self.finished = False
824
824
 
825
825
    def resumeProducing(self):
826
826
        """IPushProducer interface."""
841
841
        Reads a little from the file, generates a BYTES message, and pass the
842
842
        control to the reactor.  If no more data, it finishes with EOF.
843
843
        """
844
 
        if not self.producing or self.request.cancelled:
 
844
        if not self.producing or self.request.cancelled or self.finished:
845
845
            return
846
846
 
847
847
        if self.offset:
860
860
            message.type = protocol_pb2.Message.EOF
861
861
            self.request.sendMessage(message)
862
862
            self.producing = False
 
863
            self.finished = True
863
864
 
864
865
 
865
866
class PutContent(request.Request):
1041
1042
        if message.type == protocol_pb2.Message.AUTH_AUTHENTICATED:
1042
1043
            self.done()
1043
1044
        elif message.type == protocol_pb2.Message.ERROR:
1044
 
            if message.error.type == protocol_pb2.Error.AUTHENTICATION_FAILED:
1045
 
                self.error(request.StorageProtocolError(
1046
 
                        "Authentication Failed"))
1047
 
            else:
1048
 
                self.error(request.StorageProtocolError(
1049
 
                        "Authentication Error:"+str(message)))
 
1045
            # as the error travels with the exception, we send all here
 
1046
            self.error(request.StorageRequestError(self, message))
1050
1047
        else:
1051
1048
            self.error(request.StorageProtocolError(
1052
1049
                        "Authentication Error:"+str(message)))
1150
1147
    factory = None
1151
1148
 
1152
1149
    def connectionMade(self):
1153
 
        """handle connection Made"""
1154
 
        self.factory.client = self
 
1150
        """Handle connection Made."""
 
1151
        if self.factory.client is None:
 
1152
            self.factory.client = self
1155
1153
        StorageClient.connectionMade(self)
1156
1154
 
1157
1155
    def connectionLost(self, reason=None):
1158
 
        """handle connection lost"""
1159
 
        self.factory.unregisterProtocol(self)
 
1156
        """Handle connection lost."""
 
1157
        if self.factory.client is self:
 
1158
            self.factory.unregisterProtocol(self)
1160
1159
        StorageClient.connectionLost(self, reason=reason)
1161
1160
 
1162
1161
    def write(self, data):
1163
 
        """transport API to capture bytes written"""
1164
 
        self.factory.registerWritten(len(data))
 
1162
        """Transport API to capture bytes written."""
 
1163
        if self.factory.client is self:
 
1164
            self.factory.registerWritten(len(data))
1165
1165
        StorageClient.write(self, data)
1166
1166
 
1167
1167
    def writeSequence(self, seq):
1168
 
        """transport API to capture bytes written in a sequence"""
1169
 
        self.factory.registerWritten(reduce(operator.add, map(len, seq)))
 
1168
        """Transport API to capture bytes written in a sequence."""
 
1169
        if self.factory.client is self:
 
1170
            self.factory.registerWritten(sum(len(x) for x in seq))
1170
1171
        StorageClient.writeSequence(self, seq)
1171
1172
 
1172
1173
    def dataReceived(self, data):
1173
 
        """override trnasport default to capture bytes read"""
1174
 
        self.factory.registerRead(len(data))
 
1174
        """Override transport default to capture bytes read."""
 
1175
        if self.factory.client is self:
 
1176
            self.factory.registerRead(len(data))
1175
1177
        StorageClient.dataReceived(self, data)
1176
1178
 
1177
1179
    def throttleReads(self):
1178
 
        """pause self.transport"""
 
1180
        """Pause self.transport."""
1179
1181
        self.transport.pauseProducing()
1180
1182
 
1181
1183
    def unthrottleReads(self):
1182
 
        """resume self.transport"""
 
1184
        """Resume self.transport."""
1183
1185
        self.transport.resumeProducing()
1184
1186
 
1185
1187
    def throttleWrites(self):
1186
 
        """pause producing"""
 
1188
        """Pause producing."""
1187
1189
        self.pauseProducing()
1188
1190
 
1189
1191
    def unthrottleWrites(self):
1190
 
        """resume producing"""
 
1192
        """Resume producing."""
1191
1193
        self.resumeProducing()
1192
1194
 
1193
1195
 
1250
1252
    def checkReadBandwidth(self):
1251
1253
        """Checks if we've passed bandwidth limits."""
1252
1254
        if self.readLimit is not None and \
1253
 
           self.readThisSecond > self.readLimit:
 
1255
           self.readThisSecond > self.readLimit and \
 
1256
           self.unthrottleReadsID is None:
1254
1257
            self.throttleReads()
1255
1258
            throttle_time = (float(self.readThisSecond) / self.readLimit) - 1.0
1256
1259
            log_debug("pause reads for: %s", str(throttle_time))
1261
1264
    def checkWriteBandwidth(self):
1262
1265
        """Checks if we've passed bandwidth limits."""
1263
1266
        if self.writeLimit is not None and \
1264
 
           self.writtenThisSecond > self.writeLimit:
 
1267
           self.writtenThisSecond > self.writeLimit and \
 
1268
           self.unthrottleWritesID is None:
1265
1269
            self.throttleWrites()
1266
1270
            throttle_time = (float(self.writtenThisSecond) / self.writeLimit) \
1267
1271
                    - 1.0