1
// Notify_Sequence_Push_Consumer.cpp,v 1.9 2003/10/30 20:59:10 bala Exp
3
#include "Notify_Sequence_Push_Consumer.h"
4
#include "orbsvcs/TimeBaseC.h"
8
Notify_Sequence_Push_Consumer::Notify_Sequence_Push_Consumer (
15
discard_policy_ (policy),
25
Notify_Sequence_Push_Consumer::_connect (
26
CosNotifyChannelAdmin::ConsumerAdmin_ptr consumer_admin
28
ACE_THROW_SPEC ((CORBA::SystemException))
30
CosNotifyComm::SequencePushConsumer_var objref =
31
this->_this (ACE_ENV_SINGLE_ARG_PARAMETER);
34
CosNotifyChannelAdmin::ProxySupplier_var proxysupplier =
35
consumer_admin->obtain_notification_push_supplier (
36
CosNotifyChannelAdmin::SEQUENCE_EVENT,
38
ACE_ENV_ARG_PARAMETER);
42
CosNotifyChannelAdmin::SequenceProxyPushSupplier::_narrow (
43
proxysupplier.in () ACE_ENV_ARG_PARAMETER);
46
CosNotification::QoSProperties properties (4);
47
properties.length (4);
48
properties[0].name = CORBA::string_dup (CosNotification::MaximumBatchSize);
49
properties[0].value <<= (CORBA::Long)5;
50
properties[1].name = CORBA::string_dup (CosNotification::PacingInterval);
51
properties[1].value <<= (TimeBase::TimeT)3;
52
properties[2].name = CORBA::string_dup (CosNotification::DiscardPolicy);
53
properties[2].value <<= this->discard_policy_;
54
properties[3].name = CORBA::string_dup (CosNotification::MaxEventsPerConsumer);
55
properties[3].value <<= (CORBA::Long)2;
57
this->proxy_->set_qos (properties);
58
this->proxy_->connect_sequence_push_consumer (objref.in ()
59
ACE_ENV_ARG_PARAMETER);
62
// give ownership to POA
63
this->_remove_ref (ACE_ENV_SINGLE_ARG_PARAMETER);
69
Notify_Sequence_Push_Consumer::push_structured_events (
70
const CosNotification::EventBatch& events
71
ACE_ENV_ARG_DECL_NOT_USED /*ACE_ENV_SINGLE_ARG_PARAMETER*/)
72
ACE_THROW_SPEC ((CORBA::SystemException))
74
CORBA::ULong length = events.length ();
77
ACE_DEBUG ((LM_DEBUG, "Received %u events:\n", length));
79
for (CORBA::ULong e = 0; e < length; e++)
81
CORBA::ULong hlength = events[e].header.variable_header.length ();
82
for (CORBA::ULong hi = 0; hi < hlength; hi++)
87
(const char*)events[e].header.variable_header[hi].name,
88
Any_String (events[e].header.variable_header[hi].value)));
91
CORBA::ULong flength = events[e].filterable_data.length ();
92
for (CORBA::ULong i = 0; i < flength; i++)
97
(const char*)events[e].filterable_data[i].name,
98
Any_String (events[e].filterable_data[i].value)));
102
ACE_DEBUG ((LM_DEBUG,
103
"-------------------------\n"));
106
if (this->count_ > this->high_)
109
ACE_ERROR ((LM_ERROR,
110
ACE_TEXT ("Sequence Consumer (%P|%t): ERROR: too "
111
"many events received.\n")));
113
else if (this->count_ == this->low_)