40
40
IMPL_DEBUG_MODULE( StreamProcessorManager, StreamProcessorManager, DEBUG_LEVEL_VERBOSE );
42
42
StreamProcessorManager::StreamProcessorManager(DeviceManager &p)
43
: m_time_of_transfer ( 0 )
45
, m_time_of_transfer2 ( 0 )
44
48
, m_SyncSource(NULL)
46
50
, m_xrun_happened( false )
47
51
, m_activity_wait_timeout_nsec( 0 ) // dynamically set
48
52
, m_nb_buffers( 0 )
50
55
, m_audio_datatype( eADT_Float )
51
56
, m_nominal_framerate ( 0 )
61
66
StreamProcessorManager::StreamProcessorManager(DeviceManager &p, unsigned int period,
62
67
unsigned int framerate, unsigned int nb_buffers)
68
: m_time_of_transfer ( 0 )
70
, m_time_of_transfer2 ( 0 )
64
73
, m_SyncSource(NULL)
66
75
, m_xrun_happened( false )
67
76
, m_activity_wait_timeout_nsec( 0 ) // dynamically set
68
77
, m_nb_buffers(nb_buffers)
70
80
, m_audio_datatype( eADT_Float )
71
81
, m_nominal_framerate ( framerate )
463
473
// get the options
464
474
int signal_delay_ticks = STREAMPROCESSORMANAGER_SIGNAL_DELAY_TICKS;
475
int xmit_prebuffer_frames = STREAMPROCESSORMANAGER_XMIT_PREBUFFER_FRAMES;
465
476
int sync_wait_time_msec = STREAMPROCESSORMANAGER_SYNC_WAIT_TIME_MSEC;
466
477
int cycles_for_startup = STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP;
467
478
int prestart_cycles_for_xmit = STREAMPROCESSORMANAGER_PRESTART_CYCLES_FOR_XMIT;
468
479
int prestart_cycles_for_recv = STREAMPROCESSORMANAGER_PRESTART_CYCLES_FOR_RECV;
469
480
Util::Configuration &config = m_parent.getConfiguration();
470
481
config.getValueForSetting("streaming.spm.signal_delay_ticks", signal_delay_ticks);
482
config.getValueForSetting("streaming.spm.xmit_prebuffer_frames", xmit_prebuffer_frames);
471
483
config.getValueForSetting("streaming.spm.sync_wait_time_msec", sync_wait_time_msec);
472
484
config.getValueForSetting("streaming.spm.cycles_for_startup", cycles_for_startup);
473
485
config.getValueForSetting("streaming.spm.prestart_cycles_for_xmit", prestart_cycles_for_xmit);
482
494
debugOutput( DEBUG_LEVEL_VERBOSE, "Finding minimal sync delay...\n");
483
495
int max_of_min_delay = 0;
484
496
int min_delay = 0;
497
int packet_size_frames = 0;
498
int max_packet_size_frames = 0;
485
500
for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
486
501
it != m_ReceiveProcessors.end();
488
503
min_delay = (*it)->getMaxFrameLatency();
489
504
if(min_delay > max_of_min_delay) max_of_min_delay = min_delay;
505
packet_size_frames = (*it)->getNominalFramesPerPacket();
506
if(packet_size_frames > max_packet_size_frames) max_packet_size_frames = packet_size_frames;
508
for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
509
it != m_TransmitProcessors.end();
511
packet_size_frames = (*it)->getNominalFramesPerPacket();
512
if(packet_size_frames > max_packet_size_frames) max_packet_size_frames = packet_size_frames;
514
debugOutput( DEBUG_LEVEL_VERBOSE, " max_of_min_delay = %d, max_packet_size_frames = %d...\n", max_of_min_delay, max_packet_size_frames);
492
516
// add some processing margin. This only shifts the time
493
517
// at which the buffer is transfer()'ed. This makes things somewhat
494
// more robust. It should be noted though that shifting the transfer
495
// time to a later time instant also causes the xmit buffer fill to be
497
max_of_min_delay += signal_delay_ticks;
499
m_SyncSource->setSyncDelay(max_of_min_delay);
500
unsigned int syncdelay = m_SyncSource->getSyncDelay();
501
debugOutput( DEBUG_LEVEL_VERBOSE, " sync delay = %d => %d ticks (%03us %04uc %04ut)...\n",
502
max_of_min_delay, syncdelay,
503
(unsigned int)TICKS_TO_SECS(syncdelay),
504
(unsigned int)TICKS_TO_CYCLES(syncdelay),
505
(unsigned int)TICKS_TO_OFFSET(syncdelay));
519
m_sync_delay = max_of_min_delay + signal_delay_ticks;
507
521
//STEP X: when we implement such a function, we can wait for a signal from the devices that they
508
522
// have aquired lock
517
531
nb_sync_runs /= 1000;
518
532
nb_sync_runs /= getPeriodSize();
520
int64_t time_till_next_period;
521
534
while(nb_sync_runs--) { // or while not sync-ed?
522
535
// check if we were woken up too soon
523
time_till_next_period = m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
524
debugOutput( DEBUG_LEVEL_VERY_VERBOSE, "waiting for %d usecs...\n", time_till_next_period);
525
if(time_till_next_period > 0) {
526
// wait for the period
527
SleepRelativeUsec(time_till_next_period);
536
uint64_t ticks_at_period = m_SyncSource->getTimeAtPeriod();
537
uint64_t ticks_at_period_margin = ticks_at_period + m_sync_delay;
538
uint64_t pred_system_time_at_xfer = m_SyncSource->getParent().get1394Service().getSystemTimeForCycleTimerTicks(ticks_at_period_margin);
541
int64_t now = Util::SystemTimeSource::getCurrentTime();
542
debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "CTR pred: %lld, syncdelay: %lld, diff: %lld\n", ticks_at_period, ticks_at_period_margin, ticks_at_period_margin-ticks_at_period );
543
debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "PREWAIT pred: %lld, now: %lld, wait: %lld\n", pred_system_time_at_xfer, now, pred_system_time_at_xfer-now );
546
// wait until it's time to transfer
547
Util::SystemTimeSource::SleepUsecAbsolute(pred_system_time_at_xfer);
550
now = Util::SystemTimeSource::getCurrentTime();
551
debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "POSTWAIT pred: %lld, now: %lld, excess: %lld\n", pred_system_time_at_xfer, now, now-pred_system_time_at_xfer );
531
555
debugOutput( DEBUG_LEVEL_VERBOSE, "Propagate sync info...\n");
532
556
// FIXME: in the SPM it would be nice to have system time instead of
559
float syncrate = 0.0;
560
float tpf = m_SyncSource->getTicksPerFrame();
562
syncrate = 24576000.0/tpf;
564
debugWarning("tpf <= 0? %f\n", tpf);
566
debugOutput( DEBUG_LEVEL_VERBOSE, " sync source frame rate: %f fps (%f tpf)\n", syncrate, tpf);
535
568
// we now should have decent sync info on the sync source
536
569
// determine a point in time where the system should start
537
570
// figure out where we are now
545
578
// start wet-running in STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP cycles
546
579
// this is the time window we have to setup all SP's such that they
547
580
// can start wet-running correctly.
581
// we have to round this time to an integer number of audio packets
582
double time_for_startup_abs = (double)(cycles_for_startup * TICKS_PER_CYCLE);
583
int time_for_startup_frames = (int)(time_for_startup_abs / tpf);
584
time_for_startup_frames = ((time_for_startup_frames / max_packet_size_frames) + 1) * max_packet_size_frames;
585
uint64_t time_for_startup_ticks = (uint64_t)((float)time_for_startup_frames * tpf);
548
587
time_of_first_sample = addTicks(time_of_first_sample,
549
cycles_for_startup * TICKS_PER_CYCLE);
588
time_for_startup_ticks);
589
debugOutput( DEBUG_LEVEL_VERBOSE, " add %d frames (%011llu ticks)...\n",
590
time_for_startup_frames, time_for_startup_ticks);
551
592
debugOutput( DEBUG_LEVEL_VERBOSE, " => first sample at TS=%011llu (%03us %04uc %04ut)...\n",
552
593
time_of_first_sample,
557
598
// we should start wet-running the transmit SP's some cycles in advance
558
599
// such that we know it is wet-running when it should output its first sample
559
uint64_t time_to_start_xmit = substractTicks(time_of_first_sample,
600
uint64_t time_to_start_xmit = substractTicks(time_of_first_sample,
560
601
prestart_cycles_for_xmit * TICKS_PER_CYCLE);
562
603
uint64_t time_to_start_recv = substractTicks(time_of_first_sample,
572
613
(unsigned int)TICKS_TO_CYCLES(time_to_start_recv),
573
614
(unsigned int)TICKS_TO_OFFSET(time_to_start_recv));
616
// print the sync delay
617
int sync_delay_frames = (int)((float)m_sync_delay / m_SyncSource->getTicksPerFrame());
618
debugOutput( DEBUG_LEVEL_VERBOSE, " sync delay: %d = %d + %d ticks (%03us %04uc %04ut) [%d frames]...\n",
619
m_sync_delay, max_of_min_delay, signal_delay_ticks,
620
(unsigned int)TICKS_TO_SECS(m_sync_delay),
621
(unsigned int)TICKS_TO_CYCLES(m_sync_delay),
622
(unsigned int)TICKS_TO_OFFSET(m_sync_delay),
625
// check if this can even work.
626
// the worst case point where we can receive a period is at 1 period + sync delay
627
// this means that the number of frames in the xmit buffer has to be at least
628
// 1 period + sync delay
629
if(xmit_prebuffer_frames + m_period * m_nb_buffers < m_period + sync_delay_frames) {
630
debugWarning("The amount of transmit buffer frames (%d) is too small (< %d). "
631
"This will most likely cause xruns.\n",
632
xmit_prebuffer_frames + m_period * m_nb_buffers,
633
m_period + sync_delay_frames);
575
636
// at this point the buffer head timestamp of the transmit buffers can be set
576
637
// this is the presentation time of the first sample in the buffer
577
638
for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
578
639
it != m_TransmitProcessors.end();
641
// set the number of prebuffer frames
642
(*it)->setExtraBufferFrames(xmit_prebuffer_frames);
644
// set the TSP of the first sample in the buffer
580
645
(*it)->setBufferHeadTimestamp(time_of_first_sample);
646
ffado_timestamp_t ts;
648
(*it)->getBufferHeadTimestamp ( &ts, &fc );
649
debugOutput( DEBUG_LEVEL_VERBOSE, " transmit buffer tail %010lld => head TS %010lld, fc=%d...\n",
650
time_of_first_sample, (uint64_t)ts, fc);
653
// the receive processors can be delayed by sync_delay ticks
654
// this means that in the worst case we have to be able to accomodate
655
// an extra sync_delay ticks worth of frames in the receive SP buffer
656
// the sync delay should be rounded to an integer amount of max_packet_size
657
int tmp = sync_delay_frames / max_packet_size_frames;
659
sync_delay_frames = tmp * max_packet_size_frames;
660
if (sync_delay_frames < 1024) sync_delay_frames = 1024; //HACK
662
for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
663
it != m_ReceiveProcessors.end();
665
// set the number of extra buffer frames
666
(*it)->setExtraBufferFrames(sync_delay_frames);
583
669
// switch syncsource to running state
619
705
// that will block the waitForPeriod call until everyone has started (theoretically)
620
706
// note: the SP's are scheduled to start in STREAMPROCESSORMANAGER_CYCLES_FOR_STARTUP cycles,
621
707
// so a 20 times this value should be a good timeout
622
int cnt = cycles_for_startup * 20; // by then it should have started
708
//int cnt = cycles_for_startup * 20; // by then it should have started
709
// or maybe we just have to use 1 second, as this wraps the cycle counter
623
711
while (!m_SyncSource->isRunning() && cnt) {
624
712
SleepRelativeUsec(125);
632
720
// the sync source is running, we can now read a decent received timestamp from it
633
721
m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
635
// and a (still very rough) approximation of the rate
723
// and a (rough) approximation of the rate
636
724
float rate = m_SyncSource->getTicksPerFrame();
637
int64_t delay_in_ticks=(int64_t)(((float)((m_nb_buffers-1) * m_period)) * rate);
638
// also add the sync delay
639
delay_in_ticks += m_SyncSource->getSyncDelay();
727
// the time at which the previous period would have passed
728
m_time_of_transfer2 = m_time_of_transfer;
729
m_time_of_transfer2 = substractTicks(m_time_of_transfer2, (uint64_t)(m_period * rate));
640
732
debugOutput( DEBUG_LEVEL_VERBOSE, " initial time of transfer %010lld, rate %f...\n",
641
733
m_time_of_transfer, rate);
735
// FIXME: ideally we'd want the SP itself to account for the xmit_prebuffer_frames
736
// but that would also require to use a different approach to setting the initial TSP's
737
int64_t delay_in_ticks = (int64_t)(((float)((m_nb_buffers-1) * m_period + xmit_prebuffer_frames)) * rate);
643
739
// then use this information to initialize the xmit handlers
645
741
// we now set the buffer tail timestamp of the transmit buffer
646
742
// to the period transfer time instant plus what's nb_buffers - 1
647
743
// in ticks. This due to the fact that we (should) have received one period
648
// worth of ticks at t=m_time_of_transfer
744
// worth of ticks at t = m_time_of_transfer
649
745
// hence one period of frames should also have been transmitted, which means
650
746
// that there should be (nb_buffers - 1) * periodsize of frames in the xmit buffer
747
// there are also xmit_prebuffer_frames frames extra present in the buffer
651
748
// that allows us to calculate the tail timestamp for the buffer.
653
750
int64_t transmit_tail_timestamp = addTicks(m_time_of_transfer, delay_in_ticks);
655
751
debugOutput( DEBUG_LEVEL_VERBOSE, " preset transmit tail TS %010lld, rate %f...\n",
656
752
transmit_tail_timestamp, rate);
658
754
for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
659
755
it != m_TransmitProcessors.end();
757
(*it)->setTicksPerFrame(rate);
661
758
(*it)->setBufferTailTimestamp(transmit_tail_timestamp);
662
(*it)->setTicksPerFrame(rate);
759
ffado_timestamp_t ts;
761
(*it)->getBufferHeadTimestamp ( &ts, &fc );
762
debugOutput( DEBUG_LEVEL_VERBOSE, " => transmit head TS %010lld, fc=%d...\n",
665
766
// align the received streams to be phase aligned
980
1081
"waiting for period (%d frames in buffer)...\n",
981
1082
m_SyncSource->getBufferFill());
982
1083
uint64_t ticks_at_period = m_SyncSource->getTimeAtPeriod();
983
uint64_t ticks_at_period_margin = ticks_at_period + m_SyncSource->getSyncDelay();
1084
uint64_t ticks_at_period_margin = ticks_at_period + m_sync_delay;
984
1085
uint64_t pred_system_time_at_xfer = m_SyncSource->getParent().get1394Service().getSystemTimeForCycleTimerTicks(ticks_at_period_margin);
987
1088
int64_t now = Util::SystemTimeSource::getCurrentTime();
988
debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "pred: %lld, now: %lld, wait: %lld\n", pred_system_time_at_xfer, now, pred_system_time_at_xfer-now );
1089
debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "CTR pred: %lld, syncdelay: %lld, diff: %lld\n", ticks_at_period, ticks_at_period_margin, ticks_at_period_margin-ticks_at_period );
1090
debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "PREWAIT pred: %lld, now: %lld, wait: %lld\n", pred_system_time_at_xfer, now, pred_system_time_at_xfer-now );
991
1093
// wait until it's time to transfer
995
1097
now = Util::SystemTimeSource::getCurrentTime();
996
debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "pred: %lld now: %lld, excess: %lld\n", pred_system_time_at_xfer, now, now-pred_system_time_at_xfer );
1098
debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "POSTWAIT pred: %lld, now: %lld, excess: %lld\n", pred_system_time_at_xfer, now, now-pred_system_time_at_xfer );
999
1101
// the period should be ready now
1107
for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1108
it != m_ReceiveProcessors.end();
1110
rcv_fills[i] = (*it)->getBufferFill();
1111
debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "RECV SP %p bufferfill: %05d\n", *it, rcv_fills[i]);
1115
for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1116
it != m_TransmitProcessors.end();
1118
xmt_fills[i] = (*it)->getBufferFill();
1119
debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "XMIT SP %p bufferfill: %05d\n", *it, xmt_fills[i]);
1123
debugOutputExtreme(DEBUG_LEVEL_VERBOSE, "SP %02d RECV: %05d [%05d] XMIT: %05d [%05d] DIFF: %05d\n", i,
1124
rcv_fills[i], rcv_fills[i] - m_period,
1125
xmt_fills[i], xmt_fills[i] - m_period,
1126
rcv_fills[i] - xmt_fills[i]);
1001
1130
#if STREAMPROCESSORMANAGER_ALLOW_DELAYED_PERIOD_SIGNAL
1002
1131
// HACK: we force wait until every SP is ready. this is needed
1089
1218
m_time_of_transfer = m_SyncSource->getTimeAtPeriod();
1092
static uint64_t m_time_of_transfer2 = m_time_of_transfer;
1094
1221
int ticks_per_period = (int)(m_SyncSource->getTicksPerFrame() * m_period);
1095
int diff=diffTicks(m_time_of_transfer, m_time_of_transfer2);
1223
int diff = diffTicks(m_time_of_transfer, m_time_of_transfer2);
1096
1224
// display message if the difference between two successive tick
1097
1225
// values is more than 50 ticks. 1 sample at 48k is 512 ticks
1098
1226
// so 50 ticks = 10%, which is a rather large jitter value.
1107
1235
"transfer period %d at %llu ticks...\n",
1108
1236
m_nbperiods, m_time_of_transfer);
1110
// this is to notify the client of the delay that we introduced by waiting
1111
m_delayed_usecs = - m_SyncSource->getTimeUntilNextPeriodSignalUsecs();
1112
debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
1113
"delayed for %d usecs...\n",
1117
1239
int rcv_bf=0, xmt_bf=0;
1118
1240
for ( StreamProcessorVectorIterator it = m_ReceiveProcessors.begin();
1119
1241
it != m_ReceiveProcessors.end();
1158
1280
"Xrun on XMIT SP %p due to buffer side xrun\n", *it);
1286
// this is to notify the client of the delay that we introduced by waiting
1287
pred_system_time_at_xfer = m_SyncSource->getParent().get1394Service().getSystemTimeForCycleTimerTicks(m_time_of_transfer);
1289
m_delayed_usecs = Util::SystemTimeSource::getCurrentTime() - pred_system_time_at_xfer;
1290
debugOutputExtreme(DEBUG_LEVEL_VERBOSE,
1291
"delayed for %d usecs...\n",
1163
1294
// now we can signal the client that we are (should be) ready
1164
1295
return !xrun_occurred;
1213
1344
// FIXME: in the SPM it would be nice to have system time instead of
1215
1346
float rate = m_SyncSource->getTicksPerFrame();
1216
int64_t one_ringbuffer_in_ticks=(int64_t)(((float)((m_nb_buffers * m_period))) * rate);
1218
// the data we are putting into the buffer is intended to be transmitted
1219
// one ringbuffer size after it has been received
1221
// we also add one syncdelay as a safety margin, since that's the amount of time we can get
1223
int syncdelay = m_SyncSource->getSyncDelay();
1224
int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks + syncdelay);
1226
1348
for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1227
1349
it != m_TransmitProcessors.end();
1229
// FIXME: in the SPM it would be nice to have system time instead of
1351
// this is the delay in frames between the point where a frame is received and
1352
// when it is transmitted again
1353
unsigned int one_ringbuffer_in_frames = m_nb_buffers * m_period + (*it)->getExtraBufferFrames();
1354
int64_t one_ringbuffer_in_ticks = (int64_t)(((float)one_ringbuffer_in_frames) * rate);
1356
// the data we are putting into the buffer is intended to be transmitted
1357
// one ringbuffer size after it has been received
1358
int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
1231
1360
if(!(*it)->putFrames(m_period, transmit_timestamp)) {
1232
1361
debugWarning("could not putFrames(%u,%llu) to stream processor (%p)\n",
1233
1362
m_period, transmit_timestamp, *it);
1293
1422
// FIXME: in the SPM it would be nice to have system time instead of
1295
1424
float rate = m_SyncSource->getTicksPerFrame();
1296
int64_t one_ringbuffer_in_ticks=(int64_t)(((float)(m_nb_buffers * m_period)) * rate);
1298
// the data we are putting into the buffer is intended to be transmitted
1299
// one ringbuffer size after it has been received
1300
// we also add one syncdelay as a safety margin, since that's the amount of time we can get
1302
int syncdelay = m_SyncSource->getSyncDelay();
1303
int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks + syncdelay);
1305
1426
for ( StreamProcessorVectorIterator it = m_TransmitProcessors.begin();
1306
1427
it != m_TransmitProcessors.end();
1308
// FIXME: in the SPM it would be nice to have system time instead of
1429
// this is the delay in frames between the point where a frame is received and
1430
// when it is transmitted again
1431
unsigned int one_ringbuffer_in_frames = m_nb_buffers * m_period + (*it)->getExtraBufferFrames();
1432
int64_t one_ringbuffer_in_ticks = (int64_t)(((float)one_ringbuffer_in_frames) * rate);
1434
// the data we are putting into the buffer is intended to be transmitted
1435
// one ringbuffer size after it has been received
1436
int64_t transmit_timestamp = addTicks(m_time_of_transfer, one_ringbuffer_in_ticks);
1310
1438
if(!(*it)->putSilenceFrames(m_period, transmit_timestamp)) {
1311
1439
debugWarning("could not putSilenceFrames(%u,%llu) to stream processor (%p)\n",
1312
1440
m_period, transmit_timestamp, *it);