~ubuntu-branches/ubuntu/natty/miro/natty

« back to all changes in this revision

Viewing changes to portable/eventloop.py

  • Committer: Bazaar Package Importer
  • Author(s): Bryce Harrington
  • Date: 2011-01-22 02:46:33 UTC
  • mfrom: (1.4.10 upstream) (1.7.5 experimental)
  • Revision ID: james.westby@ubuntu.com-20110122024633-kjme8u93y2il5nmf
Tags: 3.5.1-1ubuntu1
* Merge from debian.  Remaining ubuntu changes:
  - Use python 2.7 instead of python 2.6
  - Relax dependency on python-dbus to >= 0.83.0

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Miro - an RSS based video player application
2
 
# Copyright (C) 2005-2010 Participatory Culture Foundation
3
 
#
4
 
# This program is free software; you can redistribute it and/or modify
5
 
# it under the terms of the GNU General Public License as published by
6
 
# the Free Software Foundation; either version 2 of the License, or
7
 
# (at your option) any later version.
8
 
#
9
 
# This program is distributed in the hope that it will be useful,
10
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
 
# GNU General Public License for more details.
13
 
#
14
 
# You should have received a copy of the GNU General Public License
15
 
# along with this program; if not, write to the Free Software
16
 
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17
 
#
18
 
# In addition, as a special exception, the copyright holders give
19
 
# permission to link the code of portions of this program with the OpenSSL
20
 
# library.
21
 
#
22
 
# You must obey the GNU General Public License in all respects for all of
23
 
# the code used other than OpenSSL. If you modify file(s) with this
24
 
# exception, you may extend this exception to your version of the file(s),
25
 
# but you are not obligated to do so. If you do not wish to do so, delete
26
 
# this exception statement from your version. If you delete this exception
27
 
# statement from all source files in the program, then also delete it here.
28
 
 
29
 
"""``miro.eventloop`` -- Event loop handler.
30
 
 
31
 
This module handles the miro event loop which is responsible for
32
 
network requests and scheduling.
33
 
 
34
 
TODO: handle user setting clock back
35
 
"""
36
 
 
37
 
import threading
38
 
import errno
39
 
import select
40
 
import socket
41
 
import heapq
42
 
import Queue
43
 
import logging
44
 
from miro import trapcall
45
 
from miro import signals
46
 
from miro import util
47
 
 
48
 
from miro.clock import clock
49
 
 
50
 
cumulative = {}
51
 
 
52
 
class DelayedCall(object):
53
 
    def __init__(self, function, name, args, kwargs):
54
 
        self.function = function
55
 
        self.name = name
56
 
        self.args = args
57
 
        self.kwargs = kwargs
58
 
        self.canceled = False
59
 
 
60
 
    def _unlink(self):
61
 
        """Removes the references that this object has to the outside
62
 
        world, this eases the GC's work in finding cycles and fixes
63
 
        some memory leaks on windows.
64
 
        """
65
 
        self.function = self.args = self.kwargs = None
66
 
 
67
 
    def cancel(self):
68
 
        self.canceled = True
69
 
        self._unlink()
70
 
 
71
 
    def dispatch(self):
72
 
        success = True
73
 
        if not self.canceled:
74
 
            when = "While handling %s" % self.name
75
 
            start = clock()
76
 
            success = trapcall.trap_call(when, self.function, *self.args,
77
 
                    **self.kwargs)
78
 
            end = clock()
79
 
            if end-start > 0.5:
80
 
                logging.timing("%s too slow (%.3f secs)",
81
 
                               self.name, end-start)
82
 
            try:
83
 
                total = cumulative[self.name]
84
 
            except (SystemExit, KeyboardInterrupt):
85
 
                raise
86
 
            except:
87
 
                total = 0
88
 
            total += end - start
89
 
            cumulative[self.name] = total
90
 
            if total > 5.0:
91
 
                logging.timing("%s cumulative is too slow (%.3f secs)",
92
 
                               self.name, total)
93
 
                cumulative[self.name] = 0
94
 
        self._unlink()
95
 
        return success
96
 
 
97
 
class Scheduler(object):
98
 
    def __init__(self):
99
 
        self.heap = []
100
 
 
101
 
    def addTimeout(self, delay, function, name, args=None, kwargs=None):
102
 
        if args is None:
103
 
            args = ()
104
 
        if kwargs is None:
105
 
            kwargs = {}
106
 
        scheduledTime = clock() + delay
107
 
        dc = DelayedCall(function,  "timeout (%s)" % (name,), args, kwargs)
108
 
        heapq.heappush(self.heap, (scheduledTime, dc))
109
 
        return dc
110
 
 
111
 
    def next_timeout(self):
112
 
        if len(self.heap) == 0:
113
 
            return None
114
 
        else:
115
 
            return max(0, self.heap[0][0] - clock())
116
 
 
117
 
    def hasPendingTimeout(self):
118
 
        return len(self.heap) > 0 and self.heap[0][0] < clock()
119
 
 
120
 
    def process_next_timeout(self):
121
 
        time, dc = heapq.heappop(self.heap)
122
 
        return dc.dispatch()
123
 
 
124
 
    def processTimeouts(self):
125
 
        while self.hasPendingTimeout():
126
 
            self.process_next_timeout()
127
 
 
128
 
class CallQueue(object):
129
 
    def __init__(self):
130
 
        self.queue = Queue.Queue()
131
 
        self.quit_flag = False
132
 
 
133
 
    def addIdle(self, function, name, args=None, kwargs=None):
134
 
        if args is None:
135
 
            args = ()
136
 
        if kwargs is None:
137
 
            kwargs = {}
138
 
        dc = DelayedCall(function, "idle (%s)" % (name,), args, kwargs)
139
 
        self.queue.put(dc)
140
 
        return dc
141
 
 
142
 
    def processNextIdle(self):
143
 
        dc = self.queue.get()
144
 
        return dc.dispatch()
145
 
 
146
 
    def hasPendingIdle(self):
147
 
        return not self.queue.empty()
148
 
 
149
 
    def processIdles(self):
150
 
        while self.hasPendingIdle() and not self.quit_flag:
151
 
            self.processNextIdle()
152
 
 
153
 
class ThreadPool(object):
154
 
    """The thread pool is used to handle calls like gethostbyname()
155
 
    that block and there's no asynchronous workaround.  What we do
156
 
    instead is call them in a separate thread and return the result in
157
 
    a callback that executes in the event loop.
158
 
    """
159
 
    THREADS = 3
160
 
 
161
 
    def __init__(self, eventLoop):
162
 
        self.eventLoop = eventLoop
163
 
        self.queue = Queue.Queue()
164
 
        self.threads = []
165
 
 
166
 
    def initThreads(self):
167
 
        while len(self.threads) < ThreadPool.THREADS:
168
 
            t = threading.Thread(name='ThreadPool - %d' % len(self.threads),
169
 
                                 target=self.thread_loop)
170
 
            t.setDaemon(True)
171
 
            t.start()
172
 
            self.threads.append(t)
173
 
 
174
 
    def thread_loop(self):
175
 
        while True:
176
 
            nextItem = self.queue.get()
177
 
            if nextItem == "QUIT":
178
 
                break
179
 
            else:
180
 
                callback, errback, func, name, args, kwargs, = nextItem
181
 
            try:
182
 
                result = func(*args, **kwargs)
183
 
            except KeyboardInterrupt:
184
 
                raise
185
 
            except Exception, e:
186
 
                func = errback
187
 
                name = 'Thread Pool Errback (%s)' % name
188
 
                args = (e,)
189
 
            else:
190
 
                func = callback
191
 
                name = 'Thread Pool Callback (%s)' % name
192
 
                args = (result,)
193
 
            if not self.eventLoop.quitFlag:
194
 
                self.eventLoop.idleQueue.addIdle(func, name, args=args)
195
 
                self.eventLoop.wakeup()
196
 
 
197
 
    def queueCall(self, callback, errback, function, name, *args, **kwargs):
198
 
        self.queue.put((callback, errback, function, name, args, kwargs))
199
 
 
200
 
    def closeThreads(self):
201
 
        for x in xrange(len(self.threads)):
202
 
            self.queue.put("QUIT")
203
 
        while len(self.threads) > 0:
204
 
            x = self.threads.pop()
205
 
            try:
206
 
                x.join()
207
 
            except (SystemExit, KeyboardInterrupt):
208
 
                raise
209
 
            except:
210
 
                pass
211
 
 
212
 
class EventLoop(signals.SignalEmitter):
213
 
    def __init__(self):
214
 
        signals.SignalEmitter.__init__(self, 'thread-will-start',
215
 
                                       'thread-started',
216
 
                                       'thread-did-start',
217
 
                                       'begin-loop',
218
 
                                       'end-loop',
219
 
                                       'event-finished')
220
 
        self.scheduler = Scheduler()
221
 
        self.idleQueue = CallQueue()
222
 
        self.urgentQueue = CallQueue()
223
 
        self.threadPool = ThreadPool(self)
224
 
        self.readCallbacks = {}
225
 
        self.writeCallbacks = {}
226
 
        self.wakeSender, self.wakeReceiver = util.make_dummy_socket_pair()
227
 
        self.addReadCallback(self.wakeReceiver, self._slurpWakerData)
228
 
        self.quitFlag = False
229
 
        self.clearRemovedCallbacks()
230
 
        self.loop_ready = threading.Event()
231
 
 
232
 
    def clearRemovedCallbacks(self):
233
 
        self.removedReadCallbacks = set()
234
 
        self.removedWriteCallbacks = set()
235
 
 
236
 
    def _slurpWakerData(self):
237
 
        self.wakeReceiver.recv(1024)
238
 
 
239
 
    def addReadCallback(self, socket, callback):
240
 
        self.readCallbacks[socket.fileno()] = callback
241
 
 
242
 
    def removeReadCallback(self, socket):
243
 
        del self.readCallbacks[socket.fileno()]
244
 
        self.removedReadCallbacks.add(socket.fileno())
245
 
 
246
 
    def addWriteCallback(self, socket, callback):
247
 
        self.writeCallbacks[socket.fileno()] = callback
248
 
 
249
 
    def removeWriteCallback(self, socket):
250
 
        del self.writeCallbacks[socket.fileno()]
251
 
        self.removedWriteCallbacks.add(socket.fileno())
252
 
 
253
 
    def wakeup(self):
254
 
        try:
255
 
            self.wakeSender.send("b")
256
 
        except socket.error, e:
257
 
            logging.warn("Error waking up eventloop (%s)", e)
258
 
 
259
 
    def callInThread(self, callback, errback, function, name, *args, **kwargs):
260
 
        self.threadPool.queueCall(callback, errback, function, name,
261
 
                                  *args, **kwargs)
262
 
 
263
 
    def loop(self):
264
 
        self.loop_ready.set()
265
 
        self.emit('thread-will-start')
266
 
        self.emit('thread-started', threading.currentThread())
267
 
        self.emit('thread-did-start')
268
 
 
269
 
        while not self.quitFlag:
270
 
            self.emit('begin-loop')
271
 
            self.clearRemovedCallbacks()
272
 
            timeout = self.scheduler.next_timeout()
273
 
            readfds = self.readCallbacks.keys()
274
 
            writefds = self.writeCallbacks.keys()
275
 
            try:
276
 
                readables, writeables, _ = select.select(readfds, writefds,
277
 
                                                         [], timeout)
278
 
            except select.error, (err, detail):
279
 
                if err == errno.EINTR:
280
 
                    logging.warning ("eventloop: %s", detail)
281
 
                else:
282
 
                    raise
283
 
            if self.quitFlag:
284
 
                break
285
 
            self._process_urgent_events()
286
 
            for event in self.generateEvents(readables, writeables):
287
 
                success = event()
288
 
                self.emit('event-finished', success)
289
 
                if self.quitFlag:
290
 
                    break
291
 
                self._process_urgent_events()
292
 
                if self.quitFlag:
293
 
                    break
294
 
            self.emit('end-loop')
295
 
 
296
 
    def _process_urgent_events(self):
297
 
        queue = self.urgentQueue
298
 
        while queue.hasPendingIdle() and not queue.quit_flag:
299
 
            success = queue.processNextIdle()
300
 
            self.emit('event-finished', success)
301
 
 
302
 
    def generateEvents(self, readables, writeables):
303
 
        """Generator that creates the list of events that should be
304
 
        dealt with on this iteration of the event loop.  This includes
305
 
        all socket read/write callbacks, timeouts and idle calls.
306
 
 
307
 
        "events" are implemented as functions that should be called
308
 
        with no arguments.
309
 
        """
310
 
        for callback in self.generateCallbacks(writeables,
311
 
                                               self.writeCallbacks,
312
 
                                               self.removedWriteCallbacks):
313
 
            yield callback
314
 
        for callback in self.generateCallbacks(readables,
315
 
                                               self.readCallbacks,
316
 
                                               self.removedReadCallbacks):
317
 
            yield callback
318
 
        while self.scheduler.hasPendingTimeout():
319
 
            yield self.scheduler.process_next_timeout
320
 
        while self.idleQueue.hasPendingIdle():
321
 
            yield self.idleQueue.processNextIdle
322
 
 
323
 
    def generateCallbacks(self, readyList, map_, removed):
324
 
        for fd in readyList:
325
 
            try:
326
 
                function = map_[fd]
327
 
            except KeyError:
328
 
                # this can happen the write callback removes the read
329
 
                # callback or vise versa
330
 
                pass
331
 
            else:
332
 
                if fd in removed:
333
 
                    continue
334
 
                when = "While talking to the network"
335
 
                def callbackEvent():
336
 
                    success = trapcall.trap_call(when, function)
337
 
                    if not success:
338
 
                        del map_[fd]
339
 
                    return success
340
 
                yield callbackEvent
341
 
 
342
 
    def quit(self):
343
 
        self.quitFlag = True
344
 
        self.idleQueue.quit_flag = True
345
 
        self.urgentQueue.quit_flag = True
346
 
 
347
 
_eventLoop = EventLoop()
348
 
 
349
 
def addReadCallback(socket, callback):
350
 
    """Add a read callback.  When socket is ready for reading,
351
 
    callback will be called.  If there is already a read callback
352
 
    installed, it will be replaced.
353
 
    """
354
 
    _eventLoop.addReadCallback(socket, callback)
355
 
 
356
 
def removeReadCallback(socket):
357
 
    """Remove a read callback.  If there is not a read callback
358
 
    installed for socket, a KeyError will be thrown.
359
 
    """
360
 
    _eventLoop.removeReadCallback(socket)
361
 
 
362
 
def addWriteCallback(socket, callback):
363
 
    """Add a write callback.  When socket is ready for writing,
364
 
    callback will be called.  If there is already a write callback
365
 
    installed, it will be replaced.
366
 
    """
367
 
    _eventLoop.addWriteCallback(socket, callback)
368
 
 
369
 
def removeWriteCallback(socket):
370
 
    """Remove a write callback.  If there is not a write callback
371
 
    installed for socket, a KeyError will be thrown.
372
 
    """
373
 
    _eventLoop.removeWriteCallback(socket)
374
 
 
375
 
def stopHandlingSocket(socket):
376
 
    """Convience function to that removes both the read and write
377
 
    callback for a socket if they exist.
378
 
    """
379
 
    try:
380
 
        removeReadCallback(socket)
381
 
    except KeyError:
382
 
        pass
383
 
    try:
384
 
        removeWriteCallback(socket)
385
 
    except KeyError:
386
 
        pass
387
 
 
388
 
def addTimeout(delay, function, name, args=None, kwargs=None):
389
 
    """Schedule a function to be called at some point in the future.
390
 
    Returns a ``DelayedCall`` object that can be used to cancel the
391
 
    call.
392
 
    """
393
 
    dc = _eventLoop.scheduler.addTimeout(delay, function, name, args, kwargs)
394
 
    return dc
395
 
 
396
 
def addIdle(function, name, args=None, kwargs=None):
397
 
    """Schedule a function to be called when we get some spare time.
398
 
    Returns a ``DelayedCall`` object that can be used to cancel the
399
 
    call.
400
 
    """
401
 
    dc = _eventLoop.idleQueue.addIdle(function, name, args, kwargs)
402
 
    _eventLoop.wakeup()
403
 
    return dc
404
 
 
405
 
def addUrgentCall(function, name, args=None, kwargs=None):
406
 
    """Schedule a function to be called as soon as possible.  This
407
 
    method should be used for things like GUI actions, where the user
408
 
    is waiting on us.
409
 
    """
410
 
    dc = _eventLoop.urgentQueue.addIdle(function, name, args, kwargs)
411
 
    _eventLoop.wakeup()
412
 
    return dc
413
 
 
414
 
def callInThread(callback, errback, function, name, *args, **kwargs):
415
 
    """Schedule a function to be called in a separate thread. Do not
416
 
    put code that accesses the database or the UI here!
417
 
    """
418
 
    _eventLoop.callInThread(callback, errback, function, name, *args, **kwargs)
419
 
 
420
 
lt = None
421
 
 
422
 
profile_file = None
423
 
 
424
 
def startup():
425
 
    threadPoolInit()
426
 
 
427
 
    def profile_startup():
428
 
        import profile
429
 
        def start_loop():
430
 
            _eventLoop.loop()
431
 
        profile.runctx('_eventLoop.loop()', globals(), locals(),
432
 
                       profile_file + ".event_loop")
433
 
 
434
 
    global lt
435
 
    if profile_file:
436
 
        lt = threading.Thread(target=profile_startup, name="Event Loop")
437
 
    else:
438
 
        lt = threading.Thread(target=_eventLoop.loop, name="Event Loop")
439
 
    lt.setDaemon(False)
440
 
    lt.start()
441
 
    _eventLoop.loop_ready.wait()
442
 
 
443
 
def join():
444
 
    if lt is not None:
445
 
        lt.join()
446
 
 
447
 
def quit():
448
 
    threadPoolQuit()
449
 
    _eventLoop.quit()
450
 
    _eventLoop.wakeup()
451
 
 
452
 
def connect(signal, callback):
453
 
    _eventLoop.connect(signal, callback)
454
 
 
455
 
def disconnect(signal, callback):
456
 
    _eventLoop.disconnect(signal, callback)
457
 
 
458
 
def resetEventLoop():
459
 
    global _eventLoop
460
 
    _eventLoop = EventLoop()
461
 
 
462
 
def threadPoolQuit():
463
 
    _eventLoop.threadPool.closeThreads()
464
 
 
465
 
def threadPoolInit():
466
 
    _eventLoop.threadPool.initThreads()
467
 
 
468
 
def as_idle(func):
469
 
    """Decorator to make a methods run as an idle function
470
 
 
471
 
    Suppose you have 2 methods, foo and bar
472
 
 
473
 
    @as_idle
474
 
    def foo():
475
 
        # database operations
476
 
 
477
 
    def bar():
478
 
        # same database operations as foo
479
 
 
480
 
    Then calling foo() is exactly the same as calling addIdle(bar).
481
 
    """
482
 
    def queuer(*args, **kwargs):
483
 
        return addIdle(func, "%s() (using as_idle)" % func.__name__,
484
 
                       args=args, kwargs=kwargs)
485
 
    return queuer
486
 
 
487
 
def as_urgent(func):
488
 
    """Like ``as_idle``, but uses ``addUrgentCall()`` instead of
489
 
    ``addIdle()``.
490
 
    """
491
 
    def queuer(*args, **kwargs):
492
 
        return addUrgentCall(func, "%s() (using as_urgent)" % func.__name__,
493
 
                             args=args, kwargs=kwargs)
494
 
    return queuer
495
 
 
496
 
def idle_iterate(func, name, args=None, kwargs=None):
497
 
    """Iterate over a generator function using addIdle for each
498
 
    iteration.
499
 
 
500
 
    This allows long running functions to be split up into distinct
501
 
    steps, after each step other idle functions will have a chance to
502
 
    run.
503
 
 
504
 
    For example::
505
 
 
506
 
        def foo(x, y, z):
507
 
            # do some computation
508
 
            yield
509
 
            # more computation
510
 
            yield
511
 
            # more computation
512
 
            yield
513
 
 
514
 
        eventloop.idle_iterate(foo, 'Foo', args=(1, 2, 3))
515
 
    """
516
 
    if args is None:
517
 
        args = ()
518
 
    if kwargs is None:
519
 
        kwargs = {}
520
 
    iter = func(*args, **kwargs)
521
 
    addIdle(_idle_iterate_step, name, args=(iter, name))
522
 
 
523
 
def _idle_iterate_step(iter, name):
524
 
    try:
525
 
        rv = iter.next()
526
 
    except StopIteration:
527
 
        return
528
 
    else:
529
 
        if rv is not None:
530
 
            logging.warn("idle_iterate yield value ignored: %s (%s)",
531
 
                    rv, name)
532
 
        addIdle(_idle_iterate_step, name, args=(iter, name))
533
 
 
534
 
def idle_iterator(func):
535
 
    """Decorator to wrap a generator function in a ``idle_iterate()``
536
 
    call.
537
 
    """
538
 
    def queuer(*args, **kwargs):
539
 
        return idle_iterate(func, "%s() (using idle_iterator)" % func.__name__, 
540
 
                            args=args, kwargs=kwargs)
541
 
    return queuer