~ubuntu-branches/ubuntu/quantal/zeroc-ice/quantal

« back to all changes in this revision

Viewing changes to src/IceGrid/Topics.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Francisco Moya
  • Date: 2006-08-06 19:00:57 UTC
  • mfrom: (1.1.1 upstream)
  • Revision ID: james.westby@ubuntu.com-20060806190057-3q4p9ws4ucyamn10
Tags: 3.1.0-2
* Patches #5 to #6 from ZeroC forums.
* Patch by Michael Pugach for DescriptorHelper.cpp.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
// **********************************************************************
2
2
//
3
 
// Copyright (c) 2003-2005 ZeroC, Inc. All rights reserved.
 
3
// Copyright (c) 2003-2006 ZeroC, Inc. All rights reserved.
4
4
//
5
5
// This copy of Ice is licensed to you under the terms described in the
6
6
// ICE_LICENSE file included in this distribution.
34
34
    void
35
35
    ice_exception(const Ice::Exception& ex)
36
36
    {
37
 
        Ice::Warning out(_observer->ice_communicator()->getLogger());
 
37
        Ice::Warning out(_observer->ice_getCommunicator()->getLogger());
38
38
        out << "couldn't initialize registry observer:\n" << ex;    
39
39
    }
40
40
 
65
65
    void
66
66
    ice_exception(const Ice::Exception& ex)
67
67
    {
68
 
        Ice::Warning out(_observer->ice_communicator()->getLogger());
 
68
        Ice::Warning out(_observer->ice_getCommunicator()->getLogger());
69
69
        out << "couldn't initialize node observer:\n" << ex;    
70
70
    }
71
71
    
77
77
};
78
78
 
79
79
 
80
 
NodeObserverTopic::NodeObserverTopic(const IceStorm::TopicPrx& topic, const NodeObserverPrx& publisher) : 
81
 
    _topic(topic), _publisher(publisher), _serial(0)
 
80
NodeObserverTopic::NodeObserverTopic(const IceStorm::TopicManagerPrx& topicManager) : _serial(0)
82
81
{
 
82
    IceStorm::TopicPrx t;
 
83
    try
 
84
    {
 
85
        t = topicManager->create("NodeObserver");
 
86
    }
 
87
    catch(const IceStorm::TopicExists&)
 
88
    {
 
89
        t = topicManager->retrieve("NodeObserver");
 
90
    }
 
91
 
 
92
    //
 
93
    // NOTE: collocation optimization needs to be turned on for the
 
94
    // topic because the subscribe() method is given a fixed proxy
 
95
    // which can't be marshalled.
 
96
    //
 
97
    const_cast<IceStorm::TopicPrx&>(_topic) = IceStorm::TopicPrx::uncheckedCast(t->ice_collocationOptimized(true));
 
98
    const_cast<NodeObserverPrx&>(_publisher) = NodeObserverPrx::uncheckedCast(_topic->getPublisher());
83
99
}
84
100
 
85
101
void
107
123
    }
108
124
}
109
125
 
110
 
 
111
126
void 
112
127
NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& server, const Ice::Current&)
113
128
{
144
159
    {
145
160
        servers.push_back(server);
146
161
    }
147
 
    
 
162
 
148
163
    _publisher->updateServer(node, server);
149
164
}
150
165
 
229
244
    _topic->unsubscribe(observer);
230
245
}
231
246
 
232
 
RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicPrx& topic, 
233
 
                                             const RegistryObserverPrx& publisher,
234
 
                                             NodeObserverTopic& nodeObserver) :
235
 
    _topic(topic), _publisher(publisher), _nodeObserver(nodeObserver), _serial(0)
 
247
RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicManagerPrx& topicManager) : _serial(0)
236
248
{
 
249
    IceStorm::TopicPrx t;
 
250
    try
 
251
    {
 
252
        t = topicManager->create("RegistryObserver");
 
253
    }
 
254
    catch(const IceStorm::TopicExists&)
 
255
    {
 
256
        t = topicManager->retrieve("RegistryObserver");
 
257
    }
 
258
 
 
259
    //
 
260
    // NOTE: collocation optimization needs to be turned on for the
 
261
    // topic because the subscribe() method is given a fixed proxy
 
262
    // which can't be marshalled.
 
263
    //
 
264
    const_cast<IceStorm::TopicPrx&>(_topic) = IceStorm::TopicPrx::uncheckedCast(t->ice_collocationOptimized(true));
 
265
    const_cast<RegistryObserverPrx&>(_publisher) = RegistryObserverPrx::uncheckedCast(_topic->getPublisher());
237
266
}
238
267
 
239
268
void 
240
 
RegistryObserverTopic::init(int serial, const ApplicationDescriptorSeq& apps, const Ice::Current&)
 
269
RegistryObserverTopic::init(int serial, 
 
270
                            const ApplicationDescriptorSeq& apps, 
 
271
                            const AdapterInfoSeq& adpts,
 
272
                            const ObjectInfoSeq& objects,
 
273
                            const Ice::Current&)
241
274
{
242
275
    Lock sync(*this);
243
276
 
244
277
    _serial = serial;
245
 
    _applications = apps;
246
 
 
247
 
    _publisher->init(serial, apps);
 
278
 
 
279
    for(ApplicationDescriptorSeq::const_iterator p = apps.begin(); p != apps.end(); ++p)
 
280
    {
 
281
        _applications.insert(make_pair(p->name, *p));
 
282
    }
 
283
    for(AdapterInfoSeq::const_iterator q = adpts.begin(); q != adpts.end(); ++q)
 
284
    {
 
285
        _adapters.insert(make_pair(q->id, *q));
 
286
    }
 
287
    for(ObjectInfoSeq::const_iterator r = objects.begin(); r != objects.end(); ++r)
 
288
    {
 
289
        _objects.insert(make_pair(r->proxy->ice_getIdentity(), *r));
 
290
    }
 
291
 
 
292
    _publisher->init(serial, apps, adpts, objects);
248
293
}
249
294
 
250
295
void 
254
299
 
255
300
    updateSerial(serial);
256
301
 
257
 
    _applications.push_back(desc);
 
302
    _applications.insert(make_pair(desc.name, desc));
258
303
 
259
304
    _publisher->applicationAdded(serial, desc);
260
305
}
265
310
    Lock sync(*this);
266
311
 
267
312
    updateSerial(serial);
268
 
    for(ApplicationDescriptorSeq::iterator p = _applications.begin(); p != _applications.end(); ++p)
269
 
    {
270
 
        if(p->name == name)
271
 
        {
272
 
            _applications.erase(p);
273
 
            break;
274
 
        }
275
 
    }
 
313
 
 
314
    _applications.erase(name);
276
315
 
277
316
    _publisher->applicationRemoved(serial, name);
278
317
}
285
324
    updateSerial(serial);
286
325
    try
287
326
    {
288
 
        for(ApplicationDescriptorSeq::iterator p = _applications.begin(); p != _applications.end(); ++p)
 
327
        map<string, ApplicationDescriptor>::iterator p = _applications.find(desc.name);
 
328
        if(p != _applications.end())
289
329
        {
290
 
            if(p->name == desc.name)
291
 
            {
292
 
                ApplicationHelper helper(*p);
293
 
                helper.update(desc);
294
 
                *p = helper.getDescriptor();
295
 
                break;
296
 
            }
 
330
            ApplicationHelper helper(c.adapter->getCommunicator(), p->second);
 
331
            p->second = helper.update(desc);
297
332
        }
298
333
    }
299
334
    catch(const DeploymentException& ex)
320
355
}
321
356
 
322
357
void 
 
358
RegistryObserverTopic::adapterAdded(int serial, const AdapterInfo& info, const Ice::Current&)
 
359
{
 
360
    Lock sync(*this);
 
361
 
 
362
    updateSerial(serial);
 
363
 
 
364
    _adapters.insert(make_pair(info.id, info));
 
365
 
 
366
    _publisher->adapterAdded(serial, info);
 
367
}
 
368
 
 
369
void 
 
370
RegistryObserverTopic::adapterUpdated(int serial, const AdapterInfo& info, const Ice::Current&)
 
371
{
 
372
    Lock sync(*this);
 
373
 
 
374
    updateSerial(serial);
 
375
 
 
376
    _adapters[info.id] = info;
 
377
 
 
378
    _publisher->adapterUpdated(serial, info);
 
379
}
 
380
 
 
381
void
 
382
RegistryObserverTopic::adapterRemoved(int serial, const string& id, const Ice::Current&)
 
383
{
 
384
    Lock sync(*this);
 
385
 
 
386
    updateSerial(serial);
 
387
 
 
388
    _adapters.erase(id);
 
389
 
 
390
    _publisher->adapterRemoved(serial, id);
 
391
}
 
392
 
 
393
void 
 
394
RegistryObserverTopic::objectAdded(int serial, const ObjectInfo& info, const Ice::Current&)
 
395
{
 
396
    Lock sync(*this);
 
397
 
 
398
    updateSerial(serial);
 
399
 
 
400
    _objects.insert(make_pair(info.proxy->ice_getIdentity(), info));
 
401
 
 
402
    _publisher->objectAdded(serial, info);
 
403
}
 
404
 
 
405
void 
 
406
RegistryObserverTopic::objectUpdated(int serial, const ObjectInfo& info, const Ice::Current&)
 
407
{
 
408
    Lock sync(*this);
 
409
 
 
410
    updateSerial(serial);
 
411
 
 
412
    _objects[info.proxy->ice_getIdentity()] = info;
 
413
 
 
414
    _publisher->objectUpdated(serial, info);
 
415
}
 
416
 
 
417
void
 
418
RegistryObserverTopic::objectRemoved(int serial, const Ice::Identity& id, const Ice::Current&)
 
419
{
 
420
    Lock sync(*this);
 
421
 
 
422
    updateSerial(serial);
 
423
 
 
424
    _objects.erase(id);
 
425
 
 
426
    _publisher->objectRemoved(serial, id);
 
427
}
 
428
 
 
429
void 
323
430
RegistryObserverTopic::subscribe(const RegistryObserverPrx& observer, int serial)
324
431
{
325
432
    while(true)
327
434
        if(serial == -1)
328
435
        {
329
436
            ApplicationDescriptorSeq applications;
 
437
            AdapterInfoSeq adapters;
 
438
            ObjectInfoSeq objects;
330
439
            {
331
440
                Lock sync(*this);
332
441
                assert(_serial != -1);
333
442
                serial = _serial;
334
 
                applications = _applications;
 
443
 
 
444
                map<string, ApplicationDescriptor>::const_iterator p;
 
445
                for(p = _applications.begin(); p != _applications.end(); ++p)
 
446
                {
 
447
                    applications.push_back(p->second);
 
448
                }
 
449
                
 
450
                map<string, AdapterInfo>::const_iterator q;
 
451
                for(q = _adapters.begin(); q != _adapters.end(); ++q)
 
452
                {
 
453
                    adapters.push_back(q->second);
 
454
                }
 
455
 
 
456
                map<Ice::Identity, ObjectInfo>::const_iterator r;
 
457
                for(r = _objects.begin(); r != _objects.end(); ++r)
 
458
                {
 
459
                    objects.push_back(r->second);
 
460
                }
335
461
            }
336
 
            observer->init_async(new RegistryInitCB(this, observer, serial), serial, applications);
 
462
            observer->init_async(new RegistryInitCB(this, observer, serial), serial, applications, adapters, objects);
337
463
            return;
338
464
        }
339
465