~alecu/ubuntuone-client/delivery-call-api

« back to all changes in this revision

Viewing changes to ubuntuone/syncdaemon/action_queue.py

  • Committer: Alejandro J. Cura
  • Date: 2013-01-25 14:55:12 UTC
  • Revision ID: alecu@canonical.com-20130125145512-1mq2skk5gnd1jwh0
Add call to newest-purchases web api

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
# -*- coding: utf-8 -*-
2
2
#
3
 
# Copyright 2009-2012 Canonical Ltd.
 
3
# Copyright 2009-2013 Canonical Ltd.
4
4
#
5
5
# This program is free software: you can redistribute it and/or modify it
6
6
# under the terms of the GNU General Public License version 3, as published
761
761
        return getattr(self.fd, attr)
762
762
 
763
763
 
 
764
class MusicDeliveryChecker(object):
    """Ask the music store web api about albums delivered to the cloud.

    The checker keeps the last server timestamp reported by the api and
    sends it along with every request.  The purchased albums found in each
    response are handed to the action queue through its
    ``music_delivery_results`` method.
    """

    # The %d placeholder receives the last known server timestamp.
    ALBUM_DELIVERY_IRI = (
        u"https://edge.one.ubuntu.com/music-store/api/1/user/"
        u"newest-purchases?last_server_timestamp=%d")

    def __init__(self, action_queue):
        """Keep the action queue and start from server timestamp 0."""
        self.last_server_timestamp = 0
        self.action_queue = action_queue

    def start(self):
        """Begin the periodic api checks (not implemented yet)."""
        # TODO: coming in following branches. See pad.lv/1103690
        # reactor.callLater(0, self.check_and_schedule_next_check)

    def stop(self):
        """Cancel the periodic api checks (not implemented yet)."""

    @defer.inlineCallbacks
    def check_and_schedule_next_check(self):
        """Run one api check, then schedule the following one."""
        # NOTE(review): there is no ``yield`` in this body yet, so this is
        # not a generator; inlineCallbacks presumably needs one -- confirm
        # before anything starts calling this method.
        # TODO: coming in following branches. See pad.lv/1103690
        # on any error, wait 10 minutes
        # on success, wait 60 minutes

    @defer.inlineCallbacks
    def check_api(self):
        """Query the api once and publish the albums it reports.

        The response content is parsed as json; its "server_timestamp" is
        stored (as an int) for the next query, and its "purchased_albums"
        are passed on to the action queue.
        """
        since = self.last_server_timestamp
        iri = self.ALBUM_DELIVERY_IRI % since
        logger.debug("Checking music delivery: %d", since)
        reply = yield self.action_queue.webcall(iri, method="GET")
        body = reply.content
        logger.debug("Music delivery returned %d bytes", len(body))
        payload = json.loads(body)
        # The timestamp is updated before the albums are looked up.
        self.last_server_timestamp = int(payload["server_timestamp"])
        self.action_queue.music_delivery_results(payload["purchased_albums"])
 
799
 
 
800
 
764
801
class ActionQueue(ThrottlingStorageClientFactory, object):
765
802
    """The ActionQueue itself."""
766
803
 
813
850
        self.commands = dict((x, y) for x, y in globals().iteritems()
814
851
                             if inspect.isclass(y) and
815
852
                                            issubclass(y, ActionQueueCommand))
 
853
        self.delivery_checker = MusicDeliveryChecker(self)
816
854
 
817
855
    def check_conditions(self):
818
856
        """Check conditions in the locker, to release all the waiting ops."""
843
881
        self.client = None
844
882
        self.connector = None
845
883
        self.connect_in_progress = False
 
884
        self.delivery_checker.stop()
846
885
 
847
886
    def _share_change_callback(self, info):
848
887
        """Called by the client when notified that a share changed."""
921
960
        else:
922
961
            return defer.succeed((self.host, self.port))
923
962
 
 
963
    def music_delivery_results(self, results):
 
964
        """Push the music delivery results into the event queue."""
 
965
        self.event_queue.push('AQ_MUSIC_DELIVERY_RESULTS',
 
966
                              purchased_albums=results)
924
967
 
925
968
    @defer.inlineCallbacks
926
969
    def webcall(self, iri, **kwargs):
968
1011
        d = self._lookup_srv()
969
1012
        # DNS lookup always succeeds, proceed to actually connect
970
1013
        d.addCallback(self._make_connection)
 
1014
        self.delivery_checker.start()
971
1015
 
972
1016
    def buildProtocol(self, addr):
973
1017
        """Build the client and store it. Connect callbacks."""