30
30
using namespace Ice;
31
31
using namespace IceInternal;
33
IceUtil::Shared* IceInternal::upCast(AsyncResult* p) { return p; }
33
35
IceUtil::Shared* IceInternal::upCast(OutgoingAsyncMessageCallback* p) { return p; }
34
36
IceUtil::Shared* IceInternal::upCast(OutgoingAsync* p) { return p; }
35
37
IceUtil::Shared* IceInternal::upCast(BatchOutgoingAsync* p) { return p; }
36
IceUtil::Shared* IceInternal::upCast(AMI_Object_ice_invoke* p) { return p; }
37
IceUtil::Shared* IceInternal::upCast(AMI_Array_Object_ice_invoke* p) { return p; }
38
IceUtil::Shared* IceInternal::upCast(AMI_Object_ice_flushBatchRequests* p) { return p; }
38
IceUtil::Shared* IceInternal::upCast(ProxyBatchOutgoingAsync* p) { return p; }
39
IceUtil::Shared* IceInternal::upCast(ConnectionBatchOutgoingAsync* p) { return p; }
40
IceUtil::Shared* IceInternal::upCast(CommunicatorBatchOutgoingAsync* p) { return p; }
42
const unsigned char Ice::AsyncResult::OK = 0x1;
43
const unsigned char Ice::AsyncResult::Done = 0x2;
44
const unsigned char Ice::AsyncResult::Sent = 0x4;
45
const unsigned char Ice::AsyncResult::EndCalled = 0x8;
44
class CallException : public ThreadPoolWorkItem
48
CallException(const OutgoingAsyncMessageCallbackPtr& outAsync, const Ice::LocalException& ex) :
49
_outAsync(outAsync), _exception(dynamic_cast<Ice::LocalException*>(ex.ice_clone()))
54
execute(const ThreadPoolPtr& threadPool)
56
threadPool->promoteFollower();
57
_outAsync->__exception(*_exception.get());
62
const OutgoingAsyncMessageCallbackPtr _outAsync;
63
const auto_ptr<Ice::LocalException> _exception;
68
IceInternal::OutgoingAsyncMessageCallback::OutgoingAsyncMessageCallback() :
74
IceInternal::OutgoingAsyncMessageCallback::~OutgoingAsyncMessageCallback()
81
IceInternal::OutgoingAsyncMessageCallback::__sentCallback(const InstancePtr& instance)
85
dynamic_cast<Ice::AMISentCallback*>(this)->ice_sent();
87
catch(const std::exception& ex)
89
__warning(instance, ex);
98
IceInternal::OutgoingAsyncMessageCallback::__exception(const Ice::Exception& exc)
104
catch(const std::exception& ex)
117
IceInternal::OutgoingAsyncMessageCallback::__acquireCallback(const Ice::ObjectPrx& proxy)
119
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(__monitor);
122
// We must first wait for other requests to finish.
129
Instance* instance = proxy->__reference()->getInstance().get();
131
__os = new BasicStream(instance);
135
IceInternal::OutgoingAsyncMessageCallback::__releaseCallback(const Ice::LocalException& exc)
140
// This is called by the invoking thread to release the callback following a direct
141
// failure to marhsall/send the request. We call the ice_exception() callback with
142
// the thread pool to avoid potential deadlocks in case the invoking thread locked
143
// some mutexes/resources (which couldn't be re-acquired by the callback).
149
// COMPILERFIX: The following in done in two separate lines in order to work around
150
// bug in C++Builder 2009.
152
ThreadPoolPtr threadPool = __os->instance()->clientThreadPool();
153
threadPool->execute(new CallException(this, exc));
50
class AsynchronousException : public DispatchWorkItem
54
AsynchronousException(const IceInternal::InstancePtr& instance,
55
const Ice::AsyncResultPtr& result,
56
const Ice::Exception& ex) :
57
DispatchWorkItem(instance), _result(result), _exception(dynamic_cast<Ice::LocalException*>(ex.ice_clone()))
64
_result->__exception(*_exception.get());
69
const Ice::AsyncResultPtr _result;
70
const auto_ptr<Ice::Exception> _exception;
73
class AsynchronousSent : public DispatchWorkItem
77
AsynchronousSent(const IceInternal::InstancePtr& instance, const Ice::AsyncResultPtr& result) :
78
DispatchWorkItem(instance), _result(result)
90
const Ice::AsyncResultPtr _result;
95
Ice::AsyncResult::AsyncResult(const IceInternal::InstancePtr& instance,
97
const CallbackBasePtr& del,
98
const LocalObjectPtr& cookie) :
110
throw IceUtil::IllegalArgumentException(__FILE__, __LINE__);
112
const_cast<CallbackBasePtr&>(_callback) = _callback->__verify(const_cast<LocalObjectPtr&>(_cookie));
115
Ice::AsyncResult::~AsyncResult()
120
Ice::AsyncResult::operator==(const AsyncResult& r) const
126
Ice::AsyncResult::operator<(const AsyncResult& r) const
132
Ice::AsyncResult::getHash() const
134
return static_cast<Int>(reinterpret_cast<Long>(this) >> 4);
138
Ice::AsyncResult::isCompleted() const
140
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
141
return _state & Done;
145
Ice::AsyncResult::waitForCompleted()
147
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
148
while(!(_state & Done))
155
Ice::AsyncResult::isSent() const
157
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
158
return _state & Sent;
162
Ice::AsyncResult::waitForSent()
164
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
165
while(!(_state & (Sent | Done)))
172
Ice::AsyncResult::__wait()
174
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
175
if(_state & EndCalled)
177
throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "end_ method called more than once");
180
while(!(_state & Done))
186
_exception.get()->ice_throw();
192
Ice::AsyncResult::__throwUserException()
196
_is.startReadEncaps();
197
_is.throwException();
199
catch(const Ice::UserException&)
207
Ice::AsyncResult::__sent()
210
// Note: no need to change the _state here, specializations are responsible for
211
// changing the state.
218
AsyncResultPtr self(this);
219
_callback->__sent(self);
221
catch(const std::exception& ex)
233
Ice::AsyncResult::__sentAsync()
236
// This is called when it's not safe to call the sent callback synchronously
237
// from this thread. Instead the exception callback is called asynchronously from
238
// the client thread pool.
242
_instance->clientThreadPool()->execute(new AsynchronousSent(_instance, this));
155
244
catch(const Ice::CommunicatorDestroyedException&)
158
throw; // CommunicatorDestroyedException is the only exception that can propagate directly.
163
IceInternal::OutgoingAsyncMessageCallback::__releaseCallbackNoSync()
179
IceInternal::OutgoingAsyncMessageCallback::__warning(const std::exception& exc) const
181
if(__os) // Don't print anything if release() was already called.
183
__warning(__os->instance(), exc);
188
IceInternal::OutgoingAsyncMessageCallback::__warning(const InstancePtr& instance, const std::exception& exc) const
190
if(instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
192
Warning out(instance->initializationData().logger);
250
Ice::AsyncResult::__exception(const Ice::Exception& ex)
253
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
255
_exception.reset(ex.ice_clone());
256
_monitor.notifyAll();
263
AsyncResultPtr self(this);
264
_callback->__completed(self);
266
catch(const std::exception& ex)
278
Ice::AsyncResult::__exceptionAsync(const Ice::Exception& ex)
281
// This is called when it's not safe to call the exception callback synchronously
282
// from this thread. Instead the exception callback is called asynchronously from
283
// the client thread pool.
285
// CommunicatorDestroyedException is the only exception that can propagate directly
288
_instance->clientThreadPool()->execute(new AsynchronousException(_instance, this, ex));
292
Ice::AsyncResult::__response()
295
// Note: no need to change the _state here, specializations are responsible for
296
// changing the state.
303
AsyncResultPtr self(this);
304
_callback->__completed(self);
306
catch(const std::exception& ex)
318
Ice::AsyncResult::__check(const AsyncResultPtr& r, const IceProxy::Ice::Object* prx, const string& operation)
320
__check(r, operation);
321
if(r->getProxy().get() != prx)
323
throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Proxy for call to end_" + operation +
324
" does not match proxy that was used to call corresponding begin_" +
325
operation + " method");
330
Ice::AsyncResult::__check(const AsyncResultPtr& r, const Ice::Communicator* com, const string& operation)
332
__check(r, operation);
333
if(r->getCommunicator().get() != com)
335
throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Communicator for call to end_" + operation +
336
" does not match communicator that was used to call corresponding " +
337
"begin_" + operation + " method");
342
Ice::AsyncResult::__check(const AsyncResultPtr& r, const Ice::Connection* con, const string& operation)
344
__check(r, operation);
345
if(r->getConnection().get() != con)
347
throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Connection for call to end_" + operation +
348
" does not match connection that was used to call corresponding " +
349
"begin_" + operation + " method");
354
Ice::AsyncResult::__check(const AsyncResultPtr& r, const string& operation)
358
throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "AsyncResult == null");
360
else if(&r->_operation != &operation)
362
throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Incorrect operation for end_" + operation +
363
" method: " + r->_operation);
369
Ice::AsyncResult::__warning(const std::exception& exc) const
371
if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
373
Warning out(_instance->initializationData().logger);
193
374
const Exception* ex = dynamic_cast<const Exception*>(&exc);
206
IceInternal::OutgoingAsyncMessageCallback::__warning() const
208
if(__os) // Don't print anything if release() was already called.
210
__warning(__os->instance());
215
IceInternal::OutgoingAsyncMessageCallback::__warning(const InstancePtr& instance) const
217
if(instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
219
Warning out(instance->initializationData().logger);
387
Ice::AsyncResult::__warning() const
389
if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
391
Warning out(_instance->initializationData().logger);
220
392
out << "unknown exception raised by AMI callback";
396
IceInternal::OutgoingAsync::OutgoingAsync(const ObjectPrx& prx,
397
const std::string& operation,
398
const CallbackBasePtr& delegate,
399
const Ice::LocalObjectPtr& cookie) :
400
AsyncResult(prx->__reference()->getInstance(), operation, delegate, cookie),
406
IceInternal::OutgoingAsync::__prepare(const std::string& operation, OperationMode mode, const Context* context)
411
_sentSynchronously = false;
414
// Can't call async via a batch proxy.
416
if(_proxy->ice_isBatchOneway() || _proxy->ice_isBatchDatagram())
418
throw Ice::FeatureNotSupportedException(__FILE__, __LINE__, "can't send batch requests with AMI");
421
_os.writeBlob(requestHdr, sizeof(requestHdr));
423
Reference* ref = _proxy->__reference().get();
425
ref->getIdentity().__write(&_os);
428
// For compatibility with the old FacetPath.
430
if(ref->getFacet().empty())
432
_os.write(static_cast<string*>(0), static_cast<string*>(0));
436
string facet = ref->getFacet();
437
_os.write(&facet, &facet + 1);
440
_os.write(operation, false);
442
_os.write(static_cast<Byte>(_mode));
449
__writeContext(&_os, *context);
456
const ImplicitContextIPtr& implicitContext = ref->getInstance()->getImplicitContext();
457
const Context& prxContext = ref->getContext()->getValue();
458
if(implicitContext == 0)
460
__writeContext(&_os, prxContext);
464
implicitContext->write(prxContext, &_os);
468
_os.startWriteEncaps();
225
472
IceInternal::OutgoingAsync::__sent(Ice::ConnectionI* connection)
227
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(__monitor);
230
if(!_proxy->ice_isTwoway())
232
__releaseCallbackNoSync(); // No response expected, we're done with the OutgoingAsync.
236
__monitor.notifyAll(); // If the response was already received notify finished() which is waiting.
238
else if(connection->timeout() > 0)
240
assert(!_timerTaskConnection && __os);
241
_timerTaskConnection = connection;
242
IceUtil::Time timeout = IceUtil::Time::milliSeconds(connection->timeout());
243
__os->instance()->timer()->schedule(this, timeout);
474
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
476
bool alreadySent = _state & Sent; // Expected in case of a retry.
480
// It's possible for the request to be done already when using IOCP. This
481
// is the case for example if the send callback is dispatched after the
482
// read callback for the response/exception.
486
if(!_proxy->ice_isTwoway())
490
else if(connection->timeout() > 0)
492
assert(!_timerTaskConnection);
493
_timerTaskConnection = connection;
494
IceUtil::Time timeout = IceUtil::Time::milliSeconds(connection->timeout());
495
_instance->timer()->schedule(this, timeout);
498
_monitor.notifyAll();
499
return !alreadySent && _callback && _callback->__hasSentCallback();
503
IceInternal::OutgoingAsync::__sent()
505
#if defined(_MSC_VER) && (_MSC_VER < 1300) // VC++ 6 compiler bug
506
AsyncResult::__sent();
508
::Ice::AsyncResult::__sent();
513
IceInternal::OutgoingAsync::__finished(const Ice::LocalException& exc, bool sent)
516
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
517
assert(!(_state & Done));
518
if(_timerTaskConnection)
520
_instance->timer()->cancel(this);
521
_timerTaskConnection = 0;
526
// NOTE: at this point, synchronization isn't needed, no other threads should be
527
// calling on the callback.
532
int interval = handleException(exc, sent); // This will throw if the invocation can't be retried.
535
_instance->retryQueue()->add(this, interval);
542
catch(const Ice::LocalException& ex)
549
IceInternal::OutgoingAsync::__finished(const LocalExceptionWrapper& exc)
552
// NOTE: at this point, synchronization isn't needed, no other threads should be
553
// calling on the callback. The LocalExceptionWrapper exception is only called
554
// before the invocation is sent.
559
int interval = handleException(exc); // This will throw if the invocation can't be retried.
562
_instance->retryQueue()->add(this, interval);
569
catch(const Ice::LocalException& ex)
381
703
throw UnknownReplyStatusException(__FILE__, __LINE__);
708
if(replyStatus == replyOK)
712
_monitor.notifyAll();
385
714
catch(const LocalException& ex)
716
__finished(ex, true);
391
720
assert(replyStatus == replyOK || replyStatus == replyUserException);
395
__response(replyStatus == replyOK);
397
catch(const std::exception& ex)
410
IceInternal::OutgoingAsync::__finished(const Ice::LocalException& exc)
413
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(__monitor);
416
if(_timerTaskConnection && __os->instance()->timer()->cancel(this))
418
_timerTaskConnection = 0; // Timer cancelled.
421
while(_timerTaskConnection)
428
// NOTE: at this point, synchronization isn't needed, no other threads should be
429
// calling on the callback.
434
handleException(exc); // This will throw if the invocation can't be retried.
436
catch(const Ice::LocalException& ex)
443
IceInternal::OutgoingAsync::__finished(const LocalExceptionWrapper& exc)
445
assert(__os && !_sent);
448
// NOTE: at this point, synchronization isn't needed, no other threads should be
449
// calling on the callback. The LocalExceptionWrapper exception is only called
450
// before the invocation is sent.
455
handleException(exc); // This will throw if the invocation can't be retried.
457
catch(const Ice::LocalException& ex)
464
IceInternal::OutgoingAsync::__retry(int interval)
467
// This method is called by the proxy to retry an invocation, no
468
// other threads can access this object.
473
__os->instance()->retryQueue()->add(this, interval);
482
IceInternal::OutgoingAsync::__send()
725
IceInternal::OutgoingAsync::__send(bool synchronous)
488
_delegate = _proxy->__getDelegate(true);
489
_sentSynchronously = _delegate->__getRequestHandler()->sendAsyncRequest(this);
491
catch(const LocalExceptionWrapper& ex)
493
handleException(ex); // Might call __send() again upon retry and assign _sentSynchronously
495
catch(const Ice::LocalException& ex)
497
handleException(ex); // Might call __send() again upon retry and assign _sentSynchronously
732
_delegate = _proxy->__getDelegate(true);
733
AsyncStatus status = _delegate->__getRequestHandler()->sendAsyncRequest(this);
734
if(status & AsyncStatusSent)
738
_sentSynchronously = true;
739
if(status & AsyncStatusInvokeSentCallback)
741
__sent(); // Call the sent callback from the user thread.
746
if(status & AsyncStatusInvokeSentCallback)
748
__sentAsync(); // Call the sent callback from a client thread pool thread.
754
catch(const LocalExceptionWrapper& ex)
756
interval = handleException(ex);
758
catch(const Ice::LocalException& ex)
760
interval = handleException(ex, false);
765
_instance->retryQueue()->add(this, interval);
499
769
return _sentSynchronously;
503
IceInternal::OutgoingAsync::__prepare(const ObjectPrx& prx, const string& operation, OperationMode mode,
504
const Context* context)
510
_sentSynchronously = false;
513
// Can't call async via a batch proxy.
515
if(_proxy->ice_isBatchOneway() || _proxy->ice_isBatchDatagram())
517
throw Ice::FeatureNotSupportedException(__FILE__, __LINE__, "can't send batch requests with AMI");
520
__os->writeBlob(requestHdr, sizeof(requestHdr));
522
Reference* ref = _proxy->__reference().get();
524
ref->getIdentity().__write(__os);
527
// For compatibility with the old FacetPath.
529
if(ref->getFacet().empty())
531
__os->write(static_cast<string*>(0), static_cast<string*>(0));
535
string facet = ref->getFacet();
536
__os->write(&facet, &facet + 1);
539
__os->write(operation, false);
541
__os->write(static_cast<Byte>(_mode));
548
__writeContext(__os, *context);
555
const ImplicitContextIPtr& implicitContext = ref->getInstance()->getImplicitContext();
556
const Context& prxContext = ref->getContext()->getValue();
557
if(implicitContext == 0)
559
__writeContext(__os, prxContext);
563
implicitContext->write(prxContext, __os);
567
__os->startWriteEncaps();
571
IceInternal::OutgoingAsync::__throwUserException()
576
__is->startReadEncaps();
577
__is->throwException();
579
catch(const Ice::UserException&)
581
__is->endReadEncaps();
587
773
IceInternal::OutgoingAsync::handleException(const LocalExceptionWrapper& ex)
589
775
if(_mode == Nonmutating || _mode == Idempotent)
591
_proxy->__handleExceptionWrapperRelaxed(_delegate, ex, this, _cnt);
777
return _proxy->__handleExceptionWrapperRelaxed(_delegate, ex, false, _cnt);
595
_proxy->__handleExceptionWrapper(_delegate, ex, this);
781
return _proxy->__handleExceptionWrapper(_delegate, ex);
600
IceInternal::OutgoingAsync::handleException(const Ice::LocalException& exc)
786
IceInternal::OutgoingAsync::handleException(const Ice::LocalException& exc, bool sent)
848
IceInternal::BatchOutgoingAsync::BatchOutgoingAsync(const InstancePtr& instance,
849
const std::string& operation,
850
const CallbackBasePtr& delegate,
851
const Ice::LocalObjectPtr& cookie) :
852
AsyncResult(instance, operation, delegate, cookie)
668
857
IceInternal::BatchOutgoingAsync::__sent(Ice::ConnectionI* connection)
674
IceInternal::BatchOutgoingAsync::__finished(const Ice::LocalException& exc)
859
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
860
assert(!_exception.get());
861
_state |= Done | OK | Sent;
862
_monitor.notifyAll();
863
return _callback && _callback->__hasSentCallback();
867
IceInternal::BatchOutgoingAsync::__sent()
869
#if defined(_MSC_VER) && (_MSC_VER < 1300) // VC++ 6 compiler bug
870
AsyncResult::__sent();
872
::Ice::AsyncResult::__sent();
877
IceInternal::BatchOutgoingAsync::__finished(const Ice::LocalException& exc, bool)
676
879
__exception(exc);
680
Ice::AMI_Object_ice_invoke::__invoke(const ObjectPrx& prx, const string& operation, OperationMode mode,
681
const vector<Byte>& inParams, const Context* context)
683
__acquireCallback(prx);
686
__prepare(prx, operation, mode, context);
687
__os->writeBlob(inParams);
688
__os->endWriteEncaps();
691
catch(const Ice::LocalException& ex)
693
__releaseCallback(ex);
699
Ice::AMI_Object_ice_invoke::__response(bool ok) // ok == true means no user exception.
701
vector<Byte> outParams;
704
__is->startReadEncaps();
705
Int sz = __is->getReadEncapsSize();
706
__is->readBlob(outParams, sz);
707
__is->endReadEncaps();
709
catch(const LocalException& ex)
714
ice_response(ok, outParams);
719
Ice::AMI_Array_Object_ice_invoke::__invoke(const ObjectPrx& prx, const string& operation, OperationMode mode,
720
const pair<const Byte*, const Byte*>& inParams, const Context* context)
722
__acquireCallback(prx);
725
__prepare(prx, operation, mode, context);
726
__os->writeBlob(inParams.first, static_cast<Int>(inParams.second - inParams.first));
727
__os->endWriteEncaps();
730
catch(const Ice::LocalException& ex)
732
__releaseCallback(ex);
738
Ice::AMI_Array_Object_ice_invoke::__response(bool ok) // ok == true means no user exception.
740
pair<const Byte*, const Byte*> outParams;
743
__is->startReadEncaps();
744
Int sz = __is->getReadEncapsSize();
745
__is->readBlob(outParams.first, sz);
746
outParams.second = outParams.first + sz;
747
__is->endReadEncaps();
749
catch(const LocalException& ex)
754
ice_response(ok, outParams);
759
Ice::AMI_Object_ice_flushBatchRequests::__invoke(const ObjectPrx& prx)
761
__acquireCallback(prx);
765
// We don't automatically retry if ice_flushBatchRequests fails. Otherwise, if some batch
766
// requests were queued with the connection, they would be lost without being noticed.
768
Handle< ::IceDelegate::Ice::Object> delegate;
769
int cnt = -1; // Don't retry.
772
delegate = prx->__getDelegate(true);
773
return delegate->__getRequestHandler()->flushAsyncBatchRequests(this);
775
catch(const Ice::LocalException& ex)
777
prx->__handleException(delegate, ex, 0, cnt);
780
catch(const Ice::LocalException& ex)
782
__releaseCallback(ex);
882
IceInternal::ProxyBatchOutgoingAsync::ProxyBatchOutgoingAsync(const Ice::ObjectPrx& proxy,
883
const std::string& operation,
884
const CallbackBasePtr& delegate,
885
const Ice::LocalObjectPtr& cookie) :
886
BatchOutgoingAsync(proxy->__reference()->getInstance(), operation, delegate, cookie),
892
IceInternal::ProxyBatchOutgoingAsync::__send()
895
// We don't automatically retry if ice_flushBatchRequests fails. Otherwise, if some batch
896
// requests were queued with the connection, they would be lost without being noticed.
898
Handle<IceDelegate::Ice::Object> delegate;
899
int cnt = -1; // Don't retry.
902
delegate = _proxy->__getDelegate(true);
903
AsyncStatus status = delegate->__getRequestHandler()->flushAsyncBatchRequests(this);
904
if(status & AsyncStatusSent)
906
_sentSynchronously = true;
907
if(status & AsyncStatusInvokeSentCallback)
913
catch(const ::Ice::LocalException& ex)
915
_proxy->__handleException(delegate, ex, 0, cnt);
919
IceInternal::ConnectionBatchOutgoingAsync::ConnectionBatchOutgoingAsync(const ConnectionIPtr& con,
920
const InstancePtr& instance,
921
const string& operation,
922
const CallbackBasePtr& delegate,
923
const Ice::LocalObjectPtr& cookie) :
924
BatchOutgoingAsync(instance, operation, delegate, cookie),
930
IceInternal::ConnectionBatchOutgoingAsync::__send()
932
AsyncStatus status = _connection->flushAsyncBatchRequests(this);
933
if(status & AsyncStatusSent)
935
_sentSynchronously = true;
936
if(status & AsyncStatusInvokeSentCallback)
944
IceInternal::ConnectionBatchOutgoingAsync::getConnection() const
949
IceInternal::CommunicatorBatchOutgoingAsync::CommunicatorBatchOutgoingAsync(const CommunicatorPtr& communicator,
950
const InstancePtr& instance,
951
const string& operation,
952
const CallbackBasePtr& delegate,
953
const Ice::LocalObjectPtr& cookie) :
954
BatchOutgoingAsync(instance, operation, delegate, cookie),
955
_communicator(communicator)
958
// _useCount is initialized to 1 to prevent premature callbacks.
959
// The caller must invoke ready() after all flush requests have
965
// Assume all connections are flushed synchronously.
967
_sentSynchronously = true;
971
IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionPtr& con)
974
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
977
CallbackPtr cb = newCallback(this, &CommunicatorBatchOutgoingAsync::completed,
978
&CommunicatorBatchOutgoingAsync::sent);
979
con->begin_flushBatchRequests(cb);
983
IceInternal::CommunicatorBatchOutgoingAsync::ready()
989
IceInternal::CommunicatorBatchOutgoingAsync::completed(const AsyncResultPtr& r)
991
ConnectionPtr con = r->getConnection();
996
con->end_flushBatchRequests(r);
997
assert(false); // completed() should only be called when an exception occurs.
999
catch(const Ice::LocalException& ex)
1001
check(r, &ex, false);
1006
IceInternal::CommunicatorBatchOutgoingAsync::sent(const AsyncResultPtr& r)
1008
check(r, 0, r->sentSynchronously());
1012
IceInternal::CommunicatorBatchOutgoingAsync::check(const AsyncResultPtr& r, const LocalException* ex, bool userThread)
1017
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
1018
assert(_useCount > 0);
1022
// We report that the communicator flush request was sent synchronously
1023
// if all of the connection flush requests are sent synchronously.
1025
if((r && !r->sentSynchronously()) || ex)
1027
_sentSynchronously = false;
1033
_state |= Done | OK | Sent;
1034
_monitor.notifyAll();
1041
// _sentSynchronously is immutable here.
1043
if(!_sentSynchronously && userThread)
1049
assert(_sentSynchronously == userThread); // sentSynchronously && !userThread is impossible.
1050
BatchOutgoingAsync::__sent();
1059
// Dummy class derived from CallbackBase
1060
// We use this class for the __dummyCallback extern pointer in OutgoingAsync. In turn,
1061
// this allows us to test whether the user supplied a null delegate instance to the
1062
// generated begin_ method without having to generate a separate test to throw IllegalArgumentException
1063
// in the inlined versions of the begin_ method. In other words, this reduces the amount of generated
1066
class DummyCallback : public CallbackBase
1074
virtual void __completed(const Ice::AsyncResultPtr&) const
1079
virtual CallbackBasePtr __verify(Ice::LocalObjectPtr&)
1082
// Called by the AsyncResult constructor to verify the delegate. The dummy
1083
// delegate is passed when the user used a begin_ method without delegate.
1084
// By returning 0 here, we tell the AsyncResult that no delegates was
1090
virtual void __sent(const AsyncResultPtr&) const
1095
virtual bool __hasSentCallback() const
1105
// This gives a pointer value to compare against in the generated
1106
// begin_ method to decide whether the caller passed a null pointer
1107
// versus the generated inline version of the begin_ method having
1108
// passed a pointer to the dummy delegate.
1110
CallbackBasePtr IceInternal::__dummyCallback = new DummyCallback;
1113
Ice::AMICallbackBase::__exception(const ::Ice::Exception& ex)
1119
Ice::AMICallbackBase::__sent(bool sentSynchronously)
1121
if(!sentSynchronously)
1123
dynamic_cast<AMISentCallback*>(this)->ice_sent();