1
/* This file is part of the KDE project
2
Copyright (C) 2006 Tim Beaulen <tbscope@gmail.com>
3
Copyright (C) 2006-2007 Matthias Kretz <kretz@kde.org>
5
This program is free software; you can redistribute it and/or
6
modify it under the terms of the GNU Library General Public
7
License as published by the Free Software Foundation; either
8
version 2 of the License, or (at your option) any later version.
10
This library is distributed in the hope that it will be useful,
11
but WITHOUT ANY WARRANTY; without even the implied warranty of
12
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13
Library General Public License for more details.
15
You should have received a copy of the GNU Library General Public License
16
along with this library; see the file COPYING.LIB. If not, write to
17
the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18
Boston, MA 02110-1301, USA.
22
#include "bytestream.h"
24
#include "xineengine.h"
33
#define this this_xine
34
#include <xine/input_plugin.h> // needed for MAX_PREVIEW_SIZE
35
#include <xine/xine_internal.h>
39
//#define VERBOSE_DEBUG
41
# define PXINE_VDEBUG debug() << Q_FUNC_INFO
43
# define PXINE_VDEBUG debug() << Q_FUNC_INFO
45
#define PXINE_DEBUG debug() << Q_FUNC_INFO
52
// Recovers a ByteStream pointer from an MRL that starts with "kbytestream:/".
// The bytes after the 13-character prefix are decoded, one pointer byte per
// loop iteration, into the storage of the pointer variable `ret`.
// The encoding is the one described in the comment inside mrl().
ByteStream *ByteStream::fromMrl(const QByteArray &mrl)
54
// only MRLs carrying the "kbytestream:/" prefix are handled here
if (!mrl.startsWith("kbytestream:/")) {
58
// each pointer byte occupies one (plain) or two (escaped) encoded bytes,
// which gives the lower and upper bound on the MRL length
Q_ASSERT(mrl.length() >= 13 + (int)sizeof(void *) && mrl.length() <= 13 + 2 * (int)sizeof(void *));
59
// 13 == length of "kbytestream:/": start reading right after the prefix
const unsigned char *encoded = reinterpret_cast<const unsigned char *>(mrl.constData() + 13);
60
// byte-wise view onto the pointer variable that is being reconstructed
unsigned char *addrHack = reinterpret_cast<unsigned char *>(&ret);
61
for (unsigned int i = 0; i < sizeof(void *); ++i, ++encoded) {
62
// 0x01 is the escape marker used by the encoding
if (*encoded == 0x01) {
81
// NOTE(review): presumably the non-escaped case, the byte is taken as-is - confirm
addrHack[i] = *encoded;
87
// Creates the stream for mediaSource. The QObject parent is deliberately 0;
// the MediaObject passed as `parent` is only remembered in m_mediaObject.
// Wires the internal *Queued() signals to their slots, connects to the media
// source and records the constructing thread in m_mainThread.
ByteStream::ByteStream(const MediaSource &mediaSource, MediaObject *parent)
88
: QObject(0), // don't let MediaObject's ~QObject delete us - the input plugin will delete us
89
m_mediaObject(parent),
100
// blocking queued: the thread emitting resetQueued() waits until the slot has run
connect(this, SIGNAL(resetQueued()), this, SLOT(callStreamInterfaceReset()), Qt::BlockingQueuedConnection);
101
// queued: the emitter does not wait for needData() / syncSeekStream() to run
connect(this, SIGNAL(needDataQueued()), this, SLOT(needData()), Qt::QueuedConnection);
102
connect(this, SIGNAL(seekStreamQueued(qint64)), this, SLOT(syncSeekStream(qint64)), Qt::QueuedConnection);
104
connectToSource(mediaSource);
106
// created in the main thread
107
// remembered so that the destructor can assert it runs on the same thread
m_mainThread = pthread_self();
110
// Copies buffered data from the front of m_buffers into buf.
// Reading starts m_offset bytes into the head buffer. A head buffer that is
// needed completely is dequeued; otherwise only len bytes are copied out of it
// and it stays in the queue.
//   buf - destination, must have room for len bytes
//   len - number of bytes requested; the caller guarantees that at least
//         this much data is buffered
void ByteStream::pullBuffer(char *buf, int len)
115
// never called from main thread
116
//Q_ASSERT(m_mainThread != pthread_self());
118
PXINE_VDEBUG << len << ", m_offset = " << m_offset << ", m_currentPosition = "
119
<< m_currentPosition << ", m_buffersize = " << m_buffersize;
121
// running out of buffers here means the caller broke the precondition
if (m_buffers.isEmpty()) {
122
// pullBuffer is only called when there's >= len data available
123
qFatal("m_currentPosition = %lld, m_preview.size() = %d, len = %d",
124
m_currentPosition, m_preview.size() ,len);
126
// unread remainder of the head buffer is not larger than the request
if (m_buffers.head().size() - m_offset <= len) {
127
// The whole data of the next buffer is needed
128
QByteArray buffer = m_buffers.dequeue();
129
PXINE_VDEBUG << "dequeue one buffer of size " << buffer.size()
130
<< ", reading at offset = " << m_offset << ", resetting m_offset to 0";
131
Q_ASSERT(buffer.size() > 0);
132
// only the part behind m_offset has not been consumed yet
int tocopy = buffer.size() - m_offset;
133
Q_ASSERT(tocopy > 0);
134
xine_fast_memcpy(buf, buffer.constData() + m_offset, tocopy);
138
// keep the running total of buffered bytes in sync with the queue
Q_ASSERT(m_buffersize >= static_cast<size_t>(tocopy));
139
m_buffersize -= tocopy;
142
// only a part of the next buffer is needed
143
PXINE_VDEBUG << "read " << len
144
<< " bytes from the first buffer at offset = " << m_offset;
145
// reference, not a copy: the buffer stays at the head of the queue
QByteArray &buffer = m_buffers.head();
146
Q_ASSERT(buffer.size() > 0);
147
xine_fast_memcpy(buf, buffer.constData() + m_offset , len);
149
Q_ASSERT(m_buffersize >= static_cast<size_t>(len));
156
// Copies the preview data (m_preview) into buf and returns its size in bytes.
// While the preview holds fewer than MAX_PREVIEW_SIZE bytes and neither m_eod
// nor m_stopped is set, more data is requested through needDataQueued() and
// the calling thread sleeps on m_waitingForData.
// NOTE(review): buf is assumed to have room for MAX_PREVIEW_SIZE bytes - confirm against the caller.
int ByteStream::peekBuffer(void *buf)
162
// never called from main thread
163
//Q_ASSERT(m_mainThread != pthread_self());
165
// unlocked pre-check; the loop below re-checks with m_mutex held
if (m_preview.size() < MAX_PREVIEW_SIZE && !m_eod) {
166
QMutexLocker lock(&m_mutex);
167
// the thread needs to sleep until a wait condition is signalled from writeData
168
while (!m_eod && !m_stopped && m_preview.size() < MAX_PREVIEW_SIZE) {
169
PXINE_VDEBUG << "xine waits for data: " << m_buffersize << ", " << m_eod;
170
emit needDataQueued();
171
// releases m_mutex while sleeping and re-acquires it on wake-up
m_waitingForData.wait(&m_mutex);
174
// NOTE(review): per the message this path returns 0 when the stream was stopped - confirm
PXINE_DEBUG << "returning 0, m_stopped = true";
179
xine_fast_memcpy(buf, m_preview.constData(), m_preview.size());
180
return m_preview.size();
183
// Copies up to count bytes of buffered data into buf and advances
// m_currentPosition by the number of bytes copied.
// If fewer than count bytes are buffered and the end of data has not been
// reached, the calling thread requests more data and sleeps on
// m_waitingForData until enough data arrived, m_eod is set or the stream was
// stopped.
// NOTE(review): according to the debug messages the result is count, a smaller
// value at the end of the stream, or 0 (stopped / nothing left) - confirm.
qint64 ByteStream::readFromBuffer(void *buf, size_t count)
188
// never called from main thread
189
//Q_ASSERT(m_mainThread != pthread_self());
191
// snapshot used below to check that nobody moved the position while we waited
const qint64 currentPosition = m_currentPosition;
193
PXINE_VDEBUG << count;
195
QMutexLocker lock(&m_mutex);
196
//debug() << Q_FUNC_INFO << "LOCKED m_mutex: ";
197
// get data while more is needed and while we're still receiving data
198
if (m_buffersize < count && !m_eod) {
199
// the thread needs to sleep until a wait condition is signalled from writeData
200
while (!m_eod && !m_stopped && m_buffersize < count) {
201
PXINE_VDEBUG << "xine waits for data: " << m_buffersize << ", " << m_eod;
202
emit needDataQueued();
203
// releases m_mutex while sleeping and re-acquires it on wake-up
m_waitingForData.wait(&m_mutex);
206
PXINE_DEBUG << "returning 0, m_stopped = true";
207
//debug() << Q_FUNC_INFO << "UNLOCKING m_mutex: ";
210
Q_ASSERT(currentPosition == m_currentPosition);
211
// the snapshot is only read by the assertion above
Q_UNUSED(currentPosition);
212
//PXINE_VDEBUG << "m_buffersize = " << m_buffersize;
214
// enough data buffered: serve the full request
if (m_buffersize >= count) {
215
PXINE_VDEBUG << "calling pullBuffer with m_buffersize = " << m_buffersize;
216
pullBuffer(static_cast<char *>(buf), count);
217
m_currentPosition += count;
218
//debug() << Q_FUNC_INFO << "UNLOCKING m_mutex: ";
222
// less than requested but not empty: hand out whatever is left
if (m_buffersize > 0) {
223
PXINE_VDEBUG << "calling pullBuffer with m_buffersize = " << m_buffersize;
224
// note: m_buffersize is compared against size_t elsewhere and is narrowed to int here
const int len = m_buffersize;
225
pullBuffer(static_cast<char *>(buf), len);
226
m_currentPosition += len;
227
PXINE_DEBUG << "returning less data than requested, the stream is at its end";
228
//debug() << Q_FUNC_INFO << "UNLOCKING m_mutex: ";
231
PXINE_DEBUG << "return 0, the stream is at its end";
232
//debug() << Q_FUNC_INFO << "UNLOCKING m_mutex: ";
236
// Moves the read position to offset and returns the resulting position.
// Handled cases, in order:
//   - offset equals the current position: nothing to do
//   - offset lies beyond m_streamSize: warn and stay where we are
//   - offset lies ahead inside the buffered data: drop/skip buffered bytes
//   - offset lies behind, but still inside the head buffer: rewind m_offset
//   - stream not seekable: stay where we are
//   - otherwise: set m_currentPosition to offset, request a seek of the
//     source via seekStreamQueued() and wait on m_seekWaitCondition
off_t ByteStream::seekBuffer(qint64 offset)
241
// never called from main thread
242
//Q_ASSERT(m_mainThread != pthread_self());
245
if (offset == m_currentPosition) {
246
return m_currentPosition;
250
if (offset > m_streamSize) {
251
qWarning() << "xine is asking to seek behind the end of the data stream";
252
// position is left unchanged
return m_currentPosition;
255
// first try to seek in the data we have buffered
257
//debug() << Q_FUNC_INFO << "LOCKED m_mutex: ";
258
// forward seek that ends before the end of the buffered data
if (offset > m_currentPosition && offset < m_currentPosition + m_buffersize) {
259
debug() << Q_FUNC_INFO << "seeking behind current position, but inside the buffered data";
260
// seek behind the current position in the buffer
261
while (offset > m_currentPosition) {
262
// bytes that still have to be skipped
const int gap = offset - m_currentPosition;
263
Q_ASSERT(!m_buffers.isEmpty());
264
// unread bytes left in the head buffer
const int buffersize = m_buffers.head().size() - m_offset;
265
if (buffersize <= gap) {
266
// discard buffers if they hold data before offset
267
Q_ASSERT(!m_buffers.isEmpty());
268
QByteArray buffer = m_buffers.dequeue();
269
m_buffersize -= buffersize;
270
m_currentPosition += buffersize;
273
// offset points to data in the next buffer
275
m_currentPosition += gap;
279
Q_ASSERT(offset == m_currentPosition);
280
//debug() << Q_FUNC_INFO << "UNLOCKING m_mutex: ";
282
return m_currentPosition;
283
// backward seek that does not go back further than the start of the head buffer
} else if (offset < m_currentPosition && m_currentPosition - offset <= m_offset) {
284
debug() << Q_FUNC_INFO << "seeking in current buffer: m_currentPosition = " << m_currentPosition << ", m_offset = " << m_offset;
285
// seek before the current position in the buffer
286
// rewinding makes the skipped-back bytes readable again, so both counters move by the same amount
m_offset -= m_currentPosition - offset;
287
m_buffersize += m_currentPosition - offset;
288
Q_ASSERT(m_offset >= 0);
289
m_currentPosition = offset;
290
//debug() << Q_FUNC_INFO << "UNLOCKING m_mutex: ";
292
return m_currentPosition;
295
// the ByteStream is not seekable: no chance to seek to the requested offset
297
//debug() << Q_FUNC_INFO << "UNLOCKING m_mutex: ";
299
return m_currentPosition;
302
PXINE_DEBUG << "seeking to a position that's not in the buffered data: clear the buffer. "
303
" new offset = " << offset <<
304
", m_buffersize = " << m_buffersize <<
305
", m_offset = " << m_offset <<
306
", m_eod = " << m_eod <<
307
", m_currentPosition = " << m_currentPosition;
309
// throw away the buffers and ask for new data
315
m_currentPosition = offset;
316
//debug() << Q_FUNC_INFO << "UNLOCKING m_mutex: ";
319
// m_seekMutex pairs with m_seekWaitCondition; syncSeekStream() wakes the condition
QMutexLocker seekLock(&m_seekMutex);
323
emit seekStreamQueued(offset); //calls syncSeekStream from the main thread
324
m_seekWaitCondition.wait(&m_seekMutex); // waits until the seekStream signal returns
328
// Returns the current read position (m_currentPosition) without taking any lock.
off_t ByteStream::currentPosition() const
330
return m_currentPosition;
333
// Destructor. Must run on the thread that constructed the object
// (the constructor stores that thread in m_mainThread).
ByteStream::~ByteStream()
335
Q_ASSERT(m_mainThread == pthread_self());
339
// Builds the MRL for this stream: the prefix "kbytestream:/" followed by the
// bytes of the `this` pointer in an escaped form. fromMrl() performs the
// reverse operation.
QByteArray ByteStream::mrl() const
341
QByteArray mrl("kbytestream:/");
342
// the address can contain 0s which will null-terminate the C-string
343
// use a simple encoding: 0x00 -> 0x0101, 0x01 -> 0x0102
344
// local copy of the pointer so that its bytes can be read one by one
const ByteStream *iface = this;
345
const unsigned char *that = reinterpret_cast<const unsigned char *>(&iface);
346
// one iteration per byte of the pointer
for(unsigned int i = 0; i < sizeof(void *); ++i) {
348
case 0: // escape 0 as it terminates the string
352
case 1: // escape 1 because it is used for escaping
356
case '#': // escape # because xine splits the mrl at #s
360
case '%': // escape % because xine will replace e.g. %20 with ' '
372
// Called with the size of the stream. With m_streamSizeMutex held, and once
// m_streamSize is non-zero, data is requested via needDataQueued() and every
// thread waiting on m_waitForStreamSize is woken up.
// NOTE(review): presumably x is stored in m_streamSize before the check - confirm.
void ByteStream::setStreamSize(qint64 x)
375
QMutexLocker lock(&m_streamSizeMutex);
377
// 0 means "size not known yet" (see streamSize())
if (m_streamSize != 0) {
378
emit needDataQueued();
379
m_waitForStreamSize.wakeAll();
383
// Posts a PauseForBuffering or an UnpauseForBuffering event to the stream
// object of m_mediaObject.
// NOTE(review): which of the two events is posted presumably depends on b - confirm.
void ByteStream::setPauseForBuffering(bool b)
386
QCoreApplication::postEvent(m_mediaObject->stream().data(), new QEVENT(PauseForBuffering));
389
QCoreApplication::postEvent(m_mediaObject->stream().data(), new QEVENT(UnpauseForBuffering));
394
// Called when the source has no more data. Wakes every thread that may be
// sleeping on one of the three wait conditions (seek, data, stream size) so
// that none of them stays blocked waiting for data that will never arrive.
// NOTE(review): m_eod is presumably set while the mutexes are held - confirm.
void ByteStream::endOfData()
400
m_streamSizeMutex.lock();
402
// don't reset the XineStream because many demuxers hit eod while trying to find the format of
404
// stream().setMrl(mrl());
405
// release a thread waiting in seekBuffer()
m_seekWaitCondition.wakeAll();
406
m_seekMutex.unlock();
407
// release a thread waiting in peekBuffer() / readFromBuffer()
m_waitingForData.wakeAll();
409
// release a thread waiting in streamSize()
m_waitForStreamSize.wakeAll();
410
m_streamSizeMutex.unlock();
413
// Records in m_seekable whether the source supports seeking.
void ByteStream::setStreamSeekable(bool seekable)
415
m_seekable = seekable;
418
// Receives a chunk of data from the source.
// The first bytes of the stream are additionally collected in m_preview until
// it holds MAX_PREVIEW_SIZE bytes. The chunk is then appended to m_buffers
// (with m_mutex held), m_buffersize is updated and threads sleeping on
// m_waitingForData are woken up. Depending on the MediaObject state the source
// is told that enough data has been delivered.
void ByteStream::writeData(const QByteArray &data)
420
// empty chunks are handled separately from regular data
if (data.size() <= 0) {
424
// first fill the preview buffer
425
if (m_preview.size() != MAX_PREVIEW_SIZE) {
426
PXINE_DEBUG << "fill preview";
427
// more data than the preview buffer needs
428
if (m_preview.size() + data.size() > MAX_PREVIEW_SIZE) {
429
// take only as many bytes as are missing to reach MAX_PREVIEW_SIZE
int tocopy = MAX_PREVIEW_SIZE - m_preview.size();
430
m_preview += data.left(tocopy);
431
} else { // all data fits into the preview buffer
435
PXINE_VDEBUG << "filled preview buffer to " << m_preview.size();
438
PXINE_VDEBUG << data.size() << " m_streamSize = " << m_streamSize;
440
QMutexLocker lock(&m_mutex);
441
//debug() << Q_FUNC_INFO << "LOCKED m_mutex: ";
442
m_buffers.enqueue(data);
443
m_buffersize += data.size();
444
PXINE_VDEBUG << "m_buffersize = " << m_buffersize;
445
// FIXME accessing m_mediaObject is not threadsafe
446
switch (m_mediaObject->state()) {
447
case Phonon::BufferingState: // if nbc is buffering we want more data
448
case Phonon::LoadingState: // if the preview is not ready we want more data
451
enoughData(); // else it's enough
453
// wake readers sleeping in peekBuffer() / readFromBuffer()
m_waitingForData.wakeAll();
454
//debug() << Q_FUNC_INFO << "UNLOCKING m_mutex: ";
457
// Slot that forwards to StreamInterface::reset(). The constructor connects it
// to resetQueued() with Qt::BlockingQueuedConnection.
void ByteStream::callStreamInterfaceReset()
459
StreamInterface::reset();
462
// Slot connected to seekStreamQueued(). After the seek has been handled it
// wakes the thread that waits on m_seekWaitCondition in seekBuffer() and
// releases m_seekMutex.
// NOTE(review): m_seekMutex is presumably locked earlier in this function - confirm.
void ByteStream::syncSeekStream(qint64 offset)
467
m_seekWaitCondition.wakeAll();
468
m_seekMutex.unlock();
471
// Returns the size of the stream. If the size is still unknown (0) and the end
// of data has not been reached, the calling thread sleeps on
// m_waitForStreamSize until it is woken up.
qint64 ByteStream::streamSize() const
473
// unlocked pre-check; repeated below with m_streamSizeMutex held
if (m_streamSize == 0) {
474
// stream size has not been set yet
475
QMutexLocker lock(&m_streamSizeMutex);
476
if (m_streamSize == 0 && !m_eod) {
477
// releases m_streamSizeMutex while sleeping and re-acquires it on wake-up
m_waitForStreamSize.wait(&m_streamSizeMutex);
483
// Stops the stream: wakes every thread that may be sleeping on one of the
// three wait conditions (seek, data, stream size) so that it can notice the
// stop and return.
// NOTE(review): m_stopped is presumably set while the mutexes are held - confirm.
void ByteStream::stop()
489
m_streamSizeMutex.lock();
491
// the other thread is now not between m_mutex.lock() and m_waitingForData.wait(&m_mutex), so it
492
// won't get stuck in m_waitingForData.wait if it's not there right now
493
// release a thread waiting in seekBuffer()
m_seekWaitCondition.wakeAll();
494
m_seekMutex.unlock();
495
// release a thread waiting in peekBuffer() / readFromBuffer()
m_waitingForData.wakeAll();
497
// release a thread waiting in streamSize()
m_waitForStreamSize.wakeAll();
498
m_streamSizeMutex.unlock();
501
// Resets the read position to the start of the stream and, if the stream size
// is already known, requests data again. The first call is tracked through
// m_firstReset.
// NOTE(review): the first reset is presumably treated specially (it only clears the flag) - confirm.
void ByteStream::reset()
504
debug() << Q_FUNC_INFO << "first reset";
505
m_firstReset = false;
509
m_currentPosition = 0;
515
// 0 means the stream size has not been set yet
if (m_streamSize != 0) {
516
emit needDataQueued();
520
}} //namespace Phonon::Xine
522
#include "bytestream.moc"