1
/* Copyright (C) 2003 MySQL AB
3
This program is free software; you can redistribute it and/or modify
4
it under the terms of the GNU General Public License as published by
5
the Free Software Foundation; version 2 of the License.
7
This program is distributed in the hope that it will be useful,
8
but WITHOUT ANY WARRANTY; without even the implied warranty of
9
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
GNU General Public License for more details.
12
You should have received a copy of the GNU General Public License
13
along with this program; if not, write to the Free Software
14
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
16
#include <ndb_global.h>
17
#include <NDBT_Thread.hpp>
20
NDBT_Thread::NDBT_Thread()
25
// Construct a thread object belonging to thread_set with index thread_no.
// All set-up work is delegated to create().
NDBT_Thread::NDBT_Thread(NDBT_ThreadSet* thread_set, int thread_no)
27
create(thread_set, thread_no);
31
// Set up this thread object: record the magic value, the owning thread set
// and the thread number, create the mutex and condition variable, and
// create the underlying NdbThread with NDBT_Thread_run as entry point and
// this object as its argument.
//
// thread_set - the set this thread is stored in (kept in m_thread_set)
// thread_no  - index of this thread (kept in m_thread_no, also used to
//              build the thread name)
NDBT_Thread::create(NDBT_ThreadSet* thread_set, int thread_no)
33
// Magic value is checked again in NDBT_Thread_run.
m_magic = NDBT_Thread::Magic;
36
m_thread_set = thread_set;
37
m_thread_no = thread_no;
44
m_mutex = NdbMutex_Create();
46
m_cond = NdbCondition_Create();
50
// The format contains one %u conversion, so a matching unsigned argument
// must be supplied; calling sprintf without it is undefined behaviour.
// The thread is named after its thread number, e.g. "NDBT_0003".
sprintf(buf, "NDBT_%04u", (unsigned)thread_no);
51
// Pass a heap copy of the name, not the local buffer.
const char* name = strdup(buf);
54
unsigned stacksize = 512 * 1024;
55
NDB_THREAD_PRIO prio = NDB_THREAD_PRIO_LOW;
56
m_thread = NdbThread_Create(NDBT_Thread_run,
57
(void**)this, stacksize, name, prio);
58
assert(m_thread != 0);
61
// Release the resources obtained in create(): the thread handle first,
// then the condition variable, then the mutex.
NDBT_Thread::~NDBT_Thread()
64
NdbThread_Destroy(&m_thread);
68
NdbCondition_Destroy(m_cond);
72
NdbMutex_Destroy(m_mutex);
78
// Entry point handed to NdbThread_Create.  arg is the NDBT_Thread object
// that was passed as the thread argument.
NDBT_Thread_run(void* arg)
81
NDBT_Thread& thr = *(NDBT_Thread*)arg;
82
// Sanity check that arg really refers to an initialised NDBT_Thread.
assert(thr.m_magic == NDBT_Thread::Magic);
92
while (m_state != Start && m_state != Exit) {
95
if (m_state == Exit) {
106
// methods for main process
121
while (m_state != Stop)
139
NdbThread_WaitFor(m_thread, &m_status);
144
// Create this thread's Ndb object on cluster connection ncc for database
// db, initialise it and wait until it is ready.  If either init() or
// waitUntilReady() returns -1, the Ndb error code is saved in m_err.
NDBT_Thread::connect(class Ndb_cluster_connection* ncc, const char* db)
146
m_ndb = new Ndb(ncc, db);
147
if (m_ndb->init() == -1 ||
148
m_ndb->waitUntilReady() == -1) {
149
// Remember why the connect failed.
m_err = m_ndb->getNdbError().code;
156
NDBT_Thread::disconnect()
164
// Build a set of count threads: allocate an array of count thread
// pointers and create one NDBT_Thread per slot.
NDBT_ThreadSet::NDBT_ThreadSet(int count)
167
m_thread = new NDBT_Thread* [count];
168
for (int n = 0; n < count; n++) {
169
// Each thread gets this set as owner and its array index as thread number.
m_thread[n] = new NDBT_Thread(this, n);
173
NDBT_ThreadSet::~NDBT_ThreadSet()
176
for (int n = 0; n < m_count; n++) {
184
NDBT_ThreadSet::start()
186
for (int n = 0; n < m_count; n++) {
187
NDBT_Thread& thr = *m_thread[n];
193
NDBT_ThreadSet::stop()
195
for (int n = 0; n < m_count; n++) {
196
NDBT_Thread& thr = *m_thread[n];
202
NDBT_ThreadSet::exit()
204
for (int n = 0; n < m_count; n++) {
205
NDBT_Thread& thr = *m_thread[n];
211
NDBT_ThreadSet::join()
213
for (int n = 0; n < m_count; n++) {
214
NDBT_Thread& thr = *m_thread[n];
220
NDBT_ThreadSet::set_func(NDBT_ThreadFunc* func)
222
for (int n = 0; n < m_count; n++) {
223
NDBT_Thread& thr = *m_thread[n];
229
// Hand the same input pointer to every thread in the set by calling
// set_input() on each of the m_count threads.
NDBT_ThreadSet::set_input(const void* input)
231
for (int n = 0; n < m_count; n++) {
232
NDBT_Thread& thr = *m_thread[n];
233
thr.set_input(input);
238
NDBT_ThreadSet::delete_output()
240
for (int n = 0; n < m_count; n++) {
241
if (m_thread[n] != 0) {
242
NDBT_Thread& thr = *m_thread[n];
249
// Call connect(ncc, db) on every thread in the set.  Each slot is
// required (by assert) to hold a thread; a -1 result from a thread's
// connect() is treated as a failure case.
NDBT_ThreadSet::connect(class Ndb_cluster_connection* ncc, const char* db)
251
for (int n = 0; n < m_count; n++) {
252
assert(m_thread[n] != 0);
253
NDBT_Thread& thr = *m_thread[n];
254
if (thr.connect(ncc, db) == -1)
261
NDBT_ThreadSet::disconnect()
263
for (int n = 0; n < m_count; n++) {
264
if (m_thread[n] != 0) {
265
NDBT_Thread& thr = *m_thread[n];
272
NDBT_ThreadSet::get_err() const
274
for (int n = 0; n < m_count; n++) {
275
if (m_thread[n] != 0) {
276
NDBT_Thread& thr = *m_thread[n];
277
int err = thr.get_err();