~tcole/ubuntuone-client/request-queue-predicates

« back to all changes in this revision

Viewing changes to ubuntuone/u1sync/client.py

  • Committer: tim.cole at canonical
  • Date: 2009-08-22 00:50:15 UTC
  • mfrom: (160.2.10 trunk)
  • Revision ID: tim.cole@canonical.com-20090822005015-6tdpcitod026r6z2
merge from trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
25
25
import os
26
26
import sys
27
27
import shutil
28
 
from Queue import Queue, Empty
 
28
from Queue import Queue
29
29
from threading import Lock
30
30
import zlib
31
31
import urlparse
77
77
        return ent
78
78
    return wrapper
79
79
 
 
80
 
 
81
class ForcedShutdown(Exception):
 
82
    """Client shutdown forced."""
 
83
 
 
84
 
 
85
class Waiter(object):
 
86
    """Wait object for blocking waits."""
 
87
 
 
88
    def __init__(self):
 
89
        """Initializes the wait object."""
 
90
        self.queue = Queue()
 
91
 
 
92
    def wake(self, result):
 
93
        """Wakes the waiter with a result."""
 
94
        self.queue.put((result, None))
 
95
 
 
96
    def wakeAndRaise(self, exc_info):
 
97
        """Wakes the waiter, raising the given exception in it."""
 
98
        self.queue.put((None, exc_info))
 
99
 
 
100
    def wakeWithResult(self, func, *args, **kw):
 
101
        """Wakes the waiter with the result of the given function."""
 
102
        try:
 
103
            result = func(*args, **kw)
 
104
        except Exception:
 
105
            self.wakeAndRaise(sys.exc_info())
 
106
        else:
 
107
            self.wake(result)
 
108
 
 
109
    def wait(self):
 
110
        """Waits for wakeup."""
 
111
        (result, exc_info) = self.queue.get()
 
112
        if exc_info:
 
113
            try:
 
114
                raise exc_info[0], exc_info[1], exc_info[2]
 
115
            finally:
 
116
                exc_info = None
 
117
        else:
 
118
            return result
 
119
 
 
120
 
80
121
class SyncStorageClient(StorageClient):
81
122
    """Simple client that calls a callback on connection."""
82
123
 
149
190
        self._status = "disconnected"
150
191
        self._status_reason = None
151
192
        self._status_waiting = []
 
193
        self._active_waiters = set()
152
194
 
153
195
        self.realm = realm
154
196
 
182
224
        self.user_authorisation_url = get_oauth_url("user_authorisation_url")
183
225
        self.access_token_url = get_oauth_url("access_token_url")
184
226
 
 
227
    def force_shutdown(self):
 
228
        """Forces the client to shut itself down."""
 
229
        with self._status_lock:
 
230
            self._status = "forced_shutdown"
 
231
            self._reason = None
 
232
            for waiter in self._active_waiters:
 
233
                waiter.wakeAndRaise((ForcedShutdown("Forced shutdown"),
 
234
                                     None, None))
 
235
            self._active_waiters.clear()
 
236
 
 
237
    def _get_waiter_locked(self):
 
238
        """Gets a wait object for blocking waits.  Should be called with the
 
239
        status lock held.
 
240
        """
 
241
        waiter = Waiter()
 
242
        if self._status == "forced_shutdown":
 
243
            raise ForcedShutdown("Forced shutdown")
 
244
        self._active_waiters.add(waiter)
 
245
        return waiter
 
246
 
 
247
    def _get_waiter(self):
 
248
        """Get a wait object for blocking waits.  Acquires the status lock."""
 
249
        with self._status_lock:
 
250
            return self._get_waiter_locked()
 
251
 
 
252
    def _wait(self, waiter):
 
253
        """Waits for the waiter."""
 
254
        try:
 
255
            return waiter.wait()
 
256
        finally:
 
257
            with self._status_lock:
 
258
                if waiter in self._active_waiters:
 
259
                    self._active_waiters.remove(waiter)
 
260
 
185
261
    @log_timing
186
262
    def obtain_oauth_token(self, create_token):
187
263
        """Obtains an oauth token, optionally creating one if requried."""
188
 
        token_result = Queue()
 
264
        token_result = self._get_waiter()
189
265
 
190
266
        @log_timing
191
267
        def have_token(token):
192
268
            """When a token is available."""
193
 
            token_result.put(token)
 
269
            token_result.wake(token)
194
270
 
195
271
        @log_timing
196
272
        def no_token():
197
273
            """When no token is available."""
198
 
            token_result.put(None)
 
274
            token_result.wake(None)
199
275
 
200
276
        oauth_client = AuthorisationClient(realm=self.realm,
201
277
                                           request_token_url=
220
296
            oauth_client.ensure_access_token()
221
297
 
222
298
        self.reactor.callFromThread(_obtain_token)
223
 
        token = token_result.get()
 
299
        token = self._wait(token_result)
224
300
        if token is None:
225
301
            raise AuthenticationError("Unable to obtain OAuth token.")
226
302
        return token
232
308
 
233
309
        """
234
310
        with self._status_lock:
 
311
            if self._status == "forced_shutdown":
 
312
                return
235
313
            self._status = status
236
314
            self._status_reason = reason
237
315
            waiting = self._status_waiting
238
 
            if len(waiting) > 0:
239
 
                self._status_waiting = []
240
 
                for waiter in waiting:
241
 
                    waiter.put((status, reason))
 
316
            self._status_waiting = []
 
317
        for waiter in waiting:
 
318
            waiter.wake((status, reason))
242
319
 
243
320
    @log_timing
244
321
    def _await_status_not(self, *ignore_statuses):
250
327
            status = self._status
251
328
            reason = self._status_reason
252
329
            while status in ignore_statuses:
253
 
                waiter = Queue()
 
330
                waiter = self._get_waiter_locked()
254
331
                self._status_waiting.append(waiter)
255
332
                self._status_lock.release()
256
333
                try:
257
 
                    status, reason = waiter.get()
 
334
                    status, reason = self._wait(waiter)
258
335
                finally:
259
336
                    self._status_lock.acquire()
 
337
            if status == "forced_shutdown":
 
338
                raise ForcedShutdown("Forced shutdown.")
260
339
            return (status, reason)
261
340
 
262
341
    def connection_failed(self, reason):
273
352
 
274
353
    def defer_from_thread(self, function, *args, **kwargs):
275
354
        """Do twisted defer magic to get results and show exceptions."""
276
 
 
277
 
        queue = Queue()
 
355
        waiter = self._get_waiter()
278
356
        @log_timing
279
357
        def runner():
280
358
            """inner."""
283
361
            try:
284
362
                d = function(*args, **kwargs)
285
363
                if isinstance(d, defer.Deferred):
286
 
                    d.addCallbacks(lambda r: queue.put((r, None, None)),
287
 
                                   lambda f: queue.put((None, None, f)))
 
364
                    d.addCallbacks(lambda r: waiter.wake((r, None, None)),
 
365
                                   lambda f: waiter.wake((None, None, f)))
288
366
                else:
289
 
                    queue.put((d, None, None))
290
 
            except Exception:
291
 
                queue.put((None, sys.exc_info(), None))
 
367
                    waiter.wake((d, None, None))
 
368
            except Exception, e:
 
369
                waiter.wake((None, sys.exc_info(), None))
292
370
 
293
371
        self.reactor.callFromThread(runner)
294
 
        while True:
 
372
        result, exc_info, failure = self._wait(waiter)
 
373
        if exc_info:
295
374
            try:
296
 
                # poll with a timeout so that interrupts are still serviced
297
 
                result, exc_info, failure = queue.get(True, 1)
298
 
                break
299
 
            except Empty: # pylint: disable-msg=W0704
300
 
                pass
301
 
        if exc_info:
302
 
            raise exc_info[1], None, exc_info[2]
 
375
                raise exc_info[0], exc_info[1], exc_info[2]
 
376
            finally:
 
377
                exc_info = None
303
378
        elif failure:
304
379
            failure.raiseException()
305
380
        else: