~ubuntu-branches/ubuntu/karmic/tahoe-lafs/karmic

« back to all changes in this revision

Viewing changes to src/allmydata/introducer/client.py

  • Committer: Bazaar Package Importer
  • Author(s): Zooko O'Whielacronx (Hacker)
  • Date: 2009-09-24 00:00:05 UTC
  • Revision ID: james.westby@ubuntu.com-20090924000005-ixe2n4yngmk49ysz
Tags: upstream-1.5.0
ImportĀ upstreamĀ versionĀ 1.5.0

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
 
 
2
from base64 import b32decode
 
3
from zope.interface import implements
 
4
from twisted.application import service
 
5
from foolscap.api import Referenceable, SturdyRef, eventually
 
6
from allmydata.interfaces import InsufficientVersionError
 
7
from allmydata.introducer.interfaces import RIIntroducerSubscriberClient, \
 
8
     IIntroducerClient
 
9
from allmydata.util import log, idlib
 
10
from allmydata.util.rrefutil import add_version_to_remote_reference, trap_deadref
 
11
 
 
12
 
 
13
class IntroducerClient(service.Service, Referenceable):
 
14
    implements(RIIntroducerSubscriberClient, IIntroducerClient)
 
15
 
 
16
    def __init__(self, tub, introducer_furl,
 
17
                 nickname, my_version, oldest_supported):
 
18
        self._tub = tub
 
19
        self.introducer_furl = introducer_furl
 
20
 
 
21
        assert type(nickname) is unicode
 
22
        self._nickname_utf8 = nickname.encode("utf-8") # we always send UTF-8
 
23
        self._my_version = my_version
 
24
        self._oldest_supported = oldest_supported
 
25
 
 
26
        self._published_announcements = set()
 
27
 
 
28
        self._publisher = None
 
29
 
 
30
        self._local_subscribers = [] # (servicename,cb,args,kwargs) tuples
 
31
        self._subscribed_service_names = set()
 
32
        self._subscriptions = set() # requests we've actually sent
 
33
 
 
34
        # _current_announcements remembers one announcement per
 
35
        # (servicename,serverid) pair. Anything that arrives with the same
 
36
        # pair will displace the previous one. This stores unpacked
 
37
        # announcement dictionaries, which can be compared for equality to
 
38
        # distinguish re-announcement from updates. It also provides memory
 
39
        # for clients who subscribe after startup.
 
40
        self._current_announcements = {}
 
41
 
 
42
        self.encoding_parameters = None
 
43
 
 
44
        # hooks for unit tests
 
45
        self._debug_counts = {
 
46
            "inbound_message": 0,
 
47
            "inbound_announcement": 0,
 
48
            "wrong_service": 0,
 
49
            "duplicate_announcement": 0,
 
50
            "update": 0,
 
51
            "new_announcement": 0,
 
52
            "outbound_message": 0,
 
53
            }
 
54
 
 
55
    def startService(self):
 
56
        service.Service.startService(self)
 
57
        self._introducer_error = None
 
58
        rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
 
59
        self._introducer_reconnector = rc
 
60
        def connect_failed(failure):
 
61
            self.log("Initial Introducer connection failed: perhaps it's down",
 
62
                     level=log.WEIRD, failure=failure, umid="c5MqUQ")
 
63
        d = self._tub.getReference(self.introducer_furl)
 
64
        d.addErrback(connect_failed)
 
65
 
 
66
    def _got_introducer(self, publisher):
 
67
        self.log("connected to introducer, getting versions")
 
68
        default = { "http://allmydata.org/tahoe/protocols/introducer/v1":
 
69
                    { },
 
70
                    "application-version": "unknown: no get_version()",
 
71
                    }
 
72
        d = add_version_to_remote_reference(publisher, default)
 
73
        d.addCallback(self._got_versioned_introducer)
 
74
        d.addErrback(self._got_error)
 
75
 
 
76
    def _got_error(self, f):
 
77
        # TODO: for the introducer, perhaps this should halt the application
 
78
        self._introducer_error = f # polled by tests
 
79
 
 
80
    def _got_versioned_introducer(self, publisher):
 
81
        self.log("got introducer version: %s" % (publisher.version,))
 
82
        # we require a V1 introducer
 
83
        needed = "http://allmydata.org/tahoe/protocols/introducer/v1"
 
84
        if needed not in publisher.version:
 
85
            raise InsufficientVersionError(needed, publisher.version)
 
86
        self._publisher = publisher
 
87
        publisher.notifyOnDisconnect(self._disconnected)
 
88
        self._maybe_publish()
 
89
        self._maybe_subscribe()
 
90
 
 
91
    def _disconnected(self):
 
92
        self.log("bummer, we've lost our connection to the introducer")
 
93
        self._publisher = None
 
94
        self._subscriptions.clear()
 
95
 
 
96
    def log(self, *args, **kwargs):
 
97
        if "facility" not in kwargs:
 
98
            kwargs["facility"] = "tahoe.introducer"
 
99
        return log.msg(*args, **kwargs)
 
100
 
 
101
 
 
102
    def publish(self, furl, service_name, remoteinterface_name):
 
103
        assert type(self._nickname_utf8) is str # we always send UTF-8
 
104
        ann = (furl, service_name, remoteinterface_name,
 
105
               self._nickname_utf8, self._my_version, self._oldest_supported)
 
106
        self._published_announcements.add(ann)
 
107
        self._maybe_publish()
 
108
 
 
109
    def subscribe_to(self, service_name, cb, *args, **kwargs):
 
110
        self._local_subscribers.append( (service_name,cb,args,kwargs) )
 
111
        self._subscribed_service_names.add(service_name)
 
112
        self._maybe_subscribe()
 
113
        for (servicename,nodeid),ann_d in self._current_announcements.items():
 
114
            if servicename == service_name:
 
115
                eventually(cb, nodeid, ann_d)
 
116
 
 
117
    def _maybe_subscribe(self):
 
118
        if not self._publisher:
 
119
            self.log("want to subscribe, but no introducer yet",
 
120
                     level=log.NOISY)
 
121
            return
 
122
        for service_name in self._subscribed_service_names:
 
123
            if service_name not in self._subscriptions:
 
124
                # there is a race here, but the subscription desk ignores
 
125
                # duplicate requests.
 
126
                self._subscriptions.add(service_name)
 
127
                d = self._publisher.callRemote("subscribe", self, service_name)
 
128
                d.addErrback(trap_deadref)
 
129
                d.addErrback(log.err, format="server errored during subscribe",
 
130
                             facility="tahoe.introducer",
 
131
                             level=log.WEIRD, umid="2uMScQ")
 
132
 
 
133
    def _maybe_publish(self):
 
134
        if not self._publisher:
 
135
            self.log("want to publish, but no introducer yet", level=log.NOISY)
 
136
            return
 
137
        # this re-publishes everything. The Introducer ignores duplicates
 
138
        for ann in self._published_announcements:
 
139
            self._debug_counts["outbound_message"] += 1
 
140
            d = self._publisher.callRemote("publish", ann)
 
141
            d.addErrback(trap_deadref)
 
142
            d.addErrback(log.err,
 
143
                         format="server errored during publish %(ann)s",
 
144
                         ann=ann, facility="tahoe.introducer",
 
145
                         level=log.WEIRD, umid="xs9pVQ")
 
146
 
 
147
 
 
148
 
 
149
    def remote_announce(self, announcements):
 
150
        self.log("received %d announcements" % len(announcements))
 
151
        self._debug_counts["inbound_message"] += 1
 
152
        for ann in announcements:
 
153
            try:
 
154
                self._process_announcement(ann)
 
155
            except:
 
156
                log.err(format="unable to process announcement %(ann)s",
 
157
                        ann=ann)
 
158
                # Don't let a corrupt announcement prevent us from processing
 
159
                # the remaining ones. Don't return an error to the server,
 
160
                # since they'd just ignore it anyways.
 
161
                pass
 
162
 
 
163
    def _process_announcement(self, ann):
 
164
        self._debug_counts["inbound_announcement"] += 1
 
165
        (furl, service_name, ri_name, nickname_utf8, ver, oldest) = ann
 
166
        if service_name not in self._subscribed_service_names:
 
167
            self.log("announcement for a service we don't care about [%s]"
 
168
                     % (service_name,), level=log.UNUSUAL, umid="dIpGNA")
 
169
            self._debug_counts["wrong_service"] += 1
 
170
            return
 
171
        self.log("announcement for [%s]: %s" % (service_name, ann),
 
172
                 umid="BoKEag")
 
173
        assert type(furl) is str
 
174
        assert type(service_name) is str
 
175
        assert type(ri_name) is str
 
176
        assert type(nickname_utf8) is str
 
177
        nickname = nickname_utf8.decode("utf-8")
 
178
        assert type(nickname) is unicode
 
179
        assert type(ver) is str
 
180
        assert type(oldest) is str
 
181
 
 
182
        nodeid = b32decode(SturdyRef(furl).tubID.upper())
 
183
        nodeid_s = idlib.shortnodeid_b2a(nodeid)
 
184
 
 
185
        ann_d = { "version": 0,
 
186
                  "service-name": service_name,
 
187
 
 
188
                  "FURL": furl,
 
189
                  "nickname": nickname,
 
190
                  "app-versions": {}, # need #466 and v2 introducer
 
191
                  "my-version": ver,
 
192
                  "oldest-supported": oldest,
 
193
                  }
 
194
 
 
195
        index = (service_name, nodeid)
 
196
        if self._current_announcements.get(index, None) == ann_d:
 
197
            self.log("reannouncement for [%(service)s]:%(nodeid)s, ignoring",
 
198
                     service=service_name, nodeid=nodeid_s,
 
199
                     level=log.UNUSUAL, umid="B1MIdA")
 
200
            self._debug_counts["duplicate_announcement"] += 1
 
201
            return
 
202
        if index in self._current_announcements:
 
203
            self._debug_counts["update"] += 1
 
204
        else:
 
205
            self._debug_counts["new_announcement"] += 1
 
206
 
 
207
        self._current_announcements[index] = ann_d
 
208
        # note: we never forget an index, but we might update its value
 
209
 
 
210
        for (service_name2,cb,args,kwargs) in self._local_subscribers:
 
211
            if service_name2 == service_name:
 
212
                eventually(cb, nodeid, ann_d, *args, **kwargs)
 
213
 
 
214
    def remote_set_encoding_parameters(self, parameters):
 
215
        self.encoding_parameters = parameters
 
216
 
 
217
    def connected_to_introducer(self):
 
218
        return bool(self._publisher)