2
import time, os.path, stat, re, simplejson, struct
4
from twisted.trial import unittest
6
from twisted.internet import defer
7
from twisted.application import service
8
from foolscap.api import fireEventually
10
from allmydata import interfaces
11
from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
12
from allmydata.storage.server import StorageServer
13
from allmydata.storage.mutable import MutableShareFile
14
from allmydata.storage.immutable import BucketWriter, BucketReader
15
from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
16
UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
17
from allmydata.storage.lease import LeaseInfo
18
from allmydata.storage.crawler import BucketCountingCrawler
19
from allmydata.storage.expirer import LeaseCheckingCrawler
20
from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
22
from allmydata.interfaces import BadWriteEnablerError
23
from allmydata.test.common import LoggingServiceParent
24
from allmydata.test.common_web import WebRenderingMixin
25
from allmydata.web.storage import StorageStatus, remove_prefix
30
def __init__(self, ignore_disconnectors=False):
31
self.ignore = ignore_disconnectors
32
self.disconnectors = {}
33
def notifyOnDisconnect(self, f, *args, **kwargs):
37
self.disconnectors[m] = (f, args, kwargs)
39
def dontNotifyOnDisconnect(self, marker):
42
del self.disconnectors[marker]
44
class FakeStatsProvider:
45
def count(self, name, delta=1):
47
def register_producer(self, producer):
50
class Bucket(unittest.TestCase):
51
def make_workdir(self, name):
    """Create the scratch directories for one Bucket test case.

    Makes ``storage/Bucket/<name>`` and its ``tmp`` subdirectory, then
    returns the pair ``(incoming, final)``:

    * ``incoming`` -- ``<basedir>/tmp/bucket``
    * ``final``    -- ``<basedir>/bucket``

    Only the directories are created here; the two returned paths are
    just names and nothing is written at either of them.
    """
    basedir = os.path.join("storage", "Bucket", name)
    tmpdir = os.path.join(basedir, "tmp")
    incoming = os.path.join(tmpdir, "bucket")
    final = os.path.join(basedir, "bucket")
    fileutil.make_dirs(basedir)
    fileutil.make_dirs(tmpdir)
    return incoming, final
59
def bucket_writer_closed(self, bw, consumed):
61
def add_latency(self, category, latency):
63
def count(self, name, delta=1):
68
renew_secret = os.urandom(32)
69
cancel_secret = os.urandom(32)
70
expiration_time = time.time() + 5000
71
return LeaseInfo(owner_num, renew_secret, cancel_secret,
72
expiration_time, "\x00" * 20)
74
def test_create(self):
75
incoming, final = self.make_workdir("test_create")
76
bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
78
bw.remote_write(0, "a"*25)
79
bw.remote_write(25, "b"*25)
80
bw.remote_write(50, "c"*25)
81
bw.remote_write(75, "d"*7)
84
def test_readwrite(self):
85
incoming, final = self.make_workdir("test_readwrite")
86
bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
88
bw.remote_write(0, "a"*25)
89
bw.remote_write(25, "b"*25)
90
bw.remote_write(50, "c"*7) # last block may be short
94
br = BucketReader(self, bw.finalhome)
95
self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
96
self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
97
self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
101
def callRemote(self, methname, *args, **kwargs):
103
meth = getattr(self.target, "remote_" + methname)
104
return meth(*args, **kwargs)
105
return defer.maybeDeferred(_call)
107
class BucketProxy(unittest.TestCase):
108
def make_bucket(self, name, size):
109
basedir = os.path.join("storage", "BucketProxy", name)
110
incoming = os.path.join(basedir, "tmp", "bucket")
111
final = os.path.join(basedir, "bucket")
112
fileutil.make_dirs(basedir)
113
fileutil.make_dirs(os.path.join(basedir, "tmp"))
114
bw = BucketWriter(self, incoming, final, size, self.make_lease(),
120
def make_lease(self):
122
renew_secret = os.urandom(32)
123
cancel_secret = os.urandom(32)
124
expiration_time = time.time() + 5000
125
return LeaseInfo(owner_num, renew_secret, cancel_secret,
126
expiration_time, "\x00" * 20)
128
def bucket_writer_closed(self, bw, consumed):
130
def add_latency(self, category, latency):
132
def count(self, name, delta=1):
135
def test_create(self):
136
bw, rb, sharefname = self.make_bucket("test_create", 500)
137
bp = WriteBucketProxy(rb,
142
uri_extension_size_max=500, nodeid=None)
143
self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))
145
def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
146
# Let's pretend each share has 100 bytes of data, and that there are
147
# 4 segments (25 bytes each), and 8 shares total. So the two
148
# per-segment merkle trees (crypttext_hash_tree,
149
# block_hashes) will have 4 leaves and 7 nodes each. The per-share
150
# merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
151
# nodes. Furthermore, let's assume the uri_extension is 500 bytes
152
# long. That should make the whole share:
154
# 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
155
# 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
157
sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
159
crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
161
block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
163
share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
165
uri_extension = "s" + "E"*498 + "e"
167
bw, rb, sharefname = self.make_bucket(name, sharesize)
173
uri_extension_size_max=len(uri_extension),
177
d.addCallback(lambda res: bp.put_block(0, "a"*25))
178
d.addCallback(lambda res: bp.put_block(1, "b"*25))
179
d.addCallback(lambda res: bp.put_block(2, "c"*25))
180
d.addCallback(lambda res: bp.put_block(3, "d"*20))
181
d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
182
d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
183
d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
184
d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
185
d.addCallback(lambda res: bp.close())
187
# now read everything back
188
def _start_reading(res):
189
br = BucketReader(self, sharefname)
192
rbp = rbp_class(rb, peerid="abc", storage_index="")
193
self.failUnless("to peer" in repr(rbp))
194
self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))
196
d1 = rbp.get_block_data(0, 25, 25)
197
d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
198
d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
199
d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
200
d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
201
d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
202
d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
203
d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
205
d1.addCallback(lambda res: rbp.get_crypttext_hashes())
206
d1.addCallback(lambda res:
207
self.failUnlessEqual(res, crypttext_hashes))
208
d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
209
d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
210
d1.addCallback(lambda res: rbp.get_share_hashes())
211
d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
212
d1.addCallback(lambda res: rbp.get_uri_extension())
213
d1.addCallback(lambda res:
214
self.failUnlessEqual(res, uri_extension))
218
d.addCallback(_start_reading)
222
def test_readwrite_v1(self):
    """Run the shared read/write round trip with the v1 writer proxy.

    Delegates to _do_test_readwrite with a header size of 0x24 bytes,
    WriteBucketProxy as the writer class and ReadBucketProxy as the
    reader class, and returns that helper's result unchanged.
    """
    return self._do_test_readwrite("test_readwrite_v1",
                                   0x24, WriteBucketProxy, ReadBucketProxy)
226
def test_readwrite_v2(self):
    """Run the shared read/write round trip with the v2 writer proxy.

    Delegates to _do_test_readwrite with a header size of 0x44 bytes,
    WriteBucketProxy_v2 as the writer class and ReadBucketProxy as the
    reader class, and returns that helper's result unchanged.
    """
    return self._do_test_readwrite("test_readwrite_v2",
                                   0x44, WriteBucketProxy_v2, ReadBucketProxy)
230
class FakeDiskStorageServer(StorageServer):
    """StorageServer whose stat_disk() answer comes from an attribute.

    stat_disk() ignores its argument and returns ``self.DISKAVAIL``. This
    class does not define DISKAVAIL itself, so it has to be assigned on
    the instance before stat_disk() is called.
    """

    def stat_disk(self, d):
        # The path argument is deliberately unused: no filesystem query
        # is made, the preset value is handed back as-is.
        return self.DISKAVAIL
234
class Server(unittest.TestCase):
237
self.sparent = LoggingServiceParent()
238
self.sparent.startService()
239
self._lease_secret = itertools.count()
241
return self.sparent.stopService()
243
def workdir(self, name):
244
basedir = os.path.join("storage", "Server", name)
247
def create(self, name, reserved_space=0, klass=StorageServer):
248
workdir = self.workdir(name)
249
ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
250
stats_provider=FakeStatsProvider())
251
ss.setServiceParent(self.sparent)
254
def test_create(self):
255
ss = self.create("test_create")
257
def allocate(self, ss, storage_index, sharenums, size, canary=None):
258
renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
259
cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
261
canary = FakeCanary()
262
return ss.remote_allocate_buckets(storage_index,
263
renew_secret, cancel_secret,
264
sharenums, size, canary)
266
def test_large_share(self):
267
ss = self.create("test_large_share")
269
already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
270
self.failUnlessEqual(already, set())
271
self.failUnlessEqual(set(writers.keys()), set([0]))
273
shnum, bucket = writers.items()[0]
274
# This test is going to hammer your filesystem if it doesn't make a sparse file for this. :-(
275
bucket.remote_write(2**32, "ab")
276
bucket.remote_close()
278
readers = ss.remote_get_buckets("allocate")
279
reader = readers[shnum]
280
self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")
281
test_large_share.skip = "This test can spuriously fail if you have less than 4 GiB free on your filesystem, and if your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X is the only system I know of in the desktop/server area that doesn't support efficient sparse files)."
283
def test_dont_overfill_dirs(self):
285
This test asserts that if you add a second share whose storage index
286
shares lots of leading bits with an extant share (but isn't the exact
287
same storage index), this won't add an entry to the share directory.
289
ss = self.create("test_dont_overfill_dirs")
290
already, writers = self.allocate(ss, "storageindex", [0], 10)
291
for i, wb in writers.items():
292
wb.remote_write(0, "%10d" % i)
294
storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
296
children_of_storedir = set(os.listdir(storedir))
298
# Now store another one under another storageindex that has leading
299
# chars the same as the first storageindex.
300
already, writers = self.allocate(ss, "storageindey", [0], 10)
301
for i, wb in writers.items():
302
wb.remote_write(0, "%10d" % i)
304
storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
306
new_children_of_storedir = set(os.listdir(storedir))
307
self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
309
def test_remove_incoming(self):
310
ss = self.create("test_remove_incoming")
311
already, writers = self.allocate(ss, "vid", range(3), 10)
312
for i,wb in writers.items():
313
wb.remote_write(0, "%10d" % i)
315
incoming_share_dir = wb.incominghome
316
incoming_bucket_dir = os.path.dirname(incoming_share_dir)
317
incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
318
incoming_dir = os.path.dirname(incoming_prefix_dir)
319
self.failIf(os.path.exists(incoming_bucket_dir))
320
self.failIf(os.path.exists(incoming_prefix_dir))
321
self.failUnless(os.path.exists(incoming_dir))
323
def test_allocate(self):
324
ss = self.create("test_allocate")
326
self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
328
canary = FakeCanary()
329
already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
330
self.failUnlessEqual(already, set())
331
self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
333
# while the buckets are open, they should not count as readable
334
self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
337
for i,wb in writers.items():
338
wb.remote_write(0, "%25d" % i)
340
# aborting a bucket that was already closed is a no-op
343
# now they should be readable
344
b = ss.remote_get_buckets("allocate")
345
self.failUnlessEqual(set(b.keys()), set([0,1,2]))
346
self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
348
self.failUnless("BucketReader" in b_str, b_str)
349
self.failUnless("mfwgy33dmf2g 0" in b_str, b_str)
351
# now if we ask about writing again, the server should offer those
352
# three buckets as already present. It should offer them even if we
353
# don't ask about those specific ones.
354
already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
355
self.failUnlessEqual(already, set([0,1,2]))
356
self.failUnlessEqual(set(writers.keys()), set([3,4]))
358
# while those two buckets are open for writing, the server should
359
# refuse to offer them to uploaders
361
already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
362
self.failUnlessEqual(already2, set([0,1,2]))
363
self.failUnlessEqual(set(writers2.keys()), set([5]))
365
# aborting the writes should remove the tempfiles
366
for i,wb in writers2.items():
368
already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
369
self.failUnlessEqual(already2, set([0,1,2]))
370
self.failUnlessEqual(set(writers2.keys()), set([5]))
372
for i,wb in writers2.items():
374
for i,wb in writers.items():
377
def test_bad_container_version(self):
378
ss = self.create("test_bad_container_version")
379
a,w = self.allocate(ss, "si1", [0], 10)
380
w[0].remote_write(0, "\xff"*10)
383
fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
386
f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
389
b = ss.remote_get_buckets("allocate")
391
e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
392
ss.remote_get_buckets, "si1")
393
self.failUnless(" had version 0 but we wanted 1" in str(e), e)
395
def test_disconnect(self):
396
# simulate a disconnection
397
ss = self.create("test_disconnect")
398
canary = FakeCanary()
399
already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
400
self.failUnlessEqual(already, set())
401
self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
402
for (f,args,kwargs) in canary.disconnectors.values():
407
# that ought to delete the incoming shares
408
already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
409
self.failUnlessEqual(already, set())
410
self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
412
def test_reserved_space(self):
413
ss = self.create("test_reserved_space", reserved_space=10000,
414
klass=FakeDiskStorageServer)
415
# the FakeDiskStorageServer doesn't do real statvfs() calls
417
# 15k available, 10k reserved, leaves 5k for shares
419
# a newly created and filled share incurs this much overhead, beyond
420
# the size we request.
422
LEASE_SIZE = 4+32+32+4
423
canary = FakeCanary(True)
424
already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
425
self.failUnlessEqual(len(writers), 3)
426
# now the StorageServer should have 3000 bytes provisionally
427
# allocated, allowing only 2000 more to be claimed
428
self.failUnlessEqual(len(ss._active_writers), 3)
430
# allocating 1001-byte shares only leaves room for one
431
already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
432
self.failUnlessEqual(len(writers2), 1)
433
self.failUnlessEqual(len(ss._active_writers), 4)
435
# we abandon the first set, so their provisional allocation should be
439
self.failUnlessEqual(len(ss._active_writers), 1)
440
# now we have a provisional allocation of 1001 bytes
442
# and we close the second set, so their provisional allocation should
443
# become real, long-term allocation, and grows to include the
445
for bw in writers2.values():
446
bw.remote_write(0, "a"*25)
451
self.failUnlessEqual(len(ss._active_writers), 0)
453
allocated = 1001 + OVERHEAD + LEASE_SIZE
455
# we have to manually decrease DISKAVAIL, since we're not doing real
457
ss.DISKAVAIL -= allocated
459
# now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
460
# 5000-1085=3915 free, therefore we can fit 39 100byte shares
461
already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
462
self.failUnlessEqual(len(writers3), 39)
463
self.failUnlessEqual(len(ss._active_writers), 39)
467
self.failUnlessEqual(len(ss._active_writers), 0)
468
ss.disownServiceParent()
472
basedir = self.workdir("test_seek_behavior")
473
fileutil.make_dirs(basedir)
474
filename = os.path.join(basedir, "testfile")
475
f = open(filename, "wb")
478
# mode="w" allows seeking-to-create-holes, but truncates pre-existing
479
# files. mode="a" preserves previous contents but does not allow
480
# seeking-to-create-holes. mode="r+" allows both.
481
f = open(filename, "rb+")
485
filelen = os.stat(filename)[stat.ST_SIZE]
486
self.failUnlessEqual(filelen, 100+3)
487
f2 = open(filename, "rb")
488
self.failUnlessEqual(f2.read(5), "start")
491
def test_leases(self):
492
ss = self.create("test_leases")
493
canary = FakeCanary()
497
rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
498
hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
499
already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
500
sharenums, size, canary)
501
self.failUnlessEqual(len(already), 0)
502
self.failUnlessEqual(len(writers), 5)
503
for wb in writers.values():
506
leases = list(ss.get_leases("si0"))
507
self.failUnlessEqual(len(leases), 1)
508
self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))
510
rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
511
hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
512
already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
513
sharenums, size, canary)
514
for wb in writers.values():
517
# take out a second lease on si1
518
rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
519
hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
520
already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
521
sharenums, size, canary)
522
self.failUnlessEqual(len(already), 5)
523
self.failUnlessEqual(len(writers), 0)
525
leases = list(ss.get_leases("si1"))
526
self.failUnlessEqual(len(leases), 2)
527
self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))
529
# and a third lease, using add-lease
530
rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
531
hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
532
ss.remote_add_lease("si1", rs2a, cs2a)
533
leases = list(ss.get_leases("si1"))
534
self.failUnlessEqual(len(leases), 3)
535
self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))
537
# add-lease on a missing storage index is silently ignored
538
self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
540
# check that si0 is readable
541
readers = ss.remote_get_buckets("si0")
542
self.failUnlessEqual(len(readers), 5)
544
# renew the first lease. Only the proper renew_secret should work
545
ss.remote_renew_lease("si0", rs0)
546
self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
547
self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
549
# check that si0 is still readable
550
readers = ss.remote_get_buckets("si0")
551
self.failUnlessEqual(len(readers), 5)
554
self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
555
self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
556
ss.remote_cancel_lease("si0", cs0)
558
# si0 should now be gone
559
readers = ss.remote_get_buckets("si0")
560
self.failUnlessEqual(len(readers), 0)
561
# and the renew should no longer work
562
self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)
565
# cancel the first lease on si1, leaving the second and third in place
566
ss.remote_cancel_lease("si1", cs1)
567
readers = ss.remote_get_buckets("si1")
568
self.failUnlessEqual(len(readers), 5)
569
# the corresponding renew should no longer work
570
self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
572
leases = list(ss.get_leases("si1"))
573
self.failUnlessEqual(len(leases), 2)
574
self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))
576
ss.remote_renew_lease("si1", rs2)
577
# cancelling the second and third should make it go away
578
ss.remote_cancel_lease("si1", cs2)
579
ss.remote_cancel_lease("si1", cs2a)
580
readers = ss.remote_get_buckets("si1")
581
self.failUnlessEqual(len(readers), 0)
582
self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
583
self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
584
self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)
586
leases = list(ss.get_leases("si1"))
587
self.failUnlessEqual(len(leases), 0)
590
# test overlapping uploads
591
rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
592
hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
593
rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
594
hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
595
already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
596
sharenums, size, canary)
597
self.failUnlessEqual(len(already), 0)
598
self.failUnlessEqual(len(writers), 5)
599
already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
600
sharenums, size, canary)
601
self.failUnlessEqual(len(already2), 0)
602
self.failUnlessEqual(len(writers2), 0)
603
for wb in writers.values():
606
leases = list(ss.get_leases("si3"))
607
self.failUnlessEqual(len(leases), 1)
609
already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
610
sharenums, size, canary)
611
self.failUnlessEqual(len(already3), 5)
612
self.failUnlessEqual(len(writers3), 0)
614
leases = list(ss.get_leases("si3"))
615
self.failUnlessEqual(len(leases), 2)
617
def test_readonly(self):
618
workdir = self.workdir("test_readonly")
619
ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
620
ss.setServiceParent(self.sparent)
622
already,writers = self.allocate(ss, "vid", [0,1,2], 75)
623
self.failUnlessEqual(already, set())
624
self.failUnlessEqual(writers, {})
626
stats = ss.get_stats()
627
self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"],
629
if "storage_server.disk_avail" in stats:
630
# windows does not have os.statvfs, so it doesn't give us disk
631
# stats. But if there are stats, readonly_storage means
633
self.failUnlessEqual(stats["storage_server.disk_avail"], 0)
635
def test_discard(self):
636
# discard is really only used for other tests, but we test it anyways
637
workdir = self.workdir("test_discard")
638
ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
639
ss.setServiceParent(self.sparent)
641
canary = FakeCanary()
642
already,writers = self.allocate(ss, "vid", [0,1,2], 75)
643
self.failUnlessEqual(already, set())
644
self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
645
for i,wb in writers.items():
646
wb.remote_write(0, "%25d" % i)
648
# since we discard the data, the shares should be present but sparse.
649
# Since we write with some seeks, the data we read back will be all
651
b = ss.remote_get_buckets("vid")
652
self.failUnlessEqual(set(b.keys()), set([0,1,2]))
653
self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
655
def test_advise_corruption(self):
656
workdir = self.workdir("test_advise_corruption")
657
ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
658
ss.setServiceParent(self.sparent)
660
si0_s = base32.b2a("si0")
661
ss.remote_advise_corrupt_share("immutable", "si0", 0,
662
"This share smells funny.\n")
663
reportdir = os.path.join(workdir, "corruption-advisories")
664
reports = os.listdir(reportdir)
665
self.failUnlessEqual(len(reports), 1)
666
report_si0 = reports[0]
667
self.failUnless(si0_s in report_si0, report_si0)
668
f = open(os.path.join(reportdir, report_si0), "r")
671
self.failUnless("type: immutable" in report)
672
self.failUnless(("storage_index: %s" % si0_s) in report)
673
self.failUnless("share_number: 0" in report)
674
self.failUnless("This share smells funny." in report)
676
# test the RIBucketWriter version too
677
si1_s = base32.b2a("si1")
678
already,writers = self.allocate(ss, "si1", [1], 75)
679
self.failUnlessEqual(already, set())
680
self.failUnlessEqual(set(writers.keys()), set([1]))
681
writers[1].remote_write(0, "data")
682
writers[1].remote_close()
684
b = ss.remote_get_buckets("si1")
685
self.failUnlessEqual(set(b.keys()), set([1]))
686
b[1].remote_advise_corrupt_share("This share tastes like dust.\n")
688
reports = os.listdir(reportdir)
689
self.failUnlessEqual(len(reports), 2)
690
report_si1 = [r for r in reports if si1_s in r][0]
691
f = open(os.path.join(reportdir, report_si1), "r")
694
self.failUnless("type: immutable" in report)
695
self.failUnless(("storage_index: %s" % si1_s) in report)
696
self.failUnless("share_number: 1" in report)
697
self.failUnless("This share tastes like dust." in report)
701
class MutableServer(unittest.TestCase):
704
self.sparent = LoggingServiceParent()
705
self._lease_secret = itertools.count()
707
return self.sparent.stopService()
709
def workdir(self, name):
710
basedir = os.path.join("storage", "MutableServer", name)
713
def create(self, name):
714
workdir = self.workdir(name)
715
ss = StorageServer(workdir, "\x00" * 20)
716
ss.setServiceParent(self.sparent)
719
def test_create(self):
720
ss = self.create("test_create")
722
def write_enabler(self, we_tag):
    """Return the write-enabler secret derived from ``we_tag``.

    The value is hashutil.tagged_hash() of ``we_tag`` under the fixed
    tag "we_blah", so the same ``we_tag`` always yields the same secret.
    """
    return hashutil.tagged_hash("we_blah", we_tag)
725
def renew_secret(self, tag):
    """Return the lease-renewal secret derived from ``tag``.

    ``tag`` is converted with str() first, so integers are accepted as
    well as strings; the result is hashutil.tagged_hash() of that string
    under the fixed tag "renew_blah".
    """
    return hashutil.tagged_hash("renew_blah", str(tag))
728
def cancel_secret(self, tag):
    """Return the lease-cancel secret derived from ``tag``.

    ``tag`` is converted with str() first, so integers are accepted as
    well as strings; the result is hashutil.tagged_hash() of that string
    under the fixed tag "cancel_blah".
    """
    return hashutil.tagged_hash("cancel_blah", str(tag))
731
def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
732
write_enabler = self.write_enabler(we_tag)
733
renew_secret = self.renew_secret(lease_tag)
734
cancel_secret = self.cancel_secret(lease_tag)
735
rstaraw = ss.remote_slot_testv_and_readv_and_writev
736
testandwritev = dict( [ (shnum, ([], [], None) )
737
for shnum in sharenums ] )
739
rc = rstaraw(storage_index,
740
(write_enabler, renew_secret, cancel_secret),
743
(did_write, readv_data) = rc
744
self.failUnless(did_write)
745
self.failUnless(isinstance(readv_data, dict))
746
self.failUnlessEqual(len(readv_data), 0)
748
def test_bad_magic(self):
749
ss = self.create("test_bad_magic")
750
self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
751
fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
756
read = ss.remote_slot_readv
757
e = self.failUnlessRaises(UnknownMutableContainerVersionError,
758
read, "si1", [0], [(0,10)])
759
self.failUnless(" had magic " in str(e), e)
760
self.failUnless(" but we wanted " in str(e), e)
762
def test_container_size(self):
763
ss = self.create("test_container_size")
764
self.allocate(ss, "si1", "we1", self._lease_secret.next(),
766
read = ss.remote_slot_readv
767
rstaraw = ss.remote_slot_testv_and_readv_and_writev
768
secrets = ( self.write_enabler("we1"),
769
self.renew_secret("we1"),
770
self.cancel_secret("we1") )
771
data = "".join([ ("%d" % i) * 10 for i in range(10) ])
772
answer = rstaraw("si1", secrets,
773
{0: ([], [(0,data)], len(data)+12)},
775
self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
777
# trying to make the container too large will raise an exception
778
TOOBIG = MutableShareFile.MAX_SIZE + 10
779
self.failUnlessRaises(DataTooLargeError,
780
rstaraw, "si1", secrets,
781
{0: ([], [(0,data)], TOOBIG)},
784
# it should be possible to make the container smaller, although at
785
# the moment this doesn't actually affect the share, unless the
786
# container size is dropped to zero, in which case the share is
788
answer = rstaraw("si1", secrets,
789
{0: ([], [(0,data)], len(data)+8)},
791
self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
793
answer = rstaraw("si1", secrets,
794
{0: ([], [(0,data)], 0)},
796
self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
798
read_answer = read("si1", [0], [(0,10)])
799
self.failUnlessEqual(read_answer, {})
801
def test_allocate(self):
802
ss = self.create("test_allocate")
803
self.allocate(ss, "si1", "we1", self._lease_secret.next(),
806
read = ss.remote_slot_readv
807
self.failUnlessEqual(read("si1", [0], [(0, 10)]),
809
self.failUnlessEqual(read("si1", [], [(0, 10)]),
810
{0: [""], 1: [""], 2: [""]})
811
self.failUnlessEqual(read("si1", [0], [(100, 10)]),
815
secrets = ( self.write_enabler("we1"),
816
self.renew_secret("we1"),
817
self.cancel_secret("we1") )
818
data = "".join([ ("%d" % i) * 10 for i in range(10) ])
819
write = ss.remote_slot_testv_and_readv_and_writev
820
answer = write("si1", secrets,
821
{0: ([], [(0,data)], None)},
823
self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
825
self.failUnlessEqual(read("si1", [0], [(0,20)]),
826
{0: ["00000000001111111111"]})
827
self.failUnlessEqual(read("si1", [0], [(95,10)]),
829
#self.failUnlessEqual(s0.remote_get_length(), 100)
831
bad_secrets = ("bad write enabler", secrets[1], secrets[2])
832
f = self.failUnlessRaises(BadWriteEnablerError,
833
write, "si1", bad_secrets,
835
self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)
837
# this testv should fail
838
answer = write("si1", secrets,
839
{0: ([(0, 12, "eq", "444444444444"),
840
(20, 5, "eq", "22222"),
847
self.failUnlessEqual(answer, (False,
848
{0: ["000000000011", "22222"],
852
self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
855
answer = write("si1", secrets,
856
{0: ([(10, 5, "lt", "11111"),
863
self.failUnlessEqual(answer, (False,
868
self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
871
def test_operators(self):
872
# test operators, the data we're comparing is '11111' in all cases.
873
# test both fail+pass, reset data after each one.
874
ss = self.create("test_operators")
876
secrets = ( self.write_enabler("we1"),
877
self.renew_secret("we1"),
878
self.cancel_secret("we1") )
879
data = "".join([ ("%d" % i) * 10 for i in range(10) ])
880
write = ss.remote_slot_testv_and_readv_and_writev
881
read = ss.remote_slot_readv
884
write("si1", secrets,
885
{0: ([], [(0,data)], None)},
891
answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
896
self.failUnlessEqual(answer, (False, {0: ["11111"]}))
897
self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
898
self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
901
answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
906
self.failUnlessEqual(answer, (False, {0: ["11111"]}))
907
self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
910
answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
915
self.failUnlessEqual(answer, (True, {0: ["11111"]}))
916
self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
920
answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
925
self.failUnlessEqual(answer, (False, {0: ["11111"]}))
926
self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
929
answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
934
self.failUnlessEqual(answer, (True, {0: ["11111"]}))
935
self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
938
answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
943
self.failUnlessEqual(answer, (True, {0: ["11111"]}))
944
self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
948
answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
953
self.failUnlessEqual(answer, (False, {0: ["11111"]}))
954
self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
957
answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
962
self.failUnlessEqual(answer, (True, {0: ["11111"]}))
963
self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
967
answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
972
self.failUnlessEqual(answer, (False, {0: ["11111"]}))
973
self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
976
answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
981
self.failUnlessEqual(answer, (True, {0: ["11111"]}))
982
self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
986
answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
991
self.failUnlessEqual(answer, (True, {0: ["11111"]}))
992
self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
995
answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
1000
self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1001
self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1004
answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
1009
self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1010
self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1014
answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
1019
self.failUnlessEqual(answer, (True, {0: ["11111"]}))
1020
self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
1023
answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
1028
self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1029
self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1032
answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
1037
self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1038
self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1041
# finally, test some operators against empty shares
1042
answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
1047
self.failUnlessEqual(answer, (False, {0: ["11111"]}))
1048
self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
1051
def test_readv(self):
1052
ss = self.create("test_readv")
1053
secrets = ( self.write_enabler("we1"),
1054
self.renew_secret("we1"),
1055
self.cancel_secret("we1") )
1056
data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1057
write = ss.remote_slot_testv_and_readv_and_writev
1058
read = ss.remote_slot_readv
1059
data = [("%d" % i) * 100 for i in range(3)]
1060
rc = write("si1", secrets,
1061
{0: ([], [(0,data[0])], None),
1062
1: ([], [(0,data[1])], None),
1063
2: ([], [(0,data[2])], None),
1065
self.failUnlessEqual(rc, (True, {}))
1067
answer = read("si1", [], [(0, 10)])
1068
self.failUnlessEqual(answer, {0: ["0"*10],
1072
def compare_leases_without_timestamps(self, leases_a, leases_b):
1073
self.failUnlessEqual(len(leases_a), len(leases_b))
1074
for i in range(len(leases_a)):
1077
self.failUnlessEqual(a.owner_num, b.owner_num)
1078
self.failUnlessEqual(a.renew_secret, b.renew_secret)
1079
self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
1080
self.failUnlessEqual(a.nodeid, b.nodeid)
1082
def compare_leases(self, leases_a, leases_b):
1083
self.failUnlessEqual(len(leases_a), len(leases_b))
1084
for i in range(len(leases_a)):
1087
self.failUnlessEqual(a.owner_num, b.owner_num)
1088
self.failUnlessEqual(a.renew_secret, b.renew_secret)
1089
self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
1090
self.failUnlessEqual(a.nodeid, b.nodeid)
1091
self.failUnlessEqual(a.expiration_time, b.expiration_time)
1093
def test_leases(self):
1094
ss = self.create("test_leases")
1096
return ( self.write_enabler("we1"),
1097
self.renew_secret("we1-%d" % n),
1098
self.cancel_secret("we1-%d" % n) )
1099
data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1100
write = ss.remote_slot_testv_and_readv_and_writev
1101
read = ss.remote_slot_readv
1102
rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1103
self.failUnlessEqual(rc, (True, {}))
1105
# create a random non-numeric file in the bucket directory, to
1106
# exercise the code that's supposed to ignore those.
1107
bucket_dir = os.path.join(self.workdir("test_leases"),
1108
"shares", storage_index_to_dir("si1"))
1109
f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
1110
f.write("you ought to be ignoring me\n")
1113
s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1114
self.failUnlessEqual(len(list(s0.get_leases())), 1)
1116
# add-lease on a missing storage index is silently ignored
1117
self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
1119
# re-allocate the slots and use the same secrets, that should update
1121
write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1122
self.failUnlessEqual(len(list(s0.get_leases())), 1)
1125
ss.remote_renew_lease("si1", secrets(0)[1])
1126
self.failUnlessEqual(len(list(s0.get_leases())), 1)
1128
# now allocate them with a bunch of different secrets, to trigger the
1129
# extended lease code. Use add_lease for one of them.
1130
write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
1131
self.failUnlessEqual(len(list(s0.get_leases())), 2)
1132
secrets2 = secrets(2)
1133
ss.remote_add_lease("si1", secrets2[1], secrets2[2])
1134
self.failUnlessEqual(len(list(s0.get_leases())), 3)
1135
write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
1136
write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
1137
write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1139
self.failUnlessEqual(len(list(s0.get_leases())), 6)
1141
# cancel one of them
1142
ss.remote_cancel_lease("si1", secrets(5)[2])
1143
self.failUnlessEqual(len(list(s0.get_leases())), 5)
1145
all_leases = list(s0.get_leases())
1146
# and write enough data to expand the container, forcing the server
1147
# to move the leases
1148
write("si1", secrets(0),
1149
{0: ([], [(0,data)], 200), },
1152
# read back the leases, make sure they're still intact.
1153
self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1155
ss.remote_renew_lease("si1", secrets(0)[1])
1156
ss.remote_renew_lease("si1", secrets(1)[1])
1157
ss.remote_renew_lease("si1", secrets(2)[1])
1158
ss.remote_renew_lease("si1", secrets(3)[1])
1159
ss.remote_renew_lease("si1", secrets(4)[1])
1160
self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1161
# get a new copy of the leases, with the current timestamps. Reading
1162
# data and failing to renew/cancel leases should leave the timestamps
1164
all_leases = list(s0.get_leases())
1165
# renewing with a bogus token should prompt an error message
1167
# examine the exception thus raised, make sure the old nodeid is
1168
# present, to provide for share migration
1169
e = self.failUnlessRaises(IndexError,
1170
ss.remote_renew_lease, "si1",
1173
self.failUnless("Unable to renew non-existent lease" in e_s)
1174
self.failUnless("I have leases accepted by nodeids:" in e_s)
1175
self.failUnless("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' ." in e_s)
1177
# same for cancelling
1178
self.failUnlessRaises(IndexError,
1179
ss.remote_cancel_lease, "si1",
1181
self.compare_leases(all_leases, list(s0.get_leases()))
1183
# reading shares should not modify the timestamp
1184
read("si1", [], [(0,200)])
1185
self.compare_leases(all_leases, list(s0.get_leases()))
1187
write("si1", secrets(0),
1188
{0: ([], [(200, "make me bigger")], None)}, [])
1189
self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1191
write("si1", secrets(0),
1192
{0: ([], [(500, "make me really bigger")], None)}, [])
1193
self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1195
# now cancel them all
1196
ss.remote_cancel_lease("si1", secrets(0)[2])
1197
ss.remote_cancel_lease("si1", secrets(1)[2])
1198
ss.remote_cancel_lease("si1", secrets(2)[2])
1199
ss.remote_cancel_lease("si1", secrets(3)[2])
1201
# the slot should still be there
1202
remaining_shares = read("si1", [], [(0,10)])
1203
self.failUnlessEqual(len(remaining_shares), 1)
1204
self.failUnlessEqual(len(list(s0.get_leases())), 1)
1206
# cancelling a non-existent lease should raise an IndexError
1207
self.failUnlessRaises(IndexError,
1208
ss.remote_cancel_lease, "si1", "nonsecret")
1210
# and the slot should still be there
1211
remaining_shares = read("si1", [], [(0,10)])
1212
self.failUnlessEqual(len(remaining_shares), 1)
1213
self.failUnlessEqual(len(list(s0.get_leases())), 1)
1215
ss.remote_cancel_lease("si1", secrets(4)[2])
1216
# now the slot should be gone
1217
no_shares = read("si1", [], [(0,10)])
1218
self.failUnlessEqual(no_shares, {})
1220
# cancelling a lease on a non-existent share should raise an IndexError
1221
self.failUnlessRaises(IndexError,
1222
ss.remote_cancel_lease, "si2", "nonsecret")
1224
def test_remove(self):
1225
ss = self.create("test_remove")
1226
self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1228
readv = ss.remote_slot_readv
1229
writev = ss.remote_slot_testv_and_readv_and_writev
1230
secrets = ( self.write_enabler("we1"),
1231
self.renew_secret("we1"),
1232
self.cancel_secret("we1") )
1233
# delete sh0 by setting its size to zero
1234
answer = writev("si1", secrets,
1237
# the answer should mention all the shares that existed before the
1239
self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1240
# but a new read should show only sh1 and sh2
1241
self.failUnlessEqual(readv("si1", [], [(0,10)]),
1244
# delete sh1 by setting its size to zero
1245
answer = writev("si1", secrets,
1248
self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1249
self.failUnlessEqual(readv("si1", [], [(0,10)]),
1252
# delete sh2 by setting its size to zero
1253
answer = writev("si1", secrets,
1256
self.failUnlessEqual(answer, (True, {2:[]}) )
1257
self.failUnlessEqual(readv("si1", [], [(0,10)]),
1259
# and the bucket directory should now be gone
1260
si = base32.b2a("si1")
1261
# note: this is a detail of the storage server implementation, and
1262
# may change in the future
1264
prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1265
bucketdir = os.path.join(prefixdir, si)
1266
self.failUnless(os.path.exists(prefixdir))
1267
self.failIf(os.path.exists(bucketdir))
1269
class Stats(unittest.TestCase):
1272
self.sparent = LoggingServiceParent()
1273
self._lease_secret = itertools.count()
1275
return self.sparent.stopService()
1277
def workdir(self, name):
1278
basedir = os.path.join("storage", "Server", name)
1281
def create(self, name):
1282
workdir = self.workdir(name)
1283
ss = StorageServer(workdir, "\x00" * 20)
1284
ss.setServiceParent(self.sparent)
1287
def test_latencies(self):
1288
ss = self.create("test_latencies")
1289
for i in range(10000):
1290
ss.add_latency("allocate", 1.0 * i)
1291
for i in range(1000):
1292
ss.add_latency("renew", 1.0 * i)
1294
ss.add_latency("cancel", 2.0 * i)
1295
ss.add_latency("get", 5.0)
1297
output = ss.get_latencies()
1299
self.failUnlessEqual(sorted(output.keys()),
1300
sorted(["allocate", "renew", "cancel", "get"]))
1301
self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
1302
self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1)
1303
self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1)
1304
self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1)
1305
self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1)
1306
self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1)
1307
self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1)
1308
self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1)
1309
self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1)
1311
self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
1312
self.failUnless(abs(output["renew"]["mean"] - 500) < 1)
1313
self.failUnless(abs(output["renew"]["01_0_percentile"] - 10) < 1)
1314
self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1)
1315
self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1)
1316
self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1)
1317
self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1)
1318
self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1)
1319
self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1)
1321
self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
1322
self.failUnless(abs(output["cancel"]["mean"] - 9) < 1)
1323
self.failUnless(abs(output["cancel"]["01_0_percentile"] - 0) < 1)
1324
self.failUnless(abs(output["cancel"]["10_0_percentile"] - 2) < 1)
1325
self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1)
1326
self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1)
1327
self.failUnless(abs(output["cancel"]["95_0_percentile"] - 18) < 1)
1328
self.failUnless(abs(output["cancel"]["99_0_percentile"] - 18) < 1)
1329
self.failUnless(abs(output["cancel"]["99_9_percentile"] - 18) < 1)
1331
self.failUnlessEqual(len(ss.latencies["get"]), 1)
1332
self.failUnless(abs(output["get"]["mean"] - 5) < 1)
1333
self.failUnless(abs(output["get"]["01_0_percentile"] - 5) < 1)
1334
self.failUnless(abs(output["get"]["10_0_percentile"] - 5) < 1)
1335
self.failUnless(abs(output["get"]["50_0_percentile"] - 5) < 1)
1336
self.failUnless(abs(output["get"]["90_0_percentile"] - 5) < 1)
1337
self.failUnless(abs(output["get"]["95_0_percentile"] - 5) < 1)
1338
self.failUnless(abs(output["get"]["99_0_percentile"] - 5) < 1)
1339
self.failUnless(abs(output["get"]["99_9_percentile"] - 5) < 1)
1342
s = re.sub(r'<[^>]*>', ' ', s)
1343
s = re.sub(r'\s+', ' ', s)
1346
class MyBucketCountingCrawler(BucketCountingCrawler):
1347
def finished_prefix(self, cycle, prefix):
1348
BucketCountingCrawler.finished_prefix(self, cycle, prefix)
1350
d = self.hook_ds.pop(0)
1353
class MyStorageServer(StorageServer):
    """StorageServer whose bucket counter is a MyBucketCountingCrawler."""

    def add_bucket_counter(self):
        """Build the bucket counter and attach it to this server.

        The crawler is given this server and the path of the
        "bucket_counter.state" file inside ``self.storedir``; it is stored
        as ``self.bucket_counter`` and made a child service of this server.
        """
        crawler = MyBucketCountingCrawler(
            self, os.path.join(self.storedir, "bucket_counter.state"))
        self.bucket_counter = crawler
        crawler.setServiceParent(self)
1359
class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
1362
self.s = service.MultiService()
1363
self.s.startService()
1365
return self.s.stopService()
1367
def test_bucket_counter(self):
1368
basedir = "storage/BucketCounter/bucket_counter"
1369
fileutil.make_dirs(basedir)
1370
ss = StorageServer(basedir, "\x00" * 20)
1371
# to make sure we capture the bucket-counting-crawler in the middle
1372
# of a cycle, we reach in and reduce its maximum slice time to 0. We
1373
# also make it start sooner than usual.
1374
ss.bucket_counter.slow_start = 0
1375
orig_cpu_slice = ss.bucket_counter.cpu_slice
1376
ss.bucket_counter.cpu_slice = 0
1377
ss.setServiceParent(self.s)
1379
w = StorageStatus(ss)
1381
# this sample is before the crawler has started doing anything
1382
html = w.renderSynchronously()
1383
self.failUnless("<h1>Storage Server Status</h1>" in html, html)
1384
s = remove_tags(html)
1385
self.failUnless("Accepting new shares: Yes" in s, s)
1386
self.failUnless("Reserved space: - 0 B (0)" in s, s)
1387
self.failUnless("Total buckets: Not computed yet" in s, s)
1388
self.failUnless("Next crawl in" in s, s)
1390
# give the bucket-counting-crawler one tick to get started. The
1391
# cpu_slice=0 will force it to yield right after it processes the
1394
d = fireEventually()
1395
def _check(ignored):
1396
# are we really right after the first prefix?
1397
state = ss.bucket_counter.get_state()
1398
self.failUnlessEqual(state["last-complete-prefix"],
1399
ss.bucket_counter.prefixes[0])
1400
ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1401
html = w.renderSynchronously()
1402
s = remove_tags(html)
1403
self.failUnless(" Current crawl " in s, s)
1404
self.failUnless(" (next work in " in s, s)
1405
d.addCallback(_check)
1407
# now give it enough time to complete a full cycle
1409
return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1410
d.addCallback(lambda ignored: self.poll(_watch))
1411
def _check2(ignored):
1412
ss.bucket_counter.cpu_slice = orig_cpu_slice
1413
html = w.renderSynchronously()
1414
s = remove_tags(html)
1415
self.failUnless("Total buckets: 0 (the number of" in s, s)
1416
self.failUnless("Next crawl in 59 minutes" in s, s)
1417
d.addCallback(_check2)
1420
def test_bucket_counter_cleanup(self):
1421
basedir = "storage/BucketCounter/bucket_counter_cleanup"
1422
fileutil.make_dirs(basedir)
1423
ss = StorageServer(basedir, "\x00" * 20)
1424
# to make sure we capture the bucket-counting-crawler in the middle
1425
# of a cycle, we reach in and reduce its maximum slice time to 0.
1426
ss.bucket_counter.slow_start = 0
1427
orig_cpu_slice = ss.bucket_counter.cpu_slice
1428
ss.bucket_counter.cpu_slice = 0
1429
ss.setServiceParent(self.s)
1431
d = fireEventually()
1433
def _after_first_prefix(ignored):
1434
ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1435
# now sneak in and mess with its state, to make sure it cleans up
1436
# properly at the end of the cycle
1437
state = ss.bucket_counter.state
1438
self.failUnlessEqual(state["last-complete-prefix"],
1439
ss.bucket_counter.prefixes[0])
1440
state["bucket-counts"][-12] = {}
1441
state["storage-index-samples"]["bogusprefix!"] = (-12, [])
1442
ss.bucket_counter.save_state()
1443
d.addCallback(_after_first_prefix)
1445
# now give it enough time to complete a cycle
1447
return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1448
d.addCallback(lambda ignored: self.poll(_watch))
1449
def _check2(ignored):
1450
ss.bucket_counter.cpu_slice = orig_cpu_slice
1451
s = ss.bucket_counter.get_state()
1452
self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
1453
self.failIf("bogusprefix!" in s["storage-index-samples"],
1454
s["storage-index-samples"].keys())
1455
d.addCallback(_check2)
1458
def test_bucket_counter_eta(self):
1459
basedir = "storage/BucketCounter/bucket_counter_eta"
1460
fileutil.make_dirs(basedir)
1461
ss = MyStorageServer(basedir, "\x00" * 20)
1462
ss.bucket_counter.slow_start = 0
1463
# these will be fired inside finished_prefix()
1464
hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
1465
w = StorageStatus(ss)
1467
d = defer.Deferred()
1469
def _check_1(ignored):
1470
# no ETA is available yet
1471
html = w.renderSynchronously()
1472
s = remove_tags(html)
1473
self.failUnlessIn("complete (next work", s)
1475
def _check_2(ignored):
1476
# one prefix has finished, so an ETA based upon that elapsed time
1477
# should be available.
1478
html = w.renderSynchronously()
1479
s = remove_tags(html)
1480
self.failUnlessIn("complete (ETA ", s)
1482
def _check_3(ignored):
1483
# two prefixes have finished
1484
html = w.renderSynchronously()
1485
s = remove_tags(html)
1486
self.failUnlessIn("complete (ETA ", s)
1489
hooks[0].addCallback(_check_1).addErrback(d.errback)
1490
hooks[1].addCallback(_check_2).addErrback(d.errback)
1491
hooks[2].addCallback(_check_3).addErrback(d.errback)
1493
ss.setServiceParent(self.s)
1496
class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
    """LeaseCheckingCrawler with a one-shot "stop after first bucket" switch.

    When ``stop_after_first_bucket`` is true, the next call to
    ``process_bucket`` clears the flag and sets ``cpu_slice`` to -1.0
    (presumably so the crawler yields right away -- confirm against the
    crawler's time-slice logic). ``yielding`` sets ``cpu_slice`` to 500
    whenever the flag is clear.
    """

    stop_after_first_bucket = False

    def process_bucket(self, *args, **kwargs):
        """Process the bucket via the base class, then honour the switch."""
        LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
        if not self.stop_after_first_bucket:
            return
        # One-shot: clear the request and make the time slice negative.
        self.stop_after_first_bucket = False
        self.cpu_slice = -1.0

    def yielding(self, sleep_time):
        """Set cpu_slice to 500 unless a stop request is still pending."""
        if self.stop_after_first_bucket:
            return
        self.cpu_slice = 500
1507
class BrokenStatResults:
1509
class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
1512
bsr = BrokenStatResults()
1513
for attrname in dir(s):
1514
if attrname.startswith("_"):
1516
if attrname == "st_blocks":
1518
setattr(bsr, attrname, getattr(s, attrname))
1521
class InstrumentedStorageServer(StorageServer):
    """StorageServer whose LeaseCheckerClass is InstrumentedLeaseCheckingCrawler."""

    LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
1523
class No_ST_BLOCKS_StorageServer(StorageServer):
    """StorageServer whose LeaseCheckerClass is No_ST_BLOCKS_LeaseCheckingCrawler."""

    LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler
1526
class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
1529
self.s = service.MultiService()
1530
self.s.startService()
1532
return self.s.stopService()
1534
def make_shares(self, ss):
1536
return (si, hashutil.tagged_hash("renew", si),
1537
hashutil.tagged_hash("cancel", si))
1538
def make_mutable(si):
1539
return (si, hashutil.tagged_hash("renew", si),
1540
hashutil.tagged_hash("cancel", si),
1541
hashutil.tagged_hash("write-enabler", si))
1542
def make_extra_lease(si, num):
1543
return (hashutil.tagged_hash("renew-%d" % num, si),
1544
hashutil.tagged_hash("cancel-%d" % num, si))
1546
immutable_si_0, rs0, cs0 = make("\x00" * 16)
1547
immutable_si_1, rs1, cs1 = make("\x01" * 16)
1548
rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
1549
mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
1550
mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
1551
rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
1553
canary = FakeCanary()
1554
# note: 'tahoe debug dump-share' will not handle this file, since the
1555
# inner contents are not a valid CHK share
1556
data = "\xff" * 1000
1558
a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
1560
w[0].remote_write(0, data)
1563
a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
1565
w[0].remote_write(0, data)
1567
ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
1569
writev = ss.remote_slot_testv_and_readv_and_writev
1570
writev(mutable_si_2, (we2, rs2, cs2),
1571
{0: ([], [(0,data)], len(data))}, [])
1572
writev(mutable_si_3, (we3, rs3, cs3),
1573
{0: ([], [(0,data)], len(data))}, [])
1574
ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
1576
self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
1577
self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
1578
self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
1580
def test_basic(self):
1581
basedir = "storage/LeaseCrawler/basic"
1582
fileutil.make_dirs(basedir)
1583
ss = InstrumentedStorageServer(basedir, "\x00" * 20)
1584
# make it start sooner than usual.
1585
lc = ss.lease_checker
1588
lc.stop_after_first_bucket = True
1589
webstatus = StorageStatus(ss)
1591
# create a few shares, with some leases on them
1592
self.make_shares(ss)
1593
[immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1595
# add a non-sharefile to exercise another code path
1596
fn = os.path.join(ss.sharedir,
1597
storage_index_to_dir(immutable_si_0),
1600
f.write("I am not a share.\n")
1603
# this is before the crawl has started, so we're not in a cycle yet
1604
initial_state = lc.get_state()
1605
self.failIf(lc.get_progress()["cycle-in-progress"])
1606
self.failIf("cycle-to-date" in initial_state)
1607
self.failIf("estimated-remaining-cycle" in initial_state)
1608
self.failIf("estimated-current-cycle" in initial_state)
1609
self.failUnless("history" in initial_state)
1610
self.failUnlessEqual(initial_state["history"], {})
1612
ss.setServiceParent(self.s)
1616
d = fireEventually()
1618
# now examine the state right after the first bucket has been
1620
def _after_first_bucket(ignored):
1621
initial_state = lc.get_state()
1622
self.failUnless("cycle-to-date" in initial_state)
1623
self.failUnless("estimated-remaining-cycle" in initial_state)
1624
self.failUnless("estimated-current-cycle" in initial_state)
1625
self.failUnless("history" in initial_state)
1626
self.failUnlessEqual(initial_state["history"], {})
1628
so_far = initial_state["cycle-to-date"]
1629
self.failUnlessEqual(so_far["expiration-enabled"], False)
1630
self.failUnless("configured-expiration-mode" in so_far)
1631
self.failUnless("lease-age-histogram" in so_far)
1632
lah = so_far["lease-age-histogram"]
1633
self.failUnlessEqual(type(lah), list)
1634
self.failUnlessEqual(len(lah), 1)
1635
self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
1636
self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
1637
self.failUnlessEqual(so_far["corrupt-shares"], [])
1638
sr1 = so_far["space-recovered"]
1639
self.failUnlessEqual(sr1["examined-buckets"], 1)
1640
self.failUnlessEqual(sr1["examined-shares"], 1)
1641
self.failUnlessEqual(sr1["actual-shares"], 0)
1642
self.failUnlessEqual(sr1["configured-diskbytes"], 0)
1643
self.failUnlessEqual(sr1["original-sharebytes"], 0)
1644
left = initial_state["estimated-remaining-cycle"]
1645
sr2 = left["space-recovered"]
1646
self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
1647
self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
1648
self.failIfEqual(sr2["actual-shares"], None)
1649
self.failIfEqual(sr2["configured-diskbytes"], None)
1650
self.failIfEqual(sr2["original-sharebytes"], None)
1651
d.addCallback(_after_first_bucket)
1652
d.addCallback(lambda ign: self.render1(webstatus))
1653
def _check_html_in_cycle(html):
1654
s = remove_tags(html)
1655
self.failUnlessIn("So far, this cycle has examined "
1656
"1 shares in 1 buckets (0 mutable / 1 immutable) ", s)
1657
self.failUnlessIn("and has recovered: "
1658
"0 shares, 0 buckets (0 mutable / 0 immutable), "
1659
"0 B (0 B / 0 B)", s)
1660
self.failUnlessIn("If expiration were enabled, "
1661
"we would have recovered: "
1662
"0 shares, 0 buckets (0 mutable / 0 immutable),"
1663
" 0 B (0 B / 0 B) by now", s)
1664
self.failUnlessIn("and the remainder of this cycle "
1665
"would probably recover: "
1666
"0 shares, 0 buckets (0 mutable / 0 immutable),"
1667
" 0 B (0 B / 0 B)", s)
1668
self.failUnlessIn("and the whole cycle would probably recover: "
1669
"0 shares, 0 buckets (0 mutable / 0 immutable),"
1670
" 0 B (0 B / 0 B)", s)
1671
self.failUnlessIn("if we were strictly using each lease's default "
1672
"31-day lease lifetime", s)
1673
self.failUnlessIn("this cycle would be expected to recover: ", s)
1674
d.addCallback(_check_html_in_cycle)
1676
# wait for the crawler to finish the first cycle. Nothing should have
1679
return bool(lc.get_state()["last-cycle-finished"] is not None)
1680
d.addCallback(lambda ign: self.poll(_wait))
1682
def _after_first_cycle(ignored):
1684
self.failIf("cycle-to-date" in s)
1685
self.failIf("estimated-remaining-cycle" in s)
1686
self.failIf("estimated-current-cycle" in s)
1687
last = s["history"][0]
1688
self.failUnless("cycle-start-finish-times" in last)
1689
self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
1690
self.failUnlessEqual(last["expiration-enabled"], False)
1691
self.failUnless("configured-expiration-mode" in last)
1693
self.failUnless("lease-age-histogram" in last)
1694
lah = last["lease-age-histogram"]
1695
self.failUnlessEqual(type(lah), list)
1696
self.failUnlessEqual(len(lah), 1)
1697
self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )
1699
self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1700
self.failUnlessEqual(last["corrupt-shares"], [])
1702
rec = last["space-recovered"]
1703
self.failUnlessEqual(rec["examined-buckets"], 4)
1704
self.failUnlessEqual(rec["examined-shares"], 4)
1705
self.failUnlessEqual(rec["actual-buckets"], 0)
1706
self.failUnlessEqual(rec["original-buckets"], 0)
1707
self.failUnlessEqual(rec["configured-buckets"], 0)
1708
self.failUnlessEqual(rec["actual-shares"], 0)
1709
self.failUnlessEqual(rec["original-shares"], 0)
1710
self.failUnlessEqual(rec["configured-shares"], 0)
1711
self.failUnlessEqual(rec["actual-diskbytes"], 0)
1712
self.failUnlessEqual(rec["original-diskbytes"], 0)
1713
self.failUnlessEqual(rec["configured-diskbytes"], 0)
1714
self.failUnlessEqual(rec["actual-sharebytes"], 0)
1715
self.failUnlessEqual(rec["original-sharebytes"], 0)
1716
self.failUnlessEqual(rec["configured-sharebytes"], 0)
1718
def _get_sharefile(si):
1719
return list(ss._iter_share_files(si))[0]
1720
def count_leases(si):
1721
return len(list(_get_sharefile(si).get_leases()))
1722
self.failUnlessEqual(count_leases(immutable_si_0), 1)
1723
self.failUnlessEqual(count_leases(immutable_si_1), 2)
1724
self.failUnlessEqual(count_leases(mutable_si_2), 1)
1725
self.failUnlessEqual(count_leases(mutable_si_3), 2)
1726
d.addCallback(_after_first_cycle)
1727
d.addCallback(lambda ign: self.render1(webstatus))
1728
def _check_html(html):
1729
s = remove_tags(html)
1730
self.failUnlessIn("recovered: 0 shares, 0 buckets "
1731
"(0 mutable / 0 immutable), 0 B (0 B / 0 B) "
1732
"but expiration was not enabled", s)
1733
d.addCallback(_check_html)
1734
d.addCallback(lambda ign: self.render_json(webstatus))
1735
def _check_json(json):
1736
data = simplejson.loads(json)
1737
self.failUnless("lease-checker" in data)
1738
self.failUnless("lease-checker-progress" in data)
1739
d.addCallback(_check_json)
1742
def backdate_lease(self, sf, renew_secret, new_expire_time):
1743
# ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
1744
# "renew" a lease with a new_expire_time that is older than what the
1745
# current lease has), so we have to reach inside it.
1746
for i,lease in enumerate(sf.get_leases()):
1747
if lease.renew_secret == renew_secret:
1748
lease.expiration_time = new_expire_time
1749
f = open(sf.home, 'rb+')
1750
sf._write_lease_record(f, i, lease)
1753
raise IndexError("unable to renew non-existent lease")
1755
def test_expire_age(self):
1756
basedir = "storage/LeaseCrawler/expire_age"
1757
fileutil.make_dirs(basedir)
1758
# an expiration_override_lease_duration of 2000 means any lease which is more
1759
# than 2000s old will be expired.
1760
ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1761
expiration_enabled=True,
1762
expiration_mode="age",
1763
expiration_override_lease_duration=2000)
1764
# make it start sooner than usual.
1765
lc = ss.lease_checker
1767
lc.stop_after_first_bucket = True
1768
webstatus = StorageStatus(ss)
1770
# create a few shares, with some leases on them
1771
self.make_shares(ss)
1772
[immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1774
def count_shares(si):
1775
return len(list(ss._iter_share_files(si)))
1776
def _get_sharefile(si):
1777
return list(ss._iter_share_files(si))[0]
1778
def count_leases(si):
1779
return len(list(_get_sharefile(si).get_leases()))
1781
self.failUnlessEqual(count_shares(immutable_si_0), 1)
1782
self.failUnlessEqual(count_leases(immutable_si_0), 1)
1783
self.failUnlessEqual(count_shares(immutable_si_1), 1)
1784
self.failUnlessEqual(count_leases(immutable_si_1), 2)
1785
self.failUnlessEqual(count_shares(mutable_si_2), 1)
1786
self.failUnlessEqual(count_leases(mutable_si_2), 1)
1787
self.failUnlessEqual(count_shares(mutable_si_3), 1)
1788
self.failUnlessEqual(count_leases(mutable_si_3), 2)
1790
# artificially crank back the expiration time on the first lease of
1791
# each share, to make it look like it expired already (age=1000s).
1792
# Some shares have an extra lease which is set to expire at the
1793
# default time in 31 days from now (age=31days). We then run the
1794
# crawler, which will expire the first lease, making some shares get
1795
# deleted and others stay alive (with one remaining lease)
1798
sf0 = _get_sharefile(immutable_si_0)
1799
self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
1800
sf0_size = os.stat(sf0.home).st_size
1802
# immutable_si_1 gets an extra lease
1803
sf1 = _get_sharefile(immutable_si_1)
1804
self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)
1806
sf2 = _get_sharefile(mutable_si_2)
1807
self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
1808
sf2_size = os.stat(sf2.home).st_size
1810
# mutable_si_3 gets an extra lease
1811
sf3 = _get_sharefile(mutable_si_3)
1812
self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)
1814
ss.setServiceParent(self.s)
1816
d = fireEventually()
1817
# examine the state right after the first bucket has been processed
1818
def _after_first_bucket(ignored):
1819
p = lc.get_progress()
1820
self.failUnless(p["cycle-in-progress"])
1821
d.addCallback(_after_first_bucket)
1822
d.addCallback(lambda ign: self.render1(webstatus))
1823
def _check_html_in_cycle(html):
1824
s = remove_tags(html)
1825
# the first bucket encountered gets deleted, and its prefix
1826
# happens to be about 1/5th of the way through the ring, so the
1827
# predictor thinks we'll have 5 shares and that we'll delete them
1828
# all. This part of the test depends upon the SIs landing right
1829
# where they do now.
1830
self.failUnlessIn("The remainder of this cycle is expected to "
1831
"recover: 4 shares, 4 buckets", s)
1832
self.failUnlessIn("The whole cycle is expected to examine "
1833
"5 shares in 5 buckets and to recover: "
1834
"5 shares, 5 buckets", s)
1835
d.addCallback(_check_html_in_cycle)
1837
# wait for the crawler to finish the first cycle. Two shares should
1840
return bool(lc.get_state()["last-cycle-finished"] is not None)
1841
d.addCallback(lambda ign: self.poll(_wait))
1843
def _after_first_cycle(ignored):
1844
self.failUnlessEqual(count_shares(immutable_si_0), 0)
1845
self.failUnlessEqual(count_shares(immutable_si_1), 1)
1846
self.failUnlessEqual(count_leases(immutable_si_1), 1)
1847
self.failUnlessEqual(count_shares(mutable_si_2), 0)
1848
self.failUnlessEqual(count_shares(mutable_si_3), 1)
1849
self.failUnlessEqual(count_leases(mutable_si_3), 1)
1852
last = s["history"][0]
1854
self.failUnlessEqual(last["expiration-enabled"], True)
1855
self.failUnlessEqual(last["configured-expiration-mode"],
1856
("age", 2000, None, ("mutable", "immutable")))
1857
self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
1859
rec = last["space-recovered"]
1860
self.failUnlessEqual(rec["examined-buckets"], 4)
1861
self.failUnlessEqual(rec["examined-shares"], 4)
1862
self.failUnlessEqual(rec["actual-buckets"], 2)
1863
self.failUnlessEqual(rec["original-buckets"], 2)
1864
self.failUnlessEqual(rec["configured-buckets"], 2)
1865
self.failUnlessEqual(rec["actual-shares"], 2)
1866
self.failUnlessEqual(rec["original-shares"], 2)
1867
self.failUnlessEqual(rec["configured-shares"], 2)
1868
size = sf0_size + sf2_size
1869
self.failUnlessEqual(rec["actual-sharebytes"], size)
1870
self.failUnlessEqual(rec["original-sharebytes"], size)
1871
self.failUnlessEqual(rec["configured-sharebytes"], size)
1872
# different platforms have different notions of "blocks used by
1873
# this file", so merely assert that it's a number
1874
self.failUnless(rec["actual-diskbytes"] >= 0,
1875
rec["actual-diskbytes"])
1876
self.failUnless(rec["original-diskbytes"] >= 0,
1877
rec["original-diskbytes"])
1878
self.failUnless(rec["configured-diskbytes"] >= 0,
1879
rec["configured-diskbytes"])
1880
d.addCallback(_after_first_cycle)
1881
d.addCallback(lambda ign: self.render1(webstatus))
1882
def _check_html(html):
1883
s = remove_tags(html)
1884
self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
1885
self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
1886
self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
1887
d.addCallback(_check_html)
1890
def test_expire_cutoff_date(self):
1891
basedir = "storage/LeaseCrawler/expire_cutoff_date"
1892
fileutil.make_dirs(basedir)
1893
# setting cutoff-date to 2000 seconds ago means that any lease which
1894
# is more than 2000s old will be expired.
1896
then = int(now - 2000)
1897
ss = InstrumentedStorageServer(basedir, "\x00" * 20,
1898
expiration_enabled=True,
1899
expiration_mode="cutoff-date",
1900
expiration_cutoff_date=then)
1901
# make it start sooner than usual.
1902
lc = ss.lease_checker
1904
lc.stop_after_first_bucket = True
1905
webstatus = StorageStatus(ss)
1907
# create a few shares, with some leases on them
1908
self.make_shares(ss)
1909
[immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
1911
def count_shares(si):
1912
return len(list(ss._iter_share_files(si)))
1913
def _get_sharefile(si):
1914
return list(ss._iter_share_files(si))[0]
1915
def count_leases(si):
1916
return len(list(_get_sharefile(si).get_leases()))
1918
self.failUnlessEqual(count_shares(immutable_si_0), 1)
1919
self.failUnlessEqual(count_leases(immutable_si_0), 1)
1920
self.failUnlessEqual(count_shares(immutable_si_1), 1)
1921
self.failUnlessEqual(count_leases(immutable_si_1), 2)
1922
self.failUnlessEqual(count_shares(mutable_si_2), 1)
1923
self.failUnlessEqual(count_leases(mutable_si_2), 1)
1924
self.failUnlessEqual(count_shares(mutable_si_3), 1)
1925
self.failUnlessEqual(count_leases(mutable_si_3), 2)
1927
# artificially crank back the expiration time on the first lease of
1928
# each share, to make it look like was renewed 3000s ago. To achieve
1929
# this, we need to set the expiration time to now-3000+31days. This
1930
# will change when the lease format is improved to contain both
1931
# create/renew time and duration.
1932
new_expiration_time = now - 3000 + 31*24*60*60
1934
# Some shares have an extra lease which is set to expire at the
1935
# default time in 31 days from now (age=31days). We then run the
1936
# crawler, which will expire the first lease, making some shares get
1937
# deleted and others stay alive (with one remaining lease)
1939
sf0 = _get_sharefile(immutable_si_0)
1940
self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
1941
sf0_size = os.stat(sf0.home).st_size
1943
# immutable_si_1 gets an extra lease
1944
sf1 = _get_sharefile(immutable_si_1)
1945
self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
1947
sf2 = _get_sharefile(mutable_si_2)
1948
self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
1949
sf2_size = os.stat(sf2.home).st_size
1951
# mutable_si_3 gets an extra lease
1952
sf3 = _get_sharefile(mutable_si_3)
1953
self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
1955
ss.setServiceParent(self.s)
1957
d = fireEventually()
1958
# examine the state right after the first bucket has been processed
1959
def _after_first_bucket(ignored):
1960
p = lc.get_progress()
1961
self.failUnless(p["cycle-in-progress"])
1962
d.addCallback(_after_first_bucket)
1963
d.addCallback(lambda ign: self.render1(webstatus))
1964
def _check_html_in_cycle(html):
1965
s = remove_tags(html)
1966
# the first bucket encountered gets deleted, and its prefix
1967
# happens to be about 1/5th of the way through the ring, so the
1968
# predictor thinks we'll have 5 shares and that we'll delete them
1969
# all. This part of the test depends upon the SIs landing right
1970
# where they do now.
1971
self.failUnlessIn("The remainder of this cycle is expected to "
1972
"recover: 4 shares, 4 buckets", s)
1973
self.failUnlessIn("The whole cycle is expected to examine "
1974
"5 shares in 5 buckets and to recover: "
1975
"5 shares, 5 buckets", s)
1976
d.addCallback(_check_html_in_cycle)
1978
# wait for the crawler to finish the first cycle. Two shares should
1981
return bool(lc.get_state()["last-cycle-finished"] is not None)
1982
d.addCallback(lambda ign: self.poll(_wait))
1984
def _after_first_cycle(ignored):
1985
self.failUnlessEqual(count_shares(immutable_si_0), 0)
1986
self.failUnlessEqual(count_shares(immutable_si_1), 1)
1987
self.failUnlessEqual(count_leases(immutable_si_1), 1)
1988
self.failUnlessEqual(count_shares(mutable_si_2), 0)
1989
self.failUnlessEqual(count_shares(mutable_si_3), 1)
1990
self.failUnlessEqual(count_leases(mutable_si_3), 1)
1993
last = s["history"][0]
1995
self.failUnlessEqual(last["expiration-enabled"], True)
1996
self.failUnlessEqual(last["configured-expiration-mode"],
1997
("cutoff-date", None, then,
1998
("mutable", "immutable")))
1999
self.failUnlessEqual(last["leases-per-share-histogram"],
2002
rec = last["space-recovered"]
2003
self.failUnlessEqual(rec["examined-buckets"], 4)
2004
self.failUnlessEqual(rec["examined-shares"], 4)
2005
self.failUnlessEqual(rec["actual-buckets"], 2)
2006
self.failUnlessEqual(rec["original-buckets"], 0)
2007
self.failUnlessEqual(rec["configured-buckets"], 2)
2008
self.failUnlessEqual(rec["actual-shares"], 2)
2009
self.failUnlessEqual(rec["original-shares"], 0)
2010
self.failUnlessEqual(rec["configured-shares"], 2)
2011
size = sf0_size + sf2_size
2012
self.failUnlessEqual(rec["actual-sharebytes"], size)
2013
self.failUnlessEqual(rec["original-sharebytes"], 0)
2014
self.failUnlessEqual(rec["configured-sharebytes"], size)
2015
# different platforms have different notions of "blocks used by
2016
# this file", so merely assert that it's a number
2017
self.failUnless(rec["actual-diskbytes"] >= 0,
2018
rec["actual-diskbytes"])
2019
self.failUnless(rec["original-diskbytes"] >= 0,
2020
rec["original-diskbytes"])
2021
self.failUnless(rec["configured-diskbytes"] >= 0,
2022
rec["configured-diskbytes"])
2023
d.addCallback(_after_first_cycle)
2024
d.addCallback(lambda ign: self.render1(webstatus))
2025
def _check_html(html):
2026
s = remove_tags(html)
2027
self.failUnlessIn("Expiration Enabled:"
2028
" expired leases will be removed", s)
2029
date = time.strftime("%Y-%m-%d (%d-%b-%Y) UTC", time.gmtime(then))
2030
substr = "Leases created or last renewed before %s will be considered expired." % date
2031
self.failUnlessIn(substr, s)
2032
self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
2033
d.addCallback(_check_html)
2036
def test_only_immutable(self):
2037
basedir = "storage/LeaseCrawler/only_immutable"
2038
fileutil.make_dirs(basedir)
2040
then = int(now - 2000)
2041
ss = StorageServer(basedir, "\x00" * 20,
2042
expiration_enabled=True,
2043
expiration_mode="cutoff-date",
2044
expiration_cutoff_date=then,
2045
expiration_sharetypes=("immutable",))
2046
lc = ss.lease_checker
2048
webstatus = StorageStatus(ss)
2050
self.make_shares(ss)
2051
[immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2052
# set all leases to be expirable
2053
new_expiration_time = now - 3000 + 31*24*60*60
2055
def count_shares(si):
2056
return len(list(ss._iter_share_files(si)))
2057
def _get_sharefile(si):
2058
return list(ss._iter_share_files(si))[0]
2059
def count_leases(si):
2060
return len(list(_get_sharefile(si).get_leases()))
2062
sf0 = _get_sharefile(immutable_si_0)
2063
self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2064
sf1 = _get_sharefile(immutable_si_1)
2065
self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2066
self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2067
sf2 = _get_sharefile(mutable_si_2)
2068
self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2069
sf3 = _get_sharefile(mutable_si_3)
2070
self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2071
self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2073
ss.setServiceParent(self.s)
2075
return bool(lc.get_state()["last-cycle-finished"] is not None)
2076
d = self.poll(_wait)
2078
def _after_first_cycle(ignored):
2079
self.failUnlessEqual(count_shares(immutable_si_0), 0)
2080
self.failUnlessEqual(count_shares(immutable_si_1), 0)
2081
self.failUnlessEqual(count_shares(mutable_si_2), 1)
2082
self.failUnlessEqual(count_leases(mutable_si_2), 1)
2083
self.failUnlessEqual(count_shares(mutable_si_3), 1)
2084
self.failUnlessEqual(count_leases(mutable_si_3), 2)
2085
d.addCallback(_after_first_cycle)
2086
d.addCallback(lambda ign: self.render1(webstatus))
2087
def _check_html(html):
2088
s = remove_tags(html)
2089
self.failUnlessIn("The following sharetypes will be expired: immutable.", s)
2090
d.addCallback(_check_html)
2093
def test_only_mutable(self):
2094
basedir = "storage/LeaseCrawler/only_mutable"
2095
fileutil.make_dirs(basedir)
2097
then = int(now - 2000)
2098
ss = StorageServer(basedir, "\x00" * 20,
2099
expiration_enabled=True,
2100
expiration_mode="cutoff-date",
2101
expiration_cutoff_date=then,
2102
expiration_sharetypes=("mutable",))
2103
lc = ss.lease_checker
2105
webstatus = StorageStatus(ss)
2107
self.make_shares(ss)
2108
[immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2109
# set all leases to be expirable
2110
new_expiration_time = now - 3000 + 31*24*60*60
2112
def count_shares(si):
2113
return len(list(ss._iter_share_files(si)))
2114
def _get_sharefile(si):
2115
return list(ss._iter_share_files(si))[0]
2116
def count_leases(si):
2117
return len(list(_get_sharefile(si).get_leases()))
2119
sf0 = _get_sharefile(immutable_si_0)
2120
self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
2121
sf1 = _get_sharefile(immutable_si_1)
2122
self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
2123
self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
2124
sf2 = _get_sharefile(mutable_si_2)
2125
self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
2126
sf3 = _get_sharefile(mutable_si_3)
2127
self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
2128
self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)
2130
ss.setServiceParent(self.s)
2132
return bool(lc.get_state()["last-cycle-finished"] is not None)
2133
d = self.poll(_wait)
2135
def _after_first_cycle(ignored):
2136
self.failUnlessEqual(count_shares(immutable_si_0), 1)
2137
self.failUnlessEqual(count_leases(immutable_si_0), 1)
2138
self.failUnlessEqual(count_shares(immutable_si_1), 1)
2139
self.failUnlessEqual(count_leases(immutable_si_1), 2)
2140
self.failUnlessEqual(count_shares(mutable_si_2), 0)
2141
self.failUnlessEqual(count_shares(mutable_si_3), 0)
2142
d.addCallback(_after_first_cycle)
2143
d.addCallback(lambda ign: self.render1(webstatus))
2144
def _check_html(html):
2145
s = remove_tags(html)
2146
self.failUnlessIn("The following sharetypes will be expired: mutable.", s)
2147
d.addCallback(_check_html)
2150
def test_bad_mode(self):
    """An unrecognized expiration_mode makes StorageServer raise ValueError.

    The exception text must name the rejected mode ('bogus') and the two
    accepted ones ('age' and 'cutoff-date').
    """
    workdir = "storage/LeaseCrawler/bad_mode"
    fileutil.make_dirs(workdir)
    nodeid = "\x00" * 20
    # failUnlessRaises hands back the caught exception so that its message
    # can be inspected below.
    err = self.failUnlessRaises(ValueError,
                                StorageServer, workdir, nodeid,
                                expiration_mode="bogus")
    message = str(err)
    self.failUnless("GC mode 'bogus' must be 'age' or 'cutoff-date'" in message,
                    message)
2158
def test_parse_duration(self):
2162
p = time_format.parse_duration
2163
self.failUnlessEqual(p("7days"), 7*DAY)
2164
self.failUnlessEqual(p("31day"), 31*DAY)
2165
self.failUnlessEqual(p("60 days"), 60*DAY)
2166
self.failUnlessEqual(p("2mo"), 2*MONTH)
2167
self.failUnlessEqual(p("3 month"), 3*MONTH)
2168
self.failUnlessEqual(p("2years"), 2*YEAR)
2169
e = self.failUnlessRaises(ValueError, p, "2kumquats")
2170
self.failUnless("no unit (like day, month, or year) in '2kumquats'"
2173
def test_parse_date(self):
    """time_format.parse_date maps "2009-03-18" to the int 1237334400.

    1237334400 is 14321 * 86400, i.e. midnight UTC of 2009-03-18 counted in
    seconds from 1970-01-01.
    """
    parse = time_format.parse_date
    date_string = "2009-03-18"
    # the result must be a plain int, not merely something equal to one
    self.failUnless(isinstance(parse(date_string), int))
    self.failUnlessEqual(parse(date_string), 1237334400)
2178
def test_limited_history(self):
2179
basedir = "storage/LeaseCrawler/limited_history"
2180
fileutil.make_dirs(basedir)
2181
ss = StorageServer(basedir, "\x00" * 20)
2182
# make it start sooner than usual.
2183
lc = ss.lease_checker
2187
# create a few shares, with some leases on them
2188
self.make_shares(ss)
2190
ss.setServiceParent(self.s)
2192
def _wait_until_15_cycles_done():
2193
last = lc.state["last-cycle-finished"]
2194
if last is not None and last >= 15:
2199
d = self.poll(_wait_until_15_cycles_done)
2201
def _check(ignored):
2204
self.failUnlessEqual(len(h), 10)
2205
self.failUnlessEqual(max(h.keys()), 15)
2206
self.failUnlessEqual(min(h.keys()), 6)
2207
d.addCallback(_check)
2210
def test_unpredictable_future(self):
# Purpose: inspect the lease checker's state very early in its first cycle
# and assert that every "space-recovered" estimate is reported as None.
2211
basedir = "storage/LeaseCrawler/unpredictable_future"
2212
fileutil.make_dirs(basedir)
2213
ss = StorageServer(basedir, "\x00" * 20)
2214
# make it start sooner than usual.
2215
lc = ss.lease_checker
2217
lc.cpu_slice = -1.0 # stop quickly
2219
self.make_shares(ss)
2221
ss.setServiceParent(self.s)
2223
d = fireEventually()
2224
def _check(ignored):
2225
# this should fire after the first bucket is complete, but before
2226
# the first prefix is complete, so the progress-measurer won't
2227
# think we've gotten far enough to raise our percent-complete
2228
# above 0%, triggering the cannot-predict-the-future code in
2229
# expirer.py . This will have to change if/when the
2230
# progress-measurer gets smart enough to count buckets (we'll
2231
# have to interrupt it even earlier, before it's finished the
2234
self.failUnless("cycle-to-date" in s)
2235
self.failUnless("estimated-remaining-cycle" in s)
2236
self.failUnless("estimated-current-cycle" in s)
2238
# every estimate for the remainder of the cycle must be None
left = s["estimated-remaining-cycle"]["space-recovered"]
2239
self.failUnlessEqual(left["actual-buckets"], None)
2240
self.failUnlessEqual(left["original-buckets"], None)
2241
self.failUnlessEqual(left["configured-buckets"], None)
2242
self.failUnlessEqual(left["actual-shares"], None)
2243
self.failUnlessEqual(left["original-shares"], None)
2244
self.failUnlessEqual(left["configured-shares"], None)
2245
self.failUnlessEqual(left["actual-diskbytes"], None)
2246
self.failUnlessEqual(left["original-diskbytes"], None)
2247
self.failUnlessEqual(left["configured-diskbytes"], None)
2248
self.failUnlessEqual(left["actual-sharebytes"], None)
2249
self.failUnlessEqual(left["original-sharebytes"], None)
2250
self.failUnlessEqual(left["configured-sharebytes"], None)
2252
# NOTE(review): this reads "estimated-remaining-cycle" a second time, the
# same key `left` was taken from above, so the assertions on `full` merely
# repeat the ones on `left`. Given the variable name and the
# "estimated-current-cycle" key asserted to be present earlier, this was
# probably meant to be s["estimated-current-cycle"] -- confirm and fix.
full = s["estimated-remaining-cycle"]["space-recovered"]
2253
self.failUnlessEqual(full["actual-buckets"], None)
2254
self.failUnlessEqual(full["original-buckets"], None)
2255
self.failUnlessEqual(full["configured-buckets"], None)
2256
self.failUnlessEqual(full["actual-shares"], None)
2257
self.failUnlessEqual(full["original-shares"], None)
2258
self.failUnlessEqual(full["configured-shares"], None)
2259
self.failUnlessEqual(full["actual-diskbytes"], None)
2260
self.failUnlessEqual(full["original-diskbytes"], None)
2261
self.failUnlessEqual(full["configured-diskbytes"], None)
2262
self.failUnlessEqual(full["actual-sharebytes"], None)
2263
self.failUnlessEqual(full["original-sharebytes"], None)
2264
self.failUnlessEqual(full["configured-sharebytes"], None)
2266
d.addCallback(_check)
2269
def test_no_st_blocks(self):
2270
basedir = "storage/LeaseCrawler/no_st_blocks"
2271
fileutil.make_dirs(basedir)
2272
ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
2273
expiration_mode="age",
2274
expiration_override_lease_duration=-1000)
2275
# a negative expiration_time= means the "configured-"
2276
# space-recovered counts will be non-zero, since all shares will have
2279
# make it start sooner than usual.
2280
lc = ss.lease_checker
2283
self.make_shares(ss)
2284
ss.setServiceParent(self.s)
2286
return bool(lc.get_state()["last-cycle-finished"] is not None)
2287
d = self.poll(_wait)
2289
def _check(ignored):
2291
last = s["history"][0]
2292
rec = last["space-recovered"]
2293
self.failUnlessEqual(rec["configured-buckets"], 4)
2294
self.failUnlessEqual(rec["configured-shares"], 4)
2295
self.failUnless(rec["configured-sharebytes"] > 0,
2296
rec["configured-sharebytes"])
2297
# without the .st_blocks field in os.stat() results, we should be
2298
# reporting diskbytes==sharebytes
2299
self.failUnlessEqual(rec["configured-sharebytes"],
2300
rec["configured-diskbytes"])
2301
d.addCallback(_check)
2304
def test_share_corruption(self):
2305
self._poll_should_ignore_these_errors = [
2306
UnknownMutableContainerVersionError,
2307
UnknownImmutableContainerVersionError,
2309
basedir = "storage/LeaseCrawler/share_corruption"
2310
fileutil.make_dirs(basedir)
2311
ss = InstrumentedStorageServer(basedir, "\x00" * 20)
2312
w = StorageStatus(ss)
2313
# make it start sooner than usual.
2314
lc = ss.lease_checker
2315
lc.stop_after_first_bucket = True
2319
# create a few shares, with some leases on them
2320
self.make_shares(ss)
2322
# now corrupt one, and make sure the lease-checker keeps going
2323
[immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
2324
first = min(self.sis)
2325
first_b32 = base32.b2a(first)
2326
fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
2329
f.write("BAD MAGIC")
2331
# if get_share_file() doesn't see the correct mutable magic, it
2332
# assumes the file is an immutable share, and then
2333
# immutable.ShareFile sees a bad version. So regardless of which kind
2334
# of share we corrupted, this will trigger an
2335
# UnknownImmutableContainerVersionError.
2337
# also create an empty bucket
2338
empty_si = base32.b2a("\x04"*16)
2339
empty_bucket_dir = os.path.join(ss.sharedir,
2340
storage_index_to_dir(empty_si))
2341
fileutil.make_dirs(empty_bucket_dir)
2343
ss.setServiceParent(self.s)
2345
d = fireEventually()
2347
# now examine the state right after the first bucket has been
2349
def _after_first_bucket(ignored):
2350
so_far = lc.get_state()["cycle-to-date"]
2351
rec = so_far["space-recovered"]
2352
self.failUnlessEqual(rec["examined-buckets"], 1)
2353
self.failUnlessEqual(rec["examined-shares"], 0)
2354
self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
2355
d.addCallback(_after_first_bucket)
2357
d.addCallback(lambda ign: self.render_json(w))
2358
def _check_json(json):
2359
data = simplejson.loads(json)
2360
# grr. json turns all dict keys into strings.
2361
so_far = data["lease-checker"]["cycle-to-date"]
2362
corrupt_shares = so_far["corrupt-shares"]
2363
# it also turns all tuples into lists
2364
self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
2365
d.addCallback(_check_json)
2366
d.addCallback(lambda ign: self.render1(w))
2367
def _check_html(html):
2368
s = remove_tags(html)
2369
self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
2370
d.addCallback(_check_html)
2373
return bool(lc.get_state()["last-cycle-finished"] is not None)
2374
d.addCallback(lambda ign: self.poll(_wait))
2376
def _after_first_cycle(ignored):
2378
last = s["history"][0]
2379
rec = last["space-recovered"]
2380
self.failUnlessEqual(rec["examined-buckets"], 5)
2381
self.failUnlessEqual(rec["examined-shares"], 3)
2382
self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
2383
d.addCallback(_after_first_cycle)
2384
d.addCallback(lambda ign: self.render_json(w))
2385
def _check_json_history(json):
2386
data = simplejson.loads(json)
2387
last = data["lease-checker"]["history"]["0"]
2388
corrupt_shares = last["corrupt-shares"]
2389
self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
2390
d.addCallback(_check_json_history)
2391
d.addCallback(lambda ign: self.render1(w))
2392
def _check_html_history(html):
2393
s = remove_tags(html)
2394
self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
2395
d.addCallback(_check_html_history)
2398
self.flushLoggedErrors(UnknownMutableContainerVersionError,
2399
UnknownImmutableContainerVersionError)
2404
def render_json(self, page):
2405
d = self.render1(page, args={"t": ["json"]})
2408
class NoStatvfsServer(StorageServer):
    """StorageServer variant whose do_statvfs() always raises AttributeError.

    Tests use it to see how callers behave when disk-usage statistics
    cannot be obtained.
    """

    def do_statvfs(self):
        """Unconditionally raise AttributeError instead of returning stats."""
        raise AttributeError()
2412
class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
2415
self.s = service.MultiService()
2416
self.s.startService()
2418
return self.s.stopService()
2420
def test_no_server(self):
    """A StorageStatus page built without a server says none is running."""
    page = StorageStatus(None)
    rendered = page.renderSynchronously()
    self.failUnless("<h1>No Storage Server Running</h1>" in rendered,
                    rendered)
2425
def test_status(self):
2426
basedir = "storage/WebStatus/status"
2427
fileutil.make_dirs(basedir)
2428
ss = StorageServer(basedir, "\x00" * 20)
2429
ss.setServiceParent(self.s)
2430
w = StorageStatus(ss)
2432
def _check_html(html):
2433
self.failUnless("<h1>Storage Server Status</h1>" in html, html)
2434
s = remove_tags(html)
2435
self.failUnless("Accepting new shares: Yes" in s, s)
2436
self.failUnless("Reserved space: - 0 B (0)" in s, s)
2437
d.addCallback(_check_html)
2438
d.addCallback(lambda ign: self.render_json(w))
2439
def _check_json(json):
2440
data = simplejson.loads(json)
2442
self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
2443
self.failUnlessEqual(s["storage_server.reserved_space"], 0)
2444
self.failUnless("bucket-counter" in data)
2445
self.failUnless("lease-checker" in data)
2446
d.addCallback(_check_json)
2449
def render_json(self, page):
2450
d = self.render1(page, args={"t": ["json"]})
2453
def test_status_no_statvfs(self):
    """The status page still renders when do_statvfs() is unusable.

    windows has no os.statvfs; NoStatvfsServer imitates that by raising
    AttributeError from do_statvfs(). The page must then still be produced
    and show "?" as the total disk space.
    """
    workdir = "storage/WebStatus/status_no_statvfs"
    fileutil.make_dirs(workdir)
    server = NoStatvfsServer(workdir, "\x00" * 20)
    server.setServiceParent(self.s)
    page = StorageStatus(server)
    rendered = page.renderSynchronously()
    self.failUnless("<h1>Storage Server Status</h1>" in rendered, rendered)
    # the remaining checks look at the visible text only, markup stripped
    text = remove_tags(rendered)
    for expected in ("Accepting new shares: Yes", "Total disk space: ?"):
        self.failUnless(expected in text, text)
2467
def test_readonly(self):
    """A server created with readonly_storage=True reports it takes no shares."""
    workdir = "storage/WebStatus/readonly"
    fileutil.make_dirs(workdir)
    server = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
    server.setServiceParent(self.s)
    page = StorageStatus(server)
    rendered = page.renderSynchronously()
    self.failUnless("<h1>Storage Server Status</h1>" in rendered, rendered)
    # compare against the page text with the markup stripped
    text = remove_tags(rendered)
    self.failUnless("Accepting new shares: No" in text, text)
2478
def test_reserved(self):
    """The status page shows the configured reserved_space.

    With reserved_space=10e6 the page text must contain
    "Reserved space: - 10.00 MB (10000000)".
    """
    workdir = "storage/WebStatus/reserved"
    fileutil.make_dirs(workdir)
    server = StorageServer(workdir, "\x00" * 20, reserved_space=10e6)
    server.setServiceParent(self.s)
    page = StorageStatus(server)
    rendered = page.renderSynchronously()
    self.failUnless("<h1>Storage Server Status</h1>" in rendered, rendered)
    # compare against the page text with the markup stripped
    text = remove_tags(rendered)
    self.failUnless("Reserved space: - 10.00 MB (10000000)" in text, text)
2489
def test_huge_reserved(self):
    """The status page shows the configured reserved_space.

    This test uses its own working directory. It previously pointed at
    "storage/WebStatus/reserved", the directory test_reserved uses, so the
    two tests shared on-disk server state.
    """
    # NOTE(review): reserved_space is the same 10e6 that test_reserved
    # passes, which is not "huge". A larger value was presumably intended,
    # but the expected abbreviation for it cannot be determined from this
    # file, so the value and the expected text are left unchanged.
    basedir = "storage/WebStatus/huge_reserved"
    fileutil.make_dirs(basedir)
    ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
    ss.setServiceParent(self.s)
    w = StorageStatus(ss)
    html = w.renderSynchronously()
    self.failUnless("<h1>Storage Server Status</h1>" in html, html)
    # compare against the page text with the markup stripped
    s = remove_tags(html)
    self.failUnless("Reserved space: - 10.00 MB (10000000)" in s, s)
2500
def test_util(self):
    """Check StorageStatus's space renderers and the remove_prefix helper."""
    status = StorageStatus(None)
    # (renderer, value, expected text): None renders as "?", a number as
    # either its plain integer digits or an abbreviated size.
    render_cases = [
        (status.render_space, None, "?"),
        (status.render_space, 10e6, "10000000"),
        (status.render_abbrev_space, None, "?"),
        (status.render_abbrev_space, 10e6, "10.00 MB"),
    ]
    for render, value, expected in render_cases:
        self.failUnlessEqual(render(None, value), expected)
    # remove_prefix strips a matching prefix and yields None otherwise
    for prefix, expected in [("foo.", "bar"), ("baz.", None)]:
        self.failUnlessEqual(remove_prefix("foo.bar", prefix), expected)