226
226
"""A smaller finite state machine to handle queues."""
228
228
IDLE = Node('IDLE', "nothing in the queues")
229
WORKING_ON_METADATA = Node('WORKING_ON_METADATA', "working on metadata")
230
WORKING_ON_CONTENT = Node('WORKING_ON_CONTENT', "working con content")
231
WORKING_ON_BOTH = Node('WORKING_ON_BOTH', "working on both")
229
WORKING = Node('WORKING', "working on the commands queue")
233
231
def __init__(self, state_manager):
234
232
self.sm = state_manager
235
self.cq = state_manager.aq.content_queue
236
self.mq = state_manager.aq.meta_queue
233
self.queue = state_manager.aq.queue
237
234
self.state = self.IDLE
238
235
self.log = logging.getLogger("ubuntuone.SyncDaemon.QueueManager")
239
236
self.log.debug("start")
244
241
self.state = new_state
246
243
def on_event(self, event):
247
"""Handle transitions between working queues."""
244
"""Handle transitions."""
248
245
prv_state = self.state
249
246
if self.state == self.IDLE:
250
if event == 'SYS_META_QUEUE_WAITING':
251
self._set_state(self.WORKING_ON_METADATA)
253
elif event == 'SYS_CONTENT_QUEUE_WAITING':
254
self._set_state(self.WORKING_ON_CONTENT)
257
self._bad_event(event)
259
elif self.state == self.WORKING_ON_METADATA:
260
if event == 'SYS_META_QUEUE_WAITING':
262
elif event == 'SYS_META_QUEUE_DONE':
263
self._set_state(self.IDLE)
264
elif event == 'SYS_CONTENT_QUEUE_WAITING':
265
self._set_state(self.WORKING_ON_BOTH)
267
self._bad_event(event)
269
elif self.state == self.WORKING_ON_CONTENT:
270
if event == 'SYS_META_QUEUE_WAITING':
271
self._set_state(self.WORKING_ON_BOTH)
273
elif event == 'SYS_CONTENT_QUEUE_WAITING':
275
elif event == 'SYS_CONTENT_QUEUE_DONE':
276
self._set_state(self.IDLE)
278
self._bad_event(event)
280
elif self.state == self.WORKING_ON_BOTH:
281
if event == 'SYS_META_QUEUE_WAITING':
283
elif event == 'SYS_META_QUEUE_DONE':
284
self._set_state(self.WORKING_ON_CONTENT)
286
elif event == 'SYS_CONTENT_QUEUE_WAITING':
287
pass # same node, no run()
288
elif event == 'SYS_CONTENT_QUEUE_DONE':
289
self._set_state(self.WORKING_ON_METADATA)
247
if event == 'SYS_QUEUE_WAITING':
248
self._set_state(self.WORKING)
250
self._bad_event(event)
252
elif self.state == self.WORKING:
253
if event == 'SYS_QUEUE_DONE':
254
self._set_state(self.IDLE)
291
256
self._bad_event(event)
299
264
def on_enter(self, new_node):
300
265
"""Called when SM gets into a new node."""
301
266
if new_node == StateManager.QUEUE_MANAGER:
302
if self.state == self.WORKING_ON_CONTENT:
304
elif self.state in (self.WORKING_ON_METADATA, self.WORKING_ON_BOTH):
307
269
def _bad_event(self, event):
308
270
"""Log the bad event."""
309
271
m = "Bad Event received: Got %r while in %s"
310
272
self.log.warning(m, event, self.state)
312
def _run(self, queue):
313
"""Execute queue.run() if conditions are ok.
317
- the queue needs to have something to run
319
- the global state should be this QueueManager
321
if len(queue) and self.sm.state == StateManager.QUEUE_MANAGER:
322
self.log.debug("In %s: running %s", self.state.name, queue.name)
326
275
ACCEPTED_EVENTS = [
327
276
'SYS_AUTH_ERROR',
330
279
'SYS_CONNECTION_LOST',
331
280
'SYS_CONNECTION_MADE',
332
281
'SYS_CONNECTION_RETRY',
333
'SYS_CONTENT_QUEUE_DONE',
334
'SYS_CONTENT_QUEUE_WAITING',
335
284
'SYS_HANDSHAKE_TIMEOUT',
337
286
'SYS_LOCAL_RESCAN_DONE',
338
'SYS_META_QUEUE_DONE',
339
'SYS_META_QUEUE_WAITING',
340
287
'SYS_NET_CONNECTED',
341
288
'SYS_NET_DISCONNECTED',
342
289
'SYS_PROTOCOL_VERSION_ERROR',
343
290
'SYS_PROTOCOL_VERSION_OK',
344
292
'SYS_ROOT_MISMATCH',
345
293
'SYS_SERVER_RESCAN_DONE',
346
294
'SYS_SERVER_ERROR',
374
322
SERVER_RESCAN = Node('SERVER_RESCAN', "doing server rescan", conn=True)
376
QUEUE_MANAGER = Node('QUEUE_MANAGER', "processing queues",
324
QUEUE_MANAGER = Node('QUEUE_MANAGER', "processing the commands pool",
377
325
conn=True, online=True)
379
327
STANDOFF = Node('STANDOFF', "waiting for connection to end", conn=True)
382
330
"different", error=True)
383
331
UNKNOWN_ERROR = Node('UNKNOWN_ERROR', "something went wrong", error=True)
333
SHUTDOWN = Node('SHUTDOWN', "shutting down the service")
385
335
def __init__(self, main, handshake_timeout=None):
387
337
self.aq = main.action_q
435
385
self.log.debug("received event %r", event)
388
if event == 'SYS_QUIT':
389
self._transition(event, StateManager.SHUTDOWN)
437
392
# error management
438
393
if event == 'SYS_UNKNOWN_ERROR':
439
394
self._transition(event, StateManager.UNKNOWN_ERROR)
448
if event in ('SYS_META_QUEUE_WAITING', 'SYS_META_QUEUE_DONE',
449
'SYS_CONTENT_QUEUE_WAITING', 'SYS_CONTENT_QUEUE_DONE'):
403
if event in ('SYS_QUEUE_WAITING', 'SYS_QUEUE_DONE'):
450
404
self.log.debug("sending event to QueueManager")
451
405
changed = self.queues.on_event(event)