Viewing changes to src/allmydata/test/test_storage.py

  • Committer: Bazaar Package Importer
  • Author(s): Zooko O'Whielacronx (Hacker)
  • Date: 2009-09-24 00:00:05 UTC
  • Revision ID: james.westby@ubuntu.com-20090924000005-ixe2n4yngmk49ysz
  • Tags: upstream-1.5.0

Import upstream version 1.5.0

 
import time, os.path, stat, re, simplejson, struct

from twisted.trial import unittest

from twisted.internet import defer
from twisted.application import service
from foolscap.api import fireEventually
import itertools
from allmydata import interfaces
from allmydata.util import fileutil, hashutil, base32, pollmixin, time_format
from allmydata.storage.server import StorageServer
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.immutable import BucketWriter, BucketReader
from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
     UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
from allmydata.storage.lease import LeaseInfo
from allmydata.storage.crawler import BucketCountingCrawler
from allmydata.storage.expirer import LeaseCheckingCrawler
from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
     ReadBucketProxy
from allmydata.interfaces import BadWriteEnablerError
from allmydata.test.common import LoggingServiceParent
from allmydata.test.common_web import WebRenderingMixin
from allmydata.web.storage import StorageStatus, remove_prefix

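# Test doubles for the foolscap connection "canary": FakeCanary records the
# callbacks registered via notifyOnDisconnect() under opaque Marker keys, so
# tests can fire them by hand (see Server.test_disconnect) or cancel them.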
class Marker:
    pass
class FakeCanary:
    def __init__(self, ignore_disconnectors=False):
        self.ignore = ignore_disconnectors
        self.disconnectors = {}
    def notifyOnDisconnect(self, f, *args, **kwargs):
        if self.ignore:
            return
        m = Marker()
        self.disconnectors[m] = (f, args, kwargs)
        return m
    def dontNotifyOnDisconnect(self, marker):
        if self.ignore:
            return
        del self.disconnectors[marker]

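# A stats provider that silently discards counter updates and producer
# registrations, for tests that don't care about statistics.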
class FakeStatsProvider:
    def count(self, name, delta=1):
        pass
    def register_producer(self, producer):
        pass

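# These tests drive BucketWriter/BucketReader directly against share files on
# disk. The TestCase instance doubles as the server object, supplying the
# callbacks a real StorageServer would (bucket_writer_closed, add_latency,
# count).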
class Bucket(unittest.TestCase):
    def make_workdir(self, name):
        basedir = os.path.join("storage", "Bucket", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        return incoming, final

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def test_create(self):
        incoming, final = self.make_workdir("test_create")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*25)
        bw.remote_write(75, "d"*7)
        bw.remote_close()

    def test_readwrite(self):
        incoming, final = self.make_workdir("test_readwrite")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short
        bw.remote_close()

        # now read from it
        br = BucketReader(self, bw.finalhome)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)

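# Wraps a local target so that callRemote("foo", ...) invokes target.remote_foo
# through a Deferred, imitating a remote reference without a network.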
class RemoteBucket:

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self.target, "remote_" + methname)
            return meth(*args, **kwargs)
        return defer.maybeDeferred(_call)

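# Exercises WriteBucketProxy/ReadBucketProxy (the v1 and v2 share layouts)
# over a RemoteBucket wrapper instead of a real foolscap connection.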
class BucketProxy(unittest.TestCase):
    def make_bucket(self, name, size):
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease(),
                          FakeCanary())
        rb = RemoteBucket()
        rb.target = bw
        return bw, rb, final

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, "\x00" * 20)

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def test_create(self):
        bw, rb, sharefname = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb,
                              data_size=300,
                              block_size=10,
                              num_segments=5,
                              num_share_hashes=3,
                              uri_extension_size_max=500, nodeid=None)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))

    def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the two
        # per-segment merkle trees (crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        #
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
        # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long

        sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500

        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
                            for i in range(7)]
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
                        for i in range(7)]
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
                        for i in (1,9,13)]
        uri_extension = "s" + "E"*498 + "e"

        bw, rb, sharefname = self.make_bucket(name, sharesize)
        bp = wbp_class(rb,
                       data_size=95,
                       block_size=25,
                       num_segments=4,
                       num_share_hashes=3,
                       uri_extension_size_max=len(uri_extension),
                       nodeid=None)

        d = bp.put_header()
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            br = BucketReader(self, sharefname)
            rb = RemoteBucket()
            rb.target = br
            rbp = rbp_class(rb, peerid="abc", storage_index="")
            self.failUnless("to peer" in repr(rbp))
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))

            d1 = rbp.get_block_data(0, 25, 25)
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))

            return d1

        d.addCallback(_start_reading)

        return d

    def test_readwrite_v1(self):
        return self._do_test_readwrite("test_readwrite_v1",
                                       0x24, WriteBucketProxy, ReadBucketProxy)

    def test_readwrite_v2(self):
        return self._do_test_readwrite("test_readwrite_v2",
                                       0x44, WriteBucketProxy_v2, ReadBucketProxy)

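# Lets tests dictate the apparent free disk space: stat_disk() returns the
# DISKAVAIL attribute instead of doing a real statvfs() measurement.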
class FakeDiskStorageServer(StorageServer):
    def stat_disk(self, d):
        return self.DISKAVAIL

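# Tests for the immutable-share side of StorageServer: bucket allocation and
# reads, leases, reserved space, read-only mode, and corruption advisories.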
class Server(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self.sparent.startService()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, reserved_space=0, klass=StorageServer):
        workdir = self.workdir(name)
        ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
                   stats_provider=FakeStatsProvider())
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        ss = self.create("test_create")

    def allocate(self, ss, storage_index, sharenums, size, canary=None):
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        if not canary:
            canary = FakeCanary()
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, canary)

    def test_large_share(self):
        ss = self.create("test_large_share")

        already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0]))

        shnum, bucket = writers.items()[0]
        # This test is going to hammer your filesystem if it doesn't make a sparse file for this.  :-(
        bucket.remote_write(2**32, "ab")
        bucket.remote_close()

        readers = ss.remote_get_buckets("allocate")
        reader = readers[shnum]
        self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")
    test_large_share.skip = "This test can spuriously fail if you have less than 4 GiB free on your filesystem, and if your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X is the only system I know of in the desktop/server area that doesn't support efficient sparse files)."

    def test_dont_overfill_dirs(self):
        """
        This test asserts that if you add a second share whose storage index
        shares lots of leading bits with that of an extant share (but isn't
        the exact same storage index), this won't add an entry to the share
        directory.
        """
        ss = self.create("test_dont_overfill_dirs")
        already, writers = self.allocate(ss, "storageindex", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        children_of_storedir = set(os.listdir(storedir))

        # Now store another one under another storageindex that has leading
        # chars the same as the first storageindex.
        already, writers = self.allocate(ss, "storageindey", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        new_children_of_storedir = set(os.listdir(storedir))
        self.failUnlessEqual(children_of_storedir, new_children_of_storedir)

    def test_remove_incoming(self):
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        incoming_share_dir = wb.incominghome
        incoming_bucket_dir = os.path.dirname(incoming_share_dir)
        incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
        incoming_dir = os.path.dirname(incoming_prefix_dir)
        self.failIf(os.path.exists(incoming_bucket_dir))
        self.failIf(os.path.exists(incoming_prefix_dir))
        self.failUnless(os.path.exists(incoming_dir))

    def test_allocate(self):
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        canary = FakeCanary()
        already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})

        # close the buckets
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
            # aborting a bucket that was already closed is a no-op
            wb.remote_abort()

        # now they should be readable
        b = ss.remote_get_buckets("allocate")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
        b_str = str(b[0])
        self.failUnless("BucketReader" in b_str, b_str)
        self.failUnless("mfwgy33dmf2g 0" in b_str, b_str)

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # refuse to offer them to uploaders

        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        # aborting the writes should remove the tempfiles
        for i,wb in writers2.items():
            wb.remote_abort()
        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
        self.failUnlessEqual(already2, set([0,1,2]))
        self.failUnlessEqual(set(writers2.keys()), set([5]))

        for i,wb in writers2.items():
            wb.remote_abort()
        for i,wb in writers.items():
            wb.remote_abort()

    def test_bad_container_version(self):
        ss = self.create("test_bad_container_version")
        a,w = self.allocate(ss, "si1", [0], 10)
        w[0].remote_write(0, "\xff"*10)
        w[0].remote_close()

        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
        f.close()

        b = ss.remote_get_buckets("allocate")

        e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
                                  ss.remote_get_buckets, "si1")
        self.failUnless(" had version 0 but we wanted 1" in str(e), e)

    def test_disconnect(self):
        # simulate a disconnection
        ss = self.create("test_disconnect")
        canary = FakeCanary()
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for (f,args,kwargs) in canary.disconnectors.values():
            f(*args, **kwargs)
        del already
        del writers

        # that ought to delete the incoming shares
        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

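    # Note that reserved-space accounting counts the provisional allocations
    # held by open BucketWriters (tracked in ss._active_writers) against the
    # available space, so abandoning or closing writers changes how much a
    # later allocation request can claim.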
    def test_reserved_space(self):
        ss = self.create("test_reserved_space", reserved_space=10000,
                         klass=FakeDiskStorageServer)
        # the FakeDiskStorageServer doesn't do real statvfs() calls
        ss.DISKAVAIL = 15000
        # 15k available, 10k reserved, leaves 5k for shares

        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        OVERHEAD = 3*4
        LEASE_SIZE = 4+32+32+4
        canary = FakeCanary(True)
        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)

        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should be
        # returned
        del already
        del writers
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation should
        # become real, long-term allocation, and grow to include the
        # overhead.
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
            bw.remote_close()
        del already2
        del writers2
        del bw
        self.failUnlessEqual(len(ss._active_writers), 0)

        allocated = 1001 + OVERHEAD + LEASE_SIZE

        # we have to manually adjust DISKAVAIL, since we're not doing real
        # disk measurements
        ss.DISKAVAIL -= allocated

        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100byte shares
        already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

        del already3
        del writers3
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        del ss

    def test_seek(self):
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        f.write("start")
        f.close()
        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")
        f.seek(100)
        f.write("100")
        f.close()
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        self.failUnlessEqual(f2.read(5), "start")


    def test_leases(self):
        ss = self.create("test_leases")
        canary = FakeCanary()
        sharenums = range(5)
        size = 100

        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))

        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
            wb.remote_close()

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))

        # and a third lease, using add-lease
        rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                     hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        ss.remote_add_lease("si1", rs2a, cs2a)
        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 3)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))

        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # now cancel it
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
        ss.remote_cancel_lease("si0", cs0)

        # si0 should now be gone
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 0)
        # and the renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)


        # cancel the first lease on si1, leaving the second and third in place
        ss.remote_cancel_lease("si1", cs1)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 5)
        # the corresponding renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))

        ss.remote_renew_lease("si1", rs2)
        # cancelling the second and third should make it go away
        ss.remote_cancel_lease("si1", cs2)
        ss.remote_cancel_lease("si1", cs2a)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 0)


        # test overlapping uploads
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 0)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 1)

        already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already3), 5)
        self.failUnlessEqual(len(writers3), 0)

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)

    def test_readonly(self):
        workdir = self.workdir("test_readonly")
        ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(writers, {})

        stats = ss.get_stats()
        self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"],
                             False)
        if "storage_server.disk_avail" in stats:
            # windows does not have os.statvfs, so it doesn't give us disk
            # stats. But if there are stats, readonly_storage means
            # disk_avail=0
            self.failUnlessEqual(stats["storage_server.disk_avail"], 0)

    def test_discard(self):
        # discard is really only used for other tests, but we test it anyway
        workdir = self.workdir("test_discard")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        canary = FakeCanary()
        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()
        # since we discard the data, the shares should be present but sparse.
        # Since we write with some seeks, the data we read back will be all
        # zeros.
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)

    def test_advise_corruption(self):
        workdir = self.workdir("test_advise_corruption")
        ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        si0_s = base32.b2a("si0")
        ss.remote_advise_corrupt_share("immutable", "si0", 0,
                                       "This share smells funny.\n")
        reportdir = os.path.join(workdir, "corruption-advisories")
        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 1)
        report_si0 = reports[0]
        self.failUnless(si0_s in report_si0, report_si0)
        f = open(os.path.join(reportdir, report_si0), "r")
        report = f.read()
        f.close()
        self.failUnless("type: immutable" in report)
        self.failUnless(("storage_index: %s" % si0_s) in report)
        self.failUnless("share_number: 0" in report)
        self.failUnless("This share smells funny." in report)

        # test the RIBucketWriter version too
        si1_s = base32.b2a("si1")
        already,writers = self.allocate(ss, "si1", [1], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([1]))
        writers[1].remote_write(0, "data")
        writers[1].remote_close()

        b = ss.remote_get_buckets("si1")
        self.failUnlessEqual(set(b.keys()), set([1]))
        b[1].remote_advise_corrupt_share("This share tastes like dust.\n")

        reports = os.listdir(reportdir)
        self.failUnlessEqual(len(reports), 2)
        report_si1 = [r for r in reports if si1_s in r][0]
        f = open(os.path.join(reportdir, report_si1), "r")
        report = f.read()
        f.close()
        self.failUnless("type: immutable" in report)
        self.failUnless(("storage_index: %s" % si1_s) in report)
        self.failUnless("share_number: 1" in report)
        self.failUnless("This share tastes like dust." in report)


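# Tests for the mutable-slot side of StorageServer, driven through the
# remote_slot_testv_and_readv_and_writev ("rstaraw") and remote_slot_readv
# APIs.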
class MutableServer(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "MutableServer", name)
        return basedir

    def create(self, name):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, "\x00" * 20)
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        ss = self.create("test_create")

    def write_enabler(self, we_tag):
        return hashutil.tagged_hash("we_blah", we_tag)

    def renew_secret(self, tag):
        return hashutil.tagged_hash("renew_blah", str(tag))

    def cancel_secret(self, tag):
        return hashutil.tagged_hash("cancel_blah", str(tag))

    def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
        write_enabler = self.write_enabler(we_tag)
        renew_secret = self.renew_secret(lease_tag)
        cancel_secret = self.cancel_secret(lease_tag)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        testandwritev = dict( [ (shnum, ([], [], None) )
                         for shnum in sharenums ] )
        readv = []
        rc = rstaraw(storage_index,
                     (write_enabler, renew_secret, cancel_secret),
                     testandwritev,
                     readv)
        (did_write, readv_data) = rc
        self.failUnless(did_write)
        self.failUnless(isinstance(readv_data, dict))
        self.failUnlessEqual(len(readv_data), 0)

    def test_bad_magic(self):
        ss = self.create("test_bad_magic")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(), set([0]), 10)
        fn = os.path.join(ss.sharedir, storage_index_to_dir("si1"), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write("BAD MAGIC")
        f.close()
        read = ss.remote_slot_readv
        e = self.failUnlessRaises(UnknownMutableContainerVersionError,
                                  read, "si1", [0], [(0,10)])
        self.failUnless(" had magic " in str(e), e)
        self.failUnless(" but we wanted " in str(e), e)

    def test_container_size(self):
        ss = self.create("test_container_size")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)
        read = ss.remote_slot_readv
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+12)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        # trying to make the container too large will raise an exception
        TOOBIG = MutableShareFile.MAX_SIZE + 10
        self.failUnlessRaises(DataTooLargeError,
                              rstaraw, "si1", secrets,
                              {0: ([], [(0,data)], TOOBIG)},
                              [])

        # it should be possible to make the container smaller, although at
        # the moment this doesn't actually affect the share, unless the
        # container size is dropped to zero, in which case the share is
        # deleted.
        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], len(data)+8)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        answer = rstaraw("si1", secrets,
                         {0: ([], [(0,data)], 0)},
                         [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        read_answer = read("si1", [0], [(0,10)])
        self.failUnlessEqual(read_answer, {})

    def test_allocate(self):
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)

        read = ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
                             {0: [""]})
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),
                             {0: [""]})

        # try writing to one
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
                       [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
                             {0: ["99999"]})
        #self.failUnlessEqual(s0.remote_get_length(), 100)

        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        f = self.failUnlessRaises(BadWriteEnablerError,
                                  write, "si1", bad_secrets,
                                  {}, [])
        self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)

        # this testv should fail
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(0,12), (20,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
                                       1: ["", ""],
                                       2: ["", ""],
                                       }))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # as should this one
        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(10,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["11111"],
                                       1: [""],
                                       2: [""]},
                                      ))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

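    # Each test vector below is an (offset, length, operator, specimen) tuple;
    # as the assertions show, the accompanying write is applied only when the
    # existing data at [offset:offset+length] satisfies the comparison.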
    def test_operators(self):
        # test operators, the data we're comparing is '11111' in all cases.
        # test both fail+pass, reset data after each one.
        ss = self.create("test_operators")

        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv

        def reset():
            write("si1", secrets,
                  {0: ([], [(0,data)], None)},
                  [])

        reset()

        #  lt
        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  le
        answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  eq
        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ne
        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        #  ge
        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        #  gt
        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        # finally, test some operators against empty shares
        answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

    def test_readv(self):
        ss = self.create("test_readv")
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        data = [("%d" % i) * 100 for i in range(3)]
        rc = write("si1", secrets,
                   {0: ([], [(0,data[0])], None),
                    1: ([], [(0,data[1])], None),
                    2: ([], [(0,data[2])], None),
                    }, [])
        self.failUnlessEqual(rc, (True, {}))

        answer = read("si1", [], [(0, 10)])
        self.failUnlessEqual(answer, {0: ["0"*10],
                                      1: ["1"*10],
                                      2: ["2"*10]})

    def compare_leases_without_timestamps(self, leases_a, leases_b):
        self.failUnlessEqual(len(leases_a), len(leases_b))
        for i in range(len(leases_a)):
            a = leases_a[i]
            b = leases_b[i]
            self.failUnlessEqual(a.owner_num,       b.owner_num)
            self.failUnlessEqual(a.renew_secret,    b.renew_secret)
            self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
            self.failUnlessEqual(a.nodeid,          b.nodeid)

    def compare_leases(self, leases_a, leases_b):
        self.failUnlessEqual(len(leases_a), len(leases_b))
        for i in range(len(leases_a)):
            a = leases_a[i]
            b = leases_b[i]
            self.failUnlessEqual(a.owner_num,       b.owner_num)
            self.failUnlessEqual(a.renew_secret,    b.renew_secret)
            self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
            self.failUnlessEqual(a.nodeid,          b.nodeid)
            self.failUnlessEqual(a.expiration_time, b.expiration_time)

    def test_leases(self):
        ss = self.create("test_leases")
        def secrets(n):
            return ( self.write_enabler("we1"),
                     self.renew_secret("we1-%d" % n),
                     self.cancel_secret("we1-%d" % n) )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(rc, (True, {}))

        # create a random non-numeric file in the bucket directory, to
        # exercise the code that's supposed to ignore those.
        bucket_dir = os.path.join(self.workdir("test_leases"),
                                  "shares", storage_index_to_dir("si1"))
        f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
        f.write("you ought to be ignoring me\n")
        f.close()

        s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        # add-lease on a missing storage index is silently ignored
        self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)

        # re-allocate the slots and use the same secrets, which should update
        # the lease
        write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        # renew it directly
        ss.remote_renew_lease("si1", secrets(0)[1])
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        # now allocate them with a bunch of different secrets, to trigger the
        # extended lease code. Use add_lease for one of them.
        write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(len(list(s0.get_leases())), 2)
        secrets2 = secrets(2)
        ss.remote_add_lease("si1", secrets2[1], secrets2[2])
        self.failUnlessEqual(len(list(s0.get_leases())), 3)
        write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])

        self.failUnlessEqual(len(list(s0.get_leases())), 6)

        # cancel one of them
        ss.remote_cancel_lease("si1", secrets(5)[2])
        self.failUnlessEqual(len(list(s0.get_leases())), 5)

        all_leases = list(s0.get_leases())
        # and write enough data to expand the container, forcing the server
        # to move the leases
        write("si1", secrets(0),
              {0: ([], [(0,data)], 200), },
              [])

        # read back the leases, make sure they're still intact.
        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))

        ss.remote_renew_lease("si1", secrets(0)[1])
        ss.remote_renew_lease("si1", secrets(1)[1])
        ss.remote_renew_lease("si1", secrets(2)[1])
        ss.remote_renew_lease("si1", secrets(3)[1])
        ss.remote_renew_lease("si1", secrets(4)[1])
        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
        # get a new copy of the leases, with the current timestamps. Reading
        # data and failing to renew/cancel leases should leave the timestamps
        # alone.
        all_leases = list(s0.get_leases())
        # renewing with a bogus token should prompt an error message

        # examine the exception thus raised, make sure the old nodeid is
        # present, to provide for share migration
        e = self.failUnlessRaises(IndexError,
                                  ss.remote_renew_lease, "si1",
                                  secrets(20)[1])
        e_s = str(e)
        self.failUnless("Unable to renew non-existent lease" in e_s)
        self.failUnless("I have leases accepted by nodeids:" in e_s)
        self.failUnless("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' ." in e_s)

        # same for cancelling
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1",
                              secrets(20)[2])
        self.compare_leases(all_leases, list(s0.get_leases()))

        # reading shares should not modify the timestamp
        read("si1", [], [(0,200)])
        self.compare_leases(all_leases, list(s0.get_leases()))

        write("si1", secrets(0),
              {0: ([], [(200, "make me bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))

        write("si1", secrets(0),
              {0: ([], [(500, "make me really bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))

        # now cancel them all
        ss.remote_cancel_lease("si1", secrets(0)[2])
        ss.remote_cancel_lease("si1", secrets(1)[2])
        ss.remote_cancel_lease("si1", secrets(2)[2])
        ss.remote_cancel_lease("si1", secrets(3)[2])

        # the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        # cancelling a non-existent lease should raise an IndexError
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1", "nonsecret")

        # and the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(list(s0.get_leases())), 1)

        ss.remote_cancel_lease("si1", secrets(4)[2])
        # now the slot should be gone
        no_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(no_shares, {})

        # cancelling a lease on a non-existent share should raise an IndexError
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si2", "nonsecret")

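    # A mutable share is deleted by sending a write vector whose new_length
    # is 0; once the last share in a bucket is gone, the server is expected
    # to remove the (now empty) bucket directory as well, which the end of
    # this test verifies via the shares/<prefix>/<si> layout.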
    def test_remove(self):
        ss = self.create("test_remove")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)
        readv = ss.remote_slot_readv
        writev = ss.remote_slot_testv_and_readv_and_writev
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        # delete sh0 by setting its size to zero
        answer = writev("si1", secrets,
                        {0: ([], [], 0)},
                        [])
        # the answer should mention all the shares that existed before the
        # write
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
        # but a new read should show only sh1 and sh2
        self.failUnlessEqual(readv("si1", [], [(0,10)]),
                             {1: [""], 2: [""]})

        # delete sh1 by setting its size to zero
        answer = writev("si1", secrets,
                        {1: ([], [], 0)},
                        [])
        self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
        self.failUnlessEqual(readv("si1", [], [(0,10)]),
                             {2: [""]})

        # delete sh2 by setting its size to zero
        answer = writev("si1", secrets,
                        {2: ([], [], 0)},
                        [])
        self.failUnlessEqual(answer, (True, {2:[]}) )
        self.failUnlessEqual(readv("si1", [], [(0,10)]),
                             {})
        # and the bucket directory should now be gone
        si = base32.b2a("si1")
        # note: this is a detail of the storage server implementation, and
        # may change in the future
        prefix = si[:2]
        prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
        bucketdir = os.path.join(prefixdir, si)
        self.failUnless(os.path.exists(prefixdir))
        self.failIf(os.path.exists(bucketdir))

class Stats(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, "\x00" * 20)
        ss.setServiceParent(self.sparent)
        return ss

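    # The assertions below rely on add_latency() keeping only a bounded
    # sample per category: after 10000 "allocate" samples, just the most
    # recent 1000 remain, which is why the allocate percentiles cluster in
    # the 9000-9999 range.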
    def test_latencies(self):
        ss = self.create("test_latencies")
        for i in range(10000):
            ss.add_latency("allocate", 1.0 * i)
        for i in range(1000):
            ss.add_latency("renew", 1.0 * i)
        for i in range(10):
            ss.add_latency("cancel", 2.0 * i)
        ss.add_latency("get", 5.0)

        output = ss.get_latencies()

        self.failUnlessEqual(sorted(output.keys()),
                             sorted(["allocate", "renew", "cancel", "get"]))
        self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
        self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1)
        self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1)
        self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1)
        self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1)
        self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1)
        self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1)
        self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1)
        self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1)

        self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
        self.failUnless(abs(output["renew"]["mean"] - 500) < 1)
        self.failUnless(abs(output["renew"]["01_0_percentile"] -  10) < 1)
        self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1)
        self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1)
        self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1)
        self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1)
        self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1)
        self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1)

        self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
        self.failUnless(abs(output["cancel"]["mean"] - 9) < 1)
        self.failUnless(abs(output["cancel"]["01_0_percentile"] -  0) < 1)
        self.failUnless(abs(output["cancel"]["10_0_percentile"] -  2) < 1)
        self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1)
        self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["95_0_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["99_0_percentile"] - 18) < 1)
        self.failUnless(abs(output["cancel"]["99_9_percentile"] - 18) < 1)

        self.failUnlessEqual(len(ss.latencies["get"]), 1)
        self.failUnless(abs(output["get"]["mean"] - 5) < 1)
        self.failUnless(abs(output["get"]["01_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["10_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["50_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["90_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["95_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["99_0_percentile"] - 5) < 1)
        self.failUnless(abs(output["get"]["99_9_percentile"] - 5) < 1)

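# Strip HTML tags and collapse whitespace, so the web-status tests below can
# run plain-substring checks against rendered pages.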
def remove_tags(s):
    s = re.sub(r'<[^>]*>', ' ', s)
    s = re.sub(r'\s+', ' ', s)
    return s

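# Test double: each time the crawler finishes a prefix, pop one Deferred off
# hook_ds and fire it, letting a test synchronize with crawler progress.
# (hook_ds is assigned by the test itself before the service is started.)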
class MyBucketCountingCrawler(BucketCountingCrawler):
    def finished_prefix(self, cycle, prefix):
        BucketCountingCrawler.finished_prefix(self, cycle, prefix)
        if self.hook_ds:
            d = self.hook_ds.pop(0)
            d.callback(None)

class MyStorageServer(StorageServer):
    def add_bucket_counter(self):
        statefile = os.path.join(self.storedir, "bucket_counter.state")
        self.bucket_counter = MyBucketCountingCrawler(self, statefile)
        self.bucket_counter.setServiceParent(self)

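# The crawler tests below all tune the same two knobs: slow_start=0 makes
# the crawler begin immediately instead of after its usual delay, and
# cpu_slice=0 makes it yield to the reactor after each prefix, so a test can
# observe it mid-cycle.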
class BucketCounter(unittest.TestCase, pollmixin.PollMixin):

    def setUp(self):
        self.s = service.MultiService()
        self.s.startService()
    def tearDown(self):
        return self.s.stopService()

    def test_bucket_counter(self):
        basedir = "storage/BucketCounter/bucket_counter"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        # to make sure we capture the bucket-counting-crawler in the middle
        # of a cycle, we reach in and reduce its maximum slice time to 0. We
        # also make it start sooner than usual.
        ss.bucket_counter.slow_start = 0
        orig_cpu_slice = ss.bucket_counter.cpu_slice
        ss.bucket_counter.cpu_slice = 0
        ss.setServiceParent(self.s)

        w = StorageStatus(ss)

        # this sample is before the crawler has started doing anything
        html = w.renderSynchronously()
        self.failUnless("<h1>Storage Server Status</h1>" in html, html)
        s = remove_tags(html)
        self.failUnless("Accepting new shares: Yes" in s, s)
        self.failUnless("Reserved space: - 0 B (0)" in s, s)
        self.failUnless("Total buckets: Not computed yet" in s, s)
        self.failUnless("Next crawl in" in s, s)

        # give the bucket-counting-crawler one tick to get started. The
        # cpu_slice=0 will force it to yield right after it processes the
        # first prefix

        d = fireEventually()
        def _check(ignored):
            # are we really right after the first prefix?
            state = ss.bucket_counter.get_state()
            self.failUnlessEqual(state["last-complete-prefix"],
                                 ss.bucket_counter.prefixes[0])
            ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
            html = w.renderSynchronously()
            s = remove_tags(html)
            self.failUnless(" Current crawl " in s, s)
            self.failUnless(" (next work in " in s, s)
        d.addCallback(_check)

        # now give it enough time to complete a full cycle
        def _watch():
            return not ss.bucket_counter.get_progress()["cycle-in-progress"]
        d.addCallback(lambda ignored: self.poll(_watch))
        def _check2(ignored):
            ss.bucket_counter.cpu_slice = orig_cpu_slice
            html = w.renderSynchronously()
            s = remove_tags(html)
            self.failUnless("Total buckets: 0 (the number of" in s, s)
            self.failUnless("Next crawl in 59 minutes" in s, s)
        d.addCallback(_check2)
        return d

    def test_bucket_counter_cleanup(self):
        basedir = "storage/BucketCounter/bucket_counter_cleanup"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        # to make sure we capture the bucket-counting-crawler in the middle
        # of a cycle, we reach in and reduce its maximum slice time to 0.
        ss.bucket_counter.slow_start = 0
        orig_cpu_slice = ss.bucket_counter.cpu_slice
        ss.bucket_counter.cpu_slice = 0
        ss.setServiceParent(self.s)

        d = fireEventually()

        def _after_first_prefix(ignored):
            ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
            # now sneak in and mess with its state, to make sure it cleans up
            # properly at the end of the cycle
            state = ss.bucket_counter.state
            self.failUnlessEqual(state["last-complete-prefix"],
                                 ss.bucket_counter.prefixes[0])
            state["bucket-counts"][-12] = {}
            state["storage-index-samples"]["bogusprefix!"] = (-12, [])
            ss.bucket_counter.save_state()
        d.addCallback(_after_first_prefix)

        # now give it enough time to complete a cycle
        def _watch():
            return not ss.bucket_counter.get_progress()["cycle-in-progress"]
        d.addCallback(lambda ignored: self.poll(_watch))
        def _check2(ignored):
            ss.bucket_counter.cpu_slice = orig_cpu_slice
            s = ss.bucket_counter.get_state()
            self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
            self.failIf("bogusprefix!" in s["storage-index-samples"],
                        s["storage-index-samples"].keys())
        d.addCallback(_check2)
        return d

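    # The three hook Deferreds fire after the first three finished prefixes.
    # No ETA can be shown until at least one prefix has completed, because
    # the estimate is based on the elapsed time of the finished prefixes.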
    def test_bucket_counter_eta(self):
        basedir = "storage/BucketCounter/bucket_counter_eta"
        fileutil.make_dirs(basedir)
        ss = MyStorageServer(basedir, "\x00" * 20)
        ss.bucket_counter.slow_start = 0
        # these will be fired inside finished_prefix()
        hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
        w = StorageStatus(ss)

        d = defer.Deferred()

        def _check_1(ignored):
            # no ETA is available yet
            html = w.renderSynchronously()
            s = remove_tags(html)
            self.failUnlessIn("complete (next work", s)

        def _check_2(ignored):
            # one prefix has finished, so an ETA based upon that elapsed time
            # should be available.
            html = w.renderSynchronously()
            s = remove_tags(html)
            self.failUnlessIn("complete (ETA ", s)

        def _check_3(ignored):
            # two prefixes have finished
            html = w.renderSynchronously()
            s = remove_tags(html)
            self.failUnlessIn("complete (ETA ", s)
            d.callback("done")

        hooks[0].addCallback(_check_1).addErrback(d.errback)
        hooks[1].addCallback(_check_2).addErrback(d.errback)
        hooks[2].addCallback(_check_3).addErrback(d.errback)

        ss.setServiceParent(self.s)
        return d

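# Instrumentation for the lease-crawler tests: a negative cpu_slice makes
# the crawler yield right after the first bucket; yielding() then restores a
# large slice so the rest of the cycle runs without further pauses.
# No_ST_BLOCKS_LeaseCheckingCrawler simulates platforms whose os.stat()
# results carry no st_blocks field.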
class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
    stop_after_first_bucket = False
    def process_bucket(self, *args, **kwargs):
        LeaseCheckingCrawler.process_bucket(self, *args, **kwargs)
        if self.stop_after_first_bucket:
            self.stop_after_first_bucket = False
            self.cpu_slice = -1.0
    def yielding(self, sleep_time):
        if not self.stop_after_first_bucket:
            self.cpu_slice = 500

class BrokenStatResults:
    pass
class No_ST_BLOCKS_LeaseCheckingCrawler(LeaseCheckingCrawler):
    def stat(self, fn):
        s = os.stat(fn)
        bsr = BrokenStatResults()
        for attrname in dir(s):
            if attrname.startswith("_"):
                continue
            if attrname == "st_blocks":
                continue
            setattr(bsr, attrname, getattr(s, attrname))
        return bsr

class InstrumentedStorageServer(StorageServer):
    LeaseCheckerClass = InstrumentedLeaseCheckingCrawler
class No_ST_BLOCKS_StorageServer(StorageServer):
    LeaseCheckerClass = No_ST_BLOCKS_LeaseCheckingCrawler

class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):

    def setUp(self):
        self.s = service.MultiService()
        self.s.startService()
    def tearDown(self):
        return self.s.stopService()

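    # make_shares() populates the server with two immutable and two mutable
    # shares; the second share of each kind gets an extra lease, which is
    # why the lease histograms asserted below report {1: 2, 2: 2}.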
    def make_shares(self, ss):
        def make(si):
            return (si, hashutil.tagged_hash("renew", si),
                    hashutil.tagged_hash("cancel", si))
        def make_mutable(si):
            return (si, hashutil.tagged_hash("renew", si),
                    hashutil.tagged_hash("cancel", si),
                    hashutil.tagged_hash("write-enabler", si))
        def make_extra_lease(si, num):
            return (hashutil.tagged_hash("renew-%d" % num, si),
                    hashutil.tagged_hash("cancel-%d" % num, si))

        immutable_si_0, rs0, cs0 = make("\x00" * 16)
        immutable_si_1, rs1, cs1 = make("\x01" * 16)
        rs1a, cs1a = make_extra_lease(immutable_si_1, 1)
        mutable_si_2, rs2, cs2, we2 = make_mutable("\x02" * 16)
        mutable_si_3, rs3, cs3, we3 = make_mutable("\x03" * 16)
        rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
        sharenums = [0]
        canary = FakeCanary()
        # note: 'tahoe debug dump-share' will not handle this file, since the
        # inner contents are not a valid CHK share
        data = "\xff" * 1000

        a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
                                         1000, canary)
        w[0].remote_write(0, data)
        w[0].remote_close()

        a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
                                         1000, canary)
        w[0].remote_write(0, data)
        w[0].remote_close()
        ss.remote_add_lease(immutable_si_1, rs1a, cs1a)

        writev = ss.remote_slot_testv_and_readv_and_writev
        writev(mutable_si_2, (we2, rs2, cs2),
               {0: ([], [(0,data)], len(data))}, [])
        writev(mutable_si_3, (we3, rs3, cs3),
               {0: ([], [(0,data)], len(data))}, [])
        ss.remote_add_lease(mutable_si_3, rs3a, cs3a)

        self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
        self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
        self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]

    def test_basic(self):
        basedir = "storage/LeaseCrawler/basic"
        fileutil.make_dirs(basedir)
        ss = InstrumentedStorageServer(basedir, "\x00" * 20)
        # make it start sooner than usual.
        lc = ss.lease_checker
        lc.slow_start = 0
        lc.cpu_slice = 500
        lc.stop_after_first_bucket = True
        webstatus = StorageStatus(ss)

        # create a few shares, with some leases on them
        self.make_shares(ss)
        [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis

        # add a non-sharefile to exercise another code path
        fn = os.path.join(ss.sharedir,
                          storage_index_to_dir(immutable_si_0),
                          "not-a-share")
        f = open(fn, "wb")
        f.write("I am not a share.\n")
        f.close()

        # this is before the crawl has started, so we're not in a cycle yet
        initial_state = lc.get_state()
        self.failIf(lc.get_progress()["cycle-in-progress"])
        self.failIf("cycle-to-date" in initial_state)
        self.failIf("estimated-remaining-cycle" in initial_state)
        self.failIf("estimated-current-cycle" in initial_state)
        self.failUnless("history" in initial_state)
        self.failUnlessEqual(initial_state["history"], {})

        ss.setServiceParent(self.s)

        DAY = 24*60*60

        d = fireEventually()

        # now examine the state right after the first bucket has been
        # processed.
        def _after_first_bucket(ignored):
            initial_state = lc.get_state()
            self.failUnless("cycle-to-date" in initial_state)
            self.failUnless("estimated-remaining-cycle" in initial_state)
            self.failUnless("estimated-current-cycle" in initial_state)
            self.failUnless("history" in initial_state)
            self.failUnlessEqual(initial_state["history"], {})

            so_far = initial_state["cycle-to-date"]
            self.failUnlessEqual(so_far["expiration-enabled"], False)
            self.failUnless("configured-expiration-mode" in so_far)
            self.failUnless("lease-age-histogram" in so_far)
            lah = so_far["lease-age-histogram"]
            self.failUnlessEqual(type(lah), list)
            self.failUnlessEqual(len(lah), 1)
            self.failUnlessEqual(lah, [ (0.0, DAY, 1) ] )
            self.failUnlessEqual(so_far["leases-per-share-histogram"], {1: 1})
            self.failUnlessEqual(so_far["corrupt-shares"], [])
            sr1 = so_far["space-recovered"]
            self.failUnlessEqual(sr1["examined-buckets"], 1)
            self.failUnlessEqual(sr1["examined-shares"], 1)
            self.failUnlessEqual(sr1["actual-shares"], 0)
            self.failUnlessEqual(sr1["configured-diskbytes"], 0)
            self.failUnlessEqual(sr1["original-sharebytes"], 0)
            left = initial_state["estimated-remaining-cycle"]
            sr2 = left["space-recovered"]
            self.failUnless(sr2["examined-buckets"] > 0, sr2["examined-buckets"])
            self.failUnless(sr2["examined-shares"] > 0, sr2["examined-shares"])
            self.failIfEqual(sr2["actual-shares"], None)
            self.failIfEqual(sr2["configured-diskbytes"], None)
            self.failIfEqual(sr2["original-sharebytes"], None)
        d.addCallback(_after_first_bucket)
        d.addCallback(lambda ign: self.render1(webstatus))
        def _check_html_in_cycle(html):
            s = remove_tags(html)
            self.failUnlessIn("So far, this cycle has examined "
                              "1 shares in 1 buckets (0 mutable / 1 immutable) ", s)
            self.failUnlessIn("and has recovered: "
                              "0 shares, 0 buckets (0 mutable / 0 immutable), "
                              "0 B (0 B / 0 B)", s)
            self.failUnlessIn("If expiration were enabled, "
                              "we would have recovered: "
                              "0 shares, 0 buckets (0 mutable / 0 immutable),"
                              " 0 B (0 B / 0 B) by now", s)
            self.failUnlessIn("and the remainder of this cycle "
                              "would probably recover: "
                              "0 shares, 0 buckets (0 mutable / 0 immutable),"
                              " 0 B (0 B / 0 B)", s)
            self.failUnlessIn("and the whole cycle would probably recover: "
                              "0 shares, 0 buckets (0 mutable / 0 immutable),"
                              " 0 B (0 B / 0 B)", s)
            self.failUnlessIn("if we were strictly using each lease's default "
                              "31-day lease lifetime", s)
            self.failUnlessIn("this cycle would be expected to recover: ", s)
        d.addCallback(_check_html_in_cycle)

        # wait for the crawler to finish the first cycle. Nothing should have
        # been removed.
        def _wait():
            return bool(lc.get_state()["last-cycle-finished"] is not None)
        d.addCallback(lambda ign: self.poll(_wait))

        def _after_first_cycle(ignored):
            s = lc.get_state()
            self.failIf("cycle-to-date" in s)
            self.failIf("estimated-remaining-cycle" in s)
            self.failIf("estimated-current-cycle" in s)
            last = s["history"][0]
            self.failUnless("cycle-start-finish-times" in last)
            self.failUnlessEqual(type(last["cycle-start-finish-times"]), tuple)
            self.failUnlessEqual(last["expiration-enabled"], False)
            self.failUnless("configured-expiration-mode" in last)

            self.failUnless("lease-age-histogram" in last)
            lah = last["lease-age-histogram"]
            self.failUnlessEqual(type(lah), list)
            self.failUnlessEqual(len(lah), 1)
            self.failUnlessEqual(lah, [ (0.0, DAY, 6) ] )

            self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})
            self.failUnlessEqual(last["corrupt-shares"], [])

            rec = last["space-recovered"]
            self.failUnlessEqual(rec["examined-buckets"], 4)
            self.failUnlessEqual(rec["examined-shares"], 4)
            self.failUnlessEqual(rec["actual-buckets"], 0)
            self.failUnlessEqual(rec["original-buckets"], 0)
            self.failUnlessEqual(rec["configured-buckets"], 0)
            self.failUnlessEqual(rec["actual-shares"], 0)
            self.failUnlessEqual(rec["original-shares"], 0)
            self.failUnlessEqual(rec["configured-shares"], 0)
            self.failUnlessEqual(rec["actual-diskbytes"], 0)
            self.failUnlessEqual(rec["original-diskbytes"], 0)
            self.failUnlessEqual(rec["configured-diskbytes"], 0)
            self.failUnlessEqual(rec["actual-sharebytes"], 0)
            self.failUnlessEqual(rec["original-sharebytes"], 0)
            self.failUnlessEqual(rec["configured-sharebytes"], 0)

            def _get_sharefile(si):
                return list(ss._iter_share_files(si))[0]
            def count_leases(si):
                return len(list(_get_sharefile(si).get_leases()))
            self.failUnlessEqual(count_leases(immutable_si_0), 1)
            self.failUnlessEqual(count_leases(immutable_si_1), 2)
            self.failUnlessEqual(count_leases(mutable_si_2), 1)
            self.failUnlessEqual(count_leases(mutable_si_3), 2)
        d.addCallback(_after_first_cycle)
        d.addCallback(lambda ign: self.render1(webstatus))
        def _check_html(html):
            s = remove_tags(html)
            self.failUnlessIn("recovered: 0 shares, 0 buckets "
                              "(0 mutable / 0 immutable), 0 B (0 B / 0 B) "
                              "but expiration was not enabled", s)
        d.addCallback(_check_html)
        d.addCallback(lambda ign: self.render_json(webstatus))
        def _check_json(json):
            data = simplejson.loads(json)
            self.failUnless("lease-checker" in data)
            self.failUnless("lease-checker-progress" in data)
        d.addCallback(_check_json)
        return d

    def backdate_lease(self, sf, renew_secret, new_expire_time):
        # ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
        # "renew" a lease with a new_expire_time that is older than what the
        # current lease has), so we have to reach inside it.
        for i,lease in enumerate(sf.get_leases()):
            if lease.renew_secret == renew_secret:
                lease.expiration_time = new_expire_time
                f = open(sf.home, 'rb+')
                sf._write_lease_record(f, i, lease)
                f.close()
                return
        raise IndexError("unable to renew non-existent lease")

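    # The two expiration tests below share the same share/lease setup and
    # differ only in how expiry is configured: "age" mode expires leases
    # older than a fixed duration, while "cutoff-date" mode expires leases
    # last renewed before a fixed moment in time.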
    def test_expire_age(self):
        basedir = "storage/LeaseCrawler/expire_age"
        fileutil.make_dirs(basedir)
        # setting expiration_override_lease_duration to 2000 means that any
        # lease which is more than 2000s old will be expired.
        ss = InstrumentedStorageServer(basedir, "\x00" * 20,
                                       expiration_enabled=True,
                                       expiration_mode="age",
                                       expiration_override_lease_duration=2000)
        # make it start sooner than usual.
        lc = ss.lease_checker
        lc.slow_start = 0
        lc.stop_after_first_bucket = True
        webstatus = StorageStatus(ss)

        # create a few shares, with some leases on them
        self.make_shares(ss)
        [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis

        def count_shares(si):
            return len(list(ss._iter_share_files(si)))
        def _get_sharefile(si):
            return list(ss._iter_share_files(si))[0]
        def count_leases(si):
            return len(list(_get_sharefile(si).get_leases()))

        self.failUnlessEqual(count_shares(immutable_si_0), 1)
        self.failUnlessEqual(count_leases(immutable_si_0), 1)
        self.failUnlessEqual(count_shares(immutable_si_1), 1)
        self.failUnlessEqual(count_leases(immutable_si_1), 2)
        self.failUnlessEqual(count_shares(mutable_si_2), 1)
        self.failUnlessEqual(count_leases(mutable_si_2), 1)
        self.failUnlessEqual(count_shares(mutable_si_3), 1)
        self.failUnlessEqual(count_leases(mutable_si_3), 2)

        # artificially crank back the expiration time on the first lease of
        # each share, to make it look like it expired already (age=1000s).
        # Some shares have an extra lease which is set to expire at the
        # default time in 31 days from now (age=31days). We then run the
        # crawler, which will expire the first lease, making some shares get
        # deleted and others stay alive (with one remaining lease)
        now = time.time()

        sf0 = _get_sharefile(immutable_si_0)
        self.backdate_lease(sf0, self.renew_secrets[0], now - 1000)
        sf0_size = os.stat(sf0.home).st_size

        # immutable_si_1 gets an extra lease
        sf1 = _get_sharefile(immutable_si_1)
        self.backdate_lease(sf1, self.renew_secrets[1], now - 1000)

        sf2 = _get_sharefile(mutable_si_2)
        self.backdate_lease(sf2, self.renew_secrets[3], now - 1000)
        sf2_size = os.stat(sf2.home).st_size

        # mutable_si_3 gets an extra lease
        sf3 = _get_sharefile(mutable_si_3)
        self.backdate_lease(sf3, self.renew_secrets[4], now - 1000)

        ss.setServiceParent(self.s)

        d = fireEventually()
        # examine the state right after the first bucket has been processed
        def _after_first_bucket(ignored):
            p = lc.get_progress()
            self.failUnless(p["cycle-in-progress"])
        d.addCallback(_after_first_bucket)
        d.addCallback(lambda ign: self.render1(webstatus))
        def _check_html_in_cycle(html):
            s = remove_tags(html)
            # the first bucket encountered gets deleted, and its prefix
            # happens to be about 1/5th of the way through the ring, so the
            # predictor thinks we'll have 5 shares and that we'll delete them
            # all. This part of the test depends upon the SIs landing right
            # where they do now.
            self.failUnlessIn("The remainder of this cycle is expected to "
                              "recover: 4 shares, 4 buckets", s)
            self.failUnlessIn("The whole cycle is expected to examine "
                              "5 shares in 5 buckets and to recover: "
                              "5 shares, 5 buckets", s)
        d.addCallback(_check_html_in_cycle)

        # wait for the crawler to finish the first cycle. Two shares should
        # have been removed
        def _wait():
            return bool(lc.get_state()["last-cycle-finished"] is not None)
        d.addCallback(lambda ign: self.poll(_wait))

        def _after_first_cycle(ignored):
            self.failUnlessEqual(count_shares(immutable_si_0), 0)
            self.failUnlessEqual(count_shares(immutable_si_1), 1)
            self.failUnlessEqual(count_leases(immutable_si_1), 1)
            self.failUnlessEqual(count_shares(mutable_si_2), 0)
            self.failUnlessEqual(count_shares(mutable_si_3), 1)
            self.failUnlessEqual(count_leases(mutable_si_3), 1)

            s = lc.get_state()
            last = s["history"][0]

            self.failUnlessEqual(last["expiration-enabled"], True)
            self.failUnlessEqual(last["configured-expiration-mode"],
                                 ("age", 2000, None, ("mutable", "immutable")))
            self.failUnlessEqual(last["leases-per-share-histogram"], {1: 2, 2: 2})

            rec = last["space-recovered"]
            self.failUnlessEqual(rec["examined-buckets"], 4)
            self.failUnlessEqual(rec["examined-shares"], 4)
            self.failUnlessEqual(rec["actual-buckets"], 2)
            self.failUnlessEqual(rec["original-buckets"], 2)
            self.failUnlessEqual(rec["configured-buckets"], 2)
            self.failUnlessEqual(rec["actual-shares"], 2)
            self.failUnlessEqual(rec["original-shares"], 2)
            self.failUnlessEqual(rec["configured-shares"], 2)
            size = sf0_size + sf2_size
            self.failUnlessEqual(rec["actual-sharebytes"], size)
            self.failUnlessEqual(rec["original-sharebytes"], size)
            self.failUnlessEqual(rec["configured-sharebytes"], size)
            # different platforms have different notions of "blocks used by
            # this file", so merely assert that it's a number
            self.failUnless(rec["actual-diskbytes"] >= 0,
                            rec["actual-diskbytes"])
            self.failUnless(rec["original-diskbytes"] >= 0,
                            rec["original-diskbytes"])
            self.failUnless(rec["configured-diskbytes"] >= 0,
                            rec["configured-diskbytes"])
        d.addCallback(_after_first_cycle)
        d.addCallback(lambda ign: self.render1(webstatus))
        def _check_html(html):
            s = remove_tags(html)
            self.failUnlessIn("Expiration Enabled: expired leases will be removed", s)
            self.failUnlessIn("Leases created or last renewed more than 33 minutes ago will be considered expired.", s)
            self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
        d.addCallback(_check_html)
        return d

    def test_expire_cutoff_date(self):
        basedir = "storage/LeaseCrawler/expire_cutoff_date"
        fileutil.make_dirs(basedir)
        # setting cutoff-date to 2000 seconds ago means that any lease which
        # is more than 2000s old will be expired.
        now = time.time()
        then = int(now - 2000)
        ss = InstrumentedStorageServer(basedir, "\x00" * 20,
                                       expiration_enabled=True,
                                       expiration_mode="cutoff-date",
                                       expiration_cutoff_date=then)
        # make it start sooner than usual.
        lc = ss.lease_checker
        lc.slow_start = 0
        lc.stop_after_first_bucket = True
        webstatus = StorageStatus(ss)

        # create a few shares, with some leases on them
        self.make_shares(ss)
        [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis

        def count_shares(si):
            return len(list(ss._iter_share_files(si)))
        def _get_sharefile(si):
            return list(ss._iter_share_files(si))[0]
        def count_leases(si):
            return len(list(_get_sharefile(si).get_leases()))

        self.failUnlessEqual(count_shares(immutable_si_0), 1)
        self.failUnlessEqual(count_leases(immutable_si_0), 1)
        self.failUnlessEqual(count_shares(immutable_si_1), 1)
        self.failUnlessEqual(count_leases(immutable_si_1), 2)
        self.failUnlessEqual(count_shares(mutable_si_2), 1)
        self.failUnlessEqual(count_leases(mutable_si_2), 1)
        self.failUnlessEqual(count_shares(mutable_si_3), 1)
        self.failUnlessEqual(count_leases(mutable_si_3), 2)

        # artificially crank back the expiration time on the first lease of
        # each share, to make it look like it was renewed 3000s ago. To
        # achieve this, we need to set the expiration time to
        # now-3000+31days. This will change when the lease format is improved
        # to contain both create/renew time and duration.
        new_expiration_time = now - 3000 + 31*24*60*60

        # Some shares have an extra lease which is set to expire at the
        # default time in 31 days from now (age=31days). We then run the
        # crawler, which will expire the first lease, making some shares get
        # deleted and others stay alive (with one remaining lease)

        sf0 = _get_sharefile(immutable_si_0)
        self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
        sf0_size = os.stat(sf0.home).st_size

        # immutable_si_1 gets an extra lease
        sf1 = _get_sharefile(immutable_si_1)
        self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)

        sf2 = _get_sharefile(mutable_si_2)
        self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
        sf2_size = os.stat(sf2.home).st_size

        # mutable_si_3 gets an extra lease
        sf3 = _get_sharefile(mutable_si_3)
        self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)

        ss.setServiceParent(self.s)

        d = fireEventually()
        # examine the state right after the first bucket has been processed
        def _after_first_bucket(ignored):
            p = lc.get_progress()
            self.failUnless(p["cycle-in-progress"])
        d.addCallback(_after_first_bucket)
        d.addCallback(lambda ign: self.render1(webstatus))
        def _check_html_in_cycle(html):
            s = remove_tags(html)
            # the first bucket encountered gets deleted, and its prefix
            # happens to be about 1/5th of the way through the ring, so the
            # predictor thinks we'll have 5 shares and that we'll delete them
            # all. This part of the test depends upon the SIs landing right
            # where they do now.
            self.failUnlessIn("The remainder of this cycle is expected to "
                              "recover: 4 shares, 4 buckets", s)
            self.failUnlessIn("The whole cycle is expected to examine "
                              "5 shares in 5 buckets and to recover: "
                              "5 shares, 5 buckets", s)
        d.addCallback(_check_html_in_cycle)

        # wait for the crawler to finish the first cycle. Two shares should
        # have been removed
        def _wait():
            return bool(lc.get_state()["last-cycle-finished"] is not None)
        d.addCallback(lambda ign: self.poll(_wait))

        def _after_first_cycle(ignored):
            self.failUnlessEqual(count_shares(immutable_si_0), 0)
            self.failUnlessEqual(count_shares(immutable_si_1), 1)
            self.failUnlessEqual(count_leases(immutable_si_1), 1)
            self.failUnlessEqual(count_shares(mutable_si_2), 0)
            self.failUnlessEqual(count_shares(mutable_si_3), 1)
            self.failUnlessEqual(count_leases(mutable_si_3), 1)

            s = lc.get_state()
            last = s["history"][0]

            self.failUnlessEqual(last["expiration-enabled"], True)
            self.failUnlessEqual(last["configured-expiration-mode"],
                                 ("cutoff-date", None, then,
                                  ("mutable", "immutable")))
            self.failUnlessEqual(last["leases-per-share-histogram"],
                                 {1: 2, 2: 2})

            rec = last["space-recovered"]
            self.failUnlessEqual(rec["examined-buckets"], 4)
            self.failUnlessEqual(rec["examined-shares"], 4)
            self.failUnlessEqual(rec["actual-buckets"], 2)
            self.failUnlessEqual(rec["original-buckets"], 0)
            self.failUnlessEqual(rec["configured-buckets"], 2)
            self.failUnlessEqual(rec["actual-shares"], 2)
            self.failUnlessEqual(rec["original-shares"], 0)
            self.failUnlessEqual(rec["configured-shares"], 2)
            size = sf0_size + sf2_size
            self.failUnlessEqual(rec["actual-sharebytes"], size)
            self.failUnlessEqual(rec["original-sharebytes"], 0)
            self.failUnlessEqual(rec["configured-sharebytes"], size)
            # different platforms have different notions of "blocks used by
            # this file", so merely assert that it's a number
            self.failUnless(rec["actual-diskbytes"] >= 0,
                            rec["actual-diskbytes"])
            self.failUnless(rec["original-diskbytes"] >= 0,
                            rec["original-diskbytes"])
            self.failUnless(rec["configured-diskbytes"] >= 0,
                            rec["configured-diskbytes"])
        d.addCallback(_after_first_cycle)
        d.addCallback(lambda ign: self.render1(webstatus))
        def _check_html(html):
            s = remove_tags(html)
            self.failUnlessIn("Expiration Enabled:"
                              " expired leases will be removed", s)
            date = time.strftime("%Y-%m-%d (%d-%b-%Y) UTC", time.gmtime(then))
            substr = "Leases created or last renewed before %s will be considered expired." % date
            self.failUnlessIn(substr, s)
            self.failUnlessIn(" recovered: 2 shares, 2 buckets (1 mutable / 1 immutable), ", s)
        d.addCallback(_check_html)
        return d

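    # expiration_sharetypes restricts which kinds of shares the expirer will
    # touch: with ("immutable",) only the immutable shares are removed and
    # the mutable ones keep all their leases, and vice versa for
    # ("mutable",).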
    def test_only_immutable(self):
        basedir = "storage/LeaseCrawler/only_immutable"
        fileutil.make_dirs(basedir)
        now = time.time()
        then = int(now - 2000)
        ss = StorageServer(basedir, "\x00" * 20,
                           expiration_enabled=True,
                           expiration_mode="cutoff-date",
                           expiration_cutoff_date=then,
                           expiration_sharetypes=("immutable",))
        lc = ss.lease_checker
        lc.slow_start = 0
        webstatus = StorageStatus(ss)

        self.make_shares(ss)
        [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
        # set all leases to be expirable
        new_expiration_time = now - 3000 + 31*24*60*60

        def count_shares(si):
            return len(list(ss._iter_share_files(si)))
        def _get_sharefile(si):
            return list(ss._iter_share_files(si))[0]
        def count_leases(si):
            return len(list(_get_sharefile(si).get_leases()))

        sf0 = _get_sharefile(immutable_si_0)
        self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
        sf1 = _get_sharefile(immutable_si_1)
        self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
        self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
        sf2 = _get_sharefile(mutable_si_2)
        self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
        sf3 = _get_sharefile(mutable_si_3)
        self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
        self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)

        ss.setServiceParent(self.s)
        def _wait():
            return bool(lc.get_state()["last-cycle-finished"] is not None)
        d = self.poll(_wait)

        def _after_first_cycle(ignored):
            self.failUnlessEqual(count_shares(immutable_si_0), 0)
            self.failUnlessEqual(count_shares(immutable_si_1), 0)
            self.failUnlessEqual(count_shares(mutable_si_2), 1)
            self.failUnlessEqual(count_leases(mutable_si_2), 1)
            self.failUnlessEqual(count_shares(mutable_si_3), 1)
            self.failUnlessEqual(count_leases(mutable_si_3), 2)
        d.addCallback(_after_first_cycle)
        d.addCallback(lambda ign: self.render1(webstatus))
        def _check_html(html):
            s = remove_tags(html)
            self.failUnlessIn("The following sharetypes will be expired: immutable.", s)
        d.addCallback(_check_html)
        return d

    def test_only_mutable(self):
        basedir = "storage/LeaseCrawler/only_mutable"
        fileutil.make_dirs(basedir)
        now = time.time()
        then = int(now - 2000)
        ss = StorageServer(basedir, "\x00" * 20,
                           expiration_enabled=True,
                           expiration_mode="cutoff-date",
                           expiration_cutoff_date=then,
                           expiration_sharetypes=("mutable",))
        lc = ss.lease_checker
        lc.slow_start = 0
        webstatus = StorageStatus(ss)

        self.make_shares(ss)
        [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
        # set all leases to be expirable
        new_expiration_time = now - 3000 + 31*24*60*60

        def count_shares(si):
            return len(list(ss._iter_share_files(si)))
        def _get_sharefile(si):
            return list(ss._iter_share_files(si))[0]
        def count_leases(si):
            return len(list(_get_sharefile(si).get_leases()))

        sf0 = _get_sharefile(immutable_si_0)
        self.backdate_lease(sf0, self.renew_secrets[0], new_expiration_time)
        sf1 = _get_sharefile(immutable_si_1)
        self.backdate_lease(sf1, self.renew_secrets[1], new_expiration_time)
        self.backdate_lease(sf1, self.renew_secrets[2], new_expiration_time)
        sf2 = _get_sharefile(mutable_si_2)
        self.backdate_lease(sf2, self.renew_secrets[3], new_expiration_time)
        sf3 = _get_sharefile(mutable_si_3)
        self.backdate_lease(sf3, self.renew_secrets[4], new_expiration_time)
        self.backdate_lease(sf3, self.renew_secrets[5], new_expiration_time)

        ss.setServiceParent(self.s)
        def _wait():
            return bool(lc.get_state()["last-cycle-finished"] is not None)
        d = self.poll(_wait)

        def _after_first_cycle(ignored):
            self.failUnlessEqual(count_shares(immutable_si_0), 1)
            self.failUnlessEqual(count_leases(immutable_si_0), 1)
            self.failUnlessEqual(count_shares(immutable_si_1), 1)
            self.failUnlessEqual(count_leases(immutable_si_1), 2)
            self.failUnlessEqual(count_shares(mutable_si_2), 0)
            self.failUnlessEqual(count_shares(mutable_si_3), 0)
        d.addCallback(_after_first_cycle)
        d.addCallback(lambda ign: self.render1(webstatus))
        def _check_html(html):
            s = remove_tags(html)
            self.failUnlessIn("The following sharetypes will be expired: mutable.", s)
        d.addCallback(_check_html)
        return d

    def test_bad_mode(self):
        basedir = "storage/LeaseCrawler/bad_mode"
        fileutil.make_dirs(basedir)
        e = self.failUnlessRaises(ValueError,
                                  StorageServer, basedir, "\x00" * 20,
                                  expiration_mode="bogus")
        self.failUnless("GC mode 'bogus' must be 'age' or 'cutoff-date'" in str(e), str(e))

    def test_parse_duration(self):
        DAY = 24*60*60
        MONTH = 31*DAY
        YEAR = 365*DAY
        p = time_format.parse_duration
        self.failUnlessEqual(p("7days"), 7*DAY)
        self.failUnlessEqual(p("31day"), 31*DAY)
        self.failUnlessEqual(p("60 days"), 60*DAY)
        self.failUnlessEqual(p("2mo"), 2*MONTH)
        self.failUnlessEqual(p("3 month"), 3*MONTH)
        self.failUnlessEqual(p("2years"), 2*YEAR)
        e = self.failUnlessRaises(ValueError, p, "2kumquats")
        self.failUnless("no unit (like day, month, or year) in '2kumquats'"
                        in str(e), str(e))

    def test_parse_date(self):
        p = time_format.parse_date
        self.failUnless(isinstance(p("2009-03-18"), int))
        self.failUnlessEqual(p("2009-03-18"), 1237334400)

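    # The crawler keeps a bounded history of completed cycles: after 15
    # cycles, only the 10 most recent summaries (cycles 6 through 15) are
    # retained.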
    def test_limited_history(self):
        basedir = "storage/LeaseCrawler/limited_history"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        # make it start sooner than usual.
        lc = ss.lease_checker
        lc.slow_start = 0
        lc.cpu_slice = 500

        # create a few shares, with some leases on them
        self.make_shares(ss)

        ss.setServiceParent(self.s)

        def _wait_until_15_cycles_done():
            last = lc.state["last-cycle-finished"]
            if last is not None and last >= 15:
                return True
            if lc.timer:
                lc.timer.reset(0)
            return False
        d = self.poll(_wait_until_15_cycles_done)

        def _check(ignored):
            s = lc.get_state()
            h = s["history"]
            self.failUnlessEqual(len(h), 10)
            self.failUnlessEqual(max(h.keys()), 15)
            self.failUnlessEqual(min(h.keys()), 6)
        d.addCallback(_check)
        return d

    def test_unpredictable_future(self):
 
2211
        basedir = "storage/LeaseCrawler/unpredictable_future"
 
2212
        fileutil.make_dirs(basedir)
 
2213
        ss = StorageServer(basedir, "\x00" * 20)
 
2214
        # make it start sooner than usual.
 
2215
        lc = ss.lease_checker
 
2216
        lc.slow_start = 0
 
2217
        lc.cpu_slice = -1.0 # stop quickly
 
2218
 
 
2219
        self.make_shares(ss)
 
2220
 
 
2221
        ss.setServiceParent(self.s)
 
2222
 
 
2223
        d = fireEventually()
 
2224
        def _check(ignored):
 
2225
            # this should fire after the first bucket is complete, but before
 
2226
            # the first prefix is complete, so the progress-measurer won't
 
2227
            # think we've gotten far enough to raise our percent-complete
 
2228
            # above 0%, triggering the cannot-predict-the-future code in
 
2229
            # expirer.py . This will have to change if/when the
 
2230
            # progress-measurer gets smart enough to count buckets (we'll
 
2231
            # have to interrupt it even earlier, before it's finished the
 
2232
            # first bucket).
 
2233
            s = lc.get_state()
 
2234
            self.failUnless("cycle-to-date" in s)
 
2235
            self.failUnless("estimated-remaining-cycle" in s)
 
2236
            self.failUnless("estimated-current-cycle" in s)
 
2237
 
 
2238
            left = s["estimated-remaining-cycle"]["space-recovered"]
 
2239
            self.failUnlessEqual(left["actual-buckets"], None)
 
2240
            self.failUnlessEqual(left["original-buckets"], None)
 
2241
            self.failUnlessEqual(left["configured-buckets"], None)
 
2242
            self.failUnlessEqual(left["actual-shares"], None)
 
2243
            self.failUnlessEqual(left["original-shares"], None)
 
2244
            self.failUnlessEqual(left["configured-shares"], None)
 
2245
            self.failUnlessEqual(left["actual-diskbytes"], None)
 
2246
            self.failUnlessEqual(left["original-diskbytes"], None)
 
2247
            self.failUnlessEqual(left["configured-diskbytes"], None)
 
2248
            self.failUnlessEqual(left["actual-sharebytes"], None)
 
2249
            self.failUnlessEqual(left["original-sharebytes"], None)
 
2250
            self.failUnlessEqual(left["configured-sharebytes"], None)
 
2251
 
 
2252
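            # with the percent-complete still at 0%, the expirer has no basis
            # for extrapolation, so every estimated field above is None; the
            # full-cycle estimate below should be equally unknowable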
            full = s["estimated-current-cycle"]["space-recovered"]
            self.failUnlessEqual(full["actual-buckets"], None)
            self.failUnlessEqual(full["original-buckets"], None)
            self.failUnlessEqual(full["configured-buckets"], None)
            self.failUnlessEqual(full["actual-shares"], None)
            self.failUnlessEqual(full["original-shares"], None)
            self.failUnlessEqual(full["configured-shares"], None)
            self.failUnlessEqual(full["actual-diskbytes"], None)
            self.failUnlessEqual(full["original-diskbytes"], None)
            self.failUnlessEqual(full["configured-diskbytes"], None)
            self.failUnlessEqual(full["actual-sharebytes"], None)
            self.failUnlessEqual(full["original-sharebytes"], None)
            self.failUnlessEqual(full["configured-sharebytes"], None)

        d.addCallback(_check)
        return d

    def test_no_st_blocks(self):
        basedir = "storage/LeaseCrawler/no_st_blocks"
        fileutil.make_dirs(basedir)
        ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
                                        expiration_mode="age",
                                        expiration_override_lease_duration=-1000)
        # a negative expiration_override_lease_duration= means the
        # "configured-" space-recovered counts will be non-zero, since all
        # shares will have expired by then

        # make it start sooner than usual.
        lc = ss.lease_checker
        lc.slow_start = 0

        self.make_shares(ss)
        ss.setServiceParent(self.s)
        def _wait():
            return lc.get_state()["last-cycle-finished"] is not None
        d = self.poll(_wait)

        def _check(ignored):
            s = lc.get_state()
            last = s["history"][0]
            rec = last["space-recovered"]
            self.failUnlessEqual(rec["configured-buckets"], 4)
            self.failUnlessEqual(rec["configured-shares"], 4)
            self.failUnless(rec["configured-sharebytes"] > 0,
                            rec["configured-sharebytes"])
            # without the .st_blocks field in os.stat() results, we should be
            # reporting diskbytes==sharebytes
            self.failUnlessEqual(rec["configured-sharebytes"],
                                 rec["configured-diskbytes"])
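            # (when st_blocks is available, diskbytes is instead derived from
            # the allocated-block count, conventionally st_blocks * 512 on
            # unix, which can legitimately differ from the logical share size)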
        d.addCallback(_check)
        return d

    def test_share_corruption(self):
        self._poll_should_ignore_these_errors = [
            UnknownMutableContainerVersionError,
            UnknownImmutableContainerVersionError,
            ]
        basedir = "storage/LeaseCrawler/share_corruption"
        fileutil.make_dirs(basedir)
        ss = InstrumentedStorageServer(basedir, "\x00" * 20)
        w = StorageStatus(ss)
        # make it start sooner than usual.
        lc = ss.lease_checker
        lc.stop_after_first_bucket = True
        lc.slow_start = 0
        lc.cpu_slice = 500

        # create a few shares, with some leases on them
        self.make_shares(ss)

        # now corrupt one, and make sure the lease-checker keeps going
        [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3] = self.sis
        first = min(self.sis)
        first_b32 = base32.b2a(first)
        fn = os.path.join(ss.sharedir, storage_index_to_dir(first), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write("BAD MAGIC")
        f.close()
        # if get_share_file() doesn't see the correct mutable magic, it
        # assumes the file is an immutable share, and then
        # immutable.ShareFile sees a bad version. So regardless of which kind
        # of share we corrupted, this will trigger an
        # UnknownImmutableContainerVersionError.

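        # (a sketch of that detection, assuming get_share_file() compares a
        # fixed-length prefix of the file against the mutable-container
        # magic, roughly:
        #     prefix = open(fn, "rb").read(32)
        #     mutable = (prefix == MutableShareFile.MAGIC)
        # so our "BAD MAGIC" overwrite always falls through to the
        # immutable-share path)
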
        # also create an empty bucket
        empty_si = base32.b2a("\x04"*16)
        empty_bucket_dir = os.path.join(ss.sharedir,
                                        storage_index_to_dir(empty_si))
        fileutil.make_dirs(empty_bucket_dir)

        ss.setServiceParent(self.s)

        d = fireEventually()

        # now examine the state right after the first bucket has been
        # processed.
        def _after_first_bucket(ignored):
            so_far = lc.get_state()["cycle-to-date"]
            rec = so_far["space-recovered"]
            self.failUnlessEqual(rec["examined-buckets"], 1)
            self.failUnlessEqual(rec["examined-shares"], 0)
            self.failUnlessEqual(so_far["corrupt-shares"], [(first_b32, 0)])
        d.addCallback(_after_first_bucket)

        d.addCallback(lambda ign: self.render_json(w))
        def _check_json(json):
            data = simplejson.loads(json)
            # grr. json turns all dict keys into strings.
            so_far = data["lease-checker"]["cycle-to-date"]
            corrupt_shares = so_far["corrupt-shares"]
            # it also turns all tuples into lists
            self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
        d.addCallback(_check_json)
        d.addCallback(lambda ign: self.render1(w))
        def _check_html(html):
            s = remove_tags(html)
            self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
        d.addCallback(_check_html)

        def _wait():
            return lc.get_state()["last-cycle-finished"] is not None
        d.addCallback(lambda ign: self.poll(_wait))

        def _after_first_cycle(ignored):
            s = lc.get_state()
            last = s["history"][0]
            rec = last["space-recovered"]
            self.failUnlessEqual(rec["examined-buckets"], 5)
            self.failUnlessEqual(rec["examined-shares"], 3)
            self.failUnlessEqual(last["corrupt-shares"], [(first_b32, 0)])
        d.addCallback(_after_first_cycle)
        d.addCallback(lambda ign: self.render_json(w))
        def _check_json_history(json):
            data = simplejson.loads(json)
            last = data["lease-checker"]["history"]["0"]
            corrupt_shares = last["corrupt-shares"]
            self.failUnlessEqual(corrupt_shares, [[first_b32, 0]])
        d.addCallback(_check_json_history)
        d.addCallback(lambda ign: self.render1(w))
        def _check_html_history(html):
            s = remove_tags(html)
            self.failUnlessIn("Corrupt shares: SI %s shnum 0" % first_b32, s)
        d.addCallback(_check_html_history)

        def _cleanup(res):
            self.flushLoggedErrors(UnknownMutableContainerVersionError,
                                   UnknownImmutableContainerVersionError)
            return res
        d.addBoth(_cleanup)
        return d

    def render_json(self, page):
        d = self.render1(page, args={"t": ["json"]})
        return d

class NoStatvfsServer(StorageServer):
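    # simulate a platform (like windows) where os.statvfs is missing, so that
    # looking it up raises AttributeError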
    def do_statvfs(self):
        raise AttributeError

class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):

    def setUp(self):
        self.s = service.MultiService()
        self.s.startService()
    def tearDown(self):
        return self.s.stopService()

    def test_no_server(self):
        w = StorageStatus(None)
        html = w.renderSynchronously()
        self.failUnless("<h1>No Storage Server Running</h1>" in html, html)

    def test_status(self):
        basedir = "storage/WebStatus/status"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        d = self.render1(w)
        def _check_html(html):
            self.failUnless("<h1>Storage Server Status</h1>" in html, html)
            s = remove_tags(html)
            self.failUnless("Accepting new shares: Yes" in s, s)
            self.failUnless("Reserved space: - 0 B (0)" in s, s)
        d.addCallback(_check_html)
        d.addCallback(lambda ign: self.render_json(w))
        def _check_json(json):
            data = simplejson.loads(json)
            s = data["stats"]
            self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
            self.failUnlessEqual(s["storage_server.reserved_space"], 0)
            self.failUnless("bucket-counter" in data)
            self.failUnless("lease-checker" in data)
        d.addCallback(_check_json)
        return d

    def render_json(self, page):
        d = self.render1(page, args={"t": ["json"]})
        return d

    def test_status_no_statvfs(self):
        # windows has no os.statvfs . Make sure the code handles that even on
        # unix.
        basedir = "storage/WebStatus/status_no_statvfs"
        fileutil.make_dirs(basedir)
        ss = NoStatvfsServer(basedir, "\x00" * 20)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnless("<h1>Storage Server Status</h1>" in html, html)
        s = remove_tags(html)
        self.failUnless("Accepting new shares: Yes" in s, s)
        self.failUnless("Total disk space: ?" in s, s)

    def test_readonly(self):
        basedir = "storage/WebStatus/readonly"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnless("<h1>Storage Server Status</h1>" in html, html)
        s = remove_tags(html)
        self.failUnless("Accepting new shares: No" in s, s)

    def test_reserved(self):
        basedir = "storage/WebStatus/reserved"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnless("<h1>Storage Server Status</h1>" in html, html)
        s = remove_tags(html)
        self.failUnless("Reserved space: - 10.00 MB (10000000)" in s, s)

    def test_huge_reserved(self):
        basedir = "storage/WebStatus/huge_reserved"
        fileutil.make_dirs(basedir)
        ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e13) # 100 TB
        ss.setServiceParent(self.s)
        w = StorageStatus(ss)
        html = w.renderSynchronously()
        self.failUnless("<h1>Storage Server Status</h1>" in html, html)
        s = remove_tags(html)
        self.failUnless("Reserved space: - 100.00 TB" in s, s)

    def test_util(self):
        w = StorageStatus(None)
        self.failUnlessEqual(w.render_space(None, None), "?")
        self.failUnlessEqual(w.render_space(None, 10e6), "10000000")
        self.failUnlessEqual(w.render_abbrev_space(None, None), "?")
        self.failUnlessEqual(w.render_abbrev_space(None, 10e6), "10.00 MB")
        self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
        self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)
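        # note: render_abbrev_space uses SI (base-1000) units, which is why
        # 10e6 bytes renders as "10.00 MB" rather than "9.54 MiB", while
        # render_space emits the exact byte count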