~ubuntu-branches/debian/wheezy/numm/wheezy

« back to all changes in this revision

Viewing changes to numm/io.py

  • Committer: Package Import Robot
  • Author(s): Dafydd Harries
  • Date: 2012-04-16 17:13:14 UTC
  • mfrom: (1.1.1)
  • Revision ID: package-import@ubuntu.com-20120416171314-8shz7o37wr4j2k7r
Tags: 0.4-1
New upstream release.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
 
2
 
import threading
 
2
import os
3
3
 
4
4
import gobject
5
5
import gst
6
6
 
7
7
gobject.threads_init()
8
8
 
 
9
class WakePipe(object):
    """Blocking wake-up channel built on an OS pipe.

    Each call to wake() writes one byte into the pipe and each call to
    sleep() consumes one byte, so wake-ups are counted rather than
    coalesced: N calls to wake() let N calls to sleep() return.
    """

    def __init__(self):
        (self._read_fd, self._write_fd) = os.pipe()

    def sleep(self):
        """Block until a wake-up byte is available, then return it."""
        return os.read(self._read_fd, 1)

    def wake(self):
        """Queue one wake-up by writing a single byte to the pipe."""
        # A bytes literal is required by os.write() on Python 3 and is
        # the same as '\0' on Python 2.6+.
        os.write(self._write_fd, b'\0')

    def close(self):
        """Release both pipe descriptors; safe to call more than once.

        The object must not be used for sleep() or wake() afterwards.
        """
        for name in ('_read_fd', '_write_fd'):
            fd = getattr(self, name, None)
            if fd is not None:
                # Clear the attribute first so a repeated close() never
                # closes a descriptor number the OS may have reused.
                setattr(self, name, None)
                os.close(fd)
 
18
 
9
19
class RunPipeline(object):
10
20
    def __init__(self, pipeline):
11
21
        self.pipeline = pipeline
12
22
        self.errors = []
13
 
        self.done = False
14
 
        self.context = gobject.main_context_default()
 
23
        self.eos = False
 
24
        self.wake = WakePipe()
15
25
 
16
26
        bus = pipeline.get_bus()
17
 
        # Using a sync handler seems to deadlock with pulling a buffer.
18
 
        bus.add_watch(self._message)
 
27
        bus.set_sync_handler(self._message)
19
28
 
20
29
    def _message(self, bus, msg):
21
30
        if msg.type == gst.MESSAGE_ERROR:
22
 
            self.errors.append(RuntimeError(msg))
23
 
            self.done = True
 
31
            self.errors.append(msg)
 
32
            self.wake.wake()
24
33
        elif msg.type == gst.MESSAGE_EOS:
25
 
            self.done = True
 
34
            self.eos = True
 
35
            self.wake.wake()
26
36
 
27
37
        return gst.BUS_PASS
28
38
 
29
39
    def start(self):
30
40
        self.pipeline.set_state(gst.STATE_PLAYING)
31
41
 
32
 
    def step(self):
33
 
        self.context.iteration()
34
 
 
35
 
    def stop(self):
 
42
    def finish(self):
36
43
        ok = self.pipeline.set_state(gst.STATE_NULL)
37
44
 
38
45
        if ok != gst.STATE_CHANGE_SUCCESS:
41
48
        state = self.pipeline.get_state()
42
49
        assert state[1] == gst.STATE_NULL, state
43
50
 
44
 
    def run(self):
 
51
    def wait(self):
 
52
        self.wake.sleep()
 
53
 
 
54
    def __iter__(self):
45
55
        try:
46
56
            self.start()
47
57
 
48
 
            while not self.done:
49
 
                yield self.step()
 
58
            while not (self.eos or self.errors):
 
59
                self.wake.sleep()
 
60
 
 
61
                yield
50
62
        finally:
51
 
            self.stop()
 
63
            self.finish()
52
64
 
53
65
        if self.errors:
54
 
            raise self.errors[0]
55
 
 
56
 
class Reader(object):
57
 
    def __init__(self, pipeline, appsink):
58
 
        self.pipeline = pipeline
 
66
            raise RuntimeError(self.errors[0])
 
67
 
 
68
    def run(self):
 
69
        for _ in self:
 
70
            pass
 
71
 
 
72
class Reader(RunPipeline):
 
73
    def __init__(self, pipeline, appsink, cb):
 
74
        RunPipeline.__init__(self, pipeline)
59
75
        self.appsink = appsink
60
 
        self._ready = threading.Semaphore(0)
 
76
        self.cb = cb
61
77
 
62
78
        appsink.props.emit_signals = True
63
79
        appsink.props.max_buffers = 10
65
81
        appsink.connect('new-buffer', self._new_buffer)
66
82
 
67
83
    def _new_buffer(self, _appsink):
68
 
        # Wake up the mainloop so that the .iteration() call in
69
 
        # _iter_pipeline() returns. Otherwise we can be waiting
70
 
        # indefinitely.
71
 
        gobject.idle_add(lambda: False)
72
 
 
73
 
    def pull(self):
74
 
        return self.appsink.emit('pull-buffer')
 
84
        buf = self.appsink.emit('pull-buffer')
 
85
        self.wake.wake()
 
86
 
 
87
        if buf is None:
 
88
            return
 
89
 
 
90
        v = self._process_buffer(buf)
 
91
 
 
92
        if v is not None:
 
93
            self.cb(v)
75
94
 
76
95
def _run_appsrc_pipeline(pipeline, appsrc, get_chunk):
77
96
    position = [0]
92
111
 
93
112
    appsrc.props.emit_signals = True
94
113
    appsrc.connect('need-data', need_data)
95
 
    r = RunPipeline(pipeline)
96
 
 
97
 
    for _ in r.run():
98
 
        pass
 
114
    run = RunPipeline(pipeline)
 
115
    run.run()