~ubuntu-branches/ubuntu/precise/ceph/precise

« back to all changes in this revision

Viewing changes to src/msg/Message.h

  • Committer: Bazaar Package Importer
  • Author(s): Clint Byrum, Clint Byrum, Micah Gersten
  • Date: 2011-02-12 22:50:26 UTC
  • mfrom: (1.1.1 upstream)
  • Revision ID: james.westby@ubuntu.com-20110212225026-yyyw4tk0msgql3ul
Tags: 0.24.2-0ubuntu1
[ Clint Byrum <clint@ubuntu.com> ]
* New upstream release. (LP: #658670, LP: #684011)
* debian/patches/fix-mkcephfs.patch: dropped (applied upstream)
* Removed .la files from libceph1-dev, libcrush1-dev and 
  librados1-dev (per Debian policy v3.9.1 10.2).
* debian/control: adding pkg-config as a build dependency
* debian/control: depend on libcrypto++-dev instead of libssl-dev
* debian/watch: added watch file

[ Micah Gersten <micahg@ubuntu.com> ]
* debian/control: add Homepage

Show diffs side-by-side

added added

removed removed

Lines of Context:
73
73
#define MSG_REMOVE_SNAPS       90
74
74
 
75
75
#define MSG_OSD_SCRUB          91
 
76
#define MSG_OSD_PG_MISSING     92
76
77
 
77
78
 
78
79
 
158
159
};
159
160
 
160
161
struct Connection : public RefCountedObject {
161
 
  atomic_t nref;
162
162
  Mutex lock;
163
163
  RefCountedObject *priv;
164
164
  int peer_type;
166
166
  unsigned features;
167
167
  RefCountedObject *pipe;
168
168
 
 
169
  int rx_buffers_version;
 
170
  map<tid_t,pair<bufferlist,int> > rx_buffers;
 
171
 
169
172
public:
170
 
  Connection() : nref(1), lock("Connection::lock"), priv(NULL), peer_type(-1), features(0), pipe(NULL) {}
 
173
  Connection() : lock("Connection::lock"), priv(NULL), peer_type(-1), features(0), pipe(NULL),
 
174
                 rx_buffers_version(0) {}
171
175
  ~Connection() {
172
176
    //generic_dout(0) << "~Connection " << this << dendl;
173
177
    if (priv) {
179
183
  }
180
184
 
181
185
  Connection *get() {
182
 
    nref.inc();
183
 
    return this;
184
 
  }
185
 
  void put() {
186
 
    if (nref.dec() == 0)
187
 
      delete this;
 
186
    return (Connection *)RefCountedObject::get();
188
187
  }
189
188
 
190
189
  void set_priv(RefCountedObject *o) {
213
212
      pipe = NULL;
214
213
    }
215
214
  }
 
215
  void reset_pipe(RefCountedObject *p) {
 
216
    Mutex::Locker l(lock);
 
217
    if (pipe)
 
218
      pipe->put();
 
219
    pipe = p->get();
 
220
  }
216
221
 
217
222
  int get_peer_type() { return peer_type; }
218
223
  void set_peer_type(int t) { peer_type = t; }
229
234
  bool has_feature(int f) const { return features & f; }
230
235
  void set_features(unsigned f) { features = f; }
231
236
  void set_feature(unsigned f) { features |= f; }
 
237
 
 
238
  void post_rx_buffer(tid_t tid, bufferlist& bl) {
 
239
    Mutex::Locker l(lock);
 
240
    ++rx_buffers_version;
 
241
    rx_buffers[tid] = pair<bufferlist,int>(bl, rx_buffers_version);
 
242
  }
 
243
  void revoke_rx_buffer(tid_t tid) {
 
244
    Mutex::Locker l(lock);
 
245
    rx_buffers.erase(tid);
 
246
  }
232
247
};
233
248
 
234
249
 
235
250
 
236
251
// abstract Message class
237
252
 
238
 
class Message {
 
253
class Message : public RefCountedObject {
239
254
protected:
240
255
  ceph_msg_header  header;      // headerelope
241
256
  ceph_msg_footer  footer;
260
275
  friend class Messenger;
261
276
 
262
277
public:
263
 
  atomic_t nref;
264
 
 
265
 
  Message() : connection(NULL), dispatch_throttle_size(0), nref(1) {
 
278
  Message() : connection(NULL), dispatch_throttle_size(0) {
266
279
    memset(&header, 0, sizeof(header));
267
280
    memset(&footer, 0, sizeof(footer));
268
281
    throttler = NULL;
269
282
  };
270
 
  Message(int t) : connection(NULL), dispatch_throttle_size(0), nref(1) {
 
283
  Message(int t) : connection(NULL), dispatch_throttle_size(0) {
271
284
    memset(&header, 0, sizeof(header));
272
285
    header.type = t;
273
286
    header.version = 1;
276
289
    memset(&footer, 0, sizeof(footer));
277
290
    throttler = NULL;
278
291
  }
 
292
 
 
293
  Message *get() {
 
294
    return (Message *)RefCountedObject::get();
 
295
  }
 
296
 
279
297
protected:
280
298
  virtual ~Message() { 
281
299
    assert(nref.read() == 0);
285
303
      throttler->put(payload.length() + middle.length() + data.length());
286
304
  }
287
305
public:
288
 
  Message *get() {
289
 
    //int r = 
290
 
    nref.inc();
291
 
    //*_dout << dbeginl << "message(" << this << ").get " << (r-1) << " -> " << r << std::endl;
292
 
    //_dout_end_line();
293
 
    return this;
294
 
  }
295
 
  void put() {
296
 
    int r = nref.dec();
297
 
    //*_dout << dbeginl << "message(" << this << ").put " << (r+1) << " -> " << r << std::endl;
298
 
    //_dout_end_line();
299
 
    if (r == 0)
300
 
      delete this;
301
 
  }
302
 
 
303
306
  Connection *get_connection() { return connection; }
304
307
  void set_connection(Connection *c) {
305
308
    if (connection)