~posulliv/drizzle/optimizer-style-cleanup

« back to all changes in this revision

Viewing changes to plugin/transaction_log/transaction_log_applier.cc

  • Committer: Padraig O'Sullivan
  • Date: 2010-04-17 01:38:47 UTC
  • mfrom: (1237.9.238 bad-staging)
  • Revision ID: osullivan.padraig@gmail.com-20100417013847-ibjioqsfbmf5yg4g
Merge trunk.

Show diffs side-by-side

added added

removed removed

Lines of Context:
6
6
 *
7
7
 *  Authors:
8
8
 *
9
 
 *  Jay Pipes <jaypipes@gmail.com.com>
 
9
 *    Jay Pipes <jaypipes@gmail.com>
10
10
 *
11
11
 *  This program is free software; you can redistribute it and/or modify
12
12
 *  it under the terms of the GNU General Public License as published by
43
43
 */
44
44
 
45
45
#include "config.h"
 
46
#include "write_buffer.h"
46
47
#include "transaction_log.h"
47
48
#include "transaction_log_applier.h"
48
49
#include "transaction_log_index.h"
49
50
 
50
 
#include <sys/stat.h>
51
 
#include <fcntl.h>
52
 
#include <unistd.h>
53
 
#include <errno.h>
 
51
#include <vector>
54
52
 
55
 
#include <drizzled/errmsg_print.h>
56
 
#include <drizzled/gettext.h>
57
 
#include <drizzled/algorithm/crc32.h>
58
53
#include <drizzled/message/transaction.pb.h>
59
 
#include <google/protobuf/io/coded_stream.h>
 
54
#include <drizzled/util/functors.h>
 
55
#include <drizzled/session.h>
60
56
 
61
57
using namespace std;
62
58
using namespace drizzled;
63
 
using namespace google;
64
59
 
65
60
TransactionLogApplier *transaction_log_applier= NULL; /* The singleton transaction log applier */
66
61
 
67
 
extern TransactionLogIndex *transaction_log_index;
68
 
 
69
62
TransactionLogApplier::TransactionLogApplier(const string name_arg,
70
 
                                             TransactionLog &in_transaction_log,
71
 
                                             bool in_do_checksum) :
 
63
                                             TransactionLog *in_transaction_log,
 
64
                                             TransactionLogIndex *in_transaction_log_index,
 
65
                                             uint32_t in_num_write_buffers) :
72
66
  plugin::TransactionApplier(name_arg),
73
 
  transaction_log(in_transaction_log),  
74
 
  do_checksum(in_do_checksum)
 
67
  transaction_log(in_transaction_log),
 
68
  transaction_log_index(in_transaction_log_index),
 
69
  num_write_buffers(in_num_write_buffers),
 
70
  write_buffers()
75
71
{
 
72
  /* 
 
73
   * Create each of the buffers we need for undo log entries 
 
74
   */
 
75
  write_buffers.reserve(num_write_buffers);
 
76
  for (size_t x= 0; x < num_write_buffers; ++x)
 
77
  {
 
78
    write_buffers.push_back(new WriteBuffer());
 
79
  }
76
80
}
77
81
 
78
82
TransactionLogApplier::~TransactionLogApplier()
79
83
{
80
 
}
81
 
 
82
 
void TransactionLogApplier::apply(const message::Transaction &to_apply)
83
 
{
84
 
  uint8_t *buffer; /* Buffer we will write serialized header, 
85
 
                      message and trailing checksum to */
86
 
  uint8_t *orig_buffer;
87
 
 
88
 
  size_t message_byte_length= to_apply.ByteSize();
89
 
  size_t total_envelope_length= TransactionLog::HEADER_TRAILER_BYTES + message_byte_length;
90
 
 
91
 
  /* 
92
 
   * Attempt allocation of raw memory buffer for the header, 
93
 
   * message and trailing checksum bytes.
94
 
   */
95
 
  buffer= static_cast<uint8_t *>(malloc(total_envelope_length));
96
 
  if (buffer == NULL)
97
 
  {
98
 
    errmsg_printf(ERRMSG_LVL_ERROR, 
99
 
                  _("Failed to allocate enough memory to buffer header, "
100
 
                    "transaction message, and trailing checksum bytes. Tried to allocate %" PRId64
101
 
                    " bytes.  Error: %s\n"), 
102
 
                  static_cast<int64_t>(total_envelope_length),
103
 
                  strerror(errno));
104
 
    return;
105
 
  }
106
 
  else
107
 
    orig_buffer= buffer; /* We will free() orig_buffer, as buffer is moved during write */
108
 
 
109
 
  /*
110
 
   * Write the header information, which is the message type and
111
 
   * the length of the transaction message into the buffer
112
 
   */
113
 
  buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
114
 
      static_cast<uint32_t>(ReplicationServices::TRANSACTION), buffer);
115
 
  buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
116
 
      static_cast<uint32_t>(message_byte_length), buffer);
117
 
  
118
 
  /*
119
 
   * Now write the serialized transaction message, followed
120
 
   * by the optional checksum into the buffer.
121
 
   */
122
 
  buffer= to_apply.SerializeWithCachedSizesToArray(buffer);
123
 
 
124
 
  uint32_t checksum= 0;
125
 
  if (do_checksum)
126
 
  {
127
 
    checksum= drizzled::algorithm::crc32(
128
 
        reinterpret_cast<char *>(buffer) - message_byte_length, message_byte_length);
129
 
  }
130
 
 
131
 
  /* We always write in network byte order */
132
 
  buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(checksum, buffer);
133
 
 
134
 
  /* Ask the transaction log to write the entry and return where it wrote it */
135
 
  off_t written_to= transaction_log.writeEntry(orig_buffer, total_envelope_length);
136
 
 
137
 
  free(orig_buffer);
 
84
  for_each(write_buffers.begin(),
 
85
           write_buffers.end(),
 
86
           DeletePtr());
 
87
  write_buffers.clear();
 
88
  delete transaction_log;
 
89
  delete transaction_log_index;
 
90
}
 
91
 
 
92
WriteBuffer *TransactionLogApplier::getWriteBuffer(const Session &session)
 
93
{
 
94
  return write_buffers[session.getSessionId() % num_write_buffers];
 
95
}
 
96
 
 
97
plugin::ReplicationReturnCode
 
98
TransactionLogApplier::apply(Session &in_session,
 
99
                             const message::Transaction &to_apply)
 
100
{
 
101
  size_t entry_size= TransactionLog::getLogEntrySize(to_apply);
 
102
  WriteBuffer *write_buffer= getWriteBuffer(in_session);
 
103
 
 
104
  uint32_t checksum;
 
105
 
 
106
  write_buffer->lock();
 
107
  write_buffer->resize(entry_size);
 
108
  uint8_t *bytes= write_buffer->getRawBytes();
 
109
  bytes= transaction_log->packTransactionIntoLogEntry(to_apply,
 
110
                                                     bytes,
 
111
                                                     &checksum);
 
112
 
 
113
  off_t written_to= transaction_log->writeEntry(bytes, entry_size);
 
114
  write_buffer->unlock();
138
115
 
139
116
  /* Add an entry to the index describing what was just applied */
140
117
  transaction_log_index->addEntry(TransactionLogEntry(ReplicationServices::TRANSACTION,
141
118
                                                      written_to,
142
 
                                                      total_envelope_length),
 
119
                                                      entry_size),
143
120
                                  to_apply,
144
121
                                  checksum);
145
 
 
 
122
  return plugin::SUCCESS;
146
123
}