149
149
certify_v1to2(galera::TrxHandle* trx,
150
150
galera::Certification::CertIndex& cert_index,
151
galera::WriteSet::KeySequence::const_iterator key_seq_iter,
151
const galera::Key& key,
152
152
galera::TrxHandle::CertKeySet& key_list,
153
153
bool const store_keys, bool const log_conflicts)
155
155
typedef std::list<galera::KeyPart> KPS;
157
KPS key_parts(key_seq_iter->key_parts<KPS>());
157
KPS key_parts(key.key_parts<KPS>());
158
158
KPS::const_iterator begin(key_parts.begin()), end;
159
159
bool full_key(false);
160
160
for (end = begin; full_key == false; end != key_parts.end() ? ++end : end)
162
162
full_key = (end == key_parts.end());
163
163
galera::Certification::CertIndex::iterator ci;
164
galera::KeyEntry ke(key_seq_iter->version(), begin, end,
165
key_seq_iter->flags());
164
galera::KeyEntry ke(key.version(), begin, end, key.flags());
167
166
cert_debug << "key: " << ke.get_key()
168
167
<< " (" << (full_key == true ? "full" : "partial") << ")";
185
184
kep = new galera::KeyEntry(ke);
186
ci = cert_index.insert(std::make_pair(kep, kep)).first;
185
ci = cert_index.insert(kep).first;
187
186
cert_debug << "created new entry";
194
193
// Note: For we skip certification for isolated trxs, only
195
194
// cert index and key_list is populated.
196
195
if ((trx->flags() & galera::TrxHandle::F_ISOLATION) == 0 &&
197
certify_and_depend_v1to2(ci->second, trx, full_key,
196
certify_and_depend_v1to2(*ci, trx, full_key,
198
197
!shared_key, log_conflicts))
231
230
galera::Certification::do_test_v1to2(TrxHandle* trx, bool store_keys)
233
232
cert_debug << "BEGIN CERTIFICATION: " << *trx;
234
size_t offset(serial_size(*trx));
235
const MappedBuffer& wscoll(trx->write_set_collection());
236
233
galera::TrxHandle::CertKeySet& key_list(trx->cert_keys_);
237
234
long key_count(0);
238
235
gu::Lock lock(mutex_);
254
251
size_t prev_cert_index_size(cert_index_.size());
256
253
/* Scan over write sets */
257
while (offset < wscoll.size())
255
const gu::byte_t* buf(trx->write_set_buffer().first);
256
const size_t buf_len(trx->write_set_buffer().second);
257
while (offset < buf_len)
259
WriteSet ws(trx->version());
260
if ((offset = unserialize(&wscoll[0], wscoll.size(), offset, ws)) == 0)
262
gu_throw_fatal << "failed to unserialize write set";
265
WriteSet::KeySequence rk;
259
std::pair<size_t, size_t> k(WriteSet::segment(buf, buf_len, offset));
268
261
// Scan over all keys
269
for (WriteSet::KeySequence::const_iterator i(rk.begin());
263
while (offset < k.first + k.second)
272
if (certify_v1to2(trx, cert_index_, i, key_list, store_keys,
265
Key key(trx->version());
266
offset = unserialize(buf, buf_len, offset, key);
267
if (certify_v1to2(trx, cert_index_, key, key_list, store_keys,
273
268
log_conflicts_) == false)
279
key_count += rk.size();
276
std::pair<size_t, size_t> d(WriteSet::segment(buf, buf_len, offset));
277
offset = d.first + d.second;
282
281
trx->set_depends_seqno(std::max(trx->depends_seqno(), last_pa_unsafe_));
417
416
if (trx->last_seen_seqno() < initial_position_)
419
log_warn << "last seen seqno below limit for trx " << *trx;
418
if (cert_index_.empty() == false)
420
log_warn << "last seen seqno below limit for trx " << *trx;
424
log_debug << "last seen seqno below limit for trx " << *trx;
422
428
if (trx->global_seqno() - trx->last_seen_seqno() > max_length_)
518
524
if (seqno >= position_)
520
for_each(trx_map_.begin(), trx_map_.end(), PurgeAndDiscard(*this));
526
std::for_each(trx_map_.begin(), trx_map_.end(), PurgeAndDiscard(*this));
521
527
assert(cert_index_.size() == 0);
522
528
trx_map_.clear();
526
532
log_warn << "moving position backwards: " << position_ << " -> "
528
for_each(cert_index_.begin(), cert_index_.end(), DiscardRK());
529
for_each(trx_map_.begin(), trx_map_.end(),
534
std::for_each(cert_index_.begin(), cert_index_.end(),
536
std::for_each(trx_map_.begin(), trx_map_.end(),
530
537
Unref2nd<TrxMap::value_type>());
531
538
cert_index_.clear();
532
539
trx_map_.clear();