~ubuntu-branches/ubuntu/precise/gst0.10-python/precise

« back to all changes in this revision

Viewing changes to testsuite/test_event.py

  • Committer: Bazaar Package Importer
  • Author(s): Loic Minier
  • Date: 2006-06-25 19:37:45 UTC
  • Revision ID: james.westby@ubuntu.com-20060625193745-9yeg0wq56r24n57x
Tags: upstream-0.10.4
ImportĀ upstreamĀ versionĀ 0.10.4

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- Mode: Python -*-
 
2
# vi:si:et:sw=4:sts=4:ts=4
 
3
#
 
4
# gst-python - Python bindings for GStreamer
 
5
# Copyright (C) 2002 David I. Lehn
 
6
# Copyright (C) 2004 Johan Dahlin
 
7
# Copyright (C) 2005 Edward Hervey
 
8
#
 
9
# This library is free software; you can redistribute it and/or
 
10
# modify it under the terms of the GNU Lesser General Public
 
11
# License as published by the Free Software Foundation; either
 
12
# version 2.1 of the License, or (at your option) any later version.
 
13
#
 
14
# This library is distributed in the hope that it will be useful,
 
15
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
16
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 
17
# Lesser General Public License for more details.
 
18
#
 
19
# You should have received a copy of the GNU Lesser General Public
 
20
# License along with this library; if not, write to the Free Software
 
21
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
 
22
 
 
23
import os
 
24
import sys
 
25
import time
 
26
import tempfile
 
27
 
 
28
from common import gst, unittest, testhelper, TestCase
 
29
 
 
30
class EventTest(TestCase):
 
31
    def setUp(self):
 
32
        TestCase.setUp(self)
 
33
        self.pipeline = gst.parse_launch('fakesrc ! fakesink name=sink')
 
34
        self.sink = self.pipeline.get_by_name('sink')
 
35
        self.pipeline.set_state(gst.STATE_PLAYING)
 
36
 
 
37
    def tearDown(self):
 
38
        gst.debug('setting pipeline to NULL')
 
39
        self.pipeline.set_state(gst.STATE_NULL)
 
40
        gst.debug('set pipeline to NULL')
 
41
        # FIXME: wait for state change thread to die
 
42
        while self.pipeline.__gstrefcount__ > 1:
 
43
            gst.debug('waiting for self.pipeline G rc to drop to 1')
 
44
            time.sleep(0.1)
 
45
        self.assertEquals(self.pipeline.__gstrefcount__, 1)
 
46
 
 
47
        del self.sink
 
48
        del self.pipeline
 
49
        TestCase.tearDown(self)
 
50
        
 
51
    def testEventSeek(self):
 
52
        # this event only serves to change the rate of data transfer
 
53
        event = gst.event_new_seek(1.0, gst.FORMAT_BYTES, gst.SEEK_FLAG_FLUSH,
 
54
            gst.SEEK_TYPE_NONE, 0, gst.SEEK_TYPE_NONE, 0)
 
55
        # FIXME: but basesrc goes into an mmap/munmap spree, needs to be fixed
 
56
 
 
57
        event = gst.event_new_seek(1.0, gst.FORMAT_BYTES, gst.SEEK_FLAG_FLUSH,
 
58
            gst.SEEK_TYPE_SET, 0, gst.SEEK_TYPE_NONE, 0)
 
59
        assert event
 
60
        gst.debug('sending event')
 
61
        self.sink.send_event(event)
 
62
        gst.debug('sent event')
 
63
 
 
64
        self.assertEqual(event.parse_seek(), [1.0, gst.FORMAT_BYTES, gst.SEEK_FLAG_FLUSH,
 
65
            gst.SEEK_TYPE_SET, 0, gst.SEEK_TYPE_NONE, 0])
 
66
 
 
67
    def testWrongEvent(self):
 
68
        buffer = gst.Buffer()
 
69
        self.assertRaises(TypeError, self.sink.send_event, buffer)
 
70
        number = 1
 
71
        self.assertRaises(TypeError, self.sink.send_event, number)
 
72
 
 
73
 
 
74
class EventFileSrcTest(TestCase):
 
75
 
 
76
   def setUp(self):
 
77
       TestCase.setUp(self)
 
78
       gst.info("start")
 
79
       self.filename = tempfile.mktemp()
 
80
       open(self.filename, 'w').write(''.join(map(str, range(10))))
 
81
       
 
82
       self.pipeline = gst.parse_launch('filesrc name=source location=%s blocksize=1 ! fakesink signal-handoffs=1 name=sink' % self.filename)
 
83
       self.source = self.pipeline.get_by_name('source')
 
84
       self.sink = self.pipeline.get_by_name('sink')
 
85
       self.sigid = self.sink.connect('handoff', self.handoff_cb)
 
86
       self.bus = self.pipeline.get_bus()
 
87
       
 
88
   def tearDown(self):
 
89
       self.pipeline.set_state(gst.STATE_NULL)
 
90
       self.sink.disconnect(self.sigid)
 
91
       if os.path.exists(self.filename):
 
92
           os.remove(self.filename)
 
93
       del self.bus
 
94
       del self.pipeline
 
95
       del self.source
 
96
       del self.sink
 
97
       del self.handoffs
 
98
       TestCase.tearDown(self)
 
99
 
 
100
   def handoff_cb(self, element, buffer, pad):
 
101
       self.handoffs.append(str(buffer))
 
102
 
 
103
   def playAndIter(self):
 
104
       self.handoffs = []
 
105
       self.pipeline.set_state(gst.STATE_PLAYING)
 
106
       assert self.pipeline.set_state(gst.STATE_PLAYING)
 
107
       while 42:
 
108
           msg = self.bus.pop()
 
109
           if msg and msg.type == gst.MESSAGE_EOS:
 
110
               break
 
111
       assert self.pipeline.set_state(gst.STATE_PAUSED)
 
112
       handoffs = self.handoffs
 
113
       self.handoffs = []
 
114
       return handoffs
 
115
 
 
116
   def sink_seek(self, offset, method=gst.SEEK_TYPE_SET):
 
117
       self.sink.seek(1.0, gst.FORMAT_BYTES, gst.SEEK_FLAG_FLUSH,
 
118
                      method, offset,
 
119
                      gst.SEEK_TYPE_NONE, 0)
 
120
      
 
121
   def testSimple(self):
 
122
       handoffs = self.playAndIter()
 
123
       assert handoffs == map(str, range(10))
 
124
   
 
125
   def testSeekCur(self):
 
126
       self.sink_seek(8)
 
127
       self.playAndIter()
 
128
 
 
129
class TestEmit(TestCase):
 
130
    def testEmit(self):
 
131
        object = testhelper.get_object()
 
132
        object.connect('event', self._event_cb)
 
133
        
 
134
        # First emit from C
 
135
        testhelper.emit_event(object)
 
136
 
 
137
        # Then emit from Python
 
138
        object.emit('event', gst.event_new_eos())
 
139
 
 
140
    def _event_cb(self, obj, event):
 
141
        assert isinstance(event, gst.Event)
 
142
    
 
143
 
 
144
class TestDelayedEventProbe(TestCase):
 
145
    # this test:
 
146
    # starts a pipeline with only a source
 
147
    # adds an event probe to catch the (first) new-segment
 
148
    # adds a buffer probe to "autoplug" and send out this event
 
149
    def setUp(self):
 
150
        TestCase.setUp(self)
 
151
        self.pipeline = gst.Pipeline()
 
152
        self.src = gst.element_factory_make('fakesrc')
 
153
        self.src.set_property('num-buffers', 10)
 
154
        self.pipeline.add(self.src)
 
155
        self.srcpad = self.src.get_pad('src')
 
156
        
 
157
    def tearDown(self):
 
158
        gst.debug('setting pipeline to NULL')
 
159
        self.pipeline.set_state(gst.STATE_NULL)
 
160
        gst.debug('set pipeline to NULL')
 
161
        # FIXME: wait for state change thread to die
 
162
        while self.pipeline.__gstrefcount__ > 1:
 
163
            gst.debug('waiting for self.pipeline G rc to drop to 1')
 
164
            time.sleep(0.1)
 
165
        self.assertEquals(self.pipeline.__gstrefcount__, 1)
 
166
 
 
167
    def testProbe(self):
 
168
        self.srcpad.add_event_probe(self._event_probe_cb)
 
169
        self._buffer_probe_id = self.srcpad.add_buffer_probe(
 
170
            self._buffer_probe_cb)
 
171
 
 
172
        self._newsegment = None
 
173
        self._eos = None
 
174
        self._had_buffer = False
 
175
 
 
176
        self.pipeline.set_state(gst.STATE_PLAYING)
 
177
 
 
178
        while not self._eos:
 
179
            time.sleep(0.1)
 
180
 
 
181
        # verify if our newsegment event is still around and valid
 
182
        self.failUnless(self._newsegment)
 
183
        self.assertEquals(self._newsegment.type, gst.EVENT_NEWSEGMENT)
 
184
        self.assertEquals(self._newsegment.__grefcount__, 1)
 
185
 
 
186
        # verify if our eos event is still around and valid
 
187
        self.failUnless(self._eos)
 
188
        self.assertEquals(self._eos.type, gst.EVENT_EOS)
 
189
        self.assertEquals(self._eos.__grefcount__, 1)
 
190
 
 
191
    def _event_probe_cb(self, pad, event):
 
192
        if event.type == gst.EVENT_NEWSEGMENT:
 
193
            self._newsegment = event
 
194
            self.assertEquals(event.__grefcount__, 3)
 
195
            # drop the event, we're storing it for later sending
 
196
            return False
 
197
 
 
198
        if  event.type == gst.EVENT_EOS:
 
199
            self._eos = event
 
200
            # we also want fakesink to get it
 
201
            return True
 
202
 
 
203
        self.fail("Got an unknown event %r" % event)
 
204
 
 
205
    def _buffer_probe_cb(self, pad, buffer):
 
206
        self.failUnless(self._newsegment)
 
207
 
 
208
        # fake autoplugging by now putting in a fakesink
 
209
        sink = gst.element_factory_make('fakesink')
 
210
        self.pipeline.add(sink)
 
211
        self.src.link(sink)
 
212
        sink.set_state(gst.STATE_PLAYING)
 
213
 
 
214
        pad = sink.get_pad('sink')
 
215
        pad.send_event(self._newsegment)
 
216
 
 
217
        # we don't want to be called again
 
218
        self.srcpad.remove_buffer_probe(self._buffer_probe_id)
 
219
        
 
220
        self._had_buffer = True
 
221
        # now let the buffer through
 
222
        return True
 
223
 
 
224
if __name__ == "__main__":
 
225
    unittest.main()