1
# PiTiVi , Non-linear video editor
5
# Copyright (c) 2005-2008, Edward Hervey <bilboed@bilboed.com>
6
# 2008, Alessandro Decina <alessandro.decina@collabora.co.uk>
8
# This program is free software; you can redistribute it and/or
9
# modify it under the terms of the GNU Lesser General Public
10
# License as published by the Free Software Foundation; either
11
# version 2.1 of the License, or (at your option) any later version.
13
# This program is distributed in the hope that it will be useful,
14
# but WITHOUT ANY WARRANTY; without even the implied warranty of
15
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16
# Lesser General Public License for more details.
18
# You should have received a copy of the GNU Lesser General Public
19
# License along with this program; if not, write to the
20
# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21
# Boston, MA 02110-1301, USA.
24
Discover file multimedia information.
27
from gettext import gettext as _
31
from gst import pbutils
35
from pitivi.log.loggable import Loggable
36
from pitivi.factories.file import FileSourceFactory, PictureFileSourceFactory
37
from pitivi.stream import get_stream_for_pad
38
from pitivi.signalinterface import Signallable
39
from pitivi.stream import VideoStream, TextStream
40
from pitivi.settings import xdg_cache_home
42
# FIXME: We need to store more information regarding streams
43
# i.e. remember the path took to get to a raw stream, and figure out
44
# what encoded format it is
45
# We will need that in order to create proper Stream objects.
48
class EOSSir(gst.Element):
52
"pushes EOS after the first buffer",
53
"Alessandro Decina <alessandro.d@gmail.com>")
55
srctemplate = gst.PadTemplate("src", gst.PAD_SRC,
56
gst.PAD_ALWAYS, gst.Caps("ANY"))
57
sinktemplate = gst.PadTemplate("sink", gst.PAD_SINK,
58
gst.PAD_ALWAYS, gst.Caps("ANY"))
60
__gsttemplates__ = (srctemplate, sinktemplate)
63
gst.Element.__init__(self)
65
self.sinkpad = gst.Pad(self.sinktemplate, "sink")
66
self.sinkpad.set_chain_function(self.chain)
67
self.add_pad(self.sinkpad)
69
self.srcpad = gst.Pad(self.srctemplate, "src")
70
self.add_pad(self.srcpad)
72
def chain(self, pad, buf):
73
ret = self.srcpad.push(buf)
74
if ret == gst.FLOW_OK:
75
self.info("pushed, doing EOS")
76
self.srcpad.push_event(gst.event_new_eos())
79
gobject.type_register(EOSSir)
82
class Discoverer(Signallable, Loggable):
84
Queues requests to discover information about given files.
85
The discovery is done in a very fragmented way, so that it appears to be
86
running in a separate thread.
88
The "starting" signal is emitted when the discoverer starts analyzing some
91
The "ready" signal is emitted when the discoverer has no more files to
94
The "discovery-done" signal is emitted an uri is finished being analyzed.
95
The "discovery-error" signal is emitted if an error is encountered while
100
"discovery-error": ["a", "b", "c"],
101
"discovery-done": ["uri", "factory"],
104
"missing-plugins": ["uri", "detail", "description"]}
107
Loggable.__init__(self)
113
def _resetState(self):
114
self.current_uri = None
115
self.current_tags = []
116
self.current_streams = []
117
self.current_duration = gst.CLOCK_TIME_NONE
121
self.error_detail = None
122
self.unfixed_pads = 0
123
self.unknown_pads = 0
124
self.missing_plugin_messages = []
125
self.dynamic_elements = []
127
self.missing_plugin_details = []
128
self.missing_plugin_descriptions = []
130
def _resetPipeline(self):
131
# finish current, cleanup
132
if self.bus is not None:
133
self.bus.remove_signal_watch()
136
if self.pipeline is not None:
137
self.debug("before setting to NULL")
138
res = self.pipeline.set_state(gst.STATE_NULL)
139
self.debug("after setting to NULL : %s", res)
141
for element in self.dynamic_elements:
142
self.pipeline.remove(element)
144
def addUri(self, uri):
145
""" queue a filename to be discovered """
146
self.info("filename: %s", uri)
147
self.queue.append(uri)
149
self._startAnalysis()
151
def addUris(self, uris):
152
""" queue a list of filenames to be discovered """
153
self.info("filenames : %s", uris)
154
self.queue.extend(uris)
155
if self.queue and not self.working:
156
self._startAnalysis()
158
def _startAnalysis(self):
160
Call this method to start analyzing the uris
163
self.emit("starting")
165
self._scheduleAnalysis()
167
def _scheduleAnalysis(self):
168
gobject.idle_add(self._analyze)
170
def _removeTimeout(self):
171
gobject.source_remove(self.timeout_id)
174
def _checkMissingPlugins(self):
175
if self.bus is not None:
176
# This method is usually called when decodebin(2) reaches PAUSED and
177
# we stop analyzing the current source.
178
# decodebin2 commits its state change to PAUSED _before_ posting
179
# missing-plugin messages, so we manually pop ELEMENT messages
180
# looking for queued missing-plugin messages.
182
message = self.bus.pop_filtered(gst.MESSAGE_ELEMENT)
186
self._busMessageElementCb(self.bus, message)
188
if not self.missing_plugin_messages:
191
for message in self.missing_plugin_messages:
193
pbutils.missing_plugin_message_get_installer_detail(message)
195
pbutils.missing_plugin_message_get_description(message)
197
self.missing_plugin_details.append(detail)
198
self.missing_plugin_descriptions.append(description)
202
def _installMissingPluginsCallback(self, result, factory):
205
if result in (pbutils.INSTALL_PLUGINS_SUCCESS,
206
pbutils.INSTALL_PLUGINS_PARTIAL_SUCCESS):
207
gst.update_registry()
209
elif result == pbutils.INSTALL_PLUGINS_USER_ABORT \
210
and factory.getOutputStreams():
211
self._emitDone(factory)
213
self._emitErrorMissingPlugins()
215
self._finishAnalysisAfterResult(rescan=rescan)
217
def _emitError(self):
218
self.debug("emitting error %s, %s, %s",
219
self.current_uri, self.error, self.error_detail)
220
self.emit("discovery-error", self.current_uri, self.error, self.error_detail)
222
def _emitErrorMissingPlugins(self):
223
self.error = _("Missing plugins:\n%s") % \
224
"\n".join(self.missing_plugin_descriptions)
225
self.error_detail = ""
228
def _emitDone(self, factory):
229
self.emit("discovery-done", self.current_uri, factory)
231
def _emitResult(self):
232
missing_plugins = bool(self.missing_plugin_details)
233
# we got a gst error, error out ASAP
234
if not missing_plugins and self.error:
238
have_video, have_audio, have_image = self._getCurrentStreamTypes()
239
missing_plugins = bool(self.missing_plugin_details)
241
if not self.current_streams and not missing_plugins:
242
# woot, nothing decodable
243
self.error = _('Cannot decode file.')
244
self.error_detail = _("The given file does not contain audio, "
245
"video or picture streams.")
249
# construct the factory with the streams we found
250
if have_image and self.current_duration == gst.CLOCK_TIME_NONE:
251
factory = PictureFileSourceFactory(self.current_uri)
253
factory = FileSourceFactory(self.current_uri)
255
factory.duration = self.current_duration
256
for stream in self.current_streams:
257
factory.addOutputStream(stream)
259
if not missing_plugins:
260
# make sure that we could query the duration (if it's an image, we
261
# assume it's got infinite duration)
262
is_image = have_image and len(self.current_streams) == 1
263
if self.current_duration == gst.CLOCK_TIME_NONE and not is_image:
264
self.error = _("Could not establish the duration of the file.")
265
self.error_detail = _("This clip seems to be in a format "
266
"which cannot be accessed in a random fashion.")
270
self._emitDone(factory)
273
def callback(result):
274
self._installMissingPluginsCallback(result, factory)
276
res = self.emit("missing-plugins", self.current_uri, factory,
277
self.missing_plugin_details,
278
self.missing_plugin_descriptions,
280
if res is None or res != pbutils.INSTALL_PLUGINS_STARTED_OK:
281
# no missing-plugins handlers
282
if factory.getOutputStreams():
283
self._emitDone(factory)
285
self._emitErrorMissingPlugins()
289
# plugins are being installed, processing will continue when
290
# self._installMissingPluginsCallback is called by the application
293
def _finishAnalysis(self, reason):
295
Call this method when the current file is analyzed
296
This method will wrap-up the analyzis and call the next analysis if needed
299
self._removeTimeout()
301
self.info("analysys finished, reason %s", reason)
303
# check if there are missing plugins before calling _resetPipeline as we
304
# are going to pop messagess off the bus
305
self._checkMissingPlugins()
306
self._resetPipeline()
308
# emit discovery-done, discovery-error or missing-plugins
309
if self._emitResult():
310
self._finishAnalysisAfterResult()
312
def _finishAnalysisAfterResult(self, rescan=False):
313
self.info("Cleaning up after finished analyzing %s", self.current_uri)
318
# restart an analysis if there's more...
320
self._scheduleAnalysis()
323
self.info("discoverer is now ready again")
326
def _timeoutCb(self):
327
self.debug("timeout")
330
self.error = _('Timeout while analyzing file.')
331
self.error_detail = _('Analyzing the file took too long.')
332
self._finishAnalysis("timeout")
336
def _getCurrentStreamTypes(self):
340
for stream in self.current_streams:
341
caps_str = str(stream.caps)
342
if caps_str.startswith('video'):
347
elif caps_str.startswith('audio'):
350
return have_video, have_audio, have_image
352
def _scheduleTimeout(self):
353
self.timeout_id = gobject.timeout_add_seconds(10, self._timeoutCb)
355
def _createSource(self):
356
source = gst.element_make_from_uri(gst.URI_SRC,
357
self.current_uri, "src-%s" % self.current_uri)
359
self.warning("This is not a media file: %s", self.current_uri)
360
self.error = _("No available source handler.")
361
self.error_detail = _('You do not have a GStreamer source element to handle the "%s" protocol') % gst.uri_get_protocol(self.current_uri)
365
# increment source blocksize to 128kbytes, this should speed up
366
# push-mode scenarios (like pictures).
367
if hasattr(source.props, 'blocksize'):
368
source.props.blocksize = 131072
371
def _useDecodeBinTwo(self):
372
ret = os.getenv('USE_DECODEBIN2', '1') == '1'
375
def _createDecodeBin(self):
376
if self._useDecodeBinTwo():
377
dbin = gst.element_factory_make("decodebin2", "dbin")
379
dbin = gst.element_factory_make("decodebin", "dbin")
381
dbin.connect("new-decoded-pad", self._newDecodedPadCb)
382
dbin.connect("unknown-type", self._unknownType)
386
def _connectToBus(self):
387
self.bus = self.pipeline.get_bus()
388
self.bus.add_signal_watch()
389
self.bus.connect("message::eos", self._busMessageEosCb)
390
self.bus.connect("message::error", self._busMessageErrorCb)
391
self.bus.connect("message::element", self._busMessageElementCb)
392
self.bus.connect("message::state-changed",
393
self._busMessageStateChangedCb)
397
Sets up a pipeline to analyze the given uri
399
self.current_uri = self.queue[0]
400
self.info("Analyzing %s", self.current_uri)
402
# check if file exists and is readable
403
if gst.uri_get_protocol(self.current_uri) == "file":
404
filename = gst.uri_get_location(self.current_uri)
406
if not os.access(filename, os.F_OK):
407
error = _("File does not exist")
408
elif not os.access(filename, os.R_OK):
409
error = _("File not readable by current user")
411
self.info("Error: %s", self.error)
413
self._finishAnalysis("File does not exist or is not readable by the current user")
416
# setup graph and start analyzing
417
self.pipeline = gst.Pipeline("Discoverer-%s" % self.current_uri)
419
# create the source element
420
source = self._createSource()
422
self._finishAnalysis("no source")
425
# create decodebin(2)
426
dbin = self._createDecodeBin()
428
self.pipeline.add(source, dbin)
430
self.info("analysis pipeline created")
432
# connect to bus messages
435
self.info("setting pipeline to PAUSED")
438
if self.pipeline.set_state(gst.STATE_PAUSED) == gst.STATE_CHANGE_FAILURE:
440
self.error = _("Pipeline didn't want to go to PAUSED.")
441
self.info("Pipeline didn't want to go to PAUSED")
442
self._finishAnalysis("failure going to PAUSED")
446
self._scheduleTimeout()
448
# return False so we don't get called again
451
def _busMessageEosCb(self, unused_bus, message):
452
self.debug("got EOS")
454
self._finishAnalysis("EOS")
456
def _busMessageErrorCb(self, unused_bus, message):
457
gerror, detail = message.parse_error()
459
if self.error is not None:
460
# don't clobber existing errors
463
self.error = _("An internal error occurred while analyzing this file: %s") % gerror.message
464
self.error_detail = detail
466
self._finishAnalysis("ERROR")
468
def _busMessageElementCb(self, unused_bus, message):
469
self.debug("Element message %s", message.structure.to_string())
470
if message.structure.get_name() == "redirect":
471
self.warning("We don't implement redirections currently, ignoring file")
472
if self.error is None:
473
self.error = _("File contains a redirection to another clip.")
474
self.error_detail = _("PiTiVi currently does not handle redirection files.")
476
self._finishAnalysis("redirect")
479
if pbutils.is_missing_plugin_message(message):
480
self._busMessageMissingPlugins(message)
482
def _busMessageMissingPlugins(self, message):
483
self.missing_plugin_messages.append(message)
485
def _busMessageStateChangedCb(self, unused_bus, message):
486
if message.src != self.pipeline:
489
state_change = message.parse_state_changed()
490
self.log("%s:%s", message.src, state_change)
491
prev, new, pending = state_change
493
if prev == gst.STATE_READY and new == gst.STATE_PAUSED and \
494
pending == gst.STATE_VOID_PENDING:
495
have_video, have_audio, have_image = self._getCurrentStreamTypes()
496
if self.unfixed_pads or self.unknown_pads or have_video or have_image:
497
# go to PLAYING to generate the thumbnails
498
if self.pipeline.set_state(gst.STATE_PLAYING) == gst.STATE_CHANGE_FAILURE:
500
self.error = _("Pipeline didn't want to go to PLAYING.")
501
self.info("Pipeline didn't want to go to PLAYING")
502
self._finishAnalysis("failure going to PLAYING")
503
elif self.unfixed_pads == 0:
504
# check for unfixed_pads until elements are fixed to do
505
# negotiation before pushing in band data
506
self._finishAnalysis("got to PAUSED and no unfixed pads")
508
def _busMessageTagCb(self, unused_bus, message):
509
self.debug("Got tags %s", message.structure.to_string())
510
self.current_tags.append(message.parse_tag())
512
def _maybeQueryDuration(self, pad):
513
if self.current_duration == gst.CLOCK_TIME_NONE:
514
result = pad.query_duration(gst.FORMAT_TIME)
515
if result is not None:
516
duration, format = result
517
if format == gst.FORMAT_TIME:
518
self.current_duration = duration
520
def _gettempdir(self):
521
tmp = tempfile.gettempdir()
522
tmp = os.path.join(tmp, 'pitivi-%s' % os.getenv('USER'))
523
if not os.path.exists(tmp):
527
def _getThumbnailFilenameFromPad(self, pad):
528
md5sum = hashlib.md5()
529
md5sum.update(self.current_uri)
530
name = md5sum.hexdigest() + '.png'
531
filename = os.path.join(xdg_cache_home(), name)
534
def _videoPadSeekCb(self, pad):
536
duration = self.pipeline.query_duration(gst.FORMAT_TIME)[0]
537
except gst.QueryError:
540
self.debug("doing thumbnail seek at %s", gst.TIME_ARGS(duration))
543
self.pipeline.seek_simple(gst.FORMAT_TIME,
544
gst.SEEK_FLAG_FLUSH, duration / 3)
546
pad.set_blocked_async(False, self._videoPadBlockCb)
548
def _videoPadBlockCb(self, pad, blocked):
549
self.debug("video pad blocked: %s" % blocked)
551
gobject.timeout_add(0, self._videoPadSeekCb, pad)
553
def _addVideoBufferProbe(self, pad):
555
closure['probe_id'] = pad.add_buffer_probe(self._videoBufferProbeCb,
558
def _removeVideoBufferProbe(self, pad, closure):
559
pad.remove_buffer_probe(closure['probe_id'])
561
def _videoBufferProbeCb(self, pad, buf, closure):
562
self.log("video buffer probe for pad %s", pad)
563
self._removeVideoBufferProbe(pad, closure)
565
pad.set_blocked_async(True, self._videoPadBlockCb)
569
def _padEventProbeCb(self, pad, event):
570
self.log("got event %s from src %s on pad %s",
571
event.type, event.src, pad)
575
def _padBufferProbeCb(self, pad, buf):
576
self.debug("got buffer on pad %s", pad)
580
def _addPadProbes(self, pad):
581
pad.add_event_probe(self._padEventProbeCb)
582
pad.add_buffer_probe(self._padBufferProbeCb)
584
def _newVideoPadCb(self, pad):
585
""" a new video pad was found """
586
self.debug("pad %r", pad)
588
self._addPadProbes(pad)
590
thumbnail = self._getThumbnailFilenameFromPad(pad)
591
self.thumbnails[pad] = thumbnail
592
have_thumbnail = os.path.exists(thumbnail)
595
self.debug("we already have a thumbnail %s for %s", thumbnail, pad)
596
sink = gst.element_factory_make("fakesink")
597
# use this and not fakesink.props.num_buffers = 1 to avoid some
598
# not-expected errors when discovering pictures
600
self.dynamic_elements.extend([eossir, sink])
601
self.pipeline.add(eossir, sink)
602
eossir.set_state(gst.STATE_PLAYING)
603
sink.set_state(gst.STATE_PLAYING)
605
pad.link(eossir.get_pad("sink"))
610
stream = get_stream_for_pad(pad)
611
if isinstance(stream, VideoStream) and not stream.is_image:
612
self._addVideoBufferProbe(pad)
614
queue = gst.element_factory_make("queue")
615
queue.props.max_size_bytes = 5 * 1024 * 1024
616
queue.props.max_size_time = 5 * gst.SECOND
617
vscale = gst.element_factory_make("videoscale")
618
vscale.props.method = 0
619
csp = gst.element_factory_make("ffmpegcolorspace")
620
pngenc = gst.element_factory_make("pngenc")
621
pngenc.props.snapshot = True
622
pngsink = gst.element_factory_make("filesink")
623
pngsink.props.location = thumbnail
625
self.dynamic_elements.extend([queue, vscale, csp, pngenc, pngsink])
627
self.pipeline.add(queue, vscale, csp, pngenc, pngsink)
628
gst.element_link_many(queue, csp, vscale)
629
vscale.link(pngenc, gst.Caps("video/x-raw-rgb,width=[1,96],height=[1,96];video/x-raw-yuv,width=[1,96],height=[1,96]"))
630
gst.element_link_many(pngenc, pngsink)
631
pad.link(queue.get_pad("sink"))
633
for element in [queue, vscale, csp, pngenc, pngsink]:
634
element.sync_state_with_parent()
636
def _newPadCb(self, pad):
637
stream = get_stream_for_pad(pad)
638
if isinstance(stream, TextStream):
639
self.info("skipping subtitle pad")
642
self._addPadProbes(pad)
644
queue = gst.element_factory_make('queue')
645
fakesink = gst.element_factory_make('fakesink')
646
fakesink.props.num_buffers = 1
647
self.dynamic_elements.append(queue)
648
self.dynamic_elements.append(fakesink)
650
self.pipeline.add(queue, fakesink)
651
pad.link(queue.get_pad('sink'))
654
queue.sync_state_with_parent()
655
fakesink.sync_state_with_parent()
657
def _capsNotifyCb(self, pad, unused_property, ghost=None):
661
caps = pad.props.caps
662
self.debug("pad caps notify %s", caps)
663
if caps is None or not caps.is_fixed():
666
pad.disconnect_by_func(self._capsNotifyCb)
668
self.info("got fixed caps for pad %s", pad)
670
self.unfixed_pads -= 1
671
self.debug("unfixed pads %d", self.unfixed_pads)
672
stream = self._addStreamFromPad(ghost)
673
if isinstance(stream, VideoStream):
674
stream.thumbnail = self.thumbnails[ghost]
676
def _newDecodedPadCb(self, unused_element, pad, is_last):
677
self.info("pad:%s caps:%s is_last:%s", pad, pad.get_caps(), is_last)
679
caps_str = str(pad.get_caps())
680
if caps_str.startswith("video/x-raw"):
681
self._newVideoPadCb(pad)
685
# try to get the duration
686
self._maybeQueryDuration(pad)
688
caps = pad.props.caps
690
if caps is not None and caps.is_fixed():
691
self.debug("got fixed caps for pad %s", pad)
693
stream = self._addStreamFromPad(pad)
694
if isinstance(stream, VideoStream):
695
stream.thumbnail = self.thumbnails[pad]
697
# add the stream once the caps are fixed
698
if gst.version() < (0, 10, 21, 1) and \
699
isinstance(pad, gst.GhostPad):
700
# see #564863 for the version check
701
# the isinstance check is there so that we don't have to create
702
# ghost pads in the tests
703
pad.get_target().connect("notify::caps",
704
self._capsNotifyCb, pad)
706
pad.connect("notify::caps", self._capsNotifyCb)
707
self.unfixed_pads += 1
708
self.debug("unfixed pads %d", self.unfixed_pads)
710
def _unknownType(self, decodebin, pad, caps):
711
# decodebin2 sends ASYNC_DONE when it finds an unknown type so we have
712
# to deal with that...
713
self.unknown_pads += 1
715
def _addStreamFromPad(self, pad):
716
self._maybeQueryDuration(pad)
717
self.debug("adding stream from pad %s caps %s", pad, pad.props.caps)
718
stream = get_stream_for_pad(pad)
719
self.current_streams.append(stream)
723
if __name__ == '__main__':
726
discoverer = Discoverer()
727
discoverer.addUris(['file://%s' % i for i in sys.argv[1:]])
728
loop = gobject.MainLoop()