2
# vi:si:et:sw=4:sts=4:ts=4
4
# gst-python - Python bindings for GStreamer
5
# Copyright (C) 2002 David I. Lehn
6
# Copyright (C) 2004 Johan Dahlin
7
# Copyright (C) 2005 Edward Hervey
9
# This library is free software; you can redistribute it and/or
10
# modify it under the terms of the GNU Lesser General Public
11
# License as published by the Free Software Foundation; either
12
# version 2.1 of the License, or (at your option) any later version.
14
# This library is distributed in the hope that it will be useful,
15
# but WITHOUT ANY WARRANTY; without even the implied warranty of
16
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17
# Lesser General Public License for more details.
19
# You should have received a copy of the GNU Lesser General Public
20
# License along with this library; if not, write to the Free Software
21
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
28
from common import gst, unittest, testhelper, TestCase
30
# Test case exercising events on a 'fakesrc ! fakesink name=sink' pipeline.
#
# NOTE(review): in this view the file has lost its indentation, carries bare
# integer lines (apparently line-number residue from extraction), and shows no
# `def` headers for the statements below. They use `self`, so presumably they
# are the bodies of setUp() and tearDown() -- confirm against the original file.
class EventTest(TestCase):
33
# Build the pipeline, look up the sink by name and start playing.
self.pipeline = gst.parse_launch('fakesrc ! fakesink name=sink')
34
self.sink = self.pipeline.get_by_name('sink')
35
self.pipeline.set_state(gst.STATE_PLAYING)
38
# Bring the pipeline back to NULL, logging before and after the state change.
gst.debug('setting pipeline to NULL')
39
self.pipeline.set_state(gst.STATE_NULL)
40
gst.debug('set pipeline to NULL')
41
# FIXME: wait for state change thread to die
42
# Loops for as long as the pipeline's __gstrefcount__ is above 1; there is no
# timeout in the visible code.
while self.pipeline.__gstrefcount__ > 1:
43
gst.debug('waiting for self.pipeline G rc to drop to 1')
45
# The pipeline must be down to a single reference at this point.
self.assertEquals(self.pipeline.__gstrefcount__, 1)
49
# Chain up to the base class tearDown.
TestCase.tearDown(self)
51
def testEventSeek(self):
    """Send a flushing byte-format seek to the sink and verify parse_seek().

    Two seek events are built. The first one (SEEK_TYPE_NONE for the start
    position) is replaced before it is ever sent; only the second one
    (SEEK_TYPE_SET to 0) goes to ``self.sink``. Afterwards the event's
    parsed fields must match the arguments it was created with.
    """
    # this event only serves to change the rate of data transfer
    event = gst.event_new_seek(1.0, gst.FORMAT_BYTES, gst.SEEK_FLAG_FLUSH,
                               gst.SEEK_TYPE_NONE, 0, gst.SEEK_TYPE_NONE, 0)
    # FIXME: but basesrc goes into an mmap/munmap spree, needs to be fixed

    event = gst.event_new_seek(1.0, gst.FORMAT_BYTES, gst.SEEK_FLAG_FLUSH,
                               gst.SEEK_TYPE_SET, 0, gst.SEEK_TYPE_NONE, 0)
    gst.debug('sending event')
    self.sink.send_event(event)
    gst.debug('sent event')

    # parse_seek() is expected to hand back exactly what the event was
    # constructed with, in the same order.
    self.assertEqual(event.parse_seek(),
                     [1.0, gst.FORMAT_BYTES, gst.SEEK_FLAG_FLUSH,
                      gst.SEEK_TYPE_SET, 0, gst.SEEK_TYPE_NONE, 0])
67
# Checks that self.sink.send_event raises TypeError when called with `buffer`
# and again when called with `number`.
#
# NOTE(review): neither `buffer` nor `number` is assigned in the visible lines;
# presumably they are set up on lines missing from this view (the bare integers
# look like line-number residue and skip values) -- confirm against the
# original file.
def testWrongEvent(self):
69
self.assertRaises(TypeError, self.sink.send_event, buffer)
71
self.assertRaises(TypeError, self.sink.send_event, number)
74
# Test case that runs a filesrc over a small temporary file, one byte per
# buffer (blocksize=1), into a fakesink with handoff signals enabled.
#
# NOTE(review): indentation is lost in this view, the bare integer lines look
# like line-number residue, and no `def` headers are visible for the statements
# below; presumably they are the setUp()/tearDown() bodies -- confirm.
# NOTE(review): `tempfile` and `os` are used here but are not imported by the
# import line visible in this view -- presumably imported on a missing line.
class EventFileSrcTest(TestCase):
79
# NOTE(review): tempfile.mktemp() only returns a name and is documented as
# deprecated because of the race it allows; tempfile.mkstemp() is the safer
# choice.
self.filename = tempfile.mktemp()
80
# Writes the ten characters '0123456789' to the file.
# NOTE(review): the file object is never explicitly closed.
open(self.filename, 'w').write(''.join(map(str, range(10))))
82
self.pipeline = gst.parse_launch('filesrc name=source location=%s blocksize=1 ! fakesink signal-handoffs=1 name=sink' % self.filename)
83
self.source = self.pipeline.get_by_name('source')
84
self.sink = self.pipeline.get_by_name('sink')
85
# Keep the handler id so the handoff callback can be disconnected later.
self.sigid = self.sink.connect('handoff', self.handoff_cb)
86
self.bus = self.pipeline.get_bus()
89
# Stop the pipeline, drop the handoff handler and delete the temporary file.
self.pipeline.set_state(gst.STATE_NULL)
90
self.sink.disconnect(self.sigid)
91
if os.path.exists(self.filename):
92
os.remove(self.filename)
98
# Chain up to the base class tearDown.
TestCase.tearDown(self)
100
def handoff_cb(self, element, buffer, pad):
    """Record the string form of a buffer passed to the 'handoff' handler.

    ``element`` and ``pad`` are accepted to match the callback signature
    but are not used; ``str(buffer)`` is appended to ``self.handoffs``.
    """
    self.handoffs.append(str(buffer))
103
# Sets the pipeline to PLAYING, later to PAUSED, and reads self.handoffs.
#
# NOTE(review): this method is incomplete in this view. `msg` is never assigned
# in the visible lines, the `if` has no visible body, and no `return` is shown
# even though testSimple() uses this method's return value. The bare integers
# look like line-number residue and skip values, so lines appear to be missing
# -- confirm against the original file before changing anything here.
def playAndIter(self):
105
self.pipeline.set_state(gst.STATE_PLAYING)
106
# NOTE(review): set_state(PLAYING) is called a second time, this time asserting
# that the return value is truthy.
assert self.pipeline.set_state(gst.STATE_PLAYING)
109
if msg and msg.type == gst.MESSAGE_EOS:
111
assert self.pipeline.set_state(gst.STATE_PAUSED)
112
handoffs = self.handoffs
116
# Issues a flushing, byte-format seek at rate 1.0 on self.sink.
#
# NOTE(review): neither `offset` nor `method` appears in the visible call, and
# the line-number residue skips a value between the two argument lines;
# presumably a line passing them is missing from this view -- confirm against
# the original file.
def sink_seek(self, offset, method=gst.SEEK_TYPE_SET):
117
self.sink.seek(1.0, gst.FORMAT_BYTES, gst.SEEK_FLAG_FLUSH,
119
gst.SEEK_TYPE_NONE, 0)
121
def testSimple(self):
    """Play the pipeline and check that ten handoffs '0'..'9' were recorded.

    The expected value is built as a real list so the comparison also
    holds where ``map()`` returns an iterator, and ``assertEqual`` is used
    so the check is not stripped when Python runs with ``-O``.
    """
    handoffs = self.playAndIter()
    self.assertEqual(handoffs, [str(i) for i in range(10)])
125
# NOTE(review): only the header of this test is visible in this view; its body
# appears to be missing, so nothing can be said about what it checks.
def testSeekCur(self):
129
# Test case that connects self._event_cb to the 'event' signal of an object
# obtained from testhelper, then triggers the signal twice: once through
# testhelper.emit_event() and once through emit() from Python.
#
# NOTE(review): indentation is lost in this view and no `def` header is visible
# for the statements below; they use `self`, so presumably they form the body
# of a test method -- confirm against the original file.
class TestEmit(TestCase):
131
# NOTE(review): the local name `object` shadows the builtin of the same name.
object = testhelper.get_object()
132
object.connect('event', self._event_cb)
135
testhelper.emit_event(object)
137
# Then emit from Python
138
object.emit('event', gst.event_new_eos())
140
def _event_cb(self, obj, event):
    """Signal handler asserting that the payload is a ``gst.Event``.

    ``obj`` is accepted to match the callback signature but is not used.
    """
    assert isinstance(event, gst.Event)
144
# NOTE(review): indentation is lost in this view, the bare integer lines look
# like line-number residue, and no `def` headers are visible for the statements
# below; presumably they are the bodies of setUp(), tearDown() and a test
# method -- confirm against the original file.
class TestDelayedEventProbe(TestCase):
146
# starts a pipeline with only a source
147
# adds an event probe to catch the (first) new-segment
148
# adds a buffer probe to "autoplug" and send out this event
151
# Pipeline containing a single fakesrc limited to 10 buffers; its 'src' pad
# is kept for attaching probes.
self.pipeline = gst.Pipeline()
152
self.src = gst.element_factory_make('fakesrc')
153
self.src.set_property('num-buffers', 10)
154
self.pipeline.add(self.src)
155
self.srcpad = self.src.get_pad('src')
158
# Bring the pipeline back to NULL, logging before and after the state change.
gst.debug('setting pipeline to NULL')
159
self.pipeline.set_state(gst.STATE_NULL)
160
gst.debug('set pipeline to NULL')
161
# FIXME: wait for state change thread to die
162
# Loops for as long as the pipeline's __gstrefcount__ is above 1; there is no
# timeout in the visible code.
while self.pipeline.__gstrefcount__ > 1:
163
gst.debug('waiting for self.pipeline G rc to drop to 1')
165
self.assertEquals(self.pipeline.__gstrefcount__, 1)
168
# Attach the event probe and the buffer probe to the source pad; the buffer
# probe id is stored so the probe can be removed from inside its callback.
self.srcpad.add_event_probe(self._event_probe_cb)
169
self._buffer_probe_id = self.srcpad.add_buffer_probe(
170
self._buffer_probe_cb)
172
self._newsegment = None
174
self._had_buffer = False
176
self.pipeline.set_state(gst.STATE_PLAYING)
181
# verify if our newsegment event is still around and valid
182
self.failUnless(self._newsegment)
183
self.assertEquals(self._newsegment.type, gst.EVENT_NEWSEGMENT)
184
self.assertEquals(self._newsegment.__grefcount__, 1)
186
# verify if our eos event is still around and valid
187
# NOTE(review): no assignment to self._eos is visible in this view; presumably
# it is initialised and stored on lines missing here -- confirm.
self.failUnless(self._eos)
188
self.assertEquals(self._eos.type, gst.EVENT_EOS)
189
self.assertEquals(self._eos.__grefcount__, 1)
191
# Event probe callback: stores a NEWSEGMENT event on self._newsegment and
# asserts that its __grefcount__ is 3, has a branch for EOS, and calls
# self.fail() with the event's repr in its message.
#
# NOTE(review): this method is incomplete in this view. The EOS branch shows
# only a comment and no statement, and no `return` values are visible even
# though the comments talk about dropping or passing the event. The
# line-number residue skips values, so lines appear to be missing -- confirm
# against the original file before changing anything here.
def _event_probe_cb(self, pad, event):
192
if event.type == gst.EVENT_NEWSEGMENT:
193
self._newsegment = event
194
self.assertEquals(event.__grefcount__, 3)
195
# drop the event, we're storing it for later sending
198
if event.type == gst.EVENT_EOS:
200
# we also want fakesink to get it
203
self.fail("Got an unknown event %r" % event)
205
# Buffer probe callback: requires that a new-segment event has already been
# stored, adds a fakesink to the pipeline and sets it to PLAYING, sends the
# stored event to the sink's 'sink' pad, removes this buffer probe and sets
# self._had_buffer.
#
# NOTE(review): no `return` is visible although the last comment says the
# buffer should be let through, and no statement linking the source to the new
# sink is visible either. The line-number residue skips values, so lines appear
# to be missing -- confirm against the original file.
def _buffer_probe_cb(self, pad, buffer):
206
self.failUnless(self._newsegment)
208
# fake autoplugging by now putting in a fakesink
209
sink = gst.element_factory_make('fakesink')
210
self.pipeline.add(sink)
212
sink.set_state(gst.STATE_PLAYING)
214
# The `pad` parameter is rebound here to the new sink's 'sink' pad.
pad = sink.get_pad('sink')
215
pad.send_event(self._newsegment)
217
# we don't want to be called again
218
self.srcpad.remove_buffer_probe(self._buffer_probe_id)
220
self._had_buffer = True
221
# now let the buffer through
224
if __name__ == "__main__":