23
23
# set of utility functions
29
from pitivi.signalinterface import Signallable
30
import pitivi.log.log as log
31
from gettext import ngettext
33
UNKNOWN_DURATION = 2 ** 63 - 1
35
def time_to_string(value):
37
Converts the given time in nanoseconds to a human readable string
41
if value == gst.CLOCK_TIME_NONE:
43
ms = value / gst.MSECOND
50
return "%02d:%02d:%02d.%03d" % (hours, mins, sec, ms)
52
def beautify_length(length):
54
Converts the given time in nanoseconds to a human readable string
58
sec = length / gst.SECOND
66
parts.append(ngettext("%d hour", "%d hours", hours) % hours)
69
parts.append(ngettext("%d minute", "%d minutes", mins) % mins)
72
parts.append(ngettext("%d second", "%d seconds", sec) % sec)
74
return ", ".join(parts)
27
76
def bin_contains(bin, element):
28
77
""" Returns True if the bin contains the given element, the search is recursive """
36
85
if isinstance(elt, gst.Bin) and bin_contains(elt, element):
89
# Python re-implementation of binary search algorithm found here:
90
# http://en.wikipedia.org/wiki/Binary_search
92
# This is the iterative version without the early termination branch, which
93
# also tells us the elements of A that are nearest to Value, if the element we
94
# want is not found. This is useful for implementing edge snapping in the UI,
95
# where we repeatedly search through a list of control points for the one
96
# closest to the cursor. Because we don't care whether the cursor position
97
# matches the list, this function returns the index of the element closest to
100
def binary_search(col, value):
105
if (col[mid] < value):
108
#can't be high = mid-1: here col[mid] >= value,
109
#so high can't be < mid if col[mid] == value
113
# Returns the element of seq nearest to item, and the difference between them
115
def closest_item(seq, item, lo=0):
116
index = bisect.bisect(seq, item, lo)
117
if index >= len(seq):
120
diff = abs(res - item)
122
# binary_search returns largest element closest to item.
123
# if there is a smaller element...
125
res_a = seq[index - 1]
126
# ...and it is closer to the pointer...
127
diff_a = abs(res_a - item)
134
return res, diff, index
136
def argmax(func, seq):
137
"""return the element of seq that gives max(map(func, seq))"""
142
# using a generator expression here should save memory
143
objs = ((func(val), val) for val in seq)
144
return reduce(compare, objs)[1]
154
def data_probe(pad, data, section=""):
155
"""Callback to use for gst.Pad.add_*_probe.
157
The extra argument will be used to prefix the debug messages
160
section = "%s:%s" % (pad.get_parent().get_name(), pad.get_name())
161
if isinstance(data, gst.Buffer):
162
log.debug("probe","%s BUFFER timestamp:%s , duration:%s , size:%d , offset:%d , offset_end:%d",
163
section, gst.TIME_ARGS(data.timestamp), gst.TIME_ARGS(data.duration),
164
data.size, data.offset, data.offset_end)
165
if data.flags & gst.BUFFER_FLAG_DELTA_UNIT:
166
log.debug("probe","%s DELTA_UNIT", section)
167
if data.flags & gst.BUFFER_FLAG_DISCONT:
168
log.debug("probe","%s DISCONT", section)
169
if data.flags & gst.BUFFER_FLAG_GAP:
170
log.debug("probe","%s GAP", section)
171
log.debug("probe","%s flags:%r", section, data.flags)
173
log.debug("probe","%s EVENT %s", section, data.type)
174
if data.type == gst.EVENT_NEWSEGMENT:
175
log.debug("probe","%s %r", section, list(data.parse_new_segment()))
178
def linkDynamic(element, target):
180
def pad_added(bin, pad, target):
181
if target.get_compatible_pad(pad):
183
element.connect("pad-added", pad_added, target)
185
def element_make_many(*args):
    """Build one element per positional argument.

    Every argument is handed, in order, to ``gst.element_factory_make``
    (presumably a factory name -- confirm against callers).

    @return: a tuple holding the created elements, in argument order.
    """
    elements = [gst.element_factory_make(factory) for factory in args]
    return tuple(elements)
189
E = graph.iteritems()
197
except gst.LinkError:
202
f = gst.element_factory_make("capsfilter")
203
f.props.caps = gst.caps_from_string(caps)
210
def uri_is_valid(uri):
211
"""Checks if the given uri is a valid uri (of type file://)
213
Will also check if the size is valid (> 0).
215
@param uri: The location to check
218
res = gst.uri_is_valid(uri) and gst.uri_get_protocol(uri) == "file"
220
return len(os.path.basename(gst.uri_get_location(uri))) > 0
223
def uri_is_reachable(uri):
224
""" Check whether the given uri is reachable and we can read/write
227
@param uri: The location to check
229
@return: C{True} if the uri is reachable.
232
if not uri_is_valid(uri):
233
raise NotImplementedError(
234
_("%s doesn't yet handle non local projects") % APPNAME)
235
return os.path.isfile(gst.uri_get_location(uri))
237
class PropertyChangeTracker(Signallable):
238
def __init__(self, timeline_object):
241
for property_name in self.property_names:
242
self.properties[property_name] = \
243
getattr(timeline_object, property_name)
245
timeline_object.connect(property_name + '-changed',
246
self._propertyChangedCb, property_name)
248
def _propertyChangedCb(self, timeline_object, value, property_name):
249
old_value = self.properties[property_name]
250
self.properties[property_name] = value
252
self.emit(property_name + '-changed', timeline_object, old_value, value)
254
class Seeker(Signallable):
255
__signals__ = {'seek': ['position', 'format']}
257
def __init__(self, timeout):
258
self.timeout = timeout
259
self.pending_seek_id = None
263
def seek(self, position, format=gst.FORMAT_TIME, on_idle=False):
264
if self.pending_seek_id is None:
265
self.position = position
268
gobject.idle_add(self._seekTimeoutCb)
270
self._seekTimeoutCb()
271
self.pending_seek_id = self._scheduleSeek(self.timeout,
274
self.position = position
277
def _scheduleSeek(self, timeout, callback):
    """Register ``callback`` through ``gobject.timeout_add``.

    @param timeout: interval forwarded unchanged to ``gobject.timeout_add``
        (presumably milliseconds -- confirm against the gobject docs).
    @param callback: callable forwarded unchanged to ``gobject.timeout_add``.
    @return: whatever ``gobject.timeout_add`` returns.
    """
    source_id = gobject.timeout_add(timeout, callback)
    return source_id
280
def _seekTimeoutCb(self):
    """Emit the stored seek request, if there is one.

    Resets ``pending_seek_id`` to None. When both ``position`` and
    ``format`` hold a value, they are reset to None and the 'seek' signal
    is emitted with the values they held.
    """
    self.pending_seek_id = None
    # None marks "nothing stored"; compare by identity rather than equality
    # so that a value with custom comparison semantics cannot confuse the test.
    if self.position is not None and self.format is not None:
        # Swap the stored values out before emitting, leaving both attributes
        # reset to None.
        seek_position, self.position = self.position, None
        seek_format, self.format = self.format, None
        self.emit('seek', seek_position, seek_format)
288
def get_filesystem_encoding():
    """Return the name of the filesystem encoding.

    Uses ``sys.getfilesystemencoding()``; when that yields a false value
    (for instance None), "utf-8" is returned instead.
    """
    encoding = sys.getfilesystemencoding()
    if encoding:
        return encoding
    return "utf-8"