~markwright/scalestack/zeromq

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
/*
 * Scale Stack
 *
 * Copyright 2010 Eric Day
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * @file
 * @brief libevent Event Thread Declarations
 */

#ifndef SCALESTACK_EVENT_LIBEVENT_THREAD_H
#define SCALESTACK_EVENT_LIBEVENT_THREAD_H

#include <event.h>
#include <pthread.h>
#include <stdint.h>
#include <vector>

#include <scalestack/kernel/macros.h>

namespace scalestack
{
namespace kernel
{

class module;

} /* namespace kernel */

namespace event
{
namespace libevent
{

class handler_provider;
class service;

/**
 * Function passed to pthread_create() to invoke thread::_run().
 */
extern "C" void* thread_run(void* context);

/**
 * Function used by notification event to invoke thread::_handle_notification().
 */
extern "C" void thread_handle_notification(int file_descriptor,
                                           short events,
                                           void* context);

class thread
{
public:

  /**
   * Constructor for event threads. This also starts a new pthread. Once
   * starting this call, the _run() method can be called at any time, even
   * before the constructor completes.
   *
   * @param[in] service Service that is creating this thread.
   * @param[in] thread_index The 0-based index for this thread.
   */
  thread(service& service, size_t thread_index);

  ~thread();

  /**
   * Notify a thread to stop running. This is called by the service provider
   * class for each thread before deleting the threads. This allows all
   * threads to be notified to stop before any joins are attempted.
   */
  void stop(void);

  /**
   * Queue a handler provider to be checked on this thread.
   *
   * @param[in] handler_provider Event handler provider to check.
   */
  void check_handler_provider(handler_provider* handler_provider);

  /**
   * Get the event base struct. This is used by handler providers once they
   * are added to this thread.
   */
  struct event_base* get_event_base(void);

  /**
   * Get a cached time structure.
   */
  const struct timeval& get_now(void);

private:

  /**
   * Don't allow copying of objects.
   */
  SCALESTACK_LOCAL
  thread(const thread&);

  /**
   * Don't allow assignment of objects.
   */
  SCALESTACK_LOCAL
  thread& operator=(const thread&);

  /**
   * Method that implements the body of the running thread.
   */
  SCALESTACK_LOCAL
  void _run(void);

  /**
   * Start the notification pipe.
   */
  SCALESTACK_LOCAL
  void _add_notification_pipe(void);

  /**
   * Stop the notification pipe.
   */
  SCALESTACK_LOCAL
  void _remove_notification_pipe(void);

  /**
   * Send a notification to run and process events.
   */
  SCALESTACK_LOCAL
  void _send_notification(void);

  /**
   * Handle a notification event.
   */
  SCALESTACK_LOCAL
  void _handle_notification(int file_descriptor, short events);

  /**
   * Add mutexes needed for thread.
   */
  SCALESTACK_LOCAL
  void _add_mutexes(void);

  /**
   * Remove mutexes added in _add_mutexes().
   */
  SCALESTACK_LOCAL
  void _remove_mutexes(void);

  /**
   * Start the pthread.
   */
  SCALESTACK_LOCAL
  void _add_thread(void);

  /**
   * Stop the pthread by joining it.
   */
  SCALESTACK_LOCAL
  void _remove_thread(void);

  typedef std::vector<handler_provider*> handler_providers;

  bool _shutdown;
  uint8_t _handler_providers_index;
  size_t _thread_index;
  kernel::module& _module;
  service& _service;
  struct event_base* _base;
  int _notification_pipe[2];
  pthread_t _thread_id;
  struct event _notification_event;
  struct timeval _notification_timeout;
  struct timeval _now;
  pthread_mutex_t _handler_providers_mutex;
  handler_providers _handler_providers[2];

  friend void* thread_run(void*);
  friend void thread_handle_notification(int file_descriptor,
                                         short events,
                                         void* context);
};

} /* namespace libevent */
} /* namespace event */
} /* namespace scalestack */

#endif /* SCALESTACK_EVENT_LIBEVENT_THREAD_H */