81
class ForcedShutdown(Exception):
82
"""Client shutdown forced."""
# Raised elsewhere in this file when the client's status is the string
# "forced_shutdown": by _get_waiter_locked() before it registers a new
# waiter, and by _await_status_not() after a wait returns that status.
# force_shutdown() also builds an instance of it to hand to wakeAndRaise()
# for each active waiter.
86
"""Wait object for blocking waits."""
89
"""Initializes the wait object."""
92
def wake(self, result):
93
"""Wakes the waiter with a result."""
94
# The queue item is a (result, exc_info) pair; exc_info is None here.
# The consumer side unpacks the pair with self.queue.get().
self.queue.put((result, None))
96
def wakeAndRaise(self, exc_info):
97
"""Wakes the waiter, raising the given exception in it."""
98
# Same (result, exc_info) pair as wake(), with the result slot set to None.
# The consumer indexes exc_info[0], exc_info[1] and exc_info[2] in its
# raise statement, so exc_info needs three items in the order returned by
# sys.exc_info() (which is what wakeWithResult passes).
self.queue.put((None, exc_info))
100
def wakeWithResult(self, func, *args, **kw):
101
"""Wakes the waiter with the result of the given function."""
103
result = func(*args, **kw)
105
self.wakeAndRaise(sys.exc_info())
110
"""Waits for wakeup."""
111
(result, exc_info) = self.queue.get()
114
raise exc_info[0], exc_info[1], exc_info[2]
80
121
class SyncStorageClient(StorageClient):
81
122
"""Simple client that calls a callback on connection."""
182
224
self.user_authorisation_url = get_oauth_url("user_authorisation_url")
183
225
self.access_token_url = get_oauth_url("access_token_url")
227
def force_shutdown(self):
228
"""Forces the client to shut itself down."""
229
with self._status_lock:
230
self._status = "forced_shutdown"
232
for waiter in self._active_waiters:
233
waiter.wakeAndRaise((ForcedShutdown("Forced shutdown"),
235
self._active_waiters.clear()
237
def _get_waiter_locked(self):
238
"""Gets a wait object for blocking waits. Should be called with the
242
if self._status == "forced_shutdown":
243
raise ForcedShutdown("Forced shutdown")
244
self._active_waiters.add(waiter)
247
def _get_waiter(self):
248
"""Get a wait object for blocking waits. Acquires the status lock."""
249
# Convenience wrapper: hold self._status_lock only for the duration of
# _get_waiter_locked(), which expects the lock to be held by its caller.
with self._status_lock:
250
# _get_waiter_locked() raises ForcedShutdown when self._status is
# "forced_shutdown"; that exception propagates out of this method.
return self._get_waiter_locked()
252
def _wait(self, waiter):
253
"""Waits for the waiter."""
257
with self._status_lock:
258
if waiter in self._active_waiters:
259
self._active_waiters.remove(waiter)
186
262
def obtain_oauth_token(self, create_token):
187
263
"""Obtains an oauth token, optionally creating one if required."""
188
token_result = Queue()
264
token_result = self._get_waiter()
191
267
def have_token(token):
192
268
"""When a token is available."""
193
token_result.put(token)
269
token_result.wake(token)
197
273
"""When no token is available."""
198
token_result.put(None)
274
token_result.wake(None)
200
276
oauth_client = AuthorisationClient(realm=self.realm,
201
277
request_token_url=
220
296
oauth_client.ensure_access_token()
222
298
self.reactor.callFromThread(_obtain_token)
223
token = token_result.get()
299
token = self._wait(token_result)
224
300
if token is None:
225
301
raise AuthenticationError("Unable to obtain OAuth token.")
234
310
with self._status_lock:
311
if self._status == "forced_shutdown":
235
313
self._status = status
236
314
self._status_reason = reason
237
315
waiting = self._status_waiting
239
self._status_waiting = []
240
for waiter in waiting:
241
waiter.put((status, reason))
316
self._status_waiting = []
317
for waiter in waiting:
318
waiter.wake((status, reason))
244
321
def _await_status_not(self, *ignore_statuses):
250
327
status = self._status
251
328
reason = self._status_reason
252
329
while status in ignore_statuses:
330
waiter = self._get_waiter_locked()
254
331
self._status_waiting.append(waiter)
255
332
self._status_lock.release()
257
status, reason = waiter.get()
334
status, reason = self._wait(waiter)
259
336
self._status_lock.acquire()
337
if status == "forced_shutdown":
338
raise ForcedShutdown("Forced shutdown.")
260
339
return (status, reason)
262
341
def connection_failed(self, reason):
274
353
def defer_from_thread(self, function, *args, **kwargs):
275
354
"""Do twisted defer magic to get results and show exceptions."""
355
waiter = self._get_waiter()
284
362
d = function(*args, **kwargs)
285
363
if isinstance(d, defer.Deferred):
286
d.addCallbacks(lambda r: queue.put((r, None, None)),
287
lambda f: queue.put((None, None, f)))
364
d.addCallbacks(lambda r: waiter.wake((r, None, None)),
365
lambda f: waiter.wake((None, None, f)))
289
queue.put((d, None, None))
291
queue.put((None, sys.exc_info(), None))
367
waiter.wake((d, None, None))
369
waiter.wake((None, sys.exc_info(), None))
293
371
self.reactor.callFromThread(runner)
372
result, exc_info, failure = self._wait(waiter)
296
# poll with a timeout so that interrupts are still serviced
297
result, exc_info, failure = queue.get(True, 1)
299
except Empty: # pylint: disable-msg=W0704
302
raise exc_info[1], None, exc_info[2]
375
raise exc_info[0], exc_info[1], exc_info[2]
304
379
failure.raiseException()