~ubuntu-branches/ubuntu/trusty/mysql-5.6/trusty

« back to all changes in this revision

Viewing changes to storage/ndb/src/kernel/vm/mt.cpp

  • Committer: Package Import Robot
  • Author(s): James Page
  • Date: 2014-02-12 11:54:27 UTC
  • Revision ID: package-import@ubuntu.com-20140212115427-oq6tfsqxl1wuwehi
Tags: upstream-5.6.15
ImportĀ upstreamĀ versionĀ 5.6.15

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2008, 2011, Oracle and/or its affiliates. All rights reserved.
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
 
15
 
 
16
#include <ndb_global.h>
 
17
 
 
18
#include <VMSignal.hpp>
 
19
#include <kernel_types.h>
 
20
#include <Prio.hpp>
 
21
#include <SignalLoggerManager.hpp>
 
22
#include <SimulatedBlock.hpp>
 
23
#include <ErrorHandlingMacros.hpp>
 
24
#include <GlobalData.hpp>
 
25
#include <WatchDog.hpp>
 
26
#include <TransporterDefinitions.hpp>
 
27
#include "FastScheduler.hpp"
 
28
#include "mt.hpp"
 
29
#include <DebuggerNames.hpp>
 
30
#include <signaldata/StopForCrash.hpp>
 
31
#include "TransporterCallbackKernel.hpp"
 
32
#include <NdbSleep.h>
 
33
#include <portlib/ndb_prefetch.h>
 
34
 
 
35
#include "mt-asm.h"
 
36
 
 
37
inline
 
38
SimulatedBlock*
 
39
GlobalData::mt_getBlock(BlockNumber blockNo, Uint32 instanceNo)
 
40
{
 
41
  SimulatedBlock* b = getBlock(blockNo);
 
42
  if (b != 0 && instanceNo != 0)
 
43
    b = b->getInstance(instanceNo);
 
44
  return b;
 
45
}
 
46
 
 
47
#ifdef __GNUC__
 
48
/* Provides a small (but noticeable) speedup in benchmarks. */
 
49
#define memcpy __builtin_memcpy
 
50
#endif
 
51
 
 
52
/* size of a cacheline */
 
53
#define NDB_CL 64
 
54
 
 
55
/* Constants found by benchmarks to be reasonable values. */
 
56
 
 
57
/* Maximum number of signals to execute before sending to remote nodes. */
 
58
static const Uint32 MAX_SIGNALS_BEFORE_SEND = 200;
 
59
 
 
60
/*
 
61
 * Max. signals to execute from one job buffer before considering other
 
62
 * possible stuff to do.
 
63
 */
 
64
static const Uint32 MAX_SIGNALS_PER_JB = 100;
 
65
 
 
66
/**
 
67
 * Max signals written to other thread before calling flush_jbb_write_state
 
68
 */
 
69
static const Uint32 MAX_SIGNALS_BEFORE_FLUSH_RECEIVER = 2;
 
70
static const Uint32 MAX_SIGNALS_BEFORE_FLUSH_OTHER = 20;
 
71
static const Uint32 MAX_SIGNALS_BEFORE_WAKEUP = 128;
 
72
 
 
73
//#define NDB_MT_LOCK_TO_CPU
 
74
 
 
75
#define MAX_BLOCK_INSTANCES (1 + MAX_NDBMT_LQH_WORKERS + 1) //main+lqh+extra
 
76
#define NUM_MAIN_THREADS 2 // except receiver
 
77
#define MAX_THREADS (NUM_MAIN_THREADS + MAX_NDBMT_LQH_THREADS + 1)
 
78
 
 
79
/* If this is too small it crashes before first signal. */
 
80
#define MAX_INSTANCES_PER_THREAD (16 + 8 * MAX_NDBMT_LQH_THREADS)
 
81
 
 
82
static Uint32 num_lqh_workers = 0;
 
83
static Uint32 num_lqh_threads = 0;
 
84
static Uint32 num_threads = 0;
 
85
static Uint32 receiver_thread_no = 0;
 
86
 
 
87
#define NO_SEND_THREAD (MAX_THREADS + 1)
 
88
 
 
89
/* max signal is 32 words, 7 for signal header and 25 datawords */
 
90
#define MIN_SIGNALS_PER_PAGE (thr_job_buffer::SIZE / 32)
 
91
 
 
92
struct mt_lock_stat
 
93
{
 
94
  const void * m_ptr;
 
95
  char * m_name;
 
96
  Uint32 m_contended_count;
 
97
  Uint32 m_spin_count;
 
98
};
 
99
static void register_lock(const void * ptr, const char * name);
 
100
static mt_lock_stat * lookup_lock(const void * ptr);
 
101
 
 
102
#if defined(HAVE_LINUX_FUTEX) && defined(NDB_HAVE_XCNG)
 
103
#define USE_FUTEX
 
104
#endif
 
105
 
 
106
#ifdef USE_FUTEX
 
107
#ifndef _GNU_SOURCE
 
108
#define _GNU_SOURCE
 
109
#endif
 
110
#include <unistd.h>
 
111
#include <sys/syscall.h>
 
112
#include <sys/types.h>
 
113
 
 
114
#define FUTEX_WAIT              0
 
115
#define FUTEX_WAKE              1
 
116
#define FUTEX_FD                2
 
117
#define FUTEX_REQUEUE           3
 
118
#define FUTEX_CMP_REQUEUE       4
 
119
#define FUTEX_WAKE_OP           5
 
120
 
 
121
static inline
 
122
int
 
123
futex_wait(volatile unsigned * addr, int val, const struct timespec * timeout)
 
124
{
 
125
  return syscall(SYS_futex,
 
126
                 addr, FUTEX_WAIT, val, timeout, 0, 0) == 0 ? 0 : errno;
 
127
}
 
128
 
 
129
static inline
 
130
int
 
131
futex_wake(volatile unsigned * addr)
 
132
{
 
133
  return syscall(SYS_futex, addr, FUTEX_WAKE, 1, 0, 0, 0) == 0 ? 0 : errno;
 
134
}
 
135
 
 
136
struct thr_wait
 
137
{
 
138
  volatile unsigned m_futex_state;
 
139
  enum {
 
140
    FS_RUNNING = 0,
 
141
    FS_SLEEPING = 1
 
142
  };
 
143
  thr_wait() { xcng(&m_futex_state, FS_RUNNING);}
 
144
  void init () {}
 
145
};
 
146
 
 
147
/**
 
148
 * Sleep until woken up or timeout occurs.
 
149
 *
 
150
 * Will call check_callback(check_arg) after proper synchronisation, and only
 
151
 * if that returns true will it actually sleep, else it will return
 
152
 * immediately. This is needed to avoid races with wakeup.
 
153
 *
 
154
 * Returns 'true' if it actually did sleep.
 
155
 */
 
156
static inline
 
157
bool
 
158
yield(struct thr_wait* wait, const Uint32 nsec,
 
159
      bool (*check_callback)(struct thr_data *), struct thr_data *check_arg)
 
160
{
 
161
  volatile unsigned * val = &wait->m_futex_state;
 
162
#ifndef NDEBUG
 
163
  int old = 
 
164
#endif
 
165
    xcng(val, thr_wait::FS_SLEEPING);
 
166
  assert(old == thr_wait::FS_RUNNING);
 
167
 
 
168
  /**
 
169
   * At this point, we need to re-check the condition that made us decide to
 
170
   * sleep, and skip sleeping if it changed..
 
171
   *
 
172
   * Otherwise, the condition may have not changed, and the thread making the
 
173
   * change have already decided not to wake us, as our state was FS_RUNNING
 
174
   * at the time.
 
175
   *
 
176
   * Also need a memory barrier to ensure this extra check is race-free.
 
177
   *   but that is already provided by xcng
 
178
   */
 
179
  bool waited = (*check_callback)(check_arg);
 
180
  if (waited)
 
181
  {
 
182
    struct timespec timeout;
 
183
    timeout.tv_sec = 0;
 
184
    timeout.tv_nsec = nsec;
 
185
    futex_wait(val, thr_wait::FS_SLEEPING, &timeout);
 
186
  }
 
187
  xcng(val, thr_wait::FS_RUNNING);
 
188
  return waited;
 
189
}
 
190
 
 
191
static inline
 
192
int
 
193
wakeup(struct thr_wait* wait)
 
194
{
 
195
  volatile unsigned * val = &wait->m_futex_state;
 
196
  /**
 
197
   * We must ensure that any state update (new data in buffers...) are visible
 
198
   * to the other thread before we can look at the sleep state of that other
 
199
   * thread.
 
200
   */
 
201
  if (xcng(val, thr_wait::FS_RUNNING) == thr_wait::FS_SLEEPING)
 
202
  {
 
203
    return futex_wake(val);
 
204
  }
 
205
  return 0;
 
206
}
 
207
#else
 
208
#include <NdbMutex.h>
 
209
#include <NdbCondition.h>
 
210
 
 
211
struct thr_wait
 
212
{
 
213
  bool m_need_wakeup;
 
214
  NdbMutex *m_mutex;
 
215
  NdbCondition *m_cond;
 
216
  thr_wait() : m_need_wakeup(false), m_mutex(0), m_cond(0) {}
 
217
 
 
218
  void init() {
 
219
    m_mutex = NdbMutex_Create();
 
220
    m_cond = NdbCondition_Create();
 
221
  }
 
222
};
 
223
 
 
224
static inline
 
225
bool
 
226
yield(struct thr_wait* wait, const Uint32 nsec,
 
227
      bool (*check_callback)(struct thr_data *), struct thr_data *check_arg)
 
228
{
 
229
  struct timespec end;
 
230
  NdbCondition_ComputeAbsTime(&end, nsec/1000000);
 
231
  NdbMutex_Lock(wait->m_mutex);
 
232
 
 
233
  Uint32 waits = 0;
 
234
  /* May have spurious wakeups: Always recheck condition predicate */
 
235
  while ((*check_callback)(check_arg))
 
236
  {
 
237
    wait->m_need_wakeup = true;
 
238
    waits++;
 
239
    if (NdbCondition_WaitTimeoutAbs(wait->m_cond,
 
240
                                    wait->m_mutex, &end) == ETIMEDOUT)
 
241
    {
 
242
      wait->m_need_wakeup = false;
 
243
      break;
 
244
    }
 
245
  }
 
246
  NdbMutex_Unlock(wait->m_mutex);
 
247
  return (waits > 0);
 
248
}
 
249
 
 
250
 
 
251
static inline
 
252
int
 
253
wakeup(struct thr_wait* wait)
 
254
{
 
255
  NdbMutex_Lock(wait->m_mutex);
 
256
  // We should avoid signaling when not waiting for wakeup
 
257
  if (wait->m_need_wakeup)
 
258
  {
 
259
    wait->m_need_wakeup = false;
 
260
    NdbCondition_Signal(wait->m_cond);
 
261
  }
 
262
  NdbMutex_Unlock(wait->m_mutex);
 
263
  return 0;
 
264
}
 
265
 
 
266
#endif
 
267
 
 
268
#ifdef NDB_HAVE_XCNG
 
269
template <unsigned SZ>
 
270
struct thr_spin_lock
 
271
{
 
272
  thr_spin_lock(const char * name = 0)
 
273
  {
 
274
    m_lock = 0;
 
275
    register_lock(this, name);
 
276
  }
 
277
 
 
278
  union {
 
279
    volatile Uint32 m_lock;
 
280
    char pad[SZ];
 
281
  };
 
282
};
 
283
 
 
284
static
 
285
ATTRIBUTE_NOINLINE
 
286
void
 
287
lock_slow(void * sl, volatile unsigned * val)
 
288
{
 
289
  mt_lock_stat* s = lookup_lock(sl); // lookup before owning lock
 
290
 
 
291
loop:
 
292
  Uint32 spins = 0;
 
293
  do {
 
294
    spins++;
 
295
    cpu_pause();
 
296
  } while (* val == 1);
 
297
 
 
298
  if (unlikely(xcng(val, 1) != 0))
 
299
    goto loop;
 
300
 
 
301
  if (s)
 
302
  {
 
303
    s->m_spin_count += spins;
 
304
    Uint32 count = ++s->m_contended_count;
 
305
    Uint32 freq = (count > 10000 ? 5000 : (count > 20 ? 200 : 1));
 
306
 
 
307
    if ((count % freq) == 0)
 
308
      printf("%s waiting for lock, contentions: %u spins: %u\n",
 
309
             s->m_name, count, s->m_spin_count);
 
310
  }
 
311
}
 
312
 
 
313
template <unsigned SZ>
 
314
static
 
315
inline
 
316
void
 
317
lock(struct thr_spin_lock<SZ>* sl)
 
318
{
 
319
  volatile unsigned* val = &sl->m_lock;
 
320
  if (likely(xcng(val, 1) == 0))
 
321
    return;
 
322
 
 
323
  lock_slow(sl, val);
 
324
}
 
325
 
 
326
template <unsigned SZ>
 
327
static
 
328
inline
 
329
void
 
330
unlock(struct thr_spin_lock<SZ>* sl)
 
331
{
 
332
  /**
 
333
   * Memory barrier here, to make sure all of our stores are visible before
 
334
   * the lock release is.
 
335
   */
 
336
  mb();
 
337
  sl->m_lock = 0;
 
338
}
 
339
 
 
340
template <unsigned SZ>
 
341
static
 
342
inline
 
343
int
 
344
trylock(struct thr_spin_lock<SZ>* sl)
 
345
{
 
346
  volatile unsigned* val = &sl->m_lock;
 
347
  return xcng(val, 1);
 
348
}
 
349
#else
 
350
#define thr_spin_lock thr_mutex
 
351
#endif
 
352
 
 
353
template <unsigned SZ>
 
354
struct thr_mutex
 
355
{
 
356
  thr_mutex(const char * name = 0) {
 
357
    NdbMutex_Init(&m_mutex);
 
358
    register_lock(this, name);
 
359
  }
 
360
 
 
361
  union {
 
362
    NdbMutex m_mutex;
 
363
    char pad[SZ];
 
364
  };
 
365
};
 
366
 
 
367
template <unsigned SZ>
 
368
static
 
369
inline
 
370
void
 
371
lock(struct thr_mutex<SZ>* sl)
 
372
{
 
373
  NdbMutex_Lock(&sl->m_mutex);
 
374
}
 
375
 
 
376
template <unsigned SZ>
 
377
static
 
378
inline
 
379
void
 
380
unlock(struct thr_mutex<SZ>* sl)
 
381
{
 
382
  NdbMutex_Unlock(&sl->m_mutex);
 
383
}
 
384
 
 
385
template <unsigned SZ>
 
386
static
 
387
inline
 
388
int
 
389
trylock(struct thr_mutex<SZ> * sl)
 
390
{
 
391
  return NdbMutex_Trylock(&sl->m_mutex);
 
392
}
 
393
 
 
394
/**
 
395
 * thr_safe_pool
 
396
 */
 
397
template<typename T>
 
398
struct thr_safe_pool
 
399
{
 
400
  thr_safe_pool(const char * name) : m_free_list(0), m_cnt(0), m_lock(name) {}
 
401
 
 
402
  T* m_free_list;
 
403
  Uint32 m_cnt;
 
404
  thr_spin_lock<NDB_CL - (sizeof(void*) + sizeof(Uint32))> m_lock;
 
405
 
 
406
  T* seize(Ndbd_mem_manager *mm, Uint32 rg) {
 
407
    T* ret = 0;
 
408
    lock(&m_lock);
 
409
    if (m_free_list)
 
410
    {
 
411
      assert(m_cnt);
 
412
      m_cnt--;
 
413
      ret = m_free_list;
 
414
      m_free_list = ret->m_next;
 
415
      unlock(&m_lock);
 
416
    }
 
417
    else
 
418
    {
 
419
      Uint32 dummy;
 
420
      unlock(&m_lock);
 
421
      ret = reinterpret_cast<T*>
 
422
        (mm->alloc_page(rg, &dummy,
 
423
                        Ndbd_mem_manager::NDB_ZONE_ANY));
 
424
      // ToDo: How to deal with failed allocation?!?
 
425
      // I think in this case we need to start grabbing buffers kept for signal
 
426
      // trace.
 
427
    }
 
428
    return ret;
 
429
  }
 
430
 
 
431
  void release(Ndbd_mem_manager *mm, Uint32 rg, T* t) {
 
432
    lock(&m_lock);
 
433
    t->m_next = m_free_list;
 
434
    m_free_list = t;
 
435
    m_cnt++;
 
436
    unlock(&m_lock);
 
437
  }
 
438
 
 
439
  void release_list(Ndbd_mem_manager *mm, Uint32 rg, 
 
440
                    T* head, T* tail, Uint32 cnt) {
 
441
    lock(&m_lock);
 
442
    tail->m_next = m_free_list;
 
443
    m_free_list = head;
 
444
    m_cnt += cnt;
 
445
    unlock(&m_lock);
 
446
  }
 
447
};
 
448
 
 
449
/**
 
450
 * thread_local_pool
 
451
 */
 
452
template<typename T>
 
453
class thread_local_pool
 
454
{
 
455
public:
 
456
  thread_local_pool(thr_safe_pool<T> *global_pool, unsigned max_free) :
 
457
    m_max_free(max_free),
 
458
    m_free(0),
 
459
    m_freelist(0),
 
460
    m_global_pool(global_pool)
 
461
  {
 
462
  }
 
463
 
 
464
  T *seize(Ndbd_mem_manager *mm, Uint32 rg) {
 
465
    T *tmp = m_freelist;
 
466
    if (tmp)
 
467
    {
 
468
      m_freelist = tmp->m_next;
 
469
      assert(m_free > 0);
 
470
      m_free--;
 
471
    }
 
472
    else
 
473
      tmp = m_global_pool->seize(mm, rg);
 
474
 
 
475
    validate();
 
476
    return tmp;
 
477
  }
 
478
 
 
479
  void release(Ndbd_mem_manager *mm, Uint32 rg, T *t) {
 
480
    unsigned free = m_free;
 
481
    if (free < m_max_free)
 
482
    {
 
483
      m_free = free + 1;
 
484
      t->m_next = m_freelist;
 
485
      m_freelist = t;
 
486
    }
 
487
    else
 
488
      m_global_pool->release(mm, rg, t);
 
489
 
 
490
    validate();
 
491
  }
 
492
 
 
493
  /**
 
494
   * Release to local pool even if it get's "too" full
 
495
   *   (wrt to m_max_free)
 
496
   */
 
497
  void release_local(T *t) {
 
498
    m_free++;
 
499
    t->m_next = m_freelist;
 
500
    m_freelist = t;
 
501
 
 
502
    validate();
 
503
  }
 
504
 
 
505
  void validate() const {
 
506
#ifdef VM_TRACE
 
507
    Uint32 cnt = 0;
 
508
    T* t = m_freelist;
 
509
    while (t)
 
510
    {
 
511
      cnt++;
 
512
      t = t->m_next;
 
513
    }
 
514
    assert(cnt == m_free);
 
515
#endif
 
516
  }
 
517
 
 
518
  /**
 
519
   * Release entries so that m_max_free is honored
 
520
   *   (likely used together with release_local)
 
521
   */
 
522
  void release_global(Ndbd_mem_manager *mm, Uint32 rg) {
 
523
    validate();
 
524
    unsigned cnt = 0;
 
525
    unsigned free = m_free;
 
526
    Uint32 maxfree = m_max_free;
 
527
    assert(maxfree > 0);
 
528
 
 
529
    T* head = m_freelist;
 
530
    T* tail = m_freelist;
 
531
    if (free > maxfree)
 
532
    {
 
533
      cnt++;
 
534
      free--;
 
535
 
 
536
      while (free > maxfree)
 
537
      {
 
538
        cnt++;
 
539
        free--;
 
540
        tail = tail->m_next;
 
541
      } 
 
542
 
 
543
      assert(free == maxfree);
 
544
 
 
545
      m_free = free;
 
546
      m_freelist = tail->m_next;
 
547
      m_global_pool->release_list(mm, rg, head, tail, cnt);
 
548
    }
 
549
    validate();
 
550
  }
 
551
 
 
552
  void release_all(Ndbd_mem_manager *mm, Uint32 rg) {
 
553
    validate();
 
554
    T* head = m_freelist;
 
555
    T* tail = m_freelist;
 
556
    if (tail)
 
557
    {
 
558
      unsigned cnt = 1;
 
559
      while (tail->m_next != 0)
 
560
      {
 
561
        cnt++;
 
562
        tail = tail->m_next;
 
563
      }
 
564
      m_global_pool->release_list(mm, rg, head, tail, cnt);
 
565
      m_free = 0;
 
566
      m_freelist = 0;
 
567
    }
 
568
    validate();
 
569
  }
 
570
 
 
571
  void set_pool(thr_safe_pool<T> * pool) { m_global_pool = pool; }
 
572
 
 
573
private:
 
574
  unsigned m_max_free;
 
575
  unsigned m_free;
 
576
  T *m_freelist;
 
577
  thr_safe_pool<T> *m_global_pool;
 
578
};
 
579
 
 
580
/**
 
581
 * Signal buffers.
 
582
 *
 
583
 * Each thread job queue contains a list of these buffers with signals.
 
584
 *
 
585
 * There is an underlying assumption that the size of this structure is the
 
586
 * same as the global memory manager page size.
 
587
 */
 
588
struct thr_job_buffer // 32k
 
589
{
 
590
  static const unsigned SIZE = 8190;
 
591
 
 
592
  /*
 
593
   * Amount of signal data currently in m_data buffer.
 
594
   * Read/written by producer, read by consumer.
 
595
   */
 
596
  Uint32 m_len;
 
597
  /*
 
598
   * Whether this buffer contained prio A or prio B signals, used when dumping
 
599
   * signals from released buffers.
 
600
   */
 
601
  Uint32 m_prioa;
 
602
  union {
 
603
    Uint32 m_data[SIZE];
 
604
 
 
605
    thr_job_buffer * m_next; // For free-list
 
606
  };
 
607
};  
 
608
 
 
609
static
 
610
inline
 
611
Uint32
 
612
calc_fifo_used(Uint32 ri, Uint32 wi, Uint32 sz)
 
613
{
 
614
  return (wi >= ri) ? wi - ri : (sz - ri) + wi;
 
615
}
 
616
 
 
617
/**
 
618
 * thr_job_queue is shared between consumer / producer. 
 
619
 *
 
620
 * The hot-spot of the thr_job_queue are the read/write indexes.
 
621
 * As they are updated and read frequently they have been placed
 
622
 * in its own thr_job_queue_head[] in order to make them fit inside a
 
623
 * single/few cache lines and thereby avoid complete L1-cache replacement
 
624
 * every time the job_queue is scanned.
 
625
 */
 
626
struct thr_job_queue_head
 
627
{
 
628
  unsigned m_read_index;  // Read/written by consumer, read by producer
 
629
  unsigned m_write_index; // Read/written by producer, read by consumer
 
630
 
 
631
  Uint32 used() const;
 
632
};
 
633
 
 
634
struct thr_job_queue
 
635
{
 
636
  static const unsigned SIZE = 31;
 
637
 
 
638
  struct thr_job_queue_head* m_head;
 
639
  struct thr_job_buffer* m_buffers[SIZE];
 
640
};
 
641
 
 
642
inline
 
643
Uint32
 
644
thr_job_queue_head::used() const
 
645
{
 
646
  return calc_fifo_used(m_read_index, m_write_index, thr_job_queue::SIZE);
 
647
}
 
648
 
 
649
/*
 
650
 * Two structures tightly associated with thr_job_queue.
 
651
 *
 
652
 * There will generally be exactly one thr_jb_read_state and one
 
653
 * thr_jb_write_state associated with each thr_job_queue.
 
654
 *
 
655
 * The reason they are kept separate is to avoid unnecessary inter-CPU
 
656
 * cache line pollution. All fields shared among producer and consumer
 
657
 * threads are in thr_job_queue, thr_jb_write_state fields are only
 
658
 * accessed by the producer thread(s), and thr_jb_read_state fields are
 
659
 * only accessed by the consumer thread.
 
660
 *
 
661
 * For example, on Intel core 2 quad processors, there is a ~33%
 
662
 * penalty for two cores accessing the same 64-byte cacheline.
 
663
 */
 
664
struct thr_jb_write_state
 
665
{
 
666
  /*
 
667
   * The position to insert the next signal into the queue.
 
668
   *
 
669
   * m_write_index is the index into thr_job_queue::m_buffers[] of the buffer
 
670
   * to insert into, and m_write_pos is the index into thr_job_buffer::m_data[]
 
671
   * at which to store the next signal.
 
672
   */
 
673
  Uint32 m_write_index;
 
674
  Uint32 m_write_pos;
 
675
 
 
676
  /* Thread-local copy of thr_job_queue::m_buffers[m_write_index]. */
 
677
  thr_job_buffer *m_write_buffer;
 
678
 
 
679
  /* Number of signals inserted since last flush to thr_job_queue. */
 
680
  Uint32 m_pending_signals;
 
681
 
 
682
  /* Number of signals inserted since last wakeup */
 
683
  Uint32 m_pending_signals_wakeup;
 
684
};
 
685
 
 
686
/*
 
687
 * This structure is also used when dumping signal traces, to dump executed
 
688
 * signals from the buffer(s) currently being processed.
 
689
 */
 
690
struct thr_jb_read_state
 
691
{
 
692
  /*
 
693
   * Index into thr_job_queue::m_buffers[] of the buffer that we are currently
 
694
   * executing signals from.
 
695
   */
 
696
  Uint32 m_read_index;
 
697
  /*
 
698
   * Index into m_read_buffer->m_data[] of the next signal to execute from the
 
699
   * current buffer.
 
700
   */
 
701
  Uint32 m_read_pos;
 
702
  /*
 
703
   * Thread local copy of thr_job_queue::m_buffers[m_read_index].
 
704
   */
 
705
  thr_job_buffer *m_read_buffer;
 
706
  /*
 
707
   * These are thread-local copies of thr_job_queue::m_write_index and
 
708
   * thr_job_buffer::m_len. They are read once at the start of the signal
 
709
   * execution loop and used to determine when the end of available signals is
 
710
   * reached.
 
711
   */
 
712
  Uint32 m_read_end;    // End within current thr_job_buffer. (*m_read_buffer)
 
713
 
 
714
  Uint32 m_write_index; // Last available thr_job_buffer.
 
715
 
 
716
  bool is_empty() const
 
717
  {
 
718
    assert(m_read_index != m_write_index  ||  m_read_pos <= m_read_end);
 
719
    return (m_read_index == m_write_index) && (m_read_pos >= m_read_end);
 
720
  }
 
721
};
 
722
 
 
723
/**
 
724
 * time-queue
 
725
 */
 
726
struct thr_tq
 
727
{
 
728
  static const unsigned SQ_SIZE = 512;
 
729
  static const unsigned LQ_SIZE = 512;
 
730
  static const unsigned PAGES = 32 * (SQ_SIZE + LQ_SIZE) / 8192;
 
731
  
 
732
  Uint32 * m_delayed_signals[PAGES];
 
733
  Uint32 m_next_free;
 
734
  Uint32 m_next_timer;
 
735
  Uint32 m_current_time;
 
736
  Uint32 m_cnt[2];
 
737
  Uint32 m_short_queue[SQ_SIZE];
 
738
  Uint32 m_long_queue[LQ_SIZE];
 
739
};
 
740
 
 
741
/*
 
742
 * Max number of thread-local job buffers to keep before releasing to
 
743
 * global pool.
 
744
 */
 
745
#define THR_FREE_BUF_MAX 32
 
746
/* Minimum number of buffers (to ensure useful trace dumps). */
 
747
#define THR_FREE_BUF_MIN 12
 
748
/*
 
749
 * 1/THR_FREE_BUF_BATCH is the fraction of job buffers to allocate/free
 
750
 * at a time from/to global pool.
 
751
 */
 
752
#define THR_FREE_BUF_BATCH 6
 
753
 
 
754
/**
 
755
 * a page with send data
 
756
 */
 
757
struct thr_send_page
 
758
{
 
759
  static const Uint32 PGSIZE = 32768;
 
760
#if SIZEOF_CHARP == 4
 
761
  static const Uint32 HEADER_SIZE = 8;
 
762
#else
 
763
  static const Uint32 HEADER_SIZE = 12;
 
764
#endif
 
765
 
 
766
  static Uint32 max_bytes() {
 
767
    return PGSIZE - offsetof(thr_send_page, m_data);
 
768
  }
 
769
 
 
770
  /* Next page */
 
771
  thr_send_page* m_next;
 
772
 
 
773
  /* Bytes of send data available in this page. */
 
774
  Uint16 m_bytes;
 
775
 
 
776
  /* Start of unsent data */
 
777
  Uint16 m_start;
 
778
 
 
779
  /* Data; real size is to the end of one page. */
 
780
  char m_data[2];
 
781
};
 
782
 
 
783
/**
 
784
 * a linked list with thr_send_page
 
785
 */
 
786
struct thr_send_buffer
 
787
{
 
788
  thr_send_page* m_first_page;
 
789
  thr_send_page* m_last_page;
 
790
};
 
791
 
 
792
/**
 
793
 * a ring buffer with linked list of thr_send_page
 
794
 */
 
795
struct thr_send_queue
 
796
{
 
797
  unsigned m_write_index;
 
798
#if SIZEOF_CHARP == 8
 
799
  unsigned m_unused;
 
800
  thr_send_page* m_buffers[7];
 
801
  static const unsigned SIZE = 7;
 
802
#else
 
803
  thr_send_page* m_buffers[15];
 
804
  static const unsigned SIZE = 15;
 
805
#endif
 
806
};
 
807
 
 
808
struct thr_data
 
809
{
 
810
  thr_data() : m_jba_write_lock("jbalock"),
 
811
               m_send_buffer_pool(0, THR_FREE_BUF_MAX) {}
 
812
 
 
813
  thr_wait m_waiter;
 
814
  unsigned m_thr_no;
 
815
 
 
816
  /**
 
817
   * max signals to execute per JBB buffer
 
818
   */
 
819
  unsigned m_max_signals_per_jb;
 
820
 
 
821
  /**
 
822
   * max signals to execute before recomputing m_max_signals_per_jb
 
823
   */
 
824
  unsigned m_max_exec_signals;
 
825
 
 
826
  Uint64 m_time;
 
827
  struct thr_tq m_tq;
 
828
 
 
829
  /* Prio A signal incoming queue. */
 
830
  struct thr_spin_lock<64> m_jba_write_lock;
 
831
  struct thr_job_queue m_jba;
 
832
 
 
833
  struct thr_job_queue_head m_jba_head;
 
834
 
 
835
  /* Thread-local read state of prio A buffer. */
 
836
  struct thr_jb_read_state m_jba_read_state;
 
837
  /*
 
838
   * There is no m_jba_write_state, as we have multiple writers to the prio A
 
839
   * queue, so local state becomes invalid as soon as we release the lock.
 
840
   */
 
841
 
 
842
  /*
 
843
   * In m_next_buffer we keep a free buffer at all times, so that when
 
844
   * we hold the lock and find we need a new buffer, we can use this and this
 
845
   * way defer allocation to after releasing the lock.
 
846
   */
 
847
  struct thr_job_buffer* m_next_buffer;
 
848
 
 
849
  /*
 
850
   * We keep a small number of buffers in a thread-local cyclic FIFO, so that
 
851
   * we can avoid going to the global pool in most cases, and so that we have
 
852
   * recent buffers available for dumping in trace files.
 
853
   */
 
854
  struct thr_job_buffer *m_free_fifo[THR_FREE_BUF_MAX];
 
855
  /* m_first_free is the index of the entry to return next from seize(). */
 
856
  Uint32 m_first_free;
 
857
  /* m_first_unused is the first unused entry in m_free_fifo. */
 
858
  Uint32 m_first_unused;
 
859
 
 
860
  /*
 
861
   * These are the thread input queues, where other threads deliver signals
 
862
   * into.
 
863
   */
 
864
  struct thr_job_queue_head m_in_queue_head[MAX_THREADS];
 
865
  struct thr_job_queue m_in_queue[MAX_THREADS];
 
866
  /* These are the write states of m_in_queue[self] in each thread. */
 
867
  struct thr_jb_write_state m_write_states[MAX_THREADS];
 
868
  /* These are the read states of all of our own m_in_queue[]. */
 
869
  struct thr_jb_read_state m_read_states[MAX_THREADS];
 
870
 
 
871
  /* Jam buffers for making trace files at crashes. */
 
872
  EmulatedJamBuffer m_jam;
 
873
  /* Watchdog counter for this thread. */
 
874
  Uint32 m_watchdog_counter;
 
875
  /* Signal delivery statistics. */
 
876
  Uint32 m_prioa_count;
 
877
  Uint32 m_prioa_size;
 
878
  Uint32 m_priob_count;
 
879
  Uint32 m_priob_size;
 
880
 
 
881
  /* Array of node ids with pending remote send data. */
 
882
  Uint8 m_pending_send_nodes[MAX_NTRANSPORTERS];
 
883
  /* Number of node ids in m_pending_send_nodes. */
 
884
  Uint32 m_pending_send_count;
 
885
 
 
886
  /**
 
887
   * Bitmap of pending node ids with send data.
 
888
   * Used to quickly check if a node id is already in m_pending_send_nodes.
 
889
   */
 
890
  Bitmask<(MAX_NTRANSPORTERS+31)/32> m_pending_send_mask;
 
891
 
 
892
  /* pool for send buffers */
 
893
  class thread_local_pool<thr_send_page> m_send_buffer_pool;
 
894
 
 
895
  /* Send buffer for this thread, these are not touched by any other thread */
 
896
  struct thr_send_buffer m_send_buffers[MAX_NTRANSPORTERS];
 
897
 
 
898
  /* Block instances (main and worker) handled by this thread. */
 
899
  /* Used for sendpacked (send-at-job-buffer-end). */
 
900
  Uint32 m_instance_count;
 
901
  BlockNumber m_instance_list[MAX_INSTANCES_PER_THREAD];
 
902
 
 
903
  SectionSegmentPool::Cache m_sectionPoolCache;
 
904
 
 
905
  Uint32 m_cpu;
 
906
  pthread_t m_thr_id;
 
907
  NdbThread* m_thread;
 
908
};
 
909
 
 
910
struct mt_send_handle  : public TransporterSendBufferHandle
 
911
{
 
912
  struct thr_data * m_selfptr;
 
913
  mt_send_handle(thr_data* ptr) : m_selfptr(ptr) {}
 
914
  virtual ~mt_send_handle() {}
 
915
 
 
916
  virtual Uint32 *getWritePtr(NodeId node, Uint32 len, Uint32 prio, Uint32 max);
 
917
  virtual Uint32 updateWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio);
 
918
  virtual bool forceSend(NodeId node);
 
919
};
 
920
 
 
921
struct trp_callback : public TransporterCallbackKernel
 
922
{
 
923
  trp_callback() {}
 
924
 
 
925
  /* Callback interface. */
 
926
  int checkJobBuffer();
 
927
  void reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes);
 
928
  void lock_transporter(NodeId node);
 
929
  void unlock_transporter(NodeId node);
 
930
  Uint32 get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max);
 
931
  Uint32 bytes_sent(NodeId node, Uint32 bytes);
 
932
  bool has_data_to_send(NodeId node);
 
933
  void reset_send_buffer(NodeId node, bool should_be_empty);
 
934
};
 
935
 
 
936
extern trp_callback g_trp_callback;             // Forward declaration
 
937
extern struct thr_repository g_thr_repository;
 
938
 
 
939
#include <NdbMutex.h>
 
940
#include <NdbCondition.h>
 
941
 
 
942
struct thr_repository
 
943
{
 
944
  thr_repository()
 
945
    : m_receive_lock("recvlock"),
 
946
      m_section_lock("sectionlock"),
 
947
      m_mem_manager_lock("memmanagerlock"),
 
948
      m_jb_pool("jobbufferpool"),
 
949
      m_sb_pool("sendbufferpool")
 
950
    {}
 
951
 
 
952
  struct thr_spin_lock<64> m_receive_lock;
 
953
  struct thr_spin_lock<64> m_section_lock;
 
954
  struct thr_spin_lock<64> m_mem_manager_lock;
 
955
  struct thr_safe_pool<thr_job_buffer> m_jb_pool;
 
956
  struct thr_safe_pool<thr_send_page> m_sb_pool;
 
957
  Ndbd_mem_manager * m_mm;
 
958
  unsigned m_thread_count;
 
959
  struct thr_data m_thread[MAX_THREADS];
 
960
 
 
961
  /**
 
962
   * send buffer handling
 
963
   */
 
964
 
 
965
  /* The buffers that are to be sent */
 
966
  struct send_buffer
 
967
  {
 
968
    /**
 
969
     * lock
 
970
     */
 
971
    struct thr_spin_lock<8> m_send_lock;
 
972
 
 
973
    /**
 
974
     * pending data
 
975
     */
 
976
    struct thr_send_buffer m_buffer;
 
977
 
 
978
    /**
 
979
     * Flag used to coordinate sending to same remote node from different
 
980
     * threads.
 
981
     *
 
982
     * If two threads need to send to the same node at the same time, the
 
983
     * second thread, rather than wait for the first to finish, will just
 
984
     * set this flag, and the first thread will do an extra send when done
 
985
     * with the first.
 
986
     */
 
987
    Uint32 m_force_send;
 
988
 
 
989
    /**
 
990
     * Which thread is currently holding the m_send_lock
 
991
     */
 
992
    Uint32 m_send_thread;
 
993
 
 
994
    /**
 
995
     * bytes pending for this node
 
996
     */
 
997
    Uint32 m_bytes;
 
998
 
 
999
    /* read index(es) in thr_send_queue */
 
1000
    Uint32 m_read_index[MAX_THREADS];
 
1001
  } m_send_buffers[MAX_NTRANSPORTERS];
 
1002
 
 
1003
  /* The buffers published by threads */
 
1004
  thr_send_queue m_thread_send_buffers[MAX_NTRANSPORTERS][MAX_THREADS];
 
1005
 
 
1006
  /*
 
1007
   * These are used to synchronize during crash / trace dumps.
 
1008
   *
 
1009
   */
 
1010
  NdbMutex stop_for_crash_mutex;
 
1011
  NdbCondition stop_for_crash_cond;
 
1012
  Uint32 stopped_threads;
 
1013
};
 
1014
 
 
1015
#if 0
 
1016
static
 
1017
Uint32
 
1018
fifo_used_pages(struct thr_data* selfptr)
 
1019
{
 
1020
  return calc_fifo_used(selfptr->m_first_unused,
 
1021
                        selfptr->m_first_free,
 
1022
                        THR_FREE_BUF_MAX);
 
1023
}
 
1024
#endif
 
1025
 
 
1026
static
 
1027
void
 
1028
job_buffer_full(struct thr_data* selfptr)
 
1029
{
 
1030
  ndbout_c("job buffer full");
 
1031
  abort();
 
1032
}
 
1033
 
 
1034
static
 
1035
void
 
1036
out_of_job_buffer(struct thr_data* selfptr)
 
1037
{
 
1038
  ndbout_c("out of job buffer");
 
1039
  abort();
 
1040
}
 
1041
 
 
1042
static
 
1043
thr_job_buffer*
 
1044
seize_buffer(struct thr_repository* rep, int thr_no, bool prioa)
 
1045
{
 
1046
  thr_job_buffer* jb;
 
1047
  thr_data* selfptr = rep->m_thread + thr_no;
 
1048
  Uint32 first_free = selfptr->m_first_free;
 
1049
  Uint32 first_unused = selfptr->m_first_unused;
 
1050
 
 
1051
  /*
 
1052
   * An empty FIFO is denoted by m_first_free == m_first_unused.
 
1053
   * So we will never have a completely full FIFO array, at least one entry will
 
1054
   * always be unused. But the code is simpler as a result.
 
1055
   */
 
1056
 
 
1057
  /*
 
1058
   * We never allow the fifo to become completely empty, as we want to have
 
1059
   * a good number of signals available for trace files in case of a forced
 
1060
   * shutdown.
 
1061
   */
 
1062
  Uint32 buffers = (first_free > first_unused ?
 
1063
                    first_unused + THR_FREE_BUF_MAX - first_free :
 
1064
                    first_unused - first_free);
 
1065
  if (unlikely(buffers <= THR_FREE_BUF_MIN))
 
1066
  {
 
1067
    /*
 
1068
     * All used, allocate another batch from global pool.
 
1069
     *
 
1070
     * Put the new buffers at the head of the fifo, so as not to needlessly
 
1071
     * push out any existing buffers from the fifo (that would loose useful
 
1072
     * data for signal dumps in trace files).
 
1073
     */
 
1074
    Uint32 cnt = 0;
 
1075
    Uint32 batch = THR_FREE_BUF_MAX / THR_FREE_BUF_BATCH;
 
1076
    assert(batch > 0);
 
1077
    assert(batch + THR_FREE_BUF_MIN < THR_FREE_BUF_MAX);
 
1078
    do {
 
1079
      jb = rep->m_jb_pool.seize(rep->m_mm, RG_JOBBUFFER);
 
1080
      if (unlikely(jb == 0))
 
1081
      {
 
1082
        if (unlikely(cnt == 0))
 
1083
        {
 
1084
          out_of_job_buffer(selfptr);
 
1085
        }
 
1086
        break;
 
1087
      }
 
1088
      jb->m_len = 0;
 
1089
      jb->m_prioa = false;
 
1090
      first_free = (first_free ? first_free : THR_FREE_BUF_MAX) - 1;
 
1091
      selfptr->m_free_fifo[first_free] = jb;
 
1092
      batch--;
 
1093
    } while (cnt < batch);
 
1094
    selfptr->m_first_free = first_free;
 
1095
  }
 
1096
 
 
1097
  jb= selfptr->m_free_fifo[first_free];
 
1098
  selfptr->m_first_free = (first_free + 1) % THR_FREE_BUF_MAX;
 
1099
  /* Init here rather than in release_buffer() so signal dump will work. */
 
1100
  jb->m_len = 0;
 
1101
  jb->m_prioa = prioa;
 
1102
  return jb;
 
1103
}
 
1104
 
 
1105
static
 
1106
void
 
1107
release_buffer(struct thr_repository* rep, int thr_no, thr_job_buffer* jb)
 
1108
{
 
1109
  struct thr_data* selfptr = rep->m_thread + thr_no;
 
1110
  Uint32 first_free = selfptr->m_first_free;
 
1111
  Uint32 first_unused = selfptr->m_first_unused;
 
1112
 
 
1113
  /*
 
1114
   * Pack near-empty signals, to get more info in the signal traces.
 
1115
   *
 
1116
   * This is not currently used, as we only release full job buffers, hence
 
1117
   * the #if 0.
 
1118
   */
 
1119
#if 0
 
1120
  Uint32 last_free = (first_unused ? first_unused : THR_FREE_BUF_MAX) - 1;
 
1121
  thr_job_buffer *last_jb = selfptr->m_free_fifo[last_free];
 
1122
  Uint32 len1, len2;
 
1123
 
 
1124
  if (!jb->m_prioa &&
 
1125
      first_free != first_unused &&
 
1126
      !last_jb->m_prioa &&
 
1127
      (len2 = jb->m_len) <= (thr_job_buffer::SIZE / 4) &&
 
1128
      (len1 = last_jb->m_len) + len2 <= thr_job_buffer::SIZE)
 
1129
  {
 
1130
    /*
 
1131
     * The buffer being release is fairly empty, and what data it contains fit
 
1132
     * in the previously released buffer.
 
1133
     *
 
1134
     * We want to avoid too many almost-empty buffers in the free fifo, as that
 
1135
     * makes signal traces less useful due to too little data available. So in
 
1136
     * this case we move the data from the buffer to be released into the
 
1137
     * previous buffer, and place the to-be-released buffer at the head of the
 
1138
     * fifo (to be immediately reused).
 
1139
     *
 
1140
     * This is only done for prio B buffers, as we must not merge prio A and B
 
1141
     * data (or dumps would be incorrect), and prio A buffers are in any case
 
1142
     * full when released.
 
1143
     */
 
1144
    memcpy(last_jb->m_data + len1, jb->m_data, len2*sizeof(jb->m_data[0]));
 
1145
    last_jb->m_len = len1 + len2;
 
1146
    jb->m_len = 0;
 
1147
    first_free = (first_free ? first_free : THR_FREE_BUF_MAX) - 1;
 
1148
    selfptr->m_free_fifo[first_free] = jb;
 
1149
    selfptr->m_first_free = first_free;
 
1150
  }
 
1151
  else
 
1152
#endif
 
1153
  {
 
1154
    /* Just insert at the end of the fifo. */
 
1155
    selfptr->m_free_fifo[first_unused] = jb;
 
1156
    first_unused = (first_unused + 1) % THR_FREE_BUF_MAX;
 
1157
    selfptr->m_first_unused = first_unused;
 
1158
  }
 
1159
 
 
1160
  if (unlikely(first_unused == first_free))
 
1161
  {
 
1162
    /* FIFO full, need to release to global pool. */
 
1163
    Uint32 batch = THR_FREE_BUF_MAX / THR_FREE_BUF_BATCH;
 
1164
    assert(batch > 0);
 
1165
    assert(batch < THR_FREE_BUF_MAX);
 
1166
    do {
 
1167
      rep->m_jb_pool.release(rep->m_mm, RG_JOBBUFFER,
 
1168
                             selfptr->m_free_fifo[first_free]);
 
1169
      first_free = (first_free + 1) % THR_FREE_BUF_MAX;
 
1170
      batch--;
 
1171
    } while (batch > 0);
 
1172
    selfptr->m_first_free = first_free;
 
1173
  }
 
1174
}
 
1175
 
 
1176
static
 
1177
inline
 
1178
Uint32
 
1179
scan_queue(struct thr_data* selfptr, Uint32 cnt, Uint32 end, Uint32* ptr)
 
1180
{
 
1181
  Uint32 thr_no = selfptr->m_thr_no;
 
1182
  Uint32 **pages = selfptr->m_tq.m_delayed_signals;
 
1183
  Uint32 free = selfptr->m_tq.m_next_free;
 
1184
  Uint32* save = ptr;
 
1185
  for (Uint32 i = 0; i < cnt; i++, ptr++)
 
1186
  {
 
1187
    Uint32 val = * ptr;
 
1188
    if ((val & 0xFFFF) <= end)
 
1189
    {
 
1190
      Uint32 idx = val >> 16;
 
1191
      Uint32 buf = idx >> 8;
 
1192
      Uint32 pos = 32 * (idx & 0xFF);
 
1193
 
 
1194
      Uint32* page = * (pages + buf);
 
1195
 
 
1196
      const SignalHeader *s = reinterpret_cast<SignalHeader*>(page + pos);
 
1197
      const Uint32 *data = page + pos + (sizeof(*s)>>2);
 
1198
      if (0)
 
1199
        ndbout_c("found %p val: %d end: %d", s, val & 0xFFFF, end);
 
1200
      /*
 
1201
       * ToDo: Do measurements of the frequency of these prio A timed signals.
 
1202
       *
 
1203
       * If they are frequent, we may want to optimize, as sending one prio A
 
1204
       * signal is somewhat expensive compared to sending one prio B.
 
1205
       */
 
1206
      sendprioa(thr_no, s, data,
 
1207
                data + s->theLength);
 
1208
      * (page + pos) = free;
 
1209
      free = idx;
 
1210
    }
 
1211
    else if (i > 0)
 
1212
    {
 
1213
      selfptr->m_tq.m_next_free = free;
 
1214
      memmove(save, ptr, 4 * (cnt - i));
 
1215
      return i;
 
1216
    }
 
1217
    else
 
1218
    {
 
1219
      return 0;
 
1220
    }
 
1221
  }
 
1222
  selfptr->m_tq.m_next_free = free;
 
1223
  return cnt;
 
1224
}
 
1225
 
 
1226
static
 
1227
void
 
1228
handle_time_wrap(struct thr_data* selfptr)
 
1229
{
 
1230
  Uint32 i;
 
1231
  struct thr_tq * tq = &selfptr->m_tq;
 
1232
  Uint32 cnt0 = tq->m_cnt[0];
 
1233
  Uint32 cnt1 = tq->m_cnt[1];
 
1234
  Uint32 tmp0 = scan_queue(selfptr, cnt0, 32767, tq->m_short_queue);
 
1235
  Uint32 tmp1 = scan_queue(selfptr, cnt1, 32767, tq->m_long_queue);
 
1236
  cnt0 -= tmp0;
 
1237
  cnt1 -= tmp1;
 
1238
  tq->m_cnt[0] = cnt0;
 
1239
  tq->m_cnt[1] = cnt1;
 
1240
  for (i = 0; i<cnt0; i++)
 
1241
  {
 
1242
    assert((tq->m_short_queue[i] & 0xFFFF) > 32767);
 
1243
    tq->m_short_queue[i] -= 32767;
 
1244
  }
 
1245
  for (i = 0; i<cnt1; i++)
 
1246
  {
 
1247
    assert((tq->m_long_queue[i] & 0xFFFF) > 32767);
 
1248
    tq->m_long_queue[i] -= 32767;
 
1249
  }
 
1250
}
 
1251
 
 
1252
static
 
1253
void
 
1254
scan_time_queues_impl(struct thr_data* selfptr, NDB_TICKS now)
 
1255
{
 
1256
  struct thr_tq * tq = &selfptr->m_tq;
 
1257
  NDB_TICKS last = selfptr->m_time;
 
1258
 
 
1259
  Uint32 curr = tq->m_current_time;
 
1260
  Uint32 cnt0 = tq->m_cnt[0];
 
1261
  Uint32 cnt1 = tq->m_cnt[1];
 
1262
 
 
1263
  assert(now > last);
 
1264
  Uint64 diff = now - last;
 
1265
  Uint32 step = (Uint32)((diff > 20) ? 20 : diff);
 
1266
  Uint32 end = (curr + step);
 
1267
  if (end >= 32767)
 
1268
  {
 
1269
    handle_time_wrap(selfptr);
 
1270
    cnt0 = tq->m_cnt[0];
 
1271
    cnt1 = tq->m_cnt[1];
 
1272
    end -= 32767;
 
1273
  }
 
1274
 
 
1275
  Uint32 tmp0 = scan_queue(selfptr, cnt0, end, tq->m_short_queue);
 
1276
  Uint32 tmp1 = scan_queue(selfptr, cnt1, end, tq->m_long_queue);
 
1277
 
 
1278
  tq->m_current_time = end;
 
1279
  tq->m_cnt[0] = cnt0 - tmp0;
 
1280
  tq->m_cnt[1] = cnt1 - tmp1;
 
1281
  selfptr->m_time = last + step;
 
1282
}
 
1283
 
 
1284
static inline
 
1285
void
 
1286
scan_time_queues(struct thr_data* selfptr, NDB_TICKS now)
 
1287
{
 
1288
  if (selfptr->m_time != now)
 
1289
    scan_time_queues_impl(selfptr, now);
 
1290
}
 
1291
 
 
1292
static
 
1293
inline
 
1294
Uint32*
 
1295
get_free_slot(struct thr_repository* rep,
 
1296
              struct thr_data* selfptr,
 
1297
              Uint32* idxptr)
 
1298
{
 
1299
  struct thr_tq * tq = &selfptr->m_tq;
 
1300
  Uint32 idx = tq->m_next_free;
 
1301
retry:
 
1302
  Uint32 buf = idx >> 8;
 
1303
  Uint32 pos = idx & 0xFF;
 
1304
 
 
1305
  if (idx != RNIL)
 
1306
  {
 
1307
    Uint32* page = * (tq->m_delayed_signals + buf);
 
1308
    Uint32* ptr = page + (32 * pos);
 
1309
    tq->m_next_free = * ptr;
 
1310
    * idxptr = idx;
 
1311
    return ptr;
 
1312
  }
 
1313
 
 
1314
  Uint32 thr_no = selfptr->m_thr_no;
 
1315
  for (Uint32 i = 0; i<thr_tq::PAGES; i++)
 
1316
  {
 
1317
    if (tq->m_delayed_signals[i] == 0)
 
1318
    {
 
1319
      struct thr_job_buffer *jb = seize_buffer(rep, thr_no, false);
 
1320
      Uint32 * page = reinterpret_cast<Uint32*>(jb);
 
1321
      tq->m_delayed_signals[i] = page;
 
1322
 
 
1323
      ndbout_c("saving %p at %p (%d)", page, tq->m_delayed_signals+i, i);
 
1324
 
 
1325
      /**
 
1326
       * Init page
 
1327
       */
 
1328
      for (Uint32 j = 0; j<255; j ++)
 
1329
      {
 
1330
        page[j * 32] = (i << 8) + (j + 1);
 
1331
      }
 
1332
      page[255*32] = RNIL;
 
1333
      idx = (i << 8);
 
1334
      goto retry;
 
1335
    }
 
1336
  }
 
1337
  abort();
 
1338
  return NULL;
 
1339
}
 
1340
 
 
1341
void
 
1342
senddelay(Uint32 thr_no, const SignalHeader* s, Uint32 delay)
 
1343
{
 
1344
  struct thr_repository* rep = &g_thr_repository;
 
1345
  struct thr_data * selfptr = rep->m_thread + thr_no;
 
1346
  assert(pthread_equal(selfptr->m_thr_id, pthread_self()));
 
1347
  unsigned siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
 
1348
 
 
1349
  Uint32 max;
 
1350
  Uint32 * cntptr;
 
1351
  Uint32 * queueptr;
 
1352
 
 
1353
  Uint32 alarm = selfptr->m_tq.m_current_time + delay;
 
1354
  Uint32 nexttimer = selfptr->m_tq.m_next_timer;
 
1355
  if (delay < 100)
 
1356
  {
 
1357
    cntptr = selfptr->m_tq.m_cnt + 0;
 
1358
    queueptr = selfptr->m_tq.m_short_queue;
 
1359
    max = thr_tq::SQ_SIZE;
 
1360
  }
 
1361
  else
 
1362
  {
 
1363
    cntptr = selfptr->m_tq.m_cnt + 1;
 
1364
    queueptr = selfptr->m_tq.m_long_queue;
 
1365
    max = thr_tq::LQ_SIZE;
 
1366
  }
 
1367
 
 
1368
  Uint32 idx;
 
1369
  Uint32* ptr = get_free_slot(rep, selfptr, &idx);
 
1370
  memcpy(ptr, s, 4*siglen);
 
1371
 
 
1372
  if (0)
 
1373
    ndbout_c("now: %d alarm: %d send %s from %s to %s delay: %d idx: %x %p",
 
1374
             selfptr->m_tq.m_current_time,
 
1375
             alarm,
 
1376
             getSignalName(s->theVerId_signalNumber),
 
1377
             getBlockName(refToBlock(s->theSendersBlockRef)),
 
1378
             getBlockName(s->theReceiversBlockNumber),
 
1379
             delay,
 
1380
             idx, ptr);
 
1381
 
 
1382
  Uint32 i;
 
1383
  Uint32 cnt = *cntptr;
 
1384
  Uint32 newentry = (idx << 16) | (alarm & 0xFFFF);
 
1385
 
 
1386
  * cntptr = cnt + 1;
 
1387
  selfptr->m_tq.m_next_timer = alarm < nexttimer ? alarm : nexttimer;
 
1388
 
 
1389
  if (cnt == 0)
 
1390
  {
 
1391
    queueptr[0] = newentry;
 
1392
    return;
 
1393
  }
 
1394
  else if (cnt < max)
 
1395
  {
 
1396
    for (i = 0; i<cnt; i++)
 
1397
    {
 
1398
      Uint32 save = queueptr[i];
 
1399
      if ((save & 0xFFFF) > alarm)
 
1400
      {
 
1401
        memmove(queueptr+i+1, queueptr+i, 4*(cnt - i));
 
1402
        queueptr[i] = newentry;
 
1403
        return;
 
1404
      }
 
1405
    }
 
1406
    assert(i == cnt);
 
1407
    queueptr[i] = newentry;
 
1408
    return;
 
1409
  }
 
1410
  else
 
1411
  {
 
1412
    abort();
 
1413
  }
 
1414
}
 
1415
 
 
1416
/*
 
1417
 * Flush the write state to the job queue, making any new signals available to
 
1418
 * receiving threads.
 
1419
 *
 
1420
 * Two versions:
 
1421
 *    - The general version flush_write_state_other() which may flush to
 
1422
 *      any thread, and possibly signal any waiters.
 
1423
 *    - The special version flush_write_state_self() which should only be used
 
1424
 *      to flush messages to itself.
 
1425
 *
 
1426
 * Call to these functions are encapsulated through flush_write_state
 
1427
 * which decides which of these functions to call.
 
1428
 */
 
1429
static inline
 
1430
void
 
1431
flush_write_state_self(thr_job_queue_head *q_head, thr_jb_write_state *w)
 
1432
{
 
1433
  /* 
 
1434
   * Can simplify the flush_write_state when writing to myself:
 
1435
   * Simply update write references wo/ mutex, memory barrier and signaling
 
1436
   */
 
1437
  w->m_write_buffer->m_len = w->m_write_pos;
 
1438
  q_head->m_write_index = w->m_write_index;
 
1439
  w->m_pending_signals_wakeup = 0;
 
1440
  w->m_pending_signals = 0;
 
1441
}
 
1442
 
 
1443
static inline
 
1444
void
 
1445
flush_write_state_other(thr_data *dstptr, thr_job_queue_head *q_head,
 
1446
                        thr_jb_write_state *w)
 
1447
{
 
1448
  /*
 
1449
   * Two write memory barriers here, as assigning m_len may make signal data
 
1450
   * available to other threads, and assigning m_write_index may make new
 
1451
   * buffers available.
 
1452
   *
 
1453
   * We could optimize this by only doing it as needed, and only doing it
 
1454
   * once before setting all m_len, and once before setting all m_write_index.
 
1455
   *
 
1456
   * But wmb() is a no-op anyway in x86 ...
 
1457
   */
 
1458
  wmb();
 
1459
  w->m_write_buffer->m_len = w->m_write_pos;
 
1460
  wmb();
 
1461
  q_head->m_write_index = w->m_write_index;
 
1462
 
 
1463
  w->m_pending_signals_wakeup += w->m_pending_signals;
 
1464
  w->m_pending_signals = 0;
 
1465
 
 
1466
  if (w->m_pending_signals_wakeup >= MAX_SIGNALS_BEFORE_WAKEUP)
 
1467
  {
 
1468
    w->m_pending_signals_wakeup = 0;
 
1469
    wakeup(&(dstptr->m_waiter));
 
1470
  }
 
1471
}
 
1472
 
 
1473
static inline
 
1474
void
 
1475
flush_write_state(const thr_data *selfptr, thr_data *dstptr,
 
1476
                  thr_job_queue_head *q_head, thr_jb_write_state *w)
 
1477
{
 
1478
  if (dstptr == selfptr)
 
1479
  {
 
1480
    flush_write_state_self(q_head, w);
 
1481
  }
 
1482
  else
 
1483
  {
 
1484
    flush_write_state_other(dstptr, q_head, w);
 
1485
  }
 
1486
}
 
1487
 
 
1488
 
 
1489
static
 
1490
void
 
1491
flush_jbb_write_state(thr_data *selfptr)
 
1492
{
 
1493
  Uint32 thr_count = g_thr_repository.m_thread_count;
 
1494
  Uint32 self = selfptr->m_thr_no;
 
1495
 
 
1496
  thr_jb_write_state *w = selfptr->m_write_states;
 
1497
  thr_data *thrptr = g_thr_repository.m_thread;
 
1498
  for (Uint32 thr_no = 0; thr_no < thr_count; thr_no++, thrptr++, w++)
 
1499
  {
 
1500
    if (w->m_pending_signals || w->m_pending_signals_wakeup)
 
1501
    {
 
1502
      w->m_pending_signals_wakeup = MAX_SIGNALS_BEFORE_WAKEUP;
 
1503
      thr_job_queue_head *q_head = thrptr->m_in_queue_head + self;
 
1504
      flush_write_state(selfptr, thrptr, q_head, w);
 
1505
    }
 
1506
  }
 
1507
}
 
1508
 
 
1509
/**
 
1510
 * Transporter will receive 1024 signals (MAX_RECEIVED_SIGNALS)
 
1511
 * before running check_job_buffers
 
1512
 *
 
1513
 * This function returns 0 if there is space to receive this amount of
 
1514
 *   signals
 
1515
 * else 1
 
1516
 */
 
1517
static int
 
1518
check_job_buffers(struct thr_repository* rep)
 
1519
{
 
1520
  const Uint32 minfree = (1024 + MIN_SIGNALS_PER_PAGE - 1)/MIN_SIGNALS_PER_PAGE;
 
1521
  unsigned thr_no = receiver_thread_no;
 
1522
  const thr_data *thrptr = rep->m_thread;
 
1523
  for (unsigned i = 0; i<num_threads; i++, thrptr++)
 
1524
  {
 
1525
    /**
 
1526
     * NOTE: m_read_index is read wo/ lock (and updated by different thread)
 
1527
     *       but since the different thread can only consume
 
1528
     *       signals this means that the value returned from this
 
1529
     *       function is always conservative (i.e it can be better than
 
1530
     *       returned value, if read-index has moved but we didnt see it)
 
1531
     */
 
1532
    const thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no;
 
1533
    unsigned ri = q_head->m_read_index;
 
1534
    unsigned wi = q_head->m_write_index;
 
1535
    unsigned busy = (wi >= ri) ? wi - ri : (thr_job_queue::SIZE - ri) + wi;
 
1536
    if (1 + minfree + busy >= thr_job_queue::SIZE)
 
1537
    {
 
1538
      return 1;
 
1539
    }
 
1540
  }
 
1541
 
 
1542
  return 0;
 
1543
}
 
1544
 
 
1545
/**
 
1546
 * Compute max signals that thr_no can execute wo/ risking
 
1547
 *   job-buffer-full
 
1548
 *
 
1549
 *  see-also update_sched_config
 
1550
 *
 
1551
 *
 
1552
 * 1) compute free-slots in ring-buffer from self to each thread in system
 
1553
 * 2) pick smallest value
 
1554
 * 3) compute how many signals this corresponds to
 
1555
 * 4) compute how many signals self can execute if all were to be to
 
1556
 *    the thread with the fullest ring-buffer (i.e the worst case)
 
1557
 *
 
1558
 *   Assumption: each signal may send *at most* 4 signals
 
1559
 *     - this assumption is made the same in ndbd and ndbmtd and is
 
1560
 *       mostly followed by block-code, although not it all places :-(
 
1561
 */
 
1562
static
 
1563
Uint32
 
1564
compute_max_signals_to_execute(Uint32 thr_no)
 
1565
{
 
1566
  Uint32 minfree = thr_job_queue::SIZE;
 
1567
  const struct thr_repository* rep = &g_thr_repository;
 
1568
  const thr_data *thrptr = rep->m_thread;
 
1569
 
 
1570
  for (unsigned i = 0; i<num_threads; i++, thrptr++)
 
1571
  {
 
1572
    /**
 
1573
     * NOTE: m_read_index is read wo/ lock (and updated by different thread)
 
1574
     *       but since the different thread can only consume
 
1575
     *       signals this means that the value returned from this
 
1576
     *       function is always conservative (i.e it can be better than
 
1577
     *       returned value, if read-index has moved but we didnt see it)
 
1578
     */
 
1579
    const thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no;
 
1580
    unsigned ri = q_head->m_read_index;
 
1581
    unsigned wi = q_head->m_write_index;
 
1582
    unsigned free = (wi < ri) ? ri - wi : (thr_job_queue::SIZE + ri) - wi;
 
1583
 
 
1584
    assert(free <= thr_job_queue::SIZE);
 
1585
 
 
1586
    if (free < minfree)
 
1587
      minfree = free;
 
1588
  }
 
1589
 
 
1590
#define SAFETY 2
 
1591
 
 
1592
  if (minfree >= (1 + SAFETY))
 
1593
  {
 
1594
    return (3 + (minfree - (1 + SAFETY)) * MIN_SIGNALS_PER_PAGE) / 4;
 
1595
  }
 
1596
  else
 
1597
  {
 
1598
    return 0;
 
1599
  }
 
1600
}
 
1601
 
 
1602
//#define NDBMT_RAND_YIELD
 
1603
#ifdef NDBMT_RAND_YIELD
 
1604
static Uint32 g_rand_yield = 0;
 
1605
static
 
1606
void
 
1607
rand_yield(Uint32 limit, void* ptr0, void * ptr1)
 
1608
{
 
1609
  return;
 
1610
  UintPtr tmp = UintPtr(ptr0) + UintPtr(ptr1);
 
1611
  Uint8* tmpptr = (Uint8*)&tmp;
 
1612
  Uint32 sum = g_rand_yield;
 
1613
  for (Uint32 i = 0; i<sizeof(tmp); i++)
 
1614
    sum = 33 * sum + tmpptr[i];
 
1615
 
 
1616
  if ((sum % 100) < limit)
 
1617
  {
 
1618
    g_rand_yield++;
 
1619
    sched_yield();
 
1620
  }
 
1621
}
 
1622
#else
 
1623
static inline void rand_yield(Uint32 limit, void* ptr0, void * ptr1) {}
 
1624
#endif
 
1625
 
 
1626
 
 
1627
 
 
1628
void
 
1629
trp_callback::reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes)
 
1630
{
 
1631
  SignalT<3> signalT;
 
1632
  Signal &signal = * new (&signalT) Signal(0);
 
1633
  memset(&signal.header, 0, sizeof(signal.header));
 
1634
 
 
1635
  signal.header.theLength = 3;
 
1636
  signal.header.theSendersSignalId = 0;
 
1637
  signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
 
1638
  signal.theData[0] = NDB_LE_SendBytesStatistic;
 
1639
  signal.theData[1] = nodeId;
 
1640
  signal.theData[2] = (Uint32)(bytes/count);
 
1641
  signal.header.theVerId_signalNumber = GSN_EVENT_REP;
 
1642
  signal.header.theReceiversBlockNumber = CMVMI;
 
1643
  sendlocal(g_thr_repository.m_send_buffers[nodeId].m_send_thread,
 
1644
            &signalT.header, signalT.theData, NULL);
 
1645
}
 
1646
 
 
1647
/**
 
1648
 * To lock during connect/disconnect, we take both the send lock for the node
 
1649
 * (to protect performSend(), and the global receive lock (to protect
 
1650
 * performReceive()). By having two locks, we avoid contention between the
 
1651
 * common send and receive operations.
 
1652
 *
 
1653
 * We can have contention between connect/disconnect of one transporter and
 
1654
 * receive for the others. But the transporter code should try to keep this
 
1655
 * lock only briefly, ie. only to set state to DISCONNECTING / socket fd to
 
1656
 * NDB_INVALID_SOCKET, not for the actual close() syscall.
 
1657
 */
 
1658
void
 
1659
trp_callback::lock_transporter(NodeId node)
 
1660
{
 
1661
  struct thr_repository* rep = &g_thr_repository;
 
1662
  /**
 
1663
   * Note: take the send lock _first_, so that we will not hold the receive
 
1664
   * lock while blocking on the send lock.
 
1665
   *
 
1666
   * The reverse case, blocking send lock for one transporter while waiting
 
1667
   * for receive lock, is not a problem, as the transporter being blocked is
 
1668
   * in any case disconnecting/connecting at this point in time, and sends are
 
1669
   * non-waiting (so we will not block sending on other transporters).
 
1670
   */
 
1671
  lock(&rep->m_send_buffers[node].m_send_lock);
 
1672
  lock(&rep->m_receive_lock);
 
1673
}
 
1674
 
 
1675
void
 
1676
trp_callback::unlock_transporter(NodeId node)
 
1677
{
 
1678
  struct thr_repository* rep = &g_thr_repository;
 
1679
  unlock(&rep->m_receive_lock);
 
1680
  unlock(&rep->m_send_buffers[node].m_send_lock);
 
1681
}
 
1682
 
 
1683
int
 
1684
trp_callback::checkJobBuffer()
 
1685
{
 
1686
  struct thr_repository* rep = &g_thr_repository;
 
1687
  if (unlikely(check_job_buffers(rep)))
 
1688
  {
 
1689
    do 
 
1690
    {
 
1691
      /**
 
1692
       * theoretically (or when we do single threaded by using ndbmtd with
 
1693
       * all in same thread) we should execute signals here...to 
 
1694
       * prevent dead-lock, but...with current ndbmtd only CMVMI runs in
 
1695
       * this thread, and other thread is waiting for CMVMI
 
1696
       * except for QMGR open/close connection, but that is not
 
1697
       * (i think) sufficient to create a deadlock
 
1698
       */
 
1699
 
 
1700
      /** FIXME:
 
1701
       *  On a CMT chip where #CPU >= #NDB-threads sched_yield() is
 
1702
       *  effectively a NOOP as there will normally be an idle CPU available
 
1703
       *  to immediately resume thread execution.
 
1704
       *  On a Niagara chip this may severely impact performance as the CPUs
 
1705
       *  are virtualized by timemultiplexing the physical core.
 
1706
       *  The thread should really be 'parked' on
 
1707
       *  a condition to free its execution resources.
 
1708
       */
 
1709
//    usleep(a-few-usec);  /* A micro-sleep would likely have been better... */
 
1710
#if defined HAVE_SCHED_YIELD
 
1711
      sched_yield();
 
1712
#elif defined _WIN32
 
1713
      SwitchToThread();
 
1714
#else
 
1715
      NdbSleep_MilliSleep(0);
 
1716
#endif
 
1717
 
 
1718
    } while (check_job_buffers(rep));
 
1719
  }
 
1720
 
 
1721
  return 0;
 
1722
}
 
1723
 
 
1724
/**
 
1725
 * Link all send-buffer-pages into *one*
 
1726
 *   single linked list of buffers
 
1727
 *
 
1728
 * TODO: This is not completly fair,
 
1729
 *       it would be better to get one entry from each thr_send_queue
 
1730
 *       per thread instead (until empty)
 
1731
 */
 
1732
static
 
1733
Uint32
 
1734
link_thread_send_buffers(thr_repository::send_buffer * sb, Uint32 node)
 
1735
{
 
1736
  Uint32 ri[MAX_THREADS];
 
1737
  Uint32 wi[MAX_THREADS];
 
1738
  thr_send_queue * src = g_thr_repository.m_thread_send_buffers[node];
 
1739
  for (unsigned thr = 0; thr < num_threads; thr++)
 
1740
  {
 
1741
    ri[thr] = sb->m_read_index[thr];
 
1742
    wi[thr] = src[thr].m_write_index;
 
1743
  }
 
1744
 
 
1745
  Uint64 sentinel[thr_send_page::HEADER_SIZE >> 1];
 
1746
  thr_send_page* sentinel_page = new (&sentinel[0]) thr_send_page;
 
1747
  sentinel_page->m_next = 0;
 
1748
 
 
1749
  struct thr_send_buffer tmp;
 
1750
  tmp.m_first_page = sentinel_page;
 
1751
  tmp.m_last_page = sentinel_page;
 
1752
 
 
1753
  Uint32 bytes = 0;
 
1754
  for (unsigned thr = 0; thr < num_threads; thr++, src++)
 
1755
  {
 
1756
    Uint32 r = ri[thr];
 
1757
    Uint32 w = wi[thr];
 
1758
    if (r != w)
 
1759
    {
 
1760
      rmb();
 
1761
      while (r != w)
 
1762
      {
 
1763
        thr_send_page * p = src->m_buffers[r];
 
1764
        assert(p->m_start == 0);
 
1765
        bytes += p->m_bytes;
 
1766
        tmp.m_last_page->m_next = p;
 
1767
        while (p->m_next != 0)
 
1768
        {
 
1769
          p = p->m_next;
 
1770
          assert(p->m_start == 0);
 
1771
          bytes += p->m_bytes;
 
1772
        }
 
1773
        tmp.m_last_page = p;
 
1774
        assert(tmp.m_last_page != 0);
 
1775
        r = (r + 1) % thr_send_queue::SIZE;
 
1776
      }
 
1777
      sb->m_read_index[thr] = r;
 
1778
    }
 
1779
  }
 
1780
 
 
1781
  if (bytes)
 
1782
  {
 
1783
    if (sb->m_bytes)
 
1784
    {
 
1785
      assert(sb->m_buffer.m_first_page != 0);
 
1786
      assert(sb->m_buffer.m_last_page != 0);
 
1787
      sb->m_buffer.m_last_page->m_next = tmp.m_first_page->m_next;
 
1788
      sb->m_buffer.m_last_page = tmp.m_last_page;
 
1789
    }
 
1790
    else
 
1791
    {
 
1792
      assert(sb->m_buffer.m_first_page == 0);
 
1793
      assert(sb->m_buffer.m_last_page == 0);
 
1794
      sb->m_buffer.m_first_page = tmp.m_first_page->m_next;
 
1795
      sb->m_buffer.m_last_page = tmp.m_last_page;
 
1796
    }
 
1797
    sb->m_bytes += bytes;
 
1798
  }
 
1799
 
 
1800
  return sb->m_bytes;
 
1801
}
 
1802
 
 
1803
Uint32
 
1804
trp_callback::get_bytes_to_send_iovec(NodeId node,
 
1805
                                      struct iovec *dst, Uint32 max)
 
1806
{
 
1807
  thr_repository::send_buffer * sb = g_thr_repository.m_send_buffers + node;
 
1808
 
 
1809
  Uint32 bytes = link_thread_send_buffers(sb, node);
 
1810
  if (max == 0 || bytes == 0)
 
1811
    return 0;
 
1812
 
 
1813
  /**
 
1814
   * Process linked-list and put into iovecs
 
1815
   * TODO: Here we would also pack stuff to get better utilization
 
1816
   */
 
1817
  Uint32 tot = 0;
 
1818
  Uint32 pos = 0;
 
1819
  thr_send_page * p = sb->m_buffer.m_first_page;
 
1820
  do {
 
1821
    dst[pos].iov_len = p->m_bytes;
 
1822
    dst[pos].iov_base = p->m_data + p->m_start;
 
1823
    assert(p->m_start + p->m_bytes <= p->max_bytes());
 
1824
    tot += p->m_bytes;
 
1825
    pos++;
 
1826
    max--;
 
1827
    p = p->m_next;
 
1828
  } while (max && p != 0);
 
1829
 
 
1830
  return pos;
 
1831
}
 
1832
 
 
1833
static
 
1834
void
 
1835
release_list(thread_local_pool<thr_send_page>* pool,
 
1836
             thr_send_page* head, thr_send_page * tail)
 
1837
{
 
1838
  while (head != tail)
 
1839
  {
 
1840
    thr_send_page * tmp = head;
 
1841
    head = head->m_next;
 
1842
    pool->release_local(tmp);
 
1843
  }
 
1844
  pool->release_local(tail);
 
1845
}
 
1846
 
 
1847
 
 
1848
static
 
1849
Uint32
 
1850
bytes_sent(thread_local_pool<thr_send_page>* pool,
 
1851
           thr_repository::send_buffer* sb, Uint32 bytes)
 
1852
{
 
1853
  assert(bytes);
 
1854
 
 
1855
  Uint32 remain = bytes;
 
1856
  thr_send_page * prev = 0;
 
1857
  thr_send_page * curr = sb->m_buffer.m_first_page;
 
1858
 
 
1859
  assert(sb->m_bytes >= bytes);
 
1860
  while (remain && remain >= curr->m_bytes)
 
1861
  {
 
1862
    remain -= curr->m_bytes;
 
1863
    prev = curr;
 
1864
    curr = curr->m_next;
 
1865
  }
 
1866
 
 
1867
  Uint32 total_bytes = sb->m_bytes;
 
1868
  if (total_bytes == bytes)
 
1869
  {
 
1870
    /**
 
1871
     * Every thing was released
 
1872
     */
 
1873
    release_list(pool, sb->m_buffer.m_first_page, sb->m_buffer.m_last_page);
 
1874
    sb->m_buffer.m_first_page = 0;
 
1875
    sb->m_buffer.m_last_page = 0;
 
1876
    sb->m_bytes = 0;
 
1877
    return 0;
 
1878
  }
 
1879
  else if (remain)
 
1880
  {
 
1881
    /**
 
1882
     * Half a page was released
 
1883
     */
 
1884
    curr->m_start += remain;
 
1885
    assert(curr->m_bytes > remain);
 
1886
    curr->m_bytes -= remain;
 
1887
    if (prev)
 
1888
    {
 
1889
      release_list(pool, sb->m_buffer.m_first_page, prev);
 
1890
    }
 
1891
  }
 
1892
  else
 
1893
  {
 
1894
    /**
 
1895
     * X full page(s) was released
 
1896
     */
 
1897
    if (prev)
 
1898
    {
 
1899
      release_list(pool, sb->m_buffer.m_first_page, prev);
 
1900
    }
 
1901
    else
 
1902
    {
 
1903
      pool->release_local(sb->m_buffer.m_first_page);
 
1904
    }
 
1905
  }
 
1906
 
 
1907
  sb->m_buffer.m_first_page = curr;
 
1908
  assert(sb->m_bytes > bytes);
 
1909
  sb->m_bytes -= bytes;
 
1910
  return sb->m_bytes;
 
1911
}
 
1912
 
 
1913
Uint32
 
1914
trp_callback::bytes_sent(NodeId node, Uint32 bytes)
 
1915
{
 
1916
  thr_repository::send_buffer * sb = g_thr_repository.m_send_buffers+node;
 
1917
  Uint32 thr_no = sb->m_send_thread;
 
1918
  assert(thr_no != NO_SEND_THREAD);
 
1919
  return ::bytes_sent(&g_thr_repository.m_thread[thr_no].m_send_buffer_pool,
 
1920
                      sb, bytes);
 
1921
}
 
1922
 
 
1923
bool
 
1924
trp_callback::has_data_to_send(NodeId node)
 
1925
{
 
1926
  return true;
 
1927
 
 
1928
  thr_repository::send_buffer * sb = g_thr_repository.m_send_buffers + node;
 
1929
  Uint32 thr_no = sb->m_send_thread;
 
1930
  assert(thr_no != NO_SEND_THREAD);
 
1931
  assert((sb->m_bytes > 0) == (sb->m_buffer.m_first_page != 0));
 
1932
  if (sb->m_bytes > 0 || sb->m_force_send)
 
1933
    return true;
 
1934
 
 
1935
  thr_send_queue * dst = g_thr_repository.m_thread_send_buffers[node]+thr_no;
 
1936
 
 
1937
  return sb->m_read_index[thr_no] != dst->m_write_index;
 
1938
}
 
1939
 
 
1940
void
 
1941
trp_callback::reset_send_buffer(NodeId node, bool should_be_empty)
 
1942
{
 
1943
  struct thr_repository *rep = &g_thr_repository;
 
1944
  thr_repository::send_buffer * sb = g_thr_repository.m_send_buffers+node;
 
1945
  struct iovec v[32];
 
1946
 
 
1947
  thread_local_pool<thr_send_page> pool(&rep->m_sb_pool, 0);
 
1948
 
 
1949
  lock(&sb->m_send_lock);
 
1950
 
 
1951
  for (;;)
 
1952
  {
 
1953
    Uint32 count = get_bytes_to_send_iovec(node, v, sizeof(v)/sizeof(v[0]));
 
1954
    if (count == 0)
 
1955
      break;
 
1956
    assert(!should_be_empty); // Got data when it should be empty
 
1957
    int bytes = 0;
 
1958
    for (Uint32 i = 0; i < count; i++)
 
1959
      bytes += v[i].iov_len;
 
1960
 
 
1961
    ::bytes_sent(&pool, sb, bytes);
 
1962
  }
 
1963
 
 
1964
  unlock(&sb->m_send_lock);
 
1965
 
 
1966
  pool.release_all(rep->m_mm, RG_TRANSPORTER_BUFFERS);
 
1967
}
 
1968
 
 
1969
static inline
 
1970
void
 
1971
register_pending_send(thr_data *selfptr, Uint32 nodeId)
 
1972
{
 
1973
  /* Mark that this node has pending send data. */
 
1974
  if (!selfptr->m_pending_send_mask.get(nodeId))
 
1975
  {
 
1976
    selfptr->m_pending_send_mask.set(nodeId, 1);
 
1977
    Uint32 i = selfptr->m_pending_send_count;
 
1978
    selfptr->m_pending_send_nodes[i] = nodeId;
 
1979
    selfptr->m_pending_send_count = i + 1;
 
1980
  }
 
1981
}
 
1982
 
 
1983
/**
 
1984
 * publish thread-locally prepared send-buffer
 
1985
 */
 
1986
static
 
1987
void
 
1988
flush_send_buffer(thr_data* selfptr, Uint32 node)
 
1989
{
 
1990
  Uint32 thr_no = selfptr->m_thr_no;
 
1991
  thr_send_buffer * src = selfptr->m_send_buffers + node;
 
1992
  thr_repository* rep = &g_thr_repository;
 
1993
 
 
1994
  if (src->m_first_page == 0)
 
1995
  {
 
1996
    return;
 
1997
  }
 
1998
  assert(src->m_last_page != 0);
 
1999
 
 
2000
  thr_send_queue * dst = rep->m_thread_send_buffers[node]+thr_no;
 
2001
  thr_repository::send_buffer* sb = rep->m_send_buffers+node;
 
2002
 
 
2003
  Uint32 wi = dst->m_write_index;
 
2004
  Uint32 next = (wi + 1) % thr_send_queue::SIZE;
 
2005
  Uint32 ri = sb->m_read_index[thr_no];
 
2006
 
 
2007
  if (unlikely(next == ri))
 
2008
  {
 
2009
    lock(&sb->m_send_lock);
 
2010
    link_thread_send_buffers(sb, node);
 
2011
    unlock(&sb->m_send_lock);
 
2012
  }
 
2013
 
 
2014
  dst->m_buffers[wi] = src->m_first_page;
 
2015
  wmb();
 
2016
  dst->m_write_index = next;
 
2017
 
 
2018
  src->m_first_page = 0;
 
2019
  src->m_last_page = 0;
 
2020
}
 
2021
 
 
2022
/**
 
2023
 * This is used in case send buffer gets full, to force an emergency send,
 
2024
 * hopefully freeing up some buffer space for the next signal.
 
2025
 */
 
2026
bool
 
2027
mt_send_handle::forceSend(NodeId nodeId)
 
2028
{
 
2029
  struct thr_repository *rep = &g_thr_repository;
 
2030
  struct thr_data *selfptr = m_selfptr;
 
2031
  struct thr_repository::send_buffer * sb = rep->m_send_buffers + nodeId;
 
2032
 
 
2033
  do
 
2034
  {
 
2035
    sb->m_force_send = 0;
 
2036
    lock(&sb->m_send_lock);
 
2037
    sb->m_send_thread = selfptr->m_thr_no;
 
2038
    globalTransporterRegistry.performSend(nodeId);
 
2039
    sb->m_send_thread = NO_SEND_THREAD;
 
2040
    unlock(&sb->m_send_lock);
 
2041
  } while (sb->m_force_send);
 
2042
 
 
2043
  selfptr->m_send_buffer_pool.release_global(rep->m_mm, RG_TRANSPORTER_BUFFERS);
 
2044
 
 
2045
  return true;
 
2046
}
 
2047
 
 
2048
/**
 
2049
 * try sending data
 
2050
 */
 
2051
static
 
2052
void
 
2053
try_send(thr_data * selfptr, Uint32 node)
 
2054
{
 
2055
  struct thr_repository *rep = &g_thr_repository;
 
2056
  struct thr_repository::send_buffer * sb = rep->m_send_buffers + node;
 
2057
 
 
2058
  do
 
2059
  {
 
2060
    if (trylock(&sb->m_send_lock) != 0)
 
2061
    {
 
2062
      return;
 
2063
    }
 
2064
 
 
2065
    sb->m_force_send = 0;
 
2066
    mb();
 
2067
 
 
2068
    sb->m_send_thread = selfptr->m_thr_no;
 
2069
    globalTransporterRegistry.performSend(node);
 
2070
    sb->m_send_thread = NO_SEND_THREAD;
 
2071
    unlock(&sb->m_send_lock);
 
2072
  } while (sb->m_force_send);
 
2073
 
 
2074
  selfptr->m_send_buffer_pool.release_global(rep->m_mm, RG_TRANSPORTER_BUFFERS);
 
2075
}
 
2076
 
 
2077
/**
 
2078
 * Flush send buffers and append them to dst. nodes send queue
 
2079
 *
 
2080
 * Flushed buffer contents are piggybacked when another thread
 
2081
 * do_send() to the same dst. node. This makes it possible to have
 
2082
 * more data included in each message, and thereby reduces total
 
2083
 * #messages handled by the OS which really impacts performance!
 
2084
 */
 
2085
static
 
2086
void
 
2087
do_flush(struct thr_data* selfptr)
 
2088
{
 
2089
  Uint32 i;
 
2090
  Uint32 count = selfptr->m_pending_send_count;
 
2091
  Uint8 *nodes = selfptr->m_pending_send_nodes;
 
2092
 
 
2093
  for (i = 0; i < count; i++)
 
2094
  {
 
2095
    flush_send_buffer(selfptr, nodes[i]);
 
2096
  }
 
2097
}
 
2098
 
 
2099
/**
 
2100
 * Send any pending data to remote nodes.
 
2101
 *
 
2102
 * If MUST_SEND is false, will only try to lock the send lock, but if it would
 
2103
 * block, that node is skipped, to be tried again next time round.
 
2104
 *
 
2105
 * If MUST_SEND is true, will always take the lock, waiting on it if needed.
 
2106
 *
 
2107
 * The list of pending nodes to send to is thread-local, but the per-node send
 
2108
 * buffer is shared by all threads. Thus we might skip a node for which
 
2109
 * another thread has pending send data, and we might send pending data also
 
2110
 * for another thread without clearing the node from the pending list of that
 
2111
 * other thread (but we will never loose signals due to this).
 
2112
 */
 
2113
static
 
2114
Uint32
 
2115
do_send(struct thr_data* selfptr, bool must_send)
 
2116
{
 
2117
  Uint32 i;
 
2118
  Uint32 count = selfptr->m_pending_send_count;
 
2119
  Uint8 *nodes = selfptr->m_pending_send_nodes;
 
2120
  struct thr_repository* rep = &g_thr_repository;
 
2121
 
 
2122
  if (count == 0)
 
2123
  {
 
2124
    return 0; // send-buffers empty
 
2125
  }
 
2126
 
 
2127
  /* Clear the pending list. */
 
2128
  selfptr->m_pending_send_mask.clear();
 
2129
  selfptr->m_pending_send_count = 0;
 
2130
 
 
2131
  for (i = 0; i < count; i++)
 
2132
  {
 
2133
    Uint32 node = nodes[i];
 
2134
    selfptr->m_watchdog_counter = 6;
 
2135
 
 
2136
    flush_send_buffer(selfptr, node);
 
2137
 
 
2138
    thr_repository::send_buffer * sb = rep->m_send_buffers + node;
 
2139
 
 
2140
    /**
 
2141
     * If we must send now, set the force_send flag.
 
2142
     *
 
2143
     * This will ensure that if we do not get the send lock, the thread
 
2144
     * holding the lock will try sending again for us when it has released
 
2145
     * the lock.
 
2146
     *
 
2147
     * The lock/unlock pair works as a memory barrier to ensure that the
 
2148
     * flag update is flushed to the other thread.
 
2149
     */
 
2150
    if (must_send)
 
2151
    {
 
2152
      sb->m_force_send = 1;
 
2153
    }
 
2154
 
 
2155
    do
 
2156
    {
 
2157
      if (trylock(&sb->m_send_lock) != 0)
 
2158
      {
 
2159
        if (!must_send)
 
2160
        {
 
2161
          /**
 
2162
           * Not doing this node now, re-add to pending list.
 
2163
           *
 
2164
           * As we only add from the start of an empty list, we are safe from
 
2165
           * overwriting the list while we are iterating over it.
 
2166
           */
 
2167
          register_pending_send(selfptr, node);
 
2168
        }
 
2169
        else
 
2170
        {
 
2171
          /* Other thread will send for us as we set m_force_send. */
 
2172
        }
 
2173
        break;
 
2174
      }
 
2175
 
 
2176
      /**
 
2177
       * Now clear the flag, and start sending all data available to this node.
 
2178
       *
 
2179
       * Put a memory barrier here, so that if another thread tries to grab
 
2180
       * the send lock but fails due to us holding it here, we either
 
2181
       * 1) Will see m_force_send[nodeId] set to 1 at the end of the loop, or
 
2182
       * 2) We clear here the flag just set by the other thread, but then we
 
2183
       * will (thanks to mb()) be able to see and send all of the data already
 
2184
       * in the first send iteration.
 
2185
       */
 
2186
      sb->m_force_send = 0;
 
2187
      mb();
 
2188
 
 
2189
      /**
 
2190
       * Set m_send_thr so that our transporter callback can know which thread
 
2191
       * holds the send lock for this remote node.
 
2192
       */
 
2193
      sb->m_send_thread = selfptr->m_thr_no;
 
2194
      int res = globalTransporterRegistry.performSend(node);
 
2195
      sb->m_send_thread = NO_SEND_THREAD;
 
2196
      unlock(&sb->m_send_lock);
 
2197
      if (res)
 
2198
      {
 
2199
        register_pending_send(selfptr, node);
 
2200
      }
 
2201
    } while (sb->m_force_send);
 
2202
  }
 
2203
 
 
2204
  selfptr->m_send_buffer_pool.release_global(rep->m_mm, RG_TRANSPORTER_BUFFERS);
 
2205
 
 
2206
  return selfptr->m_pending_send_count;
 
2207
}
 
2208
 
 
2209
Uint32 *
 
2210
mt_send_handle::getWritePtr(NodeId node, Uint32 len, Uint32 prio, Uint32 max)
 
2211
{
 
2212
  struct thr_send_buffer * b = m_selfptr->m_send_buffers+node;
 
2213
  thr_send_page * p = b->m_last_page;
 
2214
  if ((p != 0) && (p->m_bytes + p->m_start + len <= thr_send_page::max_bytes()))
 
2215
  {
 
2216
    return (Uint32*)(p->m_data + p->m_start + p->m_bytes);
 
2217
  }
 
2218
  else if (p != 0)
 
2219
  {
 
2220
    // TODO: maybe dont always flush on page-boundary ???
 
2221
    flush_send_buffer(m_selfptr, node);
 
2222
    try_send(m_selfptr, node);
 
2223
  }
 
2224
 
 
2225
  if ((p = m_selfptr->m_send_buffer_pool.seize(g_thr_repository.m_mm,
 
2226
                                               RG_TRANSPORTER_BUFFERS)) != 0)
 
2227
  {
 
2228
    p->m_bytes = 0;
 
2229
    p->m_start = 0;
 
2230
    p->m_next = 0;
 
2231
    b->m_first_page = b->m_last_page = p;
 
2232
    return (Uint32*)p->m_data;
 
2233
  }
 
2234
  return 0;
 
2235
}
 
2236
 
 
2237
Uint32
 
2238
mt_send_handle::updateWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio)
 
2239
{
 
2240
  struct thr_send_buffer * b = m_selfptr->m_send_buffers+node;
 
2241
  thr_send_page * p = b->m_last_page;
 
2242
  p->m_bytes += lenBytes;
 
2243
  return p->m_bytes;
 
2244
}
 
2245
 
 
2246
/*
 
2247
 * Insert a signal in a job queue.
 
2248
 *
 
2249
 * The signal is not visible to consumers yet after return from this function,
 
2250
 * only recorded in the thr_jb_write_state. It is necessary to first call
 
2251
 * flush_write_state() for this.
 
2252
 *
 
2253
 * The new_buffer is a job buffer to use if the current one gets full. If used,
 
2254
 * we return true, indicating that the caller should allocate a new one for
 
2255
 * the next call. (This is done to allow to insert under lock, but do the
 
2256
 * allocation outside the lock).
 
2257
 */
 
2258
static inline
 
2259
bool
 
2260
insert_signal(thr_job_queue *q, thr_jb_write_state *w, Uint32 prioa,
 
2261
              const SignalHeader* sh, const Uint32 *data,
 
2262
              const Uint32 secPtr[3], thr_job_buffer *new_buffer)
 
2263
{
 
2264
  Uint32 write_pos = w->m_write_pos;
 
2265
  Uint32 datalen = sh->theLength;
 
2266
  assert(w->m_write_buffer == q->m_buffers[w->m_write_index]);
 
2267
  memcpy(w->m_write_buffer->m_data + write_pos, sh, sizeof(*sh));
 
2268
  write_pos += (sizeof(*sh) >> 2);
 
2269
  memcpy(w->m_write_buffer->m_data + write_pos, data, 4*datalen);
 
2270
  write_pos += datalen;
 
2271
  const Uint32 *p= secPtr;
 
2272
  for (Uint32 i = 0; i < sh->m_noOfSections; i++)
 
2273
    w->m_write_buffer->m_data[write_pos++] = *p++;
 
2274
  w->m_pending_signals++;
 
2275
 
 
2276
#if SIZEOF_CHARP == 8
 
2277
  /* Align to 8-byte boundary, to ensure aligned copies. */
 
2278
  write_pos= (write_pos+1) & ~((Uint32)1);
 
2279
#endif
 
2280
 
 
2281
  /*
 
2282
   * We make sure that there is always room for at least one signal in the
 
2283
   * current buffer in the queue, so one insert is always possible without
 
2284
   * adding a new buffer.
 
2285
   */
 
2286
  if (likely(write_pos + 32 <= thr_job_buffer::SIZE))
 
2287
  {
 
2288
    w->m_write_pos = write_pos;
 
2289
    return false;
 
2290
  }
 
2291
  else
 
2292
  {
 
2293
    /*
 
2294
     * Need a write memory barrier here, as this might make signal data visible
 
2295
     * to other threads.
 
2296
     *
 
2297
     * ToDo: We actually only need the wmb() here if we already make this
 
2298
     * buffer visible to the other thread. So we might optimize it a bit. But
 
2299
     * wmb() is a no-op on x86 anyway...
 
2300
     */
 
2301
    wmb();
 
2302
    w->m_write_buffer->m_len = write_pos;
 
2303
    Uint32 write_index = (w->m_write_index + 1) % thr_job_queue::SIZE;
 
2304
 
 
2305
    /**
 
2306
     * Full job buffer is fatal.
 
2307
     *
 
2308
     * ToDo: should we wait for it to become non-full? There is no guarantee
 
2309
     * that this will actually happen...
 
2310
     *
 
2311
     * Or alternatively, ndbrequire() ?
 
2312
     */
 
2313
    if (unlikely(write_index == q->m_head->m_read_index))
 
2314
    {
 
2315
      job_buffer_full(0);
 
2316
    }
 
2317
    new_buffer->m_len = 0;
 
2318
    new_buffer->m_prioa = prioa;
 
2319
    q->m_buffers[write_index] = new_buffer;
 
2320
    w->m_write_index = write_index;
 
2321
    w->m_write_pos = 0;
 
2322
    w->m_write_buffer = new_buffer;
 
2323
    return true;                // Buffer new_buffer used
 
2324
  }
 
2325
 
 
2326
  return false;                 // Buffer new_buffer not used
 
2327
}
 
2328
 
 
2329
static
 
2330
void
 
2331
read_jbb_state(thr_data *selfptr, Uint32 count)
 
2332
{
 
2333
 
 
2334
  thr_jb_read_state *r = selfptr->m_read_states;
 
2335
  const thr_job_queue *q = selfptr->m_in_queue;
 
2336
  for (Uint32 i = 0; i < count; i++,r++,q++)
 
2337
  {
 
2338
    Uint32 read_index = r->m_read_index;
 
2339
 
 
2340
    /**
 
2341
     * Optimization: Only reload when possibly empty.
 
2342
     * Avoid cache reload of shared thr_job_queue_head
 
2343
     */
 
2344
    if (r->m_write_index == read_index)
 
2345
    {
 
2346
      r->m_write_index = q->m_head->m_write_index;
 
2347
      read_barrier_depends();
 
2348
      r->m_read_end = q->m_buffers[read_index]->m_len;
 
2349
    }
 
2350
  }
 
2351
}
 
2352
 
 
2353
static
 
2354
bool
 
2355
read_jba_state(thr_data *selfptr)
 
2356
{
 
2357
  thr_jb_read_state *r = &(selfptr->m_jba_read_state);
 
2358
  r->m_write_index = selfptr->m_jba_head.m_write_index;
 
2359
  read_barrier_depends();
 
2360
  r->m_read_end = selfptr->m_jba.m_buffers[r->m_read_index]->m_len;
 
2361
  return r->is_empty();
 
2362
}
 
2363
 
 
2364
/* Check all job queues, return true only if all are empty. */
 
2365
static bool
 
2366
check_queues_empty(thr_data *selfptr)
 
2367
{
 
2368
  Uint32 thr_count = g_thr_repository.m_thread_count;
 
2369
  bool empty = read_jba_state(selfptr);
 
2370
  if (!empty)
 
2371
    return false;
 
2372
 
 
2373
  read_jbb_state(selfptr, thr_count);
 
2374
  const thr_jb_read_state *r = selfptr->m_read_states;
 
2375
  for (Uint32 i = 0; i < thr_count; i++,r++)
 
2376
  {
 
2377
    if (!r->is_empty())
 
2378
      return false;
 
2379
  }
 
2380
  return true;
 
2381
}
 
2382
 
 
2383
/*
 
2384
 * Execute at most MAX_SIGNALS signals from one job queue, updating local read
 
2385
 * state as appropriate.
 
2386
 *
 
2387
 * Returns number of signals actually executed.
 
2388
 */
 
2389
static
 
2390
Uint32
 
2391
execute_signals(thr_data *selfptr, thr_job_queue *q, thr_jb_read_state *r,
 
2392
                Signal *sig, Uint32 max_signals, Uint32 *signalIdCounter)
 
2393
{
 
2394
  Uint32 num_signals;
 
2395
  Uint32 read_index = r->m_read_index;
 
2396
  Uint32 write_index = r->m_write_index;
 
2397
  Uint32 read_pos = r->m_read_pos;
 
2398
  Uint32 read_end = r->m_read_end;
 
2399
  Uint32 *watchDogCounter = &selfptr->m_watchdog_counter;
 
2400
 
 
2401
  if (read_index == write_index && read_pos >= read_end)
 
2402
    return 0;          // empty read_state
 
2403
 
 
2404
  thr_job_buffer *read_buffer = r->m_read_buffer;
 
2405
 
 
2406
  for (num_signals = 0; num_signals < max_signals; num_signals++)
 
2407
  {
 
2408
    while (read_pos >= read_end)
 
2409
    {
 
2410
      if (read_index == write_index)
 
2411
      {
 
2412
        /* No more available now. */
 
2413
        return num_signals;
 
2414
      }
 
2415
      else
 
2416
      {
 
2417
        /* Move to next buffer. */
 
2418
        read_index = (read_index + 1) % thr_job_queue::SIZE;
 
2419
        release_buffer(&g_thr_repository, selfptr->m_thr_no, read_buffer);
 
2420
        read_buffer = q->m_buffers[read_index];
 
2421
        read_pos = 0;
 
2422
        read_end = read_buffer->m_len;
 
2423
        /* Update thread-local read state. */
 
2424
        r->m_read_index = q->m_head->m_read_index = read_index;
 
2425
        r->m_read_buffer = read_buffer;
 
2426
        r->m_read_pos = read_pos;
 
2427
        r->m_read_end = read_end;
 
2428
      }
 
2429
    }
 
2430
 
 
2431
    /*
 
2432
     * These pre-fetching were found using OProfile to reduce cache misses.
 
2433
     * (Though on Intel Core 2, they do not give much speedup, as apparently
 
2434
     * the hardware prefetcher is already doing a fairly good job).
 
2435
     */
 
2436
    NDB_PREFETCH_READ (read_buffer->m_data + read_pos + 16);
 
2437
    NDB_PREFETCH_WRITE ((Uint32 *)&sig->header + 16);
 
2438
 
 
2439
    /* Now execute the signal. */
 
2440
    SignalHeader* s =
 
2441
      reinterpret_cast<SignalHeader*>(read_buffer->m_data + read_pos);
 
2442
    Uint32 seccnt = s->m_noOfSections;
 
2443
    Uint32 siglen = (sizeof(*s)>>2) + s->theLength;
 
2444
    if(siglen>16)
 
2445
    {
 
2446
      NDB_PREFETCH_READ (read_buffer->m_data + read_pos + 32);
 
2447
    }
 
2448
    Uint32 bno = blockToMain(s->theReceiversBlockNumber);
 
2449
    Uint32 ino = blockToInstance(s->theReceiversBlockNumber);
 
2450
    SimulatedBlock* block = globalData.mt_getBlock(bno, ino);
 
2451
    assert(block != 0);
 
2452
 
 
2453
    Uint32 gsn = s->theVerId_signalNumber;
 
2454
    *watchDogCounter = 1;
 
2455
    /* Must update original buffer so signal dump will see it. */
 
2456
    s->theSignalId = (*signalIdCounter)++;
 
2457
    memcpy(&sig->header, s, 4*siglen);
 
2458
    sig->m_sectionPtrI[0] = read_buffer->m_data[read_pos + siglen + 0];
 
2459
    sig->m_sectionPtrI[1] = read_buffer->m_data[read_pos + siglen + 1];
 
2460
    sig->m_sectionPtrI[2] = read_buffer->m_data[read_pos + siglen + 2];
 
2461
 
 
2462
    read_pos += siglen + seccnt;
 
2463
#if SIZEOF_CHARP == 8
 
2464
    /* Handle 8-byte alignment. */
 
2465
    read_pos = (read_pos + 1) & ~((Uint32)1);
 
2466
#endif
 
2467
 
 
2468
    /* Update just before execute so signal dump can know how far we are. */
 
2469
    r->m_read_pos = read_pos;
 
2470
 
 
2471
#ifdef VM_TRACE
 
2472
    if (globalData.testOn)
 
2473
    { //wl4391_todo segments
 
2474
      SegmentedSectionPtr ptr[3];
 
2475
      ptr[0].i = sig->m_sectionPtrI[0];
 
2476
      ptr[1].i = sig->m_sectionPtrI[1];
 
2477
      ptr[2].i = sig->m_sectionPtrI[2];
 
2478
      ::getSections(seccnt, ptr);
 
2479
      globalSignalLoggers.executeSignal(*s,
 
2480
                                        0,
 
2481
                                        &sig->theData[0], 
 
2482
                                        globalData.ownId,
 
2483
                                        ptr, seccnt);
 
2484
    }
 
2485
#endif
 
2486
 
 
2487
    block->executeFunction(gsn, sig);
 
2488
  }
 
2489
 
 
2490
  return num_signals;
 
2491
}
 
2492
 
 
2493
static
 
2494
Uint32
 
2495
run_job_buffers(thr_data *selfptr, Signal *sig, Uint32 *signalIdCounter)
 
2496
{
 
2497
  Uint32 thr_count = g_thr_repository.m_thread_count;
 
2498
  Uint32 signal_count = 0;
 
2499
  Uint32 perjb = selfptr->m_max_signals_per_jb;
 
2500
 
 
2501
  read_jbb_state(selfptr, thr_count);
 
2502
  /*
 
2503
   * A load memory barrier to ensure that we see any prio A signal sent later
 
2504
   * than loaded prio B signals.
 
2505
   */
 
2506
  rmb();
 
2507
 
 
2508
  thr_job_queue *queue = selfptr->m_in_queue;
 
2509
  thr_jb_read_state *read_state = selfptr->m_read_states;
 
2510
  for (Uint32 send_thr_no = 0; send_thr_no < thr_count;
 
2511
       send_thr_no++,queue++,read_state++)
 
2512
  {
 
2513
    /* Read the prio A state often, to avoid starvation of prio A. */
 
2514
    bool jba_empty = read_jba_state(selfptr);
 
2515
    if (!jba_empty)
 
2516
    {
 
2517
      static Uint32 max_prioA = thr_job_queue::SIZE * thr_job_buffer::SIZE;
 
2518
      signal_count += execute_signals(selfptr, &(selfptr->m_jba),
 
2519
                                      &(selfptr->m_jba_read_state), sig,
 
2520
                                      max_prioA, signalIdCounter);
 
2521
    }
 
2522
 
 
2523
    /* Now execute prio B signals from one thread. */
 
2524
    signal_count += execute_signals(selfptr, queue, read_state,
 
2525
                                    sig, perjb, signalIdCounter);
 
2526
  }
 
2527
 
 
2528
  return signal_count;
 
2529
}
 
2530
 
 
2531
struct thr_map_entry {
 
2532
  enum { NULL_THR_NO = 0xFF };
 
2533
  Uint8 thr_no;
 
2534
  thr_map_entry() : thr_no(NULL_THR_NO) {}
 
2535
};
 
2536
 
 
2537
static struct thr_map_entry thr_map[NO_OF_BLOCKS][MAX_BLOCK_INSTANCES];
 
2538
 
 
2539
static inline Uint32
 
2540
block2ThreadId(Uint32 block, Uint32 instance)
 
2541
{
 
2542
  assert(block >= MIN_BLOCK_NO && block <= MAX_BLOCK_NO);
 
2543
  Uint32 index = block - MIN_BLOCK_NO;
 
2544
  assert(instance < MAX_BLOCK_INSTANCES);
 
2545
  const thr_map_entry& entry = thr_map[index][instance];
 
2546
  assert(entry.thr_no < num_threads);
 
2547
  return entry.thr_no;
 
2548
}
 
2549
 
 
2550
void
 
2551
add_thr_map(Uint32 main, Uint32 instance, Uint32 thr_no)
 
2552
{
 
2553
  assert(main == blockToMain(main));
 
2554
  Uint32 index = main - MIN_BLOCK_NO;
 
2555
  assert(index < NO_OF_BLOCKS);
 
2556
  assert(instance < MAX_BLOCK_INSTANCES);
 
2557
 
 
2558
  SimulatedBlock* b = globalData.getBlock(main, instance);
 
2559
  require(b != 0);
 
2560
 
 
2561
  /* Block number including instance. */
 
2562
  Uint32 block = numberToBlock(main, instance);
 
2563
 
 
2564
  require(thr_no < num_threads);
 
2565
  struct thr_repository* rep = &g_thr_repository;
 
2566
  thr_data* thr_ptr = rep->m_thread + thr_no;
 
2567
 
 
2568
  /* Add to list. */
 
2569
  {
 
2570
    Uint32 i;
 
2571
    for (i = 0; i < thr_ptr->m_instance_count; i++)
 
2572
      require(thr_ptr->m_instance_list[i] != block);
 
2573
  }
 
2574
  require(thr_ptr->m_instance_count < MAX_INSTANCES_PER_THREAD);
 
2575
  thr_ptr->m_instance_list[thr_ptr->m_instance_count++] = block;
 
2576
 
 
2577
  SimulatedBlock::ThreadContext ctx;
 
2578
  ctx.threadId = thr_no;
 
2579
  ctx.jamBuffer = &thr_ptr->m_jam;
 
2580
  ctx.watchDogCounter = &thr_ptr->m_watchdog_counter;
 
2581
  ctx.sectionPoolCache = &thr_ptr->m_sectionPoolCache;
 
2582
  b->assignToThread(ctx);
 
2583
 
 
2584
  /* Create entry mapping block to thread. */
 
2585
  thr_map_entry& entry = thr_map[index][instance];
 
2586
  require(entry.thr_no == thr_map_entry::NULL_THR_NO);
 
2587
  entry.thr_no = thr_no;
 
2588
}
 
2589
 
 
2590
/* Static assignment of main instances (before first signal). */
 
2591
void
 
2592
add_main_thr_map()
 
2593
{
 
2594
  /* Keep mt-classic assignments in MT LQH. */
 
2595
  const Uint32 thr_GLOBAL = 0;
 
2596
  const Uint32 thr_LOCAL = 1;
 
2597
  const Uint32 thr_RECEIVER = receiver_thread_no;
 
2598
 
 
2599
  add_thr_map(BACKUP, 0, thr_LOCAL);
 
2600
  add_thr_map(DBTC, 0, thr_GLOBAL);
 
2601
  add_thr_map(DBDIH, 0, thr_GLOBAL);
 
2602
  add_thr_map(DBLQH, 0, thr_LOCAL);
 
2603
  add_thr_map(DBACC, 0, thr_LOCAL);
 
2604
  add_thr_map(DBTUP, 0, thr_LOCAL);
 
2605
  add_thr_map(DBDICT, 0, thr_GLOBAL);
 
2606
  add_thr_map(NDBCNTR, 0, thr_GLOBAL);
 
2607
  add_thr_map(QMGR, 0, thr_GLOBAL);
 
2608
  add_thr_map(NDBFS, 0, thr_GLOBAL);
 
2609
  add_thr_map(CMVMI, 0, thr_RECEIVER);
 
2610
  add_thr_map(TRIX, 0, thr_GLOBAL);
 
2611
  add_thr_map(DBUTIL, 0, thr_GLOBAL);
 
2612
  add_thr_map(SUMA, 0, thr_LOCAL);
 
2613
  add_thr_map(DBTUX, 0, thr_LOCAL);
 
2614
  add_thr_map(TSMAN, 0, thr_LOCAL);
 
2615
  add_thr_map(LGMAN, 0, thr_LOCAL);
 
2616
  add_thr_map(PGMAN, 0, thr_LOCAL);
 
2617
  add_thr_map(RESTORE, 0, thr_LOCAL);
 
2618
  add_thr_map(DBINFO, 0, thr_LOCAL);
 
2619
  add_thr_map(DBSPJ, 0, thr_GLOBAL);
 
2620
}
 
2621
 
 
2622
/* Workers added by LocalProxy (before first signal). */
 
2623
void
 
2624
add_lqh_worker_thr_map(Uint32 block, Uint32 instance)
 
2625
{
 
2626
  require(instance != 0);
 
2627
  Uint32 i = instance - 1;
 
2628
  Uint32 thr_no = NUM_MAIN_THREADS + i % num_lqh_threads;
 
2629
  add_thr_map(block, instance, thr_no);
 
2630
}
 
2631
 
 
2632
/* Extra workers run`in proxy thread. */
 
2633
void
 
2634
add_extra_worker_thr_map(Uint32 block, Uint32 instance)
 
2635
{
 
2636
  require(instance != 0);
 
2637
  Uint32 thr_no = block2ThreadId(block, 0);
 
2638
  add_thr_map(block, instance, thr_no);
 
2639
}
 
2640
 
 
2641
/**
 
2642
 * create the duplicate entries needed so that
 
2643
 *   sender doesnt need to know how many instances there
 
2644
 *   actually are in this node...
 
2645
 *
 
2646
 * if only 1 instance...then duplicate that for all slots
 
2647
 * else assume instance 0 is proxy...and duplicate workers (modulo)
 
2648
 *
 
2649
 * NOTE: extra pgman worker is instance 5
 
2650
 */
 
2651
void
 
2652
finalize_thr_map()
 
2653
{
 
2654
  for (Uint32 b = 0; b < NO_OF_BLOCKS; b++)
 
2655
  {
 
2656
    Uint32 bno = b + MIN_BLOCK_NO;
 
2657
    Uint32 cnt = 0;
 
2658
    while (cnt < MAX_BLOCK_INSTANCES &&
 
2659
           thr_map[b][cnt].thr_no != thr_map_entry::NULL_THR_NO)
 
2660
      cnt++;
 
2661
 
 
2662
    if (cnt != MAX_BLOCK_INSTANCES)
 
2663
    {
 
2664
      SimulatedBlock * main = globalData.getBlock(bno, 0);
 
2665
      for (Uint32 i = cnt; i < MAX_BLOCK_INSTANCES; i++)
 
2666
      {
 
2667
        Uint32 dup = (cnt == 1) ? 0 : 1 + ((i - 1) % (cnt - 1));
 
2668
        if (thr_map[b][i].thr_no == thr_map_entry::NULL_THR_NO)
 
2669
        {
 
2670
          thr_map[b][i] = thr_map[b][dup];
 
2671
          main->addInstance(globalData.getBlock(bno, dup), i);
 
2672
        }
 
2673
        else
 
2674
        {
 
2675
          /**
 
2676
           * extra pgman instance
 
2677
           */
 
2678
          require(bno == PGMAN);
 
2679
        }
 
2680
      }
 
2681
    }
 
2682
  }
 
2683
}
 
2684
 
 
2685
static void reportSignalStats(Uint32 self, Uint32 a_count, Uint32 a_size,
 
2686
                              Uint32 b_count, Uint32 b_size)
 
2687
{
 
2688
  SignalT<6> sT;
 
2689
  Signal *s= new (&sT) Signal(0);
 
2690
 
 
2691
  memset(&s->header, 0, sizeof(s->header));
 
2692
  s->header.theLength = 6;
 
2693
  s->header.theSendersSignalId = 0;
 
2694
  s->header.theSendersBlockRef = numberToRef(0, 0);
 
2695
  s->header.theVerId_signalNumber = GSN_EVENT_REP;
 
2696
  s->header.theReceiversBlockNumber = CMVMI;
 
2697
  s->theData[0] = NDB_LE_MTSignalStatistics;
 
2698
  s->theData[1] = self;
 
2699
  s->theData[2] = a_count;
 
2700
  s->theData[3] = a_size;
 
2701
  s->theData[4] = b_count;
 
2702
  s->theData[5] = b_size;
 
2703
  /* ToDo: need this really be prio A like in old code? */
 
2704
  sendlocal(self, &s->header, s->theData,
 
2705
            NULL);
 
2706
}
 
2707
 
 
2708
static inline void
 
2709
update_sched_stats(thr_data *selfptr)
 
2710
{
 
2711
  if(selfptr->m_prioa_count + selfptr->m_priob_count >= 2000000)
 
2712
  {
 
2713
    reportSignalStats(selfptr->m_thr_no,
 
2714
                      selfptr->m_prioa_count,
 
2715
                      selfptr->m_prioa_size,
 
2716
                      selfptr->m_priob_count,
 
2717
                      selfptr->m_priob_size);
 
2718
    selfptr->m_prioa_count = 0;
 
2719
    selfptr->m_prioa_size = 0;
 
2720
    selfptr->m_priob_count = 0;
 
2721
    selfptr->m_priob_size = 0;
 
2722
 
 
2723
#if 0
 
2724
    Uint32 thr_no = selfptr->m_thr_no;
 
2725
    ndbout_c("--- %u fifo: %u jba: %u global: %u",
 
2726
             thr_no,
 
2727
             fifo_used_pages(selfptr),
 
2728
             selfptr->m_jba_head.used(),
 
2729
             g_thr_repository.m_free_list.m_cnt);
 
2730
    for (Uint32 i = 0; i<num_threads; i++)
 
2731
    {
 
2732
      ndbout_c("  %u-%u : %u",
 
2733
               thr_no, i, selfptr->m_in_queue_head[i].used());
 
2734
    }
 
2735
#endif
 
2736
  }
 
2737
}
 
2738
 
 
2739
static void
 
2740
init_thread(thr_data *selfptr)
 
2741
{
 
2742
  selfptr->m_waiter.init();
 
2743
  selfptr->m_jam.theEmulatedJamIndex = 0;
 
2744
  selfptr->m_jam.theEmulatedJamBlockNumber = 0;
 
2745
  bzero(selfptr->m_jam.theEmulatedJam, sizeof(selfptr->m_jam.theEmulatedJam));
 
2746
  NdbThread_SetTlsKey(NDB_THREAD_TLS_JAM, &selfptr->m_jam);
 
2747
  NdbThread_SetTlsKey(NDB_THREAD_TLS_THREAD, selfptr);
 
2748
 
 
2749
  unsigned thr_no = selfptr->m_thr_no;
 
2750
  globalEmulatorData.theWatchDog->
 
2751
    registerWatchedThread(&selfptr->m_watchdog_counter, thr_no);
 
2752
  {
 
2753
    while(selfptr->m_thread == 0)
 
2754
      NdbSleep_MilliSleep(30);
 
2755
  }
 
2756
 
 
2757
  THRConfigApplier & conf = globalEmulatorData.theConfiguration->m_thr_config;
 
2758
  BaseString tmp;
 
2759
  tmp.appfmt("thr: %u ", thr_no);
 
2760
 
 
2761
  int tid = NdbThread_GetTid(selfptr->m_thread);
 
2762
  if (tid != -1)
 
2763
  {
 
2764
    tmp.appfmt("tid: %u ", tid);
 
2765
  }
 
2766
 
 
2767
  conf.appendInfo(tmp,
 
2768
                  selfptr->m_instance_list, selfptr->m_instance_count);
 
2769
  int res = conf.do_bind(selfptr->m_thread,
 
2770
                         selfptr->m_instance_list, selfptr->m_instance_count);
 
2771
  if (res < 0)
 
2772
  {
 
2773
    tmp.appfmt("err: %d ", -res);
 
2774
  }
 
2775
  else if (res > 0)
 
2776
  {
 
2777
    tmp.appfmt("OK ");
 
2778
  }
 
2779
 
 
2780
  selfptr->m_thr_id = pthread_self();
 
2781
 
 
2782
  for (Uint32 i = 0; i < selfptr->m_instance_count; i++) 
 
2783
  {
 
2784
    BlockReference block = selfptr->m_instance_list[i];
 
2785
    Uint32 main = blockToMain(block);
 
2786
    Uint32 instance = blockToInstance(block);
 
2787
    tmp.appfmt("%s(%u) ", getBlockName(main), instance);
 
2788
  }
 
2789
  printf("%s\n", tmp.c_str());
 
2790
  fflush(stdout);
 
2791
}
 
2792
 
 
2793
/**
 
2794
 * Align signal buffer for better cache performance.
 
2795
 * Also skew it a litte for each thread to avoid cache pollution.
 
2796
 */
 
2797
#define SIGBUF_SIZE (sizeof(Signal) + 63 + 256 * MAX_THREADS)
 
2798
static Signal *
 
2799
aligned_signal(unsigned char signal_buf[SIGBUF_SIZE], unsigned thr_no)
 
2800
{
 
2801
  UintPtr sigtmp= (UintPtr)signal_buf;
 
2802
  sigtmp= (sigtmp+63) & (~(UintPtr)63);
 
2803
  sigtmp+= thr_no*256;
 
2804
  return (Signal *)sigtmp;
 
2805
}
 
2806
 
 
2807
Uint32 receiverThreadId;
 
2808
 
 
2809
/*
 
2810
 * We only do receive in thread 2, no other threads do receive.
 
2811
 *
 
2812
 * As part of the receive loop, we also periodically call update_connections()
 
2813
 * (this way we are similar to single-threaded ndbd).
 
2814
 *
 
2815
 * The CMVMI block (and no other blocks) run in the same thread as this
 
2816
 * receive loop; this way we avoid races between update_connections() and
 
2817
 * CMVMI calls into the transporters.
 
2818
 *
 
2819
 * Note that with this setup, local signals to CMVMI cannot wake up the thread
 
2820
 * if it is sleeping on the receive sockets. Thus CMVMI local signal processing
 
2821
 * can be (slightly) delayed, however CMVMI is not really performance critical
 
2822
 * (hopefully).
 
2823
 */
 
2824
extern "C"
 
2825
void *
 
2826
mt_receiver_thread_main(void *thr_arg)
 
2827
{
 
2828
  unsigned char signal_buf[SIGBUF_SIZE];
 
2829
  Signal *signal;
 
2830
  struct thr_repository* rep = &g_thr_repository;
 
2831
  struct thr_data* selfptr = (struct thr_data *)thr_arg;
 
2832
  unsigned thr_no = selfptr->m_thr_no;
 
2833
  Uint32& watchDogCounter = selfptr->m_watchdog_counter;
 
2834
  Uint32 thrSignalId = 0;
 
2835
  bool has_received = false;
 
2836
 
 
2837
  init_thread(selfptr);
 
2838
  receiverThreadId = thr_no;
 
2839
  signal = aligned_signal(signal_buf, thr_no);
 
2840
 
 
2841
  while (globalData.theRestartFlag != perform_stop)
 
2842
  { 
 
2843
    static int cnt = 0;
 
2844
 
 
2845
    update_sched_stats(selfptr);
 
2846
 
 
2847
    if (cnt == 0)
 
2848
    {
 
2849
      watchDogCounter = 5;
 
2850
      globalTransporterRegistry.update_connections();
 
2851
    }
 
2852
    cnt = (cnt + 1) & 15;
 
2853
 
 
2854
    watchDogCounter = 2;
 
2855
 
 
2856
    NDB_TICKS now = NdbTick_CurrentMillisecond();
 
2857
    scan_time_queues(selfptr, now);
 
2858
 
 
2859
    Uint32 sum = run_job_buffers(selfptr, signal, &thrSignalId);
 
2860
 
 
2861
    if (sum || has_received)
 
2862
    {
 
2863
      watchDogCounter = 6;
 
2864
      flush_jbb_write_state(selfptr);
 
2865
    }
 
2866
 
 
2867
    do_send(selfptr, TRUE);
 
2868
 
 
2869
    watchDogCounter = 7;
 
2870
 
 
2871
    has_received = false;
 
2872
    if (globalTransporterRegistry.pollReceive(1))
 
2873
    {
 
2874
      if (check_job_buffers(rep) == 0)
 
2875
      {
 
2876
        watchDogCounter = 8;
 
2877
        lock(&rep->m_receive_lock);
 
2878
        globalTransporterRegistry.performReceive();
 
2879
        unlock(&rep->m_receive_lock);
 
2880
        has_received = true;
 
2881
      }
 
2882
    }
 
2883
  }
 
2884
 
 
2885
  globalEmulatorData.theWatchDog->unregisterWatchedThread(thr_no);
 
2886
  return NULL;                  // Return value not currently used
 
2887
}
 
2888
 
 
2889
static
 
2890
inline
 
2891
void
 
2892
sendpacked(struct thr_data* thr_ptr, Signal* signal)
 
2893
{
 
2894
  Uint32 i;
 
2895
  for (i = 0; i < thr_ptr->m_instance_count; i++)
 
2896
  {
 
2897
    BlockReference block = thr_ptr->m_instance_list[i];
 
2898
    Uint32 main = blockToMain(block);
 
2899
    Uint32 instance = blockToInstance(block);
 
2900
    SimulatedBlock* b = globalData.getBlock(main, instance);
 
2901
    // wl4391_todo remove useless assert
 
2902
    assert(b != 0 && b->getThreadId() == thr_ptr->m_thr_no);
 
2903
    /* b->send_at_job_buffer_end(); */
 
2904
    b->executeFunction(GSN_SEND_PACKED, signal);
 
2905
  }
 
2906
}
 
2907
 
 
2908
/**
 
2909
 * check if out-queues of selfptr is full
 
2910
 * return true is so
 
2911
 */
 
2912
static bool
 
2913
check_job_buffer_full(thr_data *selfptr)
 
2914
{
 
2915
  Uint32 thr_no = selfptr->m_thr_no;
 
2916
  Uint32 tmp = compute_max_signals_to_execute(thr_no);
 
2917
#if 0
 
2918
  Uint32 perjb = tmp / g_thr_repository.m_thread_count;
 
2919
 
 
2920
  if (perjb == 0)
 
2921
  {
 
2922
    return true;
 
2923
  }
 
2924
 
 
2925
  return false;
 
2926
#else
 
2927
  if (tmp < g_thr_repository.m_thread_count)
 
2928
    return true;
 
2929
  return false;
 
2930
#endif
 
2931
}
 
2932
 
 
2933
/**
 
2934
 * update_sched_config
 
2935
 *
 
2936
 *   In order to prevent "job-buffer-full", i.e
 
2937
 *     that one thread(T1) produces so much signals to another thread(T2)
 
2938
 *     so that the ring-buffer from T1 to T2 gets full
 
2939
 *     the mainlop have 2 "config" variables
 
2940
 *   - m_max_exec_signals
 
2941
 *     This is the *total* no of signals T1 can execute before calling
 
2942
 *     this method again
 
2943
 *   - m_max_signals_per_jb
 
2944
 *     This is the max no of signals T1 can execute from each other thread
 
2945
 *     in system
 
2946
 *
 
2947
 *   Assumption: each signal may send *at most* 4 signals
 
2948
 *     - this assumption is made the same in ndbd and ndbmtd and is
 
2949
 *       mostly followed by block-code, although not it all places :-(
 
2950
 *
 
2951
 *   This function return true, if it it slept
 
2952
 *     (i.e that it concluded that it could not execute *any* signals, wo/
 
2953
 *      risking job-buffer-full)
 
2954
 */
 
2955
static
 
2956
bool
 
2957
update_sched_config(struct thr_data* selfptr, Uint32 pending_send)
 
2958
{
 
2959
  Uint32 sleeploop = 0;
 
2960
  Uint32 thr_no = selfptr->m_thr_no;
 
2961
loop:
 
2962
  Uint32 tmp = compute_max_signals_to_execute(thr_no);
 
2963
  Uint32 perjb = tmp / g_thr_repository.m_thread_count;
 
2964
 
 
2965
  if (perjb > MAX_SIGNALS_PER_JB)
 
2966
    perjb = MAX_SIGNALS_PER_JB;
 
2967
 
 
2968
  selfptr->m_max_exec_signals = tmp;
 
2969
  selfptr->m_max_signals_per_jb = perjb;
 
2970
 
 
2971
  if (unlikely(perjb == 0))
 
2972
  {
 
2973
    sleeploop++;
 
2974
    if (sleeploop == 10)
 
2975
    {
 
2976
      /**
 
2977
       * we've slept for 10ms...try running anyway
 
2978
       */
 
2979
      selfptr->m_max_signals_per_jb = 1;
 
2980
      ndbout_c("%u - sleeploop 10!!", selfptr->m_thr_no);
 
2981
      return true;
 
2982
    }
 
2983
 
 
2984
    if (pending_send)
 
2985
    {
 
2986
      /* About to sleep, _must_ send now. */
 
2987
      pending_send = do_send(selfptr, TRUE);
 
2988
    }
 
2989
 
 
2990
    const Uint32 wait = 1000000;    /* 1 ms */
 
2991
    yield(&selfptr->m_waiter, wait, check_job_buffer_full, selfptr);
 
2992
    goto loop;
 
2993
  }
 
2994
 
 
2995
  return sleeploop > 0;
 
2996
}
 
2997
 
 
2998
extern "C"
 
2999
void *
 
3000
mt_job_thread_main(void *thr_arg)
 
3001
{
 
3002
  unsigned char signal_buf[SIGBUF_SIZE];
 
3003
  Signal *signal;
 
3004
  const Uint32 nowait = 10 * 1000000;    /* 10 ms */
 
3005
  Uint32 thrSignalId = 0;
 
3006
 
 
3007
  struct thr_data* selfptr = (struct thr_data *)thr_arg;
 
3008
  init_thread(selfptr);
 
3009
  Uint32& watchDogCounter = selfptr->m_watchdog_counter;
 
3010
 
 
3011
  unsigned thr_no = selfptr->m_thr_no;
 
3012
  signal = aligned_signal(signal_buf, thr_no);
 
3013
 
 
3014
  /* Avoid false watchdog alarms caused by race condition. */
 
3015
  watchDogCounter = 1;
 
3016
 
 
3017
  Uint32 pending_send = 0;
 
3018
  Uint32 send_sum = 0;
 
3019
  int loops = 0;
 
3020
  int maxloops = 10;/* Loops before reading clock, fuzzy adapted to 1ms freq. */
 
3021
  NDB_TICKS now = selfptr->m_time;
 
3022
 
 
3023
  while (globalData.theRestartFlag != perform_stop)
 
3024
  { 
 
3025
    loops++;
 
3026
    update_sched_stats(selfptr);
 
3027
 
 
3028
    watchDogCounter = 2;
 
3029
    scan_time_queues(selfptr, now);
 
3030
 
 
3031
    Uint32 sum = run_job_buffers(selfptr, signal, &thrSignalId);
 
3032
    
 
3033
    watchDogCounter = 1;
 
3034
    signal->header.m_noOfSections = 0; /* valgrind */
 
3035
    sendpacked(selfptr, signal);
 
3036
 
 
3037
    if (sum)
 
3038
    {
 
3039
      watchDogCounter = 6;
 
3040
      flush_jbb_write_state(selfptr);
 
3041
      send_sum += sum;
 
3042
 
 
3043
      if (send_sum > MAX_SIGNALS_BEFORE_SEND)
 
3044
      {
 
3045
        /* Try to send, but skip for now in case of lock contention. */
 
3046
        pending_send = do_send(selfptr, FALSE);
 
3047
        send_sum = 0;
 
3048
      }
 
3049
      else
 
3050
      {
 
3051
        /* Send buffers append to send queues to dst. nodes. */
 
3052
        do_flush(selfptr);
 
3053
      }
 
3054
    }
 
3055
    else
 
3056
    {
 
3057
      /* No signals processed, prepare to sleep to wait for more */
 
3058
      if (pending_send || send_sum > 0)
 
3059
      {
 
3060
        /* About to sleep, _must_ send now. */
 
3061
        pending_send = do_send(selfptr, TRUE);
 
3062
        send_sum = 0;
 
3063
      }
 
3064
 
 
3065
      if (pending_send == 0)
 
3066
      {
 
3067
        bool waited = yield(&selfptr->m_waiter, nowait, check_queues_empty,
 
3068
                            selfptr);
 
3069
        if (waited)
 
3070
        {
 
3071
          /* Update current time after sleeping */
 
3072
          now = NdbTick_CurrentMillisecond();
 
3073
          loops = 0;
 
3074
        }
 
3075
      }
 
3076
    }
 
3077
 
 
3078
    /**
 
3079
     * Check if we executed enough signals,
 
3080
     *   and if so recompute how many signals to execute
 
3081
     */
 
3082
    if (sum >= selfptr->m_max_exec_signals)
 
3083
    {
 
3084
      if (update_sched_config(selfptr, pending_send))
 
3085
      {
 
3086
        /* Update current time after sleeping */
 
3087
        now = NdbTick_CurrentMillisecond();
 
3088
        loops = 0;
 
3089
      }
 
3090
    }
 
3091
    else
 
3092
    {
 
3093
      selfptr->m_max_exec_signals -= sum;
 
3094
    }
 
3095
 
 
3096
    /**
 
3097
     * Adaptive reading freq. of systeme time every time 1ms
 
3098
     * is likely to have passed
 
3099
     */
 
3100
    if (loops > maxloops)
 
3101
    {
 
3102
      now = NdbTick_CurrentMillisecond();
 
3103
      Uint64 diff = now - selfptr->m_time;
 
3104
 
 
3105
      /* Adjust 'maxloop' to achieve clock reading frequency of 1ms */
 
3106
      if (diff < 1)
 
3107
        maxloops += ((maxloops/10) + 1); /* No change: less frequent reading */
 
3108
      else if (diff > 1 && maxloops > 1)
 
3109
        maxloops -= ((maxloops/10) + 1); /* Overslept: Need more frequent read*/
 
3110
 
 
3111
      loops = 0;
 
3112
    }
 
3113
  }
 
3114
 
 
3115
  globalEmulatorData.theWatchDog->unregisterWatchedThread(thr_no);
 
3116
  return NULL;                  // Return value not currently used
 
3117
}
 
3118
 
 
3119
void
 
3120
sendlocal(Uint32 self, const SignalHeader *s, const Uint32 *data,
 
3121
          const Uint32 secPtr[3])
 
3122
{
 
3123
  Uint32 block = blockToMain(s->theReceiversBlockNumber);
 
3124
  Uint32 instance = blockToInstance(s->theReceiversBlockNumber);
 
3125
 
 
3126
  /*
 
3127
   * Max number of signals to put into job buffer before flushing the buffer
 
3128
   * to the other thread.
 
3129
   * This parameter found to be reasonable by benchmarking.
 
3130
   */
 
3131
  Uint32 MAX_SIGNALS_BEFORE_FLUSH = (self == receiver_thread_no) ? 
 
3132
    MAX_SIGNALS_BEFORE_FLUSH_RECEIVER : 
 
3133
    MAX_SIGNALS_BEFORE_FLUSH_OTHER;
 
3134
 
 
3135
  Uint32 dst = block2ThreadId(block, instance);
 
3136
  struct thr_repository* rep = &g_thr_repository;
 
3137
  struct thr_data * selfptr = rep->m_thread + self;
 
3138
  assert(pthread_equal(selfptr->m_thr_id, pthread_self()));
 
3139
  struct thr_data * dstptr = rep->m_thread + dst;
 
3140
 
 
3141
  selfptr->m_priob_count++;
 
3142
  Uint32 siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
 
3143
  selfptr->m_priob_size += siglen;
 
3144
 
 
3145
  thr_job_queue *q = dstptr->m_in_queue + self;
 
3146
  thr_jb_write_state *w = selfptr->m_write_states + dst;
 
3147
  if (insert_signal(q, w, false, s, data, secPtr, selfptr->m_next_buffer))
 
3148
  {
 
3149
    selfptr->m_next_buffer = seize_buffer(rep, self, false);
 
3150
  }
 
3151
  if (w->m_pending_signals >= MAX_SIGNALS_BEFORE_FLUSH)
 
3152
    flush_write_state(selfptr, dstptr, q->m_head, w);
 
3153
}
 
3154
 
 
3155
void
 
3156
sendprioa(Uint32 self, const SignalHeader *s, const uint32 *data,
 
3157
          const Uint32 secPtr[3])
 
3158
{
 
3159
  Uint32 block = blockToMain(s->theReceiversBlockNumber);
 
3160
  Uint32 instance = blockToInstance(s->theReceiversBlockNumber);
 
3161
 
 
3162
  Uint32 dst = block2ThreadId(block, instance);
 
3163
  struct thr_repository* rep = &g_thr_repository;
 
3164
  struct thr_data *selfptr = rep->m_thread + self;
 
3165
  assert(s->theVerId_signalNumber == GSN_START_ORD ||
 
3166
         pthread_equal(selfptr->m_thr_id, pthread_self()));
 
3167
  struct thr_data *dstptr = rep->m_thread + dst;
 
3168
 
 
3169
  selfptr->m_prioa_count++;
 
3170
  Uint32 siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
 
3171
  selfptr->m_prioa_size += siglen;  
 
3172
 
 
3173
  thr_job_queue *q = &(dstptr->m_jba);
 
3174
  thr_jb_write_state w;
 
3175
 
 
3176
  lock(&dstptr->m_jba_write_lock);
 
3177
 
 
3178
  Uint32 index = q->m_head->m_write_index;
 
3179
  w.m_write_index = index;
 
3180
  thr_job_buffer *buffer = q->m_buffers[index];
 
3181
  w.m_write_buffer = buffer;
 
3182
  w.m_write_pos = buffer->m_len;
 
3183
  w.m_pending_signals = 0;
 
3184
  w.m_pending_signals_wakeup = MAX_SIGNALS_BEFORE_WAKEUP;
 
3185
  bool buf_used = insert_signal(q, &w, true, s, data, secPtr,
 
3186
                                selfptr->m_next_buffer);
 
3187
  flush_write_state(selfptr, dstptr, q->m_head, &w);
 
3188
 
 
3189
  unlock(&dstptr->m_jba_write_lock);
 
3190
 
 
3191
  if (buf_used)
 
3192
    selfptr->m_next_buffer = seize_buffer(rep, self, true);
 
3193
}
 
3194
 
 
3195
/**
 
3196
 * Send a signal to a remote node.
 
3197
 *
 
3198
 * (The signal is only queued here, and actually sent later in do_send()).
 
3199
 */
 
3200
SendStatus
 
3201
mt_send_remote(Uint32 self, const SignalHeader *sh, Uint8 prio,
 
3202
               const Uint32 * data, NodeId nodeId,
 
3203
               const LinearSectionPtr ptr[3])
 
3204
{
 
3205
  thr_repository *rep = &g_thr_repository;
 
3206
  thr_data *selfptr = rep->m_thread + self;
 
3207
  SendStatus ss;
 
3208
 
 
3209
  mt_send_handle handle(selfptr);
 
3210
  register_pending_send(selfptr, nodeId);
 
3211
  /* prepareSend() is lock-free, as we have per-thread send buffers. */
 
3212
  ss = globalTransporterRegistry.prepareSend(&handle,
 
3213
                                             sh, prio, data, nodeId, ptr);
 
3214
  return ss;
 
3215
}
 
3216
 
 
3217
SendStatus
 
3218
mt_send_remote(Uint32 self, const SignalHeader *sh, Uint8 prio,
 
3219
               const Uint32 *data, NodeId nodeId,
 
3220
               class SectionSegmentPool *thePool,
 
3221
               const SegmentedSectionPtr ptr[3])
 
3222
{
 
3223
  thr_repository *rep = &g_thr_repository;
 
3224
  thr_data *selfptr = rep->m_thread + self;
 
3225
  SendStatus ss;
 
3226
 
 
3227
  mt_send_handle handle(selfptr);
 
3228
  register_pending_send(selfptr, nodeId);
 
3229
  ss = globalTransporterRegistry.prepareSend(&handle,
 
3230
                                             sh, prio, data, nodeId,
 
3231
                                             *thePool, ptr);
 
3232
  return ss;
 
3233
}
 
3234
 
 
3235
/*
 
3236
 * This functions sends a prio A STOP_FOR_CRASH signal to a thread.
 
3237
 *
 
3238
 * It works when called from any other thread, not just from job processing
 
3239
 * threads. But note that this signal will be the last signal to be executed by
 
3240
 * the other thread, as it will exit immediately.
 
3241
 */
 
3242
static
 
3243
void
 
3244
sendprioa_STOP_FOR_CRASH(const struct thr_data *selfptr, Uint32 dst)
 
3245
{
 
3246
  SignalT<StopForCrash::SignalLength> signalT;
 
3247
  struct thr_repository* rep = &g_thr_repository;
 
3248
  /* As this signal will be the last one executed by the other thread, it does
 
3249
     not matter which buffer we use in case the current buffer is filled up by
 
3250
     the STOP_FOR_CRASH signal; the data in it will never be read.
 
3251
  */
 
3252
  static thr_job_buffer dummy_buffer;
 
3253
 
 
3254
  /**
 
3255
   * Pick any instance running in this thread
 
3256
   */
 
3257
  struct thr_data * dstptr = rep->m_thread + dst;
 
3258
  Uint32 bno = dstptr->m_instance_list[0];
 
3259
 
 
3260
  memset(&signalT.header, 0, sizeof(SignalHeader));
 
3261
  signalT.header.theVerId_signalNumber   = GSN_STOP_FOR_CRASH;
 
3262
  signalT.header.theReceiversBlockNumber = bno;
 
3263
  signalT.header.theSendersBlockRef      = 0;
 
3264
  signalT.header.theTrace                = 0;
 
3265
  signalT.header.theSendersSignalId      = 0;
 
3266
  signalT.header.theSignalId             = 0;
 
3267
  signalT.header.theLength               = StopForCrash::SignalLength;
 
3268
  StopForCrash * stopForCrash = CAST_PTR(StopForCrash, &signalT.theData[0]);
 
3269
  stopForCrash->flags = 0;
 
3270
 
 
3271
  thr_job_queue *q = &(dstptr->m_jba);
 
3272
  thr_jb_write_state w;
 
3273
 
 
3274
  lock(&dstptr->m_jba_write_lock);
 
3275
 
 
3276
  Uint32 index = q->m_head->m_write_index;
 
3277
  w.m_write_index = index;
 
3278
  thr_job_buffer *buffer = q->m_buffers[index];
 
3279
  w.m_write_buffer = buffer;
 
3280
  w.m_write_pos = buffer->m_len;
 
3281
  w.m_pending_signals = 0;
 
3282
  w.m_pending_signals_wakeup = MAX_SIGNALS_BEFORE_WAKEUP;
 
3283
  insert_signal(q, &w, true, &signalT.header, signalT.theData, NULL,
 
3284
                &dummy_buffer);
 
3285
  flush_write_state(selfptr, dstptr, q->m_head, &w);
 
3286
 
 
3287
  unlock(&dstptr->m_jba_write_lock);
 
3288
}
 
3289
 
 
3290
/**
 
3291
 * init functions
 
3292
 */
 
3293
static
 
3294
void
 
3295
queue_init(struct thr_tq* tq)
 
3296
{
 
3297
  tq->m_next_timer = 0;
 
3298
  tq->m_current_time = 0;
 
3299
  tq->m_next_free = RNIL;
 
3300
  tq->m_cnt[0] = tq->m_cnt[1] = 0;
 
3301
  bzero(tq->m_delayed_signals, sizeof(tq->m_delayed_signals));
 
3302
}
 
3303
 
 
3304
static
 
3305
void
 
3306
thr_init(struct thr_repository* rep, struct thr_data *selfptr, unsigned int cnt,
 
3307
         unsigned thr_no)
 
3308
{
 
3309
  Uint32 i;
 
3310
 
 
3311
  selfptr->m_thr_no = thr_no;
 
3312
  selfptr->m_max_signals_per_jb = MAX_SIGNALS_PER_JB;
 
3313
  selfptr->m_max_exec_signals = 0;
 
3314
  selfptr->m_first_free = 0;
 
3315
  selfptr->m_first_unused = 0;
 
3316
  
 
3317
  {
 
3318
    char buf[100];
 
3319
    BaseString::snprintf(buf, sizeof(buf), "jbalock thr: %u", thr_no);
 
3320
    register_lock(&selfptr->m_jba_write_lock, buf);
 
3321
  }
 
3322
  selfptr->m_jba_head.m_read_index = 0;
 
3323
  selfptr->m_jba_head.m_write_index = 0;
 
3324
  selfptr->m_jba.m_head = &selfptr->m_jba_head;
 
3325
  thr_job_buffer *buffer = seize_buffer(rep, thr_no, true);
 
3326
  selfptr->m_jba.m_buffers[0] = buffer;
 
3327
  selfptr->m_jba_read_state.m_read_index = 0;
 
3328
  selfptr->m_jba_read_state.m_read_buffer = buffer;
 
3329
  selfptr->m_jba_read_state.m_read_pos = 0;
 
3330
  selfptr->m_jba_read_state.m_read_end = 0;
 
3331
  selfptr->m_jba_read_state.m_write_index = 0;
 
3332
  selfptr->m_next_buffer = seize_buffer(rep, thr_no, false);
 
3333
  selfptr->m_send_buffer_pool.set_pool(&rep->m_sb_pool);
 
3334
  
 
3335
  for (i = 0; i<cnt; i++)
 
3336
  {
 
3337
    selfptr->m_in_queue_head[i].m_read_index = 0;
 
3338
    selfptr->m_in_queue_head[i].m_write_index = 0;
 
3339
    selfptr->m_in_queue[i].m_head = &selfptr->m_in_queue_head[i];
 
3340
    buffer = seize_buffer(rep, thr_no, false);
 
3341
    selfptr->m_in_queue[i].m_buffers[0] = buffer;
 
3342
    selfptr->m_read_states[i].m_read_index = 0;
 
3343
    selfptr->m_read_states[i].m_read_buffer = buffer;
 
3344
    selfptr->m_read_states[i].m_read_pos = 0;
 
3345
    selfptr->m_read_states[i].m_read_end = 0;
 
3346
    selfptr->m_read_states[i].m_write_index = 0;
 
3347
  }
 
3348
  queue_init(&selfptr->m_tq);
 
3349
 
 
3350
  selfptr->m_prioa_count = 0;
 
3351
  selfptr->m_prioa_size = 0;
 
3352
  selfptr->m_priob_count = 0;
 
3353
  selfptr->m_priob_size = 0;
 
3354
 
 
3355
  selfptr->m_pending_send_count = 0;
 
3356
  selfptr->m_pending_send_mask.clear();
 
3357
 
 
3358
  selfptr->m_instance_count = 0;
 
3359
  for (i = 0; i < MAX_INSTANCES_PER_THREAD; i++)
 
3360
    selfptr->m_instance_list[i] = 0;
 
3361
 
 
3362
  bzero(&selfptr->m_send_buffers, sizeof(selfptr->m_send_buffers));
 
3363
 
 
3364
  selfptr->m_thread = 0;
 
3365
  selfptr->m_cpu = NO_LOCK_CPU;
 
3366
}
 
3367
 
 
3368
/* Have to do this after init of all m_in_queues is done. */
 
3369
static
 
3370
void
 
3371
thr_init2(struct thr_repository* rep, struct thr_data *selfptr,
 
3372
          unsigned int cnt, unsigned thr_no)
 
3373
{
 
3374
  for (Uint32 i = 0; i<cnt; i++)
 
3375
  {
 
3376
    selfptr->m_write_states[i].m_write_index = 0;
 
3377
    selfptr->m_write_states[i].m_write_pos = 0;
 
3378
    selfptr->m_write_states[i].m_write_buffer =
 
3379
      rep->m_thread[i].m_in_queue[thr_no].m_buffers[0];
 
3380
    selfptr->m_write_states[i].m_pending_signals = 0;
 
3381
    selfptr->m_write_states[i].m_pending_signals_wakeup = 0;
 
3382
  }    
 
3383
}
 
3384
 
 
3385
static
 
3386
void
 
3387
send_buffer_init(Uint32 node, thr_repository::send_buffer * sb)
 
3388
{
 
3389
  char buf[100];
 
3390
  BaseString::snprintf(buf, sizeof(buf), "send lock node %d", node);
 
3391
  register_lock(&sb->m_send_lock, buf);
 
3392
  sb->m_force_send = 0;
 
3393
  sb->m_send_thread = NO_SEND_THREAD;
 
3394
  bzero(&sb->m_buffer, sizeof(sb->m_buffer));
 
3395
  sb->m_bytes = 0;
 
3396
  bzero(sb->m_read_index, sizeof(sb->m_read_index));
 
3397
}
 
3398
 
 
3399
static
 
3400
void
 
3401
rep_init(struct thr_repository* rep, unsigned int cnt, Ndbd_mem_manager *mm)
 
3402
{
 
3403
  rep->m_mm = mm;
 
3404
 
 
3405
  rep->m_thread_count = cnt;
 
3406
  for (unsigned int i = 0; i<cnt; i++)
 
3407
  {
 
3408
    thr_init(rep, rep->m_thread + i, cnt, i);
 
3409
  }
 
3410
  for (unsigned int i = 0; i<cnt; i++)
 
3411
  {
 
3412
    thr_init2(rep, rep->m_thread + i, cnt, i);
 
3413
  }
 
3414
 
 
3415
  rep->stopped_threads = 0;
 
3416
  NdbMutex_Init(&rep->stop_for_crash_mutex);
 
3417
  NdbCondition_Init(&rep->stop_for_crash_cond);
 
3418
 
 
3419
  for (int i = 0 ; i < MAX_NTRANSPORTERS; i++)
 
3420
  {
 
3421
    send_buffer_init(i, rep->m_send_buffers+i);
 
3422
  }
 
3423
 
 
3424
  bzero(rep->m_thread_send_buffers, sizeof(rep->m_thread_send_buffers));
 
3425
}
 
3426
 
 
3427
 
 
3428
/**
 
3429
 * Thread Config
 
3430
 */
 
3431
 
 
3432
#include "ThreadConfig.hpp"
 
3433
#include <signaldata/StartOrd.hpp>
 
3434
 
 
3435
Uint32
 
3436
compute_jb_pages(struct EmulatorData * ed)
 
3437
{
 
3438
  Uint32 cnt = NUM_MAIN_THREADS + globalData.ndbMtLqhThreads + 1;
 
3439
 
 
3440
  Uint32 perthread = 0;
 
3441
 
 
3442
  /**
 
3443
   * Each thread can have thr_job_queue::SIZE pages in out-queues
 
3444
   *   to each other thread
 
3445
   */
 
3446
  perthread += cnt * (1 + thr_job_queue::SIZE);
 
3447
 
 
3448
  /**
 
3449
   * And thr_job_queue::SIZE prio A signals
 
3450
   */
 
3451
  perthread += (1 + thr_job_queue::SIZE);
 
3452
 
 
3453
  /**
 
3454
   * And XXX time-queue signals
 
3455
   */
 
3456
  perthread += 32; // Say 1M for now
 
3457
 
 
3458
  /**
 
3459
   * Each thread also keeps an own cache with max THR_FREE_BUF_MAX
 
3460
   */
 
3461
  perthread += THR_FREE_BUF_MAX;
 
3462
 
 
3463
  /**
 
3464
   * Multiply by no of threads
 
3465
   */
 
3466
  Uint32 tot = cnt * perthread;
 
3467
 
 
3468
  return tot;
 
3469
}
 
3470
 
 
3471
ThreadConfig::ThreadConfig()
 
3472
{
 
3473
}
 
3474
 
 
3475
ThreadConfig::~ThreadConfig()
 
3476
{
 
3477
}
 
3478
 
 
3479
/*
 
3480
 * We must do the init here rather than in the constructor, since at
 
3481
 * constructor time the global memory manager is not available.
 
3482
 */
 
3483
void
 
3484
ThreadConfig::init()
 
3485
{
 
3486
  num_lqh_workers = globalData.ndbMtLqhWorkers;
 
3487
  num_lqh_threads = globalData.ndbMtLqhThreads;
 
3488
  num_threads = NUM_MAIN_THREADS + num_lqh_threads + 1;
 
3489
  require(num_threads <= MAX_THREADS);
 
3490
  receiver_thread_no = num_threads - 1;
 
3491
 
 
3492
  ndbout << "NDBMT: num_threads=" << num_threads << endl;
 
3493
 
 
3494
  ::rep_init(&g_thr_repository, num_threads,
 
3495
             globalEmulatorData.m_mem_manager);
 
3496
}
 
3497
 
 
3498
static
 
3499
void
 
3500
setcpuaffinity(struct thr_repository* rep)
 
3501
{
 
3502
  THRConfigApplier & conf = globalEmulatorData.theConfiguration->m_thr_config;
 
3503
  conf.create_cpusets();
 
3504
  if (conf.getInfoMessage())
 
3505
  {
 
3506
    printf("%s", conf.getInfoMessage());
 
3507
    fflush(stdout);
 
3508
  }
 
3509
}
 
3510
 
 
3511
void
 
3512
ThreadConfig::ipControlLoop(NdbThread* pThis, Uint32 thread_index)
 
3513
{
 
3514
  unsigned int thr_no;
 
3515
  struct thr_repository* rep = &g_thr_repository;
 
3516
 
 
3517
  /**
 
3518
   * assign threads to CPU's
 
3519
   */
 
3520
  setcpuaffinity(rep);
 
3521
 
 
3522
  /*
 
3523
   * Start threads for all execution threads, except for the receiver
 
3524
   * thread, which runs in the main thread.
 
3525
   */
 
3526
  for (thr_no = 0; thr_no < num_threads; thr_no++)
 
3527
  {
 
3528
    rep->m_thread[thr_no].m_time = NdbTick_CurrentMillisecond();
 
3529
 
 
3530
    if (thr_no == receiver_thread_no)
 
3531
      continue;                 // Will run in the main thread.
 
3532
 
 
3533
    /*
 
3534
     * The NdbThread_Create() takes void **, but that is cast to void * when
 
3535
     * passed to the thread function. Which is kind of strange ...
 
3536
     */
 
3537
    rep->m_thread[thr_no].m_thread =
 
3538
      NdbThread_Create(mt_job_thread_main,
 
3539
                       (void **)(rep->m_thread + thr_no),
 
3540
                       1024*1024,
 
3541
                       "execute thread", //ToDo add number
 
3542
                       NDB_THREAD_PRIO_MEAN);
 
3543
    require(rep->m_thread[thr_no].m_thread != NULL);
 
3544
  }
 
3545
 
 
3546
  /* Now run the main loop for thread 0 directly. */
 
3547
  rep->m_thread[receiver_thread_no].m_thread = pThis;
 
3548
  mt_receiver_thread_main(&(rep->m_thread[receiver_thread_no]));
 
3549
 
 
3550
  /* Wait for all threads to shutdown. */
 
3551
  for (thr_no = 0; thr_no < num_threads; thr_no++)
 
3552
  {
 
3553
    if (thr_no == receiver_thread_no)
 
3554
      continue;
 
3555
    void *dummy_return_status;
 
3556
    NdbThread_WaitFor(rep->m_thread[thr_no].m_thread, &dummy_return_status);
 
3557
    NdbThread_Destroy(&(rep->m_thread[thr_no].m_thread));
 
3558
  }
 
3559
}
 
3560
 
 
3561
int
 
3562
ThreadConfig::doStart(NodeState::StartLevel startLevel)
 
3563
{
 
3564
  SignalT<3> signalT;
 
3565
  memset(&signalT.header, 0, sizeof(SignalHeader));
 
3566
  
 
3567
  signalT.header.theVerId_signalNumber   = GSN_START_ORD;
 
3568
  signalT.header.theReceiversBlockNumber = CMVMI;
 
3569
  signalT.header.theSendersBlockRef      = 0;
 
3570
  signalT.header.theTrace                = 0;
 
3571
  signalT.header.theSignalId             = 0;
 
3572
  signalT.header.theLength               = StartOrd::SignalLength;
 
3573
  
 
3574
  StartOrd * startOrd = CAST_PTR(StartOrd, &signalT.theData[0]);
 
3575
  startOrd->restartInfo = 0;
 
3576
  
 
3577
  sendprioa(block2ThreadId(CMVMI, 0), &signalT.header, signalT.theData, 0);
 
3578
  return 0;
 
3579
}
 
3580
 
 
3581
/*
 
3582
 * Compare signal ids, taking into account overflow/wrapover.
 
3583
 * Return same as strcmp().
 
3584
 * Eg.
 
3585
 *   wrap_compare(0x10,0x20) -> -1
 
3586
 *   wrap_compare(0x10,0xffffff20) -> 1
 
3587
 *   wrap_compare(0xffffff80,0xffffff20) -> 1
 
3588
 *   wrap_compare(0x7fffffff, 0x80000001) -> -1
 
3589
 */
 
3590
static
 
3591
inline
 
3592
int
 
3593
wrap_compare(Uint32 a, Uint32 b)
 
3594
{
 
3595
  /* Avoid dependencies on undefined C/C++ interger overflow semantics. */
 
3596
  if (a >= 0x80000000)
 
3597
    if (b >= 0x80000000)
 
3598
      return (int)(a & 0x7fffffff) - (int)(b & 0x7fffffff);
 
3599
    else
 
3600
      return (a - b) >= 0x80000000 ? -1 : 1;
 
3601
  else
 
3602
    if (b >= 0x80000000)
 
3603
      return (b - a) >= 0x80000000 ? 1 : -1;
 
3604
    else
 
3605
      return (int)a - (int)b;
 
3606
}
 
3607
 
 
3608
Uint32
 
3609
FastScheduler::traceDumpGetNumThreads()
 
3610
{
 
3611
  /* The last thread is only for receiver -> no trace file. */
 
3612
  return num_threads;
 
3613
}
 
3614
 
 
3615
bool
 
3616
FastScheduler::traceDumpGetJam(Uint32 thr_no, Uint32 & jamBlockNumber,
 
3617
                               const Uint32 * & thrdTheEmulatedJam,
 
3618
                               Uint32 & thrdTheEmulatedJamIndex)
 
3619
{
 
3620
  if (thr_no >= num_threads)
 
3621
    return false;
 
3622
 
 
3623
#ifdef NO_EMULATED_JAM
 
3624
  jamBlockNumber = 0;
 
3625
  thrdTheEmulatedJam = NULL;
 
3626
  thrdTheEmulatedJamIndex = 0;
 
3627
#else
 
3628
  const EmulatedJamBuffer *jamBuffer = &g_thr_repository.m_thread[thr_no].m_jam;
 
3629
  thrdTheEmulatedJam = jamBuffer->theEmulatedJam;
 
3630
  thrdTheEmulatedJamIndex = jamBuffer->theEmulatedJamIndex;
 
3631
  jamBlockNumber = jamBuffer->theEmulatedJamBlockNumber;
 
3632
#endif
 
3633
  return true;
 
3634
}
 
3635
 
 
3636
void
 
3637
FastScheduler::traceDumpPrepare(NdbShutdownType& nst)
 
3638
{
 
3639
  /*
 
3640
   * We are about to generate trace files for all threads.
 
3641
   *
 
3642
   * We want to stop all threads processing before we dump, as otherwise the
 
3643
   * signal buffers could change while dumping, leading to inconsistent
 
3644
   * results.
 
3645
   *
 
3646
   * To stop threads, we send the GSN_STOP_FOR_CRASH signal as prio A to each
 
3647
   * thread. We then wait for threads to signal they are done (but not forever,
 
3648
   * so as to not have one hanging thread prevent the generation of trace
 
3649
   * dumps). We also must be careful not to send to ourself if the crash is
 
3650
   * being processed by one of the threads processing signals.
 
3651
   *
 
3652
   * We do not stop the transporter thread, as it cannot receive signals (but
 
3653
   * because it does not receive signals it does not really influence dumps in
 
3654
   * any case).
 
3655
   */
 
3656
  void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD);
 
3657
  const thr_data *selfptr = reinterpret_cast<const thr_data *>(value);
 
3658
  /* The selfptr might be NULL, or pointer to thread that crashed. */
 
3659
 
 
3660
  Uint32 waitFor_count = 0;
 
3661
  NdbMutex_Lock(&g_thr_repository.stop_for_crash_mutex);
 
3662
  g_thr_repository.stopped_threads = 0;
 
3663
 
 
3664
  for (Uint32 thr_no = 0; thr_no < num_threads; thr_no++)
 
3665
  {
 
3666
    if (selfptr != NULL && selfptr->m_thr_no == thr_no)
 
3667
    {
 
3668
      /* This is own thread; we have already stopped processing. */
 
3669
      continue;
 
3670
    }
 
3671
 
 
3672
    sendprioa_STOP_FOR_CRASH(selfptr, thr_no);
 
3673
 
 
3674
    waitFor_count++;
 
3675
  }
 
3676
 
 
3677
  static const Uint32 max_wait_seconds = 2;
 
3678
  NDB_TICKS start = NdbTick_CurrentMillisecond();
 
3679
  while (g_thr_repository.stopped_threads < waitFor_count)
 
3680
  {
 
3681
    NdbCondition_WaitTimeout(&g_thr_repository.stop_for_crash_cond,
 
3682
                             &g_thr_repository.stop_for_crash_mutex,
 
3683
                             10);
 
3684
    NDB_TICKS now = NdbTick_CurrentMillisecond();
 
3685
    if (now > start + max_wait_seconds * 1000)
 
3686
      break;                    // Give up
 
3687
  }
 
3688
  if (g_thr_repository.stopped_threads < waitFor_count)
 
3689
  {
 
3690
    if (nst != NST_ErrorInsert)
 
3691
    {
 
3692
      nst = NST_Watchdog; // Make this abort fast
 
3693
    }
 
3694
    ndbout_c("Warning: %d thread(s) did not stop before starting crash dump.",
 
3695
             waitFor_count - g_thr_repository.stopped_threads);
 
3696
  }
 
3697
  NdbMutex_Unlock(&g_thr_repository.stop_for_crash_mutex);
 
3698
 
 
3699
  /* Now we are ready (or as ready as can be) for doing crash dump. */
 
3700
}
 
3701
 
 
3702
void mt_execSTOP_FOR_CRASH()
 
3703
{
 
3704
  void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD);
 
3705
  const thr_data *selfptr = reinterpret_cast<const thr_data *>(value);
 
3706
  require(selfptr != NULL);
 
3707
 
 
3708
  NdbMutex_Lock(&g_thr_repository.stop_for_crash_mutex);
 
3709
  g_thr_repository.stopped_threads++;
 
3710
  NdbCondition_Signal(&g_thr_repository.stop_for_crash_cond);
 
3711
  NdbMutex_Unlock(&g_thr_repository.stop_for_crash_mutex);
 
3712
 
 
3713
  /* ToDo: is this correct? */
 
3714
  globalEmulatorData.theWatchDog->unregisterWatchedThread(selfptr->m_thr_no);
 
3715
 
 
3716
  pthread_exit(NULL);
 
3717
}
 
3718
 
 
3719
void
 
3720
FastScheduler::dumpSignalMemory(Uint32 thr_no, FILE* out)
 
3721
{
 
3722
  void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD);
 
3723
  thr_data *selfptr = reinterpret_cast<thr_data *>(value);
 
3724
  const thr_repository *rep = &g_thr_repository;
 
3725
  /*
 
3726
   * The selfptr might be NULL, or pointer to thread that is doing the crash
 
3727
   * jump.
 
3728
   * If non-null, we should update the watchdog counter while dumping.
 
3729
   */
 
3730
  Uint32 *watchDogCounter;
 
3731
  if (selfptr)
 
3732
    watchDogCounter = &selfptr->m_watchdog_counter;
 
3733
  else
 
3734
    watchDogCounter = NULL;
 
3735
 
 
3736
  /*
 
3737
   * We want to dump the signal buffers from last executed to first executed.
 
3738
   * So we first need to find the correct sequence to output signals in, stored
 
3739
   * in this arrray.
 
3740
   *
 
3741
   * We will check any buffers in the cyclic m_free_fifo. In addition,
 
3742
   * we also need to scan the already executed part of the current
 
3743
   * buffer in m_jba.
 
3744
   *
 
3745
   * Due to partial execution of prio A buffers, we will use signal ids to know
 
3746
   * where to interleave prio A signals into the stream of prio B signals
 
3747
   * read. So we will keep a pointer to a prio A buffer around; and while
 
3748
   * scanning prio B buffers we will interleave prio A buffers from that buffer
 
3749
   * when the signal id fits the sequence.
 
3750
   *
 
3751
   * This also means that we may have to discard the earliest part of available
 
3752
   * prio A signal data due to too little prio B data present, or vice versa.
 
3753
   */
 
3754
  static const Uint32 MAX_SIGNALS_TO_DUMP = 4096;
 
3755
  struct {
 
3756
    const SignalHeader *ptr;
 
3757
    bool prioa;
 
3758
  } signalSequence[MAX_SIGNALS_TO_DUMP];
 
3759
  Uint32 seq_start = 0;
 
3760
  Uint32 seq_end = 0;
 
3761
 
 
3762
  const thr_data *thr_ptr = &rep->m_thread[thr_no];
 
3763
  if (watchDogCounter)
 
3764
    *watchDogCounter = 4;
 
3765
 
 
3766
  /*
 
3767
   * ToDo: Might do some sanity check to avoid crashing on not yet initialised
 
3768
   * thread.
 
3769
   */
 
3770
 
 
3771
  /* Scan all available buffers with already executed signals. */
 
3772
 
 
3773
  /*
 
3774
   * Keep track of all available buffers, so that we can pick out signals in
 
3775
   * the same order they were executed (order obtained from signal id).
 
3776
   *
 
3777
   * We may need to keep track of THR_FREE_BUF_MAX buffers for fully executed
 
3778
   * (and freed) buffers, plus MAX_THREADS buffers for currently active
 
3779
   * prio B buffers, plus one active prio A buffer.
 
3780
   */
 
3781
  struct {
 
3782
    const thr_job_buffer *m_jb;
 
3783
    Uint32 m_pos;
 
3784
    Uint32 m_max;
 
3785
  } jbs[THR_FREE_BUF_MAX + MAX_THREADS + 1];
 
3786
 
 
3787
  Uint32 num_jbs = 0;
 
3788
 
 
3789
  /* Load released buffers. */
 
3790
  Uint32 idx = thr_ptr->m_first_free;
 
3791
  while (idx != thr_ptr->m_first_unused)
 
3792
  {
 
3793
    const thr_job_buffer *q = thr_ptr->m_free_fifo[idx];
 
3794
    if (q->m_len > 0)
 
3795
    {
 
3796
      jbs[num_jbs].m_jb = q;
 
3797
      jbs[num_jbs].m_pos = 0;
 
3798
      jbs[num_jbs].m_max = q->m_len;
 
3799
      num_jbs++;
 
3800
    }
 
3801
    idx = (idx + 1) % THR_FREE_BUF_MAX;
 
3802
  }
 
3803
  /* Load any active prio B buffers. */
 
3804
  for (Uint32 thr_no = 0; thr_no < rep->m_thread_count; thr_no++)
 
3805
  {
 
3806
    const thr_job_queue *q = thr_ptr->m_in_queue + thr_no;
 
3807
    const thr_jb_read_state *r = thr_ptr->m_read_states + thr_no;
 
3808
    Uint32 read_pos = r->m_read_pos;
 
3809
    if (read_pos > 0)
 
3810
    {
 
3811
      jbs[num_jbs].m_jb = q->m_buffers[r->m_read_index];
 
3812
      jbs[num_jbs].m_pos = 0;
 
3813
      jbs[num_jbs].m_max = read_pos;
 
3814
      num_jbs++;
 
3815
    }
 
3816
  }
 
3817
  /* Load any active prio A buffer. */
 
3818
  const thr_jb_read_state *r = &thr_ptr->m_jba_read_state;
 
3819
  Uint32 read_pos = r->m_read_pos;
 
3820
  if (read_pos > 0)
 
3821
  {
 
3822
    jbs[num_jbs].m_jb = thr_ptr->m_jba.m_buffers[r->m_read_index];
 
3823
    jbs[num_jbs].m_pos = 0;
 
3824
    jbs[num_jbs].m_max = read_pos;
 
3825
    num_jbs++;
 
3826
  }
 
3827
 
 
3828
  /* Now pick out one signal at a time, in signal id order. */
 
3829
  while (num_jbs > 0)
 
3830
  {
 
3831
    if (watchDogCounter)
 
3832
      *watchDogCounter = 4;
 
3833
 
 
3834
    /* Search out the smallest signal id remaining. */
 
3835
    Uint32 idx_min = 0;
 
3836
    const Uint32 *p = jbs[idx_min].m_jb->m_data + jbs[idx_min].m_pos;
 
3837
    const SignalHeader *s_min = reinterpret_cast<const SignalHeader*>(p);
 
3838
    Uint32 sid_min = s_min->theSignalId;
 
3839
 
 
3840
    for (Uint32 i = 1; i < num_jbs; i++)
 
3841
    {
 
3842
      p = jbs[i].m_jb->m_data + jbs[i].m_pos;
 
3843
      const SignalHeader *s = reinterpret_cast<const SignalHeader*>(p);
 
3844
      Uint32 sid = s->theSignalId;
 
3845
      if (wrap_compare(sid, sid_min) < 0)
 
3846
      {
 
3847
        idx_min = i;
 
3848
        s_min = s;
 
3849
        sid_min = sid;
 
3850
      }
 
3851
    }
 
3852
 
 
3853
    /* We found the next signal, now put it in the ordered cyclic buffer. */
 
3854
    signalSequence[seq_end].ptr = s_min;
 
3855
    signalSequence[seq_end].prioa = jbs[idx_min].m_jb->m_prioa;
 
3856
    Uint32 siglen =
 
3857
      (sizeof(SignalHeader)>>2) + s_min->m_noOfSections + s_min->theLength;
 
3858
#if SIZEOF_CHARP == 8
 
3859
    /* Align to 8-byte boundary, to ensure aligned copies. */
 
3860
    siglen= (siglen+1) & ~((Uint32)1);
 
3861
#endif
 
3862
    jbs[idx_min].m_pos += siglen;
 
3863
    if (jbs[idx_min].m_pos >= jbs[idx_min].m_max)
 
3864
    {
 
3865
      /* We are done with this job buffer. */
 
3866
      num_jbs--;
 
3867
      jbs[idx_min] = jbs[num_jbs];
 
3868
    }
 
3869
    seq_end = (seq_end + 1) % MAX_SIGNALS_TO_DUMP;
 
3870
    /* Drop old signals if too many available in history. */
 
3871
    if (seq_end == seq_start)
 
3872
      seq_start = (seq_start + 1) % MAX_SIGNALS_TO_DUMP;
 
3873
  }
 
3874
 
 
3875
  /* Now, having build the correct signal sequence, we can dump them all. */
 
3876
  fprintf(out, "\n");
 
3877
  bool first_one = true;
 
3878
  bool out_of_signals = false;
 
3879
  Uint32 lastSignalId = 0;
 
3880
  while (seq_end != seq_start)
 
3881
  {
 
3882
    if (watchDogCounter)
 
3883
      *watchDogCounter = 4;
 
3884
 
 
3885
    if (seq_end == 0)
 
3886
      seq_end = MAX_SIGNALS_TO_DUMP;
 
3887
    seq_end--;
 
3888
    SignalT<25> signal;
 
3889
    const SignalHeader *s = signalSequence[seq_end].ptr;
 
3890
    unsigned siglen = (sizeof(*s)>>2) + s->theLength;
 
3891
    if (siglen > 25)
 
3892
      siglen = 25;              // Sanity check
 
3893
    memcpy(&signal.header, s, 4*siglen);
 
3894
    // instance number in trace file is confusing if not MT LQH
 
3895
    if (num_lqh_workers == 0)
 
3896
      signal.header.theReceiversBlockNumber &= NDBMT_BLOCK_MASK;
 
3897
 
 
3898
    const Uint32 *posptr = reinterpret_cast<const Uint32 *>(s);
 
3899
    signal.m_sectionPtrI[0] = posptr[siglen + 0];
 
3900
    signal.m_sectionPtrI[1] = posptr[siglen + 1];
 
3901
    signal.m_sectionPtrI[2] = posptr[siglen + 2];
 
3902
    bool prioa = signalSequence[seq_end].prioa;
 
3903
 
 
3904
    /* Make sure to display clearly when there is a gap in the dump. */
 
3905
    if (!first_one && !out_of_signals && (s->theSignalId + 1) != lastSignalId)
 
3906
    {
 
3907
      out_of_signals = true;
 
3908
      fprintf(out, "\n\n\nNo more prio %s signals, rest of dump will be "
 
3909
              "incomplete.\n\n\n\n", prioa ? "B" : "A");
 
3910
    }
 
3911
    first_one = false;
 
3912
    lastSignalId = s->theSignalId;
 
3913
 
 
3914
    fprintf(out, "--------------- Signal ----------------\n");
 
3915
    Uint32 prio = (prioa ? JBA : JBB);
 
3916
    SignalLoggerManager::printSignalHeader(out, 
 
3917
                                           signal.header,
 
3918
                                           prio,
 
3919
                                           globalData.ownId, 
 
3920
                                           true);
 
3921
    SignalLoggerManager::printSignalData  (out, 
 
3922
                                           signal.header,
 
3923
                                           &signal.theData[0]);
 
3924
  }
 
3925
  fflush(out);
 
3926
}
 
3927
 
 
3928
int
 
3929
FastScheduler::traceDumpGetCurrentThread()
 
3930
{
 
3931
  void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD);
 
3932
  const thr_data *selfptr = reinterpret_cast<const thr_data *>(value);
 
3933
 
 
3934
  /* The selfptr might be NULL, or pointer to thread that crashed. */
 
3935
  if (selfptr == 0)
 
3936
  {
 
3937
    return -1;
 
3938
  }
 
3939
  else
 
3940
  {
 
3941
    return (int)selfptr->m_thr_no;
 
3942
  }
 
3943
}
 
3944
 
 
3945
void
 
3946
mt_section_lock()
 
3947
{
 
3948
  lock(&(g_thr_repository.m_section_lock));
 
3949
}
 
3950
 
 
3951
void
 
3952
mt_section_unlock()
 
3953
{
 
3954
  unlock(&(g_thr_repository.m_section_lock));
 
3955
}
 
3956
 
 
3957
void
 
3958
mt_mem_manager_init()
 
3959
{
 
3960
}
 
3961
 
 
3962
void
 
3963
mt_mem_manager_lock()
 
3964
{
 
3965
  lock(&(g_thr_repository.m_mem_manager_lock));
 
3966
}
 
3967
 
 
3968
void
 
3969
mt_mem_manager_unlock()
 
3970
{
 
3971
  unlock(&(g_thr_repository.m_mem_manager_lock));
 
3972
}
 
3973
 
 
3974
Vector<mt_lock_stat> g_locks;
 
3975
template class Vector<mt_lock_stat>;
 
3976
 
 
3977
static
 
3978
void
 
3979
register_lock(const void * ptr, const char * name)
 
3980
{
 
3981
  if (name == 0)
 
3982
    return;
 
3983
 
 
3984
  mt_lock_stat* arr = g_locks.getBase();
 
3985
  for (size_t i = 0; i<g_locks.size(); i++)
 
3986
  {
 
3987
    if (arr[i].m_ptr == ptr)
 
3988
    {
 
3989
      if (arr[i].m_name)
 
3990
      {
 
3991
        free(arr[i].m_name);
 
3992
      }
 
3993
      arr[i].m_name = strdup(name);
 
3994
      return;
 
3995
    }
 
3996
  }
 
3997
 
 
3998
  mt_lock_stat ln;
 
3999
  ln.m_ptr = ptr;
 
4000
  ln.m_name = strdup(name);
 
4001
  ln.m_contended_count = 0;
 
4002
  ln.m_spin_count = 0;
 
4003
  g_locks.push_back(ln);
 
4004
}
 
4005
 
 
4006
static
 
4007
mt_lock_stat *
 
4008
lookup_lock(const void * ptr)
 
4009
{
 
4010
  mt_lock_stat* arr = g_locks.getBase();
 
4011
  for (size_t i = 0; i<g_locks.size(); i++)
 
4012
  {
 
4013
    if (arr[i].m_ptr == ptr)
 
4014
      return arr + i;
 
4015
  }
 
4016
 
 
4017
  return 0;
 
4018
}
 
4019
 
 
4020
Uint32
 
4021
mt_get_thread_references_for_blocks(const Uint32 blocks[], Uint32 threadId,
 
4022
                                    Uint32 dst[], Uint32 len)
 
4023
{
 
4024
  Uint32 cnt = 0;
 
4025
  Bitmask<(MAX_THREADS+31)/32> mask;
 
4026
  mask.set(threadId);
 
4027
  for (Uint32 i = 0; blocks[i] != 0; i++)
 
4028
  {
 
4029
    Uint32 block = blocks[i];
 
4030
    /**
 
4031
     * Find each thread that has instance of block
 
4032
     */
 
4033
    assert(block == blockToMain(block));
 
4034
    Uint32 index = block - MIN_BLOCK_NO;
 
4035
    for (Uint32 instance = 0; instance < MAX_BLOCK_INSTANCES; instance++)
 
4036
    {
 
4037
      Uint32 thr_no = thr_map[index][instance].thr_no;
 
4038
      if (thr_no == thr_map_entry::NULL_THR_NO)
 
4039
        break;
 
4040
 
 
4041
      if (mask.get(thr_no))
 
4042
        continue;
 
4043
 
 
4044
      mask.set(thr_no);
 
4045
      require(cnt < len);
 
4046
      dst[cnt++] = numberToRef(block, instance, 0);
 
4047
    }
 
4048
  }
 
4049
  return cnt;
 
4050
}
 
4051
 
 
4052
void
 
4053
mt_wakeup(class SimulatedBlock* block)
 
4054
{
 
4055
  Uint32 thr_no = block->getThreadId();
 
4056
  thr_data *thrptr = g_thr_repository.m_thread + thr_no;
 
4057
  wakeup(&thrptr->m_waiter);
 
4058
}
 
4059
 
 
4060
#ifdef VM_TRACE
 
4061
void
 
4062
mt_assert_own_thread(SimulatedBlock* block)
 
4063
{
 
4064
  Uint32 thr_no = block->getThreadId();
 
4065
  thr_data *thrptr = g_thr_repository.m_thread + thr_no;
 
4066
 
 
4067
  if (unlikely(pthread_equal(thrptr->m_thr_id, pthread_self()) == 0))
 
4068
  {
 
4069
    fprintf(stderr, "mt_assert_own_thread() - assertion-failure\n");
 
4070
    fflush(stderr);
 
4071
    abort();
 
4072
  }
 
4073
}
 
4074
#endif
 
4075
 
 
4076
/**
 
4077
 * Global data
 
4078
 */
 
4079
struct thr_repository g_thr_repository;
 
4080
 
 
4081
struct trp_callback g_trp_callback;
 
4082
 
 
4083
TransporterRegistry globalTransporterRegistry(&g_trp_callback, false);