5
5
# Copyright 2009 Canonical Ltd.
7
# This program is free software: you can redistribute it and/or modify it
7
# This program is free software: you can redistribute it and/or modify it
8
8
# under the terms of the GNU Affero General Public License version 3,
9
9
# as published by the Free Software Foundation.
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranties of
13
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranties of
13
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
14
14
# PURPOSE. See the GNU Affero General Public License for more details.
16
16
# You should have received a copy of the GNU Affero General Public License
31
31
from ubuntuone.storageprotocol.dircontent_pb2 import \
32
32
DirectoryContent, DIRECTORY
34
35
class NotDirectory(Exception):
35
36
"""This wasnt a directory"""
37
39
def delay_time(step):
38
40
"""generates a delay for each step"""
39
41
return (((math.exp(step) - 1) / 10) /
40
42
(1 + random.random()))
42
45
def retry(function):
43
46
"""This function will be retried when it raises TRY_AGAIN from the server.
45
48
def inner(self, *args, **kwargs):
47
51
def do_retry(failure, step):
49
53
if failure.check(request.StorageRequestError) \
53
57
d = defer.Deferred()
54
58
reactor.callLater(delay_time(step), d.callback, None)
55
59
d.addCallback(lambda _: function(self, *args, **kwargs))
56
d.addErrback(do_retry, step+1)
60
d.addErrback(do_retry, step + 1)
85
89
message = query[0][1].response[0]
86
90
return message.hash
89
[(request.ROOT, node_id, request.UNKNOWN_HASH)]
92
d = self.query([(request.ROOT, node_id, request.UNKNOWN_HASH)])
91
93
d.addCallback(_got_query)
127
130
if entry.name == name and entry.node_type == DIRECTORY:
128
131
print "is directory", entry
129
132
return entry.node
130
raise NotDirectory("name %s is not a directory"%name)
133
raise NotDirectory("name %s is not a directory" % name)
131
134
d.addCallback(is_directory)
138
141
d.addCallback(lambda x: setattr(self, "cwd", full_path))
143
145
def mkfile(self, name):
144
146
"""make a file named name in cwd."""
145
147
d = self.get_cwd_id()
146
148
d.addCallback(lambda _:
147
self.make_file(request.ROOT, self.cwd_id, name)
149
self.make_file(request.ROOT, self.cwd_id, name))
151
152
def mkdir(self, name):
152
153
"""make a dir named name in cwd."""
153
154
d = self.get_cwd_id()
154
155
d.addCallback(lambda _:
155
self.make_dir(request.ROOT, self.cwd_id, name)
156
self.make_dir(request.ROOT, self.cwd_id, name))
159
159
def put(self, name, content):
197
197
class EasyClientFactory(StorageClientFactory):
198
198
"""A test oriented protocol factory."""
199
# no init: pylint: disable-msg=W0232
200
# pylint: disable=W0232
202
199
protocol = EasyClient
204
def __init__(self, defer):
201
def __init__(self, deferrer):
205
202
"""create a factory."""
204
self.defer = deferrer
208
def clientConnectionMade(self, client):
206
def clientConnectionMade(self, client_obj):
209
207
"""on client connection made."""
210
# pylint: disable-msg=W0201
211
# pylint: disable=W0201
213
self.defer.callback(client)
208
self.client = client_obj
209
self.defer.callback(self.client)
215
211
def clientConnectionFailed(self, connector, reason):
216
212
"""We failed at connecting."""
217
213
self.defer.errback(reason)
219
216
def client(host, port):
220
217
"""return a deferred that will succeed with a connected client."""
221
218
d = defer.Deferred()
223
220
reactor.connectTCP(host, port, factory)
226
224
def authenticated_client(host, port, token="open sesame"):
227
225
"""return a deferred that will succeed with an authenticated client."""
228
226
d = client(host, port)
228
def auth(client_obj):
230
229
"""do the auth."""
231
230
d = client.dummy_authenticate(token)
232
d.addCallback(lambda _: client)
231
d.addCallback(lambda _: client_obj)
234
233
d.addCallback(auth)
237
237
def skip_error(failure, error):
238
238
"""try: except $error: pass errback"""
239
239
if failure.check(request.StorageRequestError) and \
245
246
# deferred utilities
246
247
skip_result = lambda _, f, *args, **kwargs: f(*args, **kwargs)
247
250
def sr_result(result, f, *args, **kwargs):
248
251
"""skip the result when calling the function and then return it."""
249
252
f(*args, **kwargs)
252
256
def log(*args, **kwargs):
253
257
"""print args and kwargs."""
258
263
def show_error(failure):
259
264
"""print the traceback."""
260
265
print failure.getTraceback()
262
268
if __name__ == "__main__":
264
# pylint: disable-msg=C0111
265
# pylint: disable=C0111
267
def create_dirs(client, num_dirs):
270
def create_dirs(client_obj, num_dirs):
271
"""Create directories."""
268
272
d = defer.succeed(None)
269
273
for i in range(0, num_dirs):
270
d.addCallback(skip_result, client.mkdir, "%s"%(i))
274
d.addCallback(skip_result, client_obj.mkdir, "%s" % (i))
271
275
d.addErrback(skip_error, protocol_pb2.Error.ALREADY_EXISTS)
272
d.addCallback(skip_result, log, "Directory %s created."%i)
276
d.addCallback(skip_result, log, "Directory %s created." % i)
275
def make_files_client(client, number, num_files):
276
d = client.chdir("%s"%(number))
279
def make_files_client(client_obj, number, num_files):
281
d = client_obj.chdir("%s" % (number))
277
282
for i in range(0, num_files):
278
283
d.addCallback(skip_result, log, "Client %s creating file %s."
280
d.addCallback(skip_result, client.mkfile, "%s"%(i))
285
d.addCallback(skip_result, client.mkfile, "%s" % (i))
281
286
d.addCallback(skip_result, log, "Client %s created file %s."
285
290
NUM_CLIENTS = 200
288
port = int(open("tmp/ubuntuone-api.port").read())
289
d = authenticated_client("localhost", int(port))
290
d.addCallback(create_dirs, NUM_CLIENTS)
293
port_num = int(open("tmp/ubuntuone-api.port").read())
294
deferred = authenticated_client("localhost", int(port_num))
295
deferred.addCallback(create_dirs, NUM_CLIENTS)
292
297
def fire_clients(_):
298
"""Fire off the client connections."""
294
300
for i in range(NUM_CLIENTS):
295
d = authenticated_client("localhost", int(port))
301
d = authenticated_client("localhost", int(port_num))
296
302
d.addCallback(sr_result, log, "client", i, "logged in")
297
303
d.addCallback(make_files_client, i, NUM_FILES)
298
304
d.addErrback(show_error)
301
307
return defer.DeferredList(dlist)
303
d.addCallback(fire_clients)
304
d.addCallback(lambda _: reactor.stop())
305
d.addErrback(show_error)
309
deferred.addCallback(fire_clients)
310
deferred.addCallback(lambda _: reactor.stop())
311
deferred.addErrback(show_error)
306
312
print "Starting reactor"
307
313
start_time = time.time()