~robot3d-team/robot3d/trunk

« back to all changes in this revision

Viewing changes to src/srInterface/YarpRobotNavigation.cpp

  • Committer: Anne van Rossum
  • Date: 2010-08-10 15:58:55 UTC
  • Revision ID: anne@gamix-20100810155855-kve7x2vwouagdij9
Initial import

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/**
 
2
 * @file YarpRobotNavigation.cpp
 
3
 *
 
4
 * This file YarpRobotNavigation.cpp is created at Almende B.V. It is open-source software and part
 
5
 * of the Common Hybrid Agent Platform (CHAP). A toolbox with a lot of open-source tools.
 
6
 * Ranging from thread pools, and TCP/IP components to control architectures and learning
 
7
 * algorithms. This software is published under the GNU Lesser General Public license,
 
8
 *
 
9
 * It is not possible to add usage restrictions to an open-source license. Nevertheless,
 
10
 * we personally strongly object against this software used by the military, in the
 
11
 * bio-industry, for animal experimentation, or anything that violates the Universal
 
12
 * Declaration of Human Rights.
 
13
 *
 
14
 * @author      Andrei A. Rusu
 
15
 * @date        Jun 15, 2010
 
16
 * @project     Replicator FP7
 
17
 * @company     Almende B.V.
 
18
 * @case        yarp
 
19
 */
 
20
 
 
21
// General files
 
22
#include <iostream>
 
23
#include <iomanip>
 
24
#include <deque>
 
25
 
 
26
// YARP
 
27
#include <yarp/sig/all.h>
 
28
#include <yarp/os/all.h>
 
29
#include <yarp/dev/all.h>
 
30
 
 
31
// Robot3D files
 
32
#include <srInterface/YarpRobotNavigation.h>
 
33
 
 
34
using namespace srCore;
 
35
 
 
36
namespace srInterface {
 
37
 
 
38
/* **************************************************************************************
 
39
 * Protocol vocab commands
 
40
 * **************************************************************************************/
 
41
 
 
42
#define RPC_VOCAB_HELP VOCAB4('h','e','l','p')
 
43
#define RPC_VOCAB_SET VOCAB3('s','e','t')
 
44
#define RPC_VOCAB_GET VOCAB3('g','e','t')
 
45
#define RPC_VOCAB_IS VOCAB2('i','s')
 
46
#define RPC_VOCAB_FAILED VOCAB4('f','a','i','l')
 
47
#define RPC_VOCAB_OK VOCAB2('o','k')
 
48
#define RPC_VOCAB_ADV VOCAB3('a','d','v')
 
49
#define RPC_VOCAB_ROT VOCAB3('r','o','t')
 
50
#define RPC_VOCAB_UADV VOCAB4('u','a','d','v')
 
51
#define RPC_VOCAB_UROT VOCAB4('u','r','o','t')
 
52
#define RPC_VOCAB_CLR VOCAB3('c','l','r')
 
53
#define RPC_VOCAB_QLMT VOCAB4('q','l','m','t')
 
54
#define RPC_VOCAB_NOW VOCAB3('n','o','w')
 
55
#define RPC_VOCAB_SYNC VOCAB4('s','y','n','c')
 
56
#define RPC_VOCAB_STRF VOCAB4('s','t','r','f')
 
57
 
 
58
/* **************************************************************************************
 
59
 * Implementation of YarpRobotNavigation
 
60
 * **************************************************************************************/
 
61
 
 
62
YarpRobotNavigation::YarpRobotNavigation(const std::string &name,
 
63
                RobotActorBase *robotActor) :
 
64
                Agent(name, robotActor), RPC_ADV_UNIT(0.1), RPC_ROT_UNIT(1.0), RPC_QLMT(10000) {
 
65
 
 
66
        start();
 
67
        osg::Vec3 rot = getRobotRotation();
 
68
        rot[0] = -1;
 
69
        setRobotRotation(rot);
 
70
}
 
71
 
 
72
YarpRobotNavigation::~YarpRobotNavigation() {
 
73
}
 
74
 
 
75
void YarpRobotNavigation::SetPort(const std::string &portName) {
 
76
        if (!rpcPort.open(portName.c_str())) {
 
77
                stringstream msg; msg.clear(); msg.str("");
 
78
                msg << "Could not open RpcPort name: " << portName << ". Exiting...";
 
79
                LOG_ERROR(msg.str());
 
80
                LOG_ALWAYS("The RPC thread will not be started");
 
81
                return;
 
82
        }
 
83
 
 
84
        // start RPC thread
 
85
        rpcThread = new YarpRobotNavigationRpcThread(this);
 
86
        rpcThread->start();
 
87
 
 
88
}
 
89
 
 
90
//#define DEBUG_AGGREGATE
 
91
 
 
92
bool YarpRobotNavigation::respond(
 
93
                const std::list<yarp::os::Bottle> &commandList, yarp::os::Bottle &reply) {
 
94
 
 
95
        // clear reply
 
96
        reply.clear();
 
97
 
 
98
        // quick exit on empty command list
 
99
        if (commandList.size() == 0) {
 
100
 
 
101
#ifdef DEBUG_AGGREGATE
 
102
                std::stringstream ossLog;
 
103
                ossLog << "Empty command list.";
 
104
                LOG_INFO(ossLog.str().c_str());
 
105
#endif //DEBUG_AGGREGATE
 
106
                reply.addVocab(RPC_VOCAB_OK);
 
107
                return true;
 
108
        }
 
109
 
 
110
        // get exclusive rights to resource
 
111
        rpcMutex.wait();
 
112
 
 
113
        bool ok = false;
 
114
        // aggregate advancement
 
115
        double aggregateAdv = 0.0;
 
116
        // aggregate strafe
 
117
        double aggregateStrf = 0.0;
 
118
        // aggregate rotation
 
119
        double aggregateRot = 0.0;
 
120
 
 
121
#ifdef DEBUG_AGGREGATE
 
122
        std::stringstream ossLog;
 
123
 
 
124
        ossLog << "Aggregating " << commandList.size() << " commands." << endl;
 
125
#endif //DEBUG_AGGREGATE
 
126
        std::list<yarp::os::Bottle>::const_iterator command = commandList.begin();
 
127
 
 
128
        // evaluate all commands
 
129
        while (command != commandList.end()) {
 
130
#ifdef DEBUG_AGGREGATE
 
131
                ossLog << command->toString() << endl;
 
132
#endif //DEBUG_AGGREGATE
 
133
                ok = false;
 
134
                switch (command->get(0).asVocab()) {
 
135
                case RPC_VOCAB_ADV:
 
136
                        aggregateAdv += (command->get(1).asDouble());
 
137
                        ok = true;
 
138
                        break;
 
139
                case RPC_VOCAB_STRF:
 
140
                        aggregateStrf += (command->get(1).asDouble());
 
141
                        ok = true;
 
142
                        break;
 
143
                case RPC_VOCAB_ROT:
 
144
                        aggregateRot += (command->get(1).asDouble());
 
145
                        ok = true;
 
146
                        break;
 
147
                }
 
148
                // if one command is not ok, break operation
 
149
                if (!ok) {
 
150
                        break;
 
151
                }
 
152
                // advance to the next command
 
153
                ++command;
 
154
        }
 
155
 
 
156
        if (!ok) {
 
157
                reply.addVocab(RPC_VOCAB_FAILED);
 
158
        } else {
 
159
#ifdef DEBUG_AGGREGATE
 
160
                ossLog << "Executing commands: adv " << aggregateAdv << "; rot: "
 
161
                                << aggregateRot << "; strf: "<<aggregateStrf << endl;
 
162
                LOG_INFO(ossLog.str().c_str());
 
163
#endif //DEBUG_AGGREGATE
 
164
                // execute aggregate commands
 
165
                if (abs(aggregateAdv) > 0.0)
 
166
                        advanceX(aggregateAdv);
 
167
                if (abs(aggregateStrf) > 0.0)
 
168
                        strafeX(aggregateStrf);
 
169
                if (abs(aggregateRot) > 0.0)
 
170
                        rotateX(aggregateRot);
 
171
                reply.addVocab(RPC_VOCAB_OK);
 
172
        }
 
173
 
 
174
        // release lock
 
175
        rpcMutex.post();
 
176
 
 
177
        return ok;
 
178
}
 
179
 
 
180
bool YarpRobotNavigation::respond(const yarp::os::Bottle &command,
 
181
                yarp::os::Bottle &reply) {
 
182
 
 
183
        // get exclusive rights to resource
 
184
        rpcMutex.wait();
 
185
 
 
186
        bool ok = false;
 
187
        bool rec = false; // is the command recognized?
 
188
        double argDouble;
 
189
 
 
190
        switch (command.get(0).asVocab()) {
 
191
        case RPC_VOCAB_STRF:
 
192
                rec = true;
 
193
                argDouble = command.get(1).asDouble();
 
194
                strafeX(argDouble);
 
195
                reply.addVocab(RPC_VOCAB_STRF);
 
196
                reply.addDouble(argDouble);
 
197
                ok = true;
 
198
                break;
 
199
        case RPC_VOCAB_ADV:
 
200
                rec = true;
 
201
                argDouble = command.get(1).asDouble();
 
202
                advanceX(argDouble);
 
203
                reply.addVocab(RPC_VOCAB_ADV);
 
204
                reply.addDouble(argDouble);
 
205
                ok = true;
 
206
                break;
 
207
        case RPC_VOCAB_ROT:
 
208
                rec = true;
 
209
                argDouble = command.get(1).asDouble();
 
210
                rotateX(argDouble);
 
211
                reply.addVocab(RPC_VOCAB_ROT);
 
212
                reply.addDouble(argDouble);
 
213
                ok = true;
 
214
                break;
 
215
 
 
216
        }
 
217
 
 
218
        if (!ok) {
 
219
                reply.clear();
 
220
                reply.addVocab(RPC_VOCAB_FAILED);
 
221
        } else
 
222
                reply.addVocab(RPC_VOCAB_OK);
 
223
 
 
224
        // release lock
 
225
        rpcMutex.post();
 
226
 
 
227
        return ok;
 
228
}
 
229
 
 
230
//////////////////////////////////////////////////////////////////////////
 
231
void YarpRobotNavigation::ProcessMessage(const dtGame::Message& message) {
 
232
        Agent::ProcessMessage(message);
 
233
 
 
234
        if (message.GetMessageType() == dtGame::MessageType::TICK_LOCAL) {
 
235
                resume();
 
236
        }
 
237
 
 
238
        // Evaluate front left distance sensor
 
239
        //      unsigned long val;
 
240
        //      DistanceSensor *flDistanceSensor =
 
241
        //                      dynamic_cast<DistanceSensor*> (getSensor(
 
242
        //                                      "Front Left DistanceSensor").get());
 
243
        //      flDistanceSensor->evaluate(MAKE_PARAMETER_VALUE(&(val)));
 
244
        //
 
245
        //      // Evaluate front right distance sensor
 
246
        //      DistanceSensor *frDistanceSensor =
 
247
        //                      dynamic_cast<DistanceSensor*> (getSensor(
 
248
        //                                      "Front Right DistanceSensor").get());
 
249
        //      frDistanceSensor->evaluate(MAKE_PARAMETER_VALUE(&(val)));
 
250
 
 
251
 
 
252
        std::list<yarp::os::Bottle> commandList;
 
253
 
 
254
        // execute a command from the command queue, if any
 
255
        if (commandQueueMutex.check()) {
 
256
                //              LOG_ALWAYS("Lock queue to check size.");
 
257
                if (mainCommandQueue.size() > 0) {
 
258
                        //                      LOG_ALWAYS("Executing first command in queue.");
 
259
                        // handle the call using the controller's respond method
 
260
                        commandList.push_back(mainCommandQueue.front());
 
261
                        // send replay back
 
262
                        mainCommandQueue.pop_front();
 
263
                        //                      LOG_ALWAYS("First command in queue executed.");
 
264
                }
 
265
 
 
266
                // execute commands from remaining sync queues and remove the empty ones
 
267
                std::list<std::deque<yarp::os::Bottle> >::iterator qsit =
 
268
                                syncCommandQueues.begin();
 
269
 
 
270
#ifdef DEBUG_QUEUES
 
271
                {
 
272
                        std::stringstream oss;
 
273
                        oss << "Size of MAIN queue: " << mainCommandQueue.size();
 
274
                        LOG_ALWAYS(oss.str());
 
275
                }
 
276
 
 
277
                {
 
278
                        std::stringstream oss;
 
279
                        oss << "Sync queue count: " << syncCommandQueues.size();
 
280
                        LOG_ALWAYS(oss.str());
 
281
                }
 
282
 
 
283
                std::stringstream ossLog;
 
284
 
 
285
                if (qsit != syncCommandQueues.end())
 
286
                        ossLog << "Size of sync queue: ";
 
287
#endif //DEBUG_QUEUES
 
288
                // execute a command from all non-empty synchronous queues,
 
289
                // remove empty ones
 
290
                while (qsit != syncCommandQueues.end()) {
 
291
 
 
292
#ifdef DEBUG_QUEUES
 
293
                        ossLog << qsit->size() << " ";
 
294
#endif //DEBUG_QUEUES
 
295
                        if (qsit->empty()) {
 
296
                                // erase empty queue, move forward
 
297
                                qsit = syncCommandQueues.erase(qsit);
 
298
                        } else {
 
299
                                // commands still present in this sync queue, execute
 
300
                                commandList.push_back(qsit->front());
 
301
                                // send replay back
 
302
                                qsit->pop_front();
 
303
                                ++qsit;
 
304
                        }
 
305
#ifdef DEBUG_QUEUES
 
306
                        if (qsit == syncCommandQueues.end())
 
307
                                LOG_ALWAYS(ossLog.str());
 
308
#endif //DEBUG_QUEUES
 
309
                }
 
310
 
 
311
                if (commandList.size() > 0) {
 
312
                        /// EXECUTE COMMAND QUEUE
 
313
                        yarp::os::Bottle reply;
 
314
                        respond(commandList, reply);
 
315
                        if (reply.get(0).asVocab() != RPC_VOCAB_OK) {
 
316
                                {
 
317
                                        std::stringstream oss;
 
318
                                        oss
 
319
                                        << "Respond() issued a failure reply. Command list not correct: "
 
320
                                        << reply.toString() << endl;
 
321
                                        LOG_ALWAYS(oss.str());
 
322
                                }
 
323
                        }
 
324
                }
 
325
 
 
326
                // release queues
 
327
                commandQueueMutex.post();
 
328
        }
 
329
}
 
330
 
 
331
//#define DEBUG
 
332
 
 
333
void YarpRobotNavigation::setSpeed(float speedL, float speedR) {
 
334
 
 
335
        float dt = getDeltaSimTime();
 
336
 
 
337
        osg::Vec3 trans = getRobotPosition();
 
338
        osg::Vec3 rot = getRobotRotation();
 
339
 
 
340
#ifdef DEBUG
 
341
        std::ostringstream oss;
 
342
        oss << "Trans: " << trans << "; Rot: " << rot << endl;
 
343
#endif
 
344
 
 
345
        float d = dt * (speedL + speedR) / 2.0;
 
346
        float phi = dt * (speedR - speedL) / 0.001;
 
347
 
 
348
#ifdef DEBUG
 
349
        oss << "Phi: " << phi << "; Dt: " << dt << endl;
 
350
#endif
 
351
        trans[0] += (d * cos(phi));
 
352
        trans[1] += (d * sin(phi));
 
353
 
 
354
        rot[0] = phi;
 
355
 
 
356
#ifdef DEBUG
 
357
        oss << "NEW Trans: " << trans << "; NEW Rot: " << rot << endl;
 
358
        LOG_INFO(oss.str());
 
359
#endif
 
360
 
 
361
        setRobotRotation(rot);
 
362
        setRobotTranslation(trans);
 
363
}
 
364
 
 
365
void YarpRobotNavigation::advanceX(float X) {
 
366
        osg::Vec3 trans = getRobotPosition();
 
367
        osg::Vec3 rot = getRobotRotation();
 
368
 
 
369
        rot[0] += 90;
 
370
 
 
371
        const float angleRad = ((rot[0] > 0 ? rot[0] : (360 + rot[0])) * M_PI
 
372
                        / 180.0);
 
373
 
 
374
        trans[0] += (X * cos(angleRad));
 
375
        trans[1] += (X * sin(angleRad));
 
376
 
 
377
        setRobotTranslation(trans);
 
378
}
 
379
 
 
380
void YarpRobotNavigation::strafeX(float X) {
 
381
        osg::Vec3 trans = getRobotPosition();
 
382
        osg::Vec3 rot = getRobotRotation();
 
383
 
 
384
        rot[0] += 180;
 
385
 
 
386
        const float angleRad = ((rot[0] > 0 ? rot[0] : (360 + rot[0])) * M_PI
 
387
                        / 180.0);
 
388
 
 
389
        trans[0] += (X * cos(angleRad));
 
390
        trans[1] += (X * sin(angleRad));
 
391
 
 
392
        setRobotTranslation(trans);
 
393
}
 
394
 
 
395
void YarpRobotNavigation::rotateX(float X) {
 
396
        osg::Vec3 rot = getRobotRotation();
 
397
 
 
398
#ifdef DEBUG
 
399
        std::ostringstream oss;
 
400
        oss << "Rot: " << rot << endl;
 
401
#endif
 
402
 
 
403
        rot[0] += X;
 
404
 
 
405
#ifdef DEBUG
 
406
        oss << "NEW Rot: " << rot << endl;
 
407
        LOG_INFO(oss.str());
 
408
#endif
 
409
        setRobotRotation(rot);
 
410
}
 
411
 
 
412
void YarpRobotNavigation::task() {
 
413
        while (true) {
 
414
                wait();
 
415
                suspend();
 
416
        }
 
417
}
 
418
 
 
419
 
 
420
/* **************************************************************************************
 
421
 * Implementation of YarpRobotNavigationRpcThread
 
422
 * **************************************************************************************/
 
423
 
 
424
void YarpRobotNavigationRpcThread::run() {
 
425
        yarp::os::Bottle command, replay;
 
426
 
 
427
        // repeat while this thread is not asked to stop
 
428
        while (isStopping() != true) {
 
429
                // do a blocking read from the rpc port
 
430
                if (ctrl->rpcPort.read(command, true)) {
 
431
                        // confirm reception
 
432
                        yarp::os::Bottle reply;
 
433
 
 
434
                        // pass the command to the sequencer
 
435
                        sequencer(command, reply);
 
436
 
 
437
                        ctrl->rpcPort.reply(reply);
 
438
                }
 
439
        }
 
440
 
 
441
}
 
442
 
 
443
void YarpRobotNavigationRpcThread::sequencer(yarp::os::Bottle &command,
 
444
                yarp::os::Bottle &reply) {
 
445
 
 
446
        // wait for exclusive privileges to the queue
 
447
        ctrl->commandQueueMutex.wait();
 
448
 
 
449
        //      {
 
450
        //              std::stringstream oss;
 
451
        //              oss << "Inserting command: " << command.toString()
 
452
        //                              << " into queue of size: " << (ctrl->commandQueue.size())
 
453
        //                              << endl;
 
454
        //              LOG_ALWAYS(oss.str());
 
455
        //      }
 
456
 
 
457
        bool ok = false;
 
458
        bool rec = false; // is the command recognized?
 
459
        double argDouble;
 
460
        std::list<std::deque<yarp::os::Bottle> >::iterator qsit;
 
461
 
 
462
        yarp::os::Bottle subCommand;
 
463
        unsigned int repeats = 0;
 
464
 
 
465
        switch (command.get(0).asVocab()) {
 
466
        case RPC_VOCAB_HELP: {
 
467
                rec = true;
 
468
                reply.addString("Commands:");
 
469
                reply.addString(
 
470
                                "adv <double> // enqueues advance commands with a total effect of <double> simulator length units at the BACK of the main queue ******");
 
471
                reply.addString(
 
472
                                "strf <double> // enqueues strafe commands with a total effect of <double> simulator length units at the BACK of the main queue ******");
 
473
                reply.addString(
 
474
                                "rot <double> // enqueues rotate commands with a total effect of <double> simulator rotation units at the BACK of the main queue ******");
 
475
                reply.addString(
 
476
                                "now adv <double> // enqueues  rotate commands with a total effect of <double> simulator length units at the FRONT of the main queue ******");
 
477
                reply.addString(
 
478
                                "now strf <double> // enqueues  strafe commands with a total effect of <double> simulator length units at the FRONT of the main queue ******");
 
479
                reply.addString(
 
480
                                "now rot <double> // enqueues  rotate commands with a total effect of <double> simulator rotation units at the FRONT of the main queue ******");
 
481
                reply.addString(
 
482
                                "sync adv <double> // enqueues  rotate commands with a total effect of <double> simulator length units in a new queue, which is executed in parallel with the main queue ******");
 
483
                reply.addString(
 
484
                                "sync strf <double> // enqueues  strafe commands with a total effect of <double> simulator length units in a new queue, which is executed in parallel with the main queue ******");
 
485
                reply.addString(
 
486
                                "sync rot <double> // enqueues  rotate commands with a total effect of <double> simulator rotation units in a new queue, which is executed in parallel with the main queue ******");
 
487
                reply.addString(
 
488
                                "set uadv <double> // sets a new value for advancement unit, executed in a ProcessMessage call; this affects the speed of advancement ******");
 
489
                reply.addString(
 
490
                                "set urot <double> // sets a new value for rotation unit, executed in a ProcessMessage call; this affects the speed of rotation ******");
 
491
                reply.addString(
 
492
                                "set qlmt <double> // modifies the maximum length of all execution queues; queues above this size are trimmed from the front end ******");
 
493
                reply.addString(
 
494
                                "get uadv // gets the current value for advancement unit ******");
 
495
                reply.addString(
 
496
                                "get urot // gets current value for rotation unit ******");
 
497
                reply.addString(
 
498
                                "get qlmt // gets current value for maximum queue length ******");
 
499
                reply.addString("clr // clears all execution queues ******");
 
500
 
 
501
                ok = true;
 
502
        }
 
503
        break;
 
504
        case RPC_VOCAB_CLR: {
 
505
                rec = true;
 
506
                reply.addVocab(RPC_VOCAB_CLR);
 
507
                // clear main queue
 
508
                ctrl->mainCommandQueue.clear();
 
509
                qsit = ctrl->syncCommandQueues.begin();
 
510
                // clear all synchronous queues
 
511
                while (qsit != ctrl->syncCommandQueues.end()) {
 
512
                        qsit->clear();
 
513
                        ++qsit;
 
514
                }
 
515
 
 
516
                ok = true;
 
517
        }
 
518
        break;
 
519
        case RPC_VOCAB_SYNC: {
 
520
                rec = true;
 
521
                reply.addVocab(RPC_VOCAB_SYNC);
 
522
                switch (command.get(1).asVocab()) {
 
523
                case RPC_VOCAB_STRF:
 
524
                        argDouble = command.get(2).asDouble();
 
525
 
 
526
                        // break down the command in sub-commands
 
527
                        subCommand.clear();
 
528
                        subCommand.addVocab(command.get(1).asVocab());
 
529
 
 
530
                        repeats = (unsigned int) abs(argDouble / ctrl->RPC_ADV_UNIT);
 
531
 
 
532
                        // cannot introduce more commands than the maximum queue size
 
533
                        if (repeats > ctrl->RPC_QLMT) {
 
534
                                std::stringstream oss;
 
535
                                oss << "Asking to introduce more than " << ctrl->RPC_QLMT
 
536
                                                << " commands in the queue: " << repeats
 
537
                                                << ". Trimming down to: " << ctrl->RPC_QLMT << endl;
 
538
                                LOG_ALWAYS(oss.str());
 
539
 
 
540
                                // introduce only the maximum queue size
 
541
                                repeats = ctrl->RPC_QLMT;
 
542
                        }
 
543
 
 
544
                        subCommand.addDouble((argDouble > 0 ? 1 : -1) * ctrl->RPC_ADV_UNIT);
 
545
 
 
546
                        // create a new synchronous queue
 
547
                        ctrl->syncCommandQueues.push_back(std::deque<yarp::os::Bottle>());
 
548
 
 
549
                        // insert sub-commands into the new sync queue
 
550
                        while (repeats-- > 0)
 
551
                                ctrl->syncCommandQueues.back().push_back(subCommand);
 
552
 
 
553
                        reply.addVocab(RPC_VOCAB_STRF);
 
554
                        reply.addDouble(argDouble);
 
555
                        ok = true;
 
556
                        break;
 
557
                case RPC_VOCAB_ADV:
 
558
                        argDouble = command.get(2).asDouble();
 
559
 
 
560
                        // break down the command in sub-commands
 
561
                        subCommand.clear();
 
562
                        subCommand.addVocab(command.get(1).asVocab());
 
563
 
 
564
                        repeats = (unsigned int) abs(argDouble / ctrl->RPC_ADV_UNIT);
 
565
 
 
566
                        // cannot introduce more commands than the maximum queue size
 
567
                        if (repeats > ctrl->RPC_QLMT) {
 
568
                                std::stringstream oss;
 
569
                                oss << "Asking to introduce more than " << ctrl->RPC_QLMT
 
570
                                                << " commands in the queue: " << repeats
 
571
                                                << ". Trimming down to: " << ctrl->RPC_QLMT << endl;
 
572
                                LOG_ALWAYS(oss.str());
 
573
 
 
574
                                // introduce only the maximum queue size
 
575
                                repeats = ctrl->RPC_QLMT;
 
576
                        }
 
577
 
 
578
                        subCommand.addDouble((argDouble > 0 ? 1 : -1) * ctrl->RPC_ADV_UNIT);
 
579
 
 
580
                        // create a new synchronous queue
 
581
                        ctrl->syncCommandQueues.push_back(std::deque<yarp::os::Bottle>());
 
582
 
 
583
                        // insert sub-commands into the new sync queue
 
584
                        while (repeats-- > 0)
 
585
                                ctrl->syncCommandQueues.back().push_back(subCommand);
 
586
 
 
587
                        reply.addVocab(RPC_VOCAB_ADV);
 
588
                        reply.addDouble(argDouble);
 
589
                        ok = true;
 
590
                        break;
 
591
                case RPC_VOCAB_ROT:
 
592
                        argDouble = command.get(2).asDouble();
 
593
 
 
594
                        // break down the command in sub-commands
 
595
                        subCommand.clear();
 
596
                        subCommand.addVocab(command.get(1).asVocab());
 
597
 
 
598
                        repeats = (unsigned int) abs(argDouble / ctrl->RPC_ROT_UNIT);
 
599
 
 
600
                        // cannot introduce more commands than the maximum queue size
 
601
                        if (repeats > ctrl->RPC_QLMT) {
 
602
                                std::stringstream oss;
 
603
                                oss << "Asking to introduce more than " << ctrl->RPC_QLMT
 
604
                                                << " commands in the queue: " << repeats
 
605
                                                << ". Trimming down to: " << ctrl->RPC_QLMT << endl;
 
606
                                LOG_ALWAYS(oss.str());
 
607
 
 
608
                                // introduce only the maximum queue size
 
609
                                repeats = ctrl->RPC_QLMT;
 
610
                        }
 
611
 
 
612
                        subCommand.addDouble((argDouble > 0 ? 1 : -1) * ctrl->RPC_ROT_UNIT);
 
613
 
 
614
                        // create a new synchronous queue
 
615
                        ctrl->syncCommandQueues.push_back(std::deque<yarp::os::Bottle>());
 
616
 
 
617
                        // insert sub-commands into the new sync queue
 
618
                        while (repeats-- > 0)
 
619
                                ctrl->syncCommandQueues.back().push_back(subCommand);
 
620
 
 
621
                        reply.addVocab(RPC_VOCAB_ROT);
 
622
                        reply.addDouble(argDouble);
 
623
                        ok = true;
 
624
                        break;
 
625
                }
 
626
        }
 
627
        break;
 
628
        case RPC_VOCAB_SET: {
 
629
                rec = true;
 
630
                reply.addVocab(RPC_VOCAB_SET);
 
631
                switch (command.get(1).asVocab()) {
 
632
                case RPC_VOCAB_UADV: {
 
633
                        rec = true;
 
634
                        reply.addVocab(RPC_VOCAB_UADV);
 
635
                        ctrl->RPC_ADV_UNIT = command.get(2).asDouble();
 
636
                        ok = true;
 
637
                }
 
638
                break;
 
639
                case RPC_VOCAB_UROT: {
 
640
                        rec = true;
 
641
                        reply.addVocab(RPC_VOCAB_UROT);
 
642
                        ctrl->RPC_ROT_UNIT = command.get(2).asDouble();
 
643
                        ok = true;
 
644
                }
 
645
                break;
 
646
                case RPC_VOCAB_QLMT: {
 
647
                        rec = true;
 
648
                        reply.addVocab(RPC_VOCAB_QLMT);
 
649
                        ctrl->RPC_QLMT = (unsigned int) command.get(2).asInt();
 
650
                        ok = true;
 
651
                }
 
652
                break;
 
653
                }
 
654
        }
 
655
        break;
 
656
        case RPC_VOCAB_GET: {
 
657
                rec = true;
 
658
                switch (command.get(1).asVocab()) {
 
659
                case RPC_VOCAB_UADV: {
 
660
                        rec = true;
 
661
                        reply.addVocab(RPC_VOCAB_UADV);
 
662
                        reply.addDouble(ctrl->RPC_ADV_UNIT);
 
663
                        ok = true;
 
664
                }
 
665
                break;
 
666
                case RPC_VOCAB_UROT: {
 
667
                        rec = true;
 
668
                        reply.addVocab(RPC_VOCAB_UROT);
 
669
                        reply.addDouble(ctrl->RPC_ROT_UNIT);
 
670
                        ok = true;
 
671
                }
 
672
                break;
 
673
                case RPC_VOCAB_QLMT: {
 
674
                        rec = true;
 
675
                        reply.addVocab(RPC_VOCAB_QLMT);
 
676
                        reply.addInt(ctrl->RPC_QLMT);
 
677
                        ok = true;
 
678
                }
 
679
                break;
 
680
                }
 
681
        }
 
682
        break;
 
683
 
 
684
        case RPC_VOCAB_NOW: {
 
685
                rec = true;
 
686
                reply.addVocab(RPC_VOCAB_NOW);
 
687
                switch (command.get(1).asVocab()) {
 
688
                case RPC_VOCAB_STRF:
 
689
                        argDouble = command.get(2).asDouble();
 
690
 
 
691
                        // break down the command in sub-commands
 
692
                        subCommand.clear();
 
693
                        subCommand.addVocab(command.get(1).asVocab());
 
694
 
 
695
                        repeats = (unsigned int) abs(argDouble / ctrl->RPC_ADV_UNIT);
 
696
 
 
697
                        // cannot introduce more commands than the maximum queue size
 
698
                        if (repeats > ctrl->RPC_QLMT) {
 
699
                                std::stringstream oss;
 
700
                                oss << "Asking to introduce more than " << ctrl->RPC_QLMT
 
701
                                                << " commands in the queue: " << repeats
 
702
                                                << ". Trimming down to: " << ctrl->RPC_QLMT << endl;
 
703
                                LOG_ALWAYS(oss.str());
 
704
 
 
705
                                // introduce only the maximum queue size
 
706
                                repeats = ctrl->RPC_QLMT;
 
707
                        }
 
708
 
 
709
                        subCommand.addDouble((argDouble > 0 ? 1 : -1) * ctrl->RPC_ADV_UNIT);
 
710
 
 
711
                        // insert sub-commands into the queue
 
712
                        while (repeats-- > 0)
 
713
                                ctrl->mainCommandQueue.push_front(subCommand);
 
714
 
 
715
                        reply.addVocab(RPC_VOCAB_STRF);
 
716
                        reply.addDouble(argDouble);
 
717
                        ok = true;
 
718
                        break;
 
719
                case RPC_VOCAB_ADV:
 
720
                        argDouble = command.get(2).asDouble();
 
721
 
 
722
                        // break down the command in sub-commands
 
723
                        subCommand.clear();
 
724
                        subCommand.addVocab(command.get(1).asVocab());
 
725
 
 
726
                        repeats = (unsigned int) abs(argDouble / ctrl->RPC_ADV_UNIT);
 
727
 
 
728
                        // cannot introduce more commands than the maximum queue size
 
729
                        if (repeats > ctrl->RPC_QLMT) {
 
730
                                std::stringstream oss;
 
731
                                oss << "Asking to introduce more than " << ctrl->RPC_QLMT
 
732
                                                << " commands in the queue: " << repeats
 
733
                                                << ". Trimming down to: " << ctrl->RPC_QLMT << endl;
 
734
                                LOG_ALWAYS(oss.str());
 
735
 
 
736
                                // introduce only the maximum queue size
 
737
                                repeats = ctrl->RPC_QLMT;
 
738
                        }
 
739
 
 
740
                        subCommand.addDouble((argDouble > 0 ? 1 : -1) * ctrl->RPC_ADV_UNIT);
 
741
 
 
742
                        // insert sub-commands into the queue
 
743
                        while (repeats-- > 0)
 
744
                                ctrl->mainCommandQueue.push_front(subCommand);
 
745
 
 
746
                        reply.addVocab(RPC_VOCAB_ADV);
 
747
                        reply.addDouble(argDouble);
 
748
                        ok = true;
 
749
                        break;
 
750
                case RPC_VOCAB_ROT:
 
751
                        argDouble = command.get(2).asDouble();
 
752
 
 
753
                        // break down the command in sub-commands
 
754
                        subCommand.clear();
 
755
                        subCommand.addVocab(command.get(1).asVocab());
 
756
 
 
757
                        repeats = (unsigned int) abs(argDouble / ctrl->RPC_ROT_UNIT);
 
758
 
 
759
                        // cannot introduce more commands than the maximum queue size
 
760
                        if (repeats > ctrl->RPC_QLMT) {
 
761
                                std::stringstream oss;
 
762
                                oss << "Asking to introduce more than " << ctrl->RPC_QLMT
 
763
                                                << " commands in the queue: " << repeats
 
764
                                                << ". Trimming down to: " << ctrl->RPC_QLMT << endl;
 
765
                                LOG_ALWAYS(oss.str());
 
766
 
 
767
                                // introduce only the maximum queue size
 
768
                                repeats = ctrl->RPC_QLMT;
 
769
                        }
 
770
 
 
771
                        subCommand.addDouble((argDouble > 0 ? 1 : -1) * ctrl->RPC_ROT_UNIT);
 
772
 
 
773
                        // insert sub-commands into the queue
 
774
                        while (repeats-- > 0)
 
775
                                ctrl->mainCommandQueue.push_front(subCommand);
 
776
 
 
777
                        reply.addVocab(RPC_VOCAB_ROT);
 
778
                        reply.addDouble(argDouble);
 
779
                        ok = true;
 
780
                        break;
 
781
                }
 
782
        }
 
783
        break;
 
784
        case RPC_VOCAB_STRF:
 
785
                rec = true;
 
786
                argDouble = command.get(1).asDouble();
 
787
 
 
788
                // break down the command in sub-commands
 
789
                subCommand.clear();
 
790
                subCommand.addVocab(command.get(0).asVocab());
 
791
 
 
792
                repeats = (unsigned int) abs(argDouble / ctrl->RPC_ADV_UNIT);
 
793
 
 
794
                // cannot introduce more commands than the maximum queue size
 
795
                if (repeats > ctrl->RPC_QLMT) {
 
796
                        std::stringstream oss;
 
797
                        oss << "Asking to introduce more than " << ctrl->RPC_QLMT
 
798
                                        << " commands in the queue: " << repeats
 
799
                                        << ". Trimming down to: " << ctrl->RPC_QLMT << endl;
 
800
                        LOG_ALWAYS(oss.str());
 
801
 
 
802
                        // introduce only the maximum queue size
 
803
                        repeats = ctrl->RPC_QLMT;
 
804
                }
 
805
 
 
806
                subCommand.addDouble((argDouble > 0 ? 1 : -1) * ctrl->RPC_ADV_UNIT);
 
807
 
 
808
                // insert sub-commands into the queue
 
809
                while (repeats-- > 0)
 
810
                        ctrl->mainCommandQueue.push_back(subCommand);
 
811
 
 
812
                reply.addVocab(RPC_VOCAB_STRF);
 
813
                reply.addDouble(argDouble);
 
814
                ok = true;
 
815
                break;
 
816
        case RPC_VOCAB_ADV:
 
817
                rec = true;
 
818
                argDouble = command.get(1).asDouble();
 
819
 
 
820
                // break down the command in sub-commands
 
821
                subCommand.clear();
 
822
                subCommand.addVocab(command.get(0).asVocab());
 
823
 
 
824
                repeats = (unsigned int) abs(argDouble / ctrl->RPC_ADV_UNIT);
 
825
 
 
826
                // cannot introduce more commands than the maximum queue size
 
827
                if (repeats > ctrl->RPC_QLMT) {
 
828
                        std::stringstream oss;
 
829
                        oss << "Asking to introduce more than " << ctrl->RPC_QLMT
 
830
                                        << " commands in the queue: " << repeats
 
831
                                        << ". Trimming down to: " << ctrl->RPC_QLMT << endl;
 
832
                        LOG_ALWAYS(oss.str());
 
833
 
 
834
                        // introduce only the maximum queue size
 
835
                        repeats = ctrl->RPC_QLMT;
 
836
                }
 
837
 
 
838
                subCommand.addDouble((argDouble > 0 ? 1 : -1) * ctrl->RPC_ADV_UNIT);
 
839
 
 
840
                // insert sub-commands into the queue
 
841
                while (repeats-- > 0)
 
842
                        ctrl->mainCommandQueue.push_back(subCommand);
 
843
 
 
844
                reply.addVocab(RPC_VOCAB_ADV);
 
845
                reply.addDouble(argDouble);
 
846
                ok = true;
 
847
                break;
 
848
        case RPC_VOCAB_ROT:
 
849
                rec = true;
 
850
                argDouble = command.get(1).asDouble();
 
851
 
 
852
                // break down the command in sub-commands
 
853
                subCommand.clear();
 
854
                subCommand.addVocab(command.get(0).asVocab());
 
855
 
 
856
                repeats = (unsigned int) abs(argDouble / ctrl->RPC_ROT_UNIT);
 
857
 
 
858
                // cannot introduce more commands than the maximum queue size
 
859
                if (repeats > ctrl->RPC_QLMT) {
 
860
                        std::stringstream oss;
 
861
                        oss << "Asking to introduce more than " << ctrl->RPC_QLMT
 
862
                                        << " commands in the queue: " << repeats
 
863
                                        << ". Trimming down to: " << ctrl->RPC_QLMT << endl;
 
864
                        LOG_ALWAYS(oss.str());
 
865
 
 
866
                        // introduce only the maximum queue size
 
867
                        repeats = ctrl->RPC_QLMT;
 
868
                }
 
869
 
 
870
                subCommand.addDouble((argDouble > 0 ? 1 : -1) * ctrl->RPC_ROT_UNIT);
 
871
 
 
872
                // insert sub-commands into the queue
 
873
                while (repeats-- > 0)
 
874
                        ctrl->mainCommandQueue.push_back(subCommand);
 
875
 
 
876
                reply.addVocab(RPC_VOCAB_ROT);
 
877
                reply.addDouble(argDouble);
 
878
                ok = true;
 
879
                break;
 
880
 
 
881
        }
 
882
 
 
883
        if (!ok) {
 
884
                reply.clear();
 
885
                reply.addVocab(RPC_VOCAB_FAILED);
 
886
        } else {
 
887
                reply.addVocab(RPC_VOCAB_OK);
 
888
 
 
889
                // log when discarding commands
 
890
                if (ctrl->mainCommandQueue.size() > ctrl->RPC_QLMT) {
 
891
                        std::stringstream oss;
 
892
                        oss << "More than " << ctrl->RPC_QLMT << " commands in the queue: "
 
893
                                        << (ctrl->mainCommandQueue.size()) << ". Clearing oldest."
 
894
                                        << endl;
 
895
                        LOG_ALWAYS(oss.str());
 
896
 
 
897
                        // if more than RPC_QLMT command in queue, discard the oldest
 
898
                        do {
 
899
                                ctrl->mainCommandQueue.pop_front();
 
900
                        } while (ctrl->mainCommandQueue.size() > ctrl->RPC_QLMT);
 
901
                        {
 
902
                                std::stringstream oss;
 
903
                                oss << "New size of queue: " << ctrl->mainCommandQueue.size()
 
904
                                                                                << endl;
 
905
                                LOG_ALWAYS(oss.str());
 
906
                        }
 
907
                }
 
908
 
 
909
                // trim commands from remaining sync queues and remove the empty ones
 
910
                std::list<std::deque<yarp::os::Bottle> >::iterator qsit =
 
911
                                ctrl->syncCommandQueues.begin();
 
912
 
 
913
                // trim a command from all non-empty synchronous queues,
 
914
                while (qsit != ctrl->syncCommandQueues.end()) {
 
915
                        // remove empty ones
 
916
                        if (qsit->empty()) {
 
917
                                // erase empty queue, move forward
 
918
                                qsit = ctrl->syncCommandQueues.erase(qsit);
 
919
                        } else {
 
920
                                // log when discarding commands
 
921
                                if (qsit->size() > ctrl->RPC_QLMT) {
 
922
                                        std::stringstream oss;
 
923
                                        oss << "More than " << ctrl->RPC_QLMT
 
924
                                                        << " commands in the queue: " << (qsit->size())
 
925
                                                        << ". Clearing oldest." << endl;
 
926
                                        LOG_ALWAYS(oss.str());
 
927
 
 
928
                                        // if more than RPC_QLMT command in queue, discard the oldest
 
929
                                        do {
 
930
                                                qsit->pop_front();
 
931
                                        } while (qsit->size() > ctrl->RPC_QLMT);
 
932
                                        // show result of trimming
 
933
                                        {
 
934
                                                std::stringstream oss;
 
935
                                                oss << "New size of queue: " << qsit->size() << endl;
 
936
                                                LOG_ALWAYS(oss.str());
 
937
                                        }
 
938
                                }
 
939
                                ++qsit;
 
940
                        }
 
941
                }
 
942
        }
 
943
 
 
944
        // release lock on queue
 
945
        ctrl->commandQueueMutex.post();
 
946
}
 
947
 
 
948
} // end of namespace srInterface