2
/* -*-c++-*- OpenSceneGraph - Copyright (C) 1998-2008 Robert Osfield
4
* This library is open source and may be redistributed and/or modified under
5
* the terms of the OpenSceneGraph Public License (OSGPL) version 0.0 or
6
* (at your option) any later version. The full license is in LICENSE file
7
* included with this distribution, and on the openscenegraph.org website.
9
* This library is distributed in the hope that it will be useful,
10
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12
* OpenSceneGraph Public License for more details.
15
#include "VideoFrameDispatcher.h"
21
// Default constructor. The stream starts with _needsDispatching == false;
// the flag is only raised later through setNeedsDispatching().
VideoImageStream::VideoImageStream()
23
, _needsDispatching(false)
29
// Copy constructor. Forwards image and copyop to the osg::ImageStream base
// and takes over the source's _needsDispatching flag and _dispatcher, so the
// copy refers to the same dispatcher as the stream it was copied from.
VideoImageStream::VideoImageStream(const VideoImageStream& image,const osg::CopyOp& copyop)
30
: osg::ImageStream(image, copyop)
31
, _needsDispatching(image._needsDispatching)
32
, _dispatcher(image._dispatcher)
37
// Destructor. Issues a StopUpdate request so that the dispatching flag is
// cleared and the stream is handed to the dispatcher's removeFromQueue().
VideoImageStream::~VideoImageStream()
39
setNeedsDispatching(StopUpdate);
43
// Updates the dispatching state of this stream according to request_mode and
// registers/unregisters the stream with its dispatcher.
// Returns true when a dispatcher is set (_dispatcher != NULL), false otherwise.
// NOTE(review): _dispatcher is dereferenced below; confirm it is checked for
// NULL before these calls are reached.
bool VideoImageStream::setNeedsDispatching(RequestMode request_mode)
45
// The flag stays/gets set when it was already set or a continuous update is
// requested, and is always cleared by StopUpdate.
_needsDispatching = (_needsDispatching || (request_mode == RequestContinuousUpdate)) && (request_mode != StopUpdate);
49
// StopUpdate takes the stream out of the dispatcher's queue ...
if (request_mode == StopUpdate) {
50
_dispatcher->removeFromQueue(this);
54
// ... presumably every other request mode (re-)adds it -- confirm this is the else branch.
_dispatcher->addToQueue(this);
57
return (_dispatcher != NULL);
62
// Constructor of a dispatch queue; the queue is itself an OpenThreads::Thread
// whose work is done in run().
VideoFrameDispatchQueue::VideoFrameDispatchQueue()
63
: OpenThreads::Thread()
73
// Thread body of the dispatch queue: walks _queue and calls decodeFrame() on
// every stream that is still alive and asks for dispatching, then throttles
// itself so that one pass takes at least frame_delay.
void VideoFrameDispatchQueue::run()
76
// 1000 * 1000 / 120 == 8333. The value is compared against t.delta_u() and
// handed to microSleep() below, so it acts as the minimum duration of one pass
// (presumably microseconds, i.e. roughly 1/120 s -- confirm against osg::Timer).
static unsigned int frame_delay = 1000 * 1000 / 120;
83
unsigned int num_items(0);
85
// Timestamp taken before the pass so its duration can be measured afterwards.
osg::Timer_t last_tick(t.tick());
86
// _mutex is held while the queue is traversed; addItem() and removeItem()
// lock the same mutex before touching _queue.
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_mutex);
87
// The increment clause is empty, so the iterator has to be advanced inside the loop body.
for(Queue::iterator i = _queue.begin(); i != _queue.end(); )
89
osg::observer_ptr<VideoImageStream> stream(*i);
91
// Only decode for streams whose observer_ptr is still valid and that request dispatching.
if (stream.valid() && stream->needsDispatching())
94
stream->decodeFrame();
101
// NOTE(review): presumably the branch for entries that no longer need
// dispatching -- the stream is detached from this queue. Confirm.
stream->setDispatchQueue(NULL);
106
// Store the counter of this pass in the member _numItems.
_numItems = num_items;
109
// If the pass was faster than frame_delay, sleep for the remaining time.
unsigned int dt = t.delta_u(last_tick, t.tick());
110
if (dt < frame_delay) {
111
OpenThreads::Thread::microSleep(frame_delay - dt);
118
// std::cout << this << " blocking" << std::endl;
126
// Adds stream to this queue and records this queue on the stream via
// setDispatchQueue(this). Does nothing once _finished is set.
void VideoFrameDispatchQueue::addItem(osgVideo::VideoImageStream *stream)
128
if (_finished) return;
130
// Guard _queue with _mutex, the same mutex run() holds while iterating.
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_mutex);
131
_queue.insert(stream);
132
stream->setDispatchQueue(this);
134
// Refresh the cached item count from the container.
_numItems = _queue.size();
136
// std::cout << this << " release" << std::endl;
139
// Removes stream from this queue. The stream's dispatch queue is reset to NULL
// first (before _mutex is taken), then the entry is erased under the lock and
// the cached item count is refreshed.
void VideoFrameDispatchQueue::removeItem(osgVideo::VideoImageStream* stream)
141
stream->setDispatchQueue(NULL);
142
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_mutex);
143
_queue.erase(stream);
144
_numItems = _queue.size();
147
// Destructor of the dispatch queue thread.
VideoFrameDispatchQueue::~VideoFrameDispatchQueue()
156
// Creates the dispatcher together with its dispatch queues.
// num_threads: number of queues to create; 0 means "use the value returned by
// OpenThreads::GetNumberOfProcessors()".
VideoFrameDispatcher::VideoFrameDispatcher(unsigned int num_threads)
160
num_threads = num_threads ? num_threads : OpenThreads::GetNumberOfProcessors();
161
// Logged at OSG_ALWAYS, i.e. the message does not depend on a debug notify level.
OSG_ALWAYS << "VideoFrameDispatcher: creating " << num_threads << " queues." << std::endl;
162
for(unsigned int i = 0; i < num_threads; ++i)
164
// Each queue is allocated with new and stored in _queues.
VideoFrameDispatchQueue* q = new VideoFrameDispatchQueue();
166
_queues.push_back(q);
171
// Hands stream over to one of the dispatcher's queues.
void VideoFrameDispatcher::addToQueue(VideoImageStream *stream)
173
// Enable thread-safe ref/unref on the stream before a queue thread gets hold of it.
stream->setThreadSafeRefUnref(true);
174
// NOTE(review): presumably an early-out for streams that already belong to a queue -- confirm.
if (stream->getDispatchQueue())
177
// Pick the queue that VideoFrameDispatchQueueComparator ranks lowest
// (presumably the one with the fewest items -- confirm in the comparator).
VideoFrameDispatchQueue* queue = *std::min_element(_queues.begin(), _queues.end(), VideoFrameDispatchQueueComparator());
178
queue->addItem(stream);
181
// Removes stream from the queue it is currently assigned to; streams without
// a dispatch queue are left untouched.
void VideoFrameDispatcher::removeFromQueue(VideoImageStream* stream)
183
if (stream->getDispatchQueue())
184
stream->getDispatchQueue()->removeItem(stream);
b'\\ No newline at end of file'