~ubuntu-branches/ubuntu/quantal/zeroc-ice/quantal

« back to all changes in this revision

Viewing changes to cpp/src/IceStorm/TopicI.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Cleto Martin Angelina
  • Date: 2011-04-25 18:44:24 UTC
  • mfrom: (6.1.14 sid)
  • Revision ID: james.westby@ubuntu.com-20110425184424-sep9i9euu434vq4c
Tags: 3.4.1-7
* Bug fix: "libdb5.1-java.jar was renamed to db.jar", thanks to Ondřej
  Surý (Closes: #623555).
* Bug fix: "causes noise in php5", thanks to Jayen Ashar (Closes:
  #623533).

Show diffs side-by-side

added

removed

Lines of Context:
1
1
// **********************************************************************
2
2
//
3
 
// Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved.
 
3
// Copyright (c) 2003-2010 ZeroC, Inc. All rights reserved.
4
4
//
5
5
// This copy of Ice is licensed to you under the terms described in the
6
6
// ICE_LICENSE file included in this distribution.
14
14
#include <IceStorm/TraceLevels.h>
15
15
#include <IceStorm/NodeI.h>
16
16
#include <IceStorm/Observers.h>
17
 
 
 
17
#include <IceStorm/DB.h>
18
18
#include <Ice/LoggerUtil.h>
19
 
 
20
 
#include <Freeze/Freeze.h>
21
 
 
22
19
#include <algorithm>
23
20
 
24
21
using namespace std;
25
22
using namespace IceStorm;
26
23
using namespace IceStormElection;
27
24
 
 
25
using namespace IceDB;
 
26
 
28
27
namespace
29
28
{
30
29
 
31
30
void
32
 
halt(const Ice::CommunicatorPtr& com, const Freeze::DatabaseException& ex)
 
31
halt(const Ice::CommunicatorPtr& com, const DatabaseException& ex)
33
32
{
34
33
    {
35
34
        Ice::Error error(com->getLogger());
404
403
    _instance(instance),
405
404
    _name(name),
406
405
    _id(id),
407
 
    _connection(Freeze::createConnection(instance->communicator(), instance->serviceName())),
408
 
    _subscriberMap(_connection, "subscribers"),
409
 
    _llumap(_connection, "llu"),
 
406
    _databaseCache(instance->databaseCache()),
410
407
    _destroyed(false)
411
408
{
412
409
    try
413
410
    {
414
411
        __setNoDelete(true);
 
412
        
415
413
        // TODO: If we want to improve the performance of the
416
414
        // non-replicated case we could allocate a null-topic impl here.
417
415
        _servant = new TopicI(this, instance);
529
527
#if defined(_MSC_VER) && (_MSC_VER < 1300)
530
528
namespace
531
529
{
532
 
static vector<SubscriberPtr>::iterator
 
530
vector<SubscriberPtr>::iterator
533
531
find(vector<SubscriberPtr>::iterator start, vector<SubscriberPtr>::iterator end, const Ice::Identity& ident)
534
532
{
535
533
    while(start != end)
547
545
 
548
546
namespace
549
547
{
550
 
static void
 
548
void
551
549
trace(Ice::Trace& out, const InstancePtr& instance, const vector<SubscriberPtr>& s)
552
550
{
553
551
    out << '[';
656
654
        {
657
655
            try
658
656
            {
659
 
                Freeze::TransactionHolder txn(_connection);
 
657
                DatabaseConnectionPtr connection = _databaseCache->getConnection();
 
658
                TransactionHolder txn(connection);
 
659
 
660
660
                SubscriberRecordKey key;
661
661
                key.topic = _id;
662
662
                key.id =  record.id;
663
 
                SubscriberMap::iterator e = _subscriberMap.find(key);
664
 
                if(e != _subscriberMap.end())
665
 
                {
666
 
                    _subscriberMap.erase(e);
667
 
                }
668
 
                LLUMap::iterator ci = _llumap.find("_manager");
669
 
                assert(ci != _llumap.end());
670
 
                llu = ci->second;
 
663
 
 
664
                SubscribersWrapperPtr subscribersWrapper = _databaseCache->getSubscribers(connection);
 
665
                subscribersWrapper->erase(key);
 
666
 
 
667
                LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection);
 
668
                llu = lluWrapper->get();
671
669
                llu.iteration++;
672
 
                ci.set(llu);
 
670
                lluWrapper->put(llu);
 
671
 
673
672
                txn.commit();
674
673
                break;
675
674
            }
676
 
            catch(const Freeze::DeadlockException&)
 
675
            catch(const DeadlockException&)
677
676
            {
678
677
                continue;
679
678
            }
680
 
            catch(const Freeze::DatabaseException& ex)
 
679
            catch(const DatabaseException& ex)
681
680
            {
682
681
                halt(_instance->communicator(), ex);
683
682
            }   
692
691
    {
693
692
        try
694
693
        {
695
 
            Freeze::TransactionHolder txn(_connection);
 
694
            DatabaseConnectionPtr connection = _databaseCache->getConnection();
 
695
            TransactionHolder txn(connection);
 
696
 
696
697
            SubscriberRecordKey key;
697
698
            key.topic = _id;
698
699
            key.id = subscriber->id();
699
 
            _subscriberMap.put(SubscriberMap::value_type(key, record));
 
700
 
 
701
            SubscribersWrapperPtr subscribersWrapper = _databaseCache->getSubscribers(connection);
 
702
            subscribersWrapper->put(key, record);
 
703
 
700
704
            // Update the LLU.
701
 
            LLUMap::iterator ci = _llumap.find("_manager");
702
 
            assert(ci != _llumap.end());
703
 
            llu = ci->second;
 
705
            LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection);
 
706
            llu = lluWrapper->get();
704
707
            llu.iteration++;
705
 
            ci.set(llu);
 
708
            lluWrapper->put(llu);
 
709
 
706
710
            txn.commit();
707
711
            break;
708
712
        }
709
 
        catch(const Freeze::DeadlockException&)
 
713
        catch(const DeadlockException&)
710
714
        {
711
715
            continue;
712
716
        }
713
 
        catch(const Freeze::DatabaseException& ex)
 
717
        catch(const DatabaseException& ex)
714
718
        {
715
719
            halt(_instance->communicator(), ex);
716
720
        }       
770
774
    {
771
775
        try
772
776
        {
773
 
            Freeze::TransactionHolder txn(_connection);
 
777
            DatabaseConnectionPtr connection = _databaseCache->getConnection();
 
778
            TransactionHolder txn(connection);
774
779
 
775
780
            SubscriberRecordKey key;
776
781
            key.topic = _id;
777
782
            key.id = subscriber->id();
778
 
            _subscriberMap.put(SubscriberMap::value_type(key, record));
779
 
 
780
 
            LLUMap::iterator ci = _llumap.find("_manager");
781
 
            assert(ci != _llumap.end());
782
 
            llu = ci->second;
 
783
 
 
784
            SubscribersWrapperPtr subscribersWrapper = _databaseCache->getSubscribers(connection);
 
785
            subscribersWrapper->put(key, record);
 
786
 
 
787
            LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection);
 
788
            llu = lluWrapper->get();
783
789
            llu.iteration++;
784
 
            ci.set(llu);
 
790
            lluWrapper->put(llu);
 
791
 
785
792
            txn.commit();
786
793
            break;
787
794
        }
788
 
        catch(const Freeze::DeadlockException&)
 
795
        catch(const DeadlockException&)
789
796
        {
790
797
            continue;
791
798
        }
792
 
        catch(const Freeze::DatabaseException& ex)
 
799
        catch(const DatabaseException& ex)
793
800
        {
794
801
            halt(_instance->communicator(), ex);
795
802
        }       
889
896
    {
890
897
        try
891
898
        {
892
 
            Freeze::TransactionHolder txn(_connection);
 
899
            DatabaseConnectionPtr connection = _databaseCache->getConnection();
 
900
            TransactionHolder txn(connection);
893
901
 
894
902
            SubscriberRecordKey key;
895
903
            key.topic = _id;
896
904
            key.id = id;
897
 
            _subscriberMap.put(SubscriberMap::value_type(key, record));
898
 
 
899
 
            LLUMap::iterator ci = _llumap.find("_manager");
900
 
            assert(ci != _llumap.end());
901
 
            llu = ci->second;
 
905
 
 
906
            SubscribersWrapperPtr subscribersWrapper = _databaseCache->getSubscribers(connection);
 
907
            subscribersWrapper->put(key, record);
 
908
 
 
909
            LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection);
 
910
            llu = lluWrapper->get();
902
911
            llu.iteration++;
903
 
            ci.set(llu);
 
912
            lluWrapper->put(llu);
 
913
 
904
914
            txn.commit();
905
915
            break;
906
916
        }
907
 
        catch(const Freeze::DeadlockException&)
 
917
        catch(const DeadlockException&)
908
918
        {
909
919
            continue;
910
920
        }
911
 
        catch(const Freeze::DatabaseException& ex)
 
921
        catch(const DatabaseException& ex)
912
922
        {
913
923
            halt(_instance->communicator(), ex);
914
924
        }       
1283
1293
    {
1284
1294
        try
1285
1295
        {
1286
 
            Freeze::TransactionHolder txn(_connection);
 
1296
            DatabaseConnectionPtr connection = _databaseCache->getConnection();
 
1297
            TransactionHolder txn(connection);
1287
1298
 
1288
1299
            SubscriberRecordKey key;
1289
1300
            key.topic = _id;
1290
1301
            key.id = subscriber->id();
1291
 
            _subscriberMap.put(SubscriberMap::value_type(key, record));
 
1302
 
 
1303
            SubscribersWrapperPtr subscribersWrapper = _databaseCache->getSubscribers(connection);
 
1304
            subscribersWrapper->put(key, record);
1292
1305
 
1293
1306
            // Update the LLU.
1294
 
            LLUMap::iterator ci = _llumap.find("_manager");
1295
 
            assert(ci != _llumap.end());
1296
 
            ci.set(llu);
 
1307
            LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection);
 
1308
            lluWrapper->put(llu);
 
1309
 
1297
1310
            txn.commit();
1298
1311
            break;
1299
1312
        }
1300
 
        catch(const Freeze::DeadlockException&)
 
1313
        catch(const DeadlockException&)
1301
1314
        {
1302
1315
            continue;
1303
1316
        }
1304
 
        catch(const Freeze::DatabaseException& ex)
 
1317
        catch(const DatabaseException& ex)
1305
1318
        {
1306
1319
            halt(_instance->communicator(), ex);
1307
1320
        }       
1349
1362
    {
1350
1363
        try
1351
1364
        {
1352
 
            Freeze::TransactionHolder txn(_connection);
 
1365
            DatabaseConnectionPtr connection = _databaseCache->getConnection();
 
1366
            TransactionHolder txn(connection);
 
1367
 
1353
1368
            for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
1354
1369
            {
1355
1370
                SubscriberRecordKey key;
1356
1371
                key.topic = _id;
1357
1372
                key.id = *id;
1358
 
                SubscriberMap::iterator e = _subscriberMap.find(key);
1359
 
                if(e != _subscriberMap.end())
1360
 
                {
1361
 
                    _subscriberMap.erase(e);
1362
 
                }
 
1373
 
 
1374
                SubscribersWrapperPtr subscribersWrapper = _databaseCache->getSubscribers(connection);
 
1375
                subscribersWrapper->erase(key);
1363
1376
            }
1364
 
            LLUMap::iterator ci = _llumap.find("_manager");
1365
 
            assert(ci != _llumap.end());
1366
 
            ci.set(llu);
 
1377
 
 
1378
            LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection);
 
1379
            lluWrapper->put(llu);
 
1380
 
1367
1381
            txn.commit();
1368
1382
            break;
1369
1383
        }
1370
 
        catch(const Freeze::DeadlockException&)
 
1384
        catch(const DeadlockException&)
1371
1385
        {
1372
1386
            continue;
1373
1387
        }
1374
 
        catch(const Freeze::DatabaseException& ex)
 
1388
        catch(const DatabaseException& ex)
1375
1389
        {
1376
1390
            halt(_instance->communicator(), ex);
1377
1391
        }       
1424
1438
    {
1425
1439
        try
1426
1440
        {
1427
 
            SubscriberRecordKey key;
1428
 
            key.topic = _id;
 
1441
            DatabaseConnectionPtr connection = _databaseCache->getConnection();
 
1442
            TransactionHolder txn(connection);
1429
1443
 
1430
 
            Freeze::TransactionHolder txn(_connection);
1431
1444
            // Erase all subscriber records and the topic record.
1432
 
            SubscriberMap::iterator p = _subscriberMap.find(key);
1433
 
            while(p != _subscriberMap.end() && p->first.topic == key.topic)
1434
 
            {
1435
 
                _subscriberMap.erase(p++);
1436
 
            }
 
1445
            SubscribersWrapperPtr subscribersWrapper = _databaseCache->getSubscribers(connection);
 
1446
            subscribersWrapper->eraseTopic(_id);
1437
1447
 
1438
1448
            // Update the LLU.
1439
 
            LLUMap::iterator ci = _llumap.find("_manager");
1440
 
            assert(ci != _llumap.end());
 
1449
            LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection);
1441
1450
            if(master)
1442
1451
            {
1443
 
                llu = ci->second;
 
1452
                llu = lluWrapper->get();
1444
1453
                llu.iteration++;
1445
1454
            }
1446
1455
            else
1447
1456
            {
1448
1457
                llu = origLLU;
1449
1458
            }
1450
 
            ci.set(llu);
 
1459
            lluWrapper->put(llu);
 
1460
 
1451
1461
            txn.commit();
1452
1462
            break;
1453
1463
        }
1454
 
        catch(const Freeze::DeadlockException&)
 
1464
        catch(const DeadlockException&)
1455
1465
        {
1456
1466
            continue;
1457
1467
        }
1458
 
        catch(const Freeze::DatabaseException& ex)
 
1468
        catch(const DatabaseException& ex)
1459
1469
        {
1460
1470
            halt(_instance->communicator(), ex);
1461
1471
        }       
1503
1513
    {
1504
1514
        try
1505
1515
        {
1506
 
            Freeze::TransactionHolder txn(_connection);
 
1516
            DatabaseConnectionPtr connection = _databaseCache->getConnection();
 
1517
            TransactionHolder txn(connection);
1507
1518
 
1508
1519
            for(Ice::IdentitySeq::const_iterator id = ids.begin(); id != ids.end(); ++id)
1509
1520
            {
1510
1521
                SubscriberRecordKey key;
1511
1522
                key.topic = _id;
1512
1523
                key.id = *id;
1513
 
                SubscriberMap::iterator e = _subscriberMap.find(key);
1514
 
                if(e != _subscriberMap.end())
1515
 
                {
1516
 
                    _subscriberMap.erase(e);
1517
 
                }
 
1524
                
 
1525
                SubscribersWrapperPtr subscribersWrapper = _databaseCache->getSubscribers(connection);
 
1526
                subscribersWrapper->erase(key);
1518
1527
            }
1519
1528
 
1520
 
            LLUMap::iterator ci = _llumap.find("_manager");
1521
 
            assert(ci != _llumap.end());
1522
 
            llu = ci->second;
 
1529
            LLUWrapperPtr lluWrapper = _databaseCache->getLLU(connection);
 
1530
            llu = lluWrapper->get();
1523
1531
            llu.iteration++;
1524
 
            ci.set(llu);
 
1532
            lluWrapper->put(llu);
 
1533
 
1525
1534
            txn.commit();
1526
1535
            break;
1527
1536
        }
1528
 
        catch(const Freeze::DeadlockException&)
 
1537
        catch(const DeadlockException&)
1529
1538
        {
1530
1539
            continue;
1531
1540
        }
1532
 
        catch(const Freeze::DatabaseException& ex)
 
1541
        catch(const DatabaseException& ex)
1533
1542
        {
1534
1543
            halt(_instance->communicator(), ex);
1535
1544
        }