# Copyright (C) 2006 Mark Nauwelaerts <mnauw@users.sourceforge.net>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Library General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Library General Public License for more details.
#
# You should have received a copy of the GNU Library General Public
# License along with this library; if not, write to the
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
# Boston, MA 02110-1307, USA.
22
import sys, traceback, time, signal, os, stat, copy
23
import optparse, ConfigParser, re
26
gobject.threads_init()
31
import gst.extend.pygobject
33
# add_buffer_probe ignores user_data
35
# -- Auxiliary Functions
37
# prevent deprecation warnings while remaining backwards compatible
38
# prevent deprecation warnings while remaining backwards compatible:
# older bindings only exposed the gst_-prefixed name, alias it if needed
if 'parse_bin_from_description' not in dir(gst):
    gst.parse_bin_from_description = gst.gst_parse_bin_from_description
41
# converts given value (typically Python string) to the given gtype (e.g. INT)
42
# (may throw exception if conversion fails)
43
def gobject_convert_value(value, gtype):
44
if gtype in (gobject.TYPE_INT, gobject.TYPE_UINT,
45
gobject.TYPE_LONG, gobject.TYPE_ULONG,
46
gobject.TYPE_INT64, gobject.TYPE_UINT64):
48
elif gtype == gobject.TYPE_BOOLEAN:
53
elif isinstance(value, str):
54
if value.lower() in ['f', 'false', 'no', '0', 'off']:
58
elif gtype in (gobject.TYPE_DOUBLE, gobject.TYPE_FLOAT):
60
elif gtype == gobject.TYPE_STRING:
62
elif gtype == gst.Caps.__gtype__:
63
value = gst.caps_from_string(value)
64
# leave untouched for other cases, e.g. enums
67
# based on gst.extend.pygobject
68
def gobject_convert_property(object, property, value):
70
Convert the given value for the given property to the proper type
73
@type object: L{gobject.GObject}
74
@type property: string
75
@param value: value intended for the property
77
for pspec in gobject.list_properties(object):
78
if pspec.name == property:
82
"Property '%s' in element '%s' does not exist" % (
83
property, object.get_property('name')))
86
value = gobject_convert_value(value, pspec.value_type)
88
msg = "Invalid value given for property '%s' in element '%s'" % (
89
property, object.get_property('name'))
93
def gobject_set_property(object, property, value):
    """Set *property* on *object* after converting *value* to the proper type."""
    converted = gobject_convert_property(object, property, value)
    object.set_property(property, converted)
96
# string representation for a property-value, particularly enum
97
def prop_to_str(value):
98
if hasattr(value, 'value_name'):
99
return value.value_nick + ' - ' + value.value_name
103
def caps_to_short_str(caps):
108
if gobject.type_name(s.get_field_type(f)) in ['GstValueArray', 'GstBuffer']:
109
s[f] = str(s.get_field_type(f))
113
def element_is_src(element):
    """An element counts as a source if it exposes no sink pads at all."""
    sinks = list(element.sink_pads())
    return not sinks
116
def element_is_sink(element):
    """An element counts as a sink if it exposes no source pads at all."""
    sources = list(element.src_pads())
    return not sources
119
# auxiliary for below, normalize a (towards downstream ordered) list of pads:
120
# - remove proxy pads
121
# - ghostpad is replaced by ghostpad and its target (in the proper order)
122
def pad_list_normalize(padlist):
125
if gobject.type_name(pad) == 'GstProxyPad':
127
elif isinstance(pad, gst.GhostPad):
128
if pad.get_direction() == gst.PAD_SRC:
129
res.extend([pad, pad.get_target()])
131
res.extend([pad.get_target(), pad])
136
# returns list of pads (upstream or downstream) of given pad (including)
137
# - descends into bins
138
# - ignores proxy pads
139
# - also includes targets of ghostbin
140
# - list begins at pad (and so upstream or downstream)
141
# - at junctions, only 1 path is traced (upstream or downstream)
142
# - up to count pads (not counting pad) are traced
144
def pad_get_pred(pad, count = 100):
145
def pad_get_pred_rec(pad, count):
148
if pad.get_direction() == gst.PAD_SRC:
150
peer = pad.iterate_internal_links().next()
151
except (StopIteration, TypeError):
154
peer = pad.get_peer()
158
if gobject.type_name(pad) != 'GstProxyPad':
159
res = pad_get_pred_rec(peer, count - 1)
162
res = pad_get_pred_rec(peer, count)
164
return pad_list_normalize(pad_get_pred_rec(pad, count))
166
def pad_get_succ(pad, count = 100):
167
def pad_get_succ_rec(pad, count):
170
if pad.get_direction() == gst.PAD_SINK:
172
peer = pad.iterate_internal_links().next()
173
except (StopIteration, TypeError):
176
peer = pad.get_peer()
180
if gobject.type_name(pad) != 'GstProxyPad':
181
res = pad_get_succ_rec(peer, count - 1)
184
res = pad_get_succ_rec(peer, count)
187
return list(reversed(pad_list_normalize(reversed(pad_get_succ_rec(pad, count)))))
189
def pad_is_caps(pad, caps_fragment, klass_fragment):
190
pads = pad_get_pred(pad)
191
pads.extend(pad_get_succ(pad))
192
gst.debug('Checking %s against %s, %s' % (pad, caps_fragment, klass_fragment))
194
factory = p.get_parent_element().get_factory()
195
caps = p.get_negotiated_caps()
196
gst.log('Negotiated caps on ' + str(p) + ': ' + str(caps))
197
if not caps: # may not yet be negotiated
198
caps = p.get_pad_template_caps()
199
gst.log(str(p) + ': class ' + factory.get_klass() + ', caps ' + str(caps))
200
# note that some app types also look like video/...
201
if factory.get_klass().find(klass_fragment) >= 0 or \
202
re.search(klass_fragment.lower(), p.get_name()) or \
203
re.search(caps_fragment, str(caps)): # and caps[0].n_fields() > 0):
208
subcaps = [ 'text/plain', 'video/x-dvd-subpicture', 'application/x-ssa',
209
'application/x-ass', 'application/x-usf', 'application/x-subtitle-unknown']
210
return pad_is_caps(pad, '^(' + '|'.join(subcaps) + ')', 'nomatch')
212
def pad_is_video(pad):
    """True if the pad carries raw video and is not a subtitle pad."""
    # subtitle pads may also advertise video-like caps, so rule them out first
    if pad_is_sub(pad):
        return False
    return pad_is_caps(pad, 'video/x-raw', 'Video')
215
def pad_is_audio(pad):
    """True if the pad carries raw audio."""
    return pad_is_caps(pad, 'audio/x-raw', 'Audio')
218
def pad_is_current(pad):
219
pads = pad_get_pred(pad)
221
if p.get_name()[0:8] == 'current_':
225
def pad_stream_type(pad):
228
elif pad_is_video(pad):
230
elif pad_is_audio(pad):
235
# set of stream types of sink pads
236
def element_stream_types(element):
    """Return the set of stream types found on the element's sink pads."""
    return set(pad_stream_type(p) for p in element.sink_pads())
239
def element_is_sub(element):
    """True if any sink pad of the element carries a subtitle stream."""
    types = element_stream_types(element)
    return 'sub' in types
242
def element_is_video(element):
    """True if any sink pad of the element carries a video stream."""
    types = element_stream_types(element)
    return 'video' in types
245
def element_is_audio(element):
    """True if any sink pad of the element carries an audio stream."""
    types = element_stream_types(element)
    return 'audio' in types
248
# returns immediate upstream neighbour(s)
249
def element_pred(element):
251
for x in element.sink_pads():
252
p = pad_get_pred(x, 1)
253
if len(p) > 1 and p[1].get_parent_element():
254
res.append(p[1].get_parent_element())
257
# returns immediate downstream neighbour(s)
258
def element_succ(element):
260
for x in element.src_pads():
261
p = pad_get_succ(x, 1)
262
if len(p) > 1 and p[1].get_parent_element():
263
res.append(p[1].get_parent_element())
267
return element.get_path_string() + ' [' + gobject.type_name(element) + ']'
270
# patterns - list of re (strings)
271
# element - GstObject
272
# prop - name of property
275
# <element factory name>.prop (if GstElement)
276
# <element name>.prop
277
# <element_path>.prop
278
# and checks if it matches any of the re's given
279
# (if prop is not given, .prop is omitted above)
280
def object_match_prop(object, patterns, prop = None):
286
if isinstance(object, gst.Element):
287
factory = object.get_factory()
288
name = factory.get_name()
289
checks.append(name + prop)
290
checks.append(object.get_name() + prop)
291
checks.append(object.get_path_string() + prop)
294
if re.match(pat, check):
299
# patterns - list of re (strings)
300
# message - a message (from element)
303
# element = sending element
304
# type = printable name of message type
305
# name = name of structure (if any)
306
# <element factory name>.type[.name] (if GstElement)
307
# <element name>.type[.name]
308
# <element_path>.type[.name]
309
# and checks if it matches any of the re's given
310
def message_match(message, patterns):
313
postfix = '.' + message.type.first_value_nick
314
if message.structure:
315
postfix += '.' + message.structure.get_name()
316
if isinstance(object, gst.Element):
317
factory = object.get_factory()
318
name = factory.get_name()
319
checks.append(name + postfix)
320
checks.append(object.get_name() + postfix)
321
checks.append(object.get_path_string() + postfix)
324
if re.match(pat, check):
328
# perform wildcard expansion on capsfilter in launch line
329
def expand_caps(launch):
330
mimetypes = ['video/x-raw-yuv', 'video/x-raw-rgb',
331
'audio/x-raw-int', 'audio/x-raw-float']
332
expr = re.compile(r'^(.*)!.*?((?:(?:video)|(?:audio))/.*?[*].*?),(.*?)!(.*)$')
333
m = expr.search(launch)
341
if re.search(mime.replace('*', '.*'), m):
342
caps.append(m + ',' + props)
344
raise Exception, 'Failed to expand mimetype ' + mime
345
launch = " ! ".join([prefix, " ; ".join(caps), postfix])
346
m = expr.search(launch)
347
gst.debug("Returning launch line " + launch)
350
# format given time to something user friendly
351
# pretty much inspired by other code ...
352
# time - in gst units
353
# msecond - number of milli-second digits (may be 0)
354
def get_time_as_str(time, msecond = 3):
358
for div, sep, mod, pad in ((gst.SECOND*3600, '', 0, 0),
359
(gst.SECOND*60, ':', 60, 2),
360
(gst.SECOND, ':', 60, 2),
361
(gst.MSECOND, '.', 1000, msecond)):
366
ret += sep + ('%%0%dd' % pad) % n
369
# parse time given in H:M:S.MS format
370
# - leading parts are optional, so is MS
371
# - MS can have up to gst unit precision
372
# time - string to parse
373
# raises exception if format error
374
def parse_time_from_str(time):
387
for v, w in zip(l, (gst.SECOND//(10**len(l[0])), gst.SECOND,
388
gst.SECOND*60, gst.SECOND*3600)):
393
# something was not valid
394
raise ValueError, "Invalid time format in " + time
396
# returns all elements in the bin (recursively)
397
# (topologically) sorted from src to sink
398
def bin_sorted_recursive(bin, include_bins = False):
400
tmp = list(bin.sorted())
402
# TODO bin.sorted may return duplicates (in case of multiple sinks)
405
if element in have_seen:
407
have_seen.append(element)
408
if isinstance(element, gst.Bin):
409
result.extend(bin_sorted_recursive(element, include_bins))
411
result.append(element)
413
result.insert(0, bin)
416
def clone_element(element, name = 0):
    """Create a fresh instance from the element's factory, named 'temp<name>'."""
    factory = element.get_factory()
    return factory.create('temp' + str(name))
423
# configuration class, responsible for
425
# - option/configuration store
430
# non-option leftover arguments
432
self.optparser = None
433
# also see below for configuration attributes
437
# properties: 3-level dict: (element, (prop, (time, val)))
438
# time is -1 if it is not a controlled property setting
440
# keep refs to controllers
441
self.controllers = []
443
usage = 'usage: %prog [options] -- [--raw PIPELINE] ' + \
444
'[--video PIPELINE] [--audio PIPELINE] [--other PIPELINE]'
445
version = '%prog 0.10.2'
446
optparser = optparse.OptionParser(usage=usage, version=version)
447
self.optparser = optparser
448
optparser.add_option('-i', '--inputfile', help='Input file or URI')
449
optparser.add_option('-o', '--outputfile', help='Output file')
450
optparser.add_option('--decoder', default='decodebin',
451
help='Decoder element')
452
optparser.add_option('--muxer',
453
help='Muxer element; overrides default choice from -o')
454
optparser.add_option('--vn', action='append', metavar='#no,#no,...',
455
help='Video stream numbers')
456
optparser.add_option('--an', action='append', metavar='#no,#no,...',
457
help='Audio stream numbers')
458
optparser.add_option('--on', action='append', metavar='#no,#no,...',
459
help='Other stream numbers')
460
optparser.add_option('--sync-link', action='store_true',
461
help='Synchronous linking [false]')
462
optparser.add_option('--no-sync-link', dest='sync-link', action='store_false')
463
optparser.add_option('--at', action='append', metavar='tag,...',
464
help='Audio stream language tag pattern')
465
optparser.add_option('--stamp', action='store_true',
466
help='Re-sequence timestamps [true]')
467
optparser.add_option('--no-stamp', dest='stamp', action='store_false')
468
optparser.add_option('-c', '--cut', action='append', metavar='T1-T2,T3-T4,...',
469
help='Only process this part of the stream')
470
optparser.add_option('-s', '--section', type='choice', metavar='METHOD',
471
choices=['seek', 'seek-key', 'cut', 'cut-time'], default='seek',
472
help='Section selection mode [%default]')
473
optparser.add_option('-a', '--accurate', action='store_true',
474
help='Use sample accurate cutting (if not segment/seeking)')
475
optparser.add_option('--no-accurate', dest='accurate', action='store_false')
476
optparser.add_option('--dam', action='store_true',
477
help='Dams are used (in raw pipeline)')
478
optparser.add_option('--no-dam', dest='dam', action='store_false')
479
optparser.add_option('-f', '--framerate', metavar='NUM/DENOM', default='25/1',
480
help='Fallback framerate if none automagically discovered [%default]')
481
optparser.add_option('-b', '--block-overrun', action='store_true',
482
help='Prevent queue size adjustments')
483
optparser.add_option('--no-block-overrun', dest='block_overrun', action='store_false')
484
optparser.add_option('--set-prop', action='append', metavar='ELEMENT:PROP:VALUE',
485
help='Set property PROP')
486
optparser.add_option('--vb', type='int', metavar='kbitrate', default='0',
487
help='Target video bitrate')
488
optparser.add_option('--ab', type='int', metavar='kbitrate', default='0',
489
help='Target audio bitrate')
490
optparser.add_option('--vq', type='int', metavar='quantizer/quality', default='0',
491
help='Constant video quantizer or quality')
492
optparser.add_option('--aq', metavar='quality', default='0',
493
help='Audio encoding quality')
494
optparser.add_option('--pass', type='int', metavar='0|1|2', default='0',
495
help='Pass 1/2 of 2-pass encoding')
496
optparser.add_option('-t', '--tag', action='append', metavar='TAG:VALUE',
498
optparser.add_option('-d', '--delay', default=2, type='int', metavar='SECONDS',
499
help='Delay between progress updates [%default]')
500
optparser.add_option('--timeout', default=4, type='int', metavar='SECONDS',
501
help='Timeout between successive stages [%default]')
502
optparser.add_option('--progress-fps', action='store_true',
503
help='Also provide proc speed in fps')
504
optparser.add_option('--no-progress-fps', dest='progress_fps', action='store_false')
505
optparser.add_option('--progress-real', action='store_true',
506
help='Calculate speed based on real-time, not cpu time')
507
optparser.add_option('--no-progress-real', dest='progress_real', action='store_false')
508
optparser.add_option('-m', '--messages', action='store_true',
509
help='Output messages')
510
optparser.add_option('--no-messages', dest='messages', action='store_false')
511
optparser.add_option('--display-msg', action='append', metavar='MSGPATTERN,...',
512
help='Only inform about matching messages')
513
optparser.add_option('--ignore-msg', action='append', metavar='MSGPATTERN,...',
514
help='Ignore matching messages' )
515
optparser.add_option('-v', '--verbose', action='store_true',
516
help='Output property notifications')
517
optparser.add_option('--no-verbose', dest='verbose', action='store_false')
518
optparser.add_option('--short-caps', action='store_true',
519
help='Output short versions of caps, e.g. buffer dumps')
520
optparser.add_option('--no-short-caps', action='store_false')
521
optparser.add_option('-x', '--exclude', action='append', metavar='PROPPATTERN,...',
522
help='Do not output status information of matching properties')
523
optparser.add_option('--include', action='append', metavar='PROPPATTERN,...',
524
help='Ignore status information of matching properties')
525
optparser.add_option('--display-prop', action='append', metavar='PROPPATTERN,...',
526
help='Provide info on matching properties')
527
optparser.add_option('--ignore-prop', action='append', metavar='PROPPATTERN,...',
528
help='Ignore matching properties' )
529
optparser.add_option('--config', help='Configuration file',
530
metavar='CONFIGFILE')
531
optparser.add_option('--profile', metavar='PROFILE',
532
help='Use settings from profile file')
533
optparser.add_option('--save', action='append', metavar='MESSAGE:FILE:APPEND,...',
534
help='Save messages')
536
(self.options, self.args) = optparser.parse_args()
538
# get configuration from file
539
if self.options.config:
540
file = [self.options.config]
542
file = ['.gst-entrans', os.path.expanduser('~/.gst-entrans')]
543
if self.options.profile:
544
file.extend([self.options.profile,
545
os.path.expanduser(os.path.join('~', self.options.profile))])
546
self.parser = ConfigParser.SafeConfigParser()
547
self.parser.read(file)
549
# initialize config store
550
self.__dict__.update({ 'sections': [], # list of (start: , end: )
551
'tag': { }, # dict of (tag, value)
552
'save': { }, # dict of (message, (file: , append: ))
553
'exclude': [], 'include': [], # list of regexps
554
'ignore_prop': [], 'display_prop': [], # list of regexps
555
'ignore_msg': [], 'display_msg': [], # list of regexps
556
'at': [], # list of regexps
557
'an': [], 'vn': [], 'on': [], # list of numbers
561
# convert properties to internal dict storage
562
for section in self.parser.sections():
563
if section != 'options':
564
self.props[section] = { }
565
for option in self.parser.options(section):
566
self.props[section][option] = { -1: self.parser.get(section, option) }
568
# -- tiny util for opt processing
569
# returns a list of which the items no longer contain separator sep
570
def flatten(opt, sep = ','):
571
if isinstance(opt, str):
573
elif isinstance(opt, list):
575
else: # should not be
580
nv.extend(t.split(','))
586
if self.parser.has_section('options'):
587
sources.append(self.parser.items('options'))
588
sources.append(self.options.__dict__.iteritems())
591
k = k.replace('-', '_')
592
if not k in self.options.__dict__:
593
optparser.error('Invalid option ' + k + ' in configuration file.')
595
if k in ['inputfile', 'outputfile', 'muxer', 'decoder']:
597
elif k in ['verbose', 'messages', 'block_overrun', 'dam', 'accurate',
598
'progress_real', 'progress_fps', 'short_caps', 'stamp',
601
# if not mentioned at all anywhere, must be False
603
if not self.__dict__.has_key(k):
604
if k == 'stamp': # exception: default True
610
if v == True or v == False:
612
elif v.upper() in ['1', 'ON', 'TRUE', 'YES']:
614
elif v.upper() in ['0', 'OFF', 'FALSE', 'NO']:
617
optparser.error('Invalid value ' + v + ' for ' + k)
620
self.vkbitrate = int(v)
622
self.akbitrate = int(v)
624
self.vquantizer = int(v)
626
self.aquantizer = float(v)
631
optparser.error('Invalid value ' + v + ' for ' + k)
633
self.progress = int(v) * 1000
635
self.timeout = int(v) * 1000
636
elif k in ['vn', 'an', 'on']:
641
optparser.error('Invalid value ' + t + ' for ' + k)
642
self.__dict__[k].extend([int(t) for t in v])
643
elif k == 'framerate':
653
if num.isdigit() and denom.isdigit():
654
self.fps = gst.Fraction(int(num), int(denom))
656
optparser.error('Invalid framerate in ' + v)
657
elif k == 'save' and v:
660
data = message.split(':')
661
if len(data) < 2 or len(data) > 3:
662
optparser.error('invalid spec for -s')
663
# check that we can create the file and empty it now for later use
664
filename = data[1].replace('${n}', 'dummy')
666
f = open(filename, 'w')
668
optparser.error('could not create ' + filename + ' for writing')
670
os.remove(filename) # clean up test
671
self.save[data[0]] = { 'file': data[1] }
673
self.save[data[0]]['append'] \
674
= data[2].lower() in ['1','t', 'true', 'yes']
676
self.save[data[0]]['append'] = False
677
elif k == 'tag' and v:
678
for message in flatten(v, None):
679
data = message.split(':')
681
optparser.error('Invalid spec for -t')
682
if not gst.tag_exists(data[0]):
683
optparser.error('Invalid tag ' + data[0])
684
self.tag[data[0]] = data[1]
685
elif k in ['include', 'exclude', 'display_prop', 'ignore_prop',
686
'display_msg', 'ignore_msg'] and v:
687
self.__dict__[k].extend(flatten(v))
688
for exp in self.__dict__[k]:
692
optparser.error('Invalid regexp ' + exp + ' for ' + k)
693
elif k == 'set_prop' and v:
694
for d in flatten(v, None):
697
optparser.error('Invalid spec for --set-prop')
700
time = parse_time_from_str(':'.join(data[3:]))
702
optparser.error('Invalid spec for %s: %s' % (k, str(e)))
707
if not section in self.props:
708
self.props[section] = { }
709
if not opt in self.props[section]:
710
self.props[section][opt] = { }
711
self.props[section][opt][time] = data[2]
712
elif k == 'at' and v:
718
optparser.error('Invalid regexp ' + exp + ' for ' + k)
722
self.seek = NonLinPipeline.SECTIONS_SEEK
723
elif v == 'seek-key':
724
self.seek = NonLinPipeline.SECTIONS_SEEK_KEY
726
self.seek = NonLinPipeline.SECTIONS_CUT
727
elif v == 'cut-time':
728
self.seek = NonLinPipeline.SECTIONS_CUT_TIME
729
elif k == 'cut' and v:
734
optparser.error('No section allowed after open-ended section.')
735
info = section.strip()
738
m = re.match('^(\(?[a-zA-Z]+\)?):(.*)$', info)
743
if (format[0] == '(' and format[-1] != ')') or \
744
(format[0] != '(' and format[-1] == ')'):
745
optparser.error('Invalid format specification in ' + section)
748
format = format[1:-1]
752
info = info.split('-')
753
if len(info) == 2 and info[0]:
754
for i, v in enumerate(info):
764
optparser.error('Invalid frame specification ' + v)
766
if v[0] in ['f', 'F']:
771
optparser.error('Invalid frame specification ' + v)
774
info[i] = parse_time_from_str(v)
776
optparser.error(str(e))
777
self.sections.append({ 'start': info[0], 'end': info[1],
778
'format': format, 'convert': convert })
780
optparser.error('Invalid specification in ' + section)
782
if self.vquantizer and self.vkbitrate:
783
optparser.error('Only one of --vb and --vq may be given')
784
if self.vquantizer and self.encpass:
785
optparser.error('Only one of --vb and --pass may be given')
786
if self.aquantizer and self.akbitrate:
787
optparser.error('Only one of --ab and --aq may be given')
788
if self.at and (self.vn or self.an or self.on):
789
optparser.error('--at cannot be used with --vn, --an or --on')
790
for x in self.sections:
791
if x['format'] and not x['convert'] and \
792
self.seek in [NonLinPipeline.SECTIONS_CUT,
793
NonLinPipeline.SECTIONS_CUT_TIME]:
794
optparser.error('Custom seek format not possible with given cut method')
796
def set_plugins(self):
797
for section in self.props:
798
if 'pf_rank' in self.props[section]:
799
factory = gst.element_factory_find(section)
801
rank = self.props[section]['pf_rank'][-1]
803
rank = max(0, min(int(gst.RANK_PRIMARY), int(rank)))
804
gst.debug('Setting rank of plugin %s to %d' % (factory.get_name(), rank))
805
factory.set_rank(rank)
807
def set(self, elements, subst = { }, force = False):
808
for element in elements:
809
gst.log('Setting properties on ' + element.get_name())
810
factory = element.get_factory()
811
# FIXME this should somehow be patched in a better way into core,
812
# for the conf benefit of al gst
813
# make a pristine copy so we can see if some property has already been set
814
newel = factory.create('temporary')
816
# most specific setting survives
817
for name in [factory.get_name(), element.get_name(), element.get_path_string()]:
818
if name in self.props:
819
props.update(self.props[name])
821
for prop, value in props.iteritems():
822
# last check, if value given in pipeline description, that one must win
823
# also disregard possible junk that is not really a property
824
# NOTE: this is expected to fail hard if user gives non-existing property
825
if prop[0:3] != 'pf_':
826
if prop not in [p.name for p in element.props]:
827
raise TypeError, 'no such property "%s" in element "%s"' \
828
% (prop, element.get_name())
829
if force or element.get_property(prop) == newel.get_property(prop):
830
for t, v in value.iteritems():
832
gst.debug('Setting %s.%s to %s' % (element, prop, v))
833
gobject_set_property(element, prop, v)
834
else: # controlled time-value list
835
if control.has_key(prop):
836
control[prop].append((v,t))
838
control[prop] = [(v,t)]
839
# handle controlled props
841
controller = gst.Controller(element, *control.keys())
843
controller.set_interpolation_mode(prop, gst.INTERPOLATE_NONE)
844
for v, t in control[prop]:
845
controller.set(prop, t, gobject_convert_property(element, prop, v))
846
# HACK would keep it quiet in the beginning, but not afterwards :-(
847
# element.set_property(prop, element.get_property(prop))
848
# HACK keep a ref around to the controller, otherwise it is GC
849
# and this seems to be the only ref to it ?? bug in core ?
850
self.controllers.append(controller)
851
# and check for custom stuff
852
self.set_special(element, subst)
854
# check for and set some special properties; bitrate, quantizer, ...
855
def set_special(self, element, subst):
856
gst.log('Setting special properties on ' + element.get_name())
857
factory = element.get_factory()
858
newel = factory.create('temporary')
859
if factory.get_klass().find('Audio') >= 0:
863
for pspec in gobject.list_properties(element):
864
if not pspec.flags & gobject.PARAM_WRITABLE:
866
if not pspec.flags & gobject.PARAM_READABLE:
869
if element.get_property(prop) != newel.get_property(prop):
871
# property might not be as we expect
873
if prop == 'bitrate':
874
value = element.get_property(prop)
875
# is it really in bit or rather in kbit ?
876
if value > 10000 or value < 0:
880
if self.vkbitrate and not audio:
881
element.set_property(prop, factor * self.vkbitrate)
882
if self.akbitrate and audio:
883
element.set_property(prop, factor * self.akbitrate)
885
# hopefully enum, try int as fall-back
887
if gobject.type_is_a (pspec.value_type, gobject.TYPE_ENUM):
888
gobject_set_property(element, prop, 'quant')
890
if gobject.type_is_a (pspec.value_type, gobject.TYPE_ENUM):
891
gobject_set_property(element, prop, 'pass' + str(self.encpass))
893
gobject.set_property(element, prop, self.encpass)
894
elif prop == 'quantizer' or prop == 'quality':
895
if self.vquantizer and not audio:
896
gobject_set_property(element, prop, self.vquantizer)
897
if self.aquantizer and audio:
898
gobject_set_property(element, prop, self.aquantizer)
900
# HACK: only on strings
901
elif repr(pspec).find('GParamString') >= 0 \
902
and pspec.flags & gobject.PARAM_WRITABLE \
903
and pspec.flags & gobject.PARAM_READABLE:
904
value = element.get_property(prop)
908
for s, v in subst.iteritems():
909
newvalue = newvalue.replace('${' + s + '}', v)
910
if value != newvalue:
911
gst.debug('Performed substitutions for prop ' + prop + ' on ' + to_str(element))
912
element.set_property(prop, newvalue)
914
gst.warning('Failed to set special property ' + prop)
915
# -- END Configuration
918
# Manages startup of and cutting in a non-linear pipeline.
919
class NonLinPipeline(gobject.GObject):
922
SECTIONS_CUT_TIME = 1
924
SECTIONS_SEEK_KEY = 3
927
# pads that are blocked will also block a thread when it perform a pad_alloc,
928
# and queue is *not* a thread boundary for such a call
929
# so a demux loop thread *might* be blocked by this, if it does not handle things right
930
# Right typically means it should send new-segments (e.g. from the seeking thread) in one loop,
931
# so that all parts can block before anything tries pad_alloc.
932
# Some elements send new-segment right after adding a pad (dvd-mpeg), which blocks a pad,
933
# so care must be taken when to consider all pads blocked.
935
# Possible alternative: use dam with cond-based blocking (replace pad blocking),
936
# not blocking on pad-alloc (on event?);
937
# should probably hold and queue events for later sending
938
# (can take care of new segments, tags and so sent *before* no-more-pads signalled)
939
# (see also note in TODO about no-more-pads)
941
# * updated: emitted (by client) when a new part has been added to (dynamic) pipeline
942
# (that should be inspected for dam-like candidates)
943
# * complete: emitted (by client) when pipeline has been fully constructed
944
# * blocked: emitted when a pad reaches blocked state
945
# * started: emitted when pipeline is fully constructed/connected
946
# *and* data flow is now unobstructed (after unblocking) on its way
947
# (note !! that buffers and caps may just not yet have reached dams
948
# when emitting or receiving this)
949
# (in seek-mode; when it reaches PAUSED; in cut-mode; when all pads have been unblocked)
950
# * playing: emitted when pipeline set to PLAYING
952
# Both are emitted only once (in either case), and the former prior to the latter.
953
__gsignals__ = {'updated': (gobject.SIGNAL_RUN_LAST, None, []),
954
'complete': (gobject.SIGNAL_RUN_LAST, None,[]),
955
'blocked': (gobject.SIGNAL_RUN_LAST, None, [gst.Pad]),
956
'started': (gobject.SIGNAL_RUN_LAST, None, []),
957
'playing': (gobject.SIGNAL_RUN_LAST, None, [])}
959
def __init__(self, pipeline, damming, complete, sections, seek,
960
precision = False, framerate = None):
961
self.__gobject_init__()
962
# -- PUBLIC -- only guaranteed valid after *started*
963
# the pipeline to manage, it should contain dam element to assist in this
964
self.pipeline = pipeline
965
# the sections allowed to get past the dams:
966
# list of dicts with keys: start, end, format, convert
967
# if no format given:
968
# pos value: time in gst sense
969
# neg value: frame number
970
# if (nick name of) format given:
971
# pos values for start and end that indicate range in that format
972
# if convert, then convert to time and seek in time, otherwise seek in format
973
self.sections = sections
974
# use precision slicing
975
self.precision = precision
976
# collects the dams that are found
977
# slightly ordered; video first, then audio
979
# the fps of the stream; will be searched for ...
981
# .. and use this one if all that fails
982
self.default_fps = framerate
983
# the method to use to only pass desired sections;
984
# one of the values above
986
# whether to also perform pad blocking if no sections to seek
987
self.do_block = False
990
# whether or not to use dams
991
self.use_dams = damming
992
# (dynamic) pipeline is completed
993
self.complete = complete
994
# we got NO_PREROLL upon changing state; probably live pipeline
995
self.no_preroll = False
998
# probes that are attached
1000
# main loop that runs during life of pipeline
1001
self.loop = gobject.MainLoop()
1002
# pads that should be blocked and have been blocked
1003
self.to_block_pads = []
1004
self.blocked_pads = []
1005
# dams that are about to see data
1007
# private 'mirror' of sections used for *seek*ing
1008
# (setup as a transformation later on)
1009
self.seek_sections = None
1010
# next_segment to seek to
1011
self.next_segment = 0
1012
# target element to use for seek, query, etc
1014
# -- default signal handlers
1015
self.connect('complete', self.cb_complete)
1016
self.connect('updated', self.cb_updated)
1017
self.connect('started', self.cb_started)
1019
if not self.sections:
1020
self.seek = self.SECTIONS_SEEK
1021
if not self.default_fps:
1022
self.default_fps = gst.Fraction(25, 1)
1024
def cb_complete(self, object):
1025
gst.log('Pipeline complete')
1026
if not self.complete:
1027
self.complete = True
1028
self.check_pads_blocked()
1030
def cb_updated(self, object):
1031
gst.log('Pipeline updated')
1032
if not self.use_dams:
1035
def cb_started(self, object):
1037
# make sure things are ok for clients
1041
def sort_dams(self):
1042
gst.debug("Sorting dams")
1043
# don't do this over and over again
1046
gst.debug("Actually sorting dams")
1048
# super-simplistic sort
1049
for dam in self.dams:
1050
if element_is_video(dam):
1052
for dam in self.dams:
1053
if dam not in result:
1058
# don't overwrite or search for it if already have fps
1061
# if more than one framerate out there, the one most upstream wins
1062
for element in bin_sorted_recursive(self.pipeline):
1063
for pad in element.pads():
1064
caps = pad.get_negotiated_caps()
1065
if caps and caps[0].has_key('framerate') and caps[0]['framerate'].num != 0:
1066
gst.debug('Found framerate on ' + str(pad) + ' in caps ' + str(caps))
1067
self.fps = caps[0]['framerate']
1070
gst.warning("No framerate has been found. Defaulting")
1071
self.fps = self.default_fps
1074
def convert_to_time(self, frame):
1075
if frame and frame < 0:
1076
return -frame * gst.SECOND * self.fps.denom / self.fps.num
1080
def seek_next_segment(self):
1081
if self.next_segment >= len(self.sections):
1083
start = self.seek_sections[self.next_segment]['start']
1084
end = self.seek_sections[self.next_segment]['end']
1085
format = self.seek_sections[self.next_segment]['format']
1086
self.next_segment = self.next_segment + 1
1089
format = gst.FORMAT_TIME
1092
if self.next_segment == 1:
1093
flags = gst.SEEK_FLAG_FLUSH
1094
if self.next_segment < len(self.sections):
1095
flags = flags | gst.SEEK_FLAG_SEGMENT
1096
if self.seek == self.SECTIONS_SEEK_KEY:
1097
flags = flags | gst.SEEK_FLAG_KEY_UNIT
1099
gst.info("Seeking from " + str(start) + " to " + str(end))
1101
endtype = gst.SEEK_TYPE_NONE
1104
endtype = gst.SEEK_TYPE_SET
1105
if self.target.seek(1.0, format, flags, gst.SEEK_TYPE_SET, start, endtype, end):
1106
gst.info("Seek succeeded!")
1108
gst.error("Seek failed!")
1110
# Push the configured section boundaries into each dam element so cutting
# happens inside the pipeline (cut mode), rather than via segment seeks.
# NOTE(review): the bare number lines below are extraction artifacts (original
# source line numbers), not code, and indentation has been stripped by the
# same mangling; do not treat this block as runnable as-is.
def cut_next_segment(self):
1111
gst.log("Providing section information.")
1112
# every dam receives every section's begin/end times
for dam in self.dams:
1113
gst.debug("Providing section information to " + str(dam))
1114
for section in self.sections:
1115
dam.set_property('begin-time', section['start'])
1116
end = section['end']
# NOTE(review): jump 1116 -> 1118 means a line is missing here — presumably
# a guard like "if not end:" before the open-ended fallback below; confirm
# against the original source.
1118
end = gst.CLOCK_TIME_NONE
1119
dam.set_property('end-time', end)
1120
# arm the dam to actually save (pass through) the configured section
dam.set_property('save-section', True)
1122
def cb_on_pad_blocked_sync(self, pad, is_blocked):
1123
# cb can be called again; unblocking, after seeking, before unblocking
1128
gst.debug("Pad " + str(pad) + " has " + state + ".")
1129
if pad not in self.blocked_pads:
1130
self.blocked_pads.append(pad)
1131
self.check_pads_blocked()
1132
self.emit('blocked', pad)
1134
def cb_notify_start(self):
1135
self.emit('started')
1136
# remove from main loop
1139
# Decide whether every pad scheduled for blocking has actually blocked; if so
# (and the pipeline construction is complete), schedule the seek handler on
# the main loop.
# NOTE(review): bare number lines are extraction artifacts (original line
# numbers) and indentation is stripped; the jump 1143 -> 1145 indicates a
# missing line — almost certainly a bare "return" ending the early-out branch.
def check_pads_blocked(self):
1140
# pads may be incrementally added (dynamic case)
1141
gst.log("Checking block_pads " + str(self.blocked_pads) + " == " + str(self.to_block_pads))
1142
# early-out unless: at least one pad blocked, counts match, and 'complete' seen
if not (self.blocked_pads
1143
and len(self.blocked_pads) == len(self.to_block_pads) and self.complete):
1145
gst.info("All pads have blocked, scheduling seek")
1146
# defer to main loop so the seek runs outside the streaming threads
gobject.idle_add(self.handle_pads_blocked)
1148
def handle_pads_blocked(self):
1149
# should be some framerate info somewhere by now
1151
# put dams right order, so we focus on a video one if needed
1154
self.target = self.dams[0]
1156
self.target = self.pipeline
1157
# normalize sections
1158
self.seek_sections = []
1159
for s in self.sections:
1161
format = ss['format'] = s['format']
1163
format = gst.format_get_by_nick(format)
1165
gst.error("Failed to determine custom format; ignoring section!")
1166
s['start'] = s['end'] = 0
1168
ss['start'], ss['end'] = s['start'], s['end']
1169
ss['format'] = format
1170
s['start'] = s['end'] = None
1173
qformat, s['start'] = \
1174
self.target.query_convert(format, ss['start'], gst.FORMAT_TIME)
1176
qformat, s['end'] = \
1177
self.target.query_convert(format, ss['end'], gst.FORMAT_TIME)
1178
if (ss['start'] and not s['start']) or (ss['end'] and not s['start']):
1181
gst.error("Failed to convert custom position; no time info!")
1182
# FIXME (above as well) cutting dam might not like this ...
1183
s['start'] = s['end'] = 0
1186
ss['start'], ss['end'] = s['start'], s['end']
1188
s['start'] = self.convert_to_time(s['start'])
1189
s['end'] = self.convert_to_time(s['end'])
1190
ss['start'], ss['end'] = s['start'], s['end']
1191
self.seek_sections.append(ss)
1192
# hm, only now we can fully check whether given sections were valid
1195
for section in self.sections:
1196
if section['end'] and section['start'] > section['end']:
1198
if section['start'] < last \
1199
and self.seek in [self.SECTIONS_CUT, self.SECTIONS_CUT_TIME]:
1201
last = section['end']
1203
# FIXME brute way out
1204
raise Exception('Invalid section specification: impossible order')
1205
# get data flow going
1206
if self.seek in [self.SECTIONS_CUT, self.SECTIONS_CUT_TIME]:
1207
self.cut_next_segment()
1208
self.cb_notify_start()
1210
self.seek_next_segment()
1212
gst.info('Unblocking pads')
1213
for pad in self.blocked_pads:
1214
pad.set_blocked_async(False, lambda *x: None)
1215
# remove this from main loop
1216
gst.debug('Unblocked pads')
1219
def cb_linked(self, pad, peer):
    """Handler for a pad's 'linked' signal.

    When a dam's pad becomes (re)linked, re-arm pad blocking on the dam
    element that owns the pad.
    """
    gst.debug("Pad " + str(pad) + " now linked to " + str(peer))
    owner = pad.get_parent_element()
    self.setup_pad_block(owner)
1224
def cb_unlinked(self, pad, peer):
1225
if peer in self.pads:
1226
self.pads.remove(peer)
1228
peer.set_blocked_async(False, lambda *x: None)
1230
def setup_pad_block(self, dam):
1231
# only do blocking if there are any sections
1232
if self.sections or self.do_block:
1233
pad = dam.sink_pads().next()
1234
peer = pad.get_peer()
1236
# need to mark this here to keep proper count of what has to block
1237
self.to_block_pads.append(peer)
1238
peer.set_blocked_async(True, self.cb_on_pad_blocked_sync)
1239
gst.debug("Pad " + str(peer) + " setup for blocking.")
1241
pad.connect('linked', self.cb_linked)
1243
def find_dams(self):
    """Scan the whole pipeline for elements named 'dam<N>'.

    Each newly discovered dam is recorded in self.dams and gets a pad
    block installed via setup_pad_block(); dams already known are left
    untouched.
    """
    gst.log('Looking for dams')
    for element in self.pipeline.recurse():
        # only elements whose name matches the dam naming convention
        if not re.match('dam\d*', element.get_name()):
            continue
        # skip dams we have already registered
        if element in self.dams:
            continue
        gst.debug("Using " + to_str(element) + " as dam")
        self.dams.append(element)
        self.setup_pad_block(element)
1252
def cb_message(self, bus, message):
1253
# if pipeline goes to paused state, set it to playing
1254
if message.type == gst.MESSAGE_STATE_CHANGED and message.src == self.pipeline:
1255
gst.debug("Checking state change")
1256
oldstate, newstate, pending = gst.Message.parse_state_changed(message)
1257
if newstate == gst.STATE_PAUSED and oldstate == gst.STATE_READY:
1258
gst.info("Setting pipeline to PLAYING ...")
1259
res = self.pipeline.set_state(gst.STATE_PLAYING)
1260
if res == gst.STATE_CHANGE_FAILURE:
1261
sys.stderr.write("ERROR: pipeline does not want to play\n")
1263
if not self.started:
1264
if self.seek in [self.SECTIONS_CUT, self.SECTIONS_CUT_TIME]:
1265
message = 'Unexpected state for requested mode; strange things may happen ...'
1268
gst.info('... but probably OK for a live pipeline')
1271
self.emit('started')
1272
# HACK !! now we go and find queues to set min level threshold
1273
# in order to allow for graceful EOS termination when not using dams
1274
# (see explanation elsewhere)
1275
# Note this may lead to a race condition and blocking
1276
# in case of a very short pipeline
1277
# (since queue hacks these to 0 when it receives eos ...)
1278
# Hm, also need to make sure decodebin has not put max-size too low
1280
# Even when using dams, it must be assured that a thread passes
1281
# by a dam, so that this one can send his EOS
1282
# (keeping a min threshold could help with this)
1283
gst.debug("HACK: fixing up queues")
1284
for element in self.pipeline.recurse():
1285
if gobject.type_name(element) in ['GstQueue', 'GstQueue2', 'GstMultiQueue']:
1286
if not self.use_dams and gobject.type_name(element) == 'GstQueue':
1287
# size linked to "collectpads balancing"
1288
element.set_property('min-threshold-buffers', 10)
1290
# disable to avoid short pipeline races
1291
# and just assume there's enough activity,
1292
# so that a thread will still pass by each dam
1293
# (to avoid problem indicated above)
1294
#element.set_property('min-threshold-buffers', 2)
1296
if element.get_property('max-size-bytes') < 50*(2**20):
1297
element.set_property('max-size-bytes', 50*2**20)
1298
# queues don't need to buffer wildly
1299
if element_is_video(element):
1300
element.set_property('max-size-buffers', 50)
1301
# a small amount of audio may consist of
1302
# lots of small (compressed) fragments
1304
element.set_property('max-size-buffers', 500)
1305
# now we have a good state to show
1306
self.emit('playing')
1307
elif message.type == gst.MESSAGE_SEGMENT_DONE:
1309
self.seek_next_segment()
1312
def cb_sync_message(self, bus, message):
1313
if message.structure.has_name('dam'):
1314
if message.structure:
1315
struct = message.structure.to_string()
1318
gst.debug("Handled Sync Message from element " + message.src.get_name() + "(" \
1319
+ message.type.first_value_nick + "): " + str(struct))
1320
if message.structure.has_field('announce'):
1321
# inform of proc mode
1322
message.src.set_property('segment-mode',
1323
self.seek not in [self.SECTIONS_CUT, self.SECTIONS_CUT_TIME])
1324
message.src.set_property('use-count', self.seek == self.SECTIONS_CUT)
1325
message.src.set_property('precision', self.precision)
1327
self.dams.append(message.src)
1328
# and start the machinery
1329
self.setup_pad_block(message.src)
1333
bus = self.pipeline.get_bus()
1334
bus.enable_sync_message_emission()
1336
# to go from PAUSED to PLAYING
1337
bus.add_signal_watch()
1338
bus.connect('message', self.cb_message)
1341
# this will set it all in motions when dams are discovered
1342
bus.connect("sync-message::element", self.cb_sync_message)
1346
# FIXME post these messages on the bus ?
1347
gst.info("Setting pipeline to PAUSED ...")
1348
res = self.pipeline.set_state(gst.STATE_PAUSED);
1350
# FIXME error not recognized as GError, maybe subclass from **gst.GError**!!??
1351
#error = gst.GError(gst.STREAM_ERROR, gst.STREAM_ERROR_FAILED, "Pipeline can't PREROLL")
1352
#bus.post(gst.message_new_error(self.pipeline, error, "Pipeline can't PREROLL"))
1354
if res == gst.STATE_CHANGE_FAILURE:
1355
gst.error("Pipeline doesn't want to pause")
1357
elif res == gst.STATE_CHANGE_NO_PREROLL:
1358
gst.info("Pipeline is live and does not need PREROLL ...")
1359
self.no_preroll = True
1361
elif res == gst.STATE_CHANGE_ASYNC:
1362
gst.info("Pipeline is PREROLLING ...")
1364
# finishing up state change happens in other threads *and* main loop
1368
# It is possible that the main demux thread is 'caught' here by
1369
# pad-alloc. Another thread may have the stream lock, or may be
1370
# temporarily held waiting in collectpads.
1371
# To make it through all this waiting and have a chance to actually
1372
# send the EOS, the feeds into collectpads need to be able to keep going
1373
# without the demuxer for a while.
1374
# As such, the queues should always have a certain level of buffers available
1375
# (e.g. using min-threshold-buffers.
1376
# Note that this is in effect implicitly the case for a live recording pipeline,
1377
# which is typically driven by various src pushing elements.
1378
# Pad-block callback used during forced stop: once the pad feeding a dam is
# blocked, send EOS through that dam (once per dam, tracked via blocked_pads).
# NOTE(review): bare number lines are extraction artifacts (original line
# numbers) and indentation is stripped; the jump 1382 -> 1384 indicates a
# missing line — presumably the "else:" introducing the already-sent branch.
def cb_do_eos(self, pad, is_blocked, dam):
1379
if dam not in self.blocked_pads:
1380
# mark this dam so EOS is only injected once
self.blocked_pads.append(dam)
1381
gst.debug("Sending EOS from " + to_str(dam))
1382
dam.send_event(gst.event_new_eos())
1384
gst.debug("Already sent eos from " + to_str(dam))
1386
def stop(self, force):
1387
# live pipeline may hang for obscure reason:
1388
# state changing thread fails to acquire LIVE_LOCK from v4l2src,
1389
# even though it is being released by the (pushsrc) thread ??
1390
if force and not self.no_preroll:
1391
gst.debug("Setting pipeline to NULL ...")
1392
self.pipeline.set_state(gst.STATE_NULL)
1393
self.pipeline.get_state(timeout=0)
1396
for dam in self.dams:
1397
gst.debug("Setting eos on " + to_str(dam))
1398
dam.set_property('force-eos', True)
1400
self.blocked_pads = []
1401
for dam in self.dams:
1402
peer = dam.sink_pads().next().get_peer()
1403
gst.debug("Blocking on pred of " + to_str(dam) + ", " + str(peer))
1404
peer.set_blocked_async(True, self.cb_do_eos, dam)
1405
# now we force queues to discharge and keep things going
1406
gst.debug("HACK: discharging queues")
1407
for element in self.pipeline.recurse():
1408
if gobject.type_name(element) == 'GstQueue':
1409
element.set_property('min-threshold-buffers', 0)
1410
# -- END NonLinPipeline
1414
# a bit of ugly code in here, but it is at least contained ...
1417
# duration according to external sources (= pipeline) is provided
1418
def __init__(self, sections, duration):
1420
# NONE as python None
1421
# times in gst clocktime (nanosec)
1422
self.duration = None
1424
self.position = None
1425
# info on proc or drop
1426
# dict with keys: type, clock, pos, initial, speed, cpu, cpuspeed
1429
# info on current section
1433
# total duration by external sources
1434
if not duration or duration == gst.CLOCK_TIME_NONE or duration < 0:
1436
self.total = duration
1438
self.sections = copy.deepcopy(sections)
1439
# info from previous run
1442
# may have open-ended section
1443
for i in range(len(self.sections)):
1444
if not self.sections[i]['end']:
1445
self.sections[i]['end'] = duration
1447
# duration determined by sections, if any
1450
self.duration = sum([x['end'] - x['start'] for x in self.sections])
1451
except: # may be None in there somewhere
1452
self.duration = None
1454
self.duration = self.total
1455
# most recent clocktime
1456
self.last['clock'] = time.time() * gst.SECOND
1457
self.last['cpu'] = os.times()[0] * gst.SECOND
1458
# most recently processed section index
1459
self.last['section'] = -1
1460
self.proc = { 'type': 'proc', 'clock': 0, 'pos': 0, 'initial': 0, 'cpu': 0 }
1461
self.drop = { 'type': 'drop', 'clock': 0, 'pos': 0, 'initial': 0, 'cpu': 0 }
1463
# returns (type, amount of stream time in this type, total amount in current section)
1464
# type can be 'proc' or 'drop'
1465
# current section can also refer to a skipped section
1466
# amounts can be None if unknown
1468
# this is a bit convoluted but should also work if sections are not
1469
# in input time order
1470
def get_proc_info(self, position):
1471
sections = self.sections
1474
return ('proc', position, position)
1477
for i, section in enumerate(sections):
1478
if position >= section['start'] \
1479
and (position <= section['end'] or not section['end']):
1480
total = total + position - section['start']
1483
sec = section['start'] - section['end']
1486
self.last['section'] = i
1489
total = total + section['end'] - section['start']
1491
i = self.last['section']
1492
self.proc['pos'] = 0
1493
for j, section in enumerate(sections):
1495
self.proc['pos'] = self.proc['pos'] + section['end'] - section['start']
1497
for i in range(len(sections)):
1498
if position < sections[i]['start']:
1500
start = sections[i-1]['end']
1503
total = total + position - start
1504
sec = sections[i]['start'] - start
1506
total = total + sections[i]['start'] - sections[i]['end']
1507
return (proctype, total, sec)
1509
def update(self, position):
1510
self.position = position
1511
# -- normal operation
1512
now = time.time() * gst.SECOND
1513
nowcpu = os.times()[0] * gst.SECOND
1514
clocktime = (now - self.last['clock'])
1515
cputime = (nowcpu - self.last['cpu'])
1516
self.last['clock'] = now
1517
self.last['cpu'] = nowcpu
1518
proctype, total, current = self.get_proc_info(position)
1519
if proctype == 'proc':
1520
self.current = self.proc
1521
toupdate = [self.proc]
1523
self.current = self.drop
1524
# processed section must be updated in any event; to advance MT counter
1525
toupdate = [self.drop, self.proc]
1526
self.current['clock'] = self.current['clock'] + clocktime
1527
self.current['cpu'] = self.current['cpu'] + cputime
1528
self.current['pos'] = total
1529
# take into account (for speed) that first clock measurement
1530
# may not have happended at 0 MT position
1531
if not self.current['initial']:
1532
self.current['initial'] = total
1533
self.current['clock'] = 0
1534
self.current['cpu'] = 0
1535
for update in toupdate:
1537
update['speed'] = float(update['pos'] - update['initial']) / update['clock']
1539
update['speed'] = None
1541
update['cpuspeed'] = float(update['pos'] - update['initial']) / update['cpu']
1543
update['cpuspeed'] = None
1544
# need MT in any event
1545
self.movietime = self.proc['pos']
1549
# provides runtime and progress info with a variety of callbacks and/or querying
1552
def __init__(self, entrans, nonlin):
1554
# nonlin we are monitoring
1555
self.nonlin = nonlin
1556
# configuration store
1557
self.config = entrans.config
1558
# settings can be obtained here
1559
self.entrans = entrans
1560
# did we get a signal (request) to stop
1561
self.got_signal = False
1562
# element used to query in the pipeline
1565
self.duration = None
1566
# info on last snapshot
1568
# info on progress so far
1569
self.progress = None
1570
# info on pipeline gathered by walk
1572
# tag info gathered from message
1574
# output needs to provide a progress header
1575
self.print_progress = False
1577
bus = nonlin.pipeline.get_bus()
1578
bus.add_signal_watch()
1579
bus.connect('message', self.cb_message)
1581
self.nonlin.connect('started', self.cb_started)
1582
self.nonlin.connect('playing', self.cb_playing)
1584
if self.config.verbose:
1585
self.nonlin.pipeline.connect("deep-notify", self.cb_deep_notify)
1587
# 'deep-notify' handler (verbose mode): print a property-change notification
# for any object/property pair passing the include/exclude filters.
# NOTE(review): bare number lines are extraction artifacts (original line
# numbers) and indentation is stripped; the jump 1592 -> 1594 indicates a
# missing line — presumably the "else:" that selects the '[not readable]'
# placeholder for write-only properties.
def cb_deep_notify(self, gstobject, object, property):
1588
name = property.name
1589
# show when explicitly included, or when not explicitly excluded
if object_match_prop(object, self.config.include, name) \
1590
or not object_match_prop(object, self.config.exclude, name):
1591
if property.flags & gobject.PARAM_READABLE:
1592
prop = object.get_property(name)
1594
prop = '[not readable]'
1595
print "Notify from ", to_str(object), ': ', name, ' = ', prop
1596
# make output as synchronous as possible
1596
# make output as synchronous as possible
1599
def cb_message(self, bus, message):
1600
if message.structure:
1601
name = message.structure.get_name()
1602
struct = message.structure.to_string()
1604
struct, name = '', ''
1605
if self.config.messages \
1606
and (message_match(message, self.config.display_msg) \
1607
or not message_match(message, self.config.ignore_msg)):
1608
print "Got Message from element %s (%s): %s" % \
1609
(message.src.get_name (), message.type.first_value_nick, struct)
1610
if message.type == gst.MESSAGE_TAG \
1611
and (not object_match_prop(message.src, self.config.ignore_prop, 'tag') \
1612
or object_match_prop(message.src, self.config.display_prop, 'tag')):
1613
if not self.tags.has_key(message.src):
1614
self.tags[message.src] = []
1615
self.tags[message.src].append(message.structure)
1617
if message.type == gst.MESSAGE_ELEMENT:
1618
if struct and name in self.config.save:
1620
filename = self.config.save[name]['file'].replace('${n}', message.src.get_name())
1621
f = open(filename, 'a')
1622
if not self.config.save[name]['append']:
1624
f.write(str(struct) + "\n")
1626
except IOError, (errno, strerror):
1627
gst.error('Failed to save message ' + name + ' to ' + filename + ': ' + strerror)
1629
del self.config.save[name]
1630
elif message.type == gst.MESSAGE_EOS:
1631
# avoid overwriting the (very likely present) status line
1633
print "Got EOS from element ", to_str(message.src)
1634
self.nonlin.stop(True)
1636
elif message.type == gst.MESSAGE_WARNING:
1637
error, debug = message.parse_warning()
1638
print "WARNING: from element " + to_str(message.src) + ":", error.message
1640
print "Additional debug info:"
1642
elif message.type == gst.MESSAGE_ERROR:
1643
error, debug = message.parse_error()
1644
message.src.default_error(error, debug)
1645
self.nonlin.stop(True)
1649
def cb_status(self):
1650
# do we need a header for good looks ?
1651
if self.print_progress and self.queryel:
1652
print "<<<< PROGRESS >>>>"
1653
if self.config.progress_real:
1654
timename = "realtime"
1656
timename = "cputime"
1657
print "[position] MT / Total (MT/%s = proc speed) ETA (queued buffers) bitrate" % (timename)
1658
self.print_progress = False
1659
# get position, duration info
1661
pos, format = self.queryel.query_position(gst.FORMAT_TIME)
1663
pos = gst.CLOCK_TIME_NONE
1664
print "Failed to obtain progress info."
1666
# update progress info
1667
self.progress.update(pos)
1668
# calculate eta based on proc speed
1669
if self.progress.duration and self.progress.proc['cpuspeed']:
1670
duration = self.progress.duration
1671
speed2 = self.progress.proc['speed']
1672
eta = (self.progress.duration - self.progress.movietime) / \
1677
# display current speed
1678
if self.progress.current['cpuspeed']:
1679
speed = self.progress.current['cpuspeed']
1680
speed2 = self.progress.current['speed']
1681
if self.config.progress_real:
1686
mt = self.progress.movietime / gst.SECOND
1688
queues = self.info['queue']['video']
1690
qs = "|".join(["%2d" % (q.get_property('current-level-buffers')) \
1696
if self.config.progress_fps:
1697
if self.nonlin.fps and speed != -1:
1698
fps = speed * self.nonlin.fps.num / self.nonlin.fps.denom
1706
for sink, location in self.info['sink'].iteritems():
1707
# only try on reasonable sinks
1709
# TODO needs filesink & co patching first
1710
#sink = sink.sink_pads().next()
1712
#fpos, format = sink.query_position(gst.FORMAT_BYTES)
1714
#gst.debug('Failed to query byte position of sink ' + to_str(sink))
1715
# falling back to direct call
1716
if os.path.isfile(location):
1717
fpos = os.path.getsize(location)
1721
rate.append(str(fpos * 8 / 1024 / mt ))
1723
rate = "|".join(rate) + ' kb/s'
1726
# and display collected info
1731
fps = "%2.2f" % (fps)
1736
speed = "%2.2f" % (speed)
1740
eta = get_time_as_str(eta * gst.SECOND, 0)
1744
duration = get_time_as_str(duration, 0)
1745
print "[%s: %s] %s / %s %s(x %s) ETA: %s %s %s" % \
1746
(self.progress.current['type'], get_time_as_str(self.progress.position),
1747
get_time_as_str(mt * gst.SECOND, 0),
1748
duration, fps, speed, eta, qs, rate),
1753
def walk_pipeline(self, bin):
1754
result = { 'src': {}, 'sink': {}, 'caps': [], 'props': {}, \
1755
'queue': { 'video': [], 'audio': [], 'other': [] }, \
1758
for element in bin_sorted_recursive(bin):
1759
# determine some useful location from the element (typically src or sink)
1761
for loc in ['location', 'device']:
1762
if loc in [x.name for x in gobject.list_properties(element)]:
1763
location = element.get_property(loc)
1764
if element_is_src(element):
1765
result['src'][element] = location
1767
if element_is_sink(element):
1768
result['sink'][element] = location
1770
for pad in element.pads():
1771
caps = pad.get_negotiated_caps()
1773
if caps not in result['caps']:
1774
result['caps'].append(caps)
1777
if not object_match_prop(element, self.config.ignore_prop, None):
1778
for pspec in gobject.list_properties(element):
1779
if pspec.flags & gobject.PARAM_READABLE and pspec.name != 'name':
1780
gst.log('Inspecting property %s.%s' % (to_str(element), pspec.name))
1781
if object_match_prop(element, self.config.display_prop, pspec.name) \
1782
or (not object_match_prop(element, self.config.ignore_prop, pspec.name)
1783
and element.get_property(pspec.name) !=
1784
clone_element(element).get_property(pspec.name)):
1785
props.append({ 'name': pspec.name,
1786
'value': element.get_property(pspec.name) })
1787
if props or not object_match_prop(element, self.config.ignore_prop):
1788
result['props'][element] = props
1790
res = result['queue']
1791
if gobject.type_name(element) in ['GstQueue', 'GstQueue2']:
1792
if element_is_video(element):
1793
res['video'].append(element)
1794
elif element_is_audio(element):
1795
res['audio'].append(element)
1797
res['other'].append(element)
1799
res = result['multiqueue']
1800
if gobject.type_name(element) == 'GstMultiQueue':
1804
def interrupt(self, signum, frame):
1805
print "Caught signal - exiting."
1806
# if we already tried to stop nicely, be more forceful
1808
self.nonlin.stop(True)
1811
self.got_signal = True
1812
self.entrans.exitcode = 1
1813
self.nonlin.stop(False)
1815
def caps_to_str(self, caps):
1816
if self.config.short_caps:
1817
return caps_to_short_str(caps)
1821
def cb_started(self, object):
1823
signal.signal(signal.SIGINT, self.interrupt)
1824
signal.signal(signal.SIGTERM, self.interrupt)
1825
# HACK now main loop should keep running
1826
self.entrans.on = True
1829
walk = self.walk_pipeline(self.nonlin.pipeline)
1832
# provide for status info
1833
# look for a good element to query position
1834
# a dam in a video stream is generally best/safest
1835
for dam in self.nonlin.dams:
1836
if element_is_video(dam):
1839
gst.debug("Dam used for querying " + str(self.queryel))
1840
# set fallback if none found
1841
if not self.queryel and self.nonlin.dams:
1842
self.queryel = self.nonlin.dams[0]
1843
gst.debug("Falling back to " + str(self.queryel) + " for querying")
1844
# there should be at least some queue in there
1845
# let's see if it already has caps
1846
if not self.queryel:
1847
if walk['queue']['video']:
1848
self.queryel = walk['queue']['video'][0]
1849
elif walk['queue']['audio']:
1850
self.queryel = walk['queue']['audio'][0]
1851
elif walk['queue']['other']:
1852
self.queryel = walk['queue']['other'][0]
1853
if not self.queryel:
1854
print "Unable to locate element to query. No progress info can be provided."
1856
gst.debug("Element used for querying " + str(self.queryel))
1857
# get position, duration info
1859
pos, format = self.queryel.query_position(gst.FORMAT_TIME)
1861
pos = gst.CLOCK_TIME_NONE
1863
duration, format = self.queryel.query_duration(gst.FORMAT_TIME)
1865
duration = gst.CLOCK_TIME_NONE
1866
self.progress = Progress(self.nonlin.sections, duration)
1867
gobject.timeout_add(self.config.progress, self.cb_status)
1869
# display some pipeline info
1871
print "<<<< INPUT - OUTPUT >>>>"
1872
for src, location in walk['src'].iteritems():
1874
location = '(' + str(location) + ')'
1877
print "Input:", to_str(src), location
1879
for src, location in walk['sink'].iteritems():
1881
location = '(' + str(location) + ')'
1884
print "Output:", to_str(src), location
1886
print "<<<< NON-DEFAULT (OR SELECTED) PROPERTIES >>>>"
1887
# display in top sorted order
1888
for element in bin_sorted_recursive(self.nonlin.pipeline):
1889
if walk['props'].has_key(element) or self.tags.has_key(element):
1890
print "Element:", to_str(element)
1891
if walk['props'].has_key(element):
1892
for prop in walk['props'][element]:
1893
print "\t", prop['name'] + ":", prop_to_str(prop['value'])
1894
if self.tags.has_key(element):
1895
for tag in self.tags[element]:
1896
print "\t", "tag:", tag.to_string()
1898
print "<<<< PIPELINE CAPS >>>>"
1899
for caps in walk['caps']:
1900
print self.caps_to_str(caps)
1901
# on to playing and/or progress ...
1902
self.print_progress = True
1904
def cb_playing(self, object):
1905
print "<<<< Now reached PLAYING state >>>>"
1906
walk = self.walk_pipeline(self.nonlin.pipeline)
1907
# forcibly set some properties on queues
1908
# possibly overriding defaults set earlier
1909
self.config.set(walk['queue']['video'], {}, True)
1910
self.config.set(walk['queue']['audio'], {}, True)
1911
self.config.set(walk['queue']['other'], {}, True)
1912
self.config.set(walk['multiqueue'], {}, True)
1913
# some remaining info to display
1915
# don't display if we had this already
1917
for caps in walk['caps']:
1918
if caps not in self.info['caps']:
1919
newcaps.append(caps)
1921
print "<<<< (MORE) PIPELINE CAPS >>>>"
1922
for caps in newcaps:
1923
print self.caps_to_str(caps)
1925
print "<<<< QUEUES >>>>"
1926
print "Video: [max-kB|max-buffers|max-sec]"
1927
l = copy.copy(walk['queue']['video'])
1928
l.extend(walk['multiqueue'])
1930
mkbytes = q.get_property('max-size-bytes') / 1024
1931
mbuffers = q.get_property('max-size-buffers')
1932
mtime = float(q.get_property('max-size-time')) / gst.SECOND
1933
pred = element_pred(q)
1934
post = element_succ(q)
1936
pred = to_str(pred[0]) + " - "
1940
post = " - " + to_str(post[0])
1943
print "%s%s [%d|%d|%.2f]%s" % (pred, to_str(q),
1944
mkbytes, mbuffers, mtime, post)
1945
# progress should follow
1946
self.print_progress = True
1947
# store this for future use
1953
# monitors for timeouts between expected states,
1954
# and tries to remedy or abort
1956
# consider it very 'friend'ly with Entrans below
1961
STATE_NO_MORE_PADS = 2
1964
# do not wait for playing:
1965
# in cut mode, that might really take some time
1966
# in any case, the pipeline/caps walk might take time as well
1967
# also do not wait for started:
1968
# it might take some time in case of a second pass having to load stats
1969
STATE_OK = STATE_NO_MORE_PADS
1971
display = { STATE_NULL: 'null',
1972
STATE_BLOCKED: 'pad-blocked',
1973
STATE_NO_MORE_PADS: 'no-more-pads',
1974
STATE_STARTED: 'started', STATE_PLAYING: 'playing' }
1976
def __init__(self, entrans, nonlin, timeout):
1977
self.timeout = timeout
1979
# nonlin we are monitoring
1980
self.nonlin = nonlin
1981
# configuration store
1982
self.config = entrans.config
1983
# settings can be obtained here
1984
self.entrans = entrans
1985
# id of the timeout source
1987
# timeout between states
1988
self.timeout = timeout
1990
self.state = self.STATE_NULL
1991
# avoid loop while trying to force things
1992
self.tried_no_more_pads = False
1997
if not self.timeout:
1999
self.nonlin.connect('blocked', self.cb_state, self.STATE_BLOCKED)
2000
self.nonlin.connect('started', self.cb_state, self.STATE_STARTED)
2001
self.nonlin.connect('playing', self.cb_state, self.STATE_PLAYING)
2002
decode = self.entrans.pipeline.get_by_name('decoder')
2004
decode.connect('no-more-pads', self.cb_state, self.STATE_NO_MORE_PADS)
2005
if self.entrans.raw:
2006
self.state = self.STATE_NO_MORE_PADS
2009
def setup_timer(self):
2011
gobject.source_remove(self.source)
2012
# only bother if there is anything left to check
2013
if self.state < self.STATE_OK:
2014
self.source = gobject.timeout_add(self.timeout, self.cb_timeout)
2016
def cb_state(self, *arguments):
2017
state = arguments[-1]
2018
# order of pad blocking and no-more-pads is undetermined
2019
if state > self.state:
2020
gst.info('Timeout monitor reached state ' + self.display[state])
2024
def cb_timeout(self):
2025
gst.debug('Timeout in state ' + self.display[self.state])
2026
# should only occur if we failed to reach a next state, check anyway
2027
if self.state >= self.STATE_OK:
2029
target_state = self.state + 1
2030
gst.error('Detected timeout trying to reach state ' + (self.display[target_state]))
2031
gst.error('See also --timeout option')
2032
if target_state == self.STATE_NO_MORE_PADS and not self.tried_no_more_pads:
2033
# try to remedy in hack-ish way, and continue timeout monitor
2034
gst.error('Trying to force state transition ...')
2035
self.entrans.cb_no_more_pads(None, None)
2036
self.tried_no_more_pads = True
2038
else: # terminate hard
2039
gst.error('Terminating ...')
2040
self.entrans.exitcode = 1
2047
# main application class, responsible for
2048
# - pipeline construction
2049
# - delegate option parsing
2050
# - completing a decodebin based dynamic pipeline if requested
2053
def __init__(self, argv):
2055
# do we still need mainloop
2057
# exitcode to return
2062
# pipeline that may have to completed dynamically
2063
self.pipeline = None
2064
# non-linear being constructed
2066
# main loop to keep things alive
2069
# raw launch pipeline
2073
# whether dam plugin is available
2074
self.have_dam = False
2075
# probe info: (dam, info) pair;
2076
# info is dict with keys probe_id
2078
# whether no-more-pads already received
2079
self.have_no_more_pads = False
2080
# pipeline fragments bookkeeping
2081
# bin: (stream number, bin launch line fragment)
2082
# no: no of streams seen so far
2083
# stream: (stream number, bin)
2084
self.pipes = { 'video': {'bin': {}, 'no': 0, 'stream': {} },
2085
'audio': {'bin': {}, 'no': 0, 'stream': {} },
2086
'other': {'bin': {}, 'no': 0, 'stream': {} }
2089
# link the given bin (already in pipeline) to the muxer
2090
def link_bin(self, bin):
2091
gst.log("Linking bin " + str(bin))
2093
mux = self.pipeline.get_by_name('muxer')
2095
for ghostpad in bin.src_pads():
2096
if ghostpad.get_direction() == gst.PAD_SRC and mux:
2097
muxpad = mux.get_compatible_pad(ghostpad)
2099
if binsrc and not muxpad:
2100
gst.debug("bin " + str(bin) + " not compatible with muxer")
2104
gst.debug("Linked " + str(binsrc) + " to " + str(muxpad))
2107
def cb_new_decode_pad(self, element, pad, no_more):
2109
# currently not used
2110
# custom way to look for unlinked pads in bin,
2111
# somewhat catering for the case where several sinkpads may be unlinked
2112
def bin_make_ghostpads(bin):
2113
# sink pads - needs to go first, otherwise interference from ghost pad
2115
for element in bin.recurse():
2116
pads.extend(element.sink_pads())
2117
pads = [p for p in pads if not p.get_peer()]
2119
bin.add_pad(gst.GhostPad("sink", pads[0]))
2121
# we look for a src pad whose downstream end does *not* have dangling src pad,
2122
# the one *with* dangling src pad is likely to connect to the muxer at the end
2123
# this allows for an in bin occurrence of a demuxer-like element
2125
endpad = pad_get_succ(p)[-1]
2127
if endpad.get_direction() == gst.PAD_SINK: # so no dangling src
2128
bin.add_pad(gst.GhostPad("sink", p))
2131
for element in bin.recurse():
2132
pads.extend(element.src_pads())
2133
pads = [p for p in pads if not p.get_peer()]
2135
gst.warning('More than 1 src pad detected in ' + to_str(bin) + ' ' + str(pads))
2137
bin.add_pad(gst.GhostPad("src", pads[0]))
2139
gst.debug("Found new decoded pad: " + str(pad) + ", last: " + str(no_more) + \
2140
", caps: " + str(pad.get_negotiated_caps()))
2141
if self.have_no_more_pads:
2142
gst.warning("Already received no-more-pads; ignoring new pad.")
2144
# if any linking, etc fails, exception will terminate things
2148
dostamp = self.config.stamp
2149
# HACK: ignore current pads
2150
if pad_is_current(pad):
2151
gst.debug('Detected current_ pad')
2152
elif pad_is_video(pad):
2154
pipe, nos, subst = self.pipes[ptype], self.config.vn, 'vn'
2155
elif pad_is_audio(pad):
2157
pipe, nos, subst = self.pipes[ptype], self.config.an, 'an'
2158
else: # subtitle or ??
2160
pipe, nos, subst = self.pipes[ptype], self.config.on, 'on'
2161
if pipe and pipe['bin']:
2163
stream_no, stream = pipe['no'], pipe['stream']
2164
if pipe['bin'].has_key(stream_no):
2165
bin = pipe['bin'][stream_no]
2166
elif pipe['bin'].has_key(0):
2167
bin = pipe['bin'][0]
2168
if bin and (not nos or pipe['no'] in nos):
2169
# FIXME perhaps do more/safer parsing some day
2170
bin = bin.replace('${' + subst + '}', str(stream_no))
2171
bin = gst.parse_bin_from_description(expand_caps(bin), True)
2172
subst = { subst: str(stream_no) }
2175
gst.debug('Filtered out new ' + ptype + ' decoded pad.')
2177
# don't put fakesink, because this also requires dam (for eos)
2178
# muxer should ignore NOT_LINKED, but fail if all NOT_LINKED (which is ok)
2179
gst.debug('Ignoring pad.')
2181
gst.debug('Trying to link pad type %s' % (ptype))
2182
# apply configuration to user part
2183
self.config.set(bin_sorted_recursive(bin), subst)
2184
# creation and config complete, now onto pipeline building
2185
if not bin.get_compatible_pad(pad):
2186
gst.debug("pad not compatible with bin")
2188
# now we can go and create, add, link, ...
2189
# must add first (as it breaks links), then link and set state
2190
self.pipeline.add(bin)
2191
# try to link this already, if possible
2192
if (self.config.vn or self.config.an or self.config.on) and \
2193
not self.config.sync_link:
2194
self.nonlin.do_block = True
2195
stream[stream_no] = bin
2197
if not self.link_bin(bin):
2198
self.pipeline.remove(bin)
2200
# some standard elements
2201
# decodebin already adds queues, so no need
2204
dam = gst.element_factory_make('dam')
2206
dam = gst.element_factory_make('identity', 'dam' + str(self.dam_no))
2207
dam.set_property('silent', True)
2208
self.pipeline.add(dam)
2210
# use identity for re-sequencing so no additional dependency is introduced
2211
stamp = gst.element_factory_make('identity', 'stamp' + str(self.dam_no))
2212
stamp.set_property('silent', True)
2213
stamp.set_property('single-segment', True)
2214
self.pipeline.add(stamp)
2215
pad.link(dam.get_compatible_pad(pad))
2216
# make sure that there is queue following dam
2218
for element in bin.recurse():
2219
if gobject.type_name(element) == 'GstQueue':
2229
q = gst.element_factory_make('queue')
2230
self.pipeline.add(q)
2237
q.set_state(gst.STATE_PAUSED)
2238
dam.set_state(gst.STATE_PAUSED)
2240
stamp.set_state(gst.STATE_PAUSED)
2241
bin.set_state(gst.STATE_PAUSED)
2242
# scan for language tags
2243
if self.config.at and format.find('audio/') == 0:
2244
self.probe[pad] = { }
2245
self.probe[pad]['buffer_id'] = pad.add_buffer_probe(self.cb_buffer_probe)
2246
self.probe[pad]['event_id'] = pad.add_event_probe(self.cb_event_probe)
2247
self.probe[pad]['request_pad_peer'] = list(bin.src_pads())
2248
gst.debug("Added probes on " + str(pad))
2249
# and inform of update
2250
self.nonlin.emit('updated')
2252
def remove_probe(self, pad):
    """Detach the buffer and event probes previously installed on *pad*.

    The probe handler ids were stored in self.probe[pad] under the keys
    'buffer_id' and 'event_id' when the probes were added (presumably by
    the new-decoded-pad handling — confirm against the installer site).

    Parameters:
        pad: the gst.Pad the probes were attached to; also the key into
             self.probe.

    Raises:
        KeyError: if no probe info was recorded for this pad.
    """
    pad.remove_buffer_probe(self.probe[pad]['buffer_id'])
    # BUG FIX: an event probe id must be removed with remove_event_probe;
    # the original passed it to remove_buffer_probe, which looks up the id
    # in the buffer-probe list and therefore never detaches the event
    # handler (leaving cb_event_probe firing forever on this pad).
    pad.remove_event_probe(self.probe[pad]['event_id'])
    gst.debug("Removed probes on " + str(pad))
2257
def cb_event_probe(self, pad, event):
2258
def collect_pred(element, elements):
2259
if element not in elements:
2260
elements.append(element)
2261
for el in element_pred(element):
2262
collect_pred(el, elements)
2263
if event.type == gst.EVENT_TAG:
2264
taglist = event.parse_tag()
2265
if gst.TAG_LANGUAGE_CODE in taglist.keys():
2266
lang = taglist[gst.TAG_LANGUAGE_CODE]
2267
if [True for tag in self.config.at if re.search(tag, lang)]:
2268
gst.debug('Language tag on ' + str(pad) + ' matched')
2269
self.remove_probe(pad)
2271
gst.debug('Language tag on ' + str(pad) + ' did not match')
2272
self.remove_probe(pad)
2274
peer = pad.get_peer()
2276
mux = self.pipeline.get_by_name('muxer')
2279
for srcpad in self.probe[pad]['request_pad_peer']:
2280
peer = srcpad.get_peer()
2282
mux.release_request_pad(peer)
2283
# collect upstream element up to the pad
2284
element = srcpad.get_parent_element()
2285
collect_pred(element, elements)
2286
# and get rid of some elements
2287
gst.log('Removing ' + str(elements) + ' from pipeline')
2288
for element in elements:
2289
self.pipeline.remove(element)
2290
element.set_state(gst.STATE_NULL)
2294
def cb_buffer_probe(self, pad, buf):
2295
# data is coming, hands off from now on
2296
self.remove_probe(pad)
2299
def link_bins(self):
2300
if self.config.sync_link or \
2301
not (self.config.vn or self.config.an or self.config.on):
2303
l = [ [self.config.vn, self.pipes['video']['stream']], \
2304
[self.config.an, self.pipes['audio']['stream']], \
2305
[self.config.on, self.pipes['other']['stream']] ]
2307
nl, sl = ll[0], ll[1]
2311
nl = range(1, max(sl.keys()) + 1)
2314
if not self.link_bin(sl[n]):
2316
self.pipeline.remove(sl[n])
2318
def cb_no_more_pads(self, element, mux):
2319
gst.debug("no-more-pads")
2320
if self.have_no_more_pads:
2321
gst.warning("Already received no-more-pads; ignoring.")
2323
self.have_no_more_pads = True
2324
# caller may not have provided, try finding it
2326
mux = self.pipeline.get_by_name('muxer')
2329
if not element_pred(mux):
2330
for el in element_succ(mux):
2331
self.pipeline.remove(el)
2332
el.set_state(gst.STATE_NULL)
2333
self.pipeline.remove(mux)
2334
mux.set_state(gst.STATE_NULL)
2335
# unlock muxer state and set
2336
mux.set_locked_state(False)
2337
mux.set_state(gst.STATE_PAUSED)
2338
gst.debug("Signalling complete.")
2339
self.nonlin.emit('complete')
2341
def cb_queue_filled(self, object):
    """Signal handler for a queue's 'overrun' signal: swallow the emission.

    Stopping the emission prevents downstream handlers (e.g. decodebin's
    own queue management) from reacting to the overrun, so the queue's
    user-configured limits stay in force.

    Parameters:
        object: the queue element that emitted 'overrun'.
    """
    message = "Suppressing overrun for " + to_str(object)
    gst.log(message)
    object.stop_emission('overrun')
2345
# set properties of added element
2346
def cb_element_added(self, bin, element):
2347
if element.get_factory().get_name() == 'queue':
2348
# override decodebin's queue management with user settings
2350
# and prevent further adjustment if so requested
2351
if self.config.block_overrun:
2352
element.connect('overrun', self.cb_queue_filled)
2355
self.config.set([element], {}, force)
2357
# - need to exit hard if exception happens when invoked from native thread
2358
# - display trace guts depending on debug level
2359
def excepthook(self, t, value, tb):
2360
gst.error ('\n' + ''.join(traceback.format_tb (tb)))
2361
print ''.join(traceback.format_exception_only (t, value))
2365
# collect pipeline fragments --ptype into collector
2366
# multiple: whether or not --ptype:* is allowed
2367
def collect_pipeline(self, ptype, collector, multiple = True):
2370
for arg in self.config.args:
2372
if arg == '--' + ptype:
2374
elif arg[0:l + 3] == '--' + ptype + ':':
2378
n = (int) (arg[l + 3:])
2380
self.config.optparser.error("invalid argument: " + arg)
2382
collect = collector[n] = []
2383
elif arg[0:2] == '--':
2389
self.config.optparser.error(arg + " does not belong to a launch fragment")
2390
# convert to text form
2392
collector[i] = " ".join(collector[i])
2396
sys.excepthook = self.excepthook
2398
self.config = Configuration()
2400
# modify plugin level properties
2401
self.config.set_plugins()
2403
# extract pipeline fragments out of pipeline and re-assemble
2405
self.collect_pipeline('raw', self.raw, False)
2406
for pipe in self.pipes:
2407
self.collect_pipeline(pipe, self.pipes[pipe]['bin'])
2409
self.raw = self.raw[0]
2412
# will we have dams ?
2414
gst.element_factory_make('dam')
2415
except gst.PluginNotFoundError:
2416
print "<<<< WARNING: dam plugin could not be found; full operation not available >>>>"
2417
self.have_dam = False
2419
self.have_dam = True
2420
if not self.config.inputfile:
2421
self.config.optparser.error("-i or --raw must be given.")
2422
self.pipeline = gst.element_factory_make('pipeline')
2424
if gst.uri_is_valid(self.config.inputfile):
2425
filesrc = gst.element_make_from_uri(gst.URI_SRC, self.config.inputfile)
2427
if self.config.inputfile == '-':
2428
filesrc = gst.element_factory_make('fdsrc')
2429
filesrc.set_property('fd', 0)
2431
filesrc = gst.element_factory_make('filesrc')
2432
filesrc.set_property('location', self.config.inputfile)
2434
if self.config.outputfile:
2435
if gst.uri_is_valid(self.config.outputfile):
2436
filesink = gst.element_make_from_uri(gst.URI_SINK, self.config.outputfile)
2438
if self.config.outputfile == '-':
2439
self.config.optparser.error('output to stdout not supported')
2441
filesink = gst.element_factory_make('filesink')
2442
filesink.set_property('location', self.config.outputfile)
2444
if self.config.muxer:
2445
mux = gst.element_factory_make(self.config.muxer, 'muxer')
2446
if mux.get_factory().get_klass().find('Muxer') < 0:
2447
self.config.optparser.error('muxer element ' + self.config.muxer + ' is not Muxer')
2448
elif self.config.outputfile[-4:] == '.avi':
2449
mux = gst.element_factory_make('avimux', 'muxer')
2450
elif self.config.outputfile[-4:] in ['.mkv', '.mka']:
2451
mux = gst.element_factory_make('matroskamux', 'muxer')
2452
elif self.config.outputfile[-4:] in ['.ogm', '.ogg']:
2453
mux = gst.element_factory_make('oggmux', 'muxer')
2454
elif self.config.outputfile[-4:] == '.mov':
2455
mux = gst.element_factory_make('ffmux_mov', 'muxer')
2456
elif self.config.outputfile[-4:] in ['.mp4', '.m4a']:
2457
mux = gst.element_factory_make('ffmux_mp4', 'muxer')
2458
elif self.config.outputfile[-4:] == '.3gp':
2459
mux = gst.element_factory_make('ffmux_3gp', 'muxer')
2460
elif self.config.outputfile[-4:] == '.3g2':
2461
mux = gst.element_factory_make('ffmux_3g2', 'muxer')
2462
elif self.config.outputfile[-4:] == '.mpg':
2463
mux = gst.element_factory_make('ffmux_mpeg', 'muxer')
2464
elif self.config.outputfile[-3:] == '.ts':
2465
mux = gst.element_factory_make('ffmux_mpegts', 'muxer')
2466
elif self.config.outputfile[-4:] == '.flv':
2467
mux = gst.element_factory_make('ffmux_flv', 'muxer')
2468
# asf/wmv/wma works but not directly mapped here
2470
self.config.optparser.error('could not determine muxer for ' + self.config.outputfile)
2472
self.pipeline.add(mux, filesink)
2474
# mux should only change state if all dynamic stuff has completed
2475
mux.set_locked_state(True)
2479
decode = gst.element_factory_make(self.config.decoder, 'decoder')
2480
decode.connect('new-decoded-pad', self.cb_new_decode_pad)
2481
decode.connect('no-more-pads', self.cb_no_more_pads, mux)
2482
decode.connect('element-added', self.cb_element_added)
2485
self.pipeline.add(filesrc, decode)
2486
filesrc.link(decode)
2488
self.pipeline = gst.parse_launch(expand_caps(self.raw))
2490
self.have_dam = self.config.dam
2492
if not self.have_dam and self.config.seek in [NonLinPipeline.SECTIONS_CUT,
2493
NonLinPipeline.SECTIONS_CUT_TIME]:
2494
self.config.optparser.error('Invalid section mode without using dam, perhaps use --dam')
2496
# apply configuration to existing part
2497
self.config.set(bin_sorted_recursive(self.pipeline, True))
2498
# should be complete enough to handle tags
2500
tagsetter = self.pipeline.get_by_interface(gst.TagSetter)
2502
taglist = gst.TagList()
2503
for k, v in self.config.tag.iteritems():
2504
# type info only in more recent gst-python
2505
if hasattr(gst, 'tag_get_tag_type'):
2506
gtype = gst.tag_get_tag_type(k)
2507
if gtype == gobject.TYPE_INVALID:
2508
# this should not happen as tags have been validated earlier
2509
print "WARNING: tag %s appears invalid, skipping" % (k)
2512
taglist[k] = gobject_convert_value(v, gtype)
2514
print "WARNING: skipping tag %s; value %s is not valid" % (k, v)
2517
if not taglist.is_empty():
2518
tagsetter.merge_tags(taglist, gst.TAG_MERGE_PREPEND)
2520
print "<<<< WARNING: Could not find element to set tags. >>>"
2522
self.nonlin = NonLinPipeline(self.pipeline, self.have_dam, complete, self.config.sections,
2523
self.config.seek, self.config.accurate, self.config.fps)
2524
# do_block should really be the default,
2525
# since that allows setting muxer to PAUSED before it receives anything
2526
if self.config.timeout:
2527
self.nonlin.do_block = True
2528
timeout = Timeout(self, self.nonlin, self.config.timeout)
2529
monitor = Monitor(self, self.nonlin)
2532
self.loop = gobject.MainLoop()
2535
except KeyboardInterrupt:
2538
# on becomes True when signal handler is put in place
2542
except KeyboardInterrupt:
2548
if self.loop and self.loop.is_running():
2552
sys.exit(self.exitcode)
2555
if __name__ == '__main__':
2556
app = Entrans(sys.argv)