/* Copyright (c) 2008, 2011, Oracle and/or its affiliates. All rights reserved.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
#include <ndb_global.h>

#include <VMSignal.hpp>
#include <kernel_types.h>
#include <SignalLoggerManager.hpp>
#include <SimulatedBlock.hpp>
#include <ErrorHandlingMacros.hpp>
#include <GlobalData.hpp>
#include <WatchDog.hpp>
#include <TransporterDefinitions.hpp>
#include "FastScheduler.hpp"
#include "mt.hpp"

#include <DebuggerNames.hpp>
#include <signaldata/StopForCrash.hpp>
#include "TransporterCallbackKernel.hpp"
#include <NdbSleep.h>

#include <portlib/ndb_prefetch.h>
inline
SimulatedBlock*
GlobalData::mt_getBlock(BlockNumber blockNo, Uint32 instanceNo)
{
  SimulatedBlock* b = getBlock(blockNo);
  if (b != 0 && instanceNo != 0)
    b = b->getInstance(instanceNo);
  return b;
}
#ifdef __GNUC__
/* Provides a small (but noticeable) speedup in benchmarks. */
#define memcpy __builtin_memcpy
#endif

/* size of a cacheline */
#define NDB_CL 64

/* Constants found by benchmarks to be reasonable values. */

/* Maximum number of signals to execute before sending to remote nodes. */
static const Uint32 MAX_SIGNALS_BEFORE_SEND = 200;
/*
 * Max. signals to execute from one job buffer before considering other
 * possible stuff to do.
 */
static const Uint32 MAX_SIGNALS_PER_JB = 100;

/*
 * Max signals written to other thread before calling flush_jbb_write_state
 */
static const Uint32 MAX_SIGNALS_BEFORE_FLUSH_RECEIVER = 2;
static const Uint32 MAX_SIGNALS_BEFORE_FLUSH_OTHER = 20;
static const Uint32 MAX_SIGNALS_BEFORE_WAKEUP = 128;

//#define NDB_MT_LOCK_TO_CPU

#define MAX_BLOCK_INSTANCES (1 + MAX_NDBMT_LQH_WORKERS + 1) //main+lqh+extra
#define NUM_MAIN_THREADS 2 // except receiver
#define MAX_THREADS (NUM_MAIN_THREADS + MAX_NDBMT_LQH_THREADS + 1)

/* If this is too small it crashes before first signal. */
#define MAX_INSTANCES_PER_THREAD (16 + 8 * MAX_NDBMT_LQH_THREADS)

static Uint32 num_lqh_workers = 0;
static Uint32 num_lqh_threads = 0;
static Uint32 num_threads = 0;
static Uint32 receiver_thread_no = 0;

#define NO_SEND_THREAD (MAX_THREADS + 1)

/* max signal is 32 words, 7 for signal header and 25 datawords */
#define MIN_SIGNALS_PER_PAGE (thr_job_buffer::SIZE / 32)
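/*
 * Worked example: with thr_job_buffer::SIZE == 8190 words, one full job
 * buffer page is guaranteed to hold at least 8190 / 32 = 255 maximum-size
 * (32-word) signals.
 */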
struct mt_lock_stat
{
  const void * m_ptr;
  char * m_name;
  Uint32 m_contended_count;
  Uint32 m_spin_count;
};
static void register_lock(const void * ptr, const char * name);
static mt_lock_stat * lookup_lock(const void * ptr);
#if defined(HAVE_LINUX_FUTEX) && defined(NDB_HAVE_XCNG)

#include <sys/syscall.h>
#include <sys/types.h>

#define FUTEX_WAIT 0
#define FUTEX_WAKE 1
#define FUTEX_FD 2
#define FUTEX_REQUEUE 3
#define FUTEX_CMP_REQUEUE 4
#define FUTEX_WAKE_OP 5

static inline
int
futex_wait(volatile unsigned * addr, int val, const struct timespec * timeout)
{
  return syscall(SYS_futex,
                 addr, FUTEX_WAIT, val, timeout, 0, 0) == 0 ? 0 : errno;
}

static inline
int
futex_wake(volatile unsigned * addr)
{
  return syscall(SYS_futex, addr, FUTEX_WAKE, 1, 0, 0, 0) == 0 ? 0 : errno;
}

struct thr_wait
{
  volatile unsigned m_futex_state;
  enum {
    FS_RUNNING = 0,
    FS_SLEEPING = 1
  };
  thr_wait() { xcng(&m_futex_state, FS_RUNNING);}
  void init() {}
};
/**
 * Sleep until woken up or timeout occurs.
 *
 * Will call check_callback(check_arg) after proper synchronisation, and only
 * if that returns true will it actually sleep, else it will return
 * immediately. This is needed to avoid races with wakeup.
 *
 * Returns 'true' if it actually did sleep.
 */
static inline
bool
yield(struct thr_wait* wait, const Uint32 nsec,
      bool (*check_callback)(struct thr_data *), struct thr_data *check_arg)
{
  volatile unsigned * val = &wait->m_futex_state;
#ifndef NDEBUG
  int old =
#endif
    xcng(val, thr_wait::FS_SLEEPING);
  assert(old == thr_wait::FS_RUNNING);

  /**
   * At this point, we need to re-check the condition that made us decide to
   * sleep, and skip sleeping if it changed.
   *
   * Otherwise, the condition may have not changed, and the thread making the
   * change may already have decided not to wake us, as our state was
   * FS_RUNNING at the time.
   *
   * Also need a memory barrier to ensure this extra check is race-free,
   * but that is already provided by xcng.
   */
  bool waited = (*check_callback)(check_arg);
  if (waited)
  {
    struct timespec timeout;
    timeout.tv_sec = 0;
    timeout.tv_nsec = nsec;
    futex_wait(val, thr_wait::FS_SLEEPING, &timeout);
  }
  xcng(val, thr_wait::FS_RUNNING);
  return waited;
}
static inline
int
wakeup(struct thr_wait* wait)
{
  volatile unsigned * val = &wait->m_futex_state;
  /**
   * We must ensure that any state update (new data in buffers...) is visible
   * to the other thread before we can look at the sleep state of that other
   * thread.
   */
  if (xcng(val, thr_wait::FS_RUNNING) == thr_wait::FS_SLEEPING)
  {
    return futex_wake(val);
  }
  return 0;
}
#else

#include <NdbMutex.h>
#include <NdbCondition.h>

struct thr_wait
{
  bool m_need_wakeup;
  NdbMutex *m_mutex;
  NdbCondition *m_cond;
  thr_wait() : m_need_wakeup(false), m_mutex(0), m_cond(0) {}

  void init() {
    m_mutex = NdbMutex_Create();
    m_cond = NdbCondition_Create();
  }
};
static inline
bool
yield(struct thr_wait* wait, const Uint32 nsec,
      bool (*check_callback)(struct thr_data *), struct thr_data *check_arg)
{
  struct timespec end;
  NdbCondition_ComputeAbsTime(&end, nsec/1000000);
  NdbMutex_Lock(wait->m_mutex);

  Uint32 waits = 0;
  /* May have spurious wakeups: Always recheck condition predicate */
  while ((*check_callback)(check_arg))
  {
    wait->m_need_wakeup = true;
    waits++;
    if (NdbCondition_WaitTimeoutAbs(wait->m_cond,
                                    wait->m_mutex, &end) == ETIMEDOUT)
    {
      wait->m_need_wakeup = false;
      break;
    }
  }
  NdbMutex_Unlock(wait->m_mutex);
  return (waits > 0);
}
static inline
int
wakeup(struct thr_wait* wait)
{
  NdbMutex_Lock(wait->m_mutex);
  // We should avoid signaling when not waiting for wakeup
  if (wait->m_need_wakeup)
  {
    wait->m_need_wakeup = false;
    NdbCondition_Signal(wait->m_cond);
  }
  NdbMutex_Unlock(wait->m_mutex);
  return 0;
}

#endif
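/*
 * Illustrative sleeper/waker handshake for the thr_wait primitives above
 * (a sketch only; check_sleep() is a hypothetical predicate, not part of
 * this file):
 *
 *   sleeper: mark FS_SLEEPING (or take the mutex), re-check check_sleep(),
 *            and only then block in futex_wait() / condition wait;
 *   waker:   publish new data, then wakeup(&wait), which only issues a
 *            futex_wake() / signal if the peer was observed sleeping.
 *
 * The xcng() (or mutex) on both sides acts as the memory barrier that makes
 * the re-check race-free: either the sleeper sees the new data and skips
 * sleeping, or the waker sees FS_SLEEPING and wakes it.
 */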
#ifdef NDB_HAVE_XCNG
template <unsigned SZ>
struct thr_spin_lock
{
  thr_spin_lock(const char * name = 0)
  {
    m_lock = 0;
    register_lock(this, name);
  }

  union {
    volatile Uint32 m_lock;
    char pad[SZ];
  };
};
static
void
lock_slow(void * sl, volatile unsigned * val)
{
  mt_lock_stat* s = lookup_lock(sl); // lookup before owning lock

loop:
  Uint32 spins = 0;
  do {
    spins++;
    cpu_pause();
  } while (* val == 1);

  if (unlikely(xcng(val, 1) != 0))
    goto loop;

  if (s)
  {
    s->m_spin_count += spins;
    Uint32 count = ++s->m_contended_count;
    Uint32 freq = (count > 10000 ? 5000 : (count > 20 ? 200 : 1));

    if ((count % freq) == 0)
      printf("%s waiting for lock, contentions: %u spins: %u\n",
             s->m_name, count, s->m_spin_count);
  }
}
template <unsigned SZ>
static
inline
void
lock(struct thr_spin_lock<SZ>* sl)
{
  volatile unsigned* val = &sl->m_lock;
  if (likely(xcng(val, 1) == 0))
    return;

  lock_slow(sl, val);
}
template <unsigned SZ>
static
inline
void
unlock(struct thr_spin_lock<SZ>* sl)
{
  /**
   * Memory barrier here, to make sure all of our stores are visible before
   * the lock release is.
   */
  mb();
  sl->m_lock = 0;
}
template <unsigned SZ>
static
inline
int
trylock(struct thr_spin_lock<SZ>* sl)
{
  volatile unsigned* val = &sl->m_lock;
  return xcng(val, 1); // 0 => got the lock
}
#else
#define thr_spin_lock thr_mutex
#endif
template <unsigned SZ>
struct thr_mutex
{
  thr_mutex(const char * name = 0) {
    NdbMutex_Init(&m_mutex);
    register_lock(this, name);
  }

  union {
    NdbMutex m_mutex;
    char pad[SZ];
  };
};
template <unsigned SZ>
static
inline
void
lock(struct thr_mutex<SZ>* sl)
{
  NdbMutex_Lock(&sl->m_mutex);
}

template <unsigned SZ>
static
inline
void
unlock(struct thr_mutex<SZ>* sl)
{
  NdbMutex_Unlock(&sl->m_mutex);
}

template <unsigned SZ>
static
inline
int
trylock(struct thr_mutex<SZ> * sl)
{
  return NdbMutex_Trylock(&sl->m_mutex);
}
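/*
 * Typical usage of the lock wrappers above (illustrative only;
 * g_example_lock is hypothetical, not part of this file):
 *
 *   static struct thr_spin_lock<64> g_example_lock("examplelock");
 *   lock(&g_example_lock);
 *   ... critical section ...
 *   unlock(&g_example_lock);
 *
 * With NDB_HAVE_XCNG the calls resolve to the spinning implementation;
 * otherwise thr_spin_lock is #defined to thr_mutex and the same calls go
 * through NdbMutex.
 */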
template<typename T>
struct thr_safe_pool
{
  thr_safe_pool(const char * name) : m_free_list(0), m_cnt(0), m_lock(name) {}

  T* m_free_list;
  Uint32 m_cnt;
  thr_spin_lock<NDB_CL - (sizeof(void*) + sizeof(Uint32))> m_lock;

  T* seize(Ndbd_mem_manager *mm, Uint32 rg) {
    T* ret = 0;
    lock(&m_lock);
    if (m_free_list)
    {
      assert(m_cnt);
      m_cnt--;
      ret = m_free_list;
      m_free_list = ret->m_next;
    }
    unlock(&m_lock);

    if (ret == 0)
    {
      Uint32 dummy;
      ret = reinterpret_cast<T*>
        (mm->alloc_page(rg, &dummy,
                        Ndbd_mem_manager::NDB_ZONE_ANY));
      // ToDo: How to deal with failed allocation?!?
      // I think in this case we need to start grabbing buffers kept for signal
      // trace.
    }
    return ret;
  }

  void release(Ndbd_mem_manager *mm, Uint32 rg, T* t) {
    lock(&m_lock);
    t->m_next = m_free_list;
    m_free_list = t;
    m_cnt++;
    unlock(&m_lock);
  }

  void release_list(Ndbd_mem_manager *mm, Uint32 rg,
                    T* head, T* tail, Uint32 cnt) {
    lock(&m_lock);
    tail->m_next = m_free_list;
    m_free_list = head;
    m_cnt += cnt;
    unlock(&m_lock);
  }
};
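/*
 * Note: T must expose a public T* m_next member, since both this pool and
 * thread_local_pool below chain free objects through it, and every T is
 * assumed to be exactly one global memory manager page (seize() falls back
 * to mm->alloc_page() when the free list is empty).
 */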
template<typename T>
class thread_local_pool
{
public:
  thread_local_pool(thr_safe_pool<T> *global_pool, unsigned max_free) :
    m_max_free(max_free),
    m_free(0),
    m_freelist(0),
    m_global_pool(global_pool)
  {
  }

  T *seize(Ndbd_mem_manager *mm, Uint32 rg) {
    T *tmp = m_freelist;
    if (tmp)
    {
      m_freelist = tmp->m_next;
      assert(m_free > 0);
      m_free--;
    }
    else
      tmp = m_global_pool->seize(mm, rg);

    validate();
    return tmp;
  }

  void release(Ndbd_mem_manager *mm, Uint32 rg, T *t) {
    unsigned free = m_free;
    if (free < m_max_free)
    {
      m_free = free + 1;
      t->m_next = m_freelist;
      m_freelist = t;
    }
    else
      m_global_pool->release(mm, rg, t);

    validate();
  }

  /**
   * Release to local pool even if it gets "too" full
   *   (with respect to m_max_free)
   */
  void release_local(T *t) {
    m_free++;
    t->m_next = m_freelist;
    m_freelist = t;

    validate();
  }

  void validate() const {
#ifdef VM_TRACE
    Uint32 cnt = 0;
    T* t = m_freelist;
    while (t)
    {
      cnt++;
      t = t->m_next;
    }
    assert(cnt == m_free);
#endif
  }

  /**
   * Release entries so that m_max_free is honored
   *   (likely used together with release_local)
   */
  void release_global(Ndbd_mem_manager *mm, Uint32 rg) {
    validate();
    unsigned free = m_free;
    Uint32 maxfree = m_max_free;

    T* head = m_freelist;
    T* tail = m_freelist;
    if (free > maxfree)
    {
      Uint32 cnt = 1;
      free--;

      while (free > maxfree)
      {
        cnt++;
        free--;
        tail = tail->m_next;
      }

      assert(free == maxfree);

      m_free = free;
      m_freelist = tail->m_next;
      m_global_pool->release_list(mm, rg, head, tail, cnt);
    }
    validate();
  }

  void release_all(Ndbd_mem_manager *mm, Uint32 rg) {
    validate();
    T* head = m_freelist;
    T* tail = m_freelist;
    if (tail)
    {
      Uint32 cnt = 1;
      while (tail->m_next != 0)
      {
        cnt++;
        tail = tail->m_next;
      }
      m_global_pool->release_list(mm, rg, head, tail, cnt);
      m_free = 0;
      m_freelist = 0;
    }
    validate();
  }

  void set_pool(thr_safe_pool<T> * pool) { m_global_pool = pool; }

private:
  unsigned m_max_free;
  unsigned m_free;
  T *m_freelist;
  thr_safe_pool<T> *m_global_pool;
};
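/*
 * Typical usage (illustrative sketch only; mm stands for the
 * Ndbd_mem_manager in use and none of these statements appear verbatim
 * elsewhere in this file):
 *
 *   thread_local_pool<thr_send_page> pool(&g_thr_repository.m_sb_pool, 32);
 *   thr_send_page* p = pool.seize(mm, RG_TRANSPORTER_BUFFERS);
 *   ...fill p...
 *   pool.release_local(p);                            // e.g. under a lock
 *   pool.release_global(mm, RG_TRANSPORTER_BUFFERS);  // trim to max_free
 */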
/**
 * Each thread job queue contains a list of these buffers with signals.
 *
 * There is an underlying assumption that the size of this structure is the
 * same as the global memory manager page size.
 */
struct thr_job_buffer // 32k
{
  static const unsigned SIZE = 8190;

  /*
   * Amount of signal data currently in m_data buffer.
   * Read/written by producer, read by consumer.
   */
  Uint32 m_len;
  /*
   * Whether this buffer contained prio A or prio B signals, used when dumping
   * signals from released buffers.
   */
  Uint32 m_prioa;
  union {
    Uint32 m_data[SIZE];

    thr_job_buffer * m_next; // For free-list
  };
};
static
inline
Uint32
calc_fifo_used(Uint32 ri, Uint32 wi, Uint32 sz)
{
  return (wi >= ri) ? wi - ri : (sz - ri) + wi;
}
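/*
 * Worked example: ri == 30, wi == 2, sz == 32 gives (32 - 30) + 2 = 4 used
 * entries (the write index has wrapped); ri == wi always means an empty
 * FIFO.
 */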
/**
 * thr_job_queue is shared between consumer / producer.
 *
 * The hot-spots of the thr_job_queue are the read/write indexes.
 * As they are updated and read frequently they have been placed
 * in their own thr_job_queue_head[] in order to make them fit inside a
 * single/few cache lines and thereby avoid complete L1-cache replacement
 * every time the job_queue is scanned.
 */
struct thr_job_queue_head
{
  unsigned m_read_index;  // Read/written by consumer, read by producer
  unsigned m_write_index; // Read/written by producer, read by consumer

  Uint32 used() const;
};

struct thr_job_queue
{
  static const unsigned SIZE = 31;

  struct thr_job_queue_head* m_head;
  struct thr_job_buffer* m_buffers[SIZE];
};

inline
Uint32
thr_job_queue_head::used() const
{
  return calc_fifo_used(m_read_index, m_write_index, thr_job_queue::SIZE);
}
/*
 * Two structures tightly associated with thr_job_queue.
 *
 * There will generally be exactly one thr_jb_read_state and one
 * thr_jb_write_state associated with each thr_job_queue.
 *
 * The reason they are kept separate is to avoid unnecessary inter-CPU
 * cache line pollution. All fields shared among producer and consumer
 * threads are in thr_job_queue, thr_jb_write_state fields are only
 * accessed by the producer thread(s), and thr_jb_read_state fields are
 * only accessed by the consumer thread.
 *
 * For example, on Intel Core 2 quad processors, there is a ~33%
 * penalty for two cores accessing the same 64-byte cacheline.
 */
struct thr_jb_write_state
{
  /*
   * The position to insert the next signal into the queue.
   *
   * m_write_index is the index into thr_job_queue::m_buffers[] of the buffer
   * to insert into, and m_write_pos is the index into thr_job_buffer::m_data[]
   * at which to store the next signal.
   */
  Uint32 m_write_index;
  Uint32 m_write_pos;

  /* Thread-local copy of thr_job_queue::m_buffers[m_write_index]. */
  thr_job_buffer *m_write_buffer;

  /* Number of signals inserted since last flush to thr_job_queue. */
  Uint32 m_pending_signals;

  /* Number of signals inserted since last wakeup */
  Uint32 m_pending_signals_wakeup;
};
/*
 * This structure is also used when dumping signal traces, to dump executed
 * signals from the buffer(s) currently being processed.
 */
struct thr_jb_read_state
{
  /*
   * Index into thr_job_queue::m_buffers[] of the buffer that we are currently
   * executing signals from.
   */
  Uint32 m_read_index;
  /*
   * Index into m_read_buffer->m_data[] of the next signal to execute from the
   * current buffer.
   */
  Uint32 m_read_pos;
  /*
   * Thread local copy of thr_job_queue::m_buffers[m_read_index].
   */
  thr_job_buffer *m_read_buffer;
  /*
   * These are thread-local copies of thr_job_queue::m_write_index and
   * thr_job_buffer::m_len. They are read once at the start of the signal
   * execution loop and used to determine when the end of available signals
   * is reached.
   */
  Uint32 m_read_end;    // End within current thr_job_buffer. (*m_read_buffer)

  Uint32 m_write_index; // Last available thr_job_buffer.

  bool is_empty() const
  {
    assert(m_read_index != m_write_index || m_read_pos <= m_read_end);
    return (m_read_index == m_write_index) && (m_read_pos >= m_read_end);
  }
};
struct thr_tq
{
  static const unsigned SQ_SIZE = 512;
  static const unsigned LQ_SIZE = 512;
  static const unsigned PAGES = 32 * (SQ_SIZE + LQ_SIZE) / 8192;

  Uint32 * m_delayed_signals[PAGES];
  Uint32 m_next_free;
  Uint32 m_next_timer;
  Uint32 m_current_time;
  Uint32 m_cnt[2];
  Uint32 m_short_queue[SQ_SIZE];
  Uint32 m_long_queue[LQ_SIZE];
};
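/*
 * Sizing: PAGES = 32 * (512 + 512) / 8192 = 4 delayed-signal pages. Each
 * page is one 32KB job buffer reinterpreted as 8192 words, i.e. 256 slots
 * of 32 words, each slot large enough for one stored signal (header plus
 * data).
 */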
/*
 * Max number of thread-local job buffers to keep before releasing to
 * global pool.
 */
#define THR_FREE_BUF_MAX 32
/* Minimum number of buffers (to ensure useful trace dumps). */
#define THR_FREE_BUF_MIN 12
/*
 * 1/THR_FREE_BUF_BATCH is the fraction of job buffers to allocate/free
 * at a time from/to global pool.
 */
#define THR_FREE_BUF_BATCH 6
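/*
 * With the values above, a batch is THR_FREE_BUF_MAX / THR_FREE_BUF_BATCH
 * = 32 / 6 = 5 buffers allocated from or freed to the global pool at a
 * time.
 */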
/**
 * a page with send data
 */
struct thr_send_page
{
  static const Uint32 PGSIZE = 32768;
#if SIZEOF_CHARP == 4
  static const Uint32 HEADER_SIZE = 8;
#else
  static const Uint32 HEADER_SIZE = 12;
#endif

  static Uint32 max_bytes() {
    return PGSIZE - offsetof(thr_send_page, m_data);
  }

  /* Next page */
  thr_send_page* m_next;

  /* Bytes of send data available in this page. */
  Uint16 m_bytes;

  /* Start of unsent data */
  Uint16 m_start;

  /* Data; real size is to the end of one page. */
  char m_data[2];
};
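/*
 * Worked example (assuming the Uint16 fields above): on a 64-bit platform
 * offsetof(thr_send_page, m_data) is 12 (8-byte m_next plus two Uint16
 * fields), so max_bytes() is 32768 - 12 = 32756 bytes of send data per
 * page.
 */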
/**
 * a linked list with thr_send_page
 */
struct thr_send_buffer
{
  thr_send_page* m_first_page;
  thr_send_page* m_last_page;
};

/**
 * a ring buffer with linked list of thr_send_page
 */
struct thr_send_queue
{
  unsigned m_write_index;
#if SIZEOF_CHARP == 8
  thr_send_page* m_buffers[7];
  static const unsigned SIZE = 7;
#else
  thr_send_page* m_buffers[15];
  static const unsigned SIZE = 15;
#endif
};
struct thr_data
{
  thr_data() : m_jba_write_lock("jbalock"),
               m_send_buffer_pool(0, THR_FREE_BUF_MAX) {}

  thr_wait m_waiter;
  unsigned m_thr_no;

  /**
   * max signals to execute per JBB buffer
   */
  unsigned m_max_signals_per_jb;

  /**
   * max signals to execute before recomputing m_max_signals_per_jb
   */
  unsigned m_max_exec_signals;

  Uint64 m_time;
  struct thr_tq m_tq;
  /* Prio A signal incoming queue. */
  struct thr_spin_lock<64> m_jba_write_lock;
  struct thr_job_queue m_jba;

  struct thr_job_queue_head m_jba_head;

  /* Thread-local read state of prio A buffer. */
  struct thr_jb_read_state m_jba_read_state;

  /*
   * There is no m_jba_write_state, as we have multiple writers to the prio A
   * queue, so local state becomes invalid as soon as we release the lock.
   */

  /*
   * In m_next_buffer we keep a free buffer at all times, so that when
   * we hold the lock and find we need a new buffer, we can use it and thus
   * defer allocation to after releasing the lock.
   */
  struct thr_job_buffer* m_next_buffer;
  /*
   * We keep a small number of buffers in a thread-local cyclic FIFO, so that
   * we can avoid going to the global pool in most cases, and so that we have
   * recent buffers available for dumping in trace files.
   */
  struct thr_job_buffer *m_free_fifo[THR_FREE_BUF_MAX];
  /* m_first_free is the index of the entry to return next from seize(). */
  Uint32 m_first_free;
  /* m_first_unused is the first unused entry in m_free_fifo. */
  Uint32 m_first_unused;
  /*
   * These are the thread input queues, where other threads deliver signals
   * into.
   */
  struct thr_job_queue_head m_in_queue_head[MAX_THREADS];
  struct thr_job_queue m_in_queue[MAX_THREADS];
  /* These are the write states of m_in_queue[self] in each thread. */
  struct thr_jb_write_state m_write_states[MAX_THREADS];
  /* These are the read states of all of our own m_in_queue[]. */
  struct thr_jb_read_state m_read_states[MAX_THREADS];

  /* Jam buffers for making trace files at crashes. */
  EmulatedJamBuffer m_jam;
  /* Watchdog counter for this thread. */
  Uint32 m_watchdog_counter;
  /* Signal delivery statistics. */
  Uint32 m_prioa_count;
  Uint32 m_priob_count;

  /* Array of node ids with pending remote send data. */
  Uint8 m_pending_send_nodes[MAX_NTRANSPORTERS];
  /* Number of node ids in m_pending_send_nodes. */
  Uint32 m_pending_send_count;

  /**
   * Bitmap of pending node ids with send data.
   * Used to quickly check if a node id is already in m_pending_send_nodes.
   */
  Bitmask<(MAX_NTRANSPORTERS+31)/32> m_pending_send_mask;

  /* pool for send buffers */
  class thread_local_pool<thr_send_page> m_send_buffer_pool;

  /* Send buffer for this thread, these are not touched by any other thread */
  struct thr_send_buffer m_send_buffers[MAX_NTRANSPORTERS];

  /* Block instances (main and worker) handled by this thread. */
  /* Used for sendpacked (send-at-job-buffer-end). */
  Uint32 m_instance_count;
  BlockNumber m_instance_list[MAX_INSTANCES_PER_THREAD];

  SectionSegmentPool::Cache m_sectionPoolCache;

  /* Thread id of this thread, used e.g. by senddelay()'s assert. */
  pthread_t m_thr_id;
};
struct mt_send_handle : public TransporterSendBufferHandle
{
  struct thr_data * m_selfptr;
  mt_send_handle(thr_data* ptr) : m_selfptr(ptr) {}
  virtual ~mt_send_handle() {}

  virtual Uint32 *getWritePtr(NodeId node, Uint32 len, Uint32 prio, Uint32 max);
  virtual Uint32 updateWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio);
  virtual bool forceSend(NodeId node);
};
struct trp_callback : public TransporterCallbackKernel
{
  trp_callback() {}

  /* Callback interface. */
  int checkJobBuffer();
  void reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes);
  void lock_transporter(NodeId node);
  void unlock_transporter(NodeId node);
  Uint32 get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max);
  Uint32 bytes_sent(NodeId node, Uint32 bytes);
  bool has_data_to_send(NodeId node);
  void reset_send_buffer(NodeId node, bool should_be_empty);
};

extern trp_callback g_trp_callback; // Forward declaration
extern struct thr_repository g_thr_repository;
#include <NdbMutex.h>
#include <NdbCondition.h>

struct thr_repository
{
  thr_repository()
    : m_receive_lock("recvlock"),
      m_section_lock("sectionlock"),
      m_mem_manager_lock("memmanagerlock"),
      m_jb_pool("jobbufferpool"),
      m_sb_pool("sendbufferpool")
    {}

  struct thr_spin_lock<64> m_receive_lock;
  struct thr_spin_lock<64> m_section_lock;
  struct thr_spin_lock<64> m_mem_manager_lock;
  struct thr_safe_pool<thr_job_buffer> m_jb_pool;
  struct thr_safe_pool<thr_send_page> m_sb_pool;
  Ndbd_mem_manager * m_mm;
  unsigned m_thread_count;
  struct thr_data m_thread[MAX_THREADS];

  /**
   * send buffer handling
   */

  /* The buffers that are to be sent */
  struct send_buffer
  {
    struct thr_spin_lock<8> m_send_lock;

    struct thr_send_buffer m_buffer;

    /**
     * Flag used to coordinate sending to same remote node from different
     * threads.
     *
     * If two threads need to send to the same node at the same time, the
     * second thread, rather than wait for the first to finish, will just
     * set this flag, and the first thread will do an extra send when done
     * with the first send.
     */
    Uint32 m_force_send;

    /**
     * Which thread is currently holding the m_send_lock
     */
    Uint32 m_send_thread;

    /**
     * bytes pending for this node
     */
    Uint32 m_bytes;

    /* read index(es) in thr_send_queue */
    Uint32 m_read_index[MAX_THREADS];
  } m_send_buffers[MAX_NTRANSPORTERS];

  /* The buffers published by threads */
  thr_send_queue m_thread_send_buffers[MAX_NTRANSPORTERS][MAX_THREADS];

  /*
   * These are used to synchronize during crash / trace dumps.
   */
  NdbMutex stop_for_crash_mutex;
  NdbCondition stop_for_crash_cond;
  Uint32 stopped_threads;
};
static inline Uint32
fifo_used_pages(struct thr_data* selfptr)
{
  return calc_fifo_used(selfptr->m_first_unused,
                        selfptr->m_first_free,
                        THR_FREE_BUF_MAX);
}

static
void
job_buffer_full(struct thr_data* selfptr)
{
  ndbout_c("job buffer full");
  abort();
}

static
void
out_of_job_buffer(struct thr_data* selfptr)
{
  ndbout_c("out of job buffer");
  abort();
}
static thr_job_buffer*
seize_buffer(struct thr_repository* rep, int thr_no, bool prioa)
{
  thr_job_buffer* jb;
  thr_data* selfptr = rep->m_thread + thr_no;
  Uint32 first_free = selfptr->m_first_free;
  Uint32 first_unused = selfptr->m_first_unused;

  /*
   * An empty FIFO is denoted by m_first_free == m_first_unused.
   * So we will never have a completely full FIFO array, at least one entry
   * will always be unused. But the code is simpler as a result.
   */

  /*
   * We never allow the fifo to become completely empty, as we want to have
   * a good number of signals available for trace files in case of a forced
   * crash.
   */
  Uint32 buffers = (first_free > first_unused ?
                    first_unused + THR_FREE_BUF_MAX - first_free :
                    first_unused - first_free);
  if (unlikely(buffers <= THR_FREE_BUF_MIN))
  {
    /*
     * All used, allocate another batch from global pool.
     *
     * Put the new buffers at the head of the fifo, so as not to needlessly
     * push out any existing buffers from the fifo (that would lose useful
     * data for signal dumps in trace files).
     */
    Uint32 cnt = 0;
    Uint32 batch = THR_FREE_BUF_MAX / THR_FREE_BUF_BATCH;
    assert(batch + THR_FREE_BUF_MIN < THR_FREE_BUF_MAX);
    do {
      jb = rep->m_jb_pool.seize(rep->m_mm, RG_JOBBUFFER);
      if (unlikely(jb == 0))
      {
        if (unlikely(cnt == 0))
        {
          out_of_job_buffer(selfptr);
        }
        break;
      }
      jb->m_len = 0;
      jb->m_prioa = false;
      first_free = (first_free ? first_free : THR_FREE_BUF_MAX) - 1;
      selfptr->m_free_fifo[first_free] = jb;
      cnt++;
    } while (cnt < batch);
    selfptr->m_first_free = first_free;
  }

  jb= selfptr->m_free_fifo[first_free];
  selfptr->m_first_free = (first_free + 1) % THR_FREE_BUF_MAX;
  /* Init here rather than in release_buffer() so signal dump will work. */
  jb->m_len = 0;
  jb->m_prioa = prioa;
  return jb;
}
static void
release_buffer(struct thr_repository* rep, int thr_no, thr_job_buffer* jb)
{
  struct thr_data* selfptr = rep->m_thread + thr_no;
  Uint32 first_free = selfptr->m_first_free;
  Uint32 first_unused = selfptr->m_first_unused;

  /*
   * Pack near-empty signals, to get more info in the signal traces.
   *
   * This is not currently used, as we only release full job buffers, hence
   * the #if 0.
   */
#if 0
  Uint32 last_free = (first_unused ? first_unused : THR_FREE_BUF_MAX) - 1;
  thr_job_buffer *last_jb = selfptr->m_free_fifo[last_free];
  Uint32 len1, len2;

  if (!jb->m_prioa &&
      first_free != first_unused &&
      !last_jb->m_prioa &&
      (len2 = jb->m_len) <= (thr_job_buffer::SIZE / 4) &&
      (len1 = last_jb->m_len) + len2 <= thr_job_buffer::SIZE)
  {
    /*
     * The buffer being released is fairly empty, and what data it contains
     * fits in the previously released buffer.
     *
     * We want to avoid too many almost-empty buffers in the free fifo, as
     * that makes signal traces less useful due to too little data available.
     * So in this case we move the data from the buffer to be released into
     * the previous buffer, and place the to-be-released buffer at the head
     * of the fifo (to be immediately reused).
     *
     * This is only done for prio B buffers, as we must not merge prio A and B
     * data (or dumps would be incorrect), and prio A buffers are in any case
     * full when released.
     */
    memcpy(last_jb->m_data + len1, jb->m_data, len2*sizeof(jb->m_data[0]));
    last_jb->m_len = len1 + len2;
    jb->m_len = 0;
    first_free = (first_free ? first_free : THR_FREE_BUF_MAX) - 1;
    selfptr->m_free_fifo[first_free] = jb;
    selfptr->m_first_free = first_free;
  }
  else
#endif
  {
    /* Just insert at the end of the fifo. */
    selfptr->m_free_fifo[first_unused] = jb;
    first_unused = (first_unused + 1) % THR_FREE_BUF_MAX;
    selfptr->m_first_unused = first_unused;
  }

  if (unlikely(first_unused == first_free))
  {
    /* FIFO full, need to release to global pool. */
    Uint32 batch = THR_FREE_BUF_MAX / THR_FREE_BUF_BATCH;
    assert(batch < THR_FREE_BUF_MAX);
    do {
      rep->m_jb_pool.release(rep->m_mm, RG_JOBBUFFER,
                             selfptr->m_free_fifo[first_free]);
      first_free = (first_free + 1) % THR_FREE_BUF_MAX;
      batch--;
    } while (batch > 0);
    selfptr->m_first_free = first_free;
  }
}
static
Uint32
scan_queue(struct thr_data* selfptr, Uint32 cnt, Uint32 end, Uint32* ptr)
{
  Uint32 thr_no = selfptr->m_thr_no;
  Uint32 **pages = selfptr->m_tq.m_delayed_signals;
  Uint32 free = selfptr->m_tq.m_next_free;
  Uint32* save = ptr;
  for (Uint32 i = 0; i < cnt; i++, ptr++)
  {
    Uint32 val = * ptr;
    if ((val & 0xFFFF) <= end)
    {
      Uint32 idx = val >> 16;
      Uint32 buf = idx >> 8;
      Uint32 pos = 32 * (idx & 0xFF);

      Uint32* page = * (pages + buf);

      const SignalHeader *s = reinterpret_cast<SignalHeader*>(page + pos);
      const Uint32 *data = page + pos + (sizeof(*s)>>2);
      if (0)
        ndbout_c("found %p val: %d end: %d", s, val & 0xFFFF, end);
      /*
       * ToDo: Do measurements of the frequency of these prio A timed signals.
       * If they are frequent, we may want to optimize, as sending one prio A
       * signal is somewhat expensive compared to sending one prio B.
       */
      sendprioa(thr_no, s, data,
                data + s->theLength);
      * (page + pos) = free;
      free = idx;
    }
    else if (i > 0)
    {
      selfptr->m_tq.m_next_free = free;
      memmove(save, ptr, 4 * (cnt - i));
      return i;
    }
    else
    {
      return 0;
    }
  }
  selfptr->m_tq.m_next_free = free;
  return cnt;
}
static void
handle_time_wrap(struct thr_data* selfptr)
{
  Uint32 i;
  struct thr_tq * tq = &selfptr->m_tq;
  Uint32 cnt0 = tq->m_cnt[0];
  Uint32 cnt1 = tq->m_cnt[1];
  Uint32 tmp0 = scan_queue(selfptr, cnt0, 32767, tq->m_short_queue);
  Uint32 tmp1 = scan_queue(selfptr, cnt1, 32767, tq->m_long_queue);
  cnt0 -= tmp0;
  cnt1 -= tmp1;
  tq->m_cnt[0] = cnt0;
  tq->m_cnt[1] = cnt1;
  for (i = 0; i<cnt0; i++)
  {
    assert((tq->m_short_queue[i] & 0xFFFF) > 32767);
    tq->m_short_queue[i] -= 32767;
  }
  for (i = 0; i<cnt1; i++)
  {
    assert((tq->m_long_queue[i] & 0xFFFF) > 32767);
    tq->m_long_queue[i] -= 32767;
  }
}
static void
scan_time_queues_impl(struct thr_data* selfptr, NDB_TICKS now)
{
  struct thr_tq * tq = &selfptr->m_tq;
  NDB_TICKS last = selfptr->m_time;

  Uint32 curr = tq->m_current_time;
  Uint32 cnt0 = tq->m_cnt[0];
  Uint32 cnt1 = tq->m_cnt[1];

  assert(now > last);
  Uint64 diff = now - last;
  Uint32 step = (Uint32)((diff > 20) ? 20 : diff);
  Uint32 end = (curr + step);
  if (end >= 32767)
  {
    handle_time_wrap(selfptr);
    cnt0 = tq->m_cnt[0];
    cnt1 = tq->m_cnt[1];
    end -= 32767;
  }

  Uint32 tmp0 = scan_queue(selfptr, cnt0, end, tq->m_short_queue);
  Uint32 tmp1 = scan_queue(selfptr, cnt1, end, tq->m_long_queue);

  tq->m_current_time = end;
  tq->m_cnt[0] = cnt0 - tmp0;
  tq->m_cnt[1] = cnt1 - tmp1;
  selfptr->m_time = last + step;
}

static inline void
scan_time_queues(struct thr_data* selfptr, NDB_TICKS now)
{
  if (selfptr->m_time != now)
  {
    scan_time_queues_impl(selfptr, now);
  }
}
static
Uint32*
get_free_slot(struct thr_repository* rep,
              struct thr_data* selfptr,
              Uint32* idxptr)
{
  struct thr_tq * tq = &selfptr->m_tq;
  Uint32 idx = tq->m_next_free;
retry:
  Uint32 buf = idx >> 8;
  Uint32 pos = idx & 0xFF;

  if (idx != RNIL)
  {
    Uint32* page = * (tq->m_delayed_signals + buf);
    Uint32* ptr = page + (32 * pos);
    tq->m_next_free = * ptr;
    * idxptr = idx;
    return ptr;
  }

  Uint32 thr_no = selfptr->m_thr_no;
  for (Uint32 i = 0; i<thr_tq::PAGES; i++)
  {
    if (tq->m_delayed_signals[i] == 0)
    {
      struct thr_job_buffer *jb = seize_buffer(rep, thr_no, false);
      Uint32 * page = reinterpret_cast<Uint32*>(jb);
      tq->m_delayed_signals[i] = page;

      if (0)
        ndbout_c("saving %p at %p (%d)", page, tq->m_delayed_signals+i, i);

      /* Init page: link the 256 slots into the free-list. */
      for (Uint32 j = 0; j<255; j ++)
      {
        page[j * 32] = (i << 8) + (j + 1);
      }
      page[255*32] = RNIL;

      idx = (i << 8);
      goto retry;
    }
  }
  abort();
  return NULL;
}
void
senddelay(Uint32 thr_no, const SignalHeader* s, Uint32 delay)
{
  struct thr_repository* rep = &g_thr_repository;
  struct thr_data * selfptr = rep->m_thread + thr_no;
  assert(pthread_equal(selfptr->m_thr_id, pthread_self()));
  unsigned siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;

  Uint32 max;
  Uint32 * cntptr;
  Uint32 * queueptr;

  Uint32 alarm = selfptr->m_tq.m_current_time + delay;
  Uint32 nexttimer = selfptr->m_tq.m_next_timer;
  if (delay < 100)
  {
    cntptr = selfptr->m_tq.m_cnt + 0;
    queueptr = selfptr->m_tq.m_short_queue;
    max = thr_tq::SQ_SIZE;
  }
  else
  {
    cntptr = selfptr->m_tq.m_cnt + 1;
    queueptr = selfptr->m_tq.m_long_queue;
    max = thr_tq::LQ_SIZE;
  }

  Uint32 idx;
  Uint32* ptr = get_free_slot(rep, selfptr, &idx);
  memcpy(ptr, s, 4*siglen);

  if (0)
    ndbout_c("now: %d alarm: %d send %s from %s to %s delay: %d idx: %x %p",
             selfptr->m_tq.m_current_time,
             alarm,
             getSignalName(s->theVerId_signalNumber),
             getBlockName(refToBlock(s->theSendersBlockRef)),
             getBlockName(s->theReceiversBlockNumber),
             delay,
             idx, ptr);

  Uint32 i;
  Uint32 cnt = *cntptr;
  Uint32 newentry = (idx << 16) | (alarm & 0xFFFF);

  * cntptr = cnt + 1;
  selfptr->m_tq.m_next_timer = alarm < nexttimer ? alarm : nexttimer;

  if (cnt == 0)
  {
    queueptr[0] = newentry;
    return;
  }
  else if (cnt < max)
  {
    for (i = 0; i<cnt; i++)
    {
      Uint32 save = queueptr[i];
      if ((save & 0xFFFF) > alarm)
      {
        memmove(queueptr+i+1, queueptr+i, 4*(cnt - i));
        queueptr[i] = newentry;
        return;
      }
    }
    assert(i == cnt);
    queueptr[i] = newentry;
    return;
  }
  else
  {
    /* Delayed-signal queue full. */
    abort();
  }
}
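/*
 * Queue entry encoding used above: the upper 16 bits hold the slot index
 * ((buffer << 8) | position) of the stored signal, the lower 16 bits hold
 * the alarm time. Insertion keeps each queue sorted on alarm time, which
 * is what lets scan_queue() stop at the first non-expired entry.
 */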
/*
 * Flush the write state to the job queue, making any new signals available to
 * receiving threads.
 *
 * Two versions:
 *    - The general version flush_write_state_other() which may flush to
 *      any thread, and possibly signal any waiters.
 *    - The special version flush_write_state_self() which should only be used
 *      to flush messages to itself.
 *
 * Calls to these functions are encapsulated through flush_write_state
 * which decides which of these functions to call.
 */
static inline
void
flush_write_state_self(thr_job_queue_head *q_head, thr_jb_write_state *w)
{
  /*
   * Can simplify the flush_write_state when writing to myself:
   * Simply update write references wo/ mutex, memory barrier and signaling
   */
  w->m_write_buffer->m_len = w->m_write_pos;
  q_head->m_write_index = w->m_write_index;
  w->m_pending_signals_wakeup = 0;
  w->m_pending_signals = 0;
}
static inline
void
flush_write_state_other(thr_data *dstptr, thr_job_queue_head *q_head,
                        thr_jb_write_state *w)
{
  /*
   * Two write memory barriers here, as assigning m_len may make signal data
   * available to other threads, and assigning m_write_index may make new
   * buffers available.
   *
   * We could optimize this by only doing it as needed, and only doing it
   * once before setting all m_len, and once before setting all m_write_index.
   *
   * But wmb() is a no-op anyway on x86 ...
   */
  wmb();
  w->m_write_buffer->m_len = w->m_write_pos;
  wmb();
  q_head->m_write_index = w->m_write_index;

  w->m_pending_signals_wakeup += w->m_pending_signals;
  w->m_pending_signals = 0;

  if (w->m_pending_signals_wakeup >= MAX_SIGNALS_BEFORE_WAKEUP)
  {
    w->m_pending_signals_wakeup = 0;
    wakeup(&(dstptr->m_waiter));
  }
}
static inline
void
flush_write_state(const thr_data *selfptr, thr_data *dstptr,
                  thr_job_queue_head *q_head, thr_jb_write_state *w)
{
  if (dstptr == selfptr)
  {
    flush_write_state_self(q_head, w);
  }
  else
  {
    flush_write_state_other(dstptr, q_head, w);
  }
}
static
void
flush_jbb_write_state(thr_data *selfptr)
{
  Uint32 thr_count = g_thr_repository.m_thread_count;
  Uint32 self = selfptr->m_thr_no;

  thr_jb_write_state *w = selfptr->m_write_states;
  thr_data *thrptr = g_thr_repository.m_thread;
  for (Uint32 thr_no = 0; thr_no < thr_count; thr_no++, thrptr++, w++)
  {
    if (w->m_pending_signals || w->m_pending_signals_wakeup)
    {
      w->m_pending_signals_wakeup = MAX_SIGNALS_BEFORE_WAKEUP;
      thr_job_queue_head *q_head = thrptr->m_in_queue_head + self;
      flush_write_state(selfptr, thrptr, q_head, w);
    }
  }
}
/**
 * Transporter will receive 1024 signals (MAX_RECEIVED_SIGNALS)
 * before running check_job_buffers.
 *
 * This function returns 0 if there is space to receive this amount of
 * signals, else 1.
 */
static int
check_job_buffers(struct thr_repository* rep)
{
  const Uint32 minfree = (1024 + MIN_SIGNALS_PER_PAGE - 1)/MIN_SIGNALS_PER_PAGE;
  unsigned thr_no = receiver_thread_no;
  const thr_data *thrptr = rep->m_thread;
  for (unsigned i = 0; i<num_threads; i++, thrptr++)
  {
    /**
     * NOTE: m_read_index is read wo/ lock (and updated by a different
     *       thread), but since that thread can only consume signals, the
     *       value computed here is always conservative (i.e. it can be
     *       better than the returned value, if the read-index has moved
     *       but we didn't see it).
     */
    const thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no;
    unsigned ri = q_head->m_read_index;
    unsigned wi = q_head->m_write_index;
    unsigned busy = (wi >= ri) ? wi - ri : (thr_job_queue::SIZE - ri) + wi;
    if (1 + minfree + busy >= thr_job_queue::SIZE)
    {
      return 1;
    }
  }
  return 0;
}
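/*
 * Worked example: with MIN_SIGNALS_PER_PAGE = 8190 / 32 = 255, minfree is
 * (1024 + 254) / 255 = 5 job-buffer pages that must remain free in every
 * in-queue before another receive batch of 1024 signals is allowed.
 */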
/**
 * Compute max signals that thr_no can execute wo/ risking
 * job-buffer-full.
 *
 * See also update_sched_config.
 *
 * 1) compute free slots in the ring buffer from self to each thread in system
 * 2) pick the smallest value
 * 3) compute how many signals this corresponds to
 * 4) compute how many signals self can execute if all were to go to
 *    the thread with the fullest ring buffer (i.e. the worst case)
 *
 * Assumption: each signal may send *at most* 4 signals
 *   - this assumption is made the same in ndbd and ndbmtd and is
 *     mostly followed by block-code, although not in all places :-(
 */
static Uint32
compute_max_signals_to_execute(Uint32 thr_no)
{
  Uint32 minfree = thr_job_queue::SIZE;
  const struct thr_repository* rep = &g_thr_repository;
  const thr_data *thrptr = rep->m_thread;

  for (unsigned i = 0; i<num_threads; i++, thrptr++)
  {
    /**
     * NOTE: m_read_index is read wo/ lock (and updated by a different
     *       thread), but since that thread can only consume signals, the
     *       value computed here is always conservative (i.e. it can be
     *       better than the returned value, if the read-index has moved
     *       but we didn't see it).
     */
    const thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no;
    unsigned ri = q_head->m_read_index;
    unsigned wi = q_head->m_write_index;
    unsigned free = (wi < ri) ? ri - wi : (thr_job_queue::SIZE + ri) - wi;

    assert(free <= thr_job_queue::SIZE);

    if (free < minfree)
      minfree = free;
  }

#define SAFETY 2

  if (minfree >= (1 + SAFETY))
  {
    return (3 + (minfree - (1 + SAFETY)) * MIN_SIGNALS_PER_PAGE) / 4;
  }
  else
  {
    return 0;
  }
}
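/*
 * Worked example: an all-empty queue gives minfree = 31, so the result is
 * (3 + (31 - 3) * 255) / 4 = 1785 signals; minfree <= 3 gives 0, which
 * stops the thread from executing signals until the consumer catches up.
 */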
//#define NDBMT_RAND_YIELD
#ifdef NDBMT_RAND_YIELD
static Uint32 g_rand_yield = 0;
static
void
rand_yield(Uint32 limit, void* ptr0, void * ptr1)
{
  UintPtr tmp = UintPtr(ptr0) + UintPtr(ptr1);
  Uint8* tmpptr = (Uint8*)&tmp;
  Uint32 sum = g_rand_yield;
  for (Uint32 i = 0; i<sizeof(tmp); i++)
    sum = 33 * sum + tmpptr[i];

  if ((sum % 100) < limit)
  {
    g_rand_yield++;
    sched_yield();
  }
}
#else
static inline void rand_yield(Uint32 limit, void* ptr0, void * ptr1) {}
#endif
void
trp_callback::reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes)
{
  SignalT<3> signalT;
  Signal &signal = * new (&signalT) Signal(0);
  memset(&signal.header, 0, sizeof(signal.header));

  signal.header.theLength = 3;
  signal.header.theSendersSignalId = 0;
  signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
  signal.theData[0] = NDB_LE_SendBytesStatistic;
  signal.theData[1] = nodeId;
  signal.theData[2] = (Uint32)(bytes/count);
  signal.header.theVerId_signalNumber = GSN_EVENT_REP;
  signal.header.theReceiversBlockNumber = CMVMI;
  sendlocal(g_thr_repository.m_send_buffers[nodeId].m_send_thread,
            &signalT.header, signalT.theData, NULL);
}
/**
 * To lock during connect/disconnect, we take both the send lock for the node
 * (to protect performSend()), and the global receive lock (to protect
 * performReceive()). By having two locks, we avoid contention between the
 * common send and receive operations.
 *
 * We can have contention between connect/disconnect of one transporter and
 * receive for the others. But the transporter code should try to keep this
 * lock only briefly, ie. only to set state to DISCONNECTING / socket fd to
 * NDB_INVALID_SOCKET, not for the actual close() syscall.
 */
void
trp_callback::lock_transporter(NodeId node)
{
  struct thr_repository* rep = &g_thr_repository;
  /**
   * Note: take the send lock _first_, so that we will not hold the receive
   * lock while blocking on the send lock.
   *
   * The reverse case, blocking send lock for one transporter while waiting
   * for receive lock, is not a problem, as the transporter being blocked is
   * in any case disconnecting/connecting at this point in time, and sends are
   * non-waiting (so we will not block sending on other transporters).
   */
  lock(&rep->m_send_buffers[node].m_send_lock);
  lock(&rep->m_receive_lock);
}

void
trp_callback::unlock_transporter(NodeId node)
{
  struct thr_repository* rep = &g_thr_repository;
  unlock(&rep->m_receive_lock);
  unlock(&rep->m_send_buffers[node].m_send_lock);
}
int
trp_callback::checkJobBuffer()
{
  struct thr_repository* rep = &g_thr_repository;
  if (unlikely(check_job_buffers(rep)))
  {
    do
    {
      /**
       * Theoretically (or when we do single threaded by using ndbmtd with
       * all in same thread) we should execute signals here... to
       * prevent dead-lock, but... with current ndbmtd only CMVMI runs in
       * this thread, and other threads wait for CMVMI,
       * except for QMGR open/close connection, but that is not
       * (I think) sufficient to create a deadlock.
       */

      /** FIXME:
       *  On a CMT chip where #CPU >= #NDB-threads sched_yield() is
       *  effectively a NOOP as there will normally be an idle CPU available
       *  to immediately resume thread execution.
       *  On a Niagara chip this may severely impact performance as the CPUs
       *  are virtualized by timemultiplexing the physical core.
       *  The thread should really be 'parked' on
       *  a condition to free its execution resources.
       */
//    usleep(a-few-usec);  /* A micro-sleep would likely have been better... */
#if defined HAVE_SCHED_YIELD
      sched_yield();
#elif defined _WIN32
      NdbSleep_MilliSleep(0);
#else
      NdbSleep_MilliSleep(0);
#endif
    } while (check_job_buffers(rep));
  }

  return 0;
}
/**
 * Link all send-buffer-pages into *one*
 * single linked list of buffers
 *
 * TODO: This is not completely fair,
 *       it would be better to get one entry from each thr_send_queue
 *       per thread instead (until empty)
 */
static
Uint32
link_thread_send_buffers(thr_repository::send_buffer * sb, Uint32 node)
{
  Uint32 ri[MAX_THREADS];
  Uint32 wi[MAX_THREADS];
  thr_send_queue * src = g_thr_repository.m_thread_send_buffers[node];
  for (unsigned thr = 0; thr < num_threads; thr++)
  {
    ri[thr] = sb->m_read_index[thr];
    wi[thr] = src[thr].m_write_index;
  }

  Uint64 sentinel[thr_send_page::HEADER_SIZE >> 1];
  thr_send_page* sentinel_page = new (&sentinel[0]) thr_send_page;
  sentinel_page->m_next = 0;

  struct thr_send_buffer tmp;
  tmp.m_first_page = sentinel_page;
  tmp.m_last_page = sentinel_page;

  Uint32 bytes = 0;
  for (unsigned thr = 0; thr < num_threads; thr++, src++)
  {
    Uint32 r = ri[thr];
    Uint32 w = wi[thr];
    if (r != w)
    {
      rmb();
      while (r != w)
      {
        thr_send_page * p = src->m_buffers[r];
        assert(p->m_start == 0);
        bytes += p->m_bytes;
        tmp.m_last_page->m_next = p;
        while (p->m_next != 0)
        {
          p = p->m_next;
          assert(p->m_start == 0);
          bytes += p->m_bytes;
        }
        tmp.m_last_page = p;
        assert(tmp.m_last_page != 0);
        r = (r + 1) % thr_send_queue::SIZE;
      }
      sb->m_read_index[thr] = r;
    }
  }

  if (bytes)
  {
    if (sb->m_bytes)
    {
      assert(sb->m_buffer.m_first_page != 0);
      assert(sb->m_buffer.m_last_page != 0);
      sb->m_buffer.m_last_page->m_next = tmp.m_first_page->m_next;
      sb->m_buffer.m_last_page = tmp.m_last_page;
    }
    else
    {
      assert(sb->m_buffer.m_first_page == 0);
      assert(sb->m_buffer.m_last_page == 0);
      sb->m_buffer.m_first_page = tmp.m_first_page->m_next;
      sb->m_buffer.m_last_page = tmp.m_last_page;
    }
    sb->m_bytes += bytes;
  }

  return sb->m_bytes;
}
Uint32
trp_callback::get_bytes_to_send_iovec(NodeId node,
                                      struct iovec *dst, Uint32 max)
{
  thr_repository::send_buffer * sb = g_thr_repository.m_send_buffers + node;

  Uint32 bytes = link_thread_send_buffers(sb, node);
  if (max == 0 || bytes == 0)
    return 0;

  /**
   * Process linked-list and put into iovecs
   * TODO: Here we would also pack stuff to get better utilization
   */
  Uint32 pos = 0;
  thr_send_page * p = sb->m_buffer.m_first_page;
  do {
    dst[pos].iov_len = p->m_bytes;
    dst[pos].iov_base = p->m_data + p->m_start;
    assert(p->m_start + p->m_bytes <= p->max_bytes());
    pos++;
    max--;
    p = p->m_next;
  } while (max && p != 0);

  return pos;
}
static
void
release_list(thread_local_pool<thr_send_page>* pool,
             thr_send_page* head, thr_send_page * tail)
{
  while (head != tail)
  {
    thr_send_page * tmp = head;
    head = head->m_next;
    pool->release_local(tmp);
  }
  pool->release_local(tail);
}
static
Uint32
bytes_sent(thread_local_pool<thr_send_page>* pool,
           thr_repository::send_buffer* sb, Uint32 bytes)
{
  assert(bytes);

  Uint32 remain = bytes;
  thr_send_page * prev = 0;
  thr_send_page * curr = sb->m_buffer.m_first_page;

  assert(sb->m_bytes >= bytes);
  while (remain && remain >= curr->m_bytes)
  {
    remain -= curr->m_bytes;
    prev = curr;
    curr = curr->m_next;
  }

  Uint32 total_bytes = sb->m_bytes;
  if (total_bytes == bytes)
  {
    /**
     * Everything was released
     */
    release_list(pool, sb->m_buffer.m_first_page, sb->m_buffer.m_last_page);
    sb->m_buffer.m_first_page = 0;
    sb->m_buffer.m_last_page = 0;
    sb->m_bytes = 0;
    return 0;
  }
  else if (remain)
  {
    /**
     * Half a page was released
     */
    curr->m_start += remain;
    assert(curr->m_bytes > remain);
    curr->m_bytes -= remain;
    if (prev)
    {
      release_list(pool, sb->m_buffer.m_first_page, prev);
    }
  }
  else
  {
    /**
     * One or more full pages were released
     */
    if (prev)
    {
      release_list(pool, sb->m_buffer.m_first_page, prev);
    }
    else
    {
      pool->release_local(sb->m_buffer.m_first_page);
    }
  }

  sb->m_buffer.m_first_page = curr;
  assert(sb->m_bytes > bytes);
  sb->m_bytes -= bytes;

  return sb->m_bytes;
}
Uint32
trp_callback::bytes_sent(NodeId node, Uint32 bytes)
{
  thr_repository::send_buffer * sb = g_thr_repository.m_send_buffers+node;
  Uint32 thr_no = sb->m_send_thread;
  assert(thr_no != NO_SEND_THREAD);
  return ::bytes_sent(&g_thr_repository.m_thread[thr_no].m_send_buffer_pool,
                      sb, bytes);
}

bool
trp_callback::has_data_to_send(NodeId node)
{
  thr_repository::send_buffer * sb = g_thr_repository.m_send_buffers + node;

  Uint32 thr_no = sb->m_send_thread;
  assert(thr_no != NO_SEND_THREAD);
  assert((sb->m_bytes > 0) == (sb->m_buffer.m_first_page != 0));
  if (sb->m_bytes > 0 || sb->m_force_send)
    return true;

  thr_send_queue * dst = g_thr_repository.m_thread_send_buffers[node]+thr_no;

  return sb->m_read_index[thr_no] != dst->m_write_index;
}
void
trp_callback::reset_send_buffer(NodeId node, bool should_be_empty)
{
  struct thr_repository *rep = &g_thr_repository;
  thr_repository::send_buffer * sb = g_thr_repository.m_send_buffers+node;
  struct iovec v[32];

  thread_local_pool<thr_send_page> pool(&rep->m_sb_pool, 0);

  lock(&sb->m_send_lock);

  for (;;)
  {
    Uint32 count = get_bytes_to_send_iovec(node, v, sizeof(v)/sizeof(v[0]));
    if (count == 0)
      break;
    assert(!should_be_empty); // Got data when it should be empty

    int bytes = 0;
    for (Uint32 i = 0; i < count; i++)
      bytes += v[i].iov_len;

    ::bytes_sent(&pool, sb, bytes);
  }

  unlock(&sb->m_send_lock);

  pool.release_all(rep->m_mm, RG_TRANSPORTER_BUFFERS);
}
static inline
void
register_pending_send(thr_data *selfptr, Uint32 nodeId)
{
  /* Mark that this node has pending send data. */
  if (!selfptr->m_pending_send_mask.get(nodeId))
  {
    selfptr->m_pending_send_mask.set(nodeId, 1);
    Uint32 i = selfptr->m_pending_send_count;
    selfptr->m_pending_send_nodes[i] = nodeId;
    selfptr->m_pending_send_count = i + 1;
  }
}
/**
 * publish thread-locally prepared send-buffer
 */
static
void
flush_send_buffer(thr_data* selfptr, Uint32 node)
{
  Uint32 thr_no = selfptr->m_thr_no;
  thr_send_buffer * src = selfptr->m_send_buffers + node;
  thr_repository* rep = &g_thr_repository;

  if (src->m_first_page == 0)
  {
    return;
  }
  assert(src->m_last_page != 0);

  thr_send_queue * dst = rep->m_thread_send_buffers[node]+thr_no;
  thr_repository::send_buffer* sb = rep->m_send_buffers+node;

  Uint32 wi = dst->m_write_index;
  Uint32 next = (wi + 1) % thr_send_queue::SIZE;
  Uint32 ri = sb->m_read_index[thr_no];

  if (unlikely(next == ri))
  {
    /* Publish ring full: consume it under the send lock. */
    lock(&sb->m_send_lock);
    link_thread_send_buffers(sb, node);
    unlock(&sb->m_send_lock);
  }

  dst->m_buffers[wi] = src->m_first_page;
  wmb();
  dst->m_write_index = next;

  src->m_first_page = 0;
  src->m_last_page = 0;
}
/**
 * This is used in case send buffer gets full, to force an emergency send,
 * hopefully freeing up some buffer space for the next signal.
 */
bool
mt_send_handle::forceSend(NodeId nodeId)
{
  struct thr_repository *rep = &g_thr_repository;
  struct thr_data *selfptr = m_selfptr;
  struct thr_repository::send_buffer * sb = rep->m_send_buffers + nodeId;

  do
  {
    sb->m_force_send = 0;
    lock(&sb->m_send_lock);
    sb->m_send_thread = selfptr->m_thr_no;
    globalTransporterRegistry.performSend(nodeId);
    sb->m_send_thread = NO_SEND_THREAD;
    unlock(&sb->m_send_lock);
  } while (sb->m_force_send);

  selfptr->m_send_buffer_pool.release_global(rep->m_mm, RG_TRANSPORTER_BUFFERS);

  return true;
}
static
void
try_send(thr_data * selfptr, Uint32 node)
{
  struct thr_repository *rep = &g_thr_repository;
  struct thr_repository::send_buffer * sb = rep->m_send_buffers + node;

  do
  {
    if (trylock(&sb->m_send_lock) != 0)
    {
      return;
    }

    sb->m_force_send = 0;
    mb();

    sb->m_send_thread = selfptr->m_thr_no;
    globalTransporterRegistry.performSend(node);
    sb->m_send_thread = NO_SEND_THREAD;
    unlock(&sb->m_send_lock);
  } while (sb->m_force_send);

  selfptr->m_send_buffer_pool.release_global(rep->m_mm, RG_TRANSPORTER_BUFFERS);
}
/**
 * Flush send buffers and append them to the dst nodes' send queues.
 *
 * Flushed buffer contents are piggybacked when another thread
 * does do_send() to the same dst node. This makes it possible to have
 * more data included in each message, and thereby reduces the total
 * number of messages handled by the OS, which really impacts performance!
 */
static
void
do_flush(struct thr_data* selfptr)
{
  Uint32 i;
  Uint32 count = selfptr->m_pending_send_count;
  Uint8 *nodes = selfptr->m_pending_send_nodes;

  for (i = 0; i < count; i++)
  {
    flush_send_buffer(selfptr, nodes[i]);
  }
}
/**
 * Send any pending data to remote nodes.
 *
 * If MUST_SEND is false, will only try to lock the send lock, but if it would
 * block, that node is skipped, to be tried again next time round.
 *
 * If MUST_SEND is true, will always take the lock, waiting on it if needed.
 *
 * The list of pending nodes to send to is thread-local, but the per-node send
 * buffer is shared by all threads. Thus we might skip a node for which
 * another thread has pending send data, and we might send pending data also
 * for another thread without clearing the node from the pending list of that
 * other thread (but we will never lose signals due to this).
 */
static
Uint32
do_send(struct thr_data* selfptr, bool must_send)
{
  Uint32 i;
  Uint32 count = selfptr->m_pending_send_count;
  Uint8 *nodes = selfptr->m_pending_send_nodes;
  struct thr_repository* rep = &g_thr_repository;

  if (count == 0)
  {
    return 0; // send-buffers empty
  }

  /* Clear the pending list. */
  selfptr->m_pending_send_mask.clear();
  selfptr->m_pending_send_count = 0;

  for (i = 0; i < count; i++)
  {
    Uint32 node = nodes[i];
    selfptr->m_watchdog_counter = 6;

    flush_send_buffer(selfptr, node);

    thr_repository::send_buffer * sb = rep->m_send_buffers + node;

    /**
     * If we must send now, set the force_send flag.
     *
     * This will ensure that if we do not get the send lock, the thread
     * holding the lock will try sending again for us when it has released
     * the lock.
     *
     * The lock/unlock pair works as a memory barrier to ensure that the
     * flag update is flushed to the other thread.
     */
    if (must_send)
    {
      sb->m_force_send = 1;
    }

    do
    {
      if (trylock(&sb->m_send_lock) != 0)
      {
        if (!must_send)
        {
          /**
           * Not doing this node now, re-add to pending list.
           *
           * As we only add from the start of an empty list, we are safe from
           * overwriting the list while we are iterating over it.
           */
          register_pending_send(selfptr, node);
        }
        else
        {
          /* Other thread will send for us as we set m_force_send. */
        }
        break;
      }

      /**
       * Now clear the flag, and start sending all data available to this
       * node.
       *
       * Put a memory barrier here, so that if another thread tries to grab
       * the send lock but fails due to us holding it here, we either
       * 1) Will see m_force_send[nodeId] set to 1 at the end of the loop, or
       * 2) We clear here the flag just set by the other thread, but then we
       * will (thanks to mb()) be able to see and send all of the data already
       * in the first send iteration.
       */
      sb->m_force_send = 0;
      mb();

      /**
       * Set m_send_thread so that our transporter callback can know which
       * thread holds the send lock for this remote node.
       */
      sb->m_send_thread = selfptr->m_thr_no;
      int res = globalTransporterRegistry.performSend(node);
      sb->m_send_thread = NO_SEND_THREAD;
      unlock(&sb->m_send_lock);
      if (res)
      {
        register_pending_send(selfptr, node);
      }
    } while (sb->m_force_send);
  }

  selfptr->m_send_buffer_pool.release_global(rep->m_mm, RG_TRANSPORTER_BUFFERS);

  return selfptr->m_pending_send_count;
}
Uint32 *
mt_send_handle::getWritePtr(NodeId node, Uint32 len, Uint32 prio, Uint32 max)
{
  struct thr_send_buffer * b = m_selfptr->m_send_buffers+node;
  thr_send_page * p = b->m_last_page;
  if ((p != 0) && (p->m_bytes + p->m_start + len <= thr_send_page::max_bytes()))
  {
    return (Uint32*)(p->m_data + p->m_start + p->m_bytes);
  }
  else if (p != 0)
  {
    // TODO: maybe don't always flush on page-boundary ???
    flush_send_buffer(m_selfptr, node);
    try_send(m_selfptr, node);
  }

  if ((p = m_selfptr->m_send_buffer_pool.seize(g_thr_repository.m_mm,
                                               RG_TRANSPORTER_BUFFERS)) != 0)
  {
    p->m_bytes = 0;
    p->m_start = 0;
    p->m_next = 0;
    b->m_first_page = b->m_last_page = p;
    return (Uint32*)p->m_data;
  }
  return 0;
}
Uint32
mt_send_handle::updateWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio)
{
  struct thr_send_buffer * b = m_selfptr->m_send_buffers+node;
  thr_send_page * p = b->m_last_page;
  p->m_bytes += lenBytes;
  return p->m_bytes;
}
/*
 * Insert a signal in a job queue.
 *
 * The signal is not visible to consumers yet after return from this function,
 * only recorded in the thr_jb_write_state. It is necessary to first call
 * flush_write_state() for this.
 *
 * The new_buffer is a job buffer to use if the current one gets full. If used,
 * we return true, indicating that the caller should allocate a new one for
 * the next call. (This is done to allow to insert under lock, but do the
 * allocation outside the lock).
 */
static inline
bool
insert_signal(thr_job_queue *q, thr_jb_write_state *w, Uint32 prioa,
              const SignalHeader* sh, const Uint32 *data,
              const Uint32 secPtr[3], thr_job_buffer *new_buffer)
{
  Uint32 write_pos = w->m_write_pos;
  Uint32 datalen = sh->theLength;
  assert(w->m_write_buffer == q->m_buffers[w->m_write_index]);
  memcpy(w->m_write_buffer->m_data + write_pos, sh, sizeof(*sh));
  write_pos += (sizeof(*sh) >> 2);
  memcpy(w->m_write_buffer->m_data + write_pos, data, 4*datalen);
  write_pos += datalen;
  const Uint32 *p= secPtr;
  for (Uint32 i = 0; i < sh->m_noOfSections; i++)
    w->m_write_buffer->m_data[write_pos++] = *p++;
  w->m_pending_signals++;

#if SIZEOF_CHARP == 8
  /* Align to 8-byte boundary, to ensure aligned copies. */
  write_pos= (write_pos+1) & ~((Uint32)1);
#endif

  /*
   * We make sure that there is always room for at least one signal in the
   * current buffer in the queue, so one insert is always possible without
   * adding a new buffer.
   */
  if (likely(write_pos + 32 <= thr_job_buffer::SIZE))
  {
    w->m_write_pos = write_pos;
    return false;
  }
  else
  {
    /*
     * Need a write memory barrier here, as this might make signal data
     * visible to other threads.
     *
     * ToDo: We actually only need the wmb() here if we already make this
     * buffer visible to the other thread. So we might optimize it a bit. But
     * wmb() is a no-op on x86 anyway...
     */
    wmb();
    w->m_write_buffer->m_len = write_pos;
    Uint32 write_index = (w->m_write_index + 1) % thr_job_queue::SIZE;

    /**
     * Full job buffer is fatal.
     *
     * ToDo: should we wait for it to become non-full? There is no guarantee
     * that this will actually happen...
     *
     * Or alternatively, ndbrequire() ?
     */
    if (unlikely(write_index == q->m_head->m_read_index))
    {
      job_buffer_full(0);
    }
    new_buffer->m_len = 0;
    new_buffer->m_prioa = prioa;
    q->m_buffers[write_index] = new_buffer;
    w->m_write_index = write_index;
    w->m_write_pos = 0;
    w->m_write_buffer = new_buffer;
    return true;  // Buffer new_buffer used
  }

  return false;   // Buffer new_buffer not used
}
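/*
 * Usage note (illustrative sketch only): a typical producer seizes a spare
 * buffer up front, inserts under the queue lock, and replenishes afterwards:
 *
 *   bool used = insert_signal(q, w, prioa, sh, data, secPtr,
 *                             selfptr->m_next_buffer);
 *   if (used)
 *     selfptr->m_next_buffer = seize_buffer(rep, self, prioa);
 *
 * followed by flush_write_state() to make the signals visible to the
 * consumer.
 */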
static
void
read_jbb_state(thr_data *selfptr, Uint32 count)
{
  thr_jb_read_state *r = selfptr->m_read_states;
  const thr_job_queue *q = selfptr->m_in_queue;
  for (Uint32 i = 0; i < count; i++,r++,q++)
  {
    Uint32 read_index = r->m_read_index;

    /**
     * Optimization: Only reload when possibly empty.
     * Avoid cache reload of shared thr_job_queue_head
     */
    if (r->m_write_index == read_index)
    {
      r->m_write_index = q->m_head->m_write_index;
      read_barrier_depends();
      r->m_read_end = q->m_buffers[read_index]->m_len;
    }
  }
}

static
bool
read_jba_state(thr_data *selfptr)
{
  thr_jb_read_state *r = &(selfptr->m_jba_read_state);
  r->m_write_index = selfptr->m_jba_head.m_write_index;
  read_barrier_depends();
  r->m_read_end = selfptr->m_jba.m_buffers[r->m_read_index]->m_len;
  return r->is_empty();
}
/* Check all job queues, return true only if all are empty. */
static bool
check_queues_empty(thr_data *selfptr)
{
  Uint32 thr_count = g_thr_repository.m_thread_count;
  bool empty = read_jba_state(selfptr);
  if (!empty)
    return false;

  read_jbb_state(selfptr, thr_count);
  const thr_jb_read_state *r = selfptr->m_read_states;
  for (Uint32 i = 0; i < thr_count; i++,r++)
  {
    if (!r->is_empty())
      return false;
  }
  return true;
}
/*
 * Execute at most MAX_SIGNALS signals from one job queue, updating local read
 * state as appropriate.
 *
 * Returns number of signals actually executed.
 */
static
Uint32
execute_signals(thr_data *selfptr, thr_job_queue *q, thr_jb_read_state *r,
                Signal *sig, Uint32 max_signals, Uint32 *signalIdCounter)
{
  Uint32 num_signals;
  Uint32 read_index = r->m_read_index;
  Uint32 write_index = r->m_write_index;
  Uint32 read_pos = r->m_read_pos;
  Uint32 read_end = r->m_read_end;
  Uint32 *watchDogCounter = &selfptr->m_watchdog_counter;

  if (read_index == write_index && read_pos >= read_end)
    return 0; // empty read_state

  thr_job_buffer *read_buffer = r->m_read_buffer;

  for (num_signals = 0; num_signals < max_signals; num_signals++)
  {
    while (read_pos >= read_end)
    {
      if (read_index == write_index)
      {
        /* No more available now. */
        return num_signals;
      }
      else
      {
        /* Move to next buffer. */
        read_index = (read_index + 1) % thr_job_queue::SIZE;
        release_buffer(&g_thr_repository, selfptr->m_thr_no, read_buffer);
        read_buffer = q->m_buffers[read_index];
        read_pos = 0;
        read_end = read_buffer->m_len;
        /* Update thread-local read state. */
        r->m_read_index = q->m_head->m_read_index = read_index;
        r->m_read_buffer = read_buffer;
        r->m_read_pos = read_pos;
        r->m_read_end = read_end;
      }
    }

    /*
     * This prefetching was found using OProfile to reduce cache misses.
     * (Though on Intel Core 2, it does not give much speedup, as apparently
     * the hardware prefetcher is already doing a fairly good job).
     */
    NDB_PREFETCH_READ (read_buffer->m_data + read_pos + 16);
    NDB_PREFETCH_WRITE ((Uint32 *)&sig->header + 16);

    /* Now execute the signal. */
    SignalHeader* s =
      reinterpret_cast<SignalHeader*>(read_buffer->m_data + read_pos);
    Uint32 seccnt = s->m_noOfSections;
    Uint32 siglen = (sizeof(*s)>>2) + s->theLength;
    if (siglen > 16)
    {
      NDB_PREFETCH_READ (read_buffer->m_data + read_pos + 32);
    }
    Uint32 bno = blockToMain(s->theReceiversBlockNumber);
    Uint32 ino = blockToInstance(s->theReceiversBlockNumber);
    SimulatedBlock* block = globalData.mt_getBlock(bno, ino);
    assert(block != 0);

    Uint32 gsn = s->theVerId_signalNumber;
    *watchDogCounter = 1;
    /* Must update original buffer so signal dump will see it. */
    s->theSignalId = (*signalIdCounter)++;
    memcpy(&sig->header, s, 4*siglen);
    sig->m_sectionPtrI[0] = read_buffer->m_data[read_pos + siglen + 0];
    sig->m_sectionPtrI[1] = read_buffer->m_data[read_pos + siglen + 1];
    sig->m_sectionPtrI[2] = read_buffer->m_data[read_pos + siglen + 2];

    read_pos += siglen + seccnt;
#if SIZEOF_CHARP == 8
    /* Handle 8-byte alignment. */
    read_pos = (read_pos + 1) & ~((Uint32)1);
#endif

    /* Update just before execute so signal dump can know how far we are. */
    r->m_read_pos = read_pos;

#ifdef VM_TRACE
    if (globalData.testOn)
    { //wl4391_todo segments
      SegmentedSectionPtr ptr[3];
      ptr[0].i = sig->m_sectionPtrI[0];
      ptr[1].i = sig->m_sectionPtrI[1];
      ptr[2].i = sig->m_sectionPtrI[2];
      ::getSections(seccnt, ptr);
      globalSignalLoggers.executeSignal(*s,
                                        0,
                                        &sig->theData[0],
                                        globalData.ownId,
                                        ptr, seccnt);
    }
#endif

    block->executeFunction(gsn, sig);
  }

  return num_signals;
}
2495
run_job_buffers(thr_data *selfptr, Signal *sig, Uint32 *signalIdCounter)
Uint32 thr_count = g_thr_repository.m_thread_count;
Uint32 signal_count = 0;
Uint32 perjb = selfptr->m_max_signals_per_jb;
read_jbb_state(selfptr, thr_count);
* A load memory barrier to ensure that we see any prio A signal sent later
* than loaded prio B signals.
thr_job_queue *queue = selfptr->m_in_queue;
thr_jb_read_state *read_state = selfptr->m_read_states;
for (Uint32 send_thr_no = 0; send_thr_no < thr_count;
     send_thr_no++,queue++,read_state++)
/* Read the prio A state often, to avoid starvation of prio A. */
bool jba_empty = read_jba_state(selfptr);
static Uint32 max_prioA = thr_job_queue::SIZE * thr_job_buffer::SIZE;
signal_count += execute_signals(selfptr, &(selfptr->m_jba),
                                &(selfptr->m_jba_read_state), sig,
                                max_prioA, signalIdCounter);
/* Now execute prio B signals from one thread. */
signal_count += execute_signals(selfptr, queue, read_state,
                                sig, perjb, signalIdCounter);
return signal_count;
struct thr_map_entry {
enum { NULL_THR_NO = 0xFF };
thr_map_entry() : thr_no(NULL_THR_NO) {}
static struct thr_map_entry thr_map[NO_OF_BLOCKS][MAX_BLOCK_INSTANCES];
static inline Uint32
block2ThreadId(Uint32 block, Uint32 instance)
assert(block >= MIN_BLOCK_NO && block <= MAX_BLOCK_NO);
Uint32 index = block - MIN_BLOCK_NO;
assert(instance < MAX_BLOCK_INSTANCES);
const thr_map_entry& entry = thr_map[index][instance];
assert(entry.thr_no < num_threads);
return entry.thr_no;
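/*
 * Usage sketch (illustrative only; values hypothetical): this is the lookup
 * used by the send paths below, e.g. to route a signal to DBLQH worker 2:
 *
 *   Uint32 dst = block2ThreadId(DBLQH, 2);
 *   struct thr_data *dstptr = g_thr_repository.m_thread + dst;
 */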
add_thr_map(Uint32 main, Uint32 instance, Uint32 thr_no)
assert(main == blockToMain(main));
Uint32 index = main - MIN_BLOCK_NO;
assert(index < NO_OF_BLOCKS);
assert(instance < MAX_BLOCK_INSTANCES);
SimulatedBlock* b = globalData.getBlock(main, instance);
/* Block number including instance. */
Uint32 block = numberToBlock(main, instance);
require(thr_no < num_threads);
struct thr_repository* rep = &g_thr_repository;
thr_data* thr_ptr = rep->m_thread + thr_no;
for (i = 0; i < thr_ptr->m_instance_count; i++)
require(thr_ptr->m_instance_list[i] != block);
require(thr_ptr->m_instance_count < MAX_INSTANCES_PER_THREAD);
thr_ptr->m_instance_list[thr_ptr->m_instance_count++] = block;
SimulatedBlock::ThreadContext ctx;
ctx.threadId = thr_no;
ctx.jamBuffer = &thr_ptr->m_jam;
ctx.watchDogCounter = &thr_ptr->m_watchdog_counter;
ctx.sectionPoolCache = &thr_ptr->m_sectionPoolCache;
b->assignToThread(ctx);
/* Create entry mapping block to thread. */
thr_map_entry& entry = thr_map[index][instance];
require(entry.thr_no == thr_map_entry::NULL_THR_NO);
entry.thr_no = thr_no;
/* Static assignment of main instances (before first signal). */
/* Keep mt-classic assignments in MT LQH. */
const Uint32 thr_GLOBAL = 0;
const Uint32 thr_LOCAL = 1;
const Uint32 thr_RECEIVER = receiver_thread_no;
add_thr_map(BACKUP, 0, thr_LOCAL);
add_thr_map(DBTC, 0, thr_GLOBAL);
add_thr_map(DBDIH, 0, thr_GLOBAL);
add_thr_map(DBLQH, 0, thr_LOCAL);
add_thr_map(DBACC, 0, thr_LOCAL);
add_thr_map(DBTUP, 0, thr_LOCAL);
add_thr_map(DBDICT, 0, thr_GLOBAL);
add_thr_map(NDBCNTR, 0, thr_GLOBAL);
add_thr_map(QMGR, 0, thr_GLOBAL);
add_thr_map(NDBFS, 0, thr_GLOBAL);
add_thr_map(CMVMI, 0, thr_RECEIVER);
add_thr_map(TRIX, 0, thr_GLOBAL);
add_thr_map(DBUTIL, 0, thr_GLOBAL);
add_thr_map(SUMA, 0, thr_LOCAL);
add_thr_map(DBTUX, 0, thr_LOCAL);
add_thr_map(TSMAN, 0, thr_LOCAL);
add_thr_map(LGMAN, 0, thr_LOCAL);
add_thr_map(PGMAN, 0, thr_LOCAL);
add_thr_map(RESTORE, 0, thr_LOCAL);
add_thr_map(DBINFO, 0, thr_LOCAL);
add_thr_map(DBSPJ, 0, thr_GLOBAL);
/* Workers added by LocalProxy (before first signal). */
add_lqh_worker_thr_map(Uint32 block, Uint32 instance)
require(instance != 0);
Uint32 i = instance - 1;
Uint32 thr_no = NUM_MAIN_THREADS + i % num_lqh_threads;
add_thr_map(block, instance, thr_no);
/* Extra workers run in the proxy thread. */
add_extra_worker_thr_map(Uint32 block, Uint32 instance)
require(instance != 0);
Uint32 thr_no = block2ThreadId(block, 0);
add_thr_map(block, instance, thr_no);
* Create the duplicate entries needed so that the
* sender doesn't need to know how many instances there
* actually are in this node...
* If only 1 instance... then duplicate that one for all slots;
* else assume instance 0 is the proxy... and duplicate the workers (modulo).
* NOTE: extra pgman worker is instance 5
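/*
 * Worked example (illustrative only): with cnt = 5 filled slots (proxy
 * instance 0 plus workers 1..4), the empty slot i = 7 is filled from
 *   dup = 1 + ((7 - 1) % (5 - 1)) = 3
 * i.e. worker instance 3, so any instance number a sender picks still maps
 * to a real worker.
 */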
for (Uint32 b = 0; b < NO_OF_BLOCKS; b++)
Uint32 bno = b + MIN_BLOCK_NO;
while (cnt < MAX_BLOCK_INSTANCES &&
       thr_map[b][cnt].thr_no != thr_map_entry::NULL_THR_NO)
if (cnt != MAX_BLOCK_INSTANCES)
SimulatedBlock * main = globalData.getBlock(bno, 0);
for (Uint32 i = cnt; i < MAX_BLOCK_INSTANCES; i++)
Uint32 dup = (cnt == 1) ? 0 : 1 + ((i - 1) % (cnt - 1));
if (thr_map[b][i].thr_no == thr_map_entry::NULL_THR_NO)
thr_map[b][i] = thr_map[b][dup];
main->addInstance(globalData.getBlock(bno, dup), i);
* extra pgman instance
require(bno == PGMAN);
static void reportSignalStats(Uint32 self, Uint32 a_count, Uint32 a_size,
                              Uint32 b_count, Uint32 b_size)
Signal *s= new (&sT) Signal(0);
memset(&s->header, 0, sizeof(s->header));
s->header.theLength = 6;
s->header.theSendersSignalId = 0;
s->header.theSendersBlockRef = numberToRef(0, 0);
s->header.theVerId_signalNumber = GSN_EVENT_REP;
s->header.theReceiversBlockNumber = CMVMI;
s->theData[0] = NDB_LE_MTSignalStatistics;
s->theData[1] = self;
s->theData[2] = a_count;
s->theData[3] = a_size;
s->theData[4] = b_count;
s->theData[5] = b_size;
/* ToDo: does this really need to be prio A, like in the old code? */
sendlocal(self, &s->header, s->theData,
update_sched_stats(thr_data *selfptr)
if(selfptr->m_prioa_count + selfptr->m_priob_count >= 2000000)
reportSignalStats(selfptr->m_thr_no,
                  selfptr->m_prioa_count,
                  selfptr->m_prioa_size,
                  selfptr->m_priob_count,
                  selfptr->m_priob_size);
selfptr->m_prioa_count = 0;
selfptr->m_prioa_size = 0;
selfptr->m_priob_count = 0;
selfptr->m_priob_size = 0;
Uint32 thr_no = selfptr->m_thr_no;
ndbout_c("--- %u fifo: %u jba: %u global: %u",
         thr_no,
         fifo_used_pages(selfptr),
         selfptr->m_jba_head.used(),
         g_thr_repository.m_free_list.m_cnt);
for (Uint32 i = 0; i<num_threads; i++)
ndbout_c(" %u-%u : %u",
         thr_no, i, selfptr->m_in_queue_head[i].used());
init_thread(thr_data *selfptr)
selfptr->m_waiter.init();
selfptr->m_jam.theEmulatedJamIndex = 0;
selfptr->m_jam.theEmulatedJamBlockNumber = 0;
bzero(selfptr->m_jam.theEmulatedJam, sizeof(selfptr->m_jam.theEmulatedJam));
NdbThread_SetTlsKey(NDB_THREAD_TLS_JAM, &selfptr->m_jam);
NdbThread_SetTlsKey(NDB_THREAD_TLS_THREAD, selfptr);
unsigned thr_no = selfptr->m_thr_no;
globalEmulatorData.theWatchDog->
  registerWatchedThread(&selfptr->m_watchdog_counter, thr_no);
while(selfptr->m_thread == 0)
NdbSleep_MilliSleep(30);
THRConfigApplier & conf = globalEmulatorData.theConfiguration->m_thr_config;
tmp.appfmt("thr: %u ", thr_no);
int tid = NdbThread_GetTid(selfptr->m_thread);
tmp.appfmt("tid: %u ", tid);
conf.appendInfo(tmp,
                selfptr->m_instance_list, selfptr->m_instance_count);
int res = conf.do_bind(selfptr->m_thread,
                       selfptr->m_instance_list, selfptr->m_instance_count);
tmp.appfmt("err: %d ", -res);
selfptr->m_thr_id = pthread_self();
for (Uint32 i = 0; i < selfptr->m_instance_count; i++)
BlockReference block = selfptr->m_instance_list[i];
Uint32 main = blockToMain(block);
Uint32 instance = blockToInstance(block);
tmp.appfmt("%s(%u) ", getBlockName(main), instance);
printf("%s\n", tmp.c_str());
* Align signal buffer for better cache performance.
* Also skew it a little for each thread to avoid cache pollution.
#define SIGBUF_SIZE (sizeof(Signal) + 63 + 256 * MAX_THREADS)
aligned_signal(unsigned char signal_buf[SIGBUF_SIZE], unsigned thr_no)
UintPtr sigtmp= (UintPtr)signal_buf;
sigtmp= (sigtmp+63) & (~(UintPtr)63);
sigtmp+= thr_no*256;
return (Signal *)sigtmp;
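/*
 * Worked example (illustrative; addresses are hypothetical): with signal_buf
 * at 0x1003 and thr_no = 2:
 *   (0x1003 + 63) & ~(UintPtr)63 = 0x1040   round up to a 64-byte cache line
 *   0x1040 + 2*256               = 0x1240   per-thread skew of 256 bytes
 * so each thread's Signal starts on its own cache line, at a thread-specific
 * offset.
 */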
Uint32 receiverThreadId;
* We only do receive in the receiver thread; no other threads do receive.
* As part of the receive loop, we also periodically call update_connections()
* (this way we are similar to single-threaded ndbd).
* The CMVMI block (and no other blocks) runs in the same thread as this
* receive loop; this way we avoid races between update_connections() and
* CMVMI calls into the transporters.
* Note that with this setup, local signals to CMVMI cannot wake up the thread
* if it is sleeping on the receive sockets. Thus CMVMI local signal processing
* can be (slightly) delayed; however, CMVMI is not really performance critical
mt_receiver_thread_main(void *thr_arg)
unsigned char signal_buf[SIGBUF_SIZE];
struct thr_repository* rep = &g_thr_repository;
struct thr_data* selfptr = (struct thr_data *)thr_arg;
unsigned thr_no = selfptr->m_thr_no;
Uint32& watchDogCounter = selfptr->m_watchdog_counter;
Uint32 thrSignalId = 0;
bool has_received = false;
init_thread(selfptr);
receiverThreadId = thr_no;
signal = aligned_signal(signal_buf, thr_no);
while (globalData.theRestartFlag != perform_stop)
update_sched_stats(selfptr);
watchDogCounter = 5;
globalTransporterRegistry.update_connections();
cnt = (cnt + 1) & 15;
watchDogCounter = 2;
NDB_TICKS now = NdbTick_CurrentMillisecond();
scan_time_queues(selfptr, now);
Uint32 sum = run_job_buffers(selfptr, signal, &thrSignalId);
if (sum || has_received)
watchDogCounter = 6;
flush_jbb_write_state(selfptr);
do_send(selfptr, TRUE);
watchDogCounter = 7;
has_received = false;
if (globalTransporterRegistry.pollReceive(1))
if (check_job_buffers(rep) == 0)
watchDogCounter = 8;
lock(&rep->m_receive_lock);
globalTransporterRegistry.performReceive();
unlock(&rep->m_receive_lock);
has_received = true;
globalEmulatorData.theWatchDog->unregisterWatchedThread(thr_no);
return NULL; // Return value not currently used
sendpacked(struct thr_data* thr_ptr, Signal* signal)
for (i = 0; i < thr_ptr->m_instance_count; i++)
BlockReference block = thr_ptr->m_instance_list[i];
Uint32 main = blockToMain(block);
Uint32 instance = blockToInstance(block);
SimulatedBlock* b = globalData.getBlock(main, instance);
// wl4391_todo remove useless assert
assert(b != 0 && b->getThreadId() == thr_ptr->m_thr_no);
/* b->send_at_job_buffer_end(); */
b->executeFunction(GSN_SEND_PACKED, signal);
* Check if the out-queues of selfptr are full.
check_job_buffer_full(thr_data *selfptr)
Uint32 thr_no = selfptr->m_thr_no;
Uint32 tmp = compute_max_signals_to_execute(thr_no);
Uint32 perjb = tmp / g_thr_repository.m_thread_count;
if (tmp < g_thr_repository.m_thread_count)
* update_sched_config
*
* In order to prevent "job-buffer-full", i.e. that one thread (T1)
* produces so many signals to another thread (T2)
* that the ring-buffer from T1 to T2 gets full,
* the main loop has 2 "config" variables:
* - m_max_exec_signals
*   This is the *total* no of signals T1 can execute before calling
* - m_max_signals_per_jb
*   This is the max no of signals T1 can execute from each other thread
*
* Assumption: each signal may send *at most* 4 signals
* - this assumption is made the same in ndbd and ndbmtd and is
*   mostly followed by block-code, although not in all places :-(
*
* This function returns true if it slept
* (i.e. it concluded that it could not execute *any* signals, wo/
* risking job-buffer-full)
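/*
 * Numeric sketch (illustrative; values hypothetical): with 4 threads and
 * compute_max_signals_to_execute() reporting room for 360 signals,
 *   perjb = 360 / 4 = 90   (capped at MAX_SIGNALS_PER_JB = 100)
 * so this thread will execute at most 90 signals per source job buffer
 * before the limits are recomputed, keeping the assumed 4-signals-per-signal
 * fan-out within the free job-buffer space.
 */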
update_sched_config(struct thr_data* selfptr, Uint32 pending_send)
Uint32 sleeploop = 0;
Uint32 thr_no = selfptr->m_thr_no;
Uint32 tmp = compute_max_signals_to_execute(thr_no);
Uint32 perjb = tmp / g_thr_repository.m_thread_count;
if (perjb > MAX_SIGNALS_PER_JB)
perjb = MAX_SIGNALS_PER_JB;
selfptr->m_max_exec_signals = tmp;
selfptr->m_max_signals_per_jb = perjb;
if (unlikely(perjb == 0))
if (sleeploop == 10)
* we've slept for 10ms... try running anyway
selfptr->m_max_signals_per_jb = 1;
ndbout_c("%u - sleeploop 10!!", selfptr->m_thr_no);
/* About to sleep, _must_ send now. */
pending_send = do_send(selfptr, TRUE);
const Uint32 wait = 1000000; /* 1 ms */
yield(&selfptr->m_waiter, wait, check_job_buffer_full, selfptr);
return sleeploop > 0;
mt_job_thread_main(void *thr_arg)
unsigned char signal_buf[SIGBUF_SIZE];
const Uint32 nowait = 10 * 1000000; /* 10 ms */
Uint32 thrSignalId = 0;
struct thr_data* selfptr = (struct thr_data *)thr_arg;
init_thread(selfptr);
Uint32& watchDogCounter = selfptr->m_watchdog_counter;
unsigned thr_no = selfptr->m_thr_no;
signal = aligned_signal(signal_buf, thr_no);
/* Avoid false watchdog alarms caused by race condition. */
watchDogCounter = 1;
Uint32 pending_send = 0;
Uint32 send_sum = 0;
int maxloops = 10; /* Loops before reading clock, fuzzy adapted to 1ms freq. */
NDB_TICKS now = selfptr->m_time;
while (globalData.theRestartFlag != perform_stop)
update_sched_stats(selfptr);
watchDogCounter = 2;
scan_time_queues(selfptr, now);
Uint32 sum = run_job_buffers(selfptr, signal, &thrSignalId);
watchDogCounter = 1;
signal->header.m_noOfSections = 0; /* valgrind */
sendpacked(selfptr, signal);
watchDogCounter = 6;
flush_jbb_write_state(selfptr);
if (send_sum > MAX_SIGNALS_BEFORE_SEND)
/* Try to send, but skip for now in case of lock contention. */
pending_send = do_send(selfptr, FALSE);
/* Send buffers are appended to the send queues of the destination nodes. */
/* No signals processed, prepare to sleep to wait for more */
if (pending_send || send_sum > 0)
/* About to sleep, _must_ send now. */
pending_send = do_send(selfptr, TRUE);
if (pending_send == 0)
bool waited = yield(&selfptr->m_waiter, nowait, check_queues_empty,
/* Update current time after sleeping */
now = NdbTick_CurrentMillisecond();
* Check if we executed enough signals,
* and if so recompute how many signals to execute
if (sum >= selfptr->m_max_exec_signals)
if (update_sched_config(selfptr, pending_send))
/* Update current time after sleeping */
now = NdbTick_CurrentMillisecond();
selfptr->m_max_exec_signals -= sum;
* Adaptively read the system time whenever 1ms is likely to have passed.
if (loops > maxloops)
now = NdbTick_CurrentMillisecond();
Uint64 diff = now - selfptr->m_time;
/* Adjust 'maxloops' to achieve a clock-reading frequency of 1ms. */
maxloops += ((maxloops/10) + 1); /* No change: less frequent reading */
else if (diff > 1 && maxloops > 1)
maxloops -= ((maxloops/10) + 1); /* Overslept: need more frequent reads */
globalEmulatorData.theWatchDog->unregisterWatchedThread(thr_no);
return NULL; // Return value not currently used
return NULL; // Return value not currently used
3120
sendlocal(Uint32 self, const SignalHeader *s, const Uint32 *data,
3121
const Uint32 secPtr[3])
3123
Uint32 block = blockToMain(s->theReceiversBlockNumber);
3124
Uint32 instance = blockToInstance(s->theReceiversBlockNumber);
3127
* Max number of signals to put into job buffer before flushing the buffer
3128
* to the other thread.
3129
* This parameter found to be reasonable by benchmarking.
3131
Uint32 MAX_SIGNALS_BEFORE_FLUSH = (self == receiver_thread_no) ?
3132
MAX_SIGNALS_BEFORE_FLUSH_RECEIVER :
3133
MAX_SIGNALS_BEFORE_FLUSH_OTHER;
3135
Uint32 dst = block2ThreadId(block, instance);
3136
struct thr_repository* rep = &g_thr_repository;
3137
struct thr_data * selfptr = rep->m_thread + self;
3138
assert(pthread_equal(selfptr->m_thr_id, pthread_self()));
3139
struct thr_data * dstptr = rep->m_thread + dst;
3141
selfptr->m_priob_count++;
3142
Uint32 siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
3143
selfptr->m_priob_size += siglen;
3145
thr_job_queue *q = dstptr->m_in_queue + self;
3146
thr_jb_write_state *w = selfptr->m_write_states + dst;
3147
if (insert_signal(q, w, false, s, data, secPtr, selfptr->m_next_buffer))
3149
selfptr->m_next_buffer = seize_buffer(rep, self, false);
3151
if (w->m_pending_signals >= MAX_SIGNALS_BEFORE_FLUSH)
3152
flush_write_state(selfptr, dstptr, q->m_head, w);
3156
sendprioa(Uint32 self, const SignalHeader *s, const Uint32 *data,
          const Uint32 secPtr[3])
Uint32 block = blockToMain(s->theReceiversBlockNumber);
Uint32 instance = blockToInstance(s->theReceiversBlockNumber);
Uint32 dst = block2ThreadId(block, instance);
struct thr_repository* rep = &g_thr_repository;
struct thr_data *selfptr = rep->m_thread + self;
assert(s->theVerId_signalNumber == GSN_START_ORD ||
       pthread_equal(selfptr->m_thr_id, pthread_self()));
struct thr_data *dstptr = rep->m_thread + dst;
selfptr->m_prioa_count++;
Uint32 siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
selfptr->m_prioa_size += siglen;
thr_job_queue *q = &(dstptr->m_jba);
thr_jb_write_state w;
lock(&dstptr->m_jba_write_lock);
Uint32 index = q->m_head->m_write_index;
w.m_write_index = index;
thr_job_buffer *buffer = q->m_buffers[index];
w.m_write_buffer = buffer;
w.m_write_pos = buffer->m_len;
w.m_pending_signals = 0;
w.m_pending_signals_wakeup = MAX_SIGNALS_BEFORE_WAKEUP;
bool buf_used = insert_signal(q, &w, true, s, data, secPtr,
                              selfptr->m_next_buffer);
flush_write_state(selfptr, dstptr, q->m_head, &w);
unlock(&dstptr->m_jba_write_lock);
selfptr->m_next_buffer = seize_buffer(rep, self, true);
* Send a signal to a remote node.
* (The signal is only queued here, and actually sent later in do_send()).
mt_send_remote(Uint32 self, const SignalHeader *sh, Uint8 prio,
               const Uint32 * data, NodeId nodeId,
               const LinearSectionPtr ptr[3])
thr_repository *rep = &g_thr_repository;
thr_data *selfptr = rep->m_thread + self;
mt_send_handle handle(selfptr);
register_pending_send(selfptr, nodeId);
/* prepareSend() is lock-free, as we have per-thread send buffers. */
ss = globalTransporterRegistry.prepareSend(&handle,
                                           sh, prio, data, nodeId, ptr);
mt_send_remote(Uint32 self, const SignalHeader *sh, Uint8 prio,
               const Uint32 *data, NodeId nodeId,
               class SectionSegmentPool *thePool,
               const SegmentedSectionPtr ptr[3])
thr_repository *rep = &g_thr_repository;
thr_data *selfptr = rep->m_thread + self;
mt_send_handle handle(selfptr);
register_pending_send(selfptr, nodeId);
ss = globalTransporterRegistry.prepareSend(&handle,
                                           sh, prio, data, nodeId,
* This function sends a prio A STOP_FOR_CRASH signal to a thread.
* It works when called from any other thread, not just from job processing
* threads. But note that this signal will be the last signal to be executed by
* the other thread, as it will exit immediately.
sendprioa_STOP_FOR_CRASH(const struct thr_data *selfptr, Uint32 dst)
SignalT<StopForCrash::SignalLength> signalT;
struct thr_repository* rep = &g_thr_repository;
/* As this signal will be the last one executed by the other thread, it does
   not matter which buffer we use in case the current buffer is filled up by
   the STOP_FOR_CRASH signal; the data in it will never be read.
static thr_job_buffer dummy_buffer;
* Pick any instance running in this thread
struct thr_data * dstptr = rep->m_thread + dst;
Uint32 bno = dstptr->m_instance_list[0];
memset(&signalT.header, 0, sizeof(SignalHeader));
signalT.header.theVerId_signalNumber = GSN_STOP_FOR_CRASH;
signalT.header.theReceiversBlockNumber = bno;
signalT.header.theSendersBlockRef = 0;
signalT.header.theTrace = 0;
signalT.header.theSendersSignalId = 0;
signalT.header.theSignalId = 0;
signalT.header.theLength = StopForCrash::SignalLength;
StopForCrash * stopForCrash = CAST_PTR(StopForCrash, &signalT.theData[0]);
stopForCrash->flags = 0;
thr_job_queue *q = &(dstptr->m_jba);
thr_jb_write_state w;
lock(&dstptr->m_jba_write_lock);
Uint32 index = q->m_head->m_write_index;
w.m_write_index = index;
thr_job_buffer *buffer = q->m_buffers[index];
w.m_write_buffer = buffer;
w.m_write_pos = buffer->m_len;
w.m_pending_signals = 0;
w.m_pending_signals_wakeup = MAX_SIGNALS_BEFORE_WAKEUP;
insert_signal(q, &w, true, &signalT.header, signalT.theData, NULL,
              &dummy_buffer);
flush_write_state(selfptr, dstptr, q->m_head, &w);
unlock(&dstptr->m_jba_write_lock);
queue_init(struct thr_tq* tq)
tq->m_next_timer = 0;
tq->m_current_time = 0;
tq->m_next_free = RNIL;
tq->m_cnt[0] = tq->m_cnt[1] = 0;
bzero(tq->m_delayed_signals, sizeof(tq->m_delayed_signals));
thr_init(struct thr_repository* rep, struct thr_data *selfptr, unsigned int cnt,
selfptr->m_thr_no = thr_no;
selfptr->m_max_signals_per_jb = MAX_SIGNALS_PER_JB;
selfptr->m_max_exec_signals = 0;
selfptr->m_first_free = 0;
selfptr->m_first_unused = 0;
BaseString::snprintf(buf, sizeof(buf), "jbalock thr: %u", thr_no);
register_lock(&selfptr->m_jba_write_lock, buf);
selfptr->m_jba_head.m_read_index = 0;
selfptr->m_jba_head.m_write_index = 0;
selfptr->m_jba.m_head = &selfptr->m_jba_head;
thr_job_buffer *buffer = seize_buffer(rep, thr_no, true);
selfptr->m_jba.m_buffers[0] = buffer;
selfptr->m_jba_read_state.m_read_index = 0;
selfptr->m_jba_read_state.m_read_buffer = buffer;
selfptr->m_jba_read_state.m_read_pos = 0;
selfptr->m_jba_read_state.m_read_end = 0;
selfptr->m_jba_read_state.m_write_index = 0;
selfptr->m_next_buffer = seize_buffer(rep, thr_no, false);
selfptr->m_send_buffer_pool.set_pool(&rep->m_sb_pool);
for (i = 0; i<cnt; i++)
selfptr->m_in_queue_head[i].m_read_index = 0;
selfptr->m_in_queue_head[i].m_write_index = 0;
selfptr->m_in_queue[i].m_head = &selfptr->m_in_queue_head[i];
buffer = seize_buffer(rep, thr_no, false);
selfptr->m_in_queue[i].m_buffers[0] = buffer;
selfptr->m_read_states[i].m_read_index = 0;
selfptr->m_read_states[i].m_read_buffer = buffer;
selfptr->m_read_states[i].m_read_pos = 0;
selfptr->m_read_states[i].m_read_end = 0;
selfptr->m_read_states[i].m_write_index = 0;
queue_init(&selfptr->m_tq);
selfptr->m_prioa_count = 0;
selfptr->m_prioa_size = 0;
selfptr->m_priob_count = 0;
selfptr->m_priob_size = 0;
selfptr->m_pending_send_count = 0;
selfptr->m_pending_send_mask.clear();
selfptr->m_instance_count = 0;
for (i = 0; i < MAX_INSTANCES_PER_THREAD; i++)
selfptr->m_instance_list[i] = 0;
bzero(&selfptr->m_send_buffers, sizeof(selfptr->m_send_buffers));
selfptr->m_thread = 0;
selfptr->m_cpu = NO_LOCK_CPU;
/* Have to do this after init of all m_in_queues is done. */
thr_init2(struct thr_repository* rep, struct thr_data *selfptr,
          unsigned int cnt, unsigned thr_no)
for (Uint32 i = 0; i<cnt; i++)
selfptr->m_write_states[i].m_write_index = 0;
selfptr->m_write_states[i].m_write_pos = 0;
selfptr->m_write_states[i].m_write_buffer =
  rep->m_thread[i].m_in_queue[thr_no].m_buffers[0];
selfptr->m_write_states[i].m_pending_signals = 0;
selfptr->m_write_states[i].m_pending_signals_wakeup = 0;
send_buffer_init(Uint32 node, thr_repository::send_buffer * sb)
BaseString::snprintf(buf, sizeof(buf), "send lock node %d", node);
register_lock(&sb->m_send_lock, buf);
sb->m_force_send = 0;
sb->m_send_thread = NO_SEND_THREAD;
bzero(&sb->m_buffer, sizeof(sb->m_buffer));
bzero(sb->m_read_index, sizeof(sb->m_read_index));
rep_init(struct thr_repository* rep, unsigned int cnt, Ndbd_mem_manager *mm)
rep->m_thread_count = cnt;
for (unsigned int i = 0; i<cnt; i++)
thr_init(rep, rep->m_thread + i, cnt, i);
for (unsigned int i = 0; i<cnt; i++)
thr_init2(rep, rep->m_thread + i, cnt, i);
rep->stopped_threads = 0;
NdbMutex_Init(&rep->stop_for_crash_mutex);
NdbCondition_Init(&rep->stop_for_crash_cond);
for (int i = 0 ; i < MAX_NTRANSPORTERS; i++)
send_buffer_init(i, rep->m_send_buffers+i);
bzero(rep->m_thread_send_buffers, sizeof(rep->m_thread_send_buffers));
#include "ThreadConfig.hpp"
3433
#include <signaldata/StartOrd.hpp>
3436
compute_jb_pages(struct EmulatorData * ed)
3438
Uint32 cnt = NUM_MAIN_THREADS + globalData.ndbMtLqhThreads + 1;
3440
Uint32 perthread = 0;
3443
* Each thread can have thr_job_queue::SIZE pages in out-queues
3444
* to each other thread
3446
perthread += cnt * (1 + thr_job_queue::SIZE);
3449
* And thr_job_queue::SIZE prio A signals
3451
perthread += (1 + thr_job_queue::SIZE);
3454
* And XXX time-queue signals
3456
perthread += 32; // Say 1M for now
3459
* Each thread also keeps an own cache with max THR_FREE_BUF_MAX
3461
perthread += THR_FREE_BUF_MAX;
3464
* Multiply by no of threads
3466
Uint32 tot = cnt * perthread;
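/*
 * Worked example (illustrative only; the real constants are defined elsewhere
 * in this file): with 2 main + 4 LQH + 1 receiver thread, cnt = 7, and with
 * hypothetical thr_job_queue::SIZE = 32 and THR_FREE_BUF_MAX = 32:
 *   perthread = 7*(1+32) + (1+32) + 32 + 32 = 328 pages
 *   tot       = 7 * 328 = 2296 pages
 */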
ThreadConfig::ThreadConfig()
ThreadConfig::~ThreadConfig()
* We must do the init here rather than in the constructor, since at
* constructor time the global memory manager is not available.
ThreadConfig::init()
num_lqh_workers = globalData.ndbMtLqhWorkers;
num_lqh_threads = globalData.ndbMtLqhThreads;
num_threads = NUM_MAIN_THREADS + num_lqh_threads + 1;
require(num_threads <= MAX_THREADS);
receiver_thread_no = num_threads - 1;
ndbout << "NDBMT: num_threads=" << num_threads << endl;
::rep_init(&g_thr_repository, num_threads,
           globalEmulatorData.m_mem_manager);
setcpuaffinity(struct thr_repository* rep)
THRConfigApplier & conf = globalEmulatorData.theConfiguration->m_thr_config;
conf.create_cpusets();
if (conf.getInfoMessage())
printf("%s", conf.getInfoMessage());
ThreadConfig::ipControlLoop(NdbThread* pThis, Uint32 thread_index)
unsigned int thr_no;
struct thr_repository* rep = &g_thr_repository;
* assign threads to CPUs
setcpuaffinity(rep);
* Start threads for all execution threads, except for the receiver
* thread, which runs in the main thread.
for (thr_no = 0; thr_no < num_threads; thr_no++)
rep->m_thread[thr_no].m_time = NdbTick_CurrentMillisecond();
if (thr_no == receiver_thread_no)
continue; // Will run in the main thread.
* The NdbThread_Create() takes void **, but that is cast to void * when
* passed to the thread function, which is kind of strange ...
rep->m_thread[thr_no].m_thread =
  NdbThread_Create(mt_job_thread_main,
                   (void **)(rep->m_thread + thr_no),
                   "execute thread", //ToDo add number
                   NDB_THREAD_PRIO_MEAN);
require(rep->m_thread[thr_no].m_thread != NULL);
/* Now run the main loop for the receiver thread directly. */
rep->m_thread[receiver_thread_no].m_thread = pThis;
mt_receiver_thread_main(&(rep->m_thread[receiver_thread_no]));
/* Wait for all threads to shut down. */
for (thr_no = 0; thr_no < num_threads; thr_no++)
if (thr_no == receiver_thread_no)
void *dummy_return_status;
NdbThread_WaitFor(rep->m_thread[thr_no].m_thread, &dummy_return_status);
NdbThread_Destroy(&(rep->m_thread[thr_no].m_thread));
ThreadConfig::doStart(NodeState::StartLevel startLevel)
memset(&signalT.header, 0, sizeof(SignalHeader));
signalT.header.theVerId_signalNumber = GSN_START_ORD;
signalT.header.theReceiversBlockNumber = CMVMI;
signalT.header.theSendersBlockRef = 0;
signalT.header.theTrace = 0;
signalT.header.theSignalId = 0;
signalT.header.theLength = StartOrd::SignalLength;
StartOrd * startOrd = CAST_PTR(StartOrd, &signalT.theData[0]);
startOrd->restartInfo = 0;
sendprioa(block2ThreadId(CMVMI, 0), &signalT.header, signalT.theData, 0);
* Compare signal ids, taking into account overflow/wraparound.
* Returns the same as strcmp():
* wrap_compare(0x10,0x20) -> -1
* wrap_compare(0x10,0xffffff20) -> 1
* wrap_compare(0xffffff80,0xffffff20) -> 1
* wrap_compare(0x7fffffff, 0x80000001) -> -1
wrap_compare(Uint32 a, Uint32 b)
/* Avoid dependencies on undefined C/C++ integer overflow semantics. */
if (a >= 0x80000000)
if (b >= 0x80000000)
return (int)(a & 0x7fffffff) - (int)(b & 0x7fffffff);
return (a - b) >= 0x80000000 ? -1 : 1;
if (b >= 0x80000000)
return (b - a) >= 0x80000000 ? 1 : -1;
return (int)a - (int)b;
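/*
 * Illustrative self-check (not part of the original file; the guard macro is
 * hypothetical and not defined anywhere). It just exercises the documented
 * examples above.
 */
#ifdef NDB_MT_WRAP_COMPARE_SELFTEST
static void
wrap_compare_selftest()
{
  assert(wrap_compare(0x10, 0x20) < 0);
  assert(wrap_compare(0x10, 0xffffff20) > 0);
  assert(wrap_compare(0xffffff80, 0xffffff20) > 0);
  assert(wrap_compare(0x7fffffff, 0x80000001) < 0);
}
#endif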
FastScheduler::traceDumpGetNumThreads()
/* The last thread is only for receiver -> no trace file. */
FastScheduler::traceDumpGetJam(Uint32 thr_no, Uint32 & jamBlockNumber,
                               const Uint32 * & thrdTheEmulatedJam,
                               Uint32 & thrdTheEmulatedJamIndex)
if (thr_no >= num_threads)
#ifdef NO_EMULATED_JAM
thrdTheEmulatedJam = NULL;
thrdTheEmulatedJamIndex = 0;
const EmulatedJamBuffer *jamBuffer = &g_thr_repository.m_thread[thr_no].m_jam;
thrdTheEmulatedJam = jamBuffer->theEmulatedJam;
thrdTheEmulatedJamIndex = jamBuffer->theEmulatedJamIndex;
jamBlockNumber = jamBuffer->theEmulatedJamBlockNumber;
FastScheduler::traceDumpPrepare(NdbShutdownType& nst)
* We are about to generate trace files for all threads.
* We want to stop all threads processing before we dump, as otherwise the
* signal buffers could change while dumping, leading to inconsistent
* To stop the threads, we send the GSN_STOP_FOR_CRASH signal as prio A to
* each thread. We then wait for the threads to signal that they are done
* (but not forever, so as to not have one hanging thread prevent the
* generation of trace dumps). We also must be careful not to send to ourself
* if the crash is being processed by one of the threads processing signals.
* We do not stop the transporter thread, as it cannot receive signals (but
* because it does not receive signals it does not really influence dumps in
void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD);
const thr_data *selfptr = reinterpret_cast<const thr_data *>(value);
/* The selfptr might be NULL, or a pointer to the thread that crashed. */
Uint32 waitFor_count = 0;
NdbMutex_Lock(&g_thr_repository.stop_for_crash_mutex);
g_thr_repository.stopped_threads = 0;
for (Uint32 thr_no = 0; thr_no < num_threads; thr_no++)
if (selfptr != NULL && selfptr->m_thr_no == thr_no)
/* This is our own thread; we have already stopped processing. */
sendprioa_STOP_FOR_CRASH(selfptr, thr_no);
static const Uint32 max_wait_seconds = 2;
NDB_TICKS start = NdbTick_CurrentMillisecond();
while (g_thr_repository.stopped_threads < waitFor_count)
NdbCondition_WaitTimeout(&g_thr_repository.stop_for_crash_cond,
                         &g_thr_repository.stop_for_crash_mutex,
NDB_TICKS now = NdbTick_CurrentMillisecond();
if (now > start + max_wait_seconds * 1000)
if (g_thr_repository.stopped_threads < waitFor_count)
if (nst != NST_ErrorInsert)
nst = NST_Watchdog; // Make this abort fast
ndbout_c("Warning: %d thread(s) did not stop before starting crash dump.",
         waitFor_count - g_thr_repository.stopped_threads);
NdbMutex_Unlock(&g_thr_repository.stop_for_crash_mutex);
/* Now we are ready (or as ready as can be) for doing the crash dump. */
void mt_execSTOP_FOR_CRASH()
void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD);
const thr_data *selfptr = reinterpret_cast<const thr_data *>(value);
require(selfptr != NULL);
NdbMutex_Lock(&g_thr_repository.stop_for_crash_mutex);
g_thr_repository.stopped_threads++;
NdbCondition_Signal(&g_thr_repository.stop_for_crash_cond);
NdbMutex_Unlock(&g_thr_repository.stop_for_crash_mutex);
/* ToDo: is this correct? */
globalEmulatorData.theWatchDog->unregisterWatchedThread(selfptr->m_thr_no);
FastScheduler::dumpSignalMemory(Uint32 thr_no, FILE* out)
void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD);
thr_data *selfptr = reinterpret_cast<thr_data *>(value);
const thr_repository *rep = &g_thr_repository;
* The selfptr might be NULL, or a pointer to the thread that is doing the crash
* If non-NULL, we should update the watchdog counter while dumping.
Uint32 *watchDogCounter;
watchDogCounter = &selfptr->m_watchdog_counter;
watchDogCounter = NULL;
* We want to dump the signal buffers from last executed to first executed.
* So we first need to find the correct sequence to output signals in, stored
* We will check any buffers in the cyclic m_free_fifo. In addition,
* we also need to scan the already executed part of the current
* Due to partial execution of prio A buffers, we will use signal ids to know
* where to interleave prio A signals into the stream of prio B signals
* read. So we will keep a pointer to a prio A buffer around; and while
* scanning prio B buffers we will interleave prio A buffers from that buffer
* when the signal id fits the sequence.
* This also means that we may have to discard the earliest part of available
* prio A signal data due to too little prio B data present, or vice versa.
static const Uint32 MAX_SIGNALS_TO_DUMP = 4096;
const SignalHeader *ptr;
} signalSequence[MAX_SIGNALS_TO_DUMP];
Uint32 seq_start = 0;
const thr_data *thr_ptr = &rep->m_thread[thr_no];
if (watchDogCounter)
*watchDogCounter = 4;
* ToDo: Might do some sanity check to avoid crashing on not yet initialised
/* Scan all available buffers with already executed signals. */
* Keep track of all available buffers, so that we can pick out signals in
* the same order they were executed (order obtained from signal id).
* We may need to keep track of THR_FREE_BUF_MAX buffers for fully executed
* (and freed) buffers, plus MAX_THREADS buffers for currently active
* prio B buffers, plus one active prio A buffer.
const thr_job_buffer *m_jb;
} jbs[THR_FREE_BUF_MAX + MAX_THREADS + 1];
/* Load released buffers. */
Uint32 idx = thr_ptr->m_first_free;
while (idx != thr_ptr->m_first_unused)
const thr_job_buffer *q = thr_ptr->m_free_fifo[idx];
jbs[num_jbs].m_jb = q;
jbs[num_jbs].m_pos = 0;
jbs[num_jbs].m_max = q->m_len;
idx = (idx + 1) % THR_FREE_BUF_MAX;
/* Load any active prio B buffers. */
for (Uint32 thr_no = 0; thr_no < rep->m_thread_count; thr_no++)
const thr_job_queue *q = thr_ptr->m_in_queue + thr_no;
const thr_jb_read_state *r = thr_ptr->m_read_states + thr_no;
Uint32 read_pos = r->m_read_pos;
jbs[num_jbs].m_jb = q->m_buffers[r->m_read_index];
jbs[num_jbs].m_pos = 0;
jbs[num_jbs].m_max = read_pos;
/* Load any active prio A buffer. */
const thr_jb_read_state *r = &thr_ptr->m_jba_read_state;
Uint32 read_pos = r->m_read_pos;
jbs[num_jbs].m_jb = thr_ptr->m_jba.m_buffers[r->m_read_index];
jbs[num_jbs].m_pos = 0;
jbs[num_jbs].m_max = read_pos;
/* Now pick out one signal at a time, in signal id order. */
if (watchDogCounter)
*watchDogCounter = 4;
/* Search out the smallest signal id remaining. */
const Uint32 *p = jbs[idx_min].m_jb->m_data + jbs[idx_min].m_pos;
const SignalHeader *s_min = reinterpret_cast<const SignalHeader*>(p);
Uint32 sid_min = s_min->theSignalId;
for (Uint32 i = 1; i < num_jbs; i++)
p = jbs[i].m_jb->m_data + jbs[i].m_pos;
const SignalHeader *s = reinterpret_cast<const SignalHeader*>(p);
Uint32 sid = s->theSignalId;
if (wrap_compare(sid, sid_min) < 0)
/* We found the next signal, now put it in the ordered cyclic buffer. */
signalSequence[seq_end].ptr = s_min;
signalSequence[seq_end].prioa = jbs[idx_min].m_jb->m_prioa;
(sizeof(SignalHeader)>>2) + s_min->m_noOfSections + s_min->theLength;
#if SIZEOF_CHARP == 8
/* Align to 8-byte boundary, to ensure aligned copies. */
siglen= (siglen+1) & ~((Uint32)1);
jbs[idx_min].m_pos += siglen;
if (jbs[idx_min].m_pos >= jbs[idx_min].m_max)
/* We are done with this job buffer. */
jbs[idx_min] = jbs[num_jbs];
seq_end = (seq_end + 1) % MAX_SIGNALS_TO_DUMP;
/* Drop old signals if too many available in history. */
if (seq_end == seq_start)
seq_start = (seq_start + 1) % MAX_SIGNALS_TO_DUMP;
/* Now, having built the correct signal sequence, we can dump them all. */
bool first_one = true;
bool out_of_signals = false;
Uint32 lastSignalId = 0;
while (seq_end != seq_start)
if (watchDogCounter)
*watchDogCounter = 4;
seq_end = MAX_SIGNALS_TO_DUMP;
const SignalHeader *s = signalSequence[seq_end].ptr;
unsigned siglen = (sizeof(*s)>>2) + s->theLength;
siglen = 25; // Sanity check
memcpy(&signal.header, s, 4*siglen);
// instance number in trace file is confusing if not MT LQH
if (num_lqh_workers == 0)
signal.header.theReceiversBlockNumber &= NDBMT_BLOCK_MASK;
const Uint32 *posptr = reinterpret_cast<const Uint32 *>(s);
signal.m_sectionPtrI[0] = posptr[siglen + 0];
signal.m_sectionPtrI[1] = posptr[siglen + 1];
signal.m_sectionPtrI[2] = posptr[siglen + 2];
bool prioa = signalSequence[seq_end].prioa;
/* Make sure to display clearly when there is a gap in the dump. */
if (!first_one && !out_of_signals && (s->theSignalId + 1) != lastSignalId)
out_of_signals = true;
fprintf(out, "\n\n\nNo more prio %s signals, rest of dump will be "
        "incomplete.\n\n\n\n", prioa ? "B" : "A");
lastSignalId = s->theSignalId;
fprintf(out, "--------------- Signal ----------------\n");
Uint32 prio = (prioa ? JBA : JBB);
SignalLoggerManager::printSignalHeader(out,
SignalLoggerManager::printSignalData (out,
                                      &signal.theData[0]);
FastScheduler::traceDumpGetCurrentThread()
void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD);
const thr_data *selfptr = reinterpret_cast<const thr_data *>(value);
/* The selfptr might be NULL, or a pointer to the thread that crashed. */
return (int)selfptr->m_thr_no;
lock(&(g_thr_repository.m_section_lock));
unlock(&(g_thr_repository.m_section_lock));
mt_mem_manager_init()
mt_mem_manager_lock()
lock(&(g_thr_repository.m_mem_manager_lock));
mt_mem_manager_unlock()
unlock(&(g_thr_repository.m_mem_manager_lock));
Vector<mt_lock_stat> g_locks;
template class Vector<mt_lock_stat>;
register_lock(const void * ptr, const char * name)
mt_lock_stat* arr = g_locks.getBase();
for (size_t i = 0; i<g_locks.size(); i++)
if (arr[i].m_ptr == ptr)
free(arr[i].m_name);
arr[i].m_name = strdup(name);
ln.m_name = strdup(name);
ln.m_contended_count = 0;
ln.m_spin_count = 0;
g_locks.push_back(ln);
lookup_lock(const void * ptr)
mt_lock_stat* arr = g_locks.getBase();
for (size_t i = 0; i<g_locks.size(); i++)
if (arr[i].m_ptr == ptr)
mt_get_thread_references_for_blocks(const Uint32 blocks[], Uint32 threadId,
                                    Uint32 dst[], Uint32 len)
Bitmask<(MAX_THREADS+31)/32> mask;
for (Uint32 i = 0; blocks[i] != 0; i++)
Uint32 block = blocks[i];
* Find each thread that has an instance of block
assert(block == blockToMain(block));
Uint32 index = block - MIN_BLOCK_NO;
for (Uint32 instance = 0; instance < MAX_BLOCK_INSTANCES; instance++)
Uint32 thr_no = thr_map[index][instance].thr_no;
if (thr_no == thr_map_entry::NULL_THR_NO)
if (mask.get(thr_no))
dst[cnt++] = numberToRef(block, instance, 0);
mt_wakeup(class SimulatedBlock* block)
Uint32 thr_no = block->getThreadId();
thr_data *thrptr = g_thr_repository.m_thread + thr_no;
wakeup(&thrptr->m_waiter);
mt_assert_own_thread(SimulatedBlock* block)
Uint32 thr_no = block->getThreadId();
thr_data *thrptr = g_thr_repository.m_thread + thr_no;
if (unlikely(pthread_equal(thrptr->m_thr_id, pthread_self()) == 0))
fprintf(stderr, "mt_assert_own_thread() - assertion-failure\n");
struct thr_repository g_thr_repository;
struct trp_callback g_trp_callback;
TransporterRegistry globalTransporterRegistry(&g_trp_callback, false);