from base64 import b32decode

from zope.interface import implements
from twisted.application import service
from foolscap.api import Referenceable, SturdyRef, eventually

from allmydata.interfaces import InsufficientVersionError
from allmydata.introducer.interfaces import RIIntroducerSubscriberClient, \
     IIntroducerClient
from allmydata.util import log, idlib
from allmydata.util.rrefutil import add_version_to_remote_reference, trap_deadref


class IntroducerClient(service.Service, Referenceable):
    """Client side of the connection to an introducer.

    The client queues announcements that local code wants published,
    remembers which service names local code has subscribed to, and relays
    announcements received from the introducer to the registered callbacks.
    Publishing and subscribing are (re)attempted whenever a versioned
    reference to the introducer becomes available.
    """
    implements(RIIntroducerSubscriberClient, IIntroducerClient)

    def __init__(self, tub, introducer_furl,
                 nickname, my_version, oldest_supported):
        """Record the connection parameters and initialise empty state.

        tub: the object whose connectTo()/getReference() are used to reach
            the introducer.
        introducer_furl: FURL of the introducer.
        nickname: unicode nickname; it is encoded to UTF-8 before being
            placed in outbound announcements.
        my_version, oldest_supported: version values copied verbatim into
            every outbound announcement.
        """
        self._tub = tub
        self.introducer_furl = introducer_furl

        assert type(nickname) is unicode
        self._nickname_utf8 = nickname.encode("utf-8") # we always send UTF-8
        self._my_version = my_version
        self._oldest_supported = oldest_supported

        self._published_announcements = set()

        # remote reference to the introducer, or None while disconnected
        self._publisher = None

        self._local_subscribers = [] # (servicename,cb,args,kwargs) tuples
        self._subscribed_service_names = set()
        self._subscriptions = set() # requests we've actually sent

        # _current_announcements remembers one announcement per
        # (servicename,serverid) pair. Anything that arrives with the same
        # pair will displace the previous one. This stores unpacked
        # announcement dictionaries, which can be compared for equality to
        # distinguish re-announcement from updates. It also provides memory
        # for clients who subscribe after startup.
        self._current_announcements = {}

        self.encoding_parameters = None

        # hooks for unit tests. Every counter that is incremented anywhere
        # in this class must be initialised here, otherwise the "+= 1"
        # raises KeyError.
        self._debug_counts = {
            "inbound_message": 0,
            "inbound_announcement": 0,
            "wrong_service": 0,
            "duplicate_announcement": 0,
            "update": 0,
            "new_announcement": 0,
            "outbound_message": 0,
            }

    def startService(self):
        """Start the service and begin connecting to the introducer."""
        service.Service.startService(self)
        self._introducer_error = None
        rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
        self._introducer_reconnector = rc
        def connect_failed(failure):
            self.log("Initial Introducer connection failed: perhaps it's down",
                     level=log.WEIRD, failure=failure, umid="c5MqUQ")
        # The result of this getReference() is not used; its only handler is
        # the errback that logs a failed initial connection.
        d = self._tub.getReference(self.introducer_furl)
        d.addErrback(connect_failed)

    def _got_introducer(self, publisher):
        """Attach version information to a freshly connected introducer.

        On success the versioned reference is handed to
        _got_versioned_introducer(); any failure is stored by _got_error().
        """
        self.log("connected to introducer, getting versions")
        # version dictionary passed to add_version_to_remote_reference() as
        # the fallback for introducers that do not report a version
        default = { "http://allmydata.org/tahoe/protocols/introducer/v1":
                    { },
                    "application-version": "unknown: no get_version()",
                    }
        d = add_version_to_remote_reference(publisher, default)
        d.addCallback(self._got_versioned_introducer)
        d.addErrback(self._got_error)

    def _got_error(self, f):
        """Remember the failure 'f' so it can be inspected later."""
        # TODO: for the introducer, perhaps this should halt the application
        self._introducer_error = f # polled by tests

    def _got_versioned_introducer(self, publisher):
        """Adopt 'publisher' as the live introducer and flush pending work.

        Raises InsufficientVersionError if the introducer does not
        advertise the V1 introducer protocol.
        """
        self.log("got introducer version: %s" % (publisher.version,))
        # we require a V1 introducer
        needed = "http://allmydata.org/tahoe/protocols/introducer/v1"
        if needed not in publisher.version:
            raise InsufficientVersionError(needed, publisher.version)
        self._publisher = publisher
        publisher.notifyOnDisconnect(self._disconnected)
        # send everything that was queued while we had no connection
        self._maybe_publish()
        self._maybe_subscribe()

    def _disconnected(self):
        """Forget the introducer reference and the subscriptions sent to it."""
        self.log("bummer, we've lost our connection to the introducer")
        self._publisher = None
        # cleared so that _maybe_subscribe() re-sends every request the next
        # time a publisher is available
        self._subscriptions.clear()

    def log(self, *args, **kwargs):
        """Call log.msg(), defaulting facility to "tahoe.introducer"."""
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.introducer"
        return log.msg(*args, **kwargs)

    def publish(self, furl, service_name, remoteinterface_name):
        """Queue an announcement for 'furl' and try to send it.

        The announcement is remembered in _published_announcements, so it is
        sent again by every later _maybe_publish() call.
        """
        assert type(self._nickname_utf8) is str # we always send UTF-8
        ann = (furl, service_name, remoteinterface_name,
               self._nickname_utf8, self._my_version, self._oldest_supported)
        self._published_announcements.add(ann)
        self._maybe_publish()

    def subscribe_to(self, service_name, cb, *args, **kwargs):
        """Register 'cb' for announcements about 'service_name'.

        The callback is scheduled as cb(nodeid, ann_d, *args, **kwargs) for
        every matching announcement, including those that were already
        received before this call.
        """
        self._local_subscribers.append( (service_name,cb,args,kwargs) )
        self._subscribed_service_names.add(service_name)
        self._maybe_subscribe()
        for (servicename,nodeid),ann_d in self._current_announcements.items():
            if servicename == service_name:
                # replay with the same extra arguments that
                # _process_announcement() passes for live announcements
                eventually(cb, nodeid, ann_d, *args, **kwargs)

    def _maybe_subscribe(self):
        """Send any subscription requests that have not been sent yet.

        Does nothing (beyond logging) while there is no introducer.
        """
        if not self._publisher:
            self.log("want to subscribe, but no introducer yet",
                     level=log.NOISY)
            return
        for service_name in self._subscribed_service_names:
            if service_name not in self._subscriptions:
                # there is a race here, but the subscription desk ignores
                # duplicate requests.
                self._subscriptions.add(service_name)
                d = self._publisher.callRemote("subscribe", self, service_name)
                d.addErrback(trap_deadref)
                d.addErrback(log.err, format="server errored during subscribe",
                             facility="tahoe.introducer",
                             level=log.WEIRD, umid="2uMScQ")

    def _maybe_publish(self):
        """Send every queued announcement to the introducer.

        Does nothing (beyond logging) while there is no introducer.
        """
        if not self._publisher:
            self.log("want to publish, but no introducer yet", level=log.NOISY)
            return
        # this re-publishes everything. The Introducer ignores duplicates
        for ann in self._published_announcements:
            self._debug_counts["outbound_message"] += 1
            d = self._publisher.callRemote("publish", ann)
            d.addErrback(trap_deadref)
            d.addErrback(log.err,
                         format="server errored during publish %(ann)s",
                         ann=ann, facility="tahoe.introducer",
                         level=log.WEIRD, umid="xs9pVQ")

    def remote_announce(self, announcements):
        """Process a batch of announcements.

        Each announcement is handled independently: an exception raised
        while processing one is logged and does not propagate.
        """
        self.log("received %d announcements" % len(announcements))
        self._debug_counts["inbound_message"] += 1
        for ann in announcements:
            try:
                self._process_announcement(ann)
            except Exception:
                # Don't let a corrupt announcement prevent us from processing
                # the remaining ones. Don't return an error to the server,
                # since they'd just ignore it anyways.
                log.err(format="unable to process announcement %(ann)s",
                        ann=ann)

    def _process_announcement(self, ann):
        """Unpack one announcement tuple and deliver it to subscribers.

        Announcements for services nobody subscribed to, and announcements
        identical to the one already stored for the same
        (service_name, nodeid) pair, are counted and dropped. Anything else
        replaces the stored announcement and is scheduled for delivery to
        each matching local subscriber.
        """
        self._debug_counts["inbound_announcement"] += 1
        (furl, service_name, ri_name, nickname_utf8, ver, oldest) = ann
        if service_name not in self._subscribed_service_names:
            self.log("announcement for a service we don't care about [%s]"
                     % (service_name,), level=log.UNUSUAL, umid="dIpGNA")
            self._debug_counts["wrong_service"] += 1
            return
        self.log("announcement for [%s]: %s" % (service_name, ann))
        assert type(furl) is str
        assert type(service_name) is str
        assert type(ri_name) is str
        assert type(nickname_utf8) is str
        nickname = nickname_utf8.decode("utf-8")
        assert type(nickname) is unicode
        assert type(ver) is str
        assert type(oldest) is str

        # the binary nodeid is the base32-decoded tubID taken from the FURL
        nodeid = b32decode(SturdyRef(furl).tubID.upper())
        nodeid_s = idlib.shortnodeid_b2a(nodeid)

        ann_d = { "version": 0,
                  "service-name": service_name,

                  "FURL": furl,
                  "nickname": nickname,
                  "app-versions": {}, # need #466 and v2 introducer
                  "my-version": ver,
                  "oldest-supported": oldest,
                  }

        index = (service_name, nodeid)
        if self._current_announcements.get(index, None) == ann_d:
            self.log("reannouncement for [%(service)s]:%(nodeid)s, ignoring",
                     service=service_name, nodeid=nodeid_s,
                     level=log.UNUSUAL, umid="B1MIdA")
            self._debug_counts["duplicate_announcement"] += 1
            return
        if index in self._current_announcements:
            self._debug_counts["update"] += 1
        else:
            self._debug_counts["new_announcement"] += 1

        self._current_announcements[index] = ann_d
        # note: we never forget an index, but we might update its value

        for (service_name2,cb,args,kwargs) in self._local_subscribers:
            if service_name2 == service_name:
                eventually(cb, nodeid, ann_d, *args, **kwargs)

    def remote_set_encoding_parameters(self, parameters):
        """Store 'parameters' in self.encoding_parameters."""
        self.encoding_parameters = parameters

    def connected_to_introducer(self):
        """Return True if we currently hold a reference to the introducer."""
        return bool(self._publisher)