~raghavendra-prabhu/percona-xtradb-cluster/release-5.5.30-galera-2.x

« back to all changes in this revision

Viewing changes to galera/src/certification.cpp

  • Committer: Raghavendra D Prabhu
  • Date: 2013-04-05 13:55:24 UTC
  • mfrom: (95.2.22 2.x)
  • Revision ID: raghavendra.prabhu@percona.com-20130405135524-aa4f4y9rko7t3cvt
Merge galera/2.x branch.

We merge up to r148 of the galera/2.x branch for the 5.5.30-23.7.4 PXC release.

Show diffs side-by-side

added added

removed removed

Lines of Context:
47
47
 
48
48
        CertIndex::iterator ci(cert_index_.find(kel));
49
49
        assert(ci != cert_index_.end());
50
 
        KeyEntry* const ke(ci->second);
 
50
        KeyEntry* const ke(*ci);
51
51
 
52
52
        if (shared == false &&
53
53
            (ke->ref_trx() == trx || ke->ref_full_trx() == trx))
148
148
static bool
149
149
certify_v1to2(galera::TrxHandle*                            trx,
150
150
              galera::Certification::CertIndex&             cert_index,
151
 
              galera::WriteSet::KeySequence::const_iterator key_seq_iter,
 
151
              const galera::Key&                            key,
152
152
              galera::TrxHandle::CertKeySet&                key_list,
153
153
              bool const store_keys, bool const log_conflicts)
154
154
{
155
155
    typedef std::list<galera::KeyPart> KPS;
156
156
 
157
 
    KPS key_parts(key_seq_iter->key_parts<KPS>());
 
157
    KPS key_parts(key.key_parts<KPS>());
158
158
    KPS::const_iterator begin(key_parts.begin()), end;
159
159
    bool full_key(false);
160
160
    for (end = begin; full_key == false; end != key_parts.end() ? ++end : end)
161
161
    {
162
162
        full_key = (end == key_parts.end());
163
163
        galera::Certification::CertIndex::iterator ci;
164
 
        galera::KeyEntry ke(key_seq_iter->version(), begin, end,
165
 
                            key_seq_iter->flags());
 
164
        galera::KeyEntry ke(key.version(), begin, end, key.flags());
166
165
 
167
166
        cert_debug << "key: " << ke.get_key()
168
167
                   << " (" << (full_key == true ? "full" : "partial") << ")";
183
182
            if (store_keys)
184
183
            {
185
184
                kep = new galera::KeyEntry(ke);
186
 
                ci = cert_index.insert(std::make_pair(kep, kep)).first;
 
185
                ci = cert_index.insert(kep).first;
187
186
                cert_debug << "created new entry";
188
187
            }
189
188
        }
194
193
            // Note: For we skip certification for isolated trxs, only
195
194
            // cert index and key_list is populated.
196
195
            if ((trx->flags() & galera::TrxHandle::F_ISOLATION) == 0 &&
197
 
                certify_and_depend_v1to2(ci->second, trx, full_key,
 
196
                certify_and_depend_v1to2(*ci, trx, full_key,
198
197
                                         !shared_key, log_conflicts))
199
198
            {
200
199
                return false;
203
202
            if (store_keys)
204
203
            {
205
204
                if (gu_likely(
206
 
                        true == ke.get_key().equal_all(ci->second->get_key())))
 
205
                        true == ke.get_key().equal_all((*ci)->get_key())))
207
206
                {
208
 
                    kep = ci->second;
 
207
                    kep = *ci;
209
208
                }
210
209
                else
211
210
                {
231
230
galera::Certification::do_test_v1to2(TrxHandle* trx, bool store_keys)
232
231
{
233
232
    cert_debug << "BEGIN CERTIFICATION: " << *trx;
234
 
    size_t offset(serial_size(*trx));
235
 
    const MappedBuffer& wscoll(trx->write_set_collection());
236
233
    galera::TrxHandle::CertKeySet& key_list(trx->cert_keys_);
237
234
    long key_count(0);
238
235
    gu::Lock lock(mutex_);
254
251
    size_t prev_cert_index_size(cert_index_.size());
255
252
#endif // NDEBUG
256
253
    /* Scan over write sets */
257
 
    while (offset < wscoll.size())
 
254
    size_t offset(0);
 
255
    const gu::byte_t* buf(trx->write_set_buffer().first);
 
256
    const size_t buf_len(trx->write_set_buffer().second);
 
257
    while (offset < buf_len)
258
258
    {
259
 
        WriteSet ws(trx->version());
260
 
        if ((offset = unserialize(&wscoll[0], wscoll.size(), offset, ws)) == 0)
261
 
        {
262
 
            gu_throw_fatal << "failed to unserialize write set";
263
 
        }
264
 
 
265
 
        WriteSet::KeySequence rk;
266
 
        ws.get_keys(rk);
 
259
        std::pair<size_t, size_t> k(WriteSet::segment(buf, buf_len, offset));
267
260
 
268
261
        // Scan over all keys
269
 
        for (WriteSet::KeySequence::const_iterator i(rk.begin());
270
 
             i != rk.end(); ++i)
 
262
        offset = k.first;
 
263
        while (offset < k.first + k.second)
271
264
        {
272
 
            if (certify_v1to2(trx, cert_index_, i, key_list, store_keys,
 
265
            Key key(trx->version());
 
266
            offset = unserialize(buf, buf_len, offset, key);
 
267
            if (certify_v1to2(trx, cert_index_, key, key_list, store_keys,
273
268
                              log_conflicts_) == false)
274
269
            {
275
270
                goto cert_fail;
276
271
            }
 
272
            ++key_count;
277
273
        }
278
274
 
279
 
        key_count += rk.size();
 
275
        // Skip data part
 
276
        std::pair<size_t, size_t> d(WriteSet::segment(buf, buf_len, offset));
 
277
        offset = d.first + d.second;
 
278
 
280
279
    }
281
280
 
282
281
    trx->set_depends_seqno(std::max(trx->depends_seqno(), last_pa_unsafe_));
295
294
                               << kel->get_key() << "' from cert index";
296
295
            }
297
296
 
298
 
            KeyEntry* const ke(ci->second);
 
297
            KeyEntry* const ke(*ci);
299
298
            const bool full_key(i->second.first);
300
299
            const bool shared_key(i->second.second);
301
300
            bool keep(false);
353
352
 
354
353
            if (ci != cert_index_.end())
355
354
            {
356
 
                KeyEntry* ke(ci->second);
 
355
                KeyEntry* ke(*ci);
357
356
 
358
357
                if (ke->ref_trx() == 0 && ke->ref_shared_trx() == 0)
359
358
                {
416
415
    {
417
416
        if (trx->last_seen_seqno() < initial_position_)
418
417
        {
419
 
            log_warn << "last seen seqno below limit for trx " << *trx;
 
418
            if (cert_index_.empty() == false)
 
419
            {
 
420
                log_warn << "last seen seqno below limit for trx " << *trx;
 
421
            }
 
422
            else
 
423
            {
 
424
                log_debug << "last seen seqno below limit for trx " << *trx;
 
425
            }
420
426
        }
421
427
 
422
428
        if (trx->global_seqno() - trx->last_seen_seqno() > max_length_)
517
523
 
518
524
    if (seqno >= position_)
519
525
    {
520
 
        for_each(trx_map_.begin(), trx_map_.end(), PurgeAndDiscard(*this));
 
526
        std::for_each(trx_map_.begin(), trx_map_.end(), PurgeAndDiscard(*this));
521
527
        assert(cert_index_.size() == 0);
522
528
        trx_map_.clear();
523
529
    }
525
531
    {
526
532
        log_warn << "moving position backwards: " << position_ << " -> "
527
533
                 << seqno;
528
 
        for_each(cert_index_.begin(), cert_index_.end(), DiscardRK());
529
 
        for_each(trx_map_.begin(), trx_map_.end(),
 
534
        std::for_each(cert_index_.begin(), cert_index_.end(),
 
535
                      gu::DeleteObject());
 
536
        std::for_each(trx_map_.begin(), trx_map_.end(),
530
537
                 Unref2nd<TrxMap::value_type>());
531
538
        cert_index_.clear();
532
539
        trx_map_.clear();