~ubuntu-branches/ubuntu/breezy/ace/breezy

« back to all changes in this revision

Viewing changes to TAO/orbsvcs/tests/AVStreams/Bidirectional_Flows/sender.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Adam Conrad, Benjamin Montgomery, Adam Conrad
  • Date: 2005-09-18 22:51:38 UTC
  • mfrom: (1.2.1 upstream) (2.1.1 sarge) (0.1.2 woody)
  • Revision ID: james.westby@ubuntu.com-20050918225138-seav22q6fyylb536
Tags: 5.4.7-3ubuntu1
[ Benjamin Montgomery ]
* Added a patch for amd64 and powerpc that disables the compiler
  option -fvisibility-inlines-hidden

[ Adam Conrad ]
* Added DPATCH_OPTION_CPP=1 to debian/patches/00options to make
  Benjamin's above changes work correctly with dpatch.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
// sender.cpp,v 1.7 2002/07/27 20:13:18 crodrigu Exp
2
 
 
3
 
#include "sender.h"
4
 
#include "tao/debug.h"
5
 
#include "ace/Get_Opt.h"
6
 
#include "ace/High_Res_Timer.h"
7
 
 
8
 
typedef ACE_Singleton<Sender, ACE_Null_Mutex> SENDER;
9
 
// Create a singleton instance of the Sender.
10
 
 
11
 
static FILE *output_file = 0;
12
 
// File handle of the file into which received data is written.
13
 
 
14
 
static const char *output_file_name = "output";
15
 
// File name of the file into which received data is written.
16
 
 
17
 
 
18
 
int
19
 
Sender_StreamEndPoint::get_callback (const char *,
20
 
                                     TAO_AV_Callback *&callback)
21
 
{
22
 
  // Create and return the sender application callback to AVStreams
23
 
  // for further upcalls.
24
 
  callback = &this->callback_;
25
 
  return 0;
26
 
}
27
 
 
28
 
int
29
 
Sender_StreamEndPoint::set_protocol_object (const char *,
30
 
                                            TAO_AV_Protocol_Object *object)
31
 
{
32
 
  // Set the sender protocol object corresponding to the transport
33
 
  // protocol selected.
34
 
  SENDER::instance ()->protocol_object (object);
35
 
  return 0;
36
 
}
37
 
 
38
 
Sender_Callback::Sender_Callback (void)
39
 
  : frame_count_ (1)
40
 
{
41
 
}
42
 
 
43
 
int
44
 
Sender_Callback::receive_frame (ACE_Message_Block *frame,
45
 
                                TAO_AV_frame_info *,
46
 
                                const ACE_Addr &)
47
 
{
48
 
  //
49
 
  // Upcall from the AVStreams when there is data to be received from
50
 
  // the sender.
51
 
  //
52
 
  ACE_DEBUG ((LM_DEBUG,
53
 
              "Sender_Callback::receive_frame for frame %d\n",
54
 
              this->frame_count_++));
55
 
 
56
 
  while (frame != 0)
57
 
    {
58
 
      // Write the received data to the file.
59
 
      size_t result =
60
 
        ACE_OS::fwrite (frame->rd_ptr (),
61
 
                        frame->length (),
62
 
                        1,
63
 
                        output_file);
64
 
 
65
 
      if (result == frame->length ())
66
 
        ACE_ERROR_RETURN ((LM_ERROR,
67
 
                           "Sender_Callback::fwrite failed\n"),
68
 
                          -1);
69
 
 
70
 
      frame = frame->cont ();
71
 
    }
72
 
 
73
 
  if (SENDER::instance ()->eof () == 1)
74
 
    SENDER::instance ()->shutdown ();
75
 
  return 0;
76
 
}
77
 
 
78
 
Sender::Sender (void)
79
 
  : sender_mmdevice_ (0),
80
 
    streamctrl_ (0),
81
 
    frame_count_ (0),
82
 
    filename_ ("input"),
83
 
    input_file_ (0),
84
 
    protocol_ ("UDP"),
85
 
    frame_rate_ (30),
86
 
    mb_ (BUFSIZ),
87
 
    eof_ (0)
88
 
{
89
 
}
90
 
 
91
 
void
92
 
Sender::protocol_object (TAO_AV_Protocol_Object *object)
93
 
{
94
 
  // Set the sender protocol object corresponding to the transport
95
 
  // protocol selected.
96
 
  this->protocol_object_ = object;
97
 
}
98
 
 
99
 
int
100
 
Sender::eof (void)
101
 
{
102
 
  return this->eof_;
103
 
}
104
 
 
105
 
void
106
 
Sender::shutdown (void)
107
 
{
108
 
  ACE_DECLARE_NEW_CORBA_ENV;
109
 
  ACE_TRY
110
 
  {
111
 
    // File reading is complete, destroy the stream.
112
 
    AVStreams::flowSpec stop_spec;
113
 
    this->streamctrl_->destroy (stop_spec
114
 
                                ACE_ENV_ARG_PARAMETER);
115
 
    ACE_TRY_CHECK;
116
 
 
117
 
      // Shut the orb down.
118
 
    TAO_AV_CORE::instance ()->orb ()->shutdown (0
119
 
                                                ACE_ENV_ARG_PARAMETER);
120
 
    ACE_TRY_CHECK;
121
 
  }
122
 
  ACE_CATCHANY
123
 
    {
124
 
      ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "shutdown\n");
125
 
    }
126
 
  ACE_ENDTRY;
127
 
}
128
 
 
129
 
int
130
 
Sender::parse_args (int argc,
131
 
                    char **argv)
132
 
{
133
 
  // Parse command line arguments
134
 
  ACE_Get_Opt opts (argc, argv, "f:p:r:d");
135
 
 
136
 
  int c;
137
 
  while ((c= opts ()) != -1)
138
 
    {
139
 
      switch (c)
140
 
        {
141
 
        case 'f':
142
 
          this->filename_ = opts.opt_arg ();
143
 
          break;
144
 
        case 'p':
145
 
          this->protocol_ = opts.opt_arg ();
146
 
          break;
147
 
        case 'r':
148
 
          this->frame_rate_ = ACE_OS::atoi (opts.opt_arg ());
149
 
          break;
150
 
        case 'd':
151
 
          TAO_debug_level++;
152
 
          break;
153
 
        default:
154
 
          ACE_DEBUG ((LM_DEBUG, "Unknown Option\n"));
155
 
          return -1;
156
 
        }
157
 
    }
158
 
  return 0;
159
 
}
160
 
 
161
 
// Method to get the object reference of the receiver
162
 
int
163
 
Sender::bind_to_receiver (ACE_ENV_SINGLE_ARG_DECL)
164
 
{
165
 
  CosNaming::Name name (1);
166
 
  name.length (1);
167
 
  name [0].id =
168
 
    CORBA::string_dup ("Receiver");
169
 
 
170
 
  // Resolve the receiver object reference from the Naming Service
171
 
  CORBA::Object_var receiver_mmdevice_obj =
172
 
    this->naming_client_->resolve (name
173
 
                                   ACE_ENV_ARG_PARAMETER);
174
 
  ACE_CHECK_RETURN (-1);
175
 
 
176
 
  this->receiver_mmdevice_ =
177
 
    AVStreams::MMDevice::_narrow (receiver_mmdevice_obj.in ()
178
 
                                  ACE_ENV_ARG_PARAMETER);
179
 
  ACE_CHECK_RETURN (-1);
180
 
 
181
 
  if (CORBA::is_nil (this->receiver_mmdevice_.in ()))
182
 
    ACE_ERROR_RETURN ((LM_ERROR,
183
 
                       "Could not resolve Receiver_MMdevice in Naming service <%s>\n"),
184
 
                      -1);
185
 
 
186
 
  return 0;
187
 
}
188
 
 
189
 
int
190
 
Sender::init (int argc,
191
 
              char **argv
192
 
              ACE_ENV_ARG_DECL)
193
 
{
194
 
  // Initialize the endpoint strategy with the orb and poa.
195
 
  int result =
196
 
    this->endpoint_strategy_.init (TAO_AV_CORE::instance ()->orb (),
197
 
                                   TAO_AV_CORE::instance ()->poa ());
198
 
  if (result != 0)
199
 
    return result;
200
 
 
201
 
  // Initialize the naming services
202
 
  result =
203
 
    this->naming_client_.init (TAO_AV_CORE::instance ()->orb ());
204
 
  if (result != 0)
205
 
    return result;
206
 
 
207
 
  // Parse the command line arguments
208
 
  result =
209
 
    this->parse_args (argc,
210
 
                      argv);
211
 
  if (result != 0)
212
 
    return result;
213
 
 
214
 
  // Open file to read.
215
 
  this->input_file_ =
216
 
    ACE_OS::fopen (this->filename_.c_str (),
217
 
                   "r");
218
 
 
219
 
  if (this->input_file_ == 0)
220
 
    ACE_ERROR_RETURN ((LM_DEBUG,
221
 
                       "Cannot open input file %s\n",
222
 
                       this->filename_.c_str ()),
223
 
                      -1);
224
 
  else
225
 
    ACE_DEBUG ((LM_DEBUG,
226
 
                "File opened successfully\n"));
227
 
 
228
 
  // Resolve the object reference of the receiver from the Naming Service.
229
 
  result = this->bind_to_receiver (ACE_ENV_SINGLE_ARG_PARAMETER);
230
 
  ACE_CHECK_RETURN (-1);
231
 
 
232
 
  if (result != 0)
233
 
    ACE_ERROR_RETURN ((LM_ERROR,
234
 
                       "(%P|%t) Error binding to the naming service\n"),
235
 
                      -1);
236
 
 
237
 
 
238
 
  // Initialize the  QoS
239
 
  AVStreams::streamQoS_var the_qos (new AVStreams::streamQoS);
240
 
 
241
 
  // Create the forward flow specification to describe the flow.
242
 
  TAO_Forward_FlowSpec_Entry entry ("Data_Receiver",
243
 
                                    "IN",
244
 
                                    "USER_DEFINED",
245
 
                                    "",
246
 
                                    this->protocol_.c_str (),
247
 
                                    0);
248
 
 
249
 
  AVStreams::flowSpec flow_spec (1);
250
 
  flow_spec.length (2);
251
 
  flow_spec [0] = CORBA::string_dup (entry.entry_to_string ());
252
 
 
253
 
    // Create the forward flow specification to describe the flow.
254
 
  TAO_Forward_FlowSpec_Entry entry1 ("Data_Receiver1",
255
 
                                    "OUT",
256
 
                                    "USER_DEFINED",
257
 
                                    "",
258
 
                                    this->protocol_.c_str (),
259
 
                                    0);
260
 
 
261
 
  flow_spec [1] = CORBA::string_dup (entry1.entry_to_string ());
262
 
 
263
 
  // Register the sender mmdevice object with the ORB
264
 
  ACE_NEW_RETURN (this->sender_mmdevice_,
265
 
                  TAO_MMDevice (&this->endpoint_strategy_),
266
 
                  -1);
267
 
 
268
 
  // Servant Reference Counting to manage lifetime
269
 
  PortableServer::ServantBase_var safe_mmdevice =
270
 
    this->sender_mmdevice_;
271
 
 
272
 
  AVStreams::MMDevice_var mmdevice =
273
 
    this->sender_mmdevice_->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
274
 
  ACE_CHECK_RETURN (-1);
275
 
 
276
 
  ACE_NEW_RETURN (this->streamctrl_,
277
 
                  TAO_StreamCtrl,
278
 
                  -1);
279
 
 
280
 
  PortableServer::ServantBase_var safe_streamctrl =
281
 
    this->streamctrl_;
282
 
 
283
 
  // Bind/Connect the sender and receiver MMDevices.
284
 
  CORBA::Boolean bind_result =
285
 
    this->streamctrl_->bind_devs (mmdevice.in (),
286
 
                                  this->receiver_mmdevice_.in (),
287
 
                                  the_qos.inout (),
288
 
                                  flow_spec
289
 
                                  ACE_ENV_ARG_PARAMETER);
290
 
  ACE_CHECK_RETURN (-1);
291
 
 
292
 
  if (bind_result == 0)
293
 
    ACE_ERROR_RETURN ((LM_ERROR,
294
 
                       "streamctrl::bind_devs failed\n"),
295
 
                      -1);
296
 
 
297
 
  return 0;
298
 
}
299
 
 
300
 
// Method to send data at the specified rate
301
 
int
302
 
Sender::pace_data (ACE_ENV_SINGLE_ARG_DECL)
303
 
{
304
 
  // The time that should lapse between two consecutive frames sent.
305
 
  ACE_Time_Value inter_frame_time;
306
 
 
307
 
  // The time between two consecutive frames.
308
 
  inter_frame_time.set (1 / (double) this->frame_rate_);
309
 
 
310
 
  if (TAO_debug_level > 0)
311
 
    ACE_DEBUG ((LM_DEBUG,
312
 
                "Frame Rate = %d / second\n"
313
 
                "Inter Frame Time = %d (msec)\n",
314
 
                this->frame_rate_,
315
 
                inter_frame_time.msec ()));
316
 
 
317
 
  ACE_TRY
318
 
    {
319
 
      // The time taken for sending a frame and preparing for the next frame
320
 
      ACE_High_Res_Timer elapsed_timer;
321
 
 
322
 
      // Continue to send data till the file is read to the end.
323
 
      while (1)
324
 
        {
325
 
          // Read from the file into a message block.
326
 
          int n = ACE_OS::fread (this->mb_.wr_ptr (),
327
 
                                 1,
328
 
                                 this->mb_.size (),
329
 
                                 this->input_file_);
330
 
 
331
 
          if (n < 0)
332
 
            ACE_ERROR_RETURN ((LM_ERROR,
333
 
                               "Sender::pace_data fread failed\n"),
334
 
                              -1);
335
 
 
336
 
          if (n == 0)
337
 
            {
338
 
              // At end of file break the loop and end the sender.
339
 
              ACE_DEBUG ((LM_DEBUG,"Handle_Start:End of file\n"));
340
 
              this->eof_ = 1;
341
 
              break;
342
 
            }
343
 
 
344
 
          this->mb_.wr_ptr (n);
345
 
 
346
 
          if (this->frame_count_ > 1)
347
 
            {
348
 
              //
349
 
              // Second frame and beyond
350
 
              //
351
 
 
352
 
              // Stop the timer that was started just before the previous frame was sent.
353
 
              elapsed_timer.stop ();
354
 
 
355
 
              // Get the time elapsed after sending the previous frame.
356
 
              ACE_Time_Value elapsed_time;
357
 
              elapsed_timer.elapsed_time (elapsed_time);
358
 
 
359
 
              if (TAO_debug_level > 0)
360
 
                ACE_DEBUG ((LM_DEBUG,
361
 
                            "Elapsed Time = %d\n",
362
 
                            elapsed_time.msec ()));
363
 
 
364
 
              // Check to see if the inter frame time has elapsed.
365
 
              if (elapsed_time < inter_frame_time)
366
 
                {
367
 
                  // Inter frame time has not elapsed.
368
 
 
369
 
                  // Calculate the time to wait before the next frame needs to be sent.
370
 
                  ACE_Time_Value wait_time (inter_frame_time - elapsed_time);
371
 
 
372
 
                  if (TAO_debug_level > 0)
373
 
                    ACE_DEBUG ((LM_DEBUG,
374
 
                                "Wait Time = %d\n",
375
 
                                wait_time.msec ()));
376
 
 
377
 
                  // Run the orb for the wait time so the sender can
378
 
                  // continue other orb requests.
379
 
                  TAO_AV_CORE::instance ()->orb ()->run (wait_time
380
 
                                                         ACE_ENV_ARG_PARAMETER);
381
 
                  ACE_TRY_CHECK;
382
 
                }
383
 
            }
384
 
 
385
 
          // Start timer before sending the frame.
386
 
          elapsed_timer.start ();
387
 
 
388
 
          // Send frame.
389
 
          int result =
390
 
            this->protocol_object_->send_frame (&this->mb_);
391
 
 
392
 
          if (result < 0)
393
 
            ACE_ERROR_RETURN ((LM_ERROR,
394
 
                               "send failed:%p",
395
 
                               "Sender::pace_data send\n"),
396
 
                              -1);
397
 
 
398
 
          ACE_DEBUG ((LM_DEBUG,
399
 
                      "Sender::pace_data frame %d was sent succesfully\n",
400
 
                      ++this->frame_count_));
401
 
 
402
 
          // Reset the message block.
403
 
          this->mb_.reset ();
404
 
 
405
 
        } // end while
406
 
 
407
 
         // File reading is complete, destroy the stream.
408
 
        AVStreams::flowSpec stop_spec;
409
 
        this->streamctrl_->destroy (stop_spec
410
 
                                    ACE_ENV_ARG_PARAMETER);
411
 
        ACE_TRY_CHECK;
412
 
 
413
 
        // Shut the orb down.
414
 
        //TAO_AV_CORE::instance ()->orb ()->shutdown (1,
415
 
        //                                            ACE_ENV_SINGLE_ARG_PARAMETER);
416
 
        ACE_TRY_CHECK;
417
 
    }
418
 
  ACE_CATCHANY
419
 
    {
420
 
      //ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
421
 
       //                    "Sender::pace_data Failed\n");
422
 
      return -1;
423
 
    }
424
 
  ACE_ENDTRY;
425
 
  return 0;
426
 
}
427
 
 
428
 
int
429
 
main (int argc,
430
 
      char **argv)
431
 
{
432
 
  ACE_DECLARE_NEW_CORBA_ENV;
433
 
  ACE_TRY
434
 
    {
435
 
      CORBA::ORB_var orb =
436
 
        CORBA::ORB_init (argc,
437
 
                         argv,
438
 
                         0
439
 
                         ACE_ENV_ARG_PARAMETER);
440
 
 
441
 
      CORBA::Object_var obj
442
 
        = orb->resolve_initial_references ("RootPOA"
443
 
                                           ACE_ENV_ARG_PARAMETER);
444
 
      ACE_TRY_CHECK;
445
 
 
446
 
      // Get the POA_var object from Object_var
447
 
      PortableServer::POA_var root_poa
448
 
        = PortableServer::POA::_narrow (obj.in ()
449
 
                                        ACE_ENV_ARG_PARAMETER);
450
 
      ACE_TRY_CHECK;
451
 
 
452
 
      PortableServer::POAManager_var mgr
453
 
        = root_poa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER);
454
 
      ACE_TRY_CHECK;
455
 
 
456
 
      mgr->activate (ACE_ENV_SINGLE_ARG_PARAMETER);
457
 
      ACE_TRY_CHECK;
458
 
 
459
 
      // Initialize the AV Stream components.
460
 
/*      TAO_AV_CORE::instance ()->init (orb.in (),
461
 
                                      root_poa.in ()
462
 
                                      ACE_ENV_ARG_PARAMETER); */
463
 
      ACE_TRY_CHECK;
464
 
 
465
 
      // Initialize the AVStreams components.
466
 
      TAO_AV_CORE::instance ()->init (orb.in (), root_poa.in () ACE_ENV_ARG_PARAMETER);
467
 
      ACE_TRY_CHECK;
468
 
 
469
 
      // Initialize the Sender.
470
 
      int result = 0;
471
 
      result = SENDER::instance ()->init (argc,
472
 
                                          argv
473
 
                                          ACE_ENV_ARG_PARAMETER);
474
 
      ACE_TRY_CHECK;
475
 
 
476
 
      if (result < 0)
477
 
        ACE_ERROR_RETURN ((LM_ERROR,
478
 
                           "Sender::init failed\n"),
479
 
                          -1);
480
 
 
481
 
      // Make sure we have a valid <output_file>
482
 
      output_file = ACE_OS::fopen (output_file_name,
483
 
                                   "w");
484
 
      if (output_file == 0)
485
 
        ACE_ERROR_RETURN ((LM_DEBUG,
486
 
                           "Cannot open output file %s\n",
487
 
                           output_file_name),
488
 
                          -1);
489
 
 
490
 
      else
491
 
        ACE_DEBUG ((LM_DEBUG,
492
 
                    "File Opened Successfully\n"));
493
 
 
494
 
      // Start sending data.
495
 
      result = SENDER::instance ()->pace_data (ACE_ENV_SINGLE_ARG_PARAMETER);
496
 
      ACE_TRY_CHECK;
497
 
      ACE_Time_Value tv(3,0);
498
 
      orb->run (tv);
499
 
      ACE_TRY_CHECK;
500
 
    }
501
 
  ACE_CATCHANY
502
 
    {
503
 
      ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
504
 
                           "Sender Failed\n");
505
 
      return -1;
506
 
    }
507
 
  ACE_ENDTRY;
508
 
  ACE_CHECK_RETURN (-1);
509
 
  return 0;
510
 
}
511
 
 
512
 
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
513
 
template class ACE_Singleton <Sender,ACE_Null_Mutex>;
514
 
template class TAO_AV_Endpoint_Reactive_Strategy_A<Sender_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>;
515
 
template class TAO_AV_Endpoint_Reactive_Strategy<Sender_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>;
516
 
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
517
 
#pragma instantiate ACE_Singleton <Sender,ACE_Null_Mutex>
518
 
#pragma instantiate TAO_AV_Endpoint_Reactive_Strategy_A<Sender_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>
519
 
#pragma instantiate TAO_AV_Endpoint_Reactive_Strategy<Sender_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>
520
 
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */