45
45
#include "config.h"
46
#include "write_buffer.h"
46
47
#include "transaction_log.h"
47
48
#include "transaction_log_applier.h"
48
49
#include "transaction_log_index.h"
55
#include <drizzled/errmsg_print.h>
56
#include <drizzled/gettext.h>
57
#include <drizzled/algorithm/crc32.h>
58
53
#include <drizzled/message/transaction.pb.h>
59
#include <google/protobuf/io/coded_stream.h>
54
#include <drizzled/util/functors.h>
55
#include <drizzled/session.h>
61
57
using namespace std;
62
58
using namespace drizzled;
63
using namespace google;
65
60
TransactionLogApplier *transaction_log_applier= NULL; /* The singleton transaction log applier */
67
extern TransactionLogIndex *transaction_log_index;
69
62
TransactionLogApplier::TransactionLogApplier(const string name_arg,
70
TransactionLog &in_transaction_log,
71
bool in_do_checksum) :
63
TransactionLog *in_transaction_log,
64
TransactionLogIndex *in_transaction_log_index,
65
uint32_t in_num_write_buffers) :
72
66
plugin::TransactionApplier(name_arg),
73
transaction_log(in_transaction_log),
74
do_checksum(in_do_checksum)
67
transaction_log(in_transaction_log),
68
transaction_log_index(in_transaction_log_index),
69
num_write_buffers(in_num_write_buffers),
73
* Create each of the buffers we need for undo log entries
75
write_buffers.reserve(num_write_buffers);
76
for (size_t x= 0; x < num_write_buffers; ++x)
78
write_buffers.push_back(new WriteBuffer());
78
82
TransactionLogApplier::~TransactionLogApplier()
82
void TransactionLogApplier::apply(const message::Transaction &to_apply)
84
uint8_t *buffer; /* Buffer we will write serialized header,
85
message and trailing checksum to */
88
size_t message_byte_length= to_apply.ByteSize();
89
size_t total_envelope_length= TransactionLog::HEADER_TRAILER_BYTES + message_byte_length;
92
* Attempt allocation of raw memory buffer for the header,
93
* message and trailing checksum bytes.
95
buffer= static_cast<uint8_t *>(malloc(total_envelope_length));
98
errmsg_printf(ERRMSG_LVL_ERROR,
99
_("Failed to allocate enough memory to buffer header, "
100
"transaction message, and trailing checksum bytes. Tried to allocate %" PRId64
101
" bytes. Error: %s\n"),
102
static_cast<int64_t>(total_envelope_length),
107
orig_buffer= buffer; /* We will free() orig_buffer, as buffer is moved during write */
110
* Write the header information, which is the message type and
111
* the length of the transaction message into the buffer
113
buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
114
static_cast<uint32_t>(ReplicationServices::TRANSACTION), buffer);
115
buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
116
static_cast<uint32_t>(message_byte_length), buffer);
119
* Now write the serialized transaction message, followed
120
* by the optional checksum into the buffer.
122
buffer= to_apply.SerializeWithCachedSizesToArray(buffer);
124
uint32_t checksum= 0;
127
checksum= drizzled::algorithm::crc32(
128
reinterpret_cast<char *>(buffer) - message_byte_length, message_byte_length);
131
/* We always write in network byte order */
132
buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(checksum, buffer);
134
/* Ask the transaction log to write the entry and return where it wrote it */
135
off_t written_to= transaction_log.writeEntry(orig_buffer, total_envelope_length);
84
for_each(write_buffers.begin(),
87
write_buffers.clear();
88
delete transaction_log;
89
delete transaction_log_index;
92
WriteBuffer *TransactionLogApplier::getWriteBuffer(const Session &session)
94
return write_buffers[session.getSessionId() % num_write_buffers];
97
plugin::ReplicationReturnCode
98
TransactionLogApplier::apply(Session &in_session,
99
const message::Transaction &to_apply)
101
size_t entry_size= TransactionLog::getLogEntrySize(to_apply);
102
WriteBuffer *write_buffer= getWriteBuffer(in_session);
106
write_buffer->lock();
107
write_buffer->resize(entry_size);
108
uint8_t *bytes= write_buffer->getRawBytes();
109
bytes= transaction_log->packTransactionIntoLogEntry(to_apply,
113
off_t written_to= transaction_log->writeEntry(bytes, entry_size);
114
write_buffer->unlock();
139
116
/* Add an entry to the index describing what was just applied */
140
117
transaction_log_index->addEntry(TransactionLogEntry(ReplicationServices::TRANSACTION,
142
total_envelope_length),
122
return plugin::SUCCESS;