# Copyright 2014 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

"""Utilities for working with asynchronous operations."""

from __future__ import (
19
from itertools import count
20
from Queue import Queue
22
from crochet import wait_for_reactor
23
from maasserver.exceptions import IteratorReusedError
24
from twisted.internet import reactor
25
from twisted.internet.defer import maybeDeferred
26
from twisted.python import log
29
class UseOnceIterator:
    """An iterator that is usable only once.

    It wraps the iterator produced by ``iter(*args)`` and passes items
    through unchanged. Once the wrapped iterator has been exhausted, any
    further attempt to advance this object raises
    :class:`IteratorReusedError` instead of ``StopIteration``, so that
    accidental reuse is loud rather than silently yielding nothing.
    """

    def __init__(self, *args):
        """Create a new :class:`UseOnceIterator`.

        Takes the same arguments as iter().
        """
        self.iterable = iter(*args)
        # Flipped to True the first time the wrapped iterator is exhausted.
        self.has_run_once = False

    def __iter__(self):
        """Return this object; it is its own iterator."""
        return self

    def next(self):
        """Return the next item from the wrapped iterator.

        :raises IteratorReusedError: If the wrapped iterator has already
            been exhausted once.
        :raises StopIteration: The first time the wrapped iterator is
            exhausted.
        """
        if self.has_run_once:
            raise IteratorReusedError(
                "It is not possible to reuse a UseOnceIterator.")
        try:
            # The builtin next() works with both Python 2 and Python 3
            # iterators, unlike calling the `.next()` method directly.
            return next(self.iterable)
        except StopIteration:
            # Remember exhaustion so later calls fail loudly, then let
            # this first StopIteration end the caller's loop normally.
            self.has_run_once = True
            raise

    # Python 3 spells the iterator-protocol method `__next__`.
    __next__ = next


@wait_for_reactor
def gather(calls, timeout=10.0):
    """gather(calls, timeout=10.0)

    Issue calls into the reactor, passing results back to another thread.

    :param calls: An iterable of no-argument callables to be called in
        the reactor thread. Each will be called via
        :py:func:`~twisted.internet.defer.maybeDeferred`.

    :param timeout: The number of seconds before further results are
        ignored. Outstanding results will be cancelled. If ``None``, no
        timeout is scheduled.

    :return: A :class:`UseOnceIterator` of results. A result might be a
        failure, i.e. an instance of
        :py:class:`twisted.python.failure.Failure`, or a valid result;
        it's up to the caller to check.

    """

    # Prepare a list of Deferreds that we're going to wait for.
    deferreds = [maybeDeferred(call) for call in calls]

    # We'll use this queue (thread-safe) to pass results back.
    queue = Queue()

    # A sentinel to mark the end of the results.
    done = object()

    # This function will get called if not all results are in before
    # `timeout` seconds have passed. It puts `done` into the queue to
    # indicate the end of results, and cancels all outstanding deferred
    # calls.
    def cancel():
        queue.put(done)
        for deferred in deferreds:
            try:
                deferred.cancel()
            except Exception:
                # One misbehaving canceller must not prevent the
                # remaining deferreds from being cancelled.
                log.err()

    if timeout is None:
        canceller = None
    else:
        canceller = reactor.callLater(timeout, cancel)

    countdown = count(len(deferreds), -1)

    # Callback to report the result back to the queue. If it's the last
    # result to be reported, `done` is put into the queue, and the
    # delayed call to `cancel` is itself cancelled.
    def report(result):
        queue.put(result)
        if next(countdown) == 1:
            queue.put(done)
            if canceller is not None:
                if canceller.active():
                    canceller.cancel()

    for deferred in deferreds:
        deferred.addBoth(report)

    # If there are no calls then there will be no results, so we put
    # `done` into the queue, and cancel the nascent delayed call to
    # `cancel`, if it exists.
    if not deferreds:
        queue.put(done)
        if canceller is not None:
            canceller.cancel()

    # Return an iterator to the invoking thread that will stop at the
    # first sign of the `done` sentinel.
    return UseOnceIterator(queue.get, done)