~ubuntu-branches/ubuntu/quantal/zeroc-ice/quantal

« back to all changes in this revision

Viewing changes to cpp/src/Ice/OutgoingAsync.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Cleto Martin Angelina
  • Date: 2011-04-25 18:44:24 UTC
  • mfrom: (6.1.14 sid)
  • Revision ID: james.westby@ubuntu.com-20110425184424-sep9i9euu434vq4c
Tags: 3.4.1-7
* Bug fix: "libdb5.1-java.jar was renamed to db.jar", thanks to Ondřej
  Surý (Closes: #623555).
* Bug fix: "causes noise in php5", thanks to Jayen Ashar (Closes:
  #623533).

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
// **********************************************************************
2
2
//
3
 
// Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved.
 
3
// Copyright (c) 2003-2010 ZeroC, Inc. All rights reserved.
4
4
//
5
5
// This copy of Ice is licensed to you under the terms described in the
6
6
// ICE_LICENSE file included in this distribution.
30
30
using namespace Ice;
31
31
using namespace IceInternal;
32
32
 
 
33
IceUtil::Shared* IceInternal::upCast(AsyncResult* p) { return p; }
 
34
 
33
35
IceUtil::Shared* IceInternal::upCast(OutgoingAsyncMessageCallback* p) { return p; }
34
36
IceUtil::Shared* IceInternal::upCast(OutgoingAsync* p) { return p; }
35
37
IceUtil::Shared* IceInternal::upCast(BatchOutgoingAsync* p) { return p; }
36
 
IceUtil::Shared* IceInternal::upCast(AMI_Object_ice_invoke* p) { return p; }
37
 
IceUtil::Shared* IceInternal::upCast(AMI_Array_Object_ice_invoke* p) { return p; }
38
 
IceUtil::Shared* IceInternal::upCast(AMI_Object_ice_flushBatchRequests* p) { return p; }
 
38
IceUtil::Shared* IceInternal::upCast(ProxyBatchOutgoingAsync* p) { return p; }
 
39
IceUtil::Shared* IceInternal::upCast(ConnectionBatchOutgoingAsync* p) { return p; }
 
40
IceUtil::Shared* IceInternal::upCast(CommunicatorBatchOutgoingAsync* p) { return p; }
39
41
 
 
42
const unsigned char Ice::AsyncResult::OK = 0x1;
 
43
const unsigned char Ice::AsyncResult::Done = 0x2;
 
44
const unsigned char Ice::AsyncResult::Sent = 0x4;
 
45
const unsigned char Ice::AsyncResult::EndCalled = 0x8;
40
46
 
41
47
namespace
42
48
{
43
49
 
44
 
class CallException : public ThreadPoolWorkItem
45
 
{
46
 
public:
47
 
    
48
 
    CallException(const OutgoingAsyncMessageCallbackPtr& outAsync, const Ice::LocalException& ex) :
49
 
        _outAsync(outAsync), _exception(dynamic_cast<Ice::LocalException*>(ex.ice_clone()))
50
 
    {
51
 
    }
52
 
    
53
 
    virtual void
54
 
    execute(const ThreadPoolPtr& threadPool)
55
 
    {
56
 
        threadPool->promoteFollower();
57
 
        _outAsync->__exception(*_exception.get());
58
 
    }
59
 
    
60
 
private:
61
 
    
62
 
    const OutgoingAsyncMessageCallbackPtr _outAsync;
63
 
    const auto_ptr<Ice::LocalException> _exception;
64
 
};
65
 
 
66
 
};
67
 
 
68
 
IceInternal::OutgoingAsyncMessageCallback::OutgoingAsyncMessageCallback() :
69
 
    __is(0),
70
 
    __os(0)
71
 
{
72
 
}
73
 
 
74
 
IceInternal::OutgoingAsyncMessageCallback::~OutgoingAsyncMessageCallback()
75
 
{
76
 
    assert(!__is);
77
 
    assert(!__os);
78
 
}
79
 
 
80
 
void
81
 
IceInternal::OutgoingAsyncMessageCallback::__sentCallback(const InstancePtr& instance)
82
 
{
83
 
    try
84
 
    {
85
 
        dynamic_cast<Ice::AMISentCallback*>(this)->ice_sent();
86
 
    }
87
 
    catch(const std::exception& ex)
88
 
    {
89
 
        __warning(instance, ex);
90
 
    }
91
 
    catch(...)
92
 
    {
93
 
        __warning(instance);
94
 
    }
95
 
}
96
 
 
97
 
void
98
 
IceInternal::OutgoingAsyncMessageCallback::__exception(const Ice::Exception& exc)
99
 
{    
100
 
    try
101
 
    {
102
 
        ice_exception(exc);
103
 
    }
104
 
    catch(const std::exception& ex)
105
 
    {
106
 
        __warning(ex);
107
 
    }
108
 
    catch(...)
109
 
    {
110
 
        __warning();
111
 
    }
112
 
 
113
 
    __releaseCallback();
114
 
}
115
 
 
116
 
void
117
 
IceInternal::OutgoingAsyncMessageCallback::__acquireCallback(const Ice::ObjectPrx& proxy)
118
 
{
119
 
    IceUtil::Monitor<IceUtil::Mutex>::Lock sync(__monitor);
120
 
 
121
 
    //
122
 
    // We must first wait for other requests to finish.
123
 
    //
124
 
    while(__os)
125
 
    {
126
 
        __monitor.wait();
127
 
    }
128
 
 
129
 
    Instance* instance = proxy->__reference()->getInstance().get();
130
 
    assert(!__os);
131
 
    __os = new BasicStream(instance);
132
 
}
133
 
 
134
 
void
135
 
IceInternal::OutgoingAsyncMessageCallback::__releaseCallback(const Ice::LocalException& exc)
136
 
{
137
 
    assert(__os);
138
 
 
139
 
    //
140
 
    // This is called by the invoking thread to release the callback following a direct 
141
 
    // failure to marhsall/send the request. We call the ice_exception() callback with
142
 
    // the thread pool to avoid potential deadlocks in case the invoking thread locked 
143
 
    // some mutexes/resources (which couldn't be re-acquired by the callback).
144
 
    //
145
 
 
146
 
    try
147
 
    {
148
 
        //
149
 
        // COMPILERFIX: The following in done in two separate lines in order to work around
150
 
        //              bug in C++Builder 2009.
151
 
        //
152
 
        ThreadPoolPtr threadPool = __os->instance()->clientThreadPool();
153
 
        threadPool->execute(new CallException(this, exc));
 
50
class AsynchronousException : public DispatchWorkItem
 
51
{
 
52
public:
 
53
    
 
54
    AsynchronousException(const IceInternal::InstancePtr& instance,
 
55
                          const Ice::AsyncResultPtr& result,
 
56
                          const Ice::Exception& ex) :
 
57
        DispatchWorkItem(instance), _result(result), _exception(dynamic_cast<Ice::LocalException*>(ex.ice_clone()))
 
58
    {
 
59
    }
 
60
    
 
61
    virtual void
 
62
    run()
 
63
    {
 
64
        _result->__exception(*_exception.get());
 
65
    }
 
66
    
 
67
private:
 
68
    
 
69
    const Ice::AsyncResultPtr _result;
 
70
    const auto_ptr<Ice::Exception> _exception;
 
71
};
 
72
 
 
73
class AsynchronousSent : public DispatchWorkItem
 
74
{
 
75
public:
 
76
    
 
77
    AsynchronousSent(const IceInternal::InstancePtr& instance, const Ice::AsyncResultPtr& result) : 
 
78
        DispatchWorkItem(instance), _result(result)
 
79
    {
 
80
    }
 
81
    
 
82
    virtual void
 
83
    run()
 
84
    {
 
85
        _result->__sent();
 
86
    }
 
87
    
 
88
private:
 
89
    
 
90
    const Ice::AsyncResultPtr _result;
 
91
};
 
92
 
 
93
};
 
94
 
 
95
Ice::AsyncResult::AsyncResult(const IceInternal::InstancePtr& instance,
 
96
                              const string& op,
 
97
                              const CallbackBasePtr& del,
 
98
                              const LocalObjectPtr& cookie) :
 
99
    _instance(instance),
 
100
    _operation(op), 
 
101
    _callback(del),
 
102
    _cookie(cookie),
 
103
    _is(instance.get()),
 
104
    _os(instance.get()),
 
105
    _state(0),
 
106
    _exception(0)
 
107
{
 
108
    if(!_callback)
 
109
    {
 
110
        throw IceUtil::IllegalArgumentException(__FILE__, __LINE__);
 
111
    }
 
112
    const_cast<CallbackBasePtr&>(_callback) = _callback->__verify(const_cast<LocalObjectPtr&>(_cookie));
 
113
}
 
114
 
 
115
Ice::AsyncResult::~AsyncResult()
 
116
{
 
117
}
 
118
 
 
119
bool
 
120
Ice::AsyncResult::operator==(const AsyncResult& r) const
 
121
{
 
122
    return this == &r;
 
123
}
 
124
 
 
125
bool
 
126
Ice::AsyncResult::operator<(const AsyncResult& r) const
 
127
{
 
128
    return this < &r;
 
129
}
 
130
 
 
131
Int
 
132
Ice::AsyncResult::getHash() const
 
133
{
 
134
    return static_cast<Int>(reinterpret_cast<Long>(this) >> 4);
 
135
}
 
136
 
 
137
bool
 
138
Ice::AsyncResult::isCompleted() const
 
139
{
 
140
    IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
 
141
    return _state & Done;
 
142
}
 
143
 
 
144
void 
 
145
Ice::AsyncResult::waitForCompleted()
 
146
{
 
147
    IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
 
148
    while(!(_state & Done))
 
149
    {
 
150
        _monitor.wait();
 
151
    }
 
152
}
 
153
 
 
154
bool 
 
155
Ice::AsyncResult::isSent() const
 
156
{
 
157
    IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
 
158
    return _state & Sent;
 
159
}
 
160
 
 
161
void
 
162
Ice::AsyncResult::waitForSent()
 
163
{
 
164
    IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
 
165
    while(!(_state & (Sent | Done)))
 
166
    {
 
167
        _monitor.wait();
 
168
    }
 
169
}
 
170
 
 
171
bool
 
172
Ice::AsyncResult::__wait()
 
173
{
 
174
    IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
 
175
    if(_state & EndCalled)
 
176
    {
 
177
        throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "end_ method called more than once");
 
178
    }
 
179
    _state |= EndCalled;
 
180
    while(!(_state & Done))
 
181
    {
 
182
        _monitor.wait();
 
183
    }
 
184
    if(_exception.get())
 
185
    {
 
186
        _exception.get()->ice_throw();
 
187
    }
 
188
    return _state & OK;
 
189
}
 
190
 
 
191
void
 
192
Ice::AsyncResult::__throwUserException()
 
193
{
 
194
    try
 
195
    {
 
196
        _is.startReadEncaps();
 
197
        _is.throwException();
 
198
    }
 
199
    catch(const Ice::UserException&)
 
200
    {
 
201
        _is.endReadEncaps();
 
202
        throw;
 
203
    }
 
204
}
 
205
 
 
206
void
 
207
Ice::AsyncResult::__sent()
 
208
{
 
209
    //
 
210
    // Note: no need to change the _state here, specializations are responsible for
 
211
    // changing the state.
 
212
    //
 
213
 
 
214
    if(_callback)
 
215
    {
 
216
        try
 
217
        {
 
218
            AsyncResultPtr self(this);
 
219
            _callback->__sent(self);
 
220
        }
 
221
        catch(const std::exception& ex)
 
222
        {
 
223
            __warning(ex);
 
224
        }
 
225
        catch(...)
 
226
        {
 
227
            __warning();
 
228
        }
 
229
    }
 
230
}
 
231
 
 
232
void
 
233
Ice::AsyncResult::__sentAsync()
 
234
{
 
235
    //
 
236
    // This is called when it's not safe to call the sent callback synchronously
 
237
    // from this thread. Instead the exception callback is called asynchronously from
 
238
    // the client thread pool.
 
239
    //
 
240
    try
 
241
    {
 
242
        _instance->clientThreadPool()->execute(new AsynchronousSent(_instance, this));
154
243
    }
155
244
    catch(const Ice::CommunicatorDestroyedException&)
156
245
    {
157
 
        __releaseCallback();
158
 
        throw; // CommunicatorDestroyedException is the only exception that can propagate directly.
159
 
    }
160
 
}
161
 
 
162
 
void
163
 
IceInternal::OutgoingAsyncMessageCallback::__releaseCallbackNoSync()
164
 
{
165
 
    if(__is)
166
 
    {
167
 
        delete __is;
168
 
        __is = 0;
169
 
    }
170
 
 
171
 
    assert(__os);
172
 
    delete __os;
173
 
    __os = 0;
174
 
 
175
 
    __monitor.notify();
176
 
}
177
 
 
178
 
void
179
 
IceInternal::OutgoingAsyncMessageCallback::__warning(const std::exception& exc) const
180
 
{
181
 
    if(__os) // Don't print anything if release() was already called.
182
 
    {
183
 
        __warning(__os->instance(), exc);
184
 
    }
185
 
}
186
 
 
187
 
void
188
 
IceInternal::OutgoingAsyncMessageCallback::__warning(const InstancePtr& instance, const std::exception& exc) const
189
 
{
190
 
    if(instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
191
 
    {
192
 
        Warning out(instance->initializationData().logger);
 
246
    }
 
247
}
 
248
 
 
249
void
 
250
Ice::AsyncResult::__exception(const Ice::Exception& ex)
 
251
{
 
252
    {
 
253
        IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
 
254
        _state |= Done;
 
255
        _exception.reset(ex.ice_clone());
 
256
        _monitor.notifyAll();
 
257
    }
 
258
 
 
259
    if(_callback)
 
260
    {
 
261
        try
 
262
        {
 
263
            AsyncResultPtr self(this);
 
264
            _callback->__completed(self);
 
265
        }
 
266
        catch(const std::exception& ex)
 
267
        {
 
268
            __warning(ex);
 
269
        }
 
270
        catch(...)
 
271
        {
 
272
            __warning();
 
273
        }
 
274
    }
 
275
}
 
276
 
 
277
void
 
278
Ice::AsyncResult::__exceptionAsync(const Ice::Exception& ex)
 
279
{
 
280
    //
 
281
    // This is called when it's not safe to call the exception callback synchronously
 
282
    // from this thread. Instead the exception callback is called asynchronously from
 
283
    // the client thread pool.
 
284
    //
 
285
    // CommunicatorDestroyedException is the only exception that can propagate directly
 
286
    // from this method.
 
287
    //
 
288
    _instance->clientThreadPool()->execute(new AsynchronousException(_instance, this, ex));
 
289
}
 
290
 
 
291
void
 
292
Ice::AsyncResult::__response()
 
293
{
 
294
    //
 
295
    // Note: no need to change the _state here, specializations are responsible for
 
296
    // changing the state.
 
297
    //
 
298
 
 
299
    if(_callback)
 
300
    {
 
301
        try
 
302
        {
 
303
            AsyncResultPtr self(this);
 
304
            _callback->__completed(self);
 
305
        }
 
306
        catch(const std::exception& ex)
 
307
        {
 
308
            __warning(ex);
 
309
        }
 
310
        catch(...)
 
311
        {
 
312
            __warning();
 
313
        }
 
314
    }
 
315
}
 
316
 
 
317
void
 
318
Ice::AsyncResult::__check(const AsyncResultPtr& r, const IceProxy::Ice::Object* prx, const string& operation)
 
319
{
 
320
    __check(r, operation);
 
321
    if(r->getProxy().get() != prx)
 
322
    {
 
323
        throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Proxy for call to end_" + operation +
 
324
                                                " does not match proxy that was used to call corresponding begin_" +
 
325
                                                operation + " method");
 
326
    }
 
327
}
 
328
 
 
329
void
 
330
Ice::AsyncResult::__check(const AsyncResultPtr& r, const Ice::Communicator* com, const string& operation)
 
331
{
 
332
    __check(r, operation);
 
333
    if(r->getCommunicator().get() != com)
 
334
    {
 
335
        throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Communicator for call to end_" + operation +
 
336
                                                " does not match communicator that was used to call corresponding " +
 
337
                                                "begin_" + operation + " method");
 
338
    }
 
339
}
 
340
 
 
341
void
 
342
Ice::AsyncResult::__check(const AsyncResultPtr& r, const Ice::Connection* con, const string& operation)
 
343
{
 
344
    __check(r, operation);
 
345
    if(r->getConnection().get() != con)
 
346
    {
 
347
        throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Connection for call to end_" + operation +
 
348
                                                " does not match connection that was used to call corresponding " +
 
349
                                                "begin_" + operation + " method");
 
350
    }
 
351
}
 
352
 
 
353
void
 
354
Ice::AsyncResult::__check(const AsyncResultPtr& r, const string& operation)
 
355
{
 
356
    if(!r)
 
357
    {
 
358
        throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "AsyncResult == null");
 
359
    }
 
360
    else if(&r->_operation != &operation)
 
361
    {
 
362
        throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Incorrect operation for end_" + operation + 
 
363
                                                " method: " + r->_operation);
 
364
    }
 
365
}
 
366
 
 
367
 
 
368
void
 
369
Ice::AsyncResult::__warning(const std::exception& exc) const
 
370
{
 
371
    if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
 
372
    {
 
373
        Warning out(_instance->initializationData().logger);
193
374
        const Exception* ex = dynamic_cast<const Exception*>(&exc);
194
375
        if(ex)
195
376
        {
203
384
}
204
385
 
205
386
void
206
 
IceInternal::OutgoingAsyncMessageCallback::__warning() const
207
 
{
208
 
    if(__os) // Don't print anything if release() was already called.
209
 
    {
210
 
        __warning(__os->instance());
211
 
    }
212
 
}
213
 
 
214
 
void
215
 
IceInternal::OutgoingAsyncMessageCallback::__warning(const InstancePtr& instance) const
216
 
{
217
 
    if(instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
218
 
    {
219
 
        Warning out(instance->initializationData().logger);
 
387
Ice::AsyncResult::__warning() const
 
388
{
 
389
    if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
 
390
    {
 
391
        Warning out(_instance->initializationData().logger);
220
392
        out << "unknown exception raised by AMI callback";
221
393
    }
222
394
}
223
395
 
 
396
IceInternal::OutgoingAsync::OutgoingAsync(const ObjectPrx& prx, 
 
397
                                          const std::string& operation,
 
398
                                          const CallbackBasePtr& delegate,
 
399
                                          const Ice::LocalObjectPtr& cookie) :
 
400
    AsyncResult(prx->__reference()->getInstance(), operation, delegate, cookie),
 
401
    _proxy(prx)
 
402
{
 
403
}
 
404
 
224
405
void
 
406
IceInternal::OutgoingAsync::__prepare(const std::string& operation, OperationMode mode, const Context* context)
 
407
{
 
408
    _delegate = 0;
 
409
    _cnt = 0;
 
410
    _mode = mode;
 
411
    _sentSynchronously = false;
 
412
 
 
413
    //
 
414
    // Can't call async via a batch proxy.
 
415
    //
 
416
    if(_proxy->ice_isBatchOneway() || _proxy->ice_isBatchDatagram())
 
417
    {
 
418
        throw Ice::FeatureNotSupportedException(__FILE__, __LINE__, "can't send batch requests with AMI");
 
419
    }
 
420
 
 
421
    _os.writeBlob(requestHdr, sizeof(requestHdr));
 
422
 
 
423
    Reference* ref = _proxy->__reference().get();
 
424
 
 
425
    ref->getIdentity().__write(&_os);
 
426
 
 
427
    //
 
428
    // For compatibility with the old FacetPath.
 
429
    //
 
430
    if(ref->getFacet().empty())
 
431
    {
 
432
        _os.write(static_cast<string*>(0), static_cast<string*>(0));
 
433
    }
 
434
    else
 
435
    {
 
436
        string facet = ref->getFacet();
 
437
        _os.write(&facet, &facet + 1);
 
438
    }
 
439
 
 
440
    _os.write(operation, false);
 
441
 
 
442
    _os.write(static_cast<Byte>(_mode));
 
443
 
 
444
    if(context != 0)
 
445
    {
 
446
        //
 
447
        // Explicit context
 
448
        //
 
449
        __writeContext(&_os, *context);
 
450
    }
 
451
    else
 
452
    {
 
453
        //
 
454
        // Implicit context
 
455
        //
 
456
        const ImplicitContextIPtr& implicitContext = ref->getInstance()->getImplicitContext();
 
457
        const Context& prxContext = ref->getContext()->getValue();
 
458
        if(implicitContext == 0)
 
459
        {
 
460
            __writeContext(&_os, prxContext);
 
461
        }
 
462
        else
 
463
        {
 
464
            implicitContext->write(prxContext, &_os);
 
465
        }
 
466
    }
 
467
        
 
468
    _os.startWriteEncaps();
 
469
}
 
470
 
 
471
bool
225
472
IceInternal::OutgoingAsync::__sent(Ice::ConnectionI* connection)
226
473
{
227
 
    IceUtil::Monitor<IceUtil::Mutex>::Lock sync(__monitor);
228
 
    _sent = true;
229
 
    
230
 
    if(!_proxy->ice_isTwoway())
231
 
    {
232
 
        __releaseCallbackNoSync(); // No response expected, we're done with the OutgoingAsync.
233
 
    }
234
 
    else if(_response) 
235
 
    {
236
 
        __monitor.notifyAll(); // If the response was already received notify finished() which is waiting.
237
 
    }
238
 
    else if(connection->timeout() > 0)
239
 
    {
240
 
        assert(!_timerTaskConnection && __os);
241
 
        _timerTaskConnection = connection;
242
 
        IceUtil::Time timeout = IceUtil::Time::milliSeconds(connection->timeout());
243
 
        __os->instance()->timer()->schedule(this, timeout);
 
474
    IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
 
475
 
 
476
    bool alreadySent = _state & Sent; // Expected in case of a retry.
 
477
    _state |= Sent;
 
478
 
 
479
    //
 
480
    // It's possible for the request to be done already when using IOCP. This 
 
481
    // is the case for example if the send callback is dispatched after the 
 
482
    // read callback for the response/exception.
 
483
    //
 
484
    if(!(_state & Done))
 
485
    {
 
486
        if(!_proxy->ice_isTwoway())
 
487
        {
 
488
            _state |= Done | OK;
 
489
        }
 
490
        else if(connection->timeout() > 0)
 
491
        {
 
492
            assert(!_timerTaskConnection);
 
493
            _timerTaskConnection = connection;
 
494
            IceUtil::Time timeout = IceUtil::Time::milliSeconds(connection->timeout());
 
495
            _instance->timer()->schedule(this, timeout);
 
496
        }
 
497
    }
 
498
    _monitor.notifyAll();
 
499
    return !alreadySent && _callback && _callback->__hasSentCallback();
 
500
}
 
501
 
 
502
void
 
503
IceInternal::OutgoingAsync::__sent()
 
504
{
 
505
#if defined(_MSC_VER) && (_MSC_VER < 1300) // VC++ 6 compiler bug
 
506
    AsyncResult::__sent();
 
507
#else
 
508
    ::Ice::AsyncResult::__sent();
 
509
#endif
 
510
}
 
511
 
 
512
void
 
513
IceInternal::OutgoingAsync::__finished(const Ice::LocalException& exc, bool sent)
 
514
{
 
515
    {
 
516
        IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
 
517
        assert(!(_state & Done));
 
518
        if(_timerTaskConnection)
 
519
        {
 
520
            _instance->timer()->cancel(this);
 
521
            _timerTaskConnection = 0;
 
522
        }
 
523
    }
 
524
     
 
525
    //
 
526
    // NOTE: at this point, synchronization isn't needed, no other threads should be
 
527
    // calling on the callback.
 
528
    //
 
529
 
 
530
    try
 
531
    {
 
532
        int interval = handleException(exc, sent); // This will throw if the invocation can't be retried.
 
533
        if(interval > 0)
 
534
        {
 
535
            _instance->retryQueue()->add(this, interval);
 
536
        }
 
537
        else
 
538
        {
 
539
            __send(false);
 
540
        }
 
541
    }
 
542
    catch(const Ice::LocalException& ex)
 
543
    {
 
544
        __exception(ex);
 
545
    }
 
546
}
 
547
 
 
548
void
 
549
IceInternal::OutgoingAsync::__finished(const LocalExceptionWrapper& exc)
 
550
{
 
551
    //
 
552
    // NOTE: at this point, synchronization isn't needed, no other threads should be
 
553
    // calling on the callback. The LocalExceptionWrapper exception is only called
 
554
    // before the invocation is sent.
 
555
    //
 
556
 
 
557
    try
 
558
    {
 
559
        int interval = handleException(exc); // This will throw if the invocation can't be retried.
 
560
        if(interval > 0)
 
561
        {
 
562
            _instance->retryQueue()->add(this, interval);
 
563
        }
 
564
        else
 
565
        {
 
566
            __send(false);
 
567
        }
 
568
    }
 
569
    catch(const Ice::LocalException& ex)
 
570
    {
 
571
        __exception(ex);
244
572
    }
245
573
}
246
574
 
252
580
    Ice::Byte replyStatus;
253
581
    try
254
582
    {
255
 
        IceUtil::Monitor<IceUtil::Mutex>::Lock sync(__monitor);
256
 
        assert(__os);
257
 
        _response = true;
258
 
        
259
 
        if(_timerTaskConnection && __os->instance()->timer()->cancel(this))
260
 
        {
261
 
            _timerTaskConnection = 0; // Timer cancelled.
262
 
        }
263
 
 
264
 
        while(!_sent || _timerTaskConnection)
265
 
        {
266
 
            __monitor.wait();
267
 
        }
268
 
 
269
 
        __is = new BasicStream(__os->instance());
270
 
        __is->swap(is);  
271
 
        __is->read(replyStatus);
 
583
        IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
 
584
        assert(!_exception.get() && !(_state & Done));
 
585
 
 
586
        if(_timerTaskConnection)
 
587
        {
 
588
            _instance->timer()->cancel(this);
 
589
            _timerTaskConnection = 0;
 
590
        }
 
591
 
 
592
        _is.swap(is);
 
593
        _is.read(replyStatus);
272
594
        
273
595
        switch(replyStatus)
274
596
        {
283
605
            case replyOperationNotExist:
284
606
            {
285
607
                Identity ident;
286
 
                ident.__read(__is);
 
608
                ident.__read(&_is);
287
609
                
288
610
                //
289
611
                // For compatibility with the old FacetPath.
290
612
                //
291
613
                vector<string> facetPath;
292
 
                __is->read(facetPath);
 
614
                _is.read(facetPath);
293
615
                string facet;
294
616
                if(!facetPath.empty())
295
617
                {
301
623
                }
302
624
                
303
625
                string operation;
304
 
                __is->read(operation, false);
 
626
                _is.read(operation, false);
305
627
                
306
628
                auto_ptr<RequestFailedException> ex;
307
629
                switch(replyStatus)
342
664
            case replyUnknownUserException:
343
665
            {
344
666
                string unknown;
345
 
                __is->read(unknown, false);
 
667
                _is.read(unknown, false);
346
668
                
347
669
                auto_ptr<UnknownException> ex;
348
670
                switch(replyStatus)
381
703
                throw UnknownReplyStatusException(__FILE__, __LINE__);
382
704
            }
383
705
        }
 
706
 
 
707
        _state |= Done;
 
708
        if(replyStatus == replyOK)
 
709
        {
 
710
            _state |= OK;
 
711
        }
 
712
        _monitor.notifyAll();
384
713
    }
385
714
    catch(const LocalException& ex)
386
715
    {
387
 
        __finished(ex);
 
716
        __finished(ex, true);
388
717
        return;
389
718
    }
390
719
 
391
720
    assert(replyStatus == replyOK || replyStatus == replyUserException);
392
 
 
393
 
    try
394
 
    {
395
 
        __response(replyStatus == replyOK);
396
 
    }
397
 
    catch(const std::exception& ex)
398
 
    {
399
 
        __warning(ex);
400
 
        __releaseCallback();
401
 
    }
402
 
    catch(...)
403
 
    {
404
 
        __warning();
405
 
        __releaseCallback();
406
 
    }
407
 
}
408
 
 
409
 
void
410
 
IceInternal::OutgoingAsync::__finished(const Ice::LocalException& exc)
411
 
{
412
 
    {
413
 
        IceUtil::Monitor<IceUtil::Mutex>::Lock sync(__monitor);
414
 
        assert(__os);
415
 
        
416
 
        if(_timerTaskConnection && __os->instance()->timer()->cancel(this))
417
 
        {
418
 
            _timerTaskConnection = 0; // Timer cancelled.
419
 
        }
420
 
        
421
 
        while(_timerTaskConnection)
422
 
        {
423
 
            __monitor.wait();
424
 
        }
425
 
    }
426
 
     
427
 
    //
428
 
    // NOTE: at this point, synchronization isn't needed, no other threads should be
429
 
    // calling on the callback.
430
 
    //
431
 
 
432
 
    try
433
 
    {
434
 
        handleException(exc); // This will throw if the invocation can't be retried.
435
 
    }
436
 
    catch(const Ice::LocalException& ex)
437
 
    {
438
 
        __exception(ex);
439
 
    }
440
 
}
441
 
 
442
 
void
443
 
IceInternal::OutgoingAsync::__finished(const LocalExceptionWrapper& exc)
444
 
{
445
 
    assert(__os && !_sent);
446
 
    
447
 
    //
448
 
    // NOTE: at this point, synchronization isn't needed, no other threads should be
449
 
    // calling on the callback. The LocalExceptionWrapper exception is only called
450
 
    // before the invocation is sent.
451
 
    //
452
 
 
453
 
    try
454
 
    {
455
 
        handleException(exc); // This will throw if the invocation can't be retried.
456
 
    }
457
 
    catch(const Ice::LocalException& ex)
458
 
    {
459
 
        __exception(ex);
460
 
    }
461
 
}
462
 
 
463
 
void
464
 
IceInternal::OutgoingAsync::__retry(int interval)
465
 
{
466
 
    //
467
 
    // This method is called by the proxy to retry an invocation, no
468
 
    // other threads can access this object.
469
 
    //
470
 
    if(interval > 0)
471
 
    {
472
 
        assert(__os);
473
 
        __os->instance()->retryQueue()->add(this, interval);
474
 
    }
475
 
    else
476
 
    {
477
 
        __send();
478
 
    }
 
721
    __response();
479
722
}
480
723
 
481
724
bool
482
 
IceInternal::OutgoingAsync::__send()
 
725
IceInternal::OutgoingAsync::__send(bool synchronous)
483
726
{
484
 
    try
485
 
    {
486
 
        _sent = false;
487
 
        _response = false;
488
 
        _delegate = _proxy->__getDelegate(true);
489
 
        _sentSynchronously = _delegate->__getRequestHandler()->sendAsyncRequest(this);
490
 
    }
491
 
    catch(const LocalExceptionWrapper& ex)
492
 
    {
493
 
        handleException(ex); // Might call __send() again upon retry and assign _sentSynchronously
494
 
    }
495
 
    catch(const Ice::LocalException& ex)
496
 
    {
497
 
        handleException(ex); // Might call __send() again upon retry and assign _sentSynchronously
 
727
    while(true)
 
728
    {
 
729
        int interval = 0;
 
730
        try
 
731
        {
 
732
            _delegate = _proxy->__getDelegate(true);
 
733
            AsyncStatus status = _delegate->__getRequestHandler()->sendAsyncRequest(this);
 
734
            if(status & AsyncStatusSent)
 
735
            {
 
736
                if(synchronous)
 
737
                {
 
738
                    _sentSynchronously = true;
 
739
                    if(status & AsyncStatusInvokeSentCallback)
 
740
                    {
 
741
                        __sent(); // Call the sent callback from the user thread.
 
742
                    }
 
743
                }
 
744
                else
 
745
                {
 
746
                    if(status & AsyncStatusInvokeSentCallback)
 
747
                    {
 
748
                        __sentAsync(); // Call the sent callback from a client thread pool thread.
 
749
                    }
 
750
                }
 
751
            }
 
752
            break;
 
753
        }
 
754
        catch(const LocalExceptionWrapper& ex)
 
755
        {
 
756
            interval = handleException(ex);
 
757
        }
 
758
        catch(const Ice::LocalException& ex)
 
759
        {
 
760
            interval = handleException(ex, false);
 
761
        }
 
762
 
 
763
        if(interval > 0)
 
764
        {
 
765
            _instance->retryQueue()->add(this, interval);
 
766
            return false;
 
767
        }
498
768
    }
499
769
    return _sentSynchronously;
500
770
}
501
771
 
502
 
void
503
 
IceInternal::OutgoingAsync::__prepare(const ObjectPrx& prx, const string& operation, OperationMode mode, 
504
 
                                      const Context* context)
505
 
{
506
 
    _proxy = prx;
507
 
    _delegate = 0;
508
 
    _cnt = 0;
509
 
    _mode = mode;
510
 
    _sentSynchronously = false;
511
 
 
512
 
    //
513
 
    // Can't call async via a batch proxy.
514
 
    //
515
 
    if(_proxy->ice_isBatchOneway() || _proxy->ice_isBatchDatagram())
516
 
    {
517
 
        throw Ice::FeatureNotSupportedException(__FILE__, __LINE__, "can't send batch requests with AMI");
518
 
    }
519
 
 
520
 
    __os->writeBlob(requestHdr, sizeof(requestHdr));
521
 
 
522
 
    Reference* ref = _proxy->__reference().get();
523
 
 
524
 
    ref->getIdentity().__write(__os);
525
 
 
526
 
    //
527
 
    // For compatibility with the old FacetPath.
528
 
    //
529
 
    if(ref->getFacet().empty())
530
 
    {
531
 
        __os->write(static_cast<string*>(0), static_cast<string*>(0));
532
 
    }
533
 
    else
534
 
    {
535
 
        string facet = ref->getFacet();
536
 
        __os->write(&facet, &facet + 1);
537
 
    }
538
 
 
539
 
    __os->write(operation, false);
540
 
 
541
 
    __os->write(static_cast<Byte>(_mode));
542
 
 
543
 
    if(context != 0)
544
 
    {
545
 
        //
546
 
        // Explicit context
547
 
        //
548
 
        __writeContext(__os, *context);
549
 
    }
550
 
    else
551
 
    {
552
 
        //
553
 
        // Implicit context
554
 
        //
555
 
        const ImplicitContextIPtr& implicitContext = ref->getInstance()->getImplicitContext();
556
 
        const Context& prxContext = ref->getContext()->getValue();
557
 
        if(implicitContext == 0)
558
 
        {
559
 
            __writeContext(__os, prxContext);
560
 
        }
561
 
        else
562
 
        {
563
 
            implicitContext->write(prxContext, __os);
564
 
        }
565
 
    }
566
 
        
567
 
    __os->startWriteEncaps();
568
 
}
569
 
 
570
 
void
571
 
IceInternal::OutgoingAsync::__throwUserException()
572
 
{
573
 
    try
574
 
    {
575
 
        assert(__is);
576
 
        __is->startReadEncaps();
577
 
        __is->throwException();
578
 
    }
579
 
    catch(const Ice::UserException&)
580
 
    {
581
 
        __is->endReadEncaps();
582
 
        throw;
583
 
    }
584
 
}
585
 
 
586
 
void
 
772
int
587
773
IceInternal::OutgoingAsync::handleException(const LocalExceptionWrapper& ex)
588
774
{
589
775
    if(_mode == Nonmutating || _mode == Idempotent)
590
776
    {
591
 
        _proxy->__handleExceptionWrapperRelaxed(_delegate, ex, this, _cnt);
 
777
        return _proxy->__handleExceptionWrapperRelaxed(_delegate, ex, false, _cnt);
592
778
    }
593
779
    else
594
780
    {
595
 
        _proxy->__handleExceptionWrapper(_delegate, ex, this);
 
781
        return _proxy->__handleExceptionWrapper(_delegate, ex);
596
782
    }
597
783
}
598
784
 
599
 
void
600
 
IceInternal::OutgoingAsync::handleException(const Ice::LocalException& exc)
 
785
int
 
786
IceInternal::OutgoingAsync::handleException(const Ice::LocalException& exc, bool sent)
601
787
{
602
788
    try
603
789
    {
611
797
        // "at-most-once" (see the implementation of the checkRetryAfterException method of
612
798
        // the ProxyFactory class for the reasons why it can be useful).
613
799
        // 
614
 
        if(!_sent || 
 
800
        if(!sent || 
615
801
           dynamic_cast<const CloseConnectionException*>(&exc) || 
616
802
           dynamic_cast<const ObjectNotExistException*>(&exc))
617
803
        {
629
815
    {
630
816
        if(_mode == Nonmutating || _mode == Idempotent)
631
817
        {
632
 
            _proxy->__handleExceptionWrapperRelaxed(_delegate, ex, this, _cnt);
 
818
            return _proxy->__handleExceptionWrapperRelaxed(_delegate, ex, false, _cnt);
633
819
        }
634
820
        else
635
821
        {
636
 
            _proxy->__handleExceptionWrapper(_delegate, ex, this);
 
822
            return _proxy->__handleExceptionWrapper(_delegate, ex);
637
823
        }
638
824
    }
639
825
    catch(const Ice::LocalException& ex)
640
826
    {
641
 
        _proxy->__handleException(_delegate, ex, this, _cnt);
 
827
        return _proxy->__handleException(_delegate, ex, false, _cnt);
642
828
    }
 
829
    return 0; // Keep the compiler happy.
643
830
}
644
831
 
645
832
void
647
834
{
648
835
    Ice::ConnectionIPtr connection;
649
836
    {
650
 
        IceUtil::Monitor<IceUtil::Mutex>::Lock sync(__monitor);
651
 
        assert(_timerTaskConnection && _sent); // Can only be set once the request is sent.
652
 
 
653
 
        if(!_response) // If the response was just received, don't close the connection.
654
 
        {
655
 
            connection = _timerTaskConnection;
656
 
        }
 
837
        IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
 
838
        connection = _timerTaskConnection;
657
839
        _timerTaskConnection = 0;
658
 
        __monitor.notifyAll();
659
840
    }
660
841
 
661
842
    if(connection)
664
845
    }
665
846
}
666
847
 
667
 
void
 
848
IceInternal::BatchOutgoingAsync::BatchOutgoingAsync(const InstancePtr& instance, 
 
849
                                                    const std::string& operation,
 
850
                                                    const CallbackBasePtr& delegate,
 
851
                                                    const Ice::LocalObjectPtr& cookie) :
 
852
    AsyncResult(instance, operation, delegate, cookie)
 
853
{
 
854
}
 
855
 
 
856
bool
668
857
IceInternal::BatchOutgoingAsync::__sent(Ice::ConnectionI* connection)
669
858
{
670
 
    __releaseCallback();
671
 
}
672
 
 
673
 
void
674
 
IceInternal::BatchOutgoingAsync::__finished(const Ice::LocalException& exc)
 
859
    IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
 
860
    assert(!_exception.get());
 
861
    _state |= Done | OK | Sent;
 
862
    _monitor.notifyAll();
 
863
    return _callback && _callback->__hasSentCallback();
 
864
}
 
865
 
 
866
void
 
867
IceInternal::BatchOutgoingAsync::__sent()
 
868
{
 
869
#if defined(_MSC_VER) && (_MSC_VER < 1300) // VC++ 6 compiler bug
 
870
    AsyncResult::__sent();
 
871
#else
 
872
    ::Ice::AsyncResult::__sent();
 
873
#endif
 
874
}
 
875
 
 
876
void
 
877
IceInternal::BatchOutgoingAsync::__finished(const Ice::LocalException& exc, bool)
675
878
{
676
879
    __exception(exc);
677
880
}
678
881
 
679
 
bool
680
 
Ice::AMI_Object_ice_invoke::__invoke(const ObjectPrx& prx, const string& operation, OperationMode mode,
681
 
                                     const vector<Byte>& inParams, const Context* context)
682
 
{
683
 
    __acquireCallback(prx);
684
 
    try
685
 
    {
686
 
        __prepare(prx, operation, mode, context);
687
 
        __os->writeBlob(inParams);
688
 
        __os->endWriteEncaps();
689
 
        return __send();
690
 
    }
691
 
    catch(const Ice::LocalException& ex)
692
 
    {
693
 
        __releaseCallback(ex);
694
 
        return false;
695
 
    }
696
 
}
697
 
 
698
 
void
699
 
Ice::AMI_Object_ice_invoke::__response(bool ok) // ok == true means no user exception.
700
 
{
701
 
    vector<Byte> outParams;
702
 
    try
703
 
    {
704
 
        __is->startReadEncaps();
705
 
        Int sz = __is->getReadEncapsSize();
706
 
        __is->readBlob(outParams, sz);
707
 
        __is->endReadEncaps();
708
 
    }
709
 
    catch(const LocalException& ex)
710
 
    {
711
 
        __finished(ex);
712
 
        return;
713
 
    }
714
 
    ice_response(ok, outParams);
715
 
    __releaseCallback();
716
 
}
717
 
 
718
 
bool
719
 
Ice::AMI_Array_Object_ice_invoke::__invoke(const ObjectPrx& prx, const string& operation, OperationMode mode,
720
 
                                           const pair<const Byte*, const Byte*>& inParams, const Context* context)
721
 
{
722
 
    __acquireCallback(prx);
723
 
    try
724
 
    {
725
 
        __prepare(prx, operation, mode, context);
726
 
        __os->writeBlob(inParams.first, static_cast<Int>(inParams.second - inParams.first));
727
 
        __os->endWriteEncaps();
728
 
        return __send();
729
 
    }
730
 
    catch(const Ice::LocalException& ex)
731
 
    {
732
 
        __releaseCallback(ex);
733
 
        return false;
734
 
    }
735
 
}
736
 
 
737
 
void
738
 
Ice::AMI_Array_Object_ice_invoke::__response(bool ok) // ok == true means no user exception.
739
 
{
740
 
    pair<const Byte*, const Byte*> outParams;
741
 
    try
742
 
    {
743
 
        __is->startReadEncaps();
744
 
        Int sz = __is->getReadEncapsSize();
745
 
        __is->readBlob(outParams.first, sz);
746
 
        outParams.second = outParams.first + sz;
747
 
        __is->endReadEncaps();
748
 
    }
749
 
    catch(const LocalException& ex)
750
 
    {
751
 
        __finished(ex);
752
 
        return;
753
 
    }
754
 
    ice_response(ok, outParams);
755
 
    __releaseCallback();
756
 
}
757
 
 
758
 
bool
759
 
Ice::AMI_Object_ice_flushBatchRequests::__invoke(const ObjectPrx& prx)
760
 
{
761
 
    __acquireCallback(prx);
762
 
    try
763
 
    {
764
 
        //
765
 
        // We don't automatically retry if ice_flushBatchRequests fails. Otherwise, if some batch
766
 
        // requests were queued with the connection, they would be lost without being noticed.
767
 
        //
768
 
        Handle< ::IceDelegate::Ice::Object> delegate;
769
 
        int cnt = -1; // Don't retry.
770
 
        try
771
 
        {
772
 
            delegate = prx->__getDelegate(true);
773
 
            return delegate->__getRequestHandler()->flushAsyncBatchRequests(this);
774
 
        }
775
 
        catch(const Ice::LocalException& ex)
776
 
        {
777
 
            prx->__handleException(delegate, ex, 0, cnt);
778
 
        }
779
 
    }
780
 
    catch(const Ice::LocalException& ex)
781
 
    {
782
 
        __releaseCallback(ex);
783
 
    }
784
 
    return false;
 
882
IceInternal::ProxyBatchOutgoingAsync::ProxyBatchOutgoingAsync(const Ice::ObjectPrx& proxy, 
 
883
                                                              const std::string& operation,
 
884
                                                              const CallbackBasePtr& delegate,
 
885
                                                              const Ice::LocalObjectPtr& cookie) :
 
886
    BatchOutgoingAsync(proxy->__reference()->getInstance(), operation, delegate, cookie),
 
887
    _proxy(proxy)
 
888
{
 
889
}
 
890
 
 
891
void
 
892
IceInternal::ProxyBatchOutgoingAsync::__send()
 
893
{
 
894
    //
 
895
    // We don't automatically retry if ice_flushBatchRequests fails. Otherwise, if some batch
 
896
    // requests were queued with the connection, they would be lost without being noticed.
 
897
    //
 
898
    Handle<IceDelegate::Ice::Object> delegate;
 
899
    int cnt = -1; // Don't retry.
 
900
    try
 
901
    {
 
902
        delegate = _proxy->__getDelegate(true);
 
903
        AsyncStatus status = delegate->__getRequestHandler()->flushAsyncBatchRequests(this);
 
904
        if(status & AsyncStatusSent)
 
905
        {
 
906
            _sentSynchronously = true;
 
907
            if(status & AsyncStatusInvokeSentCallback)
 
908
            {
 
909
                __sent();
 
910
            }
 
911
        }
 
912
    }
 
913
    catch(const ::Ice::LocalException& ex)
 
914
    {
 
915
        _proxy->__handleException(delegate, ex, 0, cnt);
 
916
    }
 
917
}
 
918
 
 
919
IceInternal::ConnectionBatchOutgoingAsync::ConnectionBatchOutgoingAsync(const ConnectionIPtr& con,
 
920
                                                                        const InstancePtr& instance,
 
921
                                                                        const string& operation,
 
922
                                                                        const CallbackBasePtr& delegate,
 
923
                                                                        const Ice::LocalObjectPtr& cookie) :
 
924
    BatchOutgoingAsync(instance, operation, delegate, cookie),
 
925
    _connection(con)
 
926
{
 
927
}
 
928
 
 
929
void
 
930
IceInternal::ConnectionBatchOutgoingAsync::__send()
 
931
{
 
932
    AsyncStatus status = _connection->flushAsyncBatchRequests(this);
 
933
    if(status & AsyncStatusSent)
 
934
    {
 
935
        _sentSynchronously = true;
 
936
        if(status & AsyncStatusInvokeSentCallback)
 
937
        {
 
938
            __sent();
 
939
        }
 
940
    }
 
941
}
 
942
 
 
943
Ice::ConnectionPtr
 
944
IceInternal::ConnectionBatchOutgoingAsync::getConnection() const
 
945
{
 
946
    return _connection;
 
947
}
 
948
 
 
949
IceInternal::CommunicatorBatchOutgoingAsync::CommunicatorBatchOutgoingAsync(const CommunicatorPtr& communicator,
 
950
                                                                            const InstancePtr& instance,
 
951
                                                                            const string& operation,
 
952
                                                                            const CallbackBasePtr& delegate,
 
953
                                                                            const Ice::LocalObjectPtr& cookie) :
 
954
    BatchOutgoingAsync(instance, operation, delegate, cookie),
 
955
    _communicator(communicator)
 
956
{
 
957
    //
 
958
    // _useCount is initialized to 1 to prevent premature callbacks.
 
959
    // The caller must invoke ready() after all flush requests have
 
960
    // been initiated.
 
961
    //
 
962
    _useCount = 1;
 
963
 
 
964
    //
 
965
    // Assume all connections are flushed synchronously.
 
966
    //
 
967
    _sentSynchronously = true;
 
968
}
 
969
 
 
970
void
 
971
IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionPtr& con)
 
972
{
 
973
    {
 
974
        IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
 
975
        ++_useCount;
 
976
    }
 
977
    CallbackPtr cb = newCallback(this, &CommunicatorBatchOutgoingAsync::completed,
 
978
                                 &CommunicatorBatchOutgoingAsync::sent);
 
979
    con->begin_flushBatchRequests(cb);
 
980
}
 
981
 
 
982
void
 
983
IceInternal::CommunicatorBatchOutgoingAsync::ready()
 
984
{
 
985
    check(0, 0, true);
 
986
}
 
987
 
 
988
void
 
989
IceInternal::CommunicatorBatchOutgoingAsync::completed(const AsyncResultPtr& r)
 
990
{
 
991
    ConnectionPtr con = r->getConnection();
 
992
    assert(con);
 
993
 
 
994
    try
 
995
    {
 
996
        con->end_flushBatchRequests(r);
 
997
        assert(false); // completed() should only be called when an exception occurs.
 
998
    }
 
999
    catch(const Ice::LocalException& ex)
 
1000
    {
 
1001
        check(r, &ex, false);
 
1002
    }
 
1003
}
 
1004
 
 
1005
void
 
1006
IceInternal::CommunicatorBatchOutgoingAsync::sent(const AsyncResultPtr& r)
 
1007
{
 
1008
    check(r, 0, r->sentSynchronously());
 
1009
}
 
1010
 
 
1011
void
 
1012
IceInternal::CommunicatorBatchOutgoingAsync::check(const AsyncResultPtr& r, const LocalException* ex, bool userThread)
 
1013
{
 
1014
    bool done = false;
 
1015
 
 
1016
    {
 
1017
        IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
 
1018
        assert(_useCount > 0);
 
1019
        --_useCount;
 
1020
 
 
1021
        //
 
1022
        // We report that the communicator flush request was sent synchronously
 
1023
        // if all of the connection flush requests are sent synchronously.
 
1024
        //
 
1025
        if((r && !r->sentSynchronously()) || ex)
 
1026
        {
 
1027
            _sentSynchronously = false;
 
1028
        }
 
1029
 
 
1030
        if(_useCount == 0)
 
1031
        {
 
1032
            done = true;
 
1033
            _state |= Done | OK | Sent;
 
1034
            _monitor.notifyAll();
 
1035
        }
 
1036
    }
 
1037
 
 
1038
    if(done)
 
1039
    {
 
1040
        //
 
1041
        // _sentSynchronously is immutable here.
 
1042
        //
 
1043
        if(!_sentSynchronously && userThread)
 
1044
        {
 
1045
            __sentAsync();
 
1046
        }
 
1047
        else
 
1048
        {
 
1049
            assert(_sentSynchronously == userThread); // sentSynchronously && !userThread is impossible.
 
1050
            BatchOutgoingAsync::__sent();
 
1051
        }
 
1052
    }
 
1053
}
 
1054
 
 
1055
namespace
 
1056
{
 
1057
 
 
1058
//
 
1059
// Dummy class derived from CallbackBase
 
1060
// We use this class for the __dummyCallback extern pointer in OutgoingAsync. In turn,
 
1061
// this allows us to test whether the user supplied a null delegate instance to the
 
1062
// generated begin_ method without having to generate a separate test to throw IllegalArgumentException
 
1063
// in the inlined versions of the begin_ method. In other words, this reduces the amount of generated
 
1064
// object code.
 
1065
//
 
1066
class DummyCallback : public CallbackBase
 
1067
{
 
1068
public:
 
1069
 
 
1070
    DummyCallback()
 
1071
    {
 
1072
    }
 
1073
 
 
1074
    virtual void __completed(const Ice::AsyncResultPtr&) const
 
1075
    {
 
1076
         assert(false);
 
1077
    }
 
1078
 
 
1079
    virtual CallbackBasePtr __verify(Ice::LocalObjectPtr&)
 
1080
    {
 
1081
        //
 
1082
        // Called by the AsyncResult constructor to verify the delegate. The dummy
 
1083
        // delegate is passed when the user used a begin_ method without delegate. 
 
1084
        // By returning 0 here, we tell the AsyncResult that no delegates was 
 
1085
        // provided.
 
1086
        //
 
1087
        return 0;
 
1088
    }
 
1089
 
 
1090
    virtual void __sent(const AsyncResultPtr&) const
 
1091
    {
 
1092
         assert(false);
 
1093
    }
 
1094
 
 
1095
    virtual bool __hasSentCallback() const
 
1096
    {
 
1097
        assert(false);
 
1098
        return false;
 
1099
    }
 
1100
};
 
1101
 
 
1102
}
 
1103
 
 
1104
//
 
1105
// This gives a pointer value to compare against in the generated
 
1106
// begin_ method to decide whether the caller passed a null pointer
 
1107
// versus the generated inline version of the begin_ method having
 
1108
// passed a pointer to the dummy delegate.
 
1109
//
 
1110
CallbackBasePtr IceInternal::__dummyCallback = new DummyCallback;
 
1111
 
 
1112
void
 
1113
Ice::AMICallbackBase::__exception(const ::Ice::Exception& ex)
 
1114
{
 
1115
    ice_exception(ex);
 
1116
}
 
1117
 
 
1118
void
 
1119
Ice::AMICallbackBase::__sent(bool sentSynchronously)
 
1120
{
 
1121
    if(!sentSynchronously)
 
1122
    {
 
1123
        dynamic_cast<AMISentCallback*>(this)->ice_sent();
 
1124
    }
785
1125
}