~ubuntu-branches/ubuntu/karmic/desktopcouch/karmic

« back to all changes in this revision

Viewing changes to desktopcouch/replication.py

  • Committer: Bazaar Package Importer
  • Author(s): Ken VanDine
  • Date: 2009-09-23 14:22:38 UTC
  • mfrom: (1.1.4 upstream)
  • Revision ID: james.westby@ubuntu.com-20090923142238-xp1cdxoc657pdw9w
Tags: 0.4.1-0ubuntu1
New upstream release (LP: #435429)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright 2009 Canonical Ltd.
 
2
#
 
3
# This file is part of desktopcouch.
 
4
#
 
5
# desktopcouch is free software: you can redistribute it and/or modify
 
6
# it under the terms of the GNU Lesser General Public License version 3
 
7
# as published by the Free Software Foundation.
 
8
#
 
9
# desktopcouch is distributed in the hope that it will be useful,
 
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
12
# GNU Lesser General Public License for more details.
 
13
#
 
14
# You should have received a copy of the GNU Lesser General Public License
 
15
# along with desktopcouch.  If not, see <http://www.gnu.org/licenses/>.
 
16
#
 
17
# Authors: Chad Miller <chad.miller@canonical.com>
 
18
 
 
19
import threading
 
20
import logging
 
21
import logging.handlers
 
22
log = logging.getLogger("main")
 
23
 
 
24
import dbus.exceptions
 
25
 
 
26
import desktopcouch
 
27
from desktopcouch.pair.couchdb_pairing import couchdb_io
 
28
from desktopcouch.pair.couchdb_pairing import dbus_io
 
29
from desktopcouch import replication_services
 
30
 
 
31
try:
 
32
    import urlparse
 
33
except ImportError:
 
34
    import urllib.parse as urlparse
 
35
 
 
36
from twisted.internet import task, reactor
 
37
 
 
38
 
 
39
# Service names already reported as unknown by do_all_replication(); kept so
# that the same error is not logged again on every replication pass.
known_bad_service_names = set()
 
40
# Set to True for the duration of do_all_replication() and reset when it
# finishes; replicate_local_databases_to_paired_hosts() checks it to avoid
# starting a second pass while one is still in progress.
already_replicating = False
 
41
# Polled inside the replication loops of do_all_replication(); once it is
# false the worker returns early.  tear_down() is meant to clear it.
is_running = True
 
42
 
 
43
 
 
44
def db_targetprefix_for_service(service_name):
    """Use the service name to look up what the prefix should be on the
    databases.  This gives an egalitarian way for non-UbuntuOne servers to have
    their own remote-db-name scheme.

    `service_name` is the name of a module inside the
    ``desktopcouch.replication_services`` package.  The value returned is that
    module's ``db_name_prefix`` attribute, or the empty string when the lookup
    fails for any reason (the failure is logged, never raised)."""
    try:
        logging.debug("Looking up prefix for service %r", service_name)
        mod = __import__("desktopcouch.replication_services",
                fromlist=[service_name])
        return getattr(mod, service_name).db_name_prefix
    except ImportError as e:
        # The services package could not be imported; treat as "no prefix".
        logging.info("Not changing remote db name.  %s", e)
        return ""
    except Exception:
        # Anything else (for instance a missing attribute) is unexpected, so
        # record the traceback, but still fall back to "no prefix".
        logging.exception("Not changing remote db name.")
        return ""
 
58
 
 
59
def oauth_info_for_service(service_name):
    """Use the service name to look up what oauth information we should use
    when talking to that service.

    `service_name` is the name of a module inside the
    ``desktopcouch.replication_services`` package.  Returns whatever that
    module's ``get_oauth_data()`` returns, or None when the package can not
    be imported.  Only ImportError is handled here; any other exception
    propagates to the caller."""
    try:
        logging.debug("Looking up oauth info for service %r", service_name)
        mod = __import__("desktopcouch.replication_services",
                fromlist=[service_name])
        return getattr(mod, service_name).get_oauth_data()
    except ImportError as e:
        logging.info("No service information available.  %s", e)
        return None
 
69
 
 
70
def do_all_replication(local_port):
    """Run one complete replication pass against every known paired host.

    First, for each host reported by dbus_io.get_seen_paired_hosts(), either
    remove the pairing (when the host is flagged as unpaired) or replicate
    every replicatable local database to it.  Then, for each entry from
    couchdb_io.get_static_paired_hosts(), work out the remote address and
    database-name prefix from the service module and replicate to and/or
    from that service.

    `local_port` is the port of the local CouchDB; it is used to build the
    "localhost" URI and as the source/target port for replication.

    The module-level flag `already_replicating` is set for the duration of
    the pass and always cleared afterwards.  The pass returns early, without
    logging completion, as soon as `is_running` is found to be false."""
    global already_replicating  # Fuzzy, as not really critical,
    already_replicating = True  #   just trying to be polite.
    try:
        for remote_hostid, addr, port, is_unpaired in \
                dbus_io.get_seen_paired_hosts():

            if is_unpaired:
                # The far end doesn't know we want to break up.
                for local_identifier in couchdb_io.get_my_host_unique_id():
                    # Tell her gently, using each pseudonym.
                    couchdb_io.expunge_pairing(local_identifier,
                            couchdb_io.mkuri(addr, port))
                # Finally, find your inner peace...
                couchdb_io.expunge_pairing(remote_hostid)
                # ...and move on.
                continue

            log.debug("want to replipush to discovered host %r @ %s",
                    remote_hostid, addr)
            for db_name in couchdb_io.get_database_names_replicatable(
                    couchdb_io.mkuri("localhost", local_port)):
                if not is_running:
                    return
                couchdb_io.replicate(db_name, db_name,
                        target_host=addr, target_port=port,
                        source_port=local_port)

        for remote_hostid, sn, to_pull, to_push in \
                couchdb_io.get_static_paired_hosts():

            if sn not in dir(replication_services):
                if not is_running:
                    return
                if sn in known_bad_service_names:
                    continue  # Don't nag.
                logging.error("The service %r is unknown.  It is not a "
                        "module in the desktopcouch.replication_services "
                        "package ." % (sn,))
                known_bad_service_names.add(sn)
                # NOTE(review): the first time a name is seen we fall through
                # and still try the service below -- confirm that is intended.

            remote_oauth_data = oauth_info_for_service(sn)

            try:
                remote_location = db_targetprefix_for_service(sn)
                urlinfo = urlparse.urlsplit(str(remote_location))
                if ":" in urlinfo.netloc:
                    addr, port = urlinfo.netloc.rsplit(":", 1)
                    # Keep the port an integer, like the defaults below; a
                    # non-numeric port is reported by the handler underneath.
                    port = int(port)
                else:
                    addr = urlinfo.netloc
                    port = 443 if urlinfo.scheme == "https" else 80
            except ValueError as e:
                logging.warning("Can't reach service %s.  %s", sn, e)
                continue
            remote_db_name_prefix = urlinfo.path.strip("/")

            # NOTE(review): `to_pull` guards the push to the service and
            # `to_push` guards the pull from it; the flags look swapped --
            # confirm against couchdb_io.get_static_paired_hosts().
            if to_pull:
                for db_name in couchdb_io.get_database_names_replicatable(
                        couchdb_io.mkuri("localhost", int(local_port))):
                    if not is_running:
                        return

                    remote_db_name = remote_db_name_prefix + "/" + db_name

                    log.debug("want to replipush %r to static host %r @ %s",
                            remote_db_name, remote_hostid, addr)
                    couchdb_io.replicate(db_name, remote_db_name,
                            target_host=addr, target_port=port,
                            source_port=local_port, target_ssl=True,
                            target_oauth=remote_oauth_data)
            if to_push:
                for remote_db_name in \
                        couchdb_io.get_database_names_replicatable(
                                couchdb_io.mkuri(addr, port)):
                    if not is_running:
                        return
                    try:
                        # Only databases under our prefix are of interest.
                        if not remote_db_name.startswith(
                                str(remote_db_name_prefix + "/")):
                            continue
                    except ValueError as e:
                        # Presumably a text-encoding failure in the
                        # comparison above; report the name that caused it.
                        log.error("skipping %r on %s.  %s",
                                remote_db_name, sn, e)
                        continue

                    # Strip the prefix and its trailing slash.
                    db_name = \
                            remote_db_name[1+len(str(remote_db_name_prefix)):]
                    if db_name.strip("/") == "management":
                        continue  # be paranoid about what we accept.
                    log.debug("want to replipull %r from static host %r @ %s",
                            db_name, remote_hostid, addr)
                    couchdb_io.replicate(remote_db_name, db_name,
                            source_host=addr, source_port=port,
                            target_port=local_port, source_ssl=True,
                            source_oauth=remote_oauth_data)

    finally:
        already_replicating = False
    log.debug("finished replicating")
 
163
        
 
164
        
 
165
def replicate_local_databases_to_paired_hosts(local_port):
    """Start a replication pass in a reactor thread, unless one is running.

    When the previous pass has not finished yet, a warning is logged and
    False is returned.  Otherwise do_all_replication(local_port) is handed to
    the reactor's thread pool and None is returned."""
    if not already_replicating:
        reactor.callInThread(do_all_replication, local_port)
        return None

    log.warn("haven't finished replicating before next time to start.")
    return False
 
171
 
 
172
def set_up(port_getter):
    """Advertise this host and start the periodic replication loop.

    `port_getter` is called once, with no arguments, to obtain the local
    CouchDB port.

    Returns a ``(beacons, looping_call)`` pair: the list of published
    dbus_io.LocationAdvertisement objects and the twisted LoopingCall that
    runs replicate_local_databases_to_paired_hosts every 600 seconds.
    Returns None when no unique host id is available, or when publishing an
    advertisement fails with a DBusException."""
    port = port_getter()
    unique_identifiers = couchdb_io.get_my_host_unique_id(
            couchdb_io.mkuri("localhost", int(port)), create=False)
    if unique_identifiers is None:
        log.warning("No unique hostaccount id is set, so pairing not enabled.")
        return None

    # One advertisement per identifier this host is known by.
    beacons = [dbus_io.LocationAdvertisement(port, "desktopcouch " + i)
            for i in unique_identifiers]
    for b in beacons:
        try:
            b.publish()
        except dbus.exceptions.DBusException as e:
            logging.error("We seem to be running already, or can't publish "
                    "our zeroconf advert.  %s", e)
            return None

    dbus_io.discover_services(None, None, True)

    dbus_io.maintain_discovered_servers()

    t = task.LoopingCall(replicate_local_databases_to_paired_hosts, port)
    t.start(600)

    # TODO:  port may change, so every so often, check it and
    # perhaps refresh the beacons.

    return beacons, t
 
201
 
 
202
 
 
203
def tear_down(beacons, looping_task):
    """Undo set_up(): withdraw the advertisements and stop replicating.

    `beacons` is an iterable of objects with an ``unpublish()`` method and
    `looping_task` an object with a ``stop()`` method, as returned by
    set_up().  Failure to stop the loop is logged, not raised."""
    # Without this declaration the assignment below would only bind a local
    # name and the replication thread would never be told to stop.
    global is_running

    for b in beacons:
        b.unpublish()

    is_running = False
    try:
        looping_task.stop()
    except Exception:
        # Best effort: stopping may fail, e.g. when the loop is not running.
        logging.debug("Could not stop the replication loop.", exc_info=True)