~ubuntu-branches/ubuntu/precise/mysql-5.1/precise

« back to all changes in this revision

Viewing changes to storage/ndb/test/src/NDBT_Thread.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Norbert Tretkowski
  • Date: 2010-03-17 14:56:02 UTC
  • Revision ID: james.westby@ubuntu.com-20100317145602-x7e30l1b2sb5s6w6
Tags: upstream-5.1.45
ImportĀ upstreamĀ versionĀ 5.1.45

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2003 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
#include <ndb_global.h>
 
17
#include <NDBT_Thread.hpp>
 
18
#include <NdbApi.hpp>
 
19
 
 
20
NDBT_Thread::NDBT_Thread()
 
21
{
 
22
  create(0, -1);
 
23
}
 
24
 
 
25
NDBT_Thread::NDBT_Thread(NDBT_ThreadSet* thread_set, int thread_no)
 
26
{
 
27
  create(thread_set, thread_no);
 
28
}
 
29
 
 
30
void
 
31
NDBT_Thread::create(NDBT_ThreadSet* thread_set, int thread_no)
 
32
{
 
33
  m_magic = NDBT_Thread::Magic;
 
34
 
 
35
  m_state = Wait;
 
36
  m_thread_set = thread_set;
 
37
  m_thread_no = thread_no;
 
38
  m_func = 0;
 
39
  m_input = 0;
 
40
  m_output = 0;
 
41
  m_ndb = 0;
 
42
  m_err = 0;
 
43
 
 
44
  m_mutex = NdbMutex_Create();
 
45
  assert(m_mutex != 0);
 
46
  m_cond = NdbCondition_Create();
 
47
  assert(m_cond != 0);
 
48
 
 
49
  char buf[20];
 
50
  sprintf(buf, "NDBT_%04u");
 
51
  const char* name = strdup(buf);
 
52
  assert(name != 0);
 
53
 
 
54
  unsigned stacksize = 512 * 1024;
 
55
  NDB_THREAD_PRIO prio = NDB_THREAD_PRIO_LOW;
 
56
  m_thread = NdbThread_Create(NDBT_Thread_run,
 
57
                              (void**)this, stacksize, name, prio);
 
58
  assert(m_thread != 0);
 
59
}
 
60
 
 
61
NDBT_Thread::~NDBT_Thread()
 
62
{
 
63
  if (m_thread != 0) {
 
64
    NdbThread_Destroy(&m_thread);
 
65
    m_thread = 0;
 
66
  }
 
67
  if (m_cond != 0) {
 
68
    NdbCondition_Destroy(m_cond);
 
69
    m_cond = 0;
 
70
  }
 
71
  if (m_mutex != 0) {
 
72
    NdbMutex_Destroy(m_mutex);
 
73
    m_mutex = 0;
 
74
  }
 
75
}
 
76
 
 
77
static void*
 
78
NDBT_Thread_run(void* arg)
 
79
{
 
80
  assert(arg != 0);
 
81
  NDBT_Thread& thr = *(NDBT_Thread*)arg;
 
82
  assert(thr.m_magic == NDBT_Thread::Magic);
 
83
  thr.run();
 
84
  return 0;
 
85
}
 
86
 
 
87
void
 
88
NDBT_Thread::run()
 
89
{
 
90
  while (1) {
 
91
    lock();
 
92
    while (m_state != Start && m_state != Exit) {
 
93
      wait();
 
94
    }
 
95
    if (m_state == Exit) {
 
96
      unlock();
 
97
      break;
 
98
    }
 
99
    (*m_func)(*this);
 
100
    m_state = Stop;
 
101
    signal();
 
102
    unlock();
 
103
  }
 
104
}
 
105
 
 
106
// methods for main process
 
107
 
 
108
void
 
109
NDBT_Thread::start()
 
110
{
 
111
  lock();
 
112
  m_state = Start;
 
113
  signal();
 
114
  unlock();
 
115
}
 
116
 
 
117
void
 
118
NDBT_Thread::stop()
 
119
{
 
120
  lock();
 
121
  while (m_state != Stop)
 
122
    wait();
 
123
  m_state = Wait;
 
124
  unlock();
 
125
}
 
126
 
 
127
void
 
128
NDBT_Thread::exit()
 
129
{
 
130
  lock();
 
131
  m_state = Exit;
 
132
  signal();
 
133
  unlock();
 
134
}
 
135
 
 
136
void
 
137
NDBT_Thread::join()
 
138
{
 
139
  NdbThread_WaitFor(m_thread, &m_status);
 
140
  m_thread = 0;
 
141
}
 
142
 
 
143
int
 
144
NDBT_Thread::connect(class Ndb_cluster_connection* ncc, const char* db)
 
145
{
 
146
  m_ndb = new Ndb(ncc, db);
 
147
  if (m_ndb->init() == -1 ||
 
148
      m_ndb->waitUntilReady() == -1) {
 
149
    m_err = m_ndb->getNdbError().code;
 
150
    return -1;
 
151
  }
 
152
  return 0;
 
153
}
 
154
 
 
155
void
 
156
NDBT_Thread::disconnect()
 
157
{
 
158
  delete m_ndb;
 
159
  m_ndb = 0;
 
160
}
 
161
 
 
162
// set of threads
 
163
 
 
164
NDBT_ThreadSet::NDBT_ThreadSet(int count)
 
165
{
 
166
  m_count = count;
 
167
  m_thread = new NDBT_Thread* [count];
 
168
  for (int n = 0; n < count; n++) {
 
169
    m_thread[n] = new NDBT_Thread(this, n);
 
170
  }
 
171
}
 
172
 
 
173
NDBT_ThreadSet::~NDBT_ThreadSet()
 
174
{
 
175
  delete_output();
 
176
  for (int n = 0; n < m_count; n++) {
 
177
    delete m_thread[n];
 
178
    m_thread[n] = 0;
 
179
  }
 
180
  delete [] m_thread;
 
181
}
 
182
 
 
183
void
 
184
NDBT_ThreadSet::start()
 
185
{
 
186
  for (int n = 0; n < m_count; n++) {
 
187
    NDBT_Thread& thr = *m_thread[n];
 
188
    thr.start();
 
189
  }
 
190
}
 
191
 
 
192
void
 
193
NDBT_ThreadSet::stop()
 
194
{
 
195
  for (int n = 0; n < m_count; n++) {
 
196
    NDBT_Thread& thr = *m_thread[n];
 
197
    thr.stop();
 
198
  }
 
199
}
 
200
 
 
201
void
 
202
NDBT_ThreadSet::exit()
 
203
{
 
204
  for (int n = 0; n < m_count; n++) {
 
205
    NDBT_Thread& thr = *m_thread[n];
 
206
    thr.exit();
 
207
  }
 
208
}
 
209
 
 
210
void
 
211
NDBT_ThreadSet::join()
 
212
{
 
213
  for (int n = 0; n < m_count; n++) {
 
214
    NDBT_Thread& thr = *m_thread[n];
 
215
    thr.join();
 
216
  }
 
217
}
 
218
 
 
219
void
 
220
NDBT_ThreadSet::set_func(NDBT_ThreadFunc* func)
 
221
{
 
222
  for (int n = 0; n < m_count; n++) {
 
223
    NDBT_Thread& thr = *m_thread[n];
 
224
    thr.set_func(func);
 
225
  }
 
226
}
 
227
 
 
228
void
 
229
NDBT_ThreadSet::set_input(const void* input)
 
230
{
 
231
  for (int n = 0; n < m_count; n++) {
 
232
    NDBT_Thread& thr = *m_thread[n];
 
233
    thr.set_input(input);
 
234
  }
 
235
}
 
236
 
 
237
void
 
238
NDBT_ThreadSet::delete_output()
 
239
{
 
240
  for (int n = 0; n < m_count; n++) {
 
241
    if (m_thread[n] != 0) {
 
242
      NDBT_Thread& thr = *m_thread[n];
 
243
      thr.delete_output();
 
244
    }
 
245
  }
 
246
}
 
247
 
 
248
int
 
249
NDBT_ThreadSet::connect(class Ndb_cluster_connection* ncc, const char* db)
 
250
{
 
251
  for (int n = 0; n < m_count; n++) {
 
252
    assert(m_thread[n] != 0);
 
253
    NDBT_Thread& thr = *m_thread[n];
 
254
    if (thr.connect(ncc, db) == -1)
 
255
      return -1;
 
256
  }
 
257
  return 0;
 
258
}
 
259
 
 
260
void
 
261
NDBT_ThreadSet::disconnect()
 
262
{
 
263
  for (int n = 0; n < m_count; n++) {
 
264
    if (m_thread[n] != 0) {
 
265
      NDBT_Thread& thr = *m_thread[n];
 
266
      thr.disconnect();
 
267
    }
 
268
  }
 
269
}
 
270
 
 
271
int
 
272
NDBT_ThreadSet::get_err() const
 
273
{
 
274
  for (int n = 0; n < m_count; n++) {
 
275
    if (m_thread[n] != 0) {
 
276
      NDBT_Thread& thr = *m_thread[n];
 
277
      int err = thr.get_err();
 
278
      if (err != 0)
 
279
        return err;
 
280
    }
 
281
  }
 
282
  return 0;
 
283
}