# Copyright 2009 Canonical Ltd.
#
# This file is part of desktopcouch.
#
# desktopcouch is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License version 3
# as published by the Free Software Foundation.
#
# desktopcouch is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with desktopcouch. If not, see <http://www.gnu.org/licenses/>.
#
# Authors: Chad Miller <chad.miller@canonical.com>
21
import logging.handlers
22
log = logging.getLogger("main")
24
import dbus.exceptions
27
from desktopcouch.pair.couchdb_pairing import couchdb_io
28
from desktopcouch.pair.couchdb_pairing import dbus_io
29
from desktopcouch import replication_services
34
import urllib.parse as urlparse
36
from twisted.internet import task, reactor
39
# Service names that have no module under desktopcouch.replication_services;
# remembered so we log the "service unknown" error only once per name.
known_bad_service_names = set()
40
# Best-effort flag: True while a replication pass is in flight, so an
# overlapping pass can warn instead of piling on.  NOTE(review): not
# lock-protected -- the code itself calls this "fuzzy ... just polite".
already_replicating = False
44
def db_targetprefix_for_service(service_name):
    """Use the service name to look up what the prefix should be on the
    databases.  This gives an egalitarian way for non-UbuntuOne servers to
    have their own remote-db-name scheme.

    Returns the service module's ``db_name_prefix``, or ``""`` when the
    service module cannot be imported or does not define that attribute
    (so callers can safely build "<prefix>/<db>" names).
    """
    logging.debug("Looking up prefix for service %r", service_name)
    try:
        # fromlist makes __import__ return the desktopcouch.replication_services
        # package itself, from which we pull the per-service submodule.
        mod = __import__("desktopcouch.replication_services",
                         fromlist=[service_name])
        return getattr(mod, service_name).db_name_prefix
    except ImportError as e:
        logging.info("Not changing remote db name. %s", e)
        return ""
    except AttributeError:
        # Service module exists but lacks db_name_prefix -- log traceback.
        # NOTE(review): this handler reconstructed from a damaged source;
        # the original except line was lost in extraction.
        logging.exception("Not changing remote db name.")
        return ""
59
def oauth_info_for_service(service_name):
    """Use the service name to look up what oauth information we should use
    when talking to that service.

    Returns the service module's ``get_oauth_data()`` result, or ``None``
    when the service module is not importable.
    """
    logging.debug("Looking up prefix for service %r", service_name)
    try:
        # Same import dance as db_targetprefix_for_service: fetch the
        # per-service submodule of desktopcouch.replication_services.
        mod = __import__("desktopcouch.replication_services",
                         fromlist=[service_name])
        return getattr(mod, service_name).get_oauth_data()
    except ImportError as e:
        logging.info("No service information available. %s", e)
        return None
70
def do_all_replication(local_port):
# Purpose: run one full replication pass -- push local databases to every
# zeroconf-discovered paired host, then push/pull databases to/from every
# statically-configured paired service (SSL + OAuth).
# NOTE(review): this block was damaged in extraction.  The interleaved bare
# integers are leftover source line numbers, indentation is lost, and
# several lines are missing -- including the "try:" statements matching the
# orphaned "except" clauses below and the bindings of "is_running" and
# "remote_identifier".  Comments describe only what is visible; confirm
# against the upstream desktopcouch source before relying on them.
71
# Best-effort overlap guard; deliberately unlocked (see original comment).
global already_replicating # Fuzzy, as not really critical,
72
already_replicating = True # just trying to be polite.
74
# Pass 1: hosts discovered over dbus/avahi zeroconf pairing.
for remote_hostid, addr, port, is_unpaired in \
75
dbus_io.get_seen_paired_hosts():
78
# The far end doesn't know want to break up.
79
# Unpair path: remove our pairing record on the remote host under each of
# our pseudonymous local identifiers.
for local_identifier in couchdb_io.get_my_host_unique_id():
80
# Tell her gently, using each pseudonym.
81
couchdb_io.expunge_pairing(local_identifier,
82
couchdb_io.mkuri(addr, port))
83
# Finally, find your inner peace...
84
# NOTE(review): "remote_identifier" is not bound anywhere visible --
# presumably defined on a missing line; verify before editing.
couchdb_io.expunge_pairing(remote_identifier)
88
log.debug("want to replipush to discovered host %r @ %s",
90
# Push every replicatable local database to the discovered host, same name.
for db_name in couchdb_io.get_database_names_replicatable(
91
couchdb_io.mkuri("localhost", local_port)):
92
# "is_running" appears to be a module-level shutdown flag (not visible
# here); abort the whole pass when it is cleared.
if not is_running: return
93
couchdb_io.replicate(db_name, db_name,
94
target_host=addr, target_port=port,
95
source_port=local_port)
97
# Pass 2: statically-paired services (e.g. UbuntuOne-style servers).
for remote_hostid, sn, to_pull, to_push in \
98
couchdb_io.get_static_paired_hosts():
100
# Unknown service module: complain once, remember it, and stay quiet after.
if not sn in dir(replication_services):
101
if not is_running: return
102
if sn in known_bad_service_names:
103
continue # Don't nag.
104
logging.error("The service %r is unknown. It is not a "
105
"module in the desktopcouch.replication_services "
107
known_bad_service_names.add(sn)
109
remote_oauth_data = oauth_info_for_service(sn)
112
# The service prefix is a URL; split it into host, port, and db prefix.
remote_location = db_targetprefix_for_service(sn)
113
urlinfo = urlparse.urlsplit(str(remote_location))
114
# NOTE(review): orphaned handler -- its "try:" was lost in extraction, and
# "except ValueError, e" is Python 2 syntax.
except ValueError, e:
115
logging.warn("Can't reach service %s. %s", sn, e)
117
if ":" in urlinfo.netloc:
118
# Explicit port in the netloc wins.
addr, port = urlinfo.netloc.rsplit(":", 1)
120
addr = urlinfo.netloc
121
# Otherwise default the port from the URL scheme.
port = 443 if urlinfo.scheme == "https" else 80
122
remote_db_name_prefix = urlinfo.path.strip("/")
125
# Push: local "db" -> remote "<prefix>/db" over SSL with OAuth.
for db_name in couchdb_io.get_database_names_replicatable(
126
couchdb_io.mkuri("localhost", int(local_port))):
127
if not is_running: return
129
remote_db_name = remote_db_name_prefix + "/" + db_name
131
log.debug("want to replipush %r to static host %r @ %s",
132
remote_db_name, remote_hostid, addr)
133
couchdb_io.replicate(db_name, remote_db_name,
134
target_host=addr, target_port=port,
135
source_port=local_port, target_ssl=True,
136
target_oauth=remote_oauth_data)
138
# Pull: every remote database under our prefix, back to the local couch.
for remote_db_name in \
139
couchdb_io.get_database_names_replicatable(
140
couchdb_io.mkuri(addr, port)):
141
if not is_running: return
143
# Only accept remote names inside our own prefix namespace.
if not remote_db_name.startswith(
144
str(remote_db_name_prefix + "/")):
146
# NOTE(review): another orphaned "except" -- its "try:" is missing.
except ValueError, e:
147
log.error("skipping %r on %s. %s", db_name, sn, e)
150
# Strip "<prefix>/" to recover the local database name.
db_name = remote_db_name[1+len(str(remote_db_name_prefix)):]
151
if db_name.strip("/") == "management":
152
continue # be paranoid about what we accept.
153
log.debug("want to replipull %r from static host %r @ %s",
154
db_name, remote_hostid, addr)
155
couchdb_io.replicate(remote_db_name, db_name,
156
source_host=addr, source_port=port,
157
target_port=local_port, source_ssl=True,
158
source_oauth=remote_oauth_data)
161
# Pass finished; drop the politeness flag.
already_replicating = False
162
log.debug("finished replicating")
165
def replicate_local_databases_to_paired_hosts(local_port):
    """Kick off one replication pass in a reactor worker thread.

    If the previous pass has not finished yet (``already_replicating`` is
    still set), warn and skip this round rather than stacking a second
    concurrent pass on top of it.
    """
    if already_replicating:
        log.warn("haven't finished replicating before next time to start.")
        # Skip this round; the warning is pointless if we start anyway.
        # (The "return" was missing from the damaged source.)
        return
    # Replication does blocking network I/O, so keep it off the reactor
    # thread.
    reactor.callInThread(do_all_replication, local_port)
172
def set_up(port_getter):
# Purpose: start pairing/replication -- advertise this host's identifiers
# over zeroconf, begin discovering peers, and schedule the periodic
# replication LoopingCall.
# NOTE(review): damaged in extraction.  The interleaved bare integers are
# leftover source line numbers, indentation is lost, and several lines are
# missing: the binding of "port" (presumably port_getter()), the "try:"
# matching the DBusException handler, publishing the beacons, starting the
# LoopingCall, and the return of (beacons, t) that tear_down() expects.
174
# Fetch this host's pseudonymous ids from the local couch; create=False
# means pairing is simply disabled if none exists yet.
unique_identifiers = couchdb_io.get_my_host_unique_id(
175
couchdb_io.mkuri("localhost", int(port)), create=False)
176
if unique_identifiers is None:
177
log.warn("No unique hostaccount id is set, so pairing not enabled.")
180
# One zeroconf beacon per identifier so peers can find us under each name.
beacons = [dbus_io.LocationAdvertisement(port, "desktopcouch " + i)
181
for i in unique_identifiers]
185
# NOTE(review): orphaned handler -- its "try:" was lost in extraction, and
# "except ... , e" is Python 2 syntax.
except dbus.exceptions.DBusException, e:
186
logging.error("We seem to be running already, or can't publish "
187
"our zeroconf advert. %s", e)
190
# Begin watching for other desktopcouch instances on the network.
dbus_io.discover_services(None, None, True)
192
dbus_io.maintain_discovered_servers()
194
# Periodic replication driver; the interval/start() call is not visible.
t = task.LoopingCall(replicate_local_databases_to_paired_hosts, port)
197
# TODO: port may change, so every so often, check it and
198
# perhaps refresh the beacons.
203
def tear_down(beacons, looping_task):