~lutostag/ubuntu/trusty/maas/1.5.2+packagefix

« back to all changes in this revision

Viewing changes to src/maasserver/utils/async.py

  • Committer: Package Import Robot
  • Author(s): Andres Rodriguez
  • Date: 2014-03-28 10:43:53 UTC
  • mto: This revision was merged to the branch mainline in revision 57.
  • Revision ID: package-import@ubuntu.com-20140328104353-ekpolg0pm5xnvq2s
Tags: upstream-1.5+bzr2204
ImportĀ upstreamĀ versionĀ 1.5+bzr2204

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright 2014 Canonical Ltd.  This software is licensed under the
 
2
# GNU Affero General Public License version 3 (see the file LICENSE).
 
3
 
 
4
"""Utilities for working with asynchronous operations."""
 
5
 
 
6
from __future__ import (
 
7
    absolute_import,
 
8
    print_function,
 
9
    unicode_literals,
 
10
    )
 
11
 
 
12
str = None
 
13
 
 
14
__metaclass__ = type
 
15
__all__ = [
 
16
    "gather",
 
17
]
 
18
 
 
19
from itertools import count
 
20
from Queue import Queue
 
21
 
 
22
from crochet import wait_for_reactor
 
23
from maasserver.exceptions import IteratorReusedError
 
24
from twisted.internet import reactor
 
25
from twisted.internet.defer import maybeDeferred
 
26
from twisted.python import log
 
27
 
 
28
 
 
29
class UseOnceIterator:
 
30
    """An iterator that is usable only once."""
 
31
 
 
32
    def __init__(self, *args):
 
33
        """Create a new :class:`UseOnceIterator`.
 
34
 
 
35
        Takes the same arguments as iter().
 
36
        """
 
37
        self.iterable = iter(*args)
 
38
        self.has_run_once = False
 
39
 
 
40
    def __iter__(self):
 
41
        return self
 
42
 
 
43
    def next(self):
 
44
        if self.has_run_once:
 
45
            raise IteratorReusedError(
 
46
                "It is not possible to reuse a UseOnceIterator.")
 
47
        try:
 
48
            return self.iterable.next()
 
49
        except StopIteration:
 
50
            self.has_run_once = True
 
51
            raise
 
52
 
 
53
 
 
54
@wait_for_reactor
 
55
def gather(calls, timeout=10.0):
 
56
    """gather(calls, timeout=10.0)
 
57
 
 
58
    Issue calls into the reactor, passing results back to another thread.
 
59
 
 
60
    :param calls: An iterable of no-argument callables to be called in
 
61
        the reactor thread. Each will be called via
 
62
        :py:func:`~twisted.internet.defer.maybeDeferred`.
 
63
 
 
64
    :param timeout: The number of seconds before further results are
 
65
        ignored. Outstanding results will be cancelled.
 
66
 
 
67
    :return: A :class:`UseOnceIterator` of results. A result might be a
 
68
        failure, i.e. an instance of
 
69
        :py:class:`twisted.python.failure.Failure`, or a valid result;
 
70
        it's up to the caller to check.
 
71
 
 
72
    """
 
73
 
 
74
    # Prepare of a list of Deferreds that we're going to wait for.
 
75
    deferreds = [maybeDeferred(call) for call in calls]
 
76
 
 
77
    # We'll use this queue (thread-safe) to pass results back.
 
78
    queue = Queue()
 
79
 
 
80
    # A sentinel to mark the end of the results.
 
81
    done = object()
 
82
 
 
83
    # This function will get called if not all results are in before
 
84
    # `timeout` seconds have passed. It puts `done` into the queue to
 
85
    # indicate the end of results, and cancels all outstanding deferred
 
86
    # calls.
 
87
    def cancel():
 
88
        queue.put(done)
 
89
        for deferred in deferreds:
 
90
            try:
 
91
                deferred.cancel()
 
92
            except:
 
93
                log.err()
 
94
 
 
95
    if timeout is None:
 
96
        canceller = None
 
97
    else:
 
98
        canceller = reactor.callLater(timeout, cancel)
 
99
 
 
100
    countdown = count(len(deferreds), -1)
 
101
 
 
102
    # Callback to report the result back to the queue. If it's the last
 
103
    # result to be reported, `done` is put into the queue, and the
 
104
    # delayed call to `cancel` is itself cancelled.
 
105
    def report(result):
 
106
        queue.put(result)
 
107
        if next(countdown) == 1:
 
108
            queue.put(done)
 
109
            if canceller is not None:
 
110
                if canceller.active():
 
111
                    canceller.cancel()
 
112
 
 
113
    for deferred in deferreds:
 
114
        deferred.addBoth(report)
 
115
 
 
116
    # If there are no calls then there will be no results, so we put
 
117
    # `done` into the queue, and cancel the nascent delayed call to
 
118
    # `cancel`, if it exists.
 
119
    if len(deferreds) == 0:
 
120
        queue.put(done)
 
121
        if canceller is not None:
 
122
            canceller.cancel()
 
123
 
 
124
    # Return an iterator to the invoking thread that will stop at the
 
125
    # first sign of the `done` sentinel.
 
126
    return UseOnceIterator(queue.get, done)