~mathiaz/+junk/ceph-new-pkg-review

« back to all changes in this revision

Viewing changes to src/mon/Paxos.h

  • Committer: Mathias Gug
  • Date: 2010-07-29 03:10:42 UTC
  • Revision ID: mathias.gug@canonical.com-20100729031042-n9n8kky962qb4onb
Import ceph_0.21-0ubuntu1 from https://launchpad.net/~clint-fewbar/+archive/ceph/+packages.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
 
2
// vim: ts=8 sw=2 smarttab
 
3
/*
 
4
 * Ceph - scalable distributed file system
 
5
 *
 
6
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 
7
 *
 
8
 * This is free software; you can redistribute it and/or
 
9
 * modify it under the terms of the GNU Lesser General Public
 
10
 * License version 2.1, as published by the Free Software 
 
11
 * Foundation.  See file COPYING.
 
12
 * 
 
13
 */
 
14
 
 
15
/*
 
16
time---->
 
17
 
 
18
cccccccccccccccccca????????????????????????????????????????
 
19
cccccccccccccccccca????????????????????????????????????????
 
20
cccccccccccccccccca???????????????????????????????????????? leader
 
21
cccccccccccccccccc????????????????????????????????????????? 
 
22
ccccc?????????????????????????????????????????????????????? 
 
23
 
 
24
last_committed
 
25
 
 
26
pn_from
 
27
pn
 
28
 
 
29
a 12v 
 
30
b 12v
 
31
c 14v
 
32
d
 
33
e 12v
 
34
 
 
35
 
 
36
*/
 
37
 
 
38
 
 
39
/*
 
40
 * NOTE: This libary is based on the Paxos algorithm, but varies in a few key ways:
 
41
 *  1- Only a single new value is generated at a time, simplifying the recovery logic.
 
42
 *  2- Nodes track "committed" values, and share them generously (and trustingly)
 
43
 *  3- A 'leasing' mechism is built-in, allowing nodes to determine when it is safe to 
 
44
 *     "read" their copy of the last committed value.
 
45
 *
 
46
 * This provides a simple replication substrate that services can be built on top of.
 
47
 * See PaxosService.h
 
48
 */
 
49
 
 
50
#ifndef CEPH_MON_PAXOS_H
 
51
#define CEPH_MON_PAXOS_H
 
52
 
 
53
#include "include/types.h"
 
54
#include "mon_types.h"
 
55
#include "include/buffer.h"
 
56
#include "messages/PaxosServiceMessage.h"
 
57
#include "msg/msg_types.h"
 
58
 
 
59
#include "include/Context.h"
 
60
 
 
61
#include "common/Timer.h"
 
62
 
 
63
class Monitor;
 
64
class MMonPaxos;
 
65
class Paxos;
 
66
 
 
67
 
 
68
// i am one state machine.
 
69
class Paxos {
 
70
  Monitor *mon;
 
71
  int whoami;
 
72
 
 
73
  // my state machine info
 
74
  int machine_id;
 
75
  const char *machine_name;
 
76
 
 
77
  friend class Monitor;
 
78
  friend class PaxosService;
 
79
  friend class PaxosObserver;
 
80
 
 
81
 
 
82
 
 
83
  // LEADER+PEON
 
84
 
 
85
  // -- generic state --
 
86
public:
 
87
  const static int STATE_RECOVERING = 1;  // leader|peon: recovering paxos state
 
88
  const static int STATE_ACTIVE     = 2;  // leader|peon: idle.  peon may or may not have valid lease
 
89
  const static int STATE_UPDATING   = 3;  // leader|peon: updating to new value
 
90
  static const char *get_statename(int s) {
 
91
    switch (s) {
 
92
    case STATE_RECOVERING: return "recovering";
 
93
    case STATE_ACTIVE: return "active";
 
94
    case STATE_UPDATING: return "updating";
 
95
    default: assert(0); return 0;
 
96
    }
 
97
  }
 
98
 
 
99
private:
 
100
  int state;
 
101
 
 
102
public:
 
103
  bool is_recovering() { return state == STATE_RECOVERING; }
 
104
  bool is_active() { return state == STATE_ACTIVE; }
 
105
  bool is_updating() { return state == STATE_UPDATING; }
 
106
 
 
107
private:
 
108
  // recovery (phase 1)
 
109
  version_t first_committed_any;
 
110
  version_t first_committed;
 
111
  version_t last_pn;
 
112
  version_t last_committed;
 
113
  version_t accepted_pn;
 
114
  version_t accepted_pn_from;
 
115
 
 
116
  // active (phase 2)
 
117
  utime_t lease_expire;
 
118
  list<Context*> waiting_for_active;
 
119
  list<Context*> waiting_for_readable;
 
120
 
 
121
  version_t latest_stashed;
 
122
 
 
123
  // -- leader --
 
124
  // recovery (paxos phase 1)
 
125
  unsigned   num_last;
 
126
  version_t  uncommitted_v;
 
127
  version_t  uncommitted_pn;
 
128
  bufferlist uncommitted_value;
 
129
 
 
130
  Context    *collect_timeout_event;
 
131
 
 
132
  // active
 
133
  set<int>   acked_lease;
 
134
  Context    *lease_renew_event;
 
135
  Context    *lease_ack_timeout_event;
 
136
  Context    *lease_timeout_event;
 
137
 
 
138
  // updating (paxos phase 2)
 
139
  bufferlist new_value;
 
140
  set<int>   accepted;
 
141
 
 
142
  Context    *accept_timeout_event;
 
143
 
 
144
  list<Context*> waiting_for_writeable;
 
145
  list<Context*> waiting_for_commit;
 
146
 
 
147
  // observers
 
148
  struct Observer {
 
149
    entity_inst_t inst;
 
150
    version_t last_version;
 
151
    utime_t timeout;
 
152
    Observer(entity_inst_t& ei, version_t v) : inst(ei), last_version(v) { }
 
153
  };
 
154
  map<entity_inst_t, Observer *> observers;
 
155
 
 
156
 
 
157
  class C_CollectTimeout : public Context {
 
158
    Paxos *paxos;
 
159
  public:
 
160
    C_CollectTimeout(Paxos *p) : paxos(p) {}
 
161
    void finish(int r) {
 
162
      paxos->collect_timeout();
 
163
    }
 
164
  };
 
165
 
 
166
  class C_AcceptTimeout : public Context {
 
167
    Paxos *paxos;
 
168
  public:
 
169
    C_AcceptTimeout(Paxos *p) : paxos(p) {}
 
170
    void finish(int r) {
 
171
      paxos->accept_timeout();
 
172
    }
 
173
  };
 
174
 
 
175
  class C_LeaseAckTimeout : public Context {
 
176
    Paxos *paxos;
 
177
  public:
 
178
    C_LeaseAckTimeout(Paxos *p) : paxos(p) {}
 
179
    void finish(int r) {
 
180
      paxos->lease_ack_timeout();
 
181
    }
 
182
  };
 
183
 
 
184
  class C_LeaseTimeout : public Context {
 
185
    Paxos *paxos;
 
186
  public:
 
187
    C_LeaseTimeout(Paxos *p) : paxos(p) {}
 
188
    void finish(int r) {
 
189
      paxos->lease_timeout();
 
190
    }
 
191
  };
 
192
 
 
193
  class C_LeaseRenew : public Context {
 
194
    Paxos *paxos;
 
195
  public:
 
196
    C_LeaseRenew(Paxos *p) : paxos(p) {}
 
197
    void finish(int r) {
 
198
      paxos->lease_renew_timeout();
 
199
    }
 
200
  };
 
201
 
 
202
 
 
203
  void collect(version_t oldpn);
 
204
  void handle_collect(MMonPaxos*);
 
205
  void handle_last(MMonPaxos*);
 
206
  void collect_timeout();
 
207
 
 
208
  void begin(bufferlist& value);
 
209
  void handle_begin(MMonPaxos*);
 
210
  void handle_accept(MMonPaxos*);
 
211
  void accept_timeout();
 
212
 
 
213
  void commit();
 
214
  void handle_commit(MMonPaxos*);
 
215
  void extend_lease();
 
216
  void handle_lease(MMonPaxos*);
 
217
  void handle_lease_ack(MMonPaxos*);
 
218
 
 
219
  void lease_ack_timeout();    // on leader, if lease isn't acked by all peons
 
220
  void lease_renew_timeout();  // on leader, to renew the lease
 
221
  void lease_timeout();        // on peon, if lease isn't extended
 
222
 
 
223
  void cancel_events();
 
224
 
 
225
  version_t get_new_proposal_number(version_t gt=0);
 
226
  
 
227
public:
 
228
  Paxos(Monitor *m, int w,
 
229
        int mid) : mon(m), whoami(w), 
 
230
                   machine_id(mid), 
 
231
                   machine_name(get_paxos_name(mid)),
 
232
                   state(STATE_RECOVERING),
 
233
                   collect_timeout_event(0),
 
234
                   lease_renew_event(0),
 
235
                   lease_ack_timeout_event(0),
 
236
                   lease_timeout_event(0),
 
237
                   accept_timeout_event(0) { }
 
238
 
 
239
  const char *get_machine_name() const {
 
240
    return machine_name;
 
241
  }
 
242
 
 
243
  void dispatch(PaxosServiceMessage *m);
 
244
 
 
245
  void init();
 
246
 
 
247
  void election_starting();
 
248
  void leader_init();
 
249
  void peon_init();
 
250
 
 
251
  void share_state(MMonPaxos *m, version_t first_committed, version_t last_committed);
 
252
  void store_state(MMonPaxos *m);
 
253
 
 
254
 
 
255
  // -- service interface --
 
256
  void wait_for_active(Context *c) {
 
257
    waiting_for_active.push_back(c);
 
258
  }
 
259
 
 
260
  void trim_to(version_t first);
 
261
  
 
262
  // read
 
263
  version_t get_version() { return last_committed; }
 
264
  bool is_readable(version_t seen=0);
 
265
  bool read(version_t v, bufferlist &bl);
 
266
  version_t read_current(bufferlist &bl);
 
267
  void wait_for_readable(Context *onreadable) {
 
268
    //assert(!is_readable());
 
269
    waiting_for_readable.push_back(onreadable);
 
270
  }
 
271
 
 
272
  // write
 
273
  bool is_leader();
 
274
  bool is_writeable();
 
275
  void wait_for_writeable(Context *c) {
 
276
    assert(!is_writeable());
 
277
    waiting_for_writeable.push_back(c);
 
278
  }
 
279
 
 
280
  bool propose_new_value(bufferlist& bl, Context *oncommit=0);
 
281
  void wait_for_commit(Context *oncommit) {
 
282
    waiting_for_commit.push_back(oncommit);
 
283
  }
 
284
  void wait_for_commit_front(Context *oncommit) {
 
285
    waiting_for_commit.push_front(oncommit);
 
286
  }
 
287
 
 
288
  // if state values are incrementals, it is usefult to keep
 
289
  // the latest copy of the complete structure.
 
290
  void stash_latest(version_t v, bufferlist& bl);
 
291
  version_t get_latest(bufferlist& bl);
 
292
 
 
293
  void register_observer(entity_inst_t inst, version_t v);
 
294
  void update_observers();
 
295
};
 
296
 
 
297
 
 
298
 
 
299
#endif
 
300