1
1
# -*- coding: utf-8 -*-
3
# Copyright 2009-2012 Canonical Ltd.
3
# Copyright 2009-2013 Canonical Ltd.
5
5
# This program is free software: you can redistribute it and/or modify it
6
6
# under the terms of the GNU General Public License version 3, as published
761
761
return getattr(self.fd, attr)
764
class MusicDeliveryChecker(object):
765
"""Checks the api for new albums delivered to the cloud."""
767
ALBUM_DELIVERY_IRI = (u"https://edge.one.ubuntu.com/" +
768
u"music-store/api/1/user/newest-purchases?last_server_timestamp=%d")
770
def __init__(self, action_queue):
771
self.action_queue = action_queue
772
self.last_server_timestamp = 0
775
"""Start calling the api periodically."""
776
# TODO: coming in following branches. See pad.lv/1103690
777
# reactor.callLater(0, self.check_and_schedule_next_check)
780
"""Stop calling the api."""
782
@defer.inlineCallbacks
783
def check_and_schedule_next_check(self):
784
"""Check the api, and schedule the next check."""
785
# TODO: coming in following branches. See pad.lv/1103690
786
# on any error, wait 10 minutes
787
# on success, wait 60 minutes
789
@defer.inlineCallbacks
791
"""Do the actual checking."""
792
iri = self.ALBUM_DELIVERY_IRI % self.last_server_timestamp
793
logger.debug("Checking music delivery: %d", self.last_server_timestamp)
794
response = yield self.action_queue.webcall(iri, method="GET")
795
logger.debug("Music delivery returned %d bytes", len(response.content))
796
parsed = json.loads(response.content)
797
self.last_server_timestamp = int(parsed["server_timestamp"])
798
self.action_queue.music_delivery_results(parsed["purchased_albums"])
764
801
class ActionQueue(ThrottlingStorageClientFactory, object):
765
802
"""The ActionQueue itself."""
813
850
self.commands = dict((x, y) for x, y in globals().iteritems()
814
851
if inspect.isclass(y) and
815
852
issubclass(y, ActionQueueCommand))
853
self.delivery_checker = MusicDeliveryChecker(self)
817
855
def check_conditions(self):
818
856
"""Check conditions in the locker, to release all the waiting ops."""
843
881
self.client = None
844
882
self.connector = None
845
883
self.connect_in_progress = False
884
self.delivery_checker.stop()
847
886
def _share_change_callback(self, info):
848
887
"""Called by the client when notified that a share changed."""
922
961
return defer.succeed((self.host, self.port))
963
def music_delivery_results(self, results):
964
"""Push the music delivery results into the event queue."""
965
self.event_queue.push('AQ_MUSIC_DELIVERY_RESULTS',
966
purchased_albums=results)
925
968
@defer.inlineCallbacks
926
969
def webcall(self, iri, **kwargs):
968
1011
d = self._lookup_srv()
969
1012
# DNS lookup always succeeds, proceed to actually connect
970
1013
d.addCallback(self._make_connection)
1014
self.delivery_checker.start()
972
1016
def buildProtocol(self, addr):
973
1017
"""Build the client and store it. Connect callbacks."""