1
// sender.cpp,v 1.7 2002/07/27 20:13:18 crodrigu Exp
5
#include "ace/Get_Opt.h"
6
#include "ace/High_Res_Timer.h"
8
typedef ACE_Singleton<Sender, ACE_Null_Mutex> SENDER;
9
// Singleton type for the Sender, parameterized with ACE_Null_Mutex.
// The rest of this file reaches the single Sender through
// SENDER::instance ().
11
static FILE *output_file = 0;
12
// File handle of the file into which received data is written.
// Null until it is assigned from ACE_OS::fopen (output_file_name, ...)
// further down in this file.
14
static const char *output_file_name = "output";
15
// File name of the file into which received data is written.
19
// Supplies the callback object for this endpoint.
// The first parameter is unnamed and therefore unused; <callback> is
// an out-parameter (reference to pointer).
Sender_StreamEndPoint::get_callback (const char *,
20
TAO_AV_Callback *&callback)
22
// Create and return the sender application callback to AVStreams
23
// for further upcalls.
24
// <callback> is pointed at the member callback_ of this endpoint.
callback = &this->callback_;
29
// Receives the protocol object for this endpoint and passes it on to
// the Sender singleton.  The first parameter is unnamed and unused.
Sender_StreamEndPoint::set_protocol_object (const char *,
30
TAO_AV_Protocol_Object *object)
32
// Set the sender protocol object corresponding to the transport
34
// Forwarded to Sender::protocol_object (), which stores the pointer.
SENDER::instance ()->protocol_object (object);
38
Sender_Callback::Sender_Callback (void)
44
// Handles an incoming frame: logs it, writes its data out with
// ACE_OS::fwrite (), moves on through frame->cont (), and asks the
// Sender singleton to shut down once it reports eof () == 1.
Sender_Callback::receive_frame (ACE_Message_Block *frame,
49
// Upcall from the AVStreams when there is data to be received from
53
// NOTE(review): frame_count_ is post-incremented inside the logging
// call below; if the logging macro can be compiled out, the counter
// would stop advancing -- confirm.
"Sender_Callback::receive_frame for frame %d\n",
54
this->frame_count_++));
58
// Write the received data to the file.
60
ACE_OS::fwrite (frame->rd_ptr (),
65
// NOTE(review): the error return below is taken when <result> EQUALS
// frame->length ().  fwrite () returns the number of items written, so
// whether this is the intended failure test depends on the size/count
// arguments passed to fwrite () above -- confirm; it looks inverted.
if (result == frame->length ())
66
ACE_ERROR_RETURN ((LM_ERROR,
67
"Sender_Callback::fwrite failed\n"),
70
// Continue with the next message block returned by cont ().
frame = frame->cont ();
73
// When the Sender singleton reports eof () == 1, trigger its shutdown.
if (SENDER::instance ()->eof () == 1)
74
SENDER::instance ()->shutdown ();
79
: sender_mmdevice_ (0),
92
// Setter for protocol_object_.  The pointer is stored as given;
// pace_data () later calls send_frame () through it.
Sender::protocol_object (TAO_AV_Protocol_Object *object)
94
// Set the sender protocol object corresponding to the transport
96
this->protocol_object_ = object;
106
// Tears the sender down: destroys the stream through streamctrl_ and
// then calls shutdown () on the ORB obtained from TAO_AV_CORE.
Sender::shutdown (void)
108
ACE_DECLARE_NEW_CORBA_ENV;
111
// File reading is complete, destroy the stream.
112
// stop_spec is default-constructed and passed to destroy () as is.
AVStreams::flowSpec stop_spec;
113
this->streamctrl_->destroy (stop_spec
114
ACE_ENV_ARG_PARAMETER);
117
// Shut the orb down.
118
// 0 is passed as the shutdown argument (presumably the CORBA
// wait_for_completion flag, i.e. do not wait -- confirm).
TAO_AV_CORE::instance ()->orb ()->shutdown (0
119
ACE_ENV_ARG_PARAMETER);
124
// Prints the exception with the label "shutdown"; presumably the
// handler for failures of the calls above.
ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION, "shutdown\n");
130
// Reads the command line options into filename_, protocol_ and
// frame_rate_.
Sender::parse_args (int argc,
133
// Parse command line arguments
134
// Option string: 'f', 'p' and 'r' each take an argument (trailing
// ':'); 'd' takes none.
ACE_Get_Opt opts (argc, argv, "f:p:r:d");
137
// opts () yields -1 once all options have been consumed.
while ((c= opts ()) != -1)
142
// Presumably -f: input file name -- confirm against the case label.
this->filename_ = opts.opt_arg ();
145
// Presumably -p: protocol string -- confirm against the case label.
this->protocol_ = opts.opt_arg ();
148
// Presumably -r: frame rate -- confirm against the case label.
// NOTE(review): atoi () gives 0 for non-numeric text, and pace_data ()
// divides by frame_rate_ -- confirm the value is validated somewhere.
this->frame_rate_ = ACE_OS::atoi (opts.opt_arg ());
154
// Unrecognized options are only logged.
ACE_DEBUG ((LM_DEBUG, "Unknown Option\n"));
161
// Method to get the object reference of the receiver
// Looks the receiver up through naming_client_, narrows the result to
// AVStreams::MMDevice and stores it in receiver_mmdevice_.  The
// ACE_CHECK_RETURN calls return -1 when a CORBA call raised an
// exception.
163
Sender::bind_to_receiver (ACE_ENV_SINGLE_ARG_DECL)
165
// Name sequence constructed with the argument 1.
CosNaming::Name name (1);
168
// The string "Receiver" is duplicated for use in the name.
CORBA::string_dup ("Receiver");
170
// Resolve the receiver object reference from the Naming Service
171
CORBA::Object_var receiver_mmdevice_obj =
172
this->naming_client_->resolve (name
173
ACE_ENV_ARG_PARAMETER);
174
ACE_CHECK_RETURN (-1);
176
// Narrow the generic object reference to the MMDevice interface.
this->receiver_mmdevice_ =
177
AVStreams::MMDevice::_narrow (receiver_mmdevice_obj.in ()
178
ACE_ENV_ARG_PARAMETER);
179
ACE_CHECK_RETURN (-1);
181
// A nil reference after narrowing is reported as an error.
if (CORBA::is_nil (this->receiver_mmdevice_.in ()))
182
// NOTE(review): the format string below contains "<%s>" but the
// argument list closes right after the string, so no value is supplied
// for %s -- the format and its arguments need to be reconciled.
ACE_ERROR_RETURN ((LM_ERROR,
183
"Could not resolve Receiver_MMdevice in Naming service <%s>\n"),
190
// Sets the sender up end to end: initializes the endpoint strategy and
// the naming client, parses the command line, opens the input file,
// resolves the receiver, builds a two-entry flow specification, creates
// the sender MMDevice and the stream control object, and binds the
// sender and receiver devices.  Failures are reported through
// ACE_ERROR_RETURN; CORBA exceptions through ACE_CHECK_RETURN (-1).
Sender::init (int argc,
194
// Initialize the endpoint strategy with the orb and poa.
196
this->endpoint_strategy_.init (TAO_AV_CORE::instance ()->orb (),
197
TAO_AV_CORE::instance ()->poa ());
201
// Initialize the naming services
203
this->naming_client_.init (TAO_AV_CORE::instance ()->orb ());
207
// Parse the command line arguments
209
this->parse_args (argc,
214
// Open file to read.
216
// The path comes from filename_, which parse_args () fills in.
ACE_OS::fopen (this->filename_.c_str (),
219
if (this->input_file_ == 0)
220
// NOTE(review): this failure is logged with LM_DEBUG, while the other
// error returns in this function use LM_ERROR -- confirm the intended
// priority.
ACE_ERROR_RETURN ((LM_DEBUG,
221
"Cannot open input file %s\n",
222
this->filename_.c_str ()),
225
ACE_DEBUG ((LM_DEBUG,
226
"File opened successfully\n"));
228
// Resolve the object reference of the receiver from the Naming Service.
229
result = this->bind_to_receiver (ACE_ENV_SINGLE_ARG_PARAMETER);
230
ACE_CHECK_RETURN (-1);
233
ACE_ERROR_RETURN ((LM_ERROR,
234
"(%P|%t) Error binding to the naming service\n"),
238
// Initialize the QoS
239
AVStreams::streamQoS_var the_qos (new AVStreams::streamQoS);
241
// Create the forward flow specification to describe the flow.
// First flow, named "Data_Receiver", using the protocol string from
// protocol_.
242
TAO_Forward_FlowSpec_Entry entry ("Data_Receiver",
246
this->protocol_.c_str (),
249
// NOTE(review): the sequence is constructed with the argument 1 and
// then given length 2; both elements are assigned below.  Confirm the
// constructor argument is only an initial capacity.
AVStreams::flowSpec flow_spec (1);
250
flow_spec.length (2);
251
flow_spec [0] = CORBA::string_dup (entry.entry_to_string ());
253
// Create the forward flow specification to describe the flow.
// Second flow, named "Data_Receiver1", with the same protocol string.
254
TAO_Forward_FlowSpec_Entry entry1 ("Data_Receiver1",
258
this->protocol_.c_str (),
261
flow_spec [1] = CORBA::string_dup (entry1.entry_to_string ());
263
// Register the sender mmdevice object with the ORB
264
// The MMDevice is allocated with ACE_NEW_RETURN and given the address
// of endpoint_strategy_.
ACE_NEW_RETURN (this->sender_mmdevice_,
265
TAO_MMDevice (&this->endpoint_strategy_),
268
// Servant Reference Counting to manage lifetime
269
PortableServer::ServantBase_var safe_mmdevice =
270
this->sender_mmdevice_;
272
// Obtain the object reference for the servant via _this ().
AVStreams::MMDevice_var mmdevice =
273
this->sender_mmdevice_->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
274
ACE_CHECK_RETURN (-1);
276
// Allocate the stream control object into streamctrl_.
ACE_NEW_RETURN (this->streamctrl_,
280
PortableServer::ServantBase_var safe_streamctrl =
283
// Bind/Connect the sender and receiver MMDevices.
284
CORBA::Boolean bind_result =
285
this->streamctrl_->bind_devs (mmdevice.in (),
286
this->receiver_mmdevice_.in (),
289
ACE_ENV_ARG_PARAMETER);
290
ACE_CHECK_RETURN (-1);
292
// A zero (false) result from bind_devs () is treated as failure.
if (bind_result == 0)
293
ACE_ERROR_RETURN ((LM_ERROR,
294
"streamctrl::bind_devs failed\n"),
300
// Method to send data at the specified rate
// Reads data into the message block mb_ with ACE_OS::fread (), sends
// each block through protocol_object_->send_frame (), and spaces the
// sends so that consecutive frames are about 1 / frame_rate_ seconds
// apart.  When reading is finished the stream is destroyed through
// streamctrl_.
302
Sender::pace_data (ACE_ENV_SINGLE_ARG_DECL)
304
// The time that should lapse between two consecutive frames sent.
305
ACE_Time_Value inter_frame_time;
307
// The time between two consecutive frames.
// NOTE(review): frame_rate_ is set from atoi () in parse_args (); a
// value of 0 makes this a division by zero -- confirm it is guarded.
308
inter_frame_time.set (1 / (double) this->frame_rate_);
310
// NOTE(review): the values below are printed with %d; confirm that
// matches the type returned by ACE_Time_Value::msec ().
if (TAO_debug_level > 0)
311
ACE_DEBUG ((LM_DEBUG,
312
"Frame Rate = %d / second\n"
313
"Inter Frame Time = %d (msec)\n",
315
inter_frame_time.msec ()));
319
// The time taken for sending a frame and preparing for the next frame
320
ACE_High_Res_Timer elapsed_timer;
322
// Continue to send data till the file is read to the end.
325
// Read from the file into a message block.
326
// The data is read directly at the block's write pointer.
int n = ACE_OS::fread (this->mb_.wr_ptr (),
332
ACE_ERROR_RETURN ((LM_ERROR,
333
"Sender::pace_data fread failed\n"),
338
// At end of file break the loop and end the sender.
339
ACE_DEBUG ((LM_DEBUG,"Handle_Start:End of file\n"));
344
// Advance the write pointer by the number of bytes just read.
this->mb_.wr_ptr (n);
346
// Pacing applies only once frame_count_ exceeds 1; the counter is
// advanced in the log statement after each successful send.
if (this->frame_count_ > 1)
349
// Second frame and beyond
352
// Stop the timer that was started just before the previous frame was sent.
353
elapsed_timer.stop ();
355
// Get the time elapsed after sending the previous frame.
356
ACE_Time_Value elapsed_time;
357
elapsed_timer.elapsed_time (elapsed_time);
359
if (TAO_debug_level > 0)
360
ACE_DEBUG ((LM_DEBUG,
361
"Elapsed Time = %d\n",
362
elapsed_time.msec ()));
364
// Check to see if the inter frame time has elapsed.
365
if (elapsed_time < inter_frame_time)
367
// Inter frame time has not elapsed.
369
// Calculate the time to wait before the next frame needs to be sent.
370
ACE_Time_Value wait_time (inter_frame_time - elapsed_time);
372
if (TAO_debug_level > 0)
373
ACE_DEBUG ((LM_DEBUG,
377
// Run the orb for the wait time so the sender can
378
// continue other orb requests.
379
TAO_AV_CORE::instance ()->orb ()->run (wait_time
380
ACE_ENV_ARG_PARAMETER);
385
// Start timer before sending the frame.
386
elapsed_timer.start ();
390
// Hand the filled message block to the protocol object.
this->protocol_object_->send_frame (&this->mb_);
393
ACE_ERROR_RETURN ((LM_ERROR,
395
"Sender::pace_data send\n"),
398
// NOTE(review): frame_count_ is pre-incremented inside this logging
// call, so the counter only advances if the macro evaluates its
// arguments -- confirm.  The message text also misspells
// "successfully".
ACE_DEBUG ((LM_DEBUG,
399
"Sender::pace_data frame %d was sent succesfully\n",
400
++this->frame_count_));
402
// Reset the message block.
407
// File reading is complete, destroy the stream.
408
// stop_spec is default-constructed and passed to destroy () as is.
AVStreams::flowSpec stop_spec;
409
this->streamctrl_->destroy (stop_spec
410
ACE_ENV_ARG_PARAMETER);
413
// Shut the orb down.
// (The ORB shutdown and exception print below are commented out, so
// this function does not shut the ORB down itself.)
414
//TAO_AV_CORE::instance ()->orb ()->shutdown (1,
415
// ACE_ENV_SINGLE_ARG_PARAMETER);
420
//ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
421
// "Sender::pace_data Failed\n");
432
// Start-up sequence (presumably the body of main -- the signature is
// not on these lines): initialize the ORB, resolve and activate the
// RootPOA, initialize TAO_AV_CORE and the Sender singleton, open the
// output file, then start sending with pace_data ().
ACE_DECLARE_NEW_CORBA_ENV;
436
CORBA::ORB_init (argc,
439
ACE_ENV_ARG_PARAMETER);
441
// Look up the root POA by its well-known name.
CORBA::Object_var obj
442
= orb->resolve_initial_references ("RootPOA"
443
ACE_ENV_ARG_PARAMETER);
446
// Get the POA_var object from Object_var
447
PortableServer::POA_var root_poa
448
= PortableServer::POA::_narrow (obj.in ()
449
ACE_ENV_ARG_PARAMETER);
452
// Fetch the POA manager and activate it.
PortableServer::POAManager_var mgr
453
= root_poa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER);
456
mgr->activate (ACE_ENV_SINGLE_ARG_PARAMETER);
459
// Initialize the AV Stream components.
460
/* TAO_AV_CORE::instance ()->init (orb.in (),
462
ACE_ENV_ARG_PARAMETER); */
465
// Initialize the AVStreams components.
// (This call replaces the commented-out variant above.)
466
TAO_AV_CORE::instance ()->init (orb.in (), root_poa.in () ACE_ENV_ARG_PARAMETER);
469
// Initialize the Sender.
471
result = SENDER::instance ()->init (argc,
473
ACE_ENV_ARG_PARAMETER);
477
ACE_ERROR_RETURN ((LM_ERROR,
478
"Sender::init failed\n"),
481
// Make sure we have a valid <output_file>
482
// Opens the file named by output_file_name ("output") and stores the
// handle in the file-scope output_file.
output_file = ACE_OS::fopen (output_file_name,
484
if (output_file == 0)
485
// NOTE(review): this failure is logged with LM_DEBUG although it is an
// error return -- confirm the intended priority.
ACE_ERROR_RETURN ((LM_DEBUG,
486
"Cannot open output file %s\n",
491
ACE_DEBUG ((LM_DEBUG,
492
"File Opened Successfully\n"));
494
// Start sending data.
495
result = SENDER::instance ()->pace_data (ACE_ENV_SINGLE_ARG_PARAMETER);
497
// Time value constructed from (3, 0); its use is on lines not shown
// here.
ACE_Time_Value tv(3,0);
503
ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
508
ACE_CHECK_RETURN (-1);
512
// Explicit instantiation of the templates used in this file, for
// builds that define ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION
// (`template class ...`) or ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA
// (`#pragma instantiate ...`).  When neither macro is defined, nothing
// is emitted here.
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
513
template class ACE_Singleton <Sender,ACE_Null_Mutex>;
514
template class TAO_AV_Endpoint_Reactive_Strategy_A<Sender_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>;
515
template class TAO_AV_Endpoint_Reactive_Strategy<Sender_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>;
516
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
517
#pragma instantiate ACE_Singleton <Sender,ACE_Null_Mutex>
518
#pragma instantiate TAO_AV_Endpoint_Reactive_Strategy_A<Sender_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>
519
#pragma instantiate TAO_AV_Endpoint_Reactive_Strategy<Sender_StreamEndPoint,TAO_VDev,AV_Null_MediaCtrl>
520
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */