2
* @file YarpRobotNavigation.cpp
4
* This file YarpRobotNavigation.cpp is created at Almende B.V. It is open-source software and part
5
* of the Common Hybrid Agent Platform (CHAP). A toolbox with a lot of open-source tools.
6
* Ranging from thread pools, and TCP/IP components to control architectures and learning
7
* algorithms. This software is published under the GNU Lesser General Public license,
9
* It is not possible to add usage restrictions to an open-source license. Nevertheless,
10
* we personally strongly object against this software used by the military, in the
11
* bio-industry, for animal experimentation, or anything that violates the Universal
12
* Declaration of Human Rights.
14
* @author Andrei A. Rusu
16
* @project Replicator FP7
17
* @company Almende B.V.
27
#include <yarp/sig/all.h>
28
#include <yarp/os/all.h>
29
#include <yarp/dev/all.h>
32
#include <srInterface/YarpRobotNavigation.h>
34
using namespace srCore;
36
namespace srInterface {
38
/* **************************************************************************************
39
* Protocol vocab commands
40
* **************************************************************************************/
42
#define RPC_VOCAB_HELP VOCAB4('h','e','l','p')
43
#define RPC_VOCAB_SET VOCAB3('s','e','t')
44
#define RPC_VOCAB_GET VOCAB3('g','e','t')
45
#define RPC_VOCAB_IS VOCAB2('i','s')
46
#define RPC_VOCAB_FAILED VOCAB4('f','a','i','l')
47
#define RPC_VOCAB_OK VOCAB2('o','k')
48
#define RPC_VOCAB_ADV VOCAB3('a','d','v')
49
#define RPC_VOCAB_ROT VOCAB3('r','o','t')
50
#define RPC_VOCAB_UADV VOCAB4('u','a','d','v')
51
#define RPC_VOCAB_UROT VOCAB4('u','r','o','t')
52
#define RPC_VOCAB_CLR VOCAB3('c','l','r')
53
#define RPC_VOCAB_QLMT VOCAB4('q','l','m','t')
54
#define RPC_VOCAB_NOW VOCAB3('n','o','w')
55
#define RPC_VOCAB_SYNC VOCAB4('s','y','n','c')
56
#define RPC_VOCAB_STRF VOCAB4('s','t','r','f')
58
/* **************************************************************************************
59
* Implementation of YarpRobotNavigation
60
* **************************************************************************************/
62
YarpRobotNavigation::YarpRobotNavigation(const std::string &name,
63
RobotActorBase *robotActor) :
64
Agent(name, robotActor), RPC_ADV_UNIT(0.1), RPC_ROT_UNIT(1.0), RPC_QLMT(10000) {
67
osg::Vec3 rot = getRobotRotation();
69
setRobotRotation(rot);
72
YarpRobotNavigation::~YarpRobotNavigation() {
75
void YarpRobotNavigation::SetPort(const std::string &portName) {
76
if (!rpcPort.open(portName.c_str())) {
77
stringstream msg; msg.clear(); msg.str("");
78
msg << "Could not open RpcPort name: " << portName << ". Exiting...";
80
LOG_ALWAYS("The RPC thread will not be started");
85
rpcThread = new YarpRobotNavigationRpcThread(this);
90
//#define DEBUG_AGGREGATE
92
bool YarpRobotNavigation::respond(
93
const std::list<yarp::os::Bottle> &commandList, yarp::os::Bottle &reply) {
98
// quick exit on empty command list
99
if (commandList.size() == 0) {
101
#ifdef DEBUG_AGGREGATE
102
std::stringstream ossLog;
103
ossLog << "Empty command list.";
104
LOG_INFO(ossLog.str().c_str());
105
#endif //DEBUG_AGGREGATE
106
reply.addVocab(RPC_VOCAB_OK);
110
// get exclusive rights to resource
114
// aggregate advancement
115
double aggregateAdv = 0.0;
117
double aggregateStrf = 0.0;
118
// aggregate rotation
119
double aggregateRot = 0.0;
121
#ifdef DEBUG_AGGREGATE
122
std::stringstream ossLog;
124
ossLog << "Aggregating " << commandList.size() << " commands." << endl;
125
#endif //DEBUG_AGGREGATE
126
std::list<yarp::os::Bottle>::const_iterator command = commandList.begin();
128
// evaluate all commands
129
while (command != commandList.end()) {
130
#ifdef DEBUG_AGGREGATE
131
ossLog << command->toString() << endl;
132
#endif //DEBUG_AGGREGATE
134
switch (command->get(0).asVocab()) {
136
aggregateAdv += (command->get(1).asDouble());
140
aggregateStrf += (command->get(1).asDouble());
144
aggregateRot += (command->get(1).asDouble());
148
// if one command is not ok, break operation
152
// advance to the next command
157
reply.addVocab(RPC_VOCAB_FAILED);
159
#ifdef DEBUG_AGGREGATE
160
ossLog << "Executing commands: adv " << aggregateAdv << "; rot: "
161
<< aggregateRot << "; strf: "<<aggregateStrf << endl;
162
LOG_INFO(ossLog.str().c_str());
163
#endif //DEBUG_AGGREGATE
164
// execute aggregate commands
165
if (abs(aggregateAdv) > 0.0)
166
advanceX(aggregateAdv);
167
if (abs(aggregateStrf) > 0.0)
168
strafeX(aggregateStrf);
169
if (abs(aggregateRot) > 0.0)
170
rotateX(aggregateRot);
171
reply.addVocab(RPC_VOCAB_OK);
180
bool YarpRobotNavigation::respond(const yarp::os::Bottle &command,
181
yarp::os::Bottle &reply) {
183
// get exclusive rights to resource
187
bool rec = false; // is the command recognized?
190
switch (command.get(0).asVocab()) {
193
argDouble = command.get(1).asDouble();
195
reply.addVocab(RPC_VOCAB_STRF);
196
reply.addDouble(argDouble);
201
argDouble = command.get(1).asDouble();
203
reply.addVocab(RPC_VOCAB_ADV);
204
reply.addDouble(argDouble);
209
argDouble = command.get(1).asDouble();
211
reply.addVocab(RPC_VOCAB_ROT);
212
reply.addDouble(argDouble);
220
reply.addVocab(RPC_VOCAB_FAILED);
222
reply.addVocab(RPC_VOCAB_OK);
230
//////////////////////////////////////////////////////////////////////////
231
void YarpRobotNavigation::ProcessMessage(const dtGame::Message& message) {
232
Agent::ProcessMessage(message);
234
if (message.GetMessageType() == dtGame::MessageType::TICK_LOCAL) {
238
// Evaluate front left distance sensor
239
// unsigned long val;
240
// DistanceSensor *flDistanceSensor =
241
// dynamic_cast<DistanceSensor*> (getSensor(
242
// "Front Left DistanceSensor").get());
243
// flDistanceSensor->evaluate(MAKE_PARAMETER_VALUE(&(val)));
245
// // Evaluate front right distance sensor
246
// DistanceSensor *frDistanceSensor =
247
// dynamic_cast<DistanceSensor*> (getSensor(
248
// "Front Right DistanceSensor").get());
249
// frDistanceSensor->evaluate(MAKE_PARAMETER_VALUE(&(val)));
252
std::list<yarp::os::Bottle> commandList;
254
// execute a command from the command queue, if any
255
if (commandQueueMutex.check()) {
256
// LOG_ALWAYS("Lock queue to check size.");
257
if (mainCommandQueue.size() > 0) {
258
// LOG_ALWAYS("Executing first command in queue.");
259
// handle the call using the controller's respond method
260
commandList.push_back(mainCommandQueue.front());
262
mainCommandQueue.pop_front();
263
// LOG_ALWAYS("First command in queue executed.");
266
// execute commands from remaining sync queues and remove the empty ones
267
std::list<std::deque<yarp::os::Bottle> >::iterator qsit =
268
syncCommandQueues.begin();
272
std::stringstream oss;
273
oss << "Size of MAIN queue: " << mainCommandQueue.size();
274
LOG_ALWAYS(oss.str());
278
std::stringstream oss;
279
oss << "Sync queue count: " << syncCommandQueues.size();
280
LOG_ALWAYS(oss.str());
283
std::stringstream ossLog;
285
if (qsit != syncCommandQueues.end())
286
ossLog << "Size of sync queue: ";
287
#endif //DEBUG_QUEUES
288
// execute a command from all non-empty synchronous queues,
290
while (qsit != syncCommandQueues.end()) {
293
ossLog << qsit->size() << " ";
294
#endif //DEBUG_QUEUES
296
// erase empty queue, move forward
297
qsit = syncCommandQueues.erase(qsit);
299
// commands still present in this sync queue, execute
300
commandList.push_back(qsit->front());
306
if (qsit == syncCommandQueues.end())
307
LOG_ALWAYS(ossLog.str());
308
#endif //DEBUG_QUEUES
311
if (commandList.size() > 0) {
312
/// EXECUTE COMMAND QUEUE
313
yarp::os::Bottle reply;
314
respond(commandList, reply);
315
if (reply.get(0).asVocab() != RPC_VOCAB_OK) {
317
std::stringstream oss;
319
<< "Respond() issued a failure reply. Command list not correct: "
320
<< reply.toString() << endl;
321
LOG_ALWAYS(oss.str());
327
commandQueueMutex.post();
333
void YarpRobotNavigation::setSpeed(float speedL, float speedR) {
335
float dt = getDeltaSimTime();
337
osg::Vec3 trans = getRobotPosition();
338
osg::Vec3 rot = getRobotRotation();
341
std::ostringstream oss;
342
oss << "Trans: " << trans << "; Rot: " << rot << endl;
345
float d = dt * (speedL + speedR) / 2.0;
346
float phi = dt * (speedR - speedL) / 0.001;
349
oss << "Phi: " << phi << "; Dt: " << dt << endl;
351
trans[0] += (d * cos(phi));
352
trans[1] += (d * sin(phi));
357
oss << "NEW Trans: " << trans << "; NEW Rot: " << rot << endl;
361
setRobotRotation(rot);
362
setRobotTranslation(trans);
365
void YarpRobotNavigation::advanceX(float X) {
366
osg::Vec3 trans = getRobotPosition();
367
osg::Vec3 rot = getRobotRotation();
371
const float angleRad = ((rot[0] > 0 ? rot[0] : (360 + rot[0])) * M_PI
374
trans[0] += (X * cos(angleRad));
375
trans[1] += (X * sin(angleRad));
377
setRobotTranslation(trans);
380
void YarpRobotNavigation::strafeX(float X) {
381
osg::Vec3 trans = getRobotPosition();
382
osg::Vec3 rot = getRobotRotation();
386
const float angleRad = ((rot[0] > 0 ? rot[0] : (360 + rot[0])) * M_PI
389
trans[0] += (X * cos(angleRad));
390
trans[1] += (X * sin(angleRad));
392
setRobotTranslation(trans);
395
void YarpRobotNavigation::rotateX(float X) {
396
osg::Vec3 rot = getRobotRotation();
399
std::ostringstream oss;
400
oss << "Rot: " << rot << endl;
406
oss << "NEW Rot: " << rot << endl;
409
setRobotRotation(rot);
412
void YarpRobotNavigation::task() {
420
/* **************************************************************************************
421
* Implementation of YarpRobotNavigationRpcThread
422
* **************************************************************************************/
424
void YarpRobotNavigationRpcThread::run() {
425
yarp::os::Bottle command, replay;
427
// repeat while this thread is not asked to stop
428
while (isStopping() != true) {
429
// do a blocking read from the rpc port
430
if (ctrl->rpcPort.read(command, true)) {
432
yarp::os::Bottle reply;
434
// pass the command to the sequencer
435
sequencer(command, reply);
437
ctrl->rpcPort.reply(reply);
443
void YarpRobotNavigationRpcThread::sequencer(yarp::os::Bottle &command,
444
yarp::os::Bottle &reply) {
446
// wait for exclusive privileges to the queue
447
ctrl->commandQueueMutex.wait();
450
// std::stringstream oss;
451
// oss << "Inserting command: " << command.toString()
452
// << " into queue of size: " << (ctrl->commandQueue.size())
454
// LOG_ALWAYS(oss.str());
458
bool rec = false; // is the command recognized?
460
std::list<std::deque<yarp::os::Bottle> >::iterator qsit;
462
yarp::os::Bottle subCommand;
463
unsigned int repeats = 0;
465
switch (command.get(0).asVocab()) {
466
case RPC_VOCAB_HELP: {
468
reply.addString("Commands:");
470
"adv <double> // enqueues advance commands with a total effect of <double> simulator length units at the BACK of the main queue ******");
472
"strf <double> // enqueues strafe commands with a total effect of <double> simulator length units at the BACK of the main queue ******");
474
"rot <double> // enqueues rotate commands with a total effect of <double> simulator rotation units at the BACK of the main queue ******");
476
"now adv <double> // enqueues rotate commands with a total effect of <double> simulator length units at the FRONT of the main queue ******");
478
"now strf <double> // enqueues strafe commands with a total effect of <double> simulator length units at the FRONT of the main queue ******");
480
"now rot <double> // enqueues rotate commands with a total effect of <double> simulator rotation units at the FRONT of the main queue ******");
482
"sync adv <double> // enqueues rotate commands with a total effect of <double> simulator length units in a new queue, which is executed in parallel with the main queue ******");
484
"sync strf <double> // enqueues strafe commands with a total effect of <double> simulator length units in a new queue, which is executed in parallel with the main queue ******");
486
"sync rot <double> // enqueues rotate commands with a total effect of <double> simulator rotation units in a new queue, which is executed in parallel with the main queue ******");
488
"set uadv <double> // sets a new value for advancement unit, executed in a ProcessMessage call; this affects the speed of advancement ******");
490
"set urot <double> // sets a new value for rotation unit, executed in a ProcessMessage call; this affects the speed of rotation ******");
492
"set qlmt <double> // modifies the maximum length of all execution queues; queues above this size are trimmed from the front end ******");
494
"get uadv // gets the current value for advancement unit ******");
496
"get urot // gets current value for rotation unit ******");
498
"get qlmt // gets current value for maximum queue length ******");
499
reply.addString("clr // clears all execution queues ******");
504
case RPC_VOCAB_CLR: {
506
reply.addVocab(RPC_VOCAB_CLR);
508
ctrl->mainCommandQueue.clear();
509
qsit = ctrl->syncCommandQueues.begin();
510
// clear all synchronous queues
511
while (qsit != ctrl->syncCommandQueues.end()) {
519
case RPC_VOCAB_SYNC: {
521
reply.addVocab(RPC_VOCAB_SYNC);
522
switch (command.get(1).asVocab()) {
524
argDouble = command.get(2).asDouble();
526
// break down the command in sub-commands
528
subCommand.addVocab(command.get(1).asVocab());
530
repeats = (unsigned int) abs(argDouble / ctrl->RPC_ADV_UNIT);
532
// cannot introduce more commands than the maximum queue size
533
if (repeats > ctrl->RPC_QLMT) {
534
std::stringstream oss;
535
oss << "Asking to introduce more than " << ctrl->RPC_QLMT
536
<< " commands in the queue: " << repeats
537
<< ". Trimming down to: " << ctrl->RPC_QLMT << endl;
538
LOG_ALWAYS(oss.str());
540
// introduce only the maximum queue size
541
repeats = ctrl->RPC_QLMT;
544
subCommand.addDouble((argDouble > 0 ? 1 : -1) * ctrl->RPC_ADV_UNIT);
546
// create a new synchronous queue
547
ctrl->syncCommandQueues.push_back(std::deque<yarp::os::Bottle>());
549
// insert sub-commands into the new sync queue
550
while (repeats-- > 0)
551
ctrl->syncCommandQueues.back().push_back(subCommand);
553
reply.addVocab(RPC_VOCAB_STRF);
554
reply.addDouble(argDouble);
558
argDouble = command.get(2).asDouble();
560
// break down the command in sub-commands
562
subCommand.addVocab(command.get(1).asVocab());
564
repeats = (unsigned int) abs(argDouble / ctrl->RPC_ADV_UNIT);
566
// cannot introduce more commands than the maximum queue size
567
if (repeats > ctrl->RPC_QLMT) {
568
std::stringstream oss;
569
oss << "Asking to introduce more than " << ctrl->RPC_QLMT
570
<< " commands in the queue: " << repeats
571
<< ". Trimming down to: " << ctrl->RPC_QLMT << endl;
572
LOG_ALWAYS(oss.str());
574
// introduce only the maximum queue size
575
repeats = ctrl->RPC_QLMT;
578
subCommand.addDouble((argDouble > 0 ? 1 : -1) * ctrl->RPC_ADV_UNIT);
580
// create a new synchronous queue
581
ctrl->syncCommandQueues.push_back(std::deque<yarp::os::Bottle>());
583
// insert sub-commands into the new sync queue
584
while (repeats-- > 0)
585
ctrl->syncCommandQueues.back().push_back(subCommand);
587
reply.addVocab(RPC_VOCAB_ADV);
588
reply.addDouble(argDouble);
592
argDouble = command.get(2).asDouble();
594
// break down the command in sub-commands
596
subCommand.addVocab(command.get(1).asVocab());
598
repeats = (unsigned int) abs(argDouble / ctrl->RPC_ROT_UNIT);
600
// cannot introduce more commands than the maximum queue size
601
if (repeats > ctrl->RPC_QLMT) {
602
std::stringstream oss;
603
oss << "Asking to introduce more than " << ctrl->RPC_QLMT
604
<< " commands in the queue: " << repeats
605
<< ". Trimming down to: " << ctrl->RPC_QLMT << endl;
606
LOG_ALWAYS(oss.str());
608
// introduce only the maximum queue size
609
repeats = ctrl->RPC_QLMT;
612
subCommand.addDouble((argDouble > 0 ? 1 : -1) * ctrl->RPC_ROT_UNIT);
614
// create a new synchronous queue
615
ctrl->syncCommandQueues.push_back(std::deque<yarp::os::Bottle>());
617
// insert sub-commands into the new sync queue
618
while (repeats-- > 0)
619
ctrl->syncCommandQueues.back().push_back(subCommand);
621
reply.addVocab(RPC_VOCAB_ROT);
622
reply.addDouble(argDouble);
628
case RPC_VOCAB_SET: {
630
reply.addVocab(RPC_VOCAB_SET);
631
switch (command.get(1).asVocab()) {
632
case RPC_VOCAB_UADV: {
634
reply.addVocab(RPC_VOCAB_UADV);
635
ctrl->RPC_ADV_UNIT = command.get(2).asDouble();
639
case RPC_VOCAB_UROT: {
641
reply.addVocab(RPC_VOCAB_UROT);
642
ctrl->RPC_ROT_UNIT = command.get(2).asDouble();
646
case RPC_VOCAB_QLMT: {
648
reply.addVocab(RPC_VOCAB_QLMT);
649
ctrl->RPC_QLMT = (unsigned int) command.get(2).asInt();
656
case RPC_VOCAB_GET: {
658
switch (command.get(1).asVocab()) {
659
case RPC_VOCAB_UADV: {
661
reply.addVocab(RPC_VOCAB_UADV);
662
reply.addDouble(ctrl->RPC_ADV_UNIT);
666
case RPC_VOCAB_UROT: {
668
reply.addVocab(RPC_VOCAB_UROT);
669
reply.addDouble(ctrl->RPC_ROT_UNIT);
673
case RPC_VOCAB_QLMT: {
675
reply.addVocab(RPC_VOCAB_QLMT);
676
reply.addInt(ctrl->RPC_QLMT);
684
case RPC_VOCAB_NOW: {
686
reply.addVocab(RPC_VOCAB_NOW);
687
switch (command.get(1).asVocab()) {
689
argDouble = command.get(2).asDouble();
691
// break down the command in sub-commands
693
subCommand.addVocab(command.get(1).asVocab());
695
repeats = (unsigned int) abs(argDouble / ctrl->RPC_ADV_UNIT);
697
// cannot introduce more commands than the maximum queue size
698
if (repeats > ctrl->RPC_QLMT) {
699
std::stringstream oss;
700
oss << "Asking to introduce more than " << ctrl->RPC_QLMT
701
<< " commands in the queue: " << repeats
702
<< ". Trimming down to: " << ctrl->RPC_QLMT << endl;
703
LOG_ALWAYS(oss.str());
705
// introduce only the maximum queue size
706
repeats = ctrl->RPC_QLMT;
709
subCommand.addDouble((argDouble > 0 ? 1 : -1) * ctrl->RPC_ADV_UNIT);
711
// insert sub-commands into the queue
712
while (repeats-- > 0)
713
ctrl->mainCommandQueue.push_front(subCommand);
715
reply.addVocab(RPC_VOCAB_STRF);
716
reply.addDouble(argDouble);
720
argDouble = command.get(2).asDouble();
722
// break down the command in sub-commands
724
subCommand.addVocab(command.get(1).asVocab());
726
repeats = (unsigned int) abs(argDouble / ctrl->RPC_ADV_UNIT);
728
// cannot introduce more commands than the maximum queue size
729
if (repeats > ctrl->RPC_QLMT) {
730
std::stringstream oss;
731
oss << "Asking to introduce more than " << ctrl->RPC_QLMT
732
<< " commands in the queue: " << repeats
733
<< ". Trimming down to: " << ctrl->RPC_QLMT << endl;
734
LOG_ALWAYS(oss.str());
736
// introduce only the maximum queue size
737
repeats = ctrl->RPC_QLMT;
740
subCommand.addDouble((argDouble > 0 ? 1 : -1) * ctrl->RPC_ADV_UNIT);
742
// insert sub-commands into the queue
743
while (repeats-- > 0)
744
ctrl->mainCommandQueue.push_front(subCommand);
746
reply.addVocab(RPC_VOCAB_ADV);
747
reply.addDouble(argDouble);
751
argDouble = command.get(2).asDouble();
753
// break down the command in sub-commands
755
subCommand.addVocab(command.get(1).asVocab());
757
repeats = (unsigned int) abs(argDouble / ctrl->RPC_ROT_UNIT);
759
// cannot introduce more commands than the maximum queue size
760
if (repeats > ctrl->RPC_QLMT) {
761
std::stringstream oss;
762
oss << "Asking to introduce more than " << ctrl->RPC_QLMT
763
<< " commands in the queue: " << repeats
764
<< ". Trimming down to: " << ctrl->RPC_QLMT << endl;
765
LOG_ALWAYS(oss.str());
767
// introduce only the maximum queue size
768
repeats = ctrl->RPC_QLMT;
771
subCommand.addDouble((argDouble > 0 ? 1 : -1) * ctrl->RPC_ROT_UNIT);
773
// insert sub-commands into the queue
774
while (repeats-- > 0)
775
ctrl->mainCommandQueue.push_front(subCommand);
777
reply.addVocab(RPC_VOCAB_ROT);
778
reply.addDouble(argDouble);
786
argDouble = command.get(1).asDouble();
788
// break down the command in sub-commands
790
subCommand.addVocab(command.get(0).asVocab());
792
repeats = (unsigned int) abs(argDouble / ctrl->RPC_ADV_UNIT);
794
// cannot introduce more commands than the maximum queue size
795
if (repeats > ctrl->RPC_QLMT) {
796
std::stringstream oss;
797
oss << "Asking to introduce more than " << ctrl->RPC_QLMT
798
<< " commands in the queue: " << repeats
799
<< ". Trimming down to: " << ctrl->RPC_QLMT << endl;
800
LOG_ALWAYS(oss.str());
802
// introduce only the maximum queue size
803
repeats = ctrl->RPC_QLMT;
806
subCommand.addDouble((argDouble > 0 ? 1 : -1) * ctrl->RPC_ADV_UNIT);
808
// insert sub-commands into the queue
809
while (repeats-- > 0)
810
ctrl->mainCommandQueue.push_back(subCommand);
812
reply.addVocab(RPC_VOCAB_STRF);
813
reply.addDouble(argDouble);
818
argDouble = command.get(1).asDouble();
820
// break down the command in sub-commands
822
subCommand.addVocab(command.get(0).asVocab());
824
repeats = (unsigned int) abs(argDouble / ctrl->RPC_ADV_UNIT);
826
// cannot introduce more commands than the maximum queue size
827
if (repeats > ctrl->RPC_QLMT) {
828
std::stringstream oss;
829
oss << "Asking to introduce more than " << ctrl->RPC_QLMT
830
<< " commands in the queue: " << repeats
831
<< ". Trimming down to: " << ctrl->RPC_QLMT << endl;
832
LOG_ALWAYS(oss.str());
834
// introduce only the maximum queue size
835
repeats = ctrl->RPC_QLMT;
838
subCommand.addDouble((argDouble > 0 ? 1 : -1) * ctrl->RPC_ADV_UNIT);
840
// insert sub-commands into the queue
841
while (repeats-- > 0)
842
ctrl->mainCommandQueue.push_back(subCommand);
844
reply.addVocab(RPC_VOCAB_ADV);
845
reply.addDouble(argDouble);
850
argDouble = command.get(1).asDouble();
852
// break down the command in sub-commands
854
subCommand.addVocab(command.get(0).asVocab());
856
repeats = (unsigned int) abs(argDouble / ctrl->RPC_ROT_UNIT);
858
// cannot introduce more commands than the maximum queue size
859
if (repeats > ctrl->RPC_QLMT) {
860
std::stringstream oss;
861
oss << "Asking to introduce more than " << ctrl->RPC_QLMT
862
<< " commands in the queue: " << repeats
863
<< ". Trimming down to: " << ctrl->RPC_QLMT << endl;
864
LOG_ALWAYS(oss.str());
866
// introduce only the maximum queue size
867
repeats = ctrl->RPC_QLMT;
870
subCommand.addDouble((argDouble > 0 ? 1 : -1) * ctrl->RPC_ROT_UNIT);
872
// insert sub-commands into the queue
873
while (repeats-- > 0)
874
ctrl->mainCommandQueue.push_back(subCommand);
876
reply.addVocab(RPC_VOCAB_ROT);
877
reply.addDouble(argDouble);
885
reply.addVocab(RPC_VOCAB_FAILED);
887
reply.addVocab(RPC_VOCAB_OK);
889
// log when discarding commands
890
if (ctrl->mainCommandQueue.size() > ctrl->RPC_QLMT) {
891
std::stringstream oss;
892
oss << "More than " << ctrl->RPC_QLMT << " commands in the queue: "
893
<< (ctrl->mainCommandQueue.size()) << ". Clearing oldest."
895
LOG_ALWAYS(oss.str());
897
// if more than RPC_QLMT command in queue, discard the oldest
899
ctrl->mainCommandQueue.pop_front();
900
} while (ctrl->mainCommandQueue.size() > ctrl->RPC_QLMT);
902
std::stringstream oss;
903
oss << "New size of queue: " << ctrl->mainCommandQueue.size()
905
LOG_ALWAYS(oss.str());
909
// trim commands from remaining sync queues and remove the empty ones
910
std::list<std::deque<yarp::os::Bottle> >::iterator qsit =
911
ctrl->syncCommandQueues.begin();
913
// trim a command from all non-empty synchronous queues,
914
while (qsit != ctrl->syncCommandQueues.end()) {
917
// erase empty queue, move forward
918
qsit = ctrl->syncCommandQueues.erase(qsit);
920
// log when discarding commands
921
if (qsit->size() > ctrl->RPC_QLMT) {
922
std::stringstream oss;
923
oss << "More than " << ctrl->RPC_QLMT
924
<< " commands in the queue: " << (qsit->size())
925
<< ". Clearing oldest." << endl;
926
LOG_ALWAYS(oss.str());
928
// if more than RPC_QLMT command in queue, discard the oldest
931
} while (qsit->size() > ctrl->RPC_QLMT);
932
// show result of trimming
934
std::stringstream oss;
935
oss << "New size of queue: " << qsit->size() << endl;
936
LOG_ALWAYS(oss.str());
944
// release lock on queue
945
ctrl->commandQueueMutex.post();
948
} // end of namespace srInterface