~dmedia/dmedia/trunk

« back to all changes in this revision

Viewing changes to dmedia/service/replicator.py

  • Committer: Jason Gerard DeRose
  • Date: 2012-04-28 20:51:51 UTC
  • mfrom: (365.1.8 peer-sync)
  • Revision ID: jderose@novacut.com-20120428205151-72b00na8b8gx0ed0
Experimental sync to peers on localnetwork

Show diffs side-by-side

added added

removed removed

Lines of Context:
25
25
 
26
26
import logging
27
27
import json
 
28
from collections import namedtuple
28
29
 
29
30
from microfiber import Server, Database, PreconditionFailed
30
31
import dbus
32
33
 
33
34
log = logging.getLogger()
34
35
system = dbus.SystemBus()
35
 
 
 
36
Peer = namedtuple('Peer', 'url names')
36
37
SERVICE = '_usercouch._tcp'
37
38
 
38
39
 
39
 
def get_body(source, target):
40
 
    return {
 
40
def get_body(source, target, cancel=False):
 
41
    body = {
41
42
        'source': source,
42
43
        'target': target,
43
44
        'continuous': True,
44
45
    }
 
46
    if cancel:
 
47
        body['cancel'] = True
 
48
    return body
45
49
 
46
50
 
47
51
def get_peer(url, dbname, tokens):
56
60
class Replicator:
57
61
    def __init__(self, env, config):
58
62
        self.group = None
 
63
        self.env = env
59
64
        self.server = Server(env)
60
65
        self.library_id = config['library_id']
61
66
        self.base_id = self.library_id + '-'
62
 
        self.machine_id = env['machine_id']
63
 
        self.id = self.base_id + self.machine_id
64
67
        self.port = env['port']
65
68
        self.tokens = config['tokens']
66
69
        self.peers = {}
69
72
        self.free()
70
73
 
71
74
    def run(self):
 
75
        self.machine_id = self.env['machine_id']
 
76
        self.id = self.base_id + self.machine_id
72
77
        self.avahi = system.get_object('org.freedesktop.Avahi', '/')
73
78
        self.group = system.get_object(
74
79
            'org.freedesktop.Avahi',
106
111
        if self.group is not None:
107
112
            self.group.Reset(dbus_interface='org.freedesktop.Avahi.EntryGroup')
108
113
 
109
 
    def on_ItemNew(self, interface, protocol, name, _type, domain, flags):
110
 
        if name == self.id:  # Ignore what we publish ourselves
 
114
    def on_ItemNew(self, interface, protocol, key, _type, domain, flags):
 
115
        if key == self.id:  # Ignore what we publish ourselves
111
116
            return
112
 
        if not name.startswith(self.base_id):  # Ignore other libraries
 
117
        if not key.startswith(self.base_id):  # Ignore other libraries
113
118
            return
114
119
        (ip, port) = self.avahi.ResolveService(
115
 
            interface, protocol, name, _type, domain, -1, 0,
 
120
            interface, protocol, key, _type, domain, -1, 0,
116
121
            dbus_interface='org.freedesktop.Avahi.Server'
117
122
        )[7:9]
118
123
        url = 'http://{}:{}/'.format(ip, port)
119
 
        log.info('Replicator: new peer %s at %s', name, url)
120
 
        self.peers[name] = url
121
 
        self.replicate(url, 'foo') 
122
 
 
123
 
    def on_ItemRemove(self, interface, protocol, name, _type, domain, flags):
124
 
        log.info('Replicator: removing peer %s', name)
125
 
        try:
126
 
            del self.peers[name]
127
 
        except KeyError:
128
 
            pass
129
 
 
130
 
    def replicate(self, url, dbname):
131
 
        # Create local DB if needed
132
 
        try:
133
 
            self.server.put(None, dbname)
134
 
        except PreconditionFailed:
135
 
            pass
136
 
 
137
 
        # Create remote DB if needed
138
 
        env = {'url': url, 'oauth': self.tokens}
139
 
        db = Database(dbname, env)
140
 
        db.ensure() 
141
 
 
142
 
        peer = get_peer(url, dbname, self.tokens)
143
 
        local_to_remote = get_body(dbname, peer)
144
 
        remote_to_local = get_body(peer, dbname)
145
 
        for obj in (local_to_remote, remote_to_local):
146
 
            self.server.post(obj, '_replicate')
 
124
        log.info('Replicator: new peer %s at %s', key, url)
 
125
        self.cancel_all(key)
 
126
        self.peers[key] = Peer(url, [])
 
127
        self.replicate_all(key)
 
128
 
 
129
    def on_ItemRemove(self, interface, protocol, key, _type, domain, flags):
 
130
        log.info('Replicator: peer removed %s', key)
 
131
        self.cancel_all(str(key))
 
132
 
 
133
    def cancel_all(self, key):
 
134
        p = self.peers.pop(key, None)
 
135
        if p is None:
 
136
            return
 
137
        log.info('Canceling replications for %r', key)
 
138
        for name in p.names:
 
139
            self.replicate(p.url, name, cancel=True)
 
140
 
 
141
    def replicate_all(self, key):
 
142
        p = self.peers[key]
 
143
        env = {'url': p.url, 'oauth': self.tokens}
 
144
        remote = Server(env)
 
145
        for name in self.server.get('_all_dbs'):
 
146
            if name.startswith('_'):
 
147
                continue
 
148
            if not (name.startswith('dmedia-0') or name.startswith('novacut-0')):
 
149
                continue
 
150
            # Create remote DB if needed
 
151
            try:
 
152
                remote.put(None, name)
 
153
            except PreconditionFailed:
 
154
                pass
 
155
 
 
156
            # Start replication
 
157
            p.names.append(name)
 
158
            self.replicate(p.url, name)
 
159
 
 
160
    def replicate(self, url, name, cancel=False):
 
161
        """
 
162
        Start or cancel push replication of database *name* to peer at *url*.
 
163
 
 
164
        Security note: we only do push replication because pull replication
 
165
        would allow unauthorized peers to write to our databases via their
 
166
        changes feed.  For both push and pull, there is currently no privacy
 
167
        whatsoever... everything is in cleartext and uses oauth 1.0a. But push
 
168
        replication is the only way to at least prevent malicious data
 
169
        corruption.
 
170
        """
 
171
        if cancel:
 
172
            log.info('Canceling push of %r to %r', name, url)
 
173
        else:
 
174
            log.info('Starting push of %r to %r', name, url)
 
175
        peer = get_peer(url, name, self.tokens)
 
176
        push = get_body(name, peer, cancel)
 
177
        self.server.post(push, '_replicate')
147
178
        
148
179
        
149
180