~justin-fathomdb/nova/justinsb-openstack-api-volumes

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/internet/threads.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2001-2010 Twisted Matrix Laboratories.
 
2
# See LICENSE for details.
 
3
 
 
4
"""
 
5
Extended thread dispatching support.
 
6
 
 
7
For basic support see reactor threading API docs.
 
8
 
 
9
Maintainer: Itamar Shtull-Trauring
 
10
"""
 
11
 
 
12
import Queue
 
13
 
 
14
from twisted.python import failure
 
15
from twisted.internet import defer
 
16
 
 
17
 
 
18
def deferToThreadPool(reactor, threadpool, f, *args, **kwargs):
 
19
    """
 
20
    Call the function C{f} using a thread from the given threadpool and return
 
21
    the result as a Deferred.
 
22
 
 
23
    This function is only used by client code which is maintaining its own
 
24
    threadpool.  To run a function in the reactor's threadpool, use
 
25
    C{deferToThread}.
 
26
 
 
27
    @param reactor: The reactor in whose main thread the Deferred will be
 
28
        invoked.
 
29
 
 
30
    @param threadpool: An object which supports the C{callInThreadWithCallback}
 
31
        method of C{twisted.python.threadpool.ThreadPool}.
 
32
 
 
33
    @param f: The function to call.
 
34
    @param *args: positional arguments to pass to f.
 
35
    @param **kwargs: keyword arguments to pass to f.
 
36
 
 
37
    @return: A Deferred which fires a callback with the result of f, or an
 
38
        errback with a L{twisted.python.failure.Failure} if f throws an
 
39
        exception.
 
40
    """
 
41
    d = defer.Deferred()
 
42
 
 
43
    def onResult(success, result):
 
44
        if success:
 
45
            reactor.callFromThread(d.callback, result)
 
46
        else:
 
47
            reactor.callFromThread(d.errback, result)
 
48
 
 
49
    threadpool.callInThreadWithCallback(onResult, f, *args, **kwargs)
 
50
 
 
51
    return d
 
52
 
 
53
 
 
54
def deferToThread(f, *args, **kwargs):
 
55
    """
 
56
    Run a function in a thread and return the result as a Deferred.
 
57
 
 
58
    @param f: The function to call.
 
59
    @param *args: positional arguments to pass to f.
 
60
    @param **kwargs: keyword arguments to pass to f.
 
61
 
 
62
    @return: A Deferred which fires a callback with the result of f,
 
63
    or an errback with a L{twisted.python.failure.Failure} if f throws
 
64
    an exception.
 
65
    """
 
66
    from twisted.internet import reactor
 
67
    return deferToThreadPool(reactor, reactor.getThreadPool(),
 
68
                             f, *args, **kwargs)
 
69
 
 
70
 
 
71
def _runMultiple(tupleList):
 
72
    """
 
73
    Run a list of functions.
 
74
    """
 
75
    for f, args, kwargs in tupleList:
 
76
        f(*args, **kwargs)
 
77
 
 
78
 
 
79
def callMultipleInThread(tupleList):
 
80
    """
 
81
    Run a list of functions in the same thread.
 
82
 
 
83
    tupleList should be a list of (function, argsList, kwargsDict) tuples.
 
84
    """
 
85
    from twisted.internet import reactor
 
86
    reactor.callInThread(_runMultiple, tupleList)
 
87
 
 
88
 
 
89
def blockingCallFromThread(reactor, f, *a, **kw):
 
90
    """
 
91
    Run a function in the reactor from a thread, and wait for the result
 
92
    synchronously, i.e. until the callback chain returned by the function
 
93
    get a result.
 
94
 
 
95
    @param reactor: The L{IReactorThreads} provider which will be used to
 
96
        schedule the function call.
 
97
    @param f: the callable to run in the reactor thread
 
98
    @type f: any callable.
 
99
    @param a: the arguments to pass to C{f}.
 
100
    @param kw: the keyword arguments to pass to C{f}.
 
101
 
 
102
    @return: the result of the callback chain.
 
103
    @raise: any error raised during the callback chain.
 
104
    """
 
105
    queue = Queue.Queue()
 
106
    def _callFromThread():
 
107
        result = defer.maybeDeferred(f, *a, **kw)
 
108
        result.addBoth(queue.put)
 
109
    reactor.callFromThread(_callFromThread)
 
110
    result = queue.get()
 
111
    if isinstance(result, failure.Failure):
 
112
        result.raiseException()
 
113
    return result
 
114
 
 
115
 
 
116
__all__ = ["deferToThread", "deferToThreadPool", "callMultipleInThread",
 
117
           "blockingCallFromThread"]