1
# Miro - an RSS based video player application
2
# Copyright (C) 2005-2010 Participatory Culture Foundation
4
# This program is free software; you can redistribute it and/or modify
5
# it under the terms of the GNU General Public License as published by
6
# the Free Software Foundation; either version 2 of the License, or
7
# (at your option) any later version.
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12
# GNU General Public License for more details.
14
# You should have received a copy of the GNU General Public License
15
# along with this program; if not, write to the Free Software
16
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
# In addition, as a special exception, the copyright holders give
19
# permission to link the code of portions of this program with the OpenSSL
22
# You must obey the GNU General Public License in all respects for all of
23
# the code used other than OpenSSL. If you modify file(s) with this
24
# exception, you may extend this exception to your version of the file(s),
25
# but you are not obligated to do so. If you do not wish to do so, delete
26
# this exception statement from your version. If you delete this exception
27
# statement from all source files in the program, then also delete it here.
29
"""``miro.eventloop`` -- Event loop handler.
31
This module handles the miro event loop which is responsible for
32
network requests and scheduling.
34
TODO: handle user setting clock back
44
from miro import trapcall
45
from miro import signals
48
from miro.clock import clock
52
class DelayedCall(object):
53
def __init__(self, function, name, args, kwargs):
54
self.function = function
61
"""Removes the references that this object has to the outside
62
world, this eases the GC's work in finding cycles and fixes
63
some memory leaks on windows.
65
self.function = self.args = self.kwargs = None
74
when = "While handling %s" % self.name
76
success = trapcall.trap_call(when, self.function, *self.args,
80
logging.timing("%s too slow (%.3f secs)",
83
total = cumulative[self.name]
84
except (SystemExit, KeyboardInterrupt):
89
cumulative[self.name] = total
91
logging.timing("%s cumulative is too slow (%.3f secs)",
93
cumulative[self.name] = 0
97
class Scheduler(object):
101
def addTimeout(self, delay, function, name, args=None, kwargs=None):
106
scheduledTime = clock() + delay
107
dc = DelayedCall(function, "timeout (%s)" % (name,), args, kwargs)
108
heapq.heappush(self.heap, (scheduledTime, dc))
111
def next_timeout(self):
112
if len(self.heap) == 0:
115
return max(0, self.heap[0][0] - clock())
117
def hasPendingTimeout(self):
118
return len(self.heap) > 0 and self.heap[0][0] < clock()
120
def process_next_timeout(self):
121
time, dc = heapq.heappop(self.heap)
124
def processTimeouts(self):
125
while self.hasPendingTimeout():
126
self.process_next_timeout()
128
class CallQueue(object):
130
self.queue = Queue.Queue()
131
self.quit_flag = False
133
def addIdle(self, function, name, args=None, kwargs=None):
138
dc = DelayedCall(function, "idle (%s)" % (name,), args, kwargs)
142
def processNextIdle(self):
143
dc = self.queue.get()
146
def hasPendingIdle(self):
147
return not self.queue.empty()
149
def processIdles(self):
150
while self.hasPendingIdle() and not self.quit_flag:
151
self.processNextIdle()
153
class ThreadPool(object):
154
"""The thread pool is used to handle calls like gethostbyname()
155
that block and there's no asynchronous workaround. What we do
156
instead is call them in a separate thread and return the result in
157
a callback that executes in the event loop.
161
def __init__(self, eventLoop):
162
self.eventLoop = eventLoop
163
self.queue = Queue.Queue()
166
def initThreads(self):
167
while len(self.threads) < ThreadPool.THREADS:
168
t = threading.Thread(name='ThreadPool - %d' % len(self.threads),
169
target=self.thread_loop)
172
self.threads.append(t)
174
def thread_loop(self):
176
nextItem = self.queue.get()
177
if nextItem == "QUIT":
180
callback, errback, func, name, args, kwargs, = nextItem
182
result = func(*args, **kwargs)
183
except KeyboardInterrupt:
187
name = 'Thread Pool Errback (%s)' % name
191
name = 'Thread Pool Callback (%s)' % name
193
if not self.eventLoop.quitFlag:
194
self.eventLoop.idleQueue.addIdle(func, name, args=args)
195
self.eventLoop.wakeup()
197
def queueCall(self, callback, errback, function, name, *args, **kwargs):
198
self.queue.put((callback, errback, function, name, args, kwargs))
200
def closeThreads(self):
201
for x in xrange(len(self.threads)):
202
self.queue.put("QUIT")
203
while len(self.threads) > 0:
204
x = self.threads.pop()
207
except (SystemExit, KeyboardInterrupt):
212
class EventLoop(signals.SignalEmitter):
214
signals.SignalEmitter.__init__(self, 'thread-will-start',
220
self.scheduler = Scheduler()
221
self.idleQueue = CallQueue()
222
self.urgentQueue = CallQueue()
223
self.threadPool = ThreadPool(self)
224
self.readCallbacks = {}
225
self.writeCallbacks = {}
226
self.wakeSender, self.wakeReceiver = util.make_dummy_socket_pair()
227
self.addReadCallback(self.wakeReceiver, self._slurpWakerData)
228
self.quitFlag = False
229
self.clearRemovedCallbacks()
230
self.loop_ready = threading.Event()
232
def clearRemovedCallbacks(self):
233
self.removedReadCallbacks = set()
234
self.removedWriteCallbacks = set()
236
def _slurpWakerData(self):
237
self.wakeReceiver.recv(1024)
239
def addReadCallback(self, socket, callback):
240
self.readCallbacks[socket.fileno()] = callback
242
def removeReadCallback(self, socket):
243
del self.readCallbacks[socket.fileno()]
244
self.removedReadCallbacks.add(socket.fileno())
246
def addWriteCallback(self, socket, callback):
247
self.writeCallbacks[socket.fileno()] = callback
249
def removeWriteCallback(self, socket):
250
del self.writeCallbacks[socket.fileno()]
251
self.removedWriteCallbacks.add(socket.fileno())
255
self.wakeSender.send("b")
256
except socket.error, e:
257
logging.warn("Error waking up eventloop (%s)", e)
259
def callInThread(self, callback, errback, function, name, *args, **kwargs):
260
self.threadPool.queueCall(callback, errback, function, name,
264
self.loop_ready.set()
265
self.emit('thread-will-start')
266
self.emit('thread-started', threading.currentThread())
267
self.emit('thread-did-start')
269
while not self.quitFlag:
270
self.emit('begin-loop')
271
self.clearRemovedCallbacks()
272
timeout = self.scheduler.next_timeout()
273
readfds = self.readCallbacks.keys()
274
writefds = self.writeCallbacks.keys()
276
readables, writeables, _ = select.select(readfds, writefds,
278
except select.error, (err, detail):
279
if err == errno.EINTR:
280
logging.warning ("eventloop: %s", detail)
285
self._process_urgent_events()
286
for event in self.generateEvents(readables, writeables):
288
self.emit('event-finished', success)
291
self._process_urgent_events()
294
self.emit('end-loop')
296
def _process_urgent_events(self):
297
queue = self.urgentQueue
298
while queue.hasPendingIdle() and not queue.quit_flag:
299
success = queue.processNextIdle()
300
self.emit('event-finished', success)
302
def generateEvents(self, readables, writeables):
303
"""Generator that creates the list of events that should be
304
dealt with on this iteration of the event loop. This includes
305
all socket read/write callbacks, timeouts and idle calls.
307
"events" are implemented as functions that should be called
310
for callback in self.generateCallbacks(writeables,
312
self.removedWriteCallbacks):
314
for callback in self.generateCallbacks(readables,
316
self.removedReadCallbacks):
318
while self.scheduler.hasPendingTimeout():
319
yield self.scheduler.process_next_timeout
320
while self.idleQueue.hasPendingIdle():
321
yield self.idleQueue.processNextIdle
323
def generateCallbacks(self, readyList, map_, removed):
328
# this can happen the write callback removes the read
329
# callback or vise versa
334
when = "While talking to the network"
336
success = trapcall.trap_call(when, function)
344
self.idleQueue.quit_flag = True
345
self.urgentQueue.quit_flag = True
347
_eventLoop = EventLoop()
349
def addReadCallback(socket, callback):
350
"""Add a read callback. When socket is ready for reading,
351
callback will be called. If there is already a read callback
352
installed, it will be replaced.
354
_eventLoop.addReadCallback(socket, callback)
356
def removeReadCallback(socket):
357
"""Remove a read callback. If there is not a read callback
358
installed for socket, a KeyError will be thrown.
360
_eventLoop.removeReadCallback(socket)
362
def addWriteCallback(socket, callback):
363
"""Add a write callback. When socket is ready for writing,
364
callback will be called. If there is already a write callback
365
installed, it will be replaced.
367
_eventLoop.addWriteCallback(socket, callback)
369
def removeWriteCallback(socket):
370
"""Remove a write callback. If there is not a write callback
371
installed for socket, a KeyError will be thrown.
373
_eventLoop.removeWriteCallback(socket)
375
def stopHandlingSocket(socket):
376
"""Convience function to that removes both the read and write
377
callback for a socket if they exist.
380
removeReadCallback(socket)
384
removeWriteCallback(socket)
388
def addTimeout(delay, function, name, args=None, kwargs=None):
389
"""Schedule a function to be called at some point in the future.
390
Returns a ``DelayedCall`` object that can be used to cancel the
393
dc = _eventLoop.scheduler.addTimeout(delay, function, name, args, kwargs)
396
def addIdle(function, name, args=None, kwargs=None):
397
"""Schedule a function to be called when we get some spare time.
398
Returns a ``DelayedCall`` object that can be used to cancel the
401
dc = _eventLoop.idleQueue.addIdle(function, name, args, kwargs)
405
def addUrgentCall(function, name, args=None, kwargs=None):
406
"""Schedule a function to be called as soon as possible. This
407
method should be used for things like GUI actions, where the user
410
dc = _eventLoop.urgentQueue.addIdle(function, name, args, kwargs)
414
def callInThread(callback, errback, function, name, *args, **kwargs):
415
"""Schedule a function to be called in a separate thread. Do not
416
put code that accesses the database or the UI here!
418
_eventLoop.callInThread(callback, errback, function, name, *args, **kwargs)
427
def profile_startup():
431
profile.runctx('_eventLoop.loop()', globals(), locals(),
432
profile_file + ".event_loop")
436
lt = threading.Thread(target=profile_startup, name="Event Loop")
438
lt = threading.Thread(target=_eventLoop.loop, name="Event Loop")
441
_eventLoop.loop_ready.wait()
452
def connect(signal, callback):
453
_eventLoop.connect(signal, callback)
455
def disconnect(signal, callback):
456
_eventLoop.disconnect(signal, callback)
458
def resetEventLoop():
460
_eventLoop = EventLoop()
462
def threadPoolQuit():
463
_eventLoop.threadPool.closeThreads()
465
def threadPoolInit():
466
_eventLoop.threadPool.initThreads()
469
"""Decorator to make a methods run as an idle function
471
Suppose you have 2 methods, foo and bar
475
# database operations
478
# same database operations as foo
480
Then calling foo() is exactly the same as calling addIdle(bar).
482
def queuer(*args, **kwargs):
483
return addIdle(func, "%s() (using as_idle)" % func.__name__,
484
args=args, kwargs=kwargs)
488
"""Like ``as_idle``, but uses ``addUrgentCall()`` instead of
491
def queuer(*args, **kwargs):
492
return addUrgentCall(func, "%s() (using as_urgent)" % func.__name__,
493
args=args, kwargs=kwargs)
496
def idle_iterate(func, name, args=None, kwargs=None):
497
"""Iterate over a generator function using addIdle for each
500
This allows long running functions to be split up into distinct
501
steps, after each step other idle functions will have a chance to
507
# do some computation
514
eventloop.idle_iterate(foo, 'Foo', args=(1, 2, 3))
520
iter = func(*args, **kwargs)
521
addIdle(_idle_iterate_step, name, args=(iter, name))
523
def _idle_iterate_step(iter, name):
526
except StopIteration:
530
logging.warn("idle_iterate yield value ignored: %s (%s)",
532
addIdle(_idle_iterate_step, name, args=(iter, name))
534
def idle_iterator(func):
535
"""Decorator to wrap a generator function in a ``idle_iterate()``
538
def queuer(*args, **kwargs):
539
return idle_iterate(func, "%s() (using idle_iterator)" % func.__name__,
540
args=args, kwargs=kwargs)