~ubuntu-branches/ubuntu/precise/mysql-5.1/precise

« back to all changes in this revision

Viewing changes to storage/ndb/src/cw/cpcd/CPCD.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Norbert Tretkowski
  • Date: 2010-03-17 14:56:02 UTC
  • Revision ID: james.westby@ubuntu.com-20100317145602-x7e30l1b2sb5s6w6
Tags: upstream-5.1.45
ImportĀ upstreamĀ versionĀ 5.1.45

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2003 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
 
 
17
#include <ndb_global.h>
 
18
#include <NdbOut.hpp>
 
19
 
 
20
#include "APIService.hpp"
 
21
#include "CPCD.hpp"
 
22
#include <NdbMutex.h> 
 
23
 
 
24
#include "common.hpp"
 
25
 
 
26
extern const ParserRow<CPCDAPISession> commands[];
 
27
 
 
28
 
 
29
CPCD::CPCD() {
 
30
  loadingProcessList = false;
 
31
  m_processes.clear();
 
32
  m_monitor = NULL;
 
33
  m_monitor = new Monitor(this);
 
34
  m_procfile = "ndb_cpcd.db";
 
35
}
 
36
 
 
37
CPCD::~CPCD() {
 
38
  if(m_monitor != NULL) {
 
39
    delete m_monitor;
 
40
    m_monitor = NULL;
 
41
  }
 
42
}
 
43
 
 
44
int
 
45
CPCD::findUniqueId() {
 
46
  int id;
 
47
  bool ok = false;
 
48
  m_processes.lock();
 
49
  
 
50
  while(!ok) {
 
51
    ok = true;
 
52
    id = random() % 8192; /* Don't want so big numbers */
 
53
    
 
54
    if(id == 0)
 
55
      ok = false;
 
56
 
 
57
    for(size_t i = 0; i<m_processes.size(); i++) {
 
58
      if(m_processes[i]->m_id == id)
 
59
        ok = false;
 
60
    }
 
61
  }
 
62
  m_processes.unlock();
 
63
  return id;
 
64
}
 
65
 
 
66
bool
 
67
CPCD::defineProcess(RequestStatus * rs, Process * arg){
 
68
  if(arg->m_id == -1)
 
69
    arg->m_id = findUniqueId();
 
70
 
 
71
  Guard tmp(m_processes);
 
72
 
 
73
  for(size_t i = 0; i<m_processes.size(); i++) {
 
74
    Process * proc = m_processes[i];
 
75
    
 
76
    if((strcmp(arg->m_name.c_str(), proc->m_name.c_str()) == 0) && 
 
77
       (strcmp(arg->m_group.c_str(), proc->m_group.c_str()) == 0)) {
 
78
      /* Identical names in the same group */
 
79
      rs->err(AlreadyExists, "Name already exists");
 
80
      return false;
 
81
    }
 
82
 
 
83
    if(arg->m_id == proc->m_id) {
 
84
      /* Identical ID numbers */
 
85
      rs->err(AlreadyExists, "Id already exists");
 
86
      return false;
 
87
    }
 
88
  }
 
89
  
 
90
  m_processes.push_back(arg, false);
 
91
 
 
92
  notifyChanges();
 
93
  report(arg->m_id, CPCEvent::ET_PROC_USER_DEFINE);
 
94
 
 
95
  return true;
 
96
}
 
97
 
 
98
bool
 
99
CPCD::undefineProcess(CPCD::RequestStatus *rs, int id) {
 
100
 
 
101
  Guard tmp(m_processes);
 
102
 
 
103
  Process * proc = 0;
 
104
  size_t i;
 
105
  for(i = 0; i < m_processes.size(); i++) {
 
106
    if(m_processes[i]->m_id == id) {
 
107
      proc = m_processes[i];
 
108
      break;
 
109
    }
 
110
  }
 
111
 
 
112
  if(proc == 0){
 
113
    rs->err(NotExists, "No such process");
 
114
    return false;
 
115
  }
 
116
 
 
117
  switch(proc->m_status){
 
118
  case RUNNING:
 
119
  case STOPPED:
 
120
  case STOPPING:
 
121
  case STARTING:
 
122
    proc->stop();
 
123
    m_processes.erase(i, false /* Already locked */);
 
124
  }
 
125
  
 
126
  
 
127
  notifyChanges();
 
128
  
 
129
  report(id, CPCEvent::ET_PROC_USER_UNDEFINE);
 
130
 
 
131
  return true;
 
132
}
 
133
 
 
134
bool
 
135
CPCD::startProcess(CPCD::RequestStatus *rs, int id) {
 
136
 
 
137
  Process * proc = 0;
 
138
  {
 
139
 
 
140
    Guard tmp(m_processes);
 
141
    
 
142
    for(size_t i = 0; i < m_processes.size(); i++) {
 
143
      if(m_processes[i]->m_id == id) {
 
144
        proc = m_processes[i];
 
145
        break;
 
146
      }
 
147
    }
 
148
    
 
149
    if(proc == 0){
 
150
      rs->err(NotExists, "No such process");
 
151
      return false;
 
152
    }
 
153
    
 
154
    switch(proc->m_status){
 
155
    case STOPPED:
 
156
      proc->m_status = STARTING;
 
157
      if(proc->start() != 0){
 
158
        rs->err(Error, "Failed to start");
 
159
        return false;
 
160
      }
 
161
      break;
 
162
    case STARTING:
 
163
      rs->err(Error, "Already starting");
 
164
      return false;
 
165
    case RUNNING:
 
166
      rs->err(Error, "Already started");
 
167
      return false;
 
168
    case STOPPING:
 
169
      rs->err(Error, "Currently stopping");
 
170
      return false;
 
171
    }
 
172
    
 
173
    notifyChanges();
 
174
  }
 
175
  report(id, CPCEvent::ET_PROC_USER_START);
 
176
 
 
177
  return true;
 
178
}
 
179
 
 
180
bool
 
181
CPCD::stopProcess(CPCD::RequestStatus *rs, int id) {
 
182
 
 
183
  Guard tmp(m_processes);
 
184
 
 
185
  Process * proc = 0;
 
186
  for(size_t i = 0; i < m_processes.size(); i++) {
 
187
    if(m_processes[i]->m_id == id) {
 
188
      proc = m_processes[i];
 
189
      break;
 
190
    }
 
191
  }
 
192
 
 
193
  if(proc == 0){
 
194
    rs->err(NotExists, "No such process");
 
195
    return false;
 
196
  }
 
197
 
 
198
  switch(proc->m_status){
 
199
  case STARTING:
 
200
  case RUNNING:
 
201
    proc->stop();
 
202
    break;
 
203
  case STOPPED:
 
204
    rs->err(AlreadyStopped, "Already stopped");
 
205
    return false;
 
206
    break;
 
207
  case STOPPING:
 
208
    rs->err(Error, "Already stopping");
 
209
    return false;
 
210
  }
 
211
  
 
212
  notifyChanges();
 
213
 
 
214
  report(id, CPCEvent::ET_PROC_USER_START);
 
215
  
 
216
  return true;
 
217
}
 
218
 
 
219
bool
 
220
CPCD::notifyChanges() {
 
221
  bool ret = true;
 
222
  if(!loadingProcessList)
 
223
    ret = saveProcessList();
 
224
 
 
225
  m_monitor->signal();
 
226
 
 
227
  return ret;
 
228
}
 
229
 
 
230
/* Must be called with m_processlist locked */
 
231
bool
 
232
CPCD::saveProcessList(){
 
233
  char newfile[PATH_MAX+4];
 
234
  char oldfile[PATH_MAX+4];
 
235
  char curfile[PATH_MAX];
 
236
  FILE *f;
 
237
 
 
238
  /* Create the filenames that we will use later */
 
239
  BaseString::snprintf(newfile, sizeof(newfile), "%s.new", m_procfile.c_str());
 
240
  BaseString::snprintf(oldfile, sizeof(oldfile), "%s.old", m_procfile.c_str());
 
241
  BaseString::snprintf(curfile, sizeof(curfile), "%s", m_procfile.c_str());
 
242
 
 
243
  f = fopen(newfile, "w");
 
244
 
 
245
  if(f == NULL) {
 
246
    /* XXX What should be done here? */
 
247
    logger.critical("Cannot open `%s': %s\n", newfile, strerror(errno));
 
248
    return false;
 
249
  }
 
250
 
 
251
  for(size_t i = 0; i<m_processes.size(); i++){
 
252
    m_processes[i]->print(f);
 
253
    fprintf(f, "\n");
 
254
 
 
255
    if(m_processes[i]->m_processType == TEMPORARY){
 
256
      /**
 
257
       * Interactive process should never be "restarted" on cpcd restart
 
258
       */
 
259
      continue;
 
260
    }
 
261
    
 
262
    if(m_processes[i]->m_status == RUNNING || 
 
263
       m_processes[i]->m_status == STARTING){
 
264
      fprintf(f, "start process\nid: %d\n\n", m_processes[i]->m_id);
 
265
    }
 
266
  }
 
267
  
 
268
  fclose(f);
 
269
  f = NULL;
 
270
  
 
271
  /* This will probably only work on reasonably Unix-like systems. You have
 
272
   * been warned...
 
273
   * 
 
274
   * The motivation behind all this link()ing is that the daemon might
 
275
   * crash right in the middle of updating the configuration file, and in
 
276
   * that case we want to be sure that the old file is around until we are
 
277
   * guaranteed that there is always at least one copy of either the old or
 
278
   * the new configuration file left.
 
279
   */
 
280
 
 
281
  /* Remove an old config file if it exists */
 
282
  unlink(oldfile);
 
283
 
 
284
  if(link(curfile, oldfile) != 0) /* make a backup of the running config */
 
285
    logger.error("Cannot rename '%s' -> '%s'", curfile, oldfile);
 
286
  else {
 
287
    if(unlink(curfile) != 0) { /* remove the running config file */
 
288
      logger.critical("Cannot remove file '%s'", curfile);
 
289
      return false;
 
290
    }
 
291
  }
 
292
 
 
293
  if(link(newfile, curfile) != 0) { /* put the new config file in place */
 
294
    printf("-->%d\n", __LINE__);
 
295
 
 
296
    logger.critical("Cannot rename '%s' -> '%s': %s", 
 
297
                    curfile, newfile, strerror(errno));
 
298
    return false;
 
299
  }
 
300
 
 
301
  /* XXX Ideally we would fsync() the directory here, but I'm not sure if
 
302
   * that actually works.
 
303
   */
 
304
 
 
305
  unlink(newfile); /* remove the temporary file */
 
306
  unlink(oldfile); /* remove the old file */
 
307
 
 
308
  logger.info("Process list saved as '%s'", curfile);
 
309
 
 
310
  return true;
 
311
}
 
312
 
 
313
bool
 
314
CPCD::loadProcessList(){
 
315
  BaseString secondfile;
 
316
  FILE *f;
 
317
 
 
318
  loadingProcessList = true;
 
319
 
 
320
  secondfile.assfmt("%s.new", m_procfile.c_str());
 
321
 
 
322
  /* Try to open the config file */
 
323
  f = fopen(m_procfile.c_str(), "r");
 
324
 
 
325
  /* If it did not exist, try to open the backup. See the saveProcessList()
 
326
   * method for an explanation why it is done this way.
 
327
   */
 
328
  if(f == NULL) {
 
329
    f = fopen(secondfile.c_str(), "r");
 
330
    
 
331
    if(f == NULL) {
 
332
      /* XXX What to do here? */
 
333
      logger.info("Configuration file `%s' not found",
 
334
                  m_procfile.c_str());
 
335
      logger.info("Starting with empty configuration");
 
336
      loadingProcessList = false;
 
337
      return false;
 
338
    } else {
 
339
      logger.info("Configuration file `%s' missing",
 
340
                  m_procfile.c_str());
 
341
      logger.info("Backup configuration file `%s' is used",
 
342
                  secondfile.c_str());
 
343
      /* XXX Maybe we should just rename the backup file to the official
 
344
       * name, and be done with it?
 
345
       */
 
346
    }
 
347
  }
 
348
 
 
349
  CPCDAPISession sess(f, *this);
 
350
  sess.loadFile();
 
351
  loadingProcessList = false;
 
352
 
 
353
  size_t i;
 
354
  Vector<int> temporary;
 
355
  for(i = 0; i<m_processes.size(); i++){
 
356
    Process * proc = m_processes[i];
 
357
    proc->readPid();
 
358
    if(proc->m_processType == TEMPORARY){
 
359
      temporary.push_back(proc->m_id);
 
360
    }
 
361
  }
 
362
  
 
363
  for(i = 0; i<temporary.size(); i++){
 
364
    RequestStatus rs;
 
365
    undefineProcess(&rs, temporary[i]);
 
366
  }
 
367
  
 
368
  /* Don't call notifyChanges here, as that would save the file we just
 
369
     loaded */
 
370
  m_monitor->signal();
 
371
  return true;
 
372
}
 
373
 
 
374
MutexVector<CPCD::Process *> *
 
375
CPCD::getProcessList() {
 
376
  return &m_processes;
 
377
}
 
378
 
 
379
void
 
380
CPCD::RequestStatus::err(enum RequestStatusCode status, const char *msg) {
 
381
  m_status = status;
 
382
  BaseString::snprintf(m_errorstring, sizeof(m_errorstring), "%s", msg);
 
383
}
 
384
 
 
385
#if 0
 
386
void
 
387
CPCD::sigchild(int pid){
 
388
  m_processes.lock(); 
 
389
  for(size_t i = 0; i<m_processes.size(); i++){
 
390
    if(m_processes[i].m_pid == pid){
 
391
    }
 
392
  }
 
393
  wait(pid, 0, 0);
 
394
}
 
395
#endif
 
396
 
 
397
  /** Register event subscriber */
 
398
void
 
399
CPCD::do_register(EventSubscriber * sub){
 
400
  m_subscribers.lock();
 
401
  m_subscribers.push_back(sub, false);
 
402
  m_subscribers.unlock();  
 
403
}
 
404
 
 
405
EventSubscriber*
 
406
CPCD::do_unregister(EventSubscriber * sub){
 
407
  m_subscribers.lock();
 
408
 
 
409
  for(size_t i = 0; i<m_subscribers.size(); i++){
 
410
    if(m_subscribers[i] == sub){
 
411
      m_subscribers.erase(i);
 
412
      m_subscribers.unlock();  
 
413
      return sub;
 
414
    }
 
415
  }
 
416
 
 
417
  m_subscribers.unlock();  
 
418
  return 0;
 
419
}
 
420
 
 
421
void
 
422
CPCD::report(int id, CPCEvent::EventType t){
 
423
  CPCEvent e;
 
424
  e.m_time = time(0);
 
425
  e.m_proc = id;
 
426
  e.m_type = t;
 
427
  m_subscribers.lock();
 
428
  for(size_t i = 0; i<m_subscribers.size(); i++){
 
429
    (* m_subscribers[i]).report(e);
 
430
  }
 
431
  m_subscribers.unlock();
 
432
}
 
433
 
 
434
template class MutexVector<EventSubscriber*>;