~ubuntu-branches/ubuntu/quantal/zeroc-ice/quantal

« back to all changes in this revision

Viewing changes to .pc/ice_for_gcc4.7.patch/cpp/src/IceStorm/NodeI.cpp

  • Committer: Package Import Robot
  • Author(s): Michael Ziegler
  • Date: 2012-07-07 15:24:30 UTC
  • mfrom: (6.1.25 sid)
  • Revision ID: package-import@ubuntu.com-20120707152430-kktx4mfmywkz382j
Tags: 3.4.2-8.1
* Non-maintainer upload.
* Revert the patch 'fixing' the FTBFS with gcc-4.7 that breaks ABI, and
  add force_gcc_4.6.patch.  We need to do this all in the upstream makefile
  because it has a silly check for the intel icpc compiler that we need to
  patch around or the build will fail with anything but c++ as the compiler,
  and we can't just override CXX in /rules because the wonderful 3.0 (quilt)
  system will revert that patch before the clean target is run ensuring that
  hilarious fail is guaranteed.
  Closes: #672066

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
// **********************************************************************
2
 
//
3
 
// Copyright (c) 2003-2011 ZeroC, Inc. All rights reserved.
4
 
//
5
 
// This copy of Ice is licensed to you under the terms described in the
6
 
// ICE_LICENSE file included in this distribution.
7
 
//
8
 
// **********************************************************************
9
 
 
10
 
#include <IceStorm/NodeI.h>
11
 
#include <IceStorm/Observers.h>
12
 
#include <IceStorm/TraceLevels.h>
13
 
 
14
 
using namespace IceStorm;
15
 
using namespace IceStormElection;
16
 
using namespace std;
17
 
 
18
 
namespace
19
 
{
20
 
 
21
 
bool operator==(const GroupNodeInfo& info, int id)
22
 
{
23
 
    return info.id == id;
24
 
}
25
 
 
26
 
class CheckTask : public IceUtil::TimerTask
27
 
{
28
 
    const NodeIPtr _node;
29
 
 
30
 
public:
31
 
 
32
 
    CheckTask(const NodeIPtr& node) : _node(node) { }
33
 
    virtual void runTimerTask()
34
 
    {
35
 
        _node->check();
36
 
    }
37
 
};
38
 
 
39
 
class MergeTask : public IceUtil::TimerTask
40
 
{
41
 
    const NodeIPtr _node;
42
 
    const set<int> _s;
43
 
 
44
 
public:
45
 
 
46
 
    MergeTask(const NodeIPtr& node, const set<int>& s) : _node(node), _s(s) { }
47
 
    virtual void runTimerTask()
48
 
    {
49
 
        _node->merge(_s);
50
 
    }
51
 
};
52
 
 
53
 
class MergeContinueTask : public IceUtil::TimerTask
54
 
{
55
 
    const NodeIPtr _node;
56
 
 
57
 
public:
58
 
 
59
 
    MergeContinueTask(const NodeIPtr& node) : _node(node) { }
60
 
    virtual void runTimerTask()
61
 
    {
62
 
        _node->mergeContinue();
63
 
    }
64
 
};
65
 
 
66
 
class TimeoutTask: public IceUtil::TimerTask
67
 
{
68
 
    const NodeIPtr _node;
69
 
 
70
 
public:
71
 
 
72
 
    TimeoutTask(const NodeIPtr& node) : _node(node) { }
73
 
    virtual void runTimerTask()
74
 
    {
75
 
        _node->timeout();
76
 
    }
77
 
};
78
 
 
79
 
}
80
 
 
81
 
namespace
82
 
{
83
 
 
84
 
LogUpdate emptyLU = {0, 0};
85
 
 
86
 
}
87
 
 
88
 
GroupNodeInfo::GroupNodeInfo(int i) :
89
 
    id(i), llu(emptyLU)
90
 
{
91
 
}
92
 
 
93
 
GroupNodeInfo::GroupNodeInfo(int i, LogUpdate l, const Ice::ObjectPrx& o) :
94
 
    id(i), llu(l), observer(o)
95
 
{
96
 
}
97
 
 
98
 
bool
99
 
GroupNodeInfo::operator<(const GroupNodeInfo& rhs) const
100
 
{
101
 
    return id < rhs.id;
102
 
}
103
 
 
104
 
bool
105
 
GroupNodeInfo::operator==(const GroupNodeInfo& rhs) const
106
 
{
107
 
    return id == rhs.id;
108
 
}
109
 
 
110
 
Replica::~Replica()
111
 
{
112
 
    //cout << "~Replica" << endl;
113
 
}
114
 
 
115
 
namespace
116
 
{
117
 
static IceUtil::Time
118
 
getTimeout(const string& key, int def, const Ice::PropertiesPtr& properties, const TraceLevelsPtr& traceLevels)
119
 
{
120
 
    int t = properties->getPropertyAsIntWithDefault(key, def);
121
 
    if(t < 0)
122
 
    {
123
 
        Ice::Warning out(traceLevels->logger);
124
 
        out << traceLevels->electionCat << ": " << key << " < 0; Adjusted to 1";
125
 
        t = 1;
126
 
    }
127
 
    return IceUtil::Time::seconds(t);
128
 
}
129
 
 
130
 
static string
131
 
toString(const set<int>& s)
132
 
{
133
 
    ostringstream os;
134
 
    os << "(";
135
 
    for(set<int>::const_iterator p = s.begin(); p != s.end(); ++p)
136
 
    {
137
 
        if(p != s.begin())
138
 
        {
139
 
            os << ",";
140
 
        }
141
 
        os << *p;
142
 
    }
143
 
    os << ")";
144
 
    return os.str();
145
 
}
146
 
 
147
 
}
148
 
 
149
 
NodeI::NodeI(const InstancePtr& instance,
150
 
             const ReplicaPtr& replica,
151
 
             const Ice::ObjectPrx& replicaProxy,
152
 
             int id, const map<int, NodePrx>& nodes) :
153
 
    _timer(instance->timer()),
154
 
    _traceLevels(instance->traceLevels()),
155
 
    _observers(instance->observers()),
156
 
    _replica(replica),
157
 
    _replicaProxy(replicaProxy),
158
 
    _id(id),
159
 
    _nodes(nodes),
160
 
    _state(NodeStateInactive),
161
 
    _updateCounter(0),
162
 
    _max(0),
163
 
    _generation(-1),
164
 
    _destroy(false)
165
 
{
166
 
    map<int, NodePrx> oneway;
167
 
    for(map<int, NodePrx>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p)
168
 
    {
169
 
        oneway[p->first] = NodePrx::uncheckedCast(p->second->ice_oneway());
170
 
    }
171
 
    const_cast<map<int, NodePrx>& >(_nodesOneway) = oneway;
172
 
 
173
 
    Ice::PropertiesPtr properties = instance->communicator()->getProperties();
174
 
    const_cast<IceUtil::Time&>(_masterTimeout) = getTimeout(
175
 
        instance->serviceName() + ".Election.MasterTimeout", 10, properties, _traceLevels);
176
 
    const_cast<IceUtil::Time&>(_electionTimeout) = getTimeout(
177
 
        instance->serviceName() + ".Election.ElectionTimeout", 10, properties, _traceLevels);
178
 
    const_cast<IceUtil::Time&>(_mergeTimeout) = getTimeout(
179
 
        instance->serviceName() + ".Election.ResponseTimeout", 10, properties, _traceLevels);
180
 
}
181
 
 
182
 
NodeI::~NodeI()
183
 
{
184
 
    //cout << "~NodeI" << endl;
185
 
}
186
 
 
187
 
void
188
 
NodeI::start()
189
 
{
190
 
    // As an optimization we want the initial election to occur as
191
 
    // soon as possible.
192
 
    //
193
 
    // However, if we have the node trigger the election immediately
194
 
    // upon startup then we'll have a clash with lower priority nodes
195
 
    // starting an election denying a higher priority node the
196
 
    // opportunity to start the election that results in it becoming
197
 
    // the leader. Of course, things will eventually reach a stable
198
 
    // state but it will take longer.
199
 
    //
200
 
    // As such as we schedule the initial election check inversely
201
 
    // proportional to our priority.
202
 
    //
203
 
    // By setting _checkTask first we stop recovery() from setting it
204
 
    // to the regular election interval.
205
 
    //
206
 
    
207
 
    //
208
 
    // We use this lock to ensure that recovery is called before CheckTask
209
 
    // is scheduled, even if timeout is 0
210
 
    //
211
 
    Lock sync(*this);
212
 
    
213
 
    _checkTask = new CheckTask(this);
214
 
    _timer->schedule(_checkTask, IceUtil::Time::seconds((_nodes.size() - _id) * 2));
215
 
    recovery();
216
 
}
217
 
 
218
 
void
219
 
NodeI::check()
220
 
{
221
 
    {
222
 
        Lock sync(*this);
223
 
        if(_destroy)
224
 
        {
225
 
            return;
226
 
        }
227
 
        assert(!_mergeTask);
228
 
 
229
 
        if(_state == NodeStateElection || _state == NodeStateReorganization || _coord != _id)
230
 
        {
231
 
            assert(_checkTask);
232
 
            _timer->schedule(_checkTask, _electionTimeout);
233
 
            return;
234
 
        }
235
 
        
236
 
        // Next get the set of nodes that were detected as unreachable
237
 
        // from the replica and remove them from our slave list.
238
 
        vector<int> dead;
239
 
        _observers->getReapedSlaves(dead);
240
 
        if(!dead.empty())
241
 
        {
242
 
            for(vector<int>::const_iterator p = dead.begin(); p != dead.end(); ++p)
243
 
            {
244
 
                set<GroupNodeInfo>::iterator q = _up.find(GroupNodeInfo(*p));
245
 
                if(q != _up.end())
246
 
                {
247
 
                    if(_traceLevels->election > 0)
248
 
                    {
249
 
                        Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
250
 
                        out << "node " << _id << ": reaping slave " << *p;
251
 
                    }
252
 
                    _up.erase(q);
253
 
                }
254
 
            }
255
 
                
256
 
            // If we no longer have the majority of the nodes under our
257
 
            // care then we need to stop our replica.
258
 
            if(_up.size() < _nodes.size()/2)
259
 
            {
260
 
                if(_traceLevels->election > 0)
261
 
                {
262
 
                    Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
263
 
                    out << "node " << _id << ": stopping replica";
264
 
                }
265
 
                // Clear _checkTask -- recovery() will reset the
266
 
                // timer.
267
 
                assert(_checkTask);
268
 
                _checkTask = 0;
269
 
                recovery();
270
 
                return;
271
 
            }
272
 
        }
273
 
    }
274
 
 
275
 
    // See if other groups exist for possible merge.
276
 
    set<int> tmpset;
277
 
    int max = -1;
278
 
    for(map<int, NodePrx>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p)
279
 
    {
280
 
        if(p->first == _id)
281
 
        {
282
 
            continue;
283
 
        }
284
 
        try
285
 
        {
286
 
            if(p->second->areYouCoordinator())
287
 
            {
288
 
                if(p->first > max)
289
 
                {
290
 
                    max = p->first;
291
 
                }
292
 
                tmpset.insert(p->first);
293
 
            }
294
 
        }
295
 
        catch(const Ice::Exception& ex)
296
 
        {
297
 
            if(_traceLevels->election > 0)
298
 
            {
299
 
                Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
300
 
                out << "node " << _id << ": call on node " << p->first << " failed: " << ex;
301
 
            }
302
 
        }
303
 
    }
304
 
 
305
 
    Lock sync(*this);
306
 
 
307
 
    // If the node state has changed while the mutex has been released
308
 
    // then bail. We don't schedule a re-check since we're either
309
 
    // destroyed in which case we're going to terminate or the end of
310
 
    // the election/reorg will re-schedule the check.
311
 
    if(_destroy || _state == NodeStateElection || _state == NodeStateReorganization || _coord != _id)
312
 
    {
313
 
        _checkTask = 0;
314
 
        return;
315
 
    }
316
 
 
317
 
    // If we didn't find any coordinators then we're done. Reschedule
318
 
    // the next check and terminate.
319
 
    if(tmpset.empty())
320
 
    {
321
 
        assert(_checkTask);
322
 
        _timer->schedule(_checkTask, _electionTimeout);
323
 
        return;
324
 
    }
325
 
 
326
 
    // _checkTask == 0 means that the check isn't scheduled.
327
 
    _checkTask = 0;
328
 
 
329
 
    if(_traceLevels->election > 0)
330
 
    {
331
 
        Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
332
 
        out << "node " << _id << ": highest priority node count: " << max;
333
 
    }
334
 
 
335
 
    IceUtil::Time delay = IceUtil::Time::seconds(0);
336
 
    if(_id < max)
337
 
    {
338
 
        // Reschedule timer proportial to p-i.
339
 
        delay = _mergeTimeout + _mergeTimeout * (max - _id);
340
 
        if(_traceLevels->election > 0)
341
 
        {
342
 
            Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
343
 
            out << "node " << _id << ": scheduling merge in " << delay.toDuration()
344
 
                << " seconds";
345
 
        }
346
 
    }
347
 
 
348
 
    assert(!_mergeTask);
349
 
    _mergeTask = new MergeTask(this, tmpset);
350
 
    _timer->schedule(_mergeTask, delay);
351
 
}
352
 
 
353
 
// Called if the node has not heard from the coordinator in some time.
354
 
void
355
 
NodeI::timeout()
356
 
{
357
 
    int myCoord;
358
 
    string myGroup;
359
 
    {
360
 
        Lock sync(*this);
361
 
        // If we're destroyed or we are our own coordinator then we're
362
 
        // done.
363
 
        if(_destroy || _coord == _id)
364
 
        {
365
 
            return;
366
 
        }
367
 
        myCoord = _coord;
368
 
        myGroup = _group;
369
 
    }
370
 
 
371
 
    bool failed = false;
372
 
    try
373
 
    {
374
 
        map<int, NodePrx>::const_iterator p = _nodes.find(myCoord);
375
 
        assert(p != _nodes.end());
376
 
        if(!p->second->areYouThere(myGroup, _id))
377
 
        {
378
 
            if(_traceLevels->election > 0)
379
 
            {
380
 
                Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
381
 
                out << "node " << _id << ": lost connection to coordinator " << myCoord
382
 
                    << ": areYouThere returned false";
383
 
            }
384
 
            failed = true;
385
 
        }
386
 
    }
387
 
    catch(const Ice::Exception& ex)
388
 
    {
389
 
        if(_traceLevels->election > 0)
390
 
        {
391
 
            Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
392
 
            out << "node " << _id << ": lost connection to coordinator " << myCoord << ": " << ex;
393
 
        }
394
 
        failed = true;
395
 
    }
396
 
    if(failed)
397
 
    {
398
 
        recovery();
399
 
    }
400
 
}
401
 
 
402
 
void
403
 
NodeI::merge(const set<int>& coordinatorSet)
404
 
{
405
 
    set<int> invited;
406
 
    string gp;
407
 
    {
408
 
        Lock sync(*this);
409
 
        _mergeTask = 0;
410
 
 
411
 
        // If the node is currently in an election, or reorganizing
412
 
        // then we're done.
413
 
        if(_state == NodeStateElection || _state == NodeStateReorganization)
414
 
        {
415
 
            return;
416
 
        }
417
 
 
418
 
        // This state change prevents this node from accepting
419
 
        // invitations while the merge is executing.
420
 
        setState(NodeStateElection);
421
 
 
422
 
        // No more replica changes are permitted.
423
 
        while(!_destroy && _updateCounter > 0)
424
 
        {
425
 
            wait();
426
 
        }
427
 
        if(_destroy)
428
 
        {
429
 
            return;
430
 
        }
431
 
 
432
 
        ostringstream os;
433
 
        os << _id << ":" << IceUtil::generateUUID();
434
 
        _group = os.str();
435
 
        gp = _group;
436
 
 
437
 
        _invitesAccepted.clear();
438
 
        _invitesIssued.clear();
439
 
 
440
 
        // Construct a set of node ids to invite. This is the union of
441
 
        // _up and set of coordinators gathered in the check stage.
442
 
        invited = coordinatorSet;
443
 
        for(set<GroupNodeInfo>::const_iterator p = _up.begin(); p != _up.end(); ++p)
444
 
        {
445
 
            invited.insert(p->id);
446
 
        }
447
 
 
448
 
        _coord = _id;
449
 
        _up.clear();
450
 
 
451
 
        if(_traceLevels->election > 0)
452
 
        {
453
 
            Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
454
 
            out << "node " << _id << ": inviting " << toString(invited) << " to group " << _group;
455
 
        }
456
 
    }
457
 
 
458
 
    set<int>::iterator p = invited.begin();
459
 
    while(p != invited.end())
460
 
    {
461
 
        try
462
 
        {
463
 
            if(_traceLevels->election > 0)
464
 
            {
465
 
                Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
466
 
                out << "node " << _id << ": inviting node " << *p << " to group " << gp;
467
 
            }
468
 
            map<int, NodePrx>::const_iterator node = _nodesOneway.find(*p);
469
 
            assert(node != _nodesOneway.end());
470
 
            node->second->invitation(_id, gp);
471
 
            ++p;
472
 
        }
473
 
        catch(const Ice::Exception&)
474
 
        {
475
 
            invited.erase(p++);
476
 
        }
477
 
    }
478
 
 
479
 
    // Now we wait for responses to our invitation.
480
 
    {
481
 
        Lock sync(*this);
482
 
        if(_destroy)
483
 
        {
484
 
            return;
485
 
        }
486
 
 
487
 
        // Add each of the invited nodes in the invites issed set.
488
 
        _invitesIssued.insert(invited.begin(), invited.end());
489
 
 
490
 
        if(_traceLevels->election > 0)
491
 
        {
492
 
            Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
493
 
            out << "node " << _id << ": invites pending: " << toString(_invitesIssued);
494
 
        }
495
 
 
496
 
        // Schedule the mergeContinueTask.
497
 
        assert(_mergeContinueTask == 0);
498
 
        _mergeContinueTask = new MergeContinueTask(this);
499
 
            
500
 
        // At this point we may have already accepted all of the
501
 
        // invitations, if so then we want to schedule the
502
 
        // mergeContinue immediately.
503
 
        IceUtil::Time timeout = _mergeTimeout;
504
 
        if(_up.size() == _nodes.size()-1 || _invitesIssued == _invitesAccepted)
505
 
        {
506
 
            timeout = IceUtil::Time::seconds(0);
507
 
        }
508
 
        _timer->schedule(_mergeContinueTask, timeout);
509
 
    }
510
 
}
511
 
 
512
 
void
513
 
NodeI::mergeContinue()
514
 
{
515
 
    string gp;
516
 
    set<GroupNodeInfo> tmpSet;
517
 
    {
518
 
        Lock sync(*this);
519
 
        if(_destroy)
520
 
        {
521
 
            return;
522
 
        }
523
 
 
524
 
        // Copy variables for thread safety.
525
 
        gp = _group;
526
 
        tmpSet = _up;
527
 
 
528
 
        assert(_mergeContinueTask);
529
 
        _mergeContinueTask = 0;
530
 
 
531
 
        // The node is now reorganizing.
532
 
        assert(_state == NodeStateElection);
533
 
        setState(NodeStateReorganization);
534
 
 
535
 
        if(_traceLevels->election > 0)
536
 
        {
537
 
            Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
538
 
            out << "node " << _id << ": coordinator for " << (tmpSet.size() +1) << " nodes (including myself)";
539
 
        }
540
 
        
541
 
        // Now we need to decide whether we can start serving content. If
542
 
        // we're on initial startup then we need all nodes to participate
543
 
        // in the election. If we're running a subsequent election then we
544
 
        // need a majority of the nodes to be active in order to start
545
 
        // running.
546
 
        unsigned int ingroup = static_cast<unsigned int>(tmpSet.size());
547
 
        if((_max != _nodes.size() && ingroup != _nodes.size() -1) || ingroup < _nodes.size()/2)
548
 
        {
549
 
            if(_traceLevels->election > 0)
550
 
            {
551
 
                Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
552
 
                out << "node " << _id << ": not enough nodes " << (ingroup+1) << "/" << _nodes.size()
553
 
                     << " for replication to commence";
554
 
                if(_max != _nodes.size())
555
 
                {
556
 
                    out << " (require full participation for startup)";
557
 
                }
558
 
            }
559
 
            recovery();
560
 
            return;
561
 
        }
562
 
    }
563
 
 
564
 
    // Find out who has the highest available set of database
565
 
    // updates.
566
 
    int maxid = -1;
567
 
    LogUpdate maxllu = { -1, 0 };
568
 
    set<GroupNodeInfo>::const_iterator p;
569
 
    for(p = tmpSet.begin(); p != tmpSet.end(); ++p)
570
 
    {
571
 
        if(_traceLevels->election > 0)
572
 
        {
573
 
            Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
574
 
            out << "node id=" << p->id << " llu=" << p->llu.generation << "/" << p->llu.iteration;
575
 
        }
576
 
        if(p->llu.generation > maxllu.generation ||
577
 
           (p->llu.generation == maxllu.generation && p->llu.iteration > maxllu.iteration))
578
 
        {
579
 
            maxid = p->id;
580
 
            maxllu = p->llu;
581
 
        }
582
 
    }
583
 
 
584
 
    LogUpdate myLlu = _replica->getLastLogUpdate();
585
 
    if(_traceLevels->election > 0)
586
 
    {
587
 
        Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
588
 
        out << "node id=" << _id << " llu=" << myLlu.generation << "/" << myLlu.iteration;
589
 
    }
590
 
 
591
 
    // If its not us then we have to get the latest database data from
592
 
    // the replica with the latest set.
593
 
    //if(maxllu > _replica->getLastLogUpdate())
594
 
    if(maxllu > myLlu)
595
 
    {
596
 
        if(_traceLevels->election > 0)
597
 
        {
598
 
            Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
599
 
            out << "node " << _id << ": syncing database state with node " << maxid;
600
 
        }
601
 
        try
602
 
        {
603
 
            map<int, NodePrx>::const_iterator node = _nodes.find(maxid);
604
 
            assert(node != _nodes.end());
605
 
            _replica->sync(node->second->sync());
606
 
        }
607
 
        catch(const Ice::Exception& ex)
608
 
        {
609
 
            if(_traceLevels->election > 0)
610
 
            {
611
 
                Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
612
 
                out << "node " << _id << ": syncing database state with node "
613
 
                     << maxid << " failed: " << ex;
614
 
            }
615
 
            recovery();
616
 
            return;
617
 
        }
618
 
    }
619
 
    else
620
 
    {
621
 
        if(_traceLevels->election > 0)
622
 
        {
623
 
            Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
624
 
            out << "node " << _id << ": I have the latest database state.";
625
 
        }
626
 
    }
627
 
 
628
 
    // At this point we've ensured that we have the latest database
629
 
    // state, as such we can set our _max flag.
630
 
    unsigned int max = static_cast<unsigned int>(tmpSet.size()) + 1;
631
 
    {
632
 
        Lock sync(*this);
633
 
        if(max > _max)
634
 
        {
635
 
            _max = max;
636
 
        }
637
 
        max = _max;
638
 
    }
639
 
 
640
 
    // Prepare the LogUpdate for this generation.
641
 
    maxllu.generation++;
642
 
    maxllu.iteration = 0;
643
 
 
644
 
    try
645
 
    {
646
 
        // Tell the replica that it is now the master with the given
647
 
        // set of slaves and llu generation.
648
 
        _replica->initMaster(tmpSet, maxllu);
649
 
    }
650
 
    catch(const Ice::Exception& ex)
651
 
    {
652
 
        if(_traceLevels->election > 0)
653
 
        {
654
 
            Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
655
 
            out << "node " << _id << ": initMaster failed: " << ex;
656
 
        }
657
 
        recovery();
658
 
        return;
659
 
    }
660
 
 
661
 
    // Tell each node to go.
662
 
    for(p = tmpSet.begin(); p != tmpSet.end(); ++p)
663
 
    {
664
 
        try
665
 
        {
666
 
            map<int, NodePrx>::const_iterator node = _nodes.find(p->id);
667
 
            assert(node != _nodes.end());
668
 
            node->second->ready(_id, gp, _replicaProxy, max, maxllu.generation);
669
 
        }
670
 
        catch(const Ice::Exception& ex)
671
 
        {
672
 
            if(_traceLevels->election > 0)
673
 
            {
674
 
                Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
675
 
                out << "node " << _id << ": error calling ready on " << p->id << " ex: " << ex;
676
 
            }
677
 
            recovery();
678
 
            return;
679
 
        }
680
 
    }
681
 
 
682
 
    {
683
 
        Lock sync(*this);
684
 
        if(_destroy)
685
 
        {
686
 
            return;
687
 
        }
688
 
        if(_traceLevels->election > 0)
689
 
        {
690
 
            Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
691
 
            out << "node " << _id << ": reporting for duty in group " << _group << " as coordinator. ";
692
 
            out << "replication commencing with " << _up.size()+1 << "/" << _nodes.size()
693
 
                << " nodes with llu generation: " << maxllu.generation;
694
 
        }
695
 
        setState(NodeStateNormal);
696
 
        _coordinatorProxy = 0;
697
 
 
698
 
        _generation = maxllu.generation;
699
 
 
700
 
        assert(!_checkTask);
701
 
        _checkTask = new CheckTask(this);
702
 
        _timer->schedule(_checkTask, _electionTimeout);
703
 
    }
704
 
}
705
 
 
706
 
void
707
 
NodeI::invitation(int j, const string& gn, const Ice::Current&)
708
 
{
709
 
    if(_traceLevels->election > 0)
710
 
    {
711
 
        Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
712
 
        out << "node " << _id << ": invitation from " << j << " to group " << gn;
713
 
    }
714
 
 
715
 
    // Verify that j exists in our node set.
716
 
    if(_nodes.find(j) == _nodes.end())
717
 
    {
718
 
        Ice::Warning warn(_traceLevels->logger);
719
 
        warn << _traceLevels->electionCat << ": ignoring invitation from unknown node " << j;
720
 
        return;
721
 
    }
722
 
 
723
 
    int tmpCoord = -1;
724
 
    int max = -1;
725
 
    set<GroupNodeInfo> tmpSet;
726
 
    {
727
 
        Lock sync(*this);
728
 
        if(_destroy)
729
 
        {
730
 
            return;
731
 
        }
732
 
        // If we're in the election or reorg state a merge has already
733
 
        // started, so ignore the invitation.
734
 
        if(_state == NodeStateElection || _state == NodeStateReorganization)
735
 
        {
736
 
            if(_traceLevels->election > 0)
737
 
            {
738
 
                Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
739
 
                out << "node " << _id << ": invitation ignored";
740
 
            }
741
 
            return;
742
 
        }
743
 
 
744
 
        //
745
 
        // Upon receipt of an invitation we cancel any pending merge
746
 
        // task.
747
 
        //
748
 
        if(_mergeTask)
749
 
        {
750
 
            // If the timer doesn't cancel it means that the timer has
751
 
            // fired and the merge is currently in-progress in which
752
 
            // case we should reject the invitation.
753
 
            if(!_timer->cancel(_mergeTask))
754
 
            {
755
 
                // The merge task is cleared in the merge. This
756
 
                // ensures two invitations cannot cause a race with
757
 
                // the merge.
758
 
                //_mergeTask = 0;
759
 
                return;
760
 
            }
761
 
            _mergeTask = 0;
762
 
        }
763
 
 
764
 
        // We're now joining with another group. If we are active we
765
 
        // must stop serving as a master or slave.
766
 
        setState(NodeStateElection);
767
 
        while(!_destroy && _updateCounter > 0)
768
 
        {
769
 
            wait();
770
 
        }
771
 
        if(_destroy)
772
 
        {
773
 
            return;
774
 
        }
775
 
 
776
 
        tmpCoord = _coord;
777
 
        tmpSet = _up;
778
 
 
779
 
        _coord = j;
780
 
        _group = gn;
781
 
        max = _max;
782
 
    }
783
 
 
784
 
    Ice::IntSeq forwardedInvites;
785
 
    if(tmpCoord == _id) // Forward invitation to my old members.
786
 
    {
787
 
        for(set<GroupNodeInfo>::const_iterator p = tmpSet.begin(); p != tmpSet.end(); ++p)
788
 
        {
789
 
            try
790
 
            {
791
 
                map<int, NodePrx>::const_iterator node = _nodesOneway.find(p->id);
792
 
                assert(node != _nodesOneway.end());
793
 
                node->second->invitation(j, gn);
794
 
                forwardedInvites.push_back(p->id);
795
 
            }
796
 
            catch(const Ice::Exception&)
797
 
            {
798
 
            }
799
 
        }
800
 
    }
801
 
 
802
 
    // Set the state and timer before calling accept. This ensures
803
 
    // that if ready is called directly after accept is called then
804
 
    // everything is fine. Setting the state *after* calling accept
805
 
    // can cause a race.
806
 
    {
807
 
        Lock sync(*this);
808
 
        if(_destroy)
809
 
        {
810
 
            return;
811
 
        }
812
 
        assert(_state == NodeStateElection);
813
 
        setState(NodeStateReorganization);
814
 
        if(!_timeoutTask)
815
 
        {
816
 
            _timeoutTask = new TimeoutTask(this);
817
 
            _timer->scheduleRepeated(_timeoutTask, _masterTimeout);
818
 
        }
819
 
    }
820
 
 
821
 
    try
822
 
    {
823
 
        map<int, NodePrx>::const_iterator node = _nodesOneway.find(j);
824
 
        assert(node != _nodesOneway.end());
825
 
        node->second->accept(_id, gn, forwardedInvites, _replica->getObserver(), _replica->getLastLogUpdate(), max);
826
 
    }
827
 
    catch(const Ice::Exception&)
828
 
    {
829
 
        recovery();
830
 
        return;
831
 
    }
832
 
}
833
 
 
834
 
void
835
 
NodeI::ready(int j, const string& gn, const Ice::ObjectPrx& coordinator, int max, Ice::Long generation,
836
 
             const Ice::Current&)
837
 
{
838
 
    Lock sync(*this);
839
 
    if(!_destroy && _state == NodeStateReorganization && _group == gn)
840
 
    {
841
 
        // The coordinator must be j (this was set in the invitation).
842
 
        if(_coord != j)
843
 
        {
844
 
            Ice::Warning warn(_traceLevels->logger);
845
 
            warn << _traceLevels->electionCat << ": ignoring ready call from replica node " << j
846
 
                 << " (real coordinator is " << _coord << ")";
847
 
            return;
848
 
        }
849
 
 
850
 
        // Here we've already validated j in the invite call
851
 
        // (otherwise _group != gn).
852
 
        if(_traceLevels->election > 0)
853
 
        {
854
 
            Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
855
 
            out << "node " << _id << ": reporting for duty in group " << gn << " with coordinator " << j;
856
 
        }
857
 
 
858
 
        if(static_cast<unsigned int>(max) > _max)
859
 
        {
860
 
            _max = max;
861
 
        }
862
 
        _generation = generation;
863
 
 
864
 
        // Activate the replica here since the replica is now ready
865
 
        // for duty.
866
 
        setState(NodeStateNormal);
867
 
        _coordinatorProxy = coordinator;
868
 
 
869
 
        if(!_checkTask)
870
 
        {
871
 
            _checkTask = new CheckTask(this);
872
 
            _timer->schedule(_checkTask, _electionTimeout);
873
 
        }
874
 
    }
875
 
}
876
 
 
877
 
void
878
 
NodeI::accept(int j, const string& gn, const Ice::IntSeq& forwardedInvites, const Ice::ObjectPrx& observer,
879
 
              const LogUpdate& llu, int max, const Ice::Current&)
880
 
{
881
 
    // Verify that j exists in our node set.
882
 
    if(_nodes.find(j) == _nodes.end())
883
 
    {
884
 
        Ice::Warning warn(_traceLevels->logger);
885
 
        warn << _traceLevels->electionCat << ": ignoring accept from unknown node " << j;
886
 
        return;
887
 
    }
888
 
 
889
 
    Lock sync(*this);
890
 
    if(!_destroy && _state == NodeStateElection && _group == gn && _coord == _id)
891
 
    {
892
 
        _up.insert(GroupNodeInfo(j, llu, observer));
893
 
 
894
 
        if(static_cast<unsigned int>(max) > _max)
895
 
        {
896
 
            _max = max;
897
 
        }
898
 
 
899
 
        if(_traceLevels->election > 0)
900
 
        {
901
 
            Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
902
 
            out << "node " << _id << ": accept " << j << " forward invites (";
903
 
            for(Ice::IntSeq::const_iterator p = forwardedInvites.begin(); p != forwardedInvites.end(); ++p)
904
 
            {
905
 
                if(p != forwardedInvites.begin())
906
 
                {
907
 
                    out << ",";
908
 
                }
909
 
                out << *p;
910
 
            }
911
 
            out << ") with llu "
912
 
                << llu.generation << "/" << llu.iteration << " into group " << gn
913
 
                << " group size " << (_up.size() + 1);
914
 
        }
915
 
 
916
 
        // Add each of the forwarded invites to the list of issued
917
 
        // invitations. This doesn't use set_union since
918
 
        // forwardedInvites may not be sorted.
919
 
        _invitesIssued.insert(forwardedInvites.begin(), forwardedInvites.end());
920
 
        // We've accepted the invitation from node j.
921
 
        _invitesAccepted.insert(j);
922
 
 
923
 
        if(_traceLevels->election > 0)
924
 
        {
925
 
            Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
926
 
            out << "node " << _id << ": invites pending: " << toString(_invitesIssued)
927
 
                << " invites accepted: " << toString(_invitesAccepted);
928
 
        }
929
 
 
930
 
        // If invitations have been accepted from all nodes and the
931
 
        // merge task has already been scheduled then reschedule the
932
 
        // merge continue immediately. Otherwise, we let the existing
933
 
        // merge() schedule continue.
934
 
        if((_up.size() == _nodes.size()-1 || _invitesIssued == _invitesAccepted) &&
935
 
           _mergeContinueTask && _timer->cancel(_mergeContinueTask))
936
 
        {
937
 
            _timer->schedule(_mergeContinueTask, IceUtil::Time::seconds(0));
938
 
        }
939
 
    }
940
 
}
941
 
 
942
 
bool
943
 
NodeI::areYouCoordinator(const Ice::Current&) const
944
 
{
945
 
    Lock sync(*this);
946
 
    return _state != NodeStateElection && _state != NodeStateReorganization && _coord == _id;
947
 
}
948
 
 
949
 
bool
950
 
NodeI::areYouThere(const string& gn, int j, const Ice::Current&) const
951
 
{
952
 
    Lock sync(*this);
953
 
    return _group == gn && _coord == _id && _up.find(GroupNodeInfo(j)) != _up.end();
954
 
}
955
 
 
956
 
Ice::ObjectPrx
957
 
NodeI::sync(const Ice::Current&) const
958
 
{
959
 
    return _replica->getSync();
960
 
}
961
 
 
962
 
NodeInfoSeq
963
 
NodeI::nodes(const Ice::Current&) const
964
 
{
965
 
    NodeInfoSeq seq;
966
 
    for(map<int, NodePrx>::const_iterator q = _nodes.begin(); q != _nodes.end(); ++q)
967
 
    {
968
 
        NodeInfo ni;
969
 
        ni.id = q->first;
970
 
        ni.n = q->second;
971
 
        seq.push_back(ni);
972
 
    }
973
 
 
974
 
    return seq;
975
 
}
976
 
 
977
 
QueryInfo
978
 
NodeI::query(const Ice::Current&) const
979
 
{
980
 
    Lock sync(*this);
981
 
    QueryInfo info;
982
 
    info.id = _id;
983
 
    info.coord = _coord;
984
 
    info.group = _group;
985
 
    info.replica = _replicaProxy;
986
 
    info.state = _state;
987
 
    info.max = _max;
988
 
 
989
 
    for(set<GroupNodeInfo>::const_iterator p = _up.begin(); p != _up.end(); ++p)
990
 
    {
991
 
        GroupInfo gi;
992
 
        gi.id = p->id;
993
 
        gi.llu = p->llu;
994
 
        info.up.push_back(gi);
995
 
    }
996
 
 
997
 
    return info;
998
 
}
999
 
 
1000
 
void
1001
 
NodeI::recovery(Ice::Long generation)
1002
 
{
1003
 
    Lock sync(*this);
1004
 
 
1005
 
    // Ignore the recovery if the node has already advanced a
1006
 
    // generation.
1007
 
    if(generation != -1 && generation != _generation)
1008
 
    {
1009
 
        return;
1010
 
    }
1011
 
 
1012
 
    setState(NodeStateInactive);
1013
 
    while(!_destroy && _updateCounter > 0)
1014
 
    {
1015
 
        wait();
1016
 
    }
1017
 
    if(_destroy)
1018
 
    {
1019
 
        return;
1020
 
    }
1021
 
 
1022
 
    ostringstream os;
1023
 
    os << _id << ":" << IceUtil::generateUUID();
1024
 
    _group = os.str();
1025
 
 
1026
 
    _generation = -1;
1027
 
    _coord = _id;
1028
 
    _up.clear();
1029
 
    
1030
 
    if(_traceLevels->election > 0)
1031
 
    {
1032
 
        Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
1033
 
        out << "node " << _id << ": creating new self-coordinated group " << _group;
1034
 
    }
1035
 
 
1036
 
    // Reset the timer states.
1037
 
    if(_mergeTask)
1038
 
    {
1039
 
        _timer->cancel(_mergeTask);
1040
 
        _mergeTask = 0;
1041
 
    }
1042
 
    if(_timeoutTask)
1043
 
    {
1044
 
        _timer->cancel(_timeoutTask);
1045
 
        _timeoutTask = 0;
1046
 
    }
1047
 
    if(!_checkTask)
1048
 
    {
1049
 
        _checkTask = new CheckTask(this);
1050
 
        _timer->schedule(_checkTask, _electionTimeout);
1051
 
    }
1052
 
}
1053
 
 
1054
 
void
1055
 
NodeI::destroy()
1056
 
{
1057
 
    Lock sync(*this);
1058
 
    assert(!_destroy);
1059
 
 
1060
 
    while(_updateCounter > 0)
1061
 
    {
1062
 
        wait();
1063
 
    }
1064
 
    _destroy = true;
1065
 
    notifyAll();
1066
 
 
1067
 
    // Cancel the timers.
1068
 
    if(_checkTask)
1069
 
    {
1070
 
        _timer->cancel(_checkTask);
1071
 
        _checkTask = 0;
1072
 
    }
1073
 
 
1074
 
    if(_timeoutTask)
1075
 
    {
1076
 
        _timer->cancel(_timeoutTask);
1077
 
        _timeoutTask = 0;
1078
 
    }
1079
 
 
1080
 
    if(_mergeTask)
1081
 
    {
1082
 
        _timer->cancel(_mergeTask);
1083
 
        _mergeTask = 0;
1084
 
    }
1085
 
}
1086
 
 
1087
 
// A node should only receive an observer init call if the node is
1088
 
// reorganizing and its not the coordinator.
1089
 
void
1090
 
NodeI::checkObserverInit(Ice::Long generation)
1091
 
{
1092
 
    Lock sync(*this);
1093
 
    if(_state != NodeStateReorganization)
1094
 
    {
1095
 
        throw ObserverInconsistencyException("init cannot block when state != NodeStateReorganization");
1096
 
    }
1097
 
    if(_coord == _id)
1098
 
    {
1099
 
        throw ObserverInconsistencyException("init called on coordinator");
1100
 
    }
1101
 
}
1102
 
 
1103
 
// Notify the node that we're about to start an update.
1104
 
Ice::ObjectPrx
1105
 
NodeI::startUpdate(Ice::Long& generation, const char* file, int line)
1106
 
{
1107
 
    bool majority = _observers->check();
1108
 
 
1109
 
    Lock sync(*this);
1110
 
    
1111
 
    // If we've actively replicating & lost the majority of our replicas then recover.
1112
 
    if(!_coordinatorProxy && !_destroy && _state == NodeStateNormal && !majority)
1113
 
    {
1114
 
        recovery();
1115
 
    }
1116
 
 
1117
 
    while(!_destroy && _state != NodeStateNormal)
1118
 
    {
1119
 
        wait();
1120
 
    }
1121
 
    if(_destroy)
1122
 
    {
1123
 
        throw Ice::UnknownException(file, line);
1124
 
    }
1125
 
    if(!_coordinatorProxy)
1126
 
    {
1127
 
        ++_updateCounter;
1128
 
    }
1129
 
    generation = _generation;
1130
 
    return _coordinatorProxy;
1131
 
}
1132
 
 
1133
 
bool
1134
 
NodeI::updateMaster(const char* file, int line)
1135
 
{
1136
 
    bool majority = _observers->check();
1137
 
 
1138
 
    Lock sync(*this);
1139
 
 
1140
 
    // If the node is destroyed, or is not a coordinator then we're
1141
 
    // done.
1142
 
    if(_destroy || _coordinatorProxy)
1143
 
    {
1144
 
        return false;
1145
 
    }
1146
 
    
1147
 
    // If we've lost the majority of our replicas then recover.
1148
 
    if(_state == NodeStateNormal && !majority)
1149
 
    {
1150
 
        recovery();
1151
 
    }
1152
 
 
1153
 
    // If we're not replicating then we're done.
1154
 
    if(_state != NodeStateNormal)
1155
 
    {
1156
 
        return false;
1157
 
    }
1158
 
 
1159
 
    // Otherwise adjust the update counter, and return success.
1160
 
    ++_updateCounter;
1161
 
    return true;
1162
 
}
1163
 
 
1164
 
Ice::ObjectPrx
1165
 
NodeI::startCachedRead(Ice::Long& generation, const char* file, int line)
1166
 
{
1167
 
    Lock sync(*this);
1168
 
    while(!_destroy && _state != NodeStateNormal)
1169
 
    {
1170
 
        wait();
1171
 
    }
1172
 
    if(_destroy)
1173
 
    {
1174
 
        throw Ice::UnknownException(file, line);
1175
 
    }
1176
 
    generation = _generation;
1177
 
    ++_updateCounter;
1178
 
    return _coordinatorProxy;
1179
 
}
1180
 
 
1181
 
void
1182
 
NodeI::startObserverUpdate(Ice::Long generation, const char* file, int line)
1183
 
{
1184
 
    Lock sync(*this);
1185
 
    if(_destroy)
1186
 
    {
1187
 
        throw Ice::UnknownException(file, line);
1188
 
    }
1189
 
    if(_state != NodeStateNormal)
1190
 
    {
1191
 
        throw ObserverInconsistencyException("update called on inactive node");
1192
 
    }
1193
 
    if(!_coordinatorProxy)
1194
 
    {
1195
 
        throw ObserverInconsistencyException("update called on the master");
1196
 
    }
1197
 
    if(generation != _generation)
1198
 
    {
1199
 
        throw ObserverInconsistencyException("invalid generation");
1200
 
    }
1201
 
    ++_updateCounter;
1202
 
}
1203
 
 
1204
 
void
1205
 
NodeI::finishUpdate()
1206
 
{
1207
 
    Lock sync(*this);
1208
 
    assert(!_destroy);
1209
 
    --_updateCounter;
1210
 
    assert(_updateCounter >= 0);
1211
 
    if(_updateCounter == 0)
1212
 
    {
1213
 
        notifyAll();
1214
 
    }
1215
 
}
1216
 
 
1217
 
namespace
1218
 
{
1219
 
static string
1220
 
stateToString(NodeState s)
1221
 
{
1222
 
    switch(s)
1223
 
    {
1224
 
    case NodeStateInactive:
1225
 
        return "inactive";
1226
 
    case NodeStateElection:
1227
 
        return "election";
1228
 
    case NodeStateReorganization:
1229
 
        return "reorganization";
1230
 
    case NodeStateNormal:
1231
 
        return "normal";
1232
 
    }
1233
 
    return "unknown";
1234
 
}
1235
 
}
1236
 
 
1237
 
void
1238
 
NodeI::setState(NodeState s)
1239
 
{
1240
 
    if(s != _state)
1241
 
    {
1242
 
        if(_traceLevels->election > 0)
1243
 
        {
1244
 
            Ice::Trace out(_traceLevels->logger, _traceLevels->electionCat);
1245
 
            out << "node " << _id << ": transition from " << stateToString(_state) << " to "
1246
 
                << stateToString(s);
1247
 
        }
1248
 
        _state = s;
1249
 
        if(_state == NodeStateNormal)
1250
 
        {
1251
 
            notifyAll();
1252
 
        }
1253
 
    }
1254
 
}