1
# Miro - an RSS based video player application
2
# Copyright (C) 2005-2010 Participatory Culture Foundation
4
# This program is free software; you can redistribute it and/or modify
5
# it under the terms of the GNU General Public License as published by
6
# the Free Software Foundation; either version 2 of the License, or
7
# (at your option) any later version.
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12
# GNU General Public License for more details.
14
# You should have received a copy of the GNU General Public License
15
# along with this program; if not, write to the Free Software
16
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
# In addition, as a special exception, the copyright holders give
19
# permission to link the code of portions of this program with the OpenSSL
22
# You must obey the GNU General Public License in all respects for all of
23
# the code used other than OpenSSL. If you modify file(s) with this
24
# exception, you may extend this exception to your version of the file(s),
25
# but you are not obligated to do so. If you do not wish to do so, delete
26
# this exception statement from your version. If you delete this exception
27
# statement from all source files in the program, then also delete it here.
29
"""``miro.eventloop`` -- Event loop handler.
31
This module handles the miro event loop which is responsible for
32
network requests and scheduling.
34
TODO: handle user setting clock back
45
from miro import trapcall
46
from miro import signals
49
from miro.clock import clock
53
class DelayedCall(object):
# Bundles a callable with a name and its arguments so that it can be run
# later; instances are created by Scheduler.add_timeout and
# CallQueue.add_idle.
# NOTE(review): the embedded numbering skips (55 -> 62, 66 -> 75, ...), so
# several statements and method headers of this class are not visible in
# this listing.  The comments below describe only the lines that are present.
54
def __init__(self, function, name, args, kwargs):
55
self.function = function
62
"""Removes the references that this object has to the outside
63
world, this eases the GC's work in finding cycles and fixes
64
some memory leaks on windows.
66
self.function = self.args = self.kwargs = None
75
# Context string passed to trap_call; it embeds this call's name.
when = "While handling %s" % self.name
77
# Run the wrapped function through trapcall.trap_call and keep its result.
# NOTE(review): the continuation of this call is not visible (presumably
# it forwards self.kwargs) -- confirm against the full file.
success = trapcall.trap_call(when, self.function, *self.args,
81
logging.timing("%s too slow (%.3f secs)",
84
# ``cumulative`` is not defined in the visible lines; it is indexed by call
# name here (presumably a module-level dict of accumulated run time).
total = cumulative[self.name]
85
except (KeyError, AttributeError):
88
cumulative[self.name] = total
90
logging.timing("%s cumulative is too slow (%.3f secs)",
92
# Reset the stored value for this name after the cumulative report.
cumulative[self.name] = 0
96
class Scheduler(object):
100
def add_timeout(self, delay, function, name, args=None, kwargs=None):
    """Schedule ``function`` to be called once ``delay`` has elapsed.

    :param delay: offset added to the current ``clock()`` value
        (presumably seconds -- confirm against ``miro.clock``)
    :param function: callable to run
    :param name: label used in the call's name, ``"timeout (<name>)"``
    :param args: positional arguments for ``function``; ``None`` means none
    :param kwargs: keyword arguments for ``function``; ``None`` means none
    :returns: the ``DelayedCall`` that was pushed onto the heap
    """
    # The call's arguments are later unpacked with * / **, so None must
    # be replaced by empty containers.
    if args is None:
        args = ()
    if kwargs is None:
        kwargs = {}
    scheduled_time = clock() + delay
    dc = DelayedCall(function, "timeout (%s)" % (name,), args, kwargs)
    # The heap is ordered by scheduled time, earliest first.
    heapq.heappush(self.heap, (scheduled_time, dc))
    return dc
110
def next_timeout(self):
    """Return how long to wait until the earliest timeout is due.

    :returns: ``None`` when nothing is scheduled (the value is handed to
        ``select.select()`` as its timeout, where ``None`` means "block
        until a descriptor is ready"); otherwise the non-negative
        difference between the earliest scheduled time and ``clock()``.
    """
    if not self.heap:
        return None
    # Never report a negative wait for a timeout that is already overdue.
    return max(0, self.heap[0][0] - clock())
116
def has_pending_timeout(self):
    """Tell whether the earliest scheduled call is already due.

    :returns: ``False`` when the heap is empty, otherwise whether the
        smallest scheduled time is strictly before ``clock()``.
    """
    pending = self.heap
    if not len(pending) > 0:
        return False
    earliest_time = pending[0][0]
    return earliest_time < clock()
119
def process_next_timeout(self):
# Pops the heap entry with the smallest scheduled time.
# NOTE(review): nothing is visible after the pop, so what is done with
# ``dc`` and what this method returns cannot be told from this listing.
120
time, dc = heapq.heappop(self.heap)
123
class CallQueue(object):
125
self.queue = Queue.Queue()
126
self.quit_flag = False
128
def add_idle(self, function, name, args=None, kwargs=None):
    """Queue ``function`` to be run by ``process_next_idle()``.

    :param function: callable to run
    :param name: label used in the call's name, ``"idle (<name>)"``
    :param args: positional arguments for ``function``; ``None`` means none
    :param kwargs: keyword arguments for ``function``; ``None`` means none
    :returns: the ``DelayedCall`` that was put on the queue
    """
    # The call's arguments are later unpacked with * / **, so None must
    # be replaced by empty containers.
    if args is None:
        args = ()
    if kwargs is None:
        kwargs = {}
    dc = DelayedCall(function, "idle (%s)" % (name,), args, kwargs)
    # process_next_idle() takes its work from self.queue.
    self.queue.put(dc)
    return dc
137
def process_next_idle(self):
# Takes the next queued call off self.queue (Queue.get() with no arguments
# blocks while the queue is empty).
# NOTE(review): the line that runs ``dc`` is not visible; callers such as
# EventLoop._process_urgent_events use this method's return value.
138
dc = self.queue.get()
141
def has_pending_idle(self):
    """Return True when at least one call is waiting in the queue."""
    is_empty = self.queue.empty()
    return not is_empty
144
def process_idles(self):
    """Run queued calls until the queue is empty or ``quit_flag`` is set.

    Note: used for testing purposes.
    """
    while True:
        if not self.has_pending_idle() or self.quit_flag:
            break
        self.process_next_idle()
149
class ThreadPool(object):
150
"""The thread pool is used to handle calls like gethostbyname()
151
that block and there's no asynchronous workaround. What we do
152
instead is call them in a separate thread and return the result in
153
a callback that executes in the event loop.
157
# NOTE(review): the embedded numbering skips throughout this class, so some
# statements (e.g. where ``self.threads`` and ``THREADS`` are defined, and
# the ``try:`` lines) are not visible.  Comments cover only visible lines.
def __init__(self, event_loop):
158
# event_loop: the loop whose idle_queue receives results (see thread_loop).
self.event_loop = event_loop
159
# Work items put here by queue_call() are consumed by thread_loop().
self.queue = Queue.Queue()
162
def init_threads(self):
163
# Keep creating worker threads until there are ThreadPool.THREADS of them.
while len(self.threads) < ThreadPool.THREADS:
164
t = threading.Thread(name='ThreadPool - %d' % len(self.threads),
165
target=self.thread_loop)
168
self.threads.append(t)
170
def thread_loop(self):
172
# Worker body: take the next item from the queue.  The string "QUIT" is
# the shutdown sentinel that close_threads() enqueues.
next_item = self.queue.get()
173
if next_item == "QUIT":
176
# Any other item is the 6-tuple built by queue_call().
callback, errback, func, name, args, kwargs, = next_item
178
result = func(*args, **kwargs)
179
except KeyboardInterrupt:
181
except Exception, exc:
182
logging.debug(">>> thread_loop: %s %s %s %s\n%s",
183
func, name, args, kwargs,
184
"".join(traceback.format_exc()))
186
# NOTE(review): the lines that choose between errback and callback are not
# visible; only the two name assignments remain.
name = 'Thread Pool Errback (%s)' % name
190
name = 'Thread Pool Callback (%s)' % name
192
# Unless the event loop is quitting, hand the outcome to it as an idle call
# and wake it up.
if not self.event_loop.quit_flag:
193
self.event_loop.idle_queue.add_idle(func, name, args=args)
194
self.event_loop.wakeup()
196
# Enqueue a request for a worker thread; extra positional and keyword
# arguments are the ones thread_loop() passes to ``function``.
def queue_call(self, callback, errback, function, name, *args, **kwargs):
197
self.queue.put((callback, errback, function, name, args, kwargs))
199
def close_threads(self):
200
# One "QUIT" sentinel per known thread.
for x in xrange(len(self.threads)):
201
self.queue.put("QUIT")
202
# Empty self.threads one thread at a time.
# NOTE(review): the statement guarded by the except below is not visible
# (presumably a join on the popped thread) -- confirm.
while len(self.threads) > 0:
203
x = self.threads.pop()
206
except (SystemExit, KeyboardInterrupt):
211
class SimpleEventLoop(signals.SignalEmitter):
# select()-based loop that emits signals around each iteration.
# NOTE(review): method headers and ``try:`` lines of this class are missing
# from this listing (the embedded numbering skips), so the comments below
# describe only the statements that are visible.
213
signals.SignalEmitter.__init__(self, 'thread-will-start',
218
# The main loop runs until quit_flag becomes true.
self.quit_flag = False
219
# Socket pair used to interrupt select(): one byte is sent on wake_sender
# and wake_receiver is always included in the read set.
self.wake_sender, self.wake_receiver = util.make_dummy_socket_pair()
220
# Set once the loop is about to start.
self.loop_ready = threading.Event()
223
self.loop_ready.set()
224
self.emit('thread-will-start')
225
self.emit('thread-started', threading.currentThread())
226
self.emit('thread-did-start')
228
while not self.quit_flag:
229
self.emit('begin-loop')
230
# calc_timeout(), calc_fds() and process_events() are not defined in the
# visible part of this class; EventLoop defines calc_timeout and
# process_events.
timeout = self.calc_timeout()
231
readfds, writefds, excfds = self.calc_fds()
232
readfds.append(self.wake_receiver.fileno())
234
read_fds_ready, write_fds_ready, exc_fds_ready = \
235
select.select(readfds, writefds, excfds, timeout)
236
except select.error, (err, detail):
237
# NOTE(review): the statements under this EINTR check are only partly
# visible.
if err == errno.EINTR:
238
logging.warning ("eventloop: %s", detail)
243
# The wake-up socket being readable means a wake-up byte was sent; drain it.
if self.wake_receiver.fileno() in read_fds_ready:
244
self._slurp_waker_data()
245
self.process_events(read_fds_ready, write_fds_ready, exc_fds_ready)
246
self.emit('end-loop')
250
# Send one byte so that a select() waiting on wake_receiver returns.
self.wake_sender.send("b")
251
except socket.error, e:
252
logging.warn("Error waking up eventloop (%s)", e)
254
def _slurp_waker_data(self):
255
# Read up to 1024 bytes from the wake-up socket; the data is discarded.
self.wake_receiver.recv(1024)
257
class EventLoop(SimpleEventLoop):
259
SimpleEventLoop.__init__(self)
260
self.create_signal('event-finished')
261
self.scheduler = Scheduler()
262
self.idle_queue = CallQueue()
263
self.urgent_queue = CallQueue()
264
self.threadpool = ThreadPool(self)
265
self.read_callbacks = {}
266
self.write_callbacks = {}
267
self.clear_removed_callbacks()
269
def clear_removed_callbacks(self):
    """Reset both "removed callback" sets to fresh, empty sets."""
    self.removed_read_callbacks, self.removed_write_callbacks = set(), set()
273
def add_read_callback(self, sock, callback):
    """Store ``callback`` as the read callback for ``sock``.

    Callbacks are keyed by ``sock.fileno()``; an existing entry for the
    same descriptor is replaced.
    """
    fd = sock.fileno()
    self.read_callbacks[fd] = callback
276
def remove_read_callback(self, sock):
    """Forget the read callback registered for ``sock``.

    The descriptor is also recorded in ``removed_read_callbacks``.

    :raises KeyError: if no read callback is registered for the socket;
        in that case nothing is recorded.
    """
    self.read_callbacks.pop(sock.fileno())
    self.removed_read_callbacks.add(sock.fileno())
280
def add_write_callback(self, sock, callback):
    """Store ``callback`` as the write callback for ``sock``.

    Callbacks are keyed by ``sock.fileno()``; an existing entry for the
    same descriptor is replaced.
    """
    fd = sock.fileno()
    self.write_callbacks[fd] = callback
283
def remove_write_callback(self, sock):
    """Forget the write callback registered for ``sock``.

    The descriptor is also recorded in ``removed_write_callbacks``.

    :raises KeyError: if no write callback is registered for the socket;
        in that case nothing is recorded.
    """
    self.write_callbacks.pop(sock.fileno())
    self.removed_write_callbacks.add(sock.fileno())
287
def call_in_thread(self, callback, errback, function, name,
                   *args, **kwargs):
    """Queue ``function`` on the thread pool.

    All arguments are forwarded unchanged to ``ThreadPool.queue_call``:
    ``callback`` and ``errback`` are the completion handlers, ``name``
    labels the call, and ``*args`` / ``**kwargs`` are the arguments the
    worker thread passes to ``function``.
    """
    self.threadpool.queue_call(callback, errback, function, name,
                               *args, **kwargs)
292
def process_events(self, read_fds_ready, write_fds_ready, exc_fds_ready):
# Handles one loop iteration's work: urgent calls are processed, then each
# event produced by generate_events(), with 'event-finished' emitted for
# events and another pass over the urgent queue afterwards.
# ``exc_fds_ready`` is not used by any visible line.
# NOTE(review): lines are missing here (numbering skips 293 -> 296 -> 298
# -> 301), including the statement that assigns ``success``.
293
self._process_urgent_events()
296
for event in self.generate_events(read_fds_ready, write_fds_ready):
298
self.emit('event-finished', success)
301
self._process_urgent_events()
306
return (self.read_callbacks.keys(), self.write_callbacks.keys(), [])
308
def calc_timeout(self):
    """Return the scheduler's ``next_timeout()`` value unchanged."""
    timeout = self.scheduler.next_timeout()
    return timeout
311
def do_begin_loop(self):
    """Reset the removed-callback sets.

    Presumably invoked for the 'begin-loop' signal via the ``do_<signal>``
    naming convention of ``SignalEmitter`` -- confirm.
    """
    reset = self.clear_removed_callbacks
    reset()
314
def _process_urgent_events(self):
315
queue = self.urgent_queue
316
while queue.has_pending_idle() and not queue.quit_flag:
317
success = queue.process_next_idle()
318
self.emit('event-finished', success)
320
def generate_events(self, read_fds_ready, write_fds_ready):
    """Generator that creates the list of events that should be
    dealt with on this iteration of the event loop.  This includes
    all socket read/write callbacks, timeouts and idle calls.

    "events" are implemented as functions that should be called
    with no arguments.  They are yielded in this order: write
    callbacks, read callbacks, due timeouts, then idle calls.

    :param read_fds_ready: descriptors reported readable
    :param write_fds_ready: descriptors reported writable
    """
    for callback in self.generate_callbacks(write_fds_ready,
                                            self.write_callbacks,
                                            self.removed_write_callbacks):
        yield callback
    for callback in self.generate_callbacks(read_fds_ready,
                                            self.read_callbacks,
                                            self.removed_read_callbacks):
        yield callback
    # The bound methods themselves are the events; the pending checks are
    # re-evaluated each time the consumer asks for the next event.
    while self.scheduler.has_pending_timeout():
        yield self.scheduler.process_next_timeout
    while self.idle_queue.has_pending_idle():
        yield self.idle_queue.process_next_idle
341
def generate_callbacks(self, ready_list, map_, removed):
# Walks the ready descriptors and wraps work in ``callback_event`` closures
# that run through trapcall.trap_call.
# NOTE(review): the lines that look ``function`` up, that use ``removed``,
# and that follow the trap_call are missing from this listing (numbering
# skips 342 -> 346 -> 352 and stops at 354), so the exact skip and yield
# behaviour cannot be confirmed from here.
342
for fd in ready_list:
346
# this can happen if the write callback removes the read
347
# callback or vice versa
352
when = "While talking to the network"
353
def callback_event():
354
success = trapcall.trap_call(when, function)
361
self.quit_flag = True
362
self.idle_queue.quit_flag = True
363
self.urgent_queue.quit_flag = True
365
_eventloop = EventLoop()
367
def add_read_callback(sock, callback):
    """Install ``callback`` as the read callback for ``sock``.

    The callback will be called when the socket is ready for reading.
    Any read callback already installed for the socket is replaced.
    """
    _eventloop.add_read_callback(sock, callback)
374
def remove_read_callback(sock):
    """Uninstall the read callback for ``sock``.

    :raises KeyError: if no read callback is installed for the socket.
    """
    _eventloop.remove_read_callback(sock)
380
def add_write_callback(sock, callback):
    """Install ``callback`` as the write callback for ``sock``.

    The callback will be called when the socket is ready for writing.
    Any write callback already installed for the socket is replaced.
    """
    _eventloop.add_write_callback(sock, callback)
387
def remove_write_callback(sock):
    """Uninstall the write callback for ``sock``.

    :raises KeyError: if no write callback is installed for the socket.
    """
    _eventloop.remove_write_callback(sock)
393
def stop_handling_socket(sock):
    """Convenience function that removes both the read and write
    callbacks for a socket, if they exist.

    Unlike ``remove_read_callback()`` / ``remove_write_callback()``, a
    missing callback is not an error here.
    """
    for remove in (remove_read_callback, remove_write_callback):
        try:
            remove(sock)
        except KeyError:
            # Nothing was installed for this direction; that is fine.
            pass
406
def add_timeout(delay, function, name, args=None, kwargs=None):
    """Schedule a function to be called at some point in the future.

    :param delay: how far in the future to run the call, relative to
        the event loop's clock
    :param function: callable to run
    :param name: label identifying the call
    :param args: positional arguments for ``function`` (optional)
    :param kwargs: keyword arguments for ``function`` (optional)
    :returns: a ``DelayedCall`` object that can be used to cancel the
        call.
    """
    dc = _eventloop.scheduler.add_timeout(delay, function, name, args, kwargs)
    return dc
414
def add_idle(function, name, args=None, kwargs=None):
    """Schedule a function to be called when we get some spare time.

    :param function: callable to run
    :param name: label identifying the call
    :param args: positional arguments for ``function`` (optional)
    :param kwargs: keyword arguments for ``function`` (optional)
    :returns: a ``DelayedCall`` object that can be used to cancel the
        call.
    """
    dc = _eventloop.idle_queue.add_idle(function, name, args, kwargs)
    return dc
423
def add_urgent_call(function, name, args=None, kwargs=None):
    """Schedule a function to be called as soon as possible.  This
    method should be used for things like GUI actions, where a prompt
    response matters.

    The call goes onto the event loop's urgent queue, which is processed
    before the other events of an iteration.

    :param function: callable to run
    :param name: label identifying the call
    :param args: positional arguments for ``function`` (optional)
    :param kwargs: keyword arguments for ``function`` (optional)
    :returns: the ``DelayedCall`` object created for the call
    """
    dc = _eventloop.urgent_queue.add_idle(function, name, args, kwargs)
    return dc
432
def call_in_thread(callback, errback, function, name, *args, **kwargs):
    """Schedule a function to be called in a separate thread.

    ``callback`` and ``errback`` are the completion handlers, ``name``
    labels the call, and any further positional or keyword arguments are
    passed through for ``function``.

    Do not put code that accesses the database or the UI here!
    """
    _eventloop.call_in_thread(callback, errback, function, name,
                              *args, **kwargs)
449
def profile_startup():
451
profile.runctx('_eventloop.loop()', globals(), locals(),
452
profile_file + ".event_loop")
456
lt = threading.Thread(target=profile_startup, name="Event Loop")
458
lt = threading.Thread(target=_eventloop.loop, name="Event Loop")
461
_eventloop.loop_ready.wait()
468
"""Shuts down the thread pool and eventloop.
474
def connect(signal, callback):
    """Connect ``callback`` to ``signal`` on the module's event loop."""
    _eventloop.connect(signal, callback)
477
def disconnect(signal, callback):
    """Disconnect ``callback`` from ``signal`` on the module's event loop."""
    _eventloop.disconnect(signal, callback)
480
def thread_pool_quit():
    """Shut down the event loop's thread pool via ``close_threads()``."""
    pool = _eventloop.threadpool
    pool.close_threads()
483
def thread_pool_init():
    """Create the event loop's worker threads via ``init_threads()``."""
    pool = _eventloop.threadpool
    pool.init_threads()
487
"""Decorator to make a method run as an idle function
489
Suppose you have 2 methods, foo and bar::
493
# database operations
496
# same database operations as foo
498
Then calling ``foo()`` is exactly the same as calling
501
def queuer(*args, **kwargs):
502
return add_idle(func, "%s() (using as_idle)" % func.__name__,
503
args=args, kwargs=kwargs)
507
"""Like ``as_idle``, but uses ``add_urgent_call()`` instead of
510
def queuer(*args, **kwargs):
511
return add_urgent_call(func, "%s() (using as_urgent)" % func.__name__,
512
args=args, kwargs=kwargs)
515
def idle_iterate(func, name, args=None, kwargs=None):
    """Iterate over a generator function using add_idle for each
    iteration.

    This allows long running functions to be split up into distinct
    steps; after each step other idle functions will have a chance to
    run.  Values yielded by the generator are ignored.

    Example::

        def foo(a, b, c):
            # do some computation
            yield
            # do some more computation

        eventloop.idle_iterate(foo, 'Foo', args=(1, 2, 3))

    :param func: generator function to iterate over
    :param name: label used for each scheduled idle call
    :param args: positional arguments for ``func``; ``None`` means none
    :param kwargs: keyword arguments for ``func``; ``None`` means none
    """
    # The arguments are unpacked with * / ** below, so None must be
    # replaced by empty containers.
    if args is None:
        args = ()
    if kwargs is None:
        kwargs = {}
    iterator = func(*args, **kwargs)
    add_idle(_idle_iterate_step, name, args=(iterator, name))
542
def _idle_iterate_step(iterator, name):
544
retval = iterator.next()
545
except StopIteration:
548
if retval is not None:
549
logging.warn("idle_iterate yield value ignored: %s (%s)",
551
add_idle(_idle_iterate_step, name, args=(iterator, name))
553
def idle_iterator(func):
    """Decorator to wrap a generator function in a ``idle_iterate()``
    call.

    Calling the decorated function passes the generator function and
    the call's arguments to ``idle_iterate()`` instead of running it
    directly.
    """
    def queuer(*args, **kwargs):
        return idle_iterate(func, "%s() (using idle_iterator)" % func.__name__,
                            args=args, kwargs=kwargs)
    # A decorator has to hand back the wrapper, otherwise the decorated
    # name would be bound to None.
    return queuer