~ubuntu-branches/ubuntu/natty/miro/natty

« back to all changes in this revision

Viewing changes to lib/eventloop.py

  • Committer: Bazaar Package Importer
  • Author(s): Bryce Harrington
  • Date: 2011-01-22 02:46:33 UTC
  • mfrom: (1.4.10 upstream) (1.7.5 experimental)
  • Revision ID: james.westby@ubuntu.com-20110122024633-kjme8u93y2il5nmf
Tags: 3.5.1-1ubuntu1
* Merge from debian.  Remaining ubuntu changes:
  - Use python 2.7 instead of python 2.6
  - Relax dependency on python-dbus to >= 0.83.0

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Miro - an RSS based video player application
 
2
# Copyright (C) 2005-2010 Participatory Culture Foundation
 
3
#
 
4
# This program is free software; you can redistribute it and/or modify
 
5
# it under the terms of the GNU General Public License as published by
 
6
# the Free Software Foundation; either version 2 of the License, or
 
7
# (at your option) any later version.
 
8
#
 
9
# This program is distributed in the hope that it will be useful,
 
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
12
# GNU General Public License for more details.
 
13
#
 
14
# You should have received a copy of the GNU General Public License
 
15
# along with this program; if not, write to the Free Software
 
16
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
 
17
#
 
18
# In addition, as a special exception, the copyright holders give
 
19
# permission to link the code of portions of this program with the OpenSSL
 
20
# library.
 
21
#
 
22
# You must obey the GNU General Public License in all respects for all of
 
23
# the code used other than OpenSSL. If you modify file(s) with this
 
24
# exception, you may extend this exception to your version of the file(s),
 
25
# but you are not obligated to do so. If you do not wish to do so, delete
 
26
# this exception statement from your version. If you delete this exception
 
27
# statement from all source files in the program, then also delete it here.
 
28
 
 
29
"""``miro.eventloop`` -- Event loop handler.
 
30
 
 
31
This module handles the miro event loop which is responsible for
 
32
network requests and scheduling.
 
33
 
 
34
TODO: handle user setting clock back
 
35
"""
 
36
 
 
37
import threading
 
38
import errno
 
39
import select
 
40
import socket
 
41
import heapq
 
42
import Queue
 
43
import logging
 
44
import traceback
 
45
from miro import trapcall
 
46
from miro import signals
 
47
from miro import util
 
48
 
 
49
from miro.clock import clock
 
50
 
 
51
cumulative = {}
 
52
 
 
53
class DelayedCall(object):
 
54
    def __init__(self, function, name, args, kwargs):
 
55
        self.function = function
 
56
        self.name = name
 
57
        self.args = args
 
58
        self.kwargs = kwargs
 
59
        self.canceled = False
 
60
 
 
61
    def _unlink(self):
 
62
        """Removes the references that this object has to the outside
 
63
        world, this eases the GC's work in finding cycles and fixes
 
64
        some memory leaks on windows.
 
65
        """
 
66
        self.function = self.args = self.kwargs = None
 
67
 
 
68
    def cancel(self):
 
69
        self.canceled = True
 
70
        self._unlink()
 
71
 
 
72
    def dispatch(self):
 
73
        success = True
 
74
        if not self.canceled:
 
75
            when = "While handling %s" % self.name
 
76
            start = clock()
 
77
            success = trapcall.trap_call(when, self.function, *self.args,
 
78
                    **self.kwargs)
 
79
            end = clock()
 
80
            if end-start > 0.5:
 
81
                logging.timing("%s too slow (%.3f secs)",
 
82
                               self.name, end-start)
 
83
            try:
 
84
                total = cumulative[self.name]
 
85
            except (KeyError, AttributeError):
 
86
                total = 0
 
87
            total += end - start
 
88
            cumulative[self.name] = total
 
89
            if total > 5.0:
 
90
                logging.timing("%s cumulative is too slow (%.3f secs)",
 
91
                               self.name, total)
 
92
                cumulative[self.name] = 0
 
93
        self._unlink()
 
94
        return success
 
95
 
 
96
class Scheduler(object):
 
97
    def __init__(self):
 
98
        self.heap = []
 
99
 
 
100
    def add_timeout(self, delay, function, name, args=None, kwargs=None):
 
101
        if args is None:
 
102
            args = ()
 
103
        if kwargs is None:
 
104
            kwargs = {}
 
105
        scheduled_time = clock() + delay
 
106
        dc = DelayedCall(function,  "timeout (%s)" % (name,), args, kwargs)
 
107
        heapq.heappush(self.heap, (scheduled_time, dc))
 
108
        return dc
 
109
 
 
110
    def next_timeout(self):
 
111
        if len(self.heap) == 0:
 
112
            return None
 
113
        else:
 
114
            return max(0, self.heap[0][0] - clock())
 
115
 
 
116
    def has_pending_timeout(self):
 
117
        return len(self.heap) > 0 and self.heap[0][0] < clock()
 
118
 
 
119
    def process_next_timeout(self):
 
120
        time, dc = heapq.heappop(self.heap)
 
121
        return dc.dispatch()
 
122
 
 
123
class CallQueue(object):
 
124
    def __init__(self):
 
125
        self.queue = Queue.Queue()
 
126
        self.quit_flag = False
 
127
 
 
128
    def add_idle(self, function, name, args=None, kwargs=None):
 
129
        if args is None:
 
130
            args = ()
 
131
        if kwargs is None:
 
132
            kwargs = {}
 
133
        dc = DelayedCall(function, "idle (%s)" % (name,), args, kwargs)
 
134
        self.queue.put(dc)
 
135
        return dc
 
136
 
 
137
    def process_next_idle(self):
 
138
        dc = self.queue.get()
 
139
        return dc.dispatch()
 
140
 
 
141
    def has_pending_idle(self):
 
142
        return not self.queue.empty()
 
143
 
 
144
    def process_idles(self):
 
145
        # Note: used for testing purposes
 
146
        while self.has_pending_idle() and not self.quit_flag:
 
147
            self.process_next_idle()
 
148
 
 
149
class ThreadPool(object):
 
150
    """The thread pool is used to handle calls like gethostbyname()
 
151
    that block and there's no asynchronous workaround.  What we do
 
152
    instead is call them in a separate thread and return the result in
 
153
    a callback that executes in the event loop.
 
154
    """
 
155
    THREADS = 3
 
156
 
 
157
    def __init__(self, event_loop):
 
158
        self.event_loop = event_loop
 
159
        self.queue = Queue.Queue()
 
160
        self.threads = []
 
161
 
 
162
    def init_threads(self):
 
163
        while len(self.threads) < ThreadPool.THREADS:
 
164
            t = threading.Thread(name='ThreadPool - %d' % len(self.threads),
 
165
                                 target=self.thread_loop)
 
166
            t.setDaemon(True)
 
167
            t.start()
 
168
            self.threads.append(t)
 
169
 
 
170
    def thread_loop(self):
 
171
        while True:
 
172
            next_item = self.queue.get()
 
173
            if next_item == "QUIT":
 
174
                break
 
175
            else:
 
176
                callback, errback, func, name, args, kwargs, = next_item
 
177
            try:
 
178
                result = func(*args, **kwargs)
 
179
            except KeyboardInterrupt:
 
180
                raise
 
181
            except Exception, exc:
 
182
                logging.debug(">>> thread_loop: %s %s %s %s\n%s",
 
183
                              func, name, args, kwargs,
 
184
                              "".join(traceback.format_exc()))
 
185
                func = errback
 
186
                name = 'Thread Pool Errback (%s)' % name
 
187
                args = (exc,)
 
188
            else:
 
189
                func = callback
 
190
                name = 'Thread Pool Callback (%s)' % name
 
191
                args = (result,)
 
192
            if not self.event_loop.quit_flag:
 
193
                self.event_loop.idle_queue.add_idle(func, name, args=args)
 
194
                self.event_loop.wakeup()
 
195
 
 
196
    def queue_call(self, callback, errback, function, name, *args, **kwargs):
 
197
        self.queue.put((callback, errback, function, name, args, kwargs))
 
198
 
 
199
    def close_threads(self):
 
200
        for x in xrange(len(self.threads)):
 
201
            self.queue.put("QUIT")
 
202
        while len(self.threads) > 0:
 
203
            x = self.threads.pop()
 
204
            try:
 
205
                x.join()
 
206
            except (SystemExit, KeyboardInterrupt):
 
207
                raise
 
208
            except:
 
209
                pass
 
210
 
 
211
class SimpleEventLoop(signals.SignalEmitter):
 
212
    def __init__(self):
 
213
        signals.SignalEmitter.__init__(self, 'thread-will-start',
 
214
                                       'thread-started',
 
215
                                       'thread-did-start',
 
216
                                       'begin-loop',
 
217
                                       'end-loop')
 
218
        self.quit_flag = False
 
219
        self.wake_sender, self.wake_receiver = util.make_dummy_socket_pair()
 
220
        self.loop_ready = threading.Event()
 
221
 
 
222
    def loop(self):
 
223
        self.loop_ready.set()
 
224
        self.emit('thread-will-start')
 
225
        self.emit('thread-started', threading.currentThread())
 
226
        self.emit('thread-did-start')
 
227
 
 
228
        while not self.quit_flag:
 
229
            self.emit('begin-loop')
 
230
            timeout = self.calc_timeout()
 
231
            readfds, writefds, excfds = self.calc_fds()
 
232
            readfds.append(self.wake_receiver.fileno())
 
233
            try:
 
234
                read_fds_ready, write_fds_ready, exc_fds_ready = \
 
235
                        select.select(readfds, writefds, excfds, timeout)
 
236
            except select.error, (err, detail):
 
237
                if err == errno.EINTR:
 
238
                    logging.warning ("eventloop: %s", detail)
 
239
                else:
 
240
                    raise
 
241
            if self.quit_flag:
 
242
                break
 
243
            if self.wake_receiver.fileno() in read_fds_ready:
 
244
                self._slurp_waker_data()
 
245
            self.process_events(read_fds_ready, write_fds_ready, exc_fds_ready)
 
246
            self.emit('end-loop')
 
247
 
 
248
    def wakeup(self):
 
249
        try:
 
250
            self.wake_sender.send("b")
 
251
        except socket.error, e:
 
252
            logging.warn("Error waking up eventloop (%s)", e)
 
253
 
 
254
    def _slurp_waker_data(self):
 
255
        self.wake_receiver.recv(1024)
 
256
 
 
257
class EventLoop(SimpleEventLoop):
 
258
    def __init__(self):
 
259
        SimpleEventLoop.__init__(self)
 
260
        self.create_signal('event-finished')
 
261
        self.scheduler = Scheduler()
 
262
        self.idle_queue = CallQueue()
 
263
        self.urgent_queue = CallQueue()
 
264
        self.threadpool = ThreadPool(self)
 
265
        self.read_callbacks = {}
 
266
        self.write_callbacks = {}
 
267
        self.clear_removed_callbacks()
 
268
 
 
269
    def clear_removed_callbacks(self):
 
270
        self.removed_read_callbacks = set()
 
271
        self.removed_write_callbacks = set()
 
272
 
 
273
    def add_read_callback(self, sock, callback):
 
274
        self.read_callbacks[sock.fileno()] = callback
 
275
 
 
276
    def remove_read_callback(self, sock):
 
277
        del self.read_callbacks[sock.fileno()]
 
278
        self.removed_read_callbacks.add(sock.fileno())
 
279
 
 
280
    def add_write_callback(self, sock, callback):
 
281
        self.write_callbacks[sock.fileno()] = callback
 
282
 
 
283
    def remove_write_callback(self, sock):
 
284
        del self.write_callbacks[sock.fileno()]
 
285
        self.removed_write_callbacks.add(sock.fileno())
 
286
 
 
287
    def call_in_thread(self, callback, errback, function, name,
 
288
                       *args, **kwargs):
 
289
        self.threadpool.queue_call(callback, errback, function, name,
 
290
                                  *args, **kwargs)
 
291
 
 
292
    def process_events(self, read_fds_ready, write_fds_ready, exc_fds_ready):
 
293
        self._process_urgent_events()
 
294
        if self.quit_flag:
 
295
            return
 
296
        for event in self.generate_events(read_fds_ready, write_fds_ready):
 
297
            success = event()
 
298
            self.emit('event-finished', success)
 
299
            if self.quit_flag:
 
300
                break
 
301
            self._process_urgent_events()
 
302
            if self.quit_flag:
 
303
                break
 
304
 
 
305
    def calc_fds(self):
 
306
        return (self.read_callbacks.keys(), self.write_callbacks.keys(), [])
 
307
 
 
308
    def calc_timeout(self):
 
309
        return self.scheduler.next_timeout()
 
310
 
 
311
    def do_begin_loop(self):
 
312
        self.clear_removed_callbacks()
 
313
 
 
314
    def _process_urgent_events(self):
 
315
        queue = self.urgent_queue
 
316
        while queue.has_pending_idle() and not queue.quit_flag:
 
317
            success = queue.process_next_idle()
 
318
            self.emit('event-finished', success)
 
319
 
 
320
    def generate_events(self, read_fds_ready, write_fds_ready):
 
321
        """Generator that creates the list of events that should be
 
322
        dealt with on this iteration of the event loop.  This includes
 
323
        all socket read/write callbacks, timeouts and idle calls.
 
324
 
 
325
        "events" are implemented as functions that should be called
 
326
        with no arguments.
 
327
        """
 
328
        for callback in self.generate_callbacks(write_fds_ready,
 
329
                                               self.write_callbacks,
 
330
                                               self.removed_write_callbacks):
 
331
            yield callback
 
332
        for callback in self.generate_callbacks(read_fds_ready,
 
333
                                               self.read_callbacks,
 
334
                                               self.removed_read_callbacks):
 
335
            yield callback
 
336
        while self.scheduler.has_pending_timeout():
 
337
            yield self.scheduler.process_next_timeout
 
338
        while self.idle_queue.has_pending_idle():
 
339
            yield self.idle_queue.process_next_idle
 
340
 
 
341
    def generate_callbacks(self, ready_list, map_, removed):
 
342
        for fd in ready_list:
 
343
            try:
 
344
                function = map_[fd]
 
345
            except KeyError:
 
346
                # this can happen the write callback removes the read
 
347
                # callback or vise versa
 
348
                pass
 
349
            else:
 
350
                if fd in removed:
 
351
                    continue
 
352
                when = "While talking to the network"
 
353
                def callback_event():
 
354
                    success = trapcall.trap_call(when, function)
 
355
                    if not success:
 
356
                        del map_[fd]
 
357
                    return success
 
358
                yield callback_event
 
359
 
 
360
    def quit(self):
 
361
        self.quit_flag = True
 
362
        self.idle_queue.quit_flag = True
 
363
        self.urgent_queue.quit_flag = True
 
364
 
 
365
_eventloop = EventLoop()
 
366
 
 
367
def add_read_callback(sock, callback):
 
368
    """Add a read callback.  When socket is ready for reading,
 
369
    callback will be called.  If there is already a read callback
 
370
    installed, it will be replaced.
 
371
    """
 
372
    _eventloop.add_read_callback(sock, callback)
 
373
 
 
374
def remove_read_callback(sock):
 
375
    """Remove a read callback.  If there is not a read callback
 
376
    installed for socket, a KeyError will be thrown.
 
377
    """
 
378
    _eventloop.remove_read_callback(sock)
 
379
 
 
380
def add_write_callback(sock, callback):
 
381
    """Add a write callback.  When socket is ready for writing,
 
382
    callback will be called.  If there is already a write callback
 
383
    installed, it will be replaced.
 
384
    """
 
385
    _eventloop.add_write_callback(sock, callback)
 
386
 
 
387
def remove_write_callback(sock):
 
388
    """Remove a write callback.  If there is not a write callback
 
389
    installed for socket, a KeyError will be thrown.
 
390
    """
 
391
    _eventloop.remove_write_callback(sock)
 
392
 
 
393
def stop_handling_socket(sock):
 
394
    """Convience function to that removes both the read and write
 
395
    callback for a socket if they exist.
 
396
    """
 
397
    try:
 
398
        remove_read_callback(sock)
 
399
    except KeyError:
 
400
        pass
 
401
    try:
 
402
        remove_write_callback(sock)
 
403
    except KeyError:
 
404
        pass
 
405
 
 
406
def add_timeout(delay, function, name, args=None, kwargs=None):
 
407
    """Schedule a function to be called at some point in the future.
 
408
    Returns a ``DelayedCall`` object that can be used to cancel the
 
409
    call.
 
410
    """
 
411
    dc = _eventloop.scheduler.add_timeout(delay, function, name, args, kwargs)
 
412
    return dc
 
413
 
 
414
def add_idle(function, name, args=None, kwargs=None):
 
415
    """Schedule a function to be called when we get some spare time.
 
416
    Returns a ``DelayedCall`` object that can be used to cancel the
 
417
    call.
 
418
    """
 
419
    dc = _eventloop.idle_queue.add_idle(function, name, args, kwargs)
 
420
    _eventloop.wakeup()
 
421
    return dc
 
422
 
 
423
def add_urgent_call(function, name, args=None, kwargs=None):
 
424
    """Schedule a function to be called as soon as possible.  This
 
425
    method should be used for things like GUI actions, where the user
 
426
    is waiting on us.
 
427
    """
 
428
    dc = _eventloop.urgent_queue.add_idle(function, name, args, kwargs)
 
429
    _eventloop.wakeup()
 
430
    return dc
 
431
 
 
432
def call_in_thread(callback, errback, function, name, *args, **kwargs):
 
433
    """Schedule a function to be called in a separate thread.
 
434
 
 
435
    .. Warning::
 
436
 
 
437
       Do not put code that accesses the database or the UI here!
 
438
    """
 
439
    _eventloop.call_in_thread(
 
440
        callback, errback, function, name, *args, **kwargs)
 
441
 
 
442
lt = None
 
443
 
 
444
profile_file = None
 
445
 
 
446
def startup():
 
447
    thread_pool_init()
 
448
 
 
449
    def profile_startup():
 
450
        import profile
 
451
        profile.runctx('_eventloop.loop()', globals(), locals(),
 
452
                       profile_file + ".event_loop")
 
453
 
 
454
    global lt
 
455
    if profile_file:
 
456
        lt = threading.Thread(target=profile_startup, name="Event Loop")
 
457
    else:
 
458
        lt = threading.Thread(target=_eventloop.loop, name="Event Loop")
 
459
    lt.setDaemon(False)
 
460
    lt.start()
 
461
    _eventloop.loop_ready.wait()
 
462
 
 
463
def join():
 
464
    if lt is not None:
 
465
        lt.join()
 
466
 
 
467
def shutdown():
 
468
    """Shuts down the thread pool and eventloop.
 
469
    """
 
470
    thread_pool_quit()
 
471
    _eventloop.quit()
 
472
    _eventloop.wakeup()
 
473
 
 
474
def connect(signal, callback):
 
475
    _eventloop.connect(signal, callback)
 
476
 
 
477
def disconnect(signal, callback):
 
478
    _eventloop.disconnect(signal, callback)
 
479
 
 
480
def thread_pool_quit():
 
481
    _eventloop.threadpool.close_threads()
 
482
 
 
483
def thread_pool_init():
 
484
    _eventloop.threadpool.init_threads()
 
485
 
 
486
def as_idle(func):
 
487
    """Decorator to make a methods run as an idle function
 
488
 
 
489
    Suppose you have 2 methods, foo and bar::
 
490
 
 
491
        @as_idle
 
492
        def foo():
 
493
            # database operations
 
494
 
 
495
        def bar():
 
496
            # same database operations as foo
 
497
 
 
498
    Then calling ``foo()`` is exactly the same as calling
 
499
    ``add_idle(bar)``.
 
500
    """
 
501
    def queuer(*args, **kwargs):
 
502
        return add_idle(func, "%s() (using as_idle)" % func.__name__,
 
503
                       args=args, kwargs=kwargs)
 
504
    return queuer
 
505
 
 
506
def as_urgent(func):
 
507
    """Like ``as_idle``, but uses ``add_urgent_call()`` instead of
 
508
    ``add_idle()``.
 
509
    """
 
510
    def queuer(*args, **kwargs):
 
511
        return add_urgent_call(func, "%s() (using as_urgent)" % func.__name__,
 
512
                               args=args, kwargs=kwargs)
 
513
    return queuer
 
514
 
 
515
def idle_iterate(func, name, args=None, kwargs=None):
 
516
    """Iterate over a generator function using add_idle for each
 
517
    iteration.
 
518
 
 
519
    This allows long running functions to be split up into distinct
 
520
    steps, after each step other idle functions will have a chance to
 
521
    run.
 
522
 
 
523
    For example::
 
524
 
 
525
        def foo(x, y, z):
 
526
            # do some computation
 
527
            yield
 
528
            # more computation
 
529
            yield
 
530
            # more computation
 
531
            yield
 
532
 
 
533
        eventloop.idle_iterate(foo, 'Foo', args=(1, 2, 3))
 
534
    """
 
535
    if args is None:
 
536
        args = ()
 
537
    if kwargs is None:
 
538
        kwargs = {}
 
539
    iterator = func(*args, **kwargs)
 
540
    add_idle(_idle_iterate_step, name, args=(iterator, name))
 
541
 
 
542
def _idle_iterate_step(iterator, name):
 
543
    try:
 
544
        retval = iterator.next()
 
545
    except StopIteration:
 
546
        return
 
547
    else:
 
548
        if retval is not None:
 
549
            logging.warn("idle_iterate yield value ignored: %s (%s)",
 
550
                         retval, name)
 
551
        add_idle(_idle_iterate_step, name, args=(iterator, name))
 
552
 
 
553
def idle_iterator(func):
 
554
    """Decorator to wrap a generator function in a ``idle_iterate()``
 
555
    call.
 
556
    """
 
557
    def queuer(*args, **kwargs):
 
558
        return idle_iterate(func, "%s() (using idle_iterator)" % func.__name__, 
 
559
                            args=args, kwargs=kwargs)
 
560
    return queuer