~certify-web-dev/twisted/certify-trunk

« back to all changes in this revision

Viewing changes to twisted/internet/task.py

  • Committer: Bazaar Package Importer
  • Author(s): Moshe Zadka
  • Date: 2002-03-08 07:14:16 UTC
  • Revision ID: james.westby@ubuntu.com-20020308071416-oxvuw76tpcpi5v1q
Tags: upstream-0.15.5
ImportĀ upstreamĀ versionĀ 0.15.5

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
 
 
2
# Twisted, the Framework of Your Internet
 
3
# Copyright (C) 2001 Matthew W. Lefkowitz
 
4
 
5
# This library is free software; you can redistribute it and/or
 
6
# modify it under the terms of version 2.1 of the GNU Lesser General Public
 
7
# License as published by the Free Software Foundation.
 
8
 
9
# This library is distributed in the hope that it will be useful,
 
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 
12
# Lesser General Public License for more details.
 
13
 
14
# You should have received a copy of the GNU Lesser General Public
 
15
# License along with this library; if not, write to the Free Software
 
16
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
17
 
 
18
"""A task scheduler that is integrated with the main event loop.
 
19
"""
 
20
 
 
21
# Twisted Imports
 
22
 
 
23
from twisted.python import threadable
 
24
from twisted.python import log
 
25
from twisted.python import delay
 
26
from twisted.python import failure
 
27
 
 
28
class ThreadedScheduler:
 
29
    """I am a thread-aware delayed scheduler of for synchronous event loops.
 
30
 
 
31
    This lets threads execute non-thread safe code by adding it to the
 
32
    scheduler. Tasks added to this scheduler will *not* be stored persistently.
 
33
 
 
34
    This is an implementation of the Active Object pattern, and can be used
 
35
    as part of the queueing layer for the Async/Half-Async pattern. The other
 
36
    half the Async/Half-Async pattern is twisted.internet.threadtask.
 
37
    
 
38
    For more details:
 
39
 
 
40
      1) POSA2 book - http://www.cs.wustl.edu/~schmidt/POSA/    
 
41
 
 
42
      2) Active Object - http://www.cs.wustl.edu/~schmidt/PDF/Act-Obj.pdf
 
43
 
 
44
      3) Async/Half-Async - http://www.cs.wustl.edu/~schmidt/PDF/PLoP-95.pdf
 
45
    """
 
46
    
 
47
    __implements__ = delay.IDelayed,
 
48
    
 
49
    def __init__(self):
 
50
        self.threadTasks = {}
 
51
        self._lock = thread.allocate_lock()
 
52
 
 
53
    def __getstate__(self):
 
54
        return None
 
55
    
 
56
    def __setstate__(self):
 
57
        self.__init__()
 
58
    
 
59
    def addTask(self, function, *args, **kwargs):
 
60
        """Schedule a function to be called by the main event-loop thread.
 
61
        
 
62
        The result of the function will not be returned.
 
63
        """
 
64
        threadTasks = self.threadTasks
 
65
        hadNoTasks = (threadTasks == {})
 
66
        id = thread.get_ident()
 
67
        
 
68
        self._lock.acquire()
 
69
        try:
 
70
            if not threadTasks.has_key(id):
 
71
                threadTasks[id] = [(function, args, kwargs)]
 
72
            else:
 
73
                threadTasks[id].append((function, args, kwargs))
 
74
        finally:
 
75
            self._lock.release()
 
76
        
 
77
        if hadNoTasks:
 
78
            main.wakeUp()
 
79
    
 
80
    def timeout(self):
 
81
        """Either I have work to do immediately, or no work to do at all.
 
82
        """
 
83
        if self.threadTasks:
 
84
            return 0.
 
85
        else:
 
86
            return None
 
87
 
 
88
    def runUntilCurrent(self):
 
89
        threadTasks = self.threadTasks
 
90
        tasksTodo = []
 
91
        
 
92
        self._lock.acquire()
 
93
        try:
 
94
            for thread, tasks in threadTasks.items():
 
95
                tasksTodo.append(tasks.pop(0))
 
96
                if tasks: tasksTodo.append(tasks.pop(0))
 
97
                if not tasks:
 
98
                    del threadTasks[thread]
 
99
        finally:
 
100
            self._lock.release()
 
101
        
 
102
        for func, args, kwargs in tasksTodo:
 
103
            apply(func, args, kwargs)
 
104
 
 
105
 
 
106
class Scheduler:
 
107
    """I am a non-thread-safe delayed scheduler for synchronous event loops.
 
108
    """
 
109
    
 
110
    __implements__ = delay.IDelayed,
 
111
    
 
112
    def __init__(self):
 
113
        self.tasks = []
 
114
 
 
115
    def addTask(self, function, *args, **kwargs):
 
116
        self.tasks.append((function, args, kwargs))
 
117
 
 
118
    def timeout(self):
 
119
        """Either I have work to do immediately, or no work to do at all.
 
120
        """
 
121
        if self.tasks:
 
122
            return 0.
 
123
        else:
 
124
            return None
 
125
    
 
126
    def runUntilCurrent(self):
 
127
        tasks = self.tasks
 
128
        self.tasks = []
 
129
        for function, args, kwargs in tasks:
 
130
            apply(function, args, kwargs)
 
131
 
 
132
 
 
133
def initThreads():
 
134
    global theScheduler, thread, schedule
 
135
    import thread
 
136
    
 
137
    # Sibling Imports
 
138
    import main
 
139
    
 
140
    # there may already be a registered scheduler, so we need to get
 
141
    # rid of it.
 
142
    try:
 
143
        main.removeDelayed(theScheduler)
 
144
    except (NameError, ValueError):
 
145
        pass
 
146
    
 
147
    theScheduler = ThreadedScheduler()
 
148
    schedule = theScheduler.addTask
 
149
    main.addDelayed(theScheduler)
 
150
 
 
151
theScheduler = Scheduler()
 
152
schedule = theScheduler.addTask
 
153
threadable.whenThreaded(initThreads)
 
154
 
 
155
 
 
156
def doAllTasks():
 
157
    """Run all tasks in the scheduler.
 
158
    """
 
159
    while theScheduler.timeout() != None:
 
160
        theScheduler.runUntilCurrent()
 
161
 
 
162
 
 
163
class Task:
 
164
    """I am a set of steps that get executed.
 
165
 
 
166
    Each "step" is a method to call and some arguments to call it with.  I am
 
167
    to be used with the Scheduler class - after each step is called I will
 
168
    readd myself to the scheduler if I still have steps to do.
 
169
    """
 
170
    
 
171
    def __init__(self, steps=None, scheduler=theScheduler):
 
172
        """Create a Task.
 
173
        """
 
174
        # Format for 'steps' is [[func,args,kw],[func,args,kw], ...]
 
175
        if steps:
 
176
            self.steps = steps
 
177
        else:
 
178
            self.steps = []
 
179
        self.progress = 0
 
180
        self.scheduler = scheduler
 
181
 
 
182
    def addWork(self, callable, *args, **kw):
 
183
        self.steps.append([callable, args, kw])
 
184
 
 
185
    def __call__(self):
 
186
        if self.progress < len(self.steps):
 
187
            func, args, kw = self.steps[self.progress]
 
188
            try:
 
189
                apply(func, args, kw)
 
190
            except:
 
191
                log.msg( 'Exception in Task' )
 
192
                log.deferr()
 
193
                return 0
 
194
            else:
 
195
                self.progress = self.progress + 1
 
196
                # if we still have tasks left we add ourselves to the scheduler
 
197
                if self.progress < len(self.steps):
 
198
                    self.scheduler.addTask(self)
 
199
                return 1
 
200
        return 0
 
201
 
 
202
 
 
203
# Sibling Imports
 
204
import main
 
205
main.addDelayed(theScheduler)