~hudson-openstack/burrow/trunk

« back to all changes in this revision

Viewing changes to burrow/backend/__init__.py

  • Committer: Tarmac
  • Author(s): Eric Day
  • Date: 2011-08-10 07:28:01 UTC
  • mfrom: (16.3.24 backend-unittests)
  • Revision ID: tarmac-20110810072801-32ue4p8boiepxhjg
Pushed wait functionality down into backend, removed frontend tests since this is replaced with HTTP backend test now.

Show diffs side-by-side

added added

removed removed

Lines of Context:
66
66
    def update_message(self, account, queue, message, attributes, filters={}):
67
67
        return None
68
68
 
69
 
    def notify(self, account, queue):
70
 
        '''Notify any waiting callers that the account/queue has
71
 
        a visible message.'''
72
 
        queue = '%s/%s' % (account, queue)
73
 
        if queue in self.queues:
74
 
            for count in xrange(0, self.queues[queue].getting()):
75
 
                self.queues[queue].put(0)
76
 
 
77
 
    def wait(self, account, queue, seconds):
78
 
        '''Wait for a message to appear in the account/queue.'''
79
 
        queue = '%s/%s' % (account, queue)
80
 
        if queue not in self.queues:
81
 
            self.queues[queue] = eventlet.Queue()
82
 
        try:
83
 
            self.queues[queue].get(timeout=seconds)
84
 
        except Exception:
85
 
            pass
86
 
        if self.queues[queue].getting() == 0:
87
 
            del self.queues[queue]
88
 
 
89
69
    def clean(self):
90
70
        '''This method should remove all messages with an expired
91
71
        TTL and make hidden messages that have an expired hide time
124
104
            raise burrow.backend.InvalidArguments(detail)
125
105
        return detail
126
106
 
 
107
    def _notify(self, account, queue):
 
108
        '''Notify any waiting callers that the account/queue has
 
109
        a visible message.'''
 
110
        queue = '%s/%s' % (account, queue)
 
111
        if queue in self.queues:
 
112
            for count in xrange(0, self.queues[queue].getting()):
 
113
                self.queues[queue].put(0)
 
114
 
 
115
    def _wait(self, account, queue, seconds):
 
116
        '''Wait for a message to appear in the account/queue.'''
 
117
        queue = '%s/%s' % (account, queue)
 
118
        if queue not in self.queues:
 
119
            self.queues[queue] = eventlet.Queue()
 
120
        try:
 
121
            self.queues[queue].get(timeout=seconds)
 
122
        except Exception:
 
123
            pass
 
124
        if self.queues[queue].getting() == 0:
 
125
            del self.queues[queue]
 
126
 
 
127
 
 
128
def wait_without_attributes(method):
 
129
    def wrapper(self, account, queue, filters={}):
 
130
        original = lambda: method(self, account, queue, filters)
 
131
        return wait(self, account, queue, filters, original)
 
132
    return wrapper
 
133
 
 
134
 
 
135
def wait_with_attributes(method):
 
136
    def wrapper(self, account, queue, attributes, filters={}):
 
137
        original = lambda: method(self, account, queue, attributes, filters)
 
138
        return wait(self, account, queue, filters, original)
 
139
    return wrapper
 
140
 
 
141
 
 
142
def wait(self, account, queue, filters, method):
 
143
    '''Decorator to wait on a queue if the wait option is given. This
 
144
    will block until a message in the queue is ready or the timeout
 
145
    expires.'''
 
146
    wait = filters.get('wait', 0)
 
147
    if wait > 0:
 
148
        wait += time.time()
 
149
    while True:
 
150
        try:
 
151
            for message in method():
 
152
                yield message
 
153
            return
 
154
        except burrow.backend.NotFound, e:
 
155
            now = time.time()
 
156
            if wait - now > 0:
 
157
                self._wait(account, queue, wait - now)
 
158
            if wait < time.time():
 
159
                raise e
 
160
 
127
161
 
128
162
class NotFound(Exception):
129
163
    pass