1
import os, stat, time, weakref
2
from allmydata.interfaces import RIStorageServer
3
from allmydata import node
5
from zope.interface import implements
6
from twisted.internet import reactor
7
from twisted.application.internet import TimerService
8
from foolscap.api import Referenceable
9
from pycryptopp.publickey import rsa
12
from allmydata.storage.server import StorageServer
13
from allmydata import storage_client
14
from allmydata.immutable.upload import Uploader
15
from allmydata.immutable.download import Downloader
16
from allmydata.immutable.filenode import FileNode, LiteralFileNode
17
from allmydata.immutable.offloaded import Helper
18
from allmydata.control import ControlServer
19
from allmydata.introducer.client import IntroducerClient
20
from allmydata.util import hashutil, base32, pollmixin, cachedir, log
21
from allmydata.util.abbreviate import parse_abbreviated_size
22
from allmydata.util.time_format import parse_duration, parse_date
23
from allmydata.uri import LiteralFileURI, UnknownURI
24
from allmydata.dirnode import DirectoryNode
25
from allmydata.mutable.filenode import MutableFileNode
26
from allmydata.unknown import UnknownNode
27
from allmydata.stats import StatsProvider
28
from allmydata.history import History
29
from allmydata.interfaces import IURI, IDirectoryURI, IStatsProducer, \
30
IReadonlyDirectoryURI, IFileURI, IMutableFileURI, RIStubClient, \
39
class StubClient(Referenceable):
    """Empty remote-referenceable object.

    An instance is registered with the Tub and announced to the introducer
    (see init_stub_client) so the introducer can count connected clients;
    it carries no methods of its own.
    """
    implements(RIStubClient)
43
return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
45
class Client(node.Node, pollmixin.PollMixin):
46
implements(IStatsProducer)
48
PORTNUMFILE = "client.port"
51
SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"
53
# This means that if a storage server treats me as though I were a
54
# 1.0.0 storage client, it will work as they expect.
55
OLDEST_SUPPORTED_VERSION = "1.0.0"
57
# this is a tuple of (needed, desired, total, max_segment_size). 'needed'
58
# is the number of shares required to reconstruct a file. 'desired' means
59
# that we will abort an upload unless we can allocate space for at least
60
# this many. 'total' is the total number of shares created by encoding.
61
# If everybody has room then this is is how many we will upload.
62
DEFAULT_ENCODING_PARAMETERS = {"k": 3,
65
"max_segment_size": 128*KiB,
68
# set this to override the size of the RSA keys created for new mutable
69
# files. The default of None means to let mutable.filenode choose its own
70
# size, which means 2048 bits.
71
DEFAULT_MUTABLE_KEYSIZE = None
73
def __init__(self, basedir="."):
74
node.Node.__init__(self, basedir)
75
self.started_timestamp = time.time()
76
self.logSource="Client"
77
self.DEFAULT_ENCODING_PARAMETERS = self.DEFAULT_ENCODING_PARAMETERS.copy()
78
self.init_introducer_client()
79
self.init_stats_provider()
80
self.init_lease_secret()
83
if self.get_config("helper", "enabled", False, boolean=True):
86
self._key_generator = None
87
key_gen_furl = self.get_config("client", "key_generator.furl", None)
89
self.init_key_gen(key_gen_furl)
90
# ControlServer and Helper are attached after Tub startup
91
self.init_ftp_server()
92
self.init_sftp_server()
94
hotline_file = os.path.join(self.basedir,
95
self.SUICIDE_PREVENTION_HOTLINE_FILE)
96
if os.path.exists(hotline_file):
97
age = time.time() - os.stat(hotline_file)[stat.ST_MTIME]
98
self.log("hotline file noticed (%ds old), starting timer" % age)
99
hotline = TimerService(1.0, self._check_hotline, hotline_file)
100
hotline.setServiceParent(self)
102
# this needs to happen last, so it can use getServiceNamed() to
103
# acquire references to StorageServer and other web-statusable things
104
webport = self.get_config("node", "web.port", None)
106
self.init_web(webport) # strports string
108
def read_old_config_files(self):
    """Fold legacy per-item config files into the config object.

    Old single-value furl files become [client] options, and the presence
    of certain marker files in the base directory toggles boolean
    [storage]/[helper] options.
    """
    node.Node.read_old_config_files(self)
    for furlname in ("introducer.furl", "helper.furl",
                     "key_generator.furl", "stats_gatherer.furl"):
        self._copy_config_from_file(furlname, "client", furlname)
    # marker file in basedir -> (section, option, value) to set
    markers = [("no_storage", "storage", "enabled", "false"),
               ("readonly_storage", "storage", "readonly", "true"),
               ("debug_discard_storage", "storage", "debug_discard", "true"),
               ("run_helper", "helper", "enabled", "true")]
    for filename, section, option, value in markers:
        if os.path.exists(os.path.join(self.basedir, filename)):
            self.set_config(section, option, value)
124
def init_introducer_client(self):
125
self.introducer_furl = self.get_config("client", "introducer.furl")
126
ic = IntroducerClient(self.tub, self.introducer_furl,
128
str(allmydata.__full_version__),
129
str(self.OLDEST_SUPPORTED_VERSION))
130
self.introducer_client = ic
131
# hold off on starting the IntroducerClient until our tub has been
132
# started, so we'll have a useful address on our RemoteReference, so
133
# that the introducer's status page will show us.
134
d = self.when_tub_ready()
135
def _start_introducer_client(res):
136
ic.setServiceParent(self)
137
d.addCallback(_start_introducer_client)
138
d.addErrback(log.err, facility="tahoe.init",
139
level=log.BAD, umid="URyI5w")
141
def init_stats_provider(self):
    """Create the StatsProvider service and register this client as a
    stats producer; an optional gatherer furl comes from tahoe.cfg."""
    furl = self.get_config("client", "stats_gatherer.furl", None)
    provider = StatsProvider(self, furl)
    self.stats_provider = provider
    self.add_service(provider)
    provider.register_producer(self)
148
return { 'node.uptime': time.time() - self.started_timestamp }
150
def init_lease_secret(self):
    """Load (creating on first run) the master lease secret from the
    private "secret" config item and cache its decoded form."""
    secret_ascii = self.get_or_create_private_config("secret", _make_secret)
    self._lease_secret = base32.a2b(secret_ascii)
154
def init_storage(self):
155
# should we run a storage server (and publish it for others to use)?
156
if not self.get_config("storage", "enabled", True, boolean=True):
158
readonly = self.get_config("storage", "readonly", False, boolean=True)
160
storedir = os.path.join(self.basedir, self.STOREDIR)
162
data = self.get_config("storage", "reserved_space", None)
165
reserved = parse_abbreviated_size(data)
167
log.msg("[storage]reserved_space= contains unparseable value %s"
171
discard = self.get_config("storage", "debug_discard", False,
174
expire = self.get_config("storage", "expire.enabled", False, boolean=True)
176
mode = self.get_config("storage", "expire.mode") # require a mode
178
mode = self.get_config("storage", "expire.mode", "age")
180
o_l_d = self.get_config("storage", "expire.override_lease_duration", None)
181
if o_l_d is not None:
182
o_l_d = parse_duration(o_l_d)
185
if mode == "cutoff-date":
186
cutoff_date = self.get_config("storage", "expire.cutoff_date")
187
cutoff_date = parse_date(cutoff_date)
190
if self.get_config("storage", "expire.immutable", True, boolean=True):
191
sharetypes.append("immutable")
192
if self.get_config("storage", "expire.mutable", True, boolean=True):
193
sharetypes.append("mutable")
194
expiration_sharetypes = tuple(sharetypes)
196
ss = StorageServer(storedir, self.nodeid,
197
reserved_space=reserved,
198
discard_storage=discard,
199
readonly_storage=readonly,
200
stats_provider=self.stats_provider,
201
expiration_enabled=expire,
202
expiration_mode=mode,
203
expiration_override_lease_duration=o_l_d,
204
expiration_cutoff_date=cutoff_date,
205
expiration_sharetypes=expiration_sharetypes)
208
d = self.when_tub_ready()
209
# we can't do registerReference until the Tub is ready
211
furl_file = os.path.join(self.basedir, "private", "storage.furl")
212
furl = self.tub.registerReference(ss, furlFile=furl_file)
213
ri_name = RIStorageServer.__remote_name__
214
self.introducer_client.publish(furl, "storage", ri_name)
215
d.addCallback(_publish)
216
d.addErrback(log.err, facility="tahoe.init",
217
level=log.BAD, umid="aLGBKw")
219
def init_client(self):
    """Set up client-side machinery: encoding defaults from tahoe.cfg,
    the convergence secret, the node cache, the storage broker, and the
    History/Uploader/Downloader services, plus the stub-client
    announcement."""
    helper_furl = self.get_config("client", "helper.furl", None)
    DEP = self.DEFAULT_ENCODING_PARAMETERS
    # allow tahoe.cfg to override the default erasure-coding parameters
    for param, option in (("k", "shares.needed"),
                          ("n", "shares.total"),
                          ("happy", "shares.happy")):
        DEP[param] = int(self.get_config("client", option, DEP[param]))
    self.convergence = base32.a2b(
        self.get_or_create_private_config('convergence', _make_secret))
    self._node_cache = weakref.WeakValueDictionary() # uri -> node
    self.init_client_storage_broker()
    self.add_service(History(self.stats_provider))
    self.add_service(Uploader(helper_furl, self.stats_provider))
    cache_path = os.path.join(self.basedir,
                              "private", "cache", "download")
    self.download_cache_dirman = cachedir.CacheDirectoryManager(cache_path)
    self.download_cache_dirman.setServiceParent(self)
    self.add_service(Downloader(self.stats_provider))
    self.init_stub_client()
239
def init_client_storage_broker(self):
    """Create the StorageFarmBroker through which Uploader/Downloader
    (and everything else that talks to storage servers) find servers."""
    broker = storage_client.StorageFarmBroker(self.tub, permute_peers=True)
    self.storage_broker = broker

    # Static server specifications in tahoe.cfg are not supported yet;
    # the sketch below is retained for reference.
    #if self.config.has_section("client-server-selection"):
    #    server_params = {} # maps serverid to dict of parameters
    #    for (name, value) in self.config.items("client-server-selection"):
    #        pieces = name.split(".")
    #        if pieces[0] == "server":
    #            serverid = pieces[1]
    #            if serverid not in server_params:
    #                server_params[serverid] = {}
    #            server_params[serverid][pieces[2]] = value
    #    for serverid, params in server_params.items():
    #        server_type = params.pop("type")
    #        if server_type == "tahoe-foolscap":
    #            s = storage_client.NativeStorageClient(*params)
    #        else:
    #            msg = ("unrecognized server type '%s' in "
    #                   "tahoe.cfg [client-server-selection]server.%s.type"
    #                   % (server_type, serverid))
    #            raise storage_client.UnknownServerTypeError(msg)
    #        sb.add_server(s.serverid, s)

    # unless disabled, also learn about servers from the introducer
    if self.get_config("client-server-selection", "use_introducer",
                       default=True, boolean=True):
        broker.use_introducer(self.introducer_client)
272
def get_storage_broker(self):
    """Return the StorageFarmBroker created by init_client_storage_broker()."""
    return self.storage_broker
275
def init_stub_client(self):
277
# we publish an empty object so that the introducer can count how
278
# many clients are connected and see what versions they're
281
furl = self.tub.registerReference(sc)
282
ri_name = RIStubClient.__remote_name__
283
self.introducer_client.publish(furl, "stub_client", ri_name)
284
d = self.when_tub_ready()
285
d.addCallback(_publish)
286
d.addErrback(log.err, facility="tahoe.init",
287
level=log.BAD, umid="OEHq3g")
289
def get_history(self):
    """Return the History service (registered under the name "history")."""
    return self.getServiceNamed("history")
292
def init_control(self):
293
d = self.when_tub_ready()
296
c.setServiceParent(self)
297
control_url = self.tub.registerReference(c)
298
self.write_private_config("control.furl", control_url + "\n")
299
d.addCallback(_publish)
300
d.addErrback(log.err, facility="tahoe.init",
301
level=log.BAD, umid="d3tNXA")
303
def init_helper(self):
304
d = self.when_tub_ready()
306
h = Helper(os.path.join(self.basedir, "helper"), self.stats_provider)
307
h.setServiceParent(self)
308
# TODO: this is confusing. BASEDIR/private/helper.furl is created
309
# by the helper. BASEDIR/helper.furl is consumed by the client
310
# who wants to use the helper. I like having the filename be the
311
# same, since that makes 'cp' work smoothly, but the difference
312
# between config inputs and generated outputs is hard to see.
313
helper_furlfile = os.path.join(self.basedir,
314
"private", "helper.furl")
315
self.tub.registerReference(h, furlFile=helper_furlfile)
316
d.addCallback(_publish)
317
d.addErrback(log.err, facility="tahoe.init",
318
level=log.BAD, umid="K0mW5w")
320
def init_key_gen(self, key_gen_furl):
321
d = self.when_tub_ready()
322
def _subscribe(self):
323
self.tub.connectTo(key_gen_furl, self._got_key_generator)
324
d.addCallback(_subscribe)
325
d.addErrback(log.err, facility="tahoe.init",
326
level=log.BAD, umid="z9DMzw")
328
def _got_key_generator(self, key_generator):
    """Record a live remote key-generator reference and arrange to clear
    it again when the connection is lost."""
    self._key_generator = key_generator
    key_generator.notifyOnDisconnect(self._lost_key_generator)
332
def _lost_key_generator(self):
    # Forget the remote reference; _generate_pubprivkeys tests
    # self._key_generator and falls back to local generation while None.
    self._key_generator = None
335
def init_web(self, webport):
336
self.log("init_web(webport=%s)", args=(webport,))
338
from allmydata.webish import WebishServer
339
nodeurl_path = os.path.join(self.basedir, "node.url")
340
staticdir = self.get_config("node", "web.static", "public_html")
341
staticdir = os.path.expanduser(staticdir)
342
ws = WebishServer(self, webport, nodeurl_path, staticdir)
345
def init_ftp_server(self):
    """Start the FTP frontend when [ftpd]enabled is set in tahoe.cfg."""
    if not self.get_config("ftpd", "enabled", False, boolean=True):
        return
    accountfile = self.get_config("ftpd", "accounts.file", None)
    accounturl = self.get_config("ftpd", "accounts.url", None)
    portstr = self.get_config("ftpd", "port", "8021")

    # deferred import: only executed when the frontend is enabled
    from allmydata.frontends import ftpd
    server = ftpd.FTPServer(self, accountfile, accounturl, portstr)
    server.setServiceParent(self)
355
def init_sftp_server(self):
    """Start the SFTP frontend when [sftpd]enabled is set in tahoe.cfg."""
    if not self.get_config("sftpd", "enabled", False, boolean=True):
        return
    accountfile = self.get_config("sftpd", "accounts.file", None)
    accounturl = self.get_config("sftpd", "accounts.url", None)
    portstr = self.get_config("sftpd", "port", "8022")
    pubkey_file = self.get_config("sftpd", "host_pubkey_file")
    privkey_file = self.get_config("sftpd", "host_privkey_file")

    # deferred import: only executed when the frontend is enabled
    from allmydata.frontends import sftpd
    server = sftpd.SFTPServer(self, accountfile, accounturl,
                              portstr, pubkey_file, privkey_file)
    server.setServiceParent(self)
368
def _check_hotline(self, hotline_file):
369
if os.path.exists(hotline_file):
370
mtime = os.stat(hotline_file)[stat.ST_MTIME]
371
if mtime > time.time() - 120.0:
374
self.log("hotline file too old, shutting down")
376
self.log("hotline file missing, shutting down")
379
def get_encoding_parameters(self):
    """Return the erasure-coding parameter dict (keys include "k", "n",
    "happy", "max_segment_size"), as adjusted from tahoe.cfg by
    init_client(). Note: the live (mutable) dict is returned, not a copy."""
    return self.DEFAULT_ENCODING_PARAMETERS
382
def connected_to_introducer(self):
    """Return True if our IntroducerClient reports a live introducer
    connection, False otherwise.

    Previously this predicate fell off the end when no introducer client
    existed, implicitly returning None; now it returns an explicit False.
    """
    if self.introducer_client:
        return self.introducer_client.connected_to_introducer()
    return False
387
def get_renewal_secret(self):
    """Return this client's lease-renewal secret, derived from the master
    lease secret loaded by init_lease_secret()."""
    return hashutil.my_renewal_secret_hash(self._lease_secret)
390
def get_cancel_secret(self):
    """Return this client's lease-cancel secret, derived from the master
    lease secret loaded by init_lease_secret()."""
    return hashutil.my_cancel_secret_hash(self._lease_secret)
393
def debug_wait_for_client_connections(self, num_clients):
394
"""Return a Deferred that fires (with None) when we have connections
395
to the given number of peers. Useful for tests that set up a
396
temporary test network and need to know when it is safe to proceed
397
with an upload or download."""
399
return len(self.storage_broker.get_all_servers()) >= num_clients
400
d = self.poll(_check, 0.5)
401
d.addCallback(lambda res: None)
405
# these four methods are the primitives for creating filenodes and
406
# dirnodes. The first takes a URI and produces a filenode or (new-style)
407
# dirnode. The other three create brand-new filenodes/dirnodes.
409
def create_node_from_uri(self, writecap, readcap=None):
410
# this returns synchronously.
411
u = writecap or readcap
413
# maybe the writecap was hidden because we're in a readonly
414
# directory, and the future cap format doesn't have a readcap, or
416
return UnknownNode(writecap, readcap)
418
if isinstance(u, UnknownURI):
419
return UnknownNode(writecap, readcap)
421
if u_s not in self._node_cache:
422
if IReadonlyDirectoryURI.providedBy(u):
424
node = DirectoryNode(self).init_from_uri(u)
425
elif IDirectoryURI.providedBy(u):
427
node = DirectoryNode(self).init_from_uri(u)
428
elif IFileURI.providedBy(u):
429
if isinstance(u, LiteralFileURI):
430
node = LiteralFileNode(u, self) # LIT
432
node = FileNode(u, self, self.download_cache_dirman) # CHK
433
elif IMutableFileURI.providedBy(u):
434
node = MutableFileNode(self).init_from_uri(u)
436
raise UnhandledCapTypeError("cap is recognized, but has no Node")
437
self._node_cache[u_s] = node # note: WeakValueDictionary
438
return self._node_cache[u_s]
440
def create_empty_dirnode(self):
    """Create a new mutable file and wrap it in a fresh DirectoryNode.

    Returns a Deferred that fires with the new DirectoryNode. The previous
    version built the callback chain but never returned the Deferred, so
    callers had no way to wait on (or receive) the result.
    """
    d = self.create_mutable_file()
    d.addCallback(DirectoryNode.create_with_mutablefile, self)
    return d
445
def create_mutable_file(self, contents="", keysize=None):
    """Create a brand-new mutable file containing `contents`.

    keysize: RSA key size in bits; defaults to DEFAULT_MUTABLE_KEYSIZE
             (None lets the mutable-file code choose its own size).
    Returns a Deferred that fires with the new MutableFileNode. The
    previous version never returned the Deferred, leaving callers with
    nothing to wait on.
    """
    keysize = keysize or self.DEFAULT_MUTABLE_KEYSIZE
    n = MutableFileNode(self)
    d = n.create(contents, self._generate_pubprivkeys, keysize=keysize)
    d.addCallback(lambda res: n)
    return d
452
def _generate_pubprivkeys(self, key_size):
453
if self._key_generator:
454
d = self._key_generator.callRemote('get_rsa_key_pair', key_size)
455
def make_key_objs((verifying_key, signing_key)):
456
v = rsa.create_verifying_key_from_string(verifying_key)
457
s = rsa.create_signing_key_from_string(signing_key)
459
d.addCallback(make_key_objs)
462
# RSA key generation for a 2048 bit key takes between 0.8 and 3.2
464
signer = rsa.generate(key_size)
465
verifier = signer.get_verifying_key()
466
return verifier, signer
468
def upload(self, uploadable):
    """Hand `uploadable` to the Uploader service, recording status in the
    History service; returns whatever Uploader.upload returns."""
    return self.getServiceNamed("uploader").upload(
        uploadable, history=self.get_history())
473
def list_all_upload_statuses(self):
    """Return status objects for every upload tracked by the History service."""
    return self.get_history().list_all_upload_statuses()
476
def list_all_download_statuses(self):
    """Return status objects for every download tracked by the History service."""
    return self.get_history().list_all_download_statuses()
479
def list_all_mapupdate_statuses(self):
    """Return status objects for every servermap update tracked by History."""
    return self.get_history().list_all_mapupdate_statuses()
481
def list_all_publish_statuses(self):
    """Return status objects for every mutable-file publish tracked by History."""
    return self.get_history().list_all_publish_statuses()
483
def list_all_retrieve_statuses(self):
    """Return status objects for every mutable-file retrieve tracked by History."""
    return self.get_history().list_all_retrieve_statuses()
486
def list_all_helper_statuses(self):
488
helper = self.getServiceNamed("helper")
491
return helper.get_all_upload_statuses()