46
46
should put a pointer to the following functions in the lock structure:
47
47
(If the pointer is zero (default), the function is not called)
50
Before giving a lock of type TL_WRITE_CONCURRENT_INSERT,
51
we check if this function exists and returns 0.
52
If not, then the lock is upgraded to TL_WRITE_LOCK
53
In MyISAM this is a simple check if the insert can be done
54
at the end of the datafile.
56
Before a write lock is released, this function is called.
57
In MyISAM this function updates the count and length of the datafile
59
When one gets a lock this function is called.
60
In MyISAM this stores the number of rows and size of the datafile
63
50
The lock algorithm allows one to have one TL_WRITE_ALLOW_READ,
64
51
TL_WRITE_CONCURRENT_INSERT lock at the same time as multiple read locks.
68
55
#include "config.h"
69
56
#include "drizzled/internal/my_sys.h"
57
#include "drizzled/internal/thread_var.h"
58
#include "drizzled/statistics_variables.h"
60
#include "drizzled/session.h"
61
#include "drizzled/current_session.h"
71
63
#include "thr_lock.h"
72
64
#include "drizzled/internal/m_string.h"
87
79
#include <drizzled/util/test.h>
81
#include <boost/interprocess/sync/lock_options.hpp>
89
83
using namespace std;
94
bool thr_lock_inited= false;
95
uint32_t locks_immediate = 0L, locks_waited = 0L;
96
88
uint64_t table_lock_wait_timeout;
97
89
static enum thr_lock_type thr_upgraded_concurrent_insert_lock = TL_WRITE;
100
static list<THR_LOCK *> thr_lock_thread_list; /* List of threads in use */
102
92
uint64_t max_write_lock_count= ~(uint64_t) 0L;
104
static inline pthread_cond_t *get_cond(void)
106
return &my_thread_var->suspend;
110
95
** For the future (now the thread specific cond is alloced by my_pthread.c)
115
thr_lock_inited= true;
120
98
static inline bool
121
99
thr_lock_owner_equal(THR_LOCK_OWNER *rhs, THR_LOCK_OWNER *lhs)
129
107
void thr_lock_init(THR_LOCK *lock)
131
memset(lock, 0, sizeof(*lock));
132
pthread_mutex_init(&lock->mutex,MY_MUTEX_INIT_FAST);
133
110
lock->read.last= &lock->read.data;
134
111
lock->read_wait.last= &lock->read_wait.data;
135
112
lock->write_wait.last= &lock->write_wait.data;
136
113
lock->write.last= &lock->write.data;
138
pthread_mutex_lock(&internal::THR_LOCK_lock); /* Add to locks in use */
139
thr_lock_thread_list.push_front(lock);
140
pthread_mutex_unlock(&internal::THR_LOCK_lock);
144
void thr_lock_delete(THR_LOCK *lock)
146
pthread_mutex_destroy(&lock->mutex);
147
pthread_mutex_lock(&internal::THR_LOCK_lock);
148
thr_lock_thread_list.remove(lock);
149
pthread_mutex_unlock(&internal::THR_LOCK_lock);
153
void thr_lock_info_init(THR_LOCK_INFO *info)
117
void THR_LOCK_INFO::init()
155
119
internal::st_my_thread_var *tmp= my_thread_var;
156
info->thread= tmp->pthread_self;
157
info->thread_id= tmp->id;
120
thread= tmp->pthread_self;
161
125
/* Initialize a lock instance */
163
void thr_lock_data_init(THR_LOCK *lock,THR_LOCK_DATA *data, void *param)
127
void THR_LOCK_DATA::init(THR_LOCK *lock_arg, void *param_arg)
166
data->type= TL_UNLOCK;
167
data->owner= NULL; /* no owner yet */
168
data->status_param= param;
131
owner= NULL; /* no owner yet */
132
status_param= param_arg;
184
148
static void wake_up_waiters(THR_LOCK *lock);
187
static enum enum_thr_lock_result
188
wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data,
151
static enum enum_thr_lock_result wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data, bool in_wait_list)
191
internal::st_my_thread_var *thread_var= my_thread_var;
192
pthread_cond_t *cond= &thread_var->suspend;
193
struct timespec wait_timeout;
153
Session *session= current_session;
154
internal::st_my_thread_var *thread_var= session->getThreadVar();
156
boost::condition_variable *cond= &thread_var->suspend;
194
157
enum enum_thr_lock_result result= THR_LOCK_ABORTED;
195
158
bool can_deadlock= test(data->owner->info->n_cursors);
201
164
wait->last= &data->next;
204
statistic_increment(locks_waited, &internal::THR_LOCK_lock);
167
current_global_counters.locks_waited++;
206
169
/* Set up control struct to allow others to abort locks */
207
thread_var->current_mutex= &data->lock->mutex;
208
thread_var->current_cond= cond;
170
thread_var->current_mutex= data->lock->native_handle();
171
thread_var->current_cond= &thread_var->suspend;
172
data->cond= &thread_var->suspend;;
212
set_timespec(wait_timeout, table_lock_wait_timeout);
213
174
while (!thread_var->abort || in_wait_list)
215
int rc= (can_deadlock ?
216
pthread_cond_timedwait(cond, &data->lock->mutex,
218
pthread_cond_wait(cond, &data->lock->mutex));
176
boost::mutex::scoped_lock scoped(*data->lock->native_handle(), boost::adopt_lock_t());
181
xtime_get(&xt, boost::TIME_UTC);
182
xt.sec += table_lock_wait_timeout;
183
if (not cond->timed_wait(scoped, xt))
185
result= THR_LOCK_WAIT_TIMEOUT;
220
195
We must break the wait if one of the following occurs:
221
196
- the connection has been aborted (!thread_var->abort), but
229
204
Order of checks below is important to not report about timeout
230
205
if the predicate is true.
236
if (rc == ETIMEDOUT || rc == ETIME)
238
result= THR_LOCK_WAIT_TIMEOUT;
207
if (data->cond == NULL)
242
214
if (data->cond || data->type == TL_UNLOCK)
256
228
result= THR_LOCK_SUCCESS;
257
if (data->lock->get_status)
258
(*data->lock->get_status)(data->status_param, 0);
260
pthread_mutex_unlock(&data->lock->mutex);
230
data->lock->unlock();
262
232
/* The following must be done after unlock of lock->mutex */
263
pthread_mutex_lock(&thread_var->mutex);
233
boost::mutex::scoped_lock scopedLock(thread_var->mutex);
264
234
thread_var->current_mutex= NULL;
265
235
thread_var->current_cond= NULL;
266
pthread_mutex_unlock(&thread_var->mutex);
271
static enum enum_thr_lock_result
272
thr_lock(THR_LOCK_DATA *data, THR_LOCK_OWNER *owner,
273
enum thr_lock_type lock_type)
240
static enum enum_thr_lock_result thr_lock(THR_LOCK_DATA *data, THR_LOCK_OWNER *owner, enum thr_lock_type lock_type)
275
THR_LOCK *lock=data->lock;
242
THR_LOCK *lock= data->lock;
276
243
enum enum_thr_lock_result result= THR_LOCK_SUCCESS;
277
244
struct st_lock_list *wait_queue;
278
245
THR_LOCK_DATA *lock_owner;
281
248
data->cond=0; /* safety */
282
249
data->type=lock_type;
283
250
data->owner= owner; /* Must be reset ! */
284
pthread_mutex_lock(&lock->mutex);
285
252
if ((int) lock_type <= (int) TL_READ_NO_INSERT)
287
254
/* Request for READ lock */
307
274
lock->read.last= &data->next;
308
275
if (lock_type == TL_READ_NO_INSERT)
309
276
lock->read_no_write_count++;
310
if (lock->get_status)
311
(*lock->get_status)(data->status_param, 0);
312
statistic_increment(locks_immediate,&internal::THR_LOCK_lock);
277
current_global_counters.locks_immediate++;
315
280
if (lock->write.data->type == TL_WRITE_ONLY)
327
292
(*lock->read.last)=data; /* Add to running FIFO */
328
293
data->prev=lock->read.last;
329
294
lock->read.last= &data->next;
330
if (lock->get_status)
331
(*lock->get_status)(data->status_param, 0);
332
295
if (lock_type == TL_READ_NO_INSERT)
333
296
lock->read_no_write_count++;
334
statistic_increment(locks_immediate,&internal::THR_LOCK_lock);
297
current_global_counters.locks_immediate++;
344
307
else /* Request for WRITE lock */
346
if (lock_type == TL_WRITE_CONCURRENT_INSERT && ! lock->check_status)
309
if (lock_type == TL_WRITE_CONCURRENT_INSERT)
347
310
data->type=lock_type= thr_upgraded_concurrent_insert_lock;
349
312
if (lock->write.data) /* If there is a write lock */
378
341
(*lock->write.last)=data; /* Add to running fifo */
379
342
data->prev=lock->write.last;
380
343
lock->write.last= &data->next;
381
if (data->lock->get_status)
382
(*data->lock->get_status)(data->status_param, 0);
383
statistic_increment(locks_immediate,&internal::THR_LOCK_lock);
344
current_global_counters.locks_immediate++;
408
364
(*lock->write.last)=data; /* Add as current write lock */
409
365
data->prev=lock->write.last;
410
366
lock->write.last= &data->next;
411
if (data->lock->get_status)
412
(*data->lock->get_status)(data->status_param, concurrent_insert);
413
statistic_increment(locks_immediate,&internal::THR_LOCK_lock);
367
current_global_counters.locks_immediate++;
429
383
result= THR_LOCK_DEADLOCK;
432
387
/* Can't get lock yet; Wait for it */
433
388
return(wait_for_lock(wait_queue, data, 0));
435
pthread_mutex_unlock(&lock->mutex);
440
396
static void free_all_read_locks(THR_LOCK *lock, bool using_concurrent_insert)
442
THR_LOCK_DATA *data=lock->read_wait.data;
398
THR_LOCK_DATA *data= lock->read_wait.data;
444
400
/* move all locks from read_wait list to read list */
445
401
(*lock->read.last)=data;
472
428
lock->read_no_write_count++;
474
data->cond=0; /* Mark thread free */
475
pthread_cond_signal(cond);
430
data->cond= NULL; /* Mark thread free */
476
432
} while ((data=data->next));
477
433
*lock->read_wait.last=0;
478
434
if (!lock->read_wait.data)
479
435
lock->write_lock_count=0;
482
/* Unlock lock and free next thread on same lock */
438
/* Unlock lock and free next thread on same lock */
484
440
static void thr_unlock(THR_LOCK_DATA *data)
486
442
THR_LOCK *lock=data->lock;
487
443
enum thr_lock_type lock_type=data->type;
488
pthread_mutex_lock(&lock->mutex);
490
446
if (((*data->prev)=data->next)) /* remove from lock-list */
491
447
data->next->prev= data->prev;
495
451
lock->write.last=data->prev;
496
452
if (lock_type >= TL_WRITE_CONCURRENT_INSERT)
498
if (lock->update_status)
499
(*lock->update_status)(data->status_param);
503
if (lock->restore_status)
504
(*lock->restore_status)(data->status_param);
506
456
if (lock_type == TL_READ_NO_INSERT)
507
457
lock->read_no_write_count--;
508
458
data->type=TL_UNLOCK; /* Mark unlocked */
509
459
wake_up_waiters(lock);
510
pthread_mutex_unlock(&lock->mutex);
554
503
data->prev=lock->write.last;
556
505
lock->write.last= &data->next;
557
if (data->type == TL_WRITE_CONCURRENT_INSERT &&
558
(*lock->check_status)(data->status_param))
559
data->type=TL_WRITE; /* Upgrade lock */
561
pthread_cond_t *cond=data->cond;
562
data->cond=0; /* Mark thread free */
563
pthread_cond_signal(cond); /* Start waiting thread */
508
boost::condition_variable *cond= data->cond;
509
data->cond= NULL; /* Mark thread free */
510
cond->notify_one(); /* Start waiting thred */
565
512
if (data->type != TL_WRITE_ALLOW_WRITE ||
566
513
!lock->write_wait.data ||
584
531
lock_type != TL_WRITE_ALLOW_WRITE) ||
585
532
!lock->read_no_write_count))
588
For DELAYED, ALLOW_READ, WRITE_ALLOW_WRITE or CONCURRENT_INSERT locks
589
start WRITE locks together with the READ locks
591
if (lock_type == TL_WRITE_CONCURRENT_INSERT &&
592
(*lock->check_status)(data->status_param))
594
data->type=TL_WRITE; /* Upgrade lock */
595
if (lock->read_wait.data)
596
free_all_read_locks(lock,0);
600
pthread_cond_t *cond=data->cond;
535
boost::condition_variable *cond= data->cond;
601
536
if (((*data->prev)=data->next)) /* remove from wait-list */
602
537
data->next->prev= data->prev;
606
541
data->prev=lock->write.last;
607
542
lock->write.last= &data->next;
608
543
data->next=0; /* Only one write lock */
609
data->cond=0; /* Mark thread free */
610
pthread_cond_signal(cond); /* Start waiting thread */
544
data->cond= NULL; /* Mark thread free */
545
cond->notify_one(); /* Start waiting thread */
611
546
} while (lock_type == TL_WRITE_ALLOW_WRITE &&
612
547
(data=lock->write_wait.data) &&
613
548
data->type == TL_WRITE_ALLOW_WRITE);
686
if (last_lock->lock == (*pos)->lock &&
687
last_lock->lock->copy_status)
689
if (last_lock->type <= TL_READ_NO_INSERT)
691
THR_LOCK_DATA **read_lock;
693
If we are locking the same table with read locks we must ensure
694
that all tables share the status of the last write lock or
698
(*pos)->type <= TL_READ_NO_INSERT &&
700
pos[-1]->lock == (*pos)->lock ;
706
(last_lock->lock->copy_status)((*read_lock)->status_param,
707
(*pos)->status_param);
708
} while (*(read_lock++) != last_lock);
709
last_lock= (*pos); /* Point at last write lock */
712
(*last_lock->lock->copy_status)((*pos)->status_param,
713
last_lock->status_param);
717
624
} while (pos != data);
739
646
TL_WRITE_ONLY to abort any new accesses to the lock
742
void thr_abort_locks(THR_LOCK *lock)
649
void THR_LOCK::abort_locks()
745
pthread_mutex_lock(&lock->mutex);
651
boost::mutex::scoped_lock scopedLock(mutex);
747
for (data=lock->read_wait.data; data ; data=data->next)
653
for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
749
data->type= TL_UNLOCK; /* Mark killed */
655
local_data->type= TL_UNLOCK; /* Mark killed */
750
656
/* It's safe to signal the cond first: we're still holding the mutex. */
751
pthread_cond_signal(data->cond);
752
data->cond= NULL; /* Removed from list */
657
local_data->cond->notify_one();
658
local_data->cond= NULL; /* Removed from list */
754
for (data=lock->write_wait.data; data ; data=data->next)
660
for (THR_LOCK_DATA *local_data= write_wait.data; local_data ; local_data= local_data->next)
756
data->type=TL_UNLOCK;
757
pthread_cond_signal(data->cond);
662
local_data->type= TL_UNLOCK;
663
local_data->cond->notify_one();
664
local_data->cond= NULL;
760
lock->read_wait.last= &lock->read_wait.data;
761
lock->write_wait.last= &lock->write_wait.data;
762
lock->read_wait.data=lock->write_wait.data=0;
763
if (lock->write.data)
764
lock->write.data->type=TL_WRITE_ONLY;
765
pthread_mutex_unlock(&lock->mutex);
666
read_wait.last= &read_wait.data;
667
write_wait.last= &write_wait.data;
668
read_wait.data= write_wait.data=0;
670
write.data->type=TL_WRITE_ONLY;
773
677
This is used to abort all locks for a specific thread
776
bool thr_abort_locks_for_thread(THR_LOCK *lock, uint64_t thread_id)
680
bool THR_LOCK::abort_locks_for_thread(uint64_t thread_id_arg)
779
682
bool found= false;
781
pthread_mutex_lock(&lock->mutex);
782
for (data= lock->read_wait.data; data ; data= data->next)
684
boost::mutex::scoped_lock scopedLock(mutex);
685
for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
784
if (data->owner->info->thread_id == thread_id)
687
if (local_data->owner->info->thread_id == thread_id_arg)
786
data->type= TL_UNLOCK; /* Mark killed */
689
local_data->type= TL_UNLOCK; /* Mark killed */
787
690
/* It's safe to signal the cond first: we're still holding the mutex. */
789
pthread_cond_signal(data->cond);
790
data->cond= 0; /* Removed from list */
692
local_data->cond->notify_one();
693
local_data->cond= 0; /* Removed from list */
792
if (((*data->prev)= data->next))
793
data->next->prev= data->prev;
695
if (((*local_data->prev)= local_data->next))
696
local_data->next->prev= local_data->prev;
795
lock->read_wait.last= data->prev;
698
read_wait.last= local_data->prev;
798
for (data= lock->write_wait.data; data ; data= data->next)
701
for (THR_LOCK_DATA *local_data= write_wait.data; local_data ; local_data= local_data->next)
800
if (data->owner->info->thread_id == thread_id)
703
if (local_data->owner->info->thread_id == thread_id_arg)
802
data->type= TL_UNLOCK;
705
local_data->type= TL_UNLOCK;
804
pthread_cond_signal(data->cond);
707
local_data->cond->notify_one();
708
local_data->cond= NULL;
807
if (((*data->prev)= data->next))
808
data->next->prev= data->prev;
710
if (((*local_data->prev)= local_data->next))
711
local_data->next->prev= local_data->prev;
810
lock->write_wait.last= data->prev;
713
write_wait.last= local_data->prev;
813
wake_up_waiters(lock);
814
pthread_mutex_unlock(&lock->mutex);
716
wake_up_waiters(this);
818
721
} /* namespace drizzled */