~mathiaz/+junk/ceph-new-pkg-review

« back to all changes in this revision

Viewing changes to src/common/Finisher.h

  • Committer: Mathias Gug
  • Date: 2010-07-29 03:10:42 UTC
  • Revision ID: mathias.gug@canonical.com-20100729031042-n9n8kky962qb4onb
Import ceph_0.21-0ubuntu1 from https://launchpad.net/~clint-fewbar/+archive/ceph/+packages.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
 
2
// vim: ts=8 sw=2 smarttab
 
3
/*
 
4
 * Ceph - scalable distributed file system
 
5
 *
 
6
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 
7
 *
 
8
 * This is free software; you can redistribute it and/or
 
9
 * modify it under the terms of the GNU Lesser General Public
 
10
 * License version 2.1, as published by the Free Software 
 
11
 * Foundation.  See file COPYING.
 
12
 * 
 
13
 */
 
14
 
 
15
#ifndef CEPH_FINISHER_H
 
16
#define CEPH_FINISHER_H
 
17
 
 
18
#include "common/Mutex.h"
 
19
#include "common/Cond.h"
 
20
#include "common/Thread.h"
 
21
 
 
22
class Finisher {
 
23
  Mutex          finisher_lock;
 
24
  Cond           finisher_cond, finisher_empty_cond;
 
25
  bool           finisher_stop, finisher_running;
 
26
  vector<Context*> finisher_queue;
 
27
  list<pair<Context*,int> > finisher_queue_rval;
 
28
  
 
29
  void *finisher_thread_entry();
 
30
 
 
31
  struct FinisherThread : public Thread {
 
32
    Finisher *fin;    
 
33
    FinisherThread(Finisher *f) : fin(f) {}
 
34
    void* entry() { return (void*)fin->finisher_thread_entry(); }
 
35
  } finisher_thread;
 
36
 
 
37
 public:
 
38
  void queue(Context *c, int r = 0) {
 
39
    finisher_lock.Lock();
 
40
    if (r) {
 
41
      finisher_queue_rval.push_back(pair<Context*, int>(c, r));
 
42
      finisher_queue.push_back(NULL);
 
43
    } else
 
44
      finisher_queue.push_back(c);
 
45
    finisher_cond.Signal();
 
46
    finisher_lock.Unlock();
 
47
  }
 
48
  void queue(vector<Context*>& ls) {
 
49
    finisher_lock.Lock();
 
50
    finisher_queue.insert(finisher_queue.end(), ls.begin(), ls.end());
 
51
    finisher_cond.Signal();
 
52
    finisher_lock.Unlock();
 
53
    ls.clear();
 
54
  }
 
55
  void queue(deque<Context*>& ls) {
 
56
    finisher_lock.Lock();
 
57
    finisher_queue.insert(finisher_queue.end(), ls.begin(), ls.end());
 
58
    finisher_cond.Signal();
 
59
    finisher_lock.Unlock();
 
60
    ls.clear();
 
61
  }
 
62
  
 
63
  void start();
 
64
  void stop();
 
65
 
 
66
  void wait_for_empty();
 
67
 
 
68
  Finisher() : finisher_lock("Finisher::finisher_lock"),
 
69
               finisher_stop(false), finisher_running(false), finisher_thread(this) {}
 
70
};
 
71
 
 
72
class C_OnFinisher : public Context {
 
73
  Context *con;
 
74
  Finisher *fin;
 
75
public:
 
76
  C_OnFinisher(Context *c, Finisher *f) : con(c), fin(f) {}
 
77
  void finish(int r) {
 
78
    fin->queue(con, r);
 
79
  }
 
80
};
 
81
 
 
82
#endif