~ubuntu-branches/ubuntu/karmic/tahoe-lafs/karmic

Viewing changes to src/allmydata/client.py

  • Committer: Bazaar Package Importer
  • Author(s): Zooko O'Whielacronx (Hacker)
  • Date: 2009-09-24 00:00:05 UTC
  • Revision ID: james.westby@ubuntu.com-20090924000005-ixe2n4yngmk49ysz
Tags: upstream-1.5.0
Import upstream version 1.5.0

import os, stat, time, weakref
from allmydata.interfaces import RIStorageServer
from allmydata import node

from zope.interface import implements
from twisted.internet import reactor
from twisted.application.internet import TimerService
from foolscap.api import Referenceable
from pycryptopp.publickey import rsa

import allmydata
from allmydata.storage.server import StorageServer
from allmydata import storage_client
from allmydata.immutable.upload import Uploader
from allmydata.immutable.download import Downloader
from allmydata.immutable.filenode import FileNode, LiteralFileNode
from allmydata.immutable.offloaded import Helper
from allmydata.control import ControlServer
from allmydata.introducer.client import IntroducerClient
from allmydata.util import hashutil, base32, pollmixin, cachedir, log
from allmydata.util.abbreviate import parse_abbreviated_size
from allmydata.util.time_format import parse_duration, parse_date
from allmydata.uri import LiteralFileURI, UnknownURI
from allmydata.dirnode import DirectoryNode
from allmydata.mutable.filenode import MutableFileNode
from allmydata.unknown import UnknownNode
from allmydata.stats import StatsProvider
from allmydata.history import History
from allmydata.interfaces import IURI, IDirectoryURI, IStatsProducer, \
     IReadonlyDirectoryURI, IFileURI, IMutableFileURI, RIStubClient, \
     UnhandledCapTypeError

KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB

class StubClient(Referenceable):
    implements(RIStubClient)

def _make_secret():
    return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"

class Client(node.Node, pollmixin.PollMixin):
    implements(IStatsProducer)

    PORTNUMFILE = "client.port"
    STOREDIR = 'storage'
    NODETYPE = "client"
    SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"

    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, it will work as they expect.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # this is a dict of (needed, desired, total, max_segment_size). 'needed'
    # is the number of shares required to reconstruct a file. 'desired' means
    # that we will abort an upload unless we can allocate space for at least
    # this many. 'total' is the total number of shares created by encoding.
    # If everybody has room then this is how many we will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "happy": 7,
                                   "n": 10,
                                   "max_segment_size": 128*KiB,
                                   }
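    # For example, with these defaults a 1 MiB file is cut into 128 KiB
    # segments, and each segment is encoded into n=10 shares, any k=3 of
    # which suffice to reconstruct it; total share data is therefore about
    # n/k = 10/3 times the file size (~3.3 MiB here, before hash-tree and
    # container overhead).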
 
    # set this to override the size of the RSA keys created for new mutable
    # files. The default of None means to let mutable.filenode choose its own
    # size, which means 2048 bits.
    DEFAULT_MUTABLE_KEYSIZE = None

    def __init__(self, basedir="."):
        node.Node.__init__(self, basedir)
        self.started_timestamp = time.time()
        self.logSource="Client"
        self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
        self.init_introducer_client()
        self.init_stats_provider()
        self.init_lease_secret()
        self.init_storage()
        self.init_control()
        if self.get_config("helper", "enabled", False, boolean=True):
            self.init_helper()
        self.init_client()
        self._key_generator = None
        key_gen_furl = self.get_config("client", "key_generator.furl", None)
        if key_gen_furl:
            self.init_key_gen(key_gen_furl)
        # ControlServer and Helper are attached after Tub startup
        self.init_ftp_server()
        self.init_sftp_server()

        hotline_file = os.path.join(self.basedir,
                                    self.SUICIDE_PREVENTION_HOTLINE_FILE)
        if os.path.exists(hotline_file):
            age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
            self.log("hotline file noticed (%ds old), starting timer" % age)
            hotline = TimerService(1.0, self._check_hotline, hotline_file)
            hotline.setServiceParent(self)

        # this needs to happen last, so it can use getServiceNamed() to
        # acquire references to StorageServer and other web-statusable things
        webport = self.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport) # strports string

    def read_old_config_files(self):
        node.Node.read_old_config_files(self)
        copy = self._copy_config_from_file
        copy("introducer.furl", "client", "introducer.furl")
        copy("helper.furl", "client", "helper.furl")
        copy("key_generator.furl", "client", "key_generator.furl")
        copy("stats_gatherer.furl", "client", "stats_gatherer.furl")
        if os.path.exists(os.path.join(self.basedir, "no_storage")):
            self.set_config("storage", "enabled", "false")
        if os.path.exists(os.path.join(self.basedir, "readonly_storage")):
            self.set_config("storage", "readonly", "true")
        if os.path.exists(os.path.join(self.basedir, "debug_discard_storage")):
            self.set_config("storage", "debug_discard", "true")
        if os.path.exists(os.path.join(self.basedir, "run_helper")):
            self.set_config("helper", "enabled", "true")

    def init_introducer_client(self):
        self.introducer_furl = self.get_config("client", "introducer.furl")
        ic = IntroducerClient(self.tub, self.introducer_furl,
                              self.nickname,
                              str(allmydata.__full_version__),
                              str(self.OLDEST_SUPPORTED_VERSION))
        self.introducer_client = ic
        # hold off on starting the IntroducerClient until our tub has been
        # started, so we'll have a useful address on our RemoteReference, so
        # that the introducer's status page will show us.
        d = self.when_tub_ready()
        def _start_introducer_client(res):
            ic.setServiceParent(self)
        d.addCallback(_start_introducer_client)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="URyI5w")

    def init_stats_provider(self):
        gatherer_furl = self.get_config("client", "stats_gatherer.furl", None)
        self.stats_provider = StatsProvider(self, gatherer_furl)
        self.add_service(self.stats_provider)
        self.stats_provider.register_producer(self)

    def get_stats(self):
        return { 'node.uptime': time.time() - self.started_timestamp }

    def init_lease_secret(self):
        secret_s = self.get_or_create_private_config("secret", _make_secret)
        self._lease_secret = base32.a2b(secret_s)

    def init_storage(self):
        # should we run a storage server (and publish it for others to use)?
        if not self.get_config("storage", "enabled", True, boolean=True):
            return
        readonly = self.get_config("storage", "readonly", False, boolean=True)

        storedir = os.path.join(self.basedir, self.STOREDIR)

        data = self.get_config("storage", "reserved_space", None)
        reserved = None
        try:
            reserved = parse_abbreviated_size(data)
        except ValueError:
            log.msg("[storage]reserved_space= contains unparseable value %s"
                    % data)
        if reserved is None:
            reserved = 0
        discard = self.get_config("storage", "debug_discard", False,
                                  boolean=True)

        expire = self.get_config("storage", "expire.enabled", False, boolean=True)
        if expire:
            mode = self.get_config("storage", "expire.mode") # require a mode
        else:
            mode = self.get_config("storage", "expire.mode", "age")

        o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
        if o_l_d is not None:
            o_l_d = parse_duration(o_l_d)

        cutoff_date = None
        if mode == "cutoff-date":
            cutoff_date = self.get_config("storage", "expire.cutoff_date")
            cutoff_date = parse_date(cutoff_date)

        sharetypes = []
        if self.get_config("storage", "expire.immutable", True, boolean=True):
            sharetypes.append("immutable")
        if self.get_config("storage", "expire.mutable", True, boolean=True):
            sharetypes.append("mutable")
        expiration_sharetypes = tuple(sharetypes)

        ss = StorageServer(storedir, self.nodeid,
                           reserved_space=reserved,
                           discard_storage=discard,
                           readonly_storage=readonly,
                           stats_provider=self.stats_provider,
                           expiration_enabled=expire,
                           expiration_mode=mode,
                           expiration_override_lease_duration=o_l_d,
                           expiration_cutoff_date=cutoff_date,
                           expiration_sharetypes=expiration_sharetypes)
        self.add_service(ss)

        d = self.when_tub_ready()
        # we can't do registerReference until the Tub is ready
        def _publish(res):
            furl_file = os.path.join(self.basedir, "private", "storage.furl")
            furl = self.tub.registerReference(ss, furlFile=furl_file)
            ri_name = RIStorageServer.__remote_name__
            self.introducer_client.publish(furl, "storage", ri_name)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="aLGBKw")
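    # An illustrative tahoe.cfg [storage] stanza exercising the options read
    # above (values are examples, not defaults; cutoff_date takes whatever
    # parse_date accepts, assumed here to be YYYY-MM-DD, and reserved_space
    # takes parse_abbreviated_size strings like "10G"):
    #
    #   [storage]
    #   enabled = true
    #   readonly = false
    #   reserved_space = 10G
    #   expire.enabled = true
    #   expire.mode = cutoff-date
    #   expire.cutoff_date = 2009-01-01
    #   expire.immutable = true
    #   expire.mutable = true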
 
    def init_client(self):
        helper_furl = self.get_config("client", "helper.furl", None)
        DEP = self.DEFAULT_ENCODING_PARAMETERS
        DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
        DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
        DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
        convergence_s = self.get_or_create_private_config('convergence', _make_secret)
        self.convergence = base32.a2b(convergence_s)
        self._node_cache = weakref.WeakValueDictionary() # uri -> node

        self.init_client_storage_broker()
        self.add_service(History(self.stats_provider))
        self.add_service(Uploader(helper_furl, self.stats_provider))
        download_cachedir = os.path.join(self.basedir,
                                         "private", "cache", "download")
        self.download_cache_dirman = cachedir.CacheDirectoryManager(download_cachedir)
        self.download_cache_dirman.setServiceParent(self)
        self.add_service(Downloader(self.stats_provider))
        self.init_stub_client()
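    # The matching tahoe.cfg [client] section for the options read above
    # (furl values elided; the share counts shown are just the defaults):
    #
    #   [client]
    #   introducer.furl = pb://...
    #   helper.furl = pb://...
    #   shares.needed = 3
    #   shares.happy = 7
    #   shares.total = 10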
 
    def init_client_storage_broker(self):
        # create a StorageFarmBroker object, for use by Uploader/Downloader
        # (and everybody else who wants to use storage servers)
        sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True)
        self.storage_broker = sb

        # load static server specifications from tahoe.cfg, if any.
        # Not quite ready yet.
        #if self.config.has_section("client-server-selection"):
        #    server_params = {} # maps serverid to dict of parameters
        #    for (name, value) in self.config.items("client-server-selection"):
        #        pieces = name.split(".")
        #        if pieces[0] == "server":
        #            serverid = pieces[1]
        #            if serverid not in server_params:
        #                server_params[serverid] = {}
        #            server_params[serverid][pieces[2]] = value
        #    for serverid, params in server_params.items():
        #        server_type = params.pop("type")
        #        if server_type == "tahoe-foolscap":
        #            s = storage_client.NativeStorageClient(*params)
        #        else:
        #            msg = ("unrecognized server type '%s' in "
        #                   "tahoe.cfg [client-server-selection]server.%s.type"
        #                   % (server_type, serverid))
        #            raise storage_client.UnknownServerTypeError(msg)
        #        sb.add_server(s.serverid, s)

        # check to see if we're supposed to use the introducer too
        if self.get_config("client-server-selection", "use_introducer",
                           default=True, boolean=True):
            sb.use_introducer(self.introducer_client)

    def get_storage_broker(self):
        return self.storage_broker

    def init_stub_client(self):
        def _publish(res):
            # we publish an empty object so that the introducer can count how
            # many clients are connected and see what versions they're
            # running.
            sc = StubClient()
            furl = self.tub.registerReference(sc)
            ri_name = RIStubClient.__remote_name__
            self.introducer_client.publish(furl, "stub_client", ri_name)
        d = self.when_tub_ready()
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="OEHq3g")

    def get_history(self):
        return self.getServiceNamed("history")

    def init_control(self):
        d = self.when_tub_ready()
        def _publish(res):
            c = ControlServer()
            c.setServiceParent(self)
            control_url = self.tub.registerReference(c)
            self.write_private_config("control.furl", control_url + "\n")
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="d3tNXA")

    def init_helper(self):
        d = self.when_tub_ready()
        def _publish(self):
            h = Helper(os.path.join(self.basedir, "helper"), self.stats_provider)
            h.setServiceParent(self)
            # TODO: this is confusing. BASEDIR/private/helper.furl is created
            # by the helper. BASEDIR/helper.furl is consumed by the client
            # who wants to use the helper. I like having the filename be the
            # same, since that makes 'cp' work smoothly, but the difference
            # between config inputs and generated outputs is hard to see.
            helper_furlfile = os.path.join(self.basedir,
                                           "private", "helper.furl")
            self.tub.registerReference(h, furlFile=helper_furlfile)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="K0mW5w")

    def init_key_gen(self, key_gen_furl):
        d = self.when_tub_ready()
        def _subscribe(self):
            self.tub.connectTo(key_gen_furl, self._got_key_generator)
        d.addCallback(_subscribe)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="z9DMzw")

    def _got_key_generator(self, key_generator):
        self._key_generator = key_generator
        key_generator.notifyOnDisconnect(self._lost_key_generator)

    def _lost_key_generator(self):
        self._key_generator = None

    def init_web(self, webport):
        self.log("init_web(webport=%s)", args=(webport,))

        from allmydata.webish import WebishServer
        nodeurl_path = os.path.join(self.basedir, "node.url")
        staticdir = self.get_config("node", "web.static", "public_html")
        staticdir = os.path.expanduser(staticdir)
        ws = WebishServer(self, webport, nodeurl_path, staticdir)
        self.add_service(ws)
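    # webport is a Twisted strports specification (per the comment in
    # __init__), so an illustrative [node] section would be:
    #
    #   [node]
    #   web.port = tcp:3456:interface=127.0.0.1
    #   web.static = public_html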
 
    def init_ftp_server(self):
        if self.get_config("ftpd", "enabled", False, boolean=True):
            accountfile = self.get_config("ftpd", "accounts.file", None)
            accounturl = self.get_config("ftpd", "accounts.url", None)
            ftp_portstr = self.get_config("ftpd", "port", "8021")

            from allmydata.frontends import ftpd
            s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
            s.setServiceParent(self)

    def init_sftp_server(self):
        if self.get_config("sftpd", "enabled", False, boolean=True):
            accountfile = self.get_config("sftpd", "accounts.file", None)
            accounturl = self.get_config("sftpd", "accounts.url", None)
            sftp_portstr = self.get_config("sftpd", "port", "8022")
            pubkey_file = self.get_config("sftpd", "host_pubkey_file")
            privkey_file = self.get_config("sftpd", "host_privkey_file")

            from allmydata.frontends import sftpd
            s = sftpd.SFTPServer(self, accountfile, accounturl,
                                 sftp_portstr, pubkey_file, privkey_file)
            s.setServiceParent(self)
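    # An illustrative [sftpd] stanza wiring up the options read above (the
    # key-file paths are assumptions, not defaults):
    #
    #   [sftpd]
    #   enabled = true
    #   port = 8022
    #   host_pubkey_file = private/ssh_host_rsa_key.pub
    #   host_privkey_file = private/ssh_host_rsa_key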
 
    def _check_hotline(self, hotline_file):
        if os.path.exists(hotline_file):
            mtime = os.stat(hotline_file)[stat.ST_MTIME]
            if mtime > time.time() - 120.0:
                return
            else:
                self.log("hotline file too old, shutting down")
        else:
            self.log("hotline file missing, shutting down")
        reactor.stop()
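    # The hotline file is a dead-man's switch for test harnesses: the
    # controlling process must keep refreshing the file's mtime, or the
    # TimerService started in __init__ (polling once per second) stops the
    # node once the file is more than 120 seconds stale. A sketch of the
    # keep-alive side:
    #
    #   while True:
    #       os.utime(hotline_file, None)   # bump mtime
    #       time.sleep(30)                 # well under the 120s limit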
 
    def get_encoding_parameters(self):
        return self.DEFAULT_ENCODING_PARAMETERS

    def connected_to_introducer(self):
        if self.introducer_client:
            return self.introducer_client.connected_to_introducer()
        return False

    def get_renewal_secret(self):
        return hashutil.my_renewal_secret_hash(self._lease_secret)

    def get_cancel_secret(self):
        return hashutil.my_cancel_secret_hash(self._lease_secret)

    def debug_wait_for_client_connections(self, num_clients):
        """Return a Deferred that fires (with None) when we have connections
        to the given number of peers. Useful for tests that set up a
        temporary test network and need to know when it is safe to proceed
        with an upload or download."""
        def _check():
            return len(self.storage_broker.get_all_servers()) >= num_clients
        d = self.poll(_check, 0.5)
        d.addCallback(lambda res: None)
        return d


    # these three methods are the primitives for creating filenodes and
    # dirnodes. The first takes a URI and produces a filenode or (new-style)
    # dirnode. The other two create brand-new filenodes/dirnodes.

    def create_node_from_uri(self, writecap, readcap=None):
        # this returns synchronously.
        u = writecap or readcap
        if not u:
            # maybe the writecap was hidden because we're in a readonly
            # directory, and the future cap format doesn't have a readcap, or
            # something.
            return UnknownNode(writecap, readcap)
        u = IURI(u)
        if isinstance(u, UnknownURI):
            return UnknownNode(writecap, readcap)
        u_s = u.to_string()
        if u_s not in self._node_cache:
            if IReadonlyDirectoryURI.providedBy(u):
                # read-only dirnodes
                node = DirectoryNode(self).init_from_uri(u)
            elif IDirectoryURI.providedBy(u):
                # dirnodes
                node = DirectoryNode(self).init_from_uri(u)
            elif IFileURI.providedBy(u):
                if isinstance(u, LiteralFileURI):
                    node = LiteralFileNode(u, self) # LIT
                else:
                    node = FileNode(u, self, self.download_cache_dirman) # CHK
            elif IMutableFileURI.providedBy(u):
                node = MutableFileNode(self).init_from_uri(u)
            else:
                raise UnhandledCapTypeError("cap is recognized, but has no Node")
            self._node_cache[u_s] = node  # note: WeakValueDictionary
        return self._node_cache[u_s]
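    # For orientation, the cap strings handled here look like "URI:LIT:..."
    # (small files stored inline), "URI:CHK:..." (immutable files),
    # "URI:SSK:..." (mutable files), and "URI:DIR2:..." / "URI:DIR2-RO:..."
    # (read-write / read-only dirnodes); IURI() picks the URI class by
    # prefix, and anything unrecognized comes back as an UnknownNode.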
 
    def create_empty_dirnode(self):
        d = self.create_mutable_file()
        d.addCallback(DirectoryNode.create_with_mutablefile, self)
        return d

    def create_mutable_file(self, contents="", keysize=None):
        keysize = keysize or self.DEFAULT_MUTABLE_KEYSIZE
        n = MutableFileNode(self)
        d = n.create(contents, self._generate_pubprivkeys, keysize=keysize)
        d.addCallback(lambda res: n)
        return d
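    # Both creators return Deferreds that fire with the new node. A sketch
    # of expected usage (get_uri() assumed from the filenode interfaces):
    #
    #   d = client.create_mutable_file("initial contents")
    #   d.addCallback(lambda node: node.get_uri())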
 
    def _generate_pubprivkeys(self, key_size):
        if self._key_generator:
            d = self._key_generator.callRemote('get_rsa_key_pair', key_size)
            def make_key_objs((verifying_key, signing_key)):
                v = rsa.create_verifying_key_from_string(verifying_key)
                s = rsa.create_signing_key_from_string(signing_key)
                return v, s
            d.addCallback(make_key_objs)
            return d
        else:
            # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
            # secs
            signer = rsa.generate(key_size)
            verifier = signer.get_verifying_key()
            return verifier, signer
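    # The remote generator ships keys as strings, rehydrated above with
    # create_*_key_from_string. A sketch of the round-trip this assumes
    # (serialize() per pycryptopp's key API):
    #
    #   signer = rsa.generate(2048)
    #   signer2 = rsa.create_signing_key_from_string(signer.serialize())
    #   verifier2 = rsa.create_verifying_key_from_string(
    #                   signer.get_verifying_key().serialize())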
 
    def upload(self, uploadable):
        uploader = self.getServiceNamed("uploader")
        return uploader.upload(uploadable, history=self.get_history())


    def list_all_upload_statuses(self):
        return self.get_history().list_all_upload_statuses()

    def list_all_download_statuses(self):
        return self.get_history().list_all_download_statuses()

    def list_all_mapupdate_statuses(self):
        return self.get_history().list_all_mapupdate_statuses()

    def list_all_publish_statuses(self):
        return self.get_history().list_all_publish_statuses()

    def list_all_retrieve_statuses(self):
        return self.get_history().list_all_retrieve_statuses()

    def list_all_helper_statuses(self):
        try:
            helper = self.getServiceNamed("helper")
        except KeyError:
            return []
        return helper.get_all_upload_statuses()