1041
1042
if message.type == protocol_pb2.Message.AUTH_AUTHENTICATED:
1043
1044
elif message.type == protocol_pb2.Message.ERROR:
1044
if message.error.type == protocol_pb2.Error.AUTHENTICATION_FAILED:
1045
self.error(request.StorageProtocolError(
1046
"Authentication Failed"))
1048
self.error(request.StorageProtocolError(
1049
"Authentication Error:"+str(message)))
1045
# as the error travels with the exception, we send all here
1046
self.error(request.StorageRequestError(self, message))
1051
1048
self.error(request.StorageProtocolError(
1052
1049
"Authentication Error:"+str(message)))
1152
1149
def connectionMade(self):
1153
"""handle connection Made"""
1154
self.factory.client = self
1150
"""Handle connection Made."""
1151
if self.factory.client is None:
1152
self.factory.client = self
1155
1153
StorageClient.connectionMade(self)
1157
1155
def connectionLost(self, reason=None):
1158
"""handle connection lost"""
1159
self.factory.unregisterProtocol(self)
1156
"""Handle connection lost."""
1157
if self.factory.client is self:
1158
self.factory.unregisterProtocol(self)
1160
1159
StorageClient.connectionLost(self, reason=reason)
1162
1161
def write(self, data):
1163
"""transport API to capture bytes written"""
1164
self.factory.registerWritten(len(data))
1162
"""Transport API to capture bytes written."""
1163
if self.factory.client is self:
1164
self.factory.registerWritten(len(data))
1165
1165
StorageClient.write(self, data)
1167
1167
def writeSequence(self, seq):
1168
"""transport API to capture bytes written in a sequence"""
1169
self.factory.registerWritten(reduce(operator.add, map(len, seq)))
1168
"""Transport API to capture bytes written in a sequence."""
1169
if self.factory.client is self:
1170
self.factory.registerWritten(sum(len(x) for x in seq))
1170
1171
StorageClient.writeSequence(self, seq)
1172
1173
def dataReceived(self, data):
1173
"""override trnasport default to capture bytes read"""
1174
self.factory.registerRead(len(data))
1174
"""Override transport default to capture bytes read."""
1175
if self.factory.client is self:
1176
self.factory.registerRead(len(data))
1175
1177
StorageClient.dataReceived(self, data)
1177
1179
def throttleReads(self):
1178
"""pause self.transport"""
1180
"""Pause self.transport."""
1179
1181
self.transport.pauseProducing()
1181
1183
def unthrottleReads(self):
1182
"""resume self.transport"""
1184
"""Resume self.transport."""
1183
1185
self.transport.resumeProducing()
1185
1187
def throttleWrites(self):
1186
"""pause producing"""
1188
"""Pause producing."""
1187
1189
self.pauseProducing()
1189
1191
def unthrottleWrites(self):
1190
"""resume producing"""
1192
"""Resume producing."""
1191
1193
self.resumeProducing()
1250
1252
def checkReadBandwidth(self):
1251
1253
"""Checks if we've passed bandwidth limits."""
1252
1254
if self.readLimit is not None and \
1253
self.readThisSecond > self.readLimit:
1255
self.readThisSecond > self.readLimit and \
1256
self.unthrottleReadsID is None:
1254
1257
self.throttleReads()
1255
1258
throttle_time = (float(self.readThisSecond) / self.readLimit) - 1.0
1256
1259
log_debug("pause reads for: %s", str(throttle_time))
1261
1264
def checkWriteBandwidth(self):
1262
1265
"""Checks if we've passed bandwidth limits."""
1263
1266
if self.writeLimit is not None and \
1264
self.writtenThisSecond > self.writeLimit:
1267
self.writtenThisSecond > self.writeLimit and \
1268
self.unthrottleWritesID is None:
1265
1269
self.throttleWrites()
1266
1270
throttle_time = (float(self.writtenThisSecond) / self.writeLimit) \