111
111
self.event_q = event_queue.EventQueue(self.fs)
112
112
self.fs.register_eq(self.event_q)
113
113
self.oauth_client = OAuthClient(self.realm)
114
self.state = SyncDaemonStateManager(self, handshake_timeout,
115
max_handshake_timeouts)
116
115
# subscribe VM to EQ
117
116
self.event_q.subscribe(self.vm)
118
117
self.vm.init_root()
119
119
# we don't have the oauth tokens yet, we 'll get them later
120
120
self.action_q = action_queue.ActionQueue(self.event_q, self,
126
126
self.hash_q = hash_queue.HashQueue(self.event_q)
127
127
events_nanny.DownloadFinishedNanny(self.fs, self.event_q, self.hash_q)
129
# call StateManager after having AQ
130
self.state = SyncDaemonStateManager(self, handshake_timeout,
131
max_handshake_timeouts)
129
133
self.sync = sync.Sync(self)
130
134
self.lr = local_rescan.LocalRescan(self.vm, self.fs,
131
135
self.event_q, self.action_q)
197
201
self.event_q.push('SYS_WAIT_FOR_LOCAL_RESCAN')
198
202
self.event_q.inotify_add_watch(self.root_dir)
200
# do the local rescan
201
self.logger.note("Local rescan starting...")
204
def _wait_for_hashq():
206
Keep on calling this until the hash_q finishes.
209
self.logger.info("hash queue pending. Waiting for it...")
210
reactor.callLater(.1, _wait_for_hashq)
212
self.logger.info("hash queue empty. We are ready!")
213
# nudge the action queue into action
214
self.event_q.push('SYS_LOCAL_RESCAN_DONE')
216
def local_rescan_done(_):
217
"""After local rescan finished."""
218
self.logger.note("Local rescan finished!")
221
def stop_the_press(failure):
222
"""Something went wrong in LR, can't continue."""
223
self.logger.error("Local rescan finished with error: %s",
224
failure.getBriefTraceback())
225
self.event_q.push('SYS_UNKNOWN_ERROR')
227
d.addCallbacks(local_rescan_done, stop_the_press)
204
d = self.local_rescan()
230
207
def shutdown(self, with_restart=False):
231
""" shutdown the daemon; optionally restart it """
208
"""Shutdown the daemon; optionally restart it."""
232
209
self.event_q.push('SYS_DISCONNECT')
233
210
self.event_q.shutdown()
234
211
self.hash_q.shutdown()
238
215
def restart(self):
216
"""Restart the daemon.
242
218
This ultimately shuts down the daemon and asks dbus to start one again.
244
220
self.quit(exit_value=42, with_restart=True)
246
222
def get_root(self, root_mdid):
248
Ask que AQ for our root's uuid
223
"""Ask que AQ for our root's uuid."""
252
Actually do the asking
225
"""Actually do the asking."""
254
226
d = self.action_q.get_root(root_mdid)
255
227
def root_node_cb(root):
256
""" root node fetched callback. """
228
"""Root node fetched callback."""
257
229
root_mdid = self.vm.on_server_root(root)
258
230
self.action_q.uuid_map.set(root_mdid, root)
259
231
d.addCallback(root_node_cb)
268
240
def check_version(self):
270
Check the client protocol version matches that of the server.
241
"""Check the client protocol version matches that of the server."""
272
242
self.action_q.check_version()
274
244
def authenticate(self):
245
"""Do the OAuth dance."""
278
246
self.action_q.authenticate(self.oauth_client.consumer)
280
248
def set_capabilities(self):
283
251
self.action_q.set_capabilities(syncdaemon.REQUIRED_CAPS)
285
253
def server_rescan(self):
254
"""Do the server rescan."""
289
255
self.action_q.server_rescan(self.fs.get_data_for_server_rescan)
291
257
def set_oauth_token(self, key, secret):
293
259
self.token = oauth.OAuthToken(key, secret)
295
261
def get_access_token(self):
296
"""Return the access token or a new one"""
262
"""Return the access token or a new one."""
298
264
return self.token
300
266
return self.oauth_client.get_access_token()
302
268
def get_rootdir(self):
303
""" Returns the base dir/mount point"""
269
"""Return the base dir/mount point."""
304
270
return self.root_dir
306
272
def quit(self, exit_value=0, with_restart=False):
307
""" shutdown and stop the reactor. """
273
"""Shutdown and stop the reactor."""
308
274
self.shutdown(with_restart)
309
275
if reactor.running:
312
278
sys.exit(exit_value)
280
def local_rescan(self):
281
"""Do the local rescan."""
282
self.logger.note("Local rescan starting...")
285
def _wait_for_hashq():
286
"""Keep on calling this until the hash_q finishes."""
288
self.logger.info("hash queue pending. Waiting for it...")
289
reactor.callLater(.1, _wait_for_hashq)
291
self.logger.info("hash queue empty. We are ready!")
292
# nudge the action queue into action
293
self.event_q.push('SYS_LOCAL_RESCAN_DONE')
295
def local_rescan_done(_):
296
"""After local rescan finished."""
297
self.logger.note("Local rescan finished!")
300
def stop_the_press(failure):
301
"""Something went wrong in LR, can't continue."""
302
self.logger.error("Local rescan finished with error: %s",
303
failure.getBriefTraceback())
304
self.event_q.push('SYS_UNKNOWN_ERROR')
306
d.addCallbacks(local_rescan_done, stop_the_press)
315
309
class NoAccessToken(Exception):
316
310
"""No access token available."""