~knielsen/maria/5.1-group-commit

« back to all changes in this revision

Viewing changes to sql/handler.cc

  • Committer: knielsen at knielsen-hq
  • Date: 2010-06-09 11:17:39 UTC
  • Revision ID: knielsen@knielsen-hq.org-20100609111739-qso1veqt9io020fk
MWL#116: Fix a couple of races in group commit.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1103
1103
static THD *
1104
1104
enqueue_atomic(THD *thd)
1105
1105
{
1106
 
  my_atomic_rwlock_wrlock(&LOCK_group_commit_queue);
 
1106
  THD *orig_queue;
 
1107
 
1107
1108
  thd->next_commit_ordered= group_commit_queue;
 
1109
 
 
1110
  my_atomic_rwlock_wrlock(&LOCK_group_commit_queue);
 
1111
  do
 
1112
  {
 
1113
    /*
 
1114
      Save the read value of group_commit_queue in each iteration of the loop.
 
1115
      When my_atomic_casptr() returns TRUE, we know that orig_queue is equal
 
1116
      to the value of group_commit_queue when we enqueued.
 
1117
 
 
1118
      However, as soon as we enqueue, thd->next_commit_ordered may be
 
1119
      invalidated by another thread (the group commit leader). So we need to
 
1120
      save the old queue value in a local variable orig_queue like this.
 
1121
    */
 
1122
    orig_queue= thd->next_commit_ordered;
 
1123
  }
1108
1124
  while (!my_atomic_casptr((void **)(&group_commit_queue),
1109
1125
                           (void **)(&thd->next_commit_ordered),
1110
 
                           thd))
1111
 
    ;
 
1126
                           thd));
1112
1127
  my_atomic_rwlock_wrunlock(&LOCK_group_commit_queue);
1113
 
  return thd->next_commit_ordered;
 
1128
 
 
1129
  return orig_queue;
1114
1130
}
1115
1131
 
1116
1132
static THD *
1399
1415
  int cookie;
1400
1416
  if (tc_log->use_group_log_xid)
1401
1417
  {
 
1418
    // ToDo: if xid==NULL here, we may use is_group_commit_leader uninitialised.
 
1419
    // ToDo: Same for cookie below when xid==NULL.
 
1420
    // Seems we generally need to check the case xid==NULL.
1402
1421
    if (is_group_commit_leader)
1403
1422
    {
1404
1423
      pthread_mutex_lock(&LOCK_group_commit);
1434
1453
      }
1435
1454
      pthread_mutex_unlock(&LOCK_group_commit);
1436
1455
 
1437
 
        /* Wake up everyone except ourself. */
1438
 
      while ((queue= queue->next_commit_ordered)  != NULL)
1439
 
        group_commit_wakeup_other(queue);
 
1456
      /* Wake up everyone except ourself. */
 
1457
      THD *current= queue->next_commit_ordered;
 
1458
      while (current != NULL)
 
1459
      {
 
1460
        /*
 
1461
          Careful not to access current->next_commit_ordered after waking up
 
1462
          the other thread! As it may change immediately after wakeup.
 
1463
        */
 
1464
        THD *next= current->next_commit_ordered;
 
1465
        group_commit_wakeup_other(current);
 
1466
        current= next;
 
1467
      }
1440
1468
    }
1441
1469
    else
1442
1470
    {