9
#include <boost/asio.hpp>
10
#include <boost/bind.hpp>
12
#include <async/TAsioAsync.h>
10
15
#include "transport/TSocket.h"
11
16
#include "transport/TTransport.h"
12
17
#include "transport/TBufferTransports.h"
13
18
#include "protocol/TProtocol.h"
14
19
#include "protocol/TBinaryProtocol.h"
15
#include "gen-cpp/Cassandra.h"
20
#include "gen-cpp-async/Cassandra.h"
16
21
// cassandra includes end
18
23
#include "cassandra_se.h"
38
43
class Cassandra_se_impl: public Cassandra_se_interface
40
CassandraClient *cass; /* Connection to cassandra */
46
std::vector<std::string> hosts_list;
41
47
ConsistencyLevel::type cur_consistency_level;
43
48
std::string column_family;
44
49
std::string keyspace;
51
/* Connection handlers */
52
boost::asio::io_service io_service;
53
boost::shared_ptr<protocol::TProtocolFactory> protocolFactory;
55
//boost::shared_ptr<async::TAsioClient> client;
56
std::vector<boost::shared_ptr<async::TAsioClient> > clients;
58
std::vector<boost::shared_ptr<CassandraAsyncClient> > async_clients;
61
CassandraClient *cass; /* Connection to cassandra */
47
66
KsDef ks_def; /* KeySpace we're using (TODO: put this in table->share) */
48
67
CfDef cf_def; /* Column family we're using (TODO: put in table->share)*/
56
75
typedef std::map<std::string, std::vector<Mutation> > ColumnFamilyToMutation;
57
76
typedef std::map<std::string, ColumnFamilyToMutation> KeyToCfMutationMap;
59
KeyToCfMutationMap batch_mutation; /* Prepare operation here */
60
int64_t insert_timestamp;
78
std::vector<KeyToCfMutationMap> batch_mutations; /* Prepare operation here */
79
KeyToCfMutationMap *batch_mutation; /* Prepare operation here */
80
uint batch_mutation_idx;
82
int64_t insert_timestamp; /* rows will be inserted with this timestamp */
84
/* Current row mutation being filled */
61
85
std::vector<Mutation>* insert_list;
63
87
/* Resultset we're reading */
69
93
SlicePredicate slice_pred;
70
94
bool get_slices_returned_less;
72
Cassandra_se_impl() : cass(NULL) {}
73
virtual ~Cassandra_se_impl(){ delete cass; }
96
Cassandra_se_impl() /* : cass(NULL) */ {}
97
virtual ~Cassandra_se_impl(){ /*delete cass; */ }
75
99
/* Connection and DDL checks */
100
bool make_hosts_list(const char *hosts);
76
101
bool connect(const char *host, int port, const char *keyspace);
77
102
void set_column_family(const char *cfname) { column_family.assign(cfname); }
85
110
void clear_insert_buffer();
86
111
void start_row_insert(const char *key, int key_len);
87
112
void add_insert_column(const char *name, const char *value, int value_len);
113
bool do_insert(bool flush);
90
115
/* Reads, point lookups */
91
116
bool get_slice(char *key, size_t key_len, bool *found);
124
149
/* Non-inherited utility functions: */
125
150
int64_t get_i64_timestamp();
152
void on_connected2(boost::shared_ptr< CassandraAsyncClient> client);
153
void on_set_keyspace();
154
void on_set_keyspace_fail();
155
void on_describe_keyspace(org::apache::cassandra::KsDef ks_def_arg);
156
void on_batch_mutate_done();
160
bool describe_keyspace_done;
161
uint batch_mutations_to_do;
164
void on_remove_done() { remove_op_done= true; }
179
void Cassandra_se_impl::on_connected2(boost::shared_ptr<CassandraAsyncClient> client)
181
async_clients.push_back(client);
183
fprintf(stderr, "=Connected!\n");
187
void Cassandra_se_impl::on_set_keyspace()
190
fprintf(stderr, "=Keyspace set!\n");
193
void Cassandra_se_impl::on_set_keyspace_fail()
196
error_happened= true;
199
bool Cassandra_se_impl::make_hosts_list(const char *hosts)
201
const char* p= hosts;
202
const char* name_start=p;
206
if (*p==0 || *p==',')
208
// [name_start; p) is the name
209
size_t len= p - name_start;
212
hosts_list.push_back(std::string(name_start, len));
219
if (hosts_list.size() > 0)
220
return false; /* OK */
222
return true; /* Empty list: error */
138
226
bool Cassandra_se_impl::connect(const char *host, int port, const char *keyspace_arg)
230
if (make_hosts_list(host))
142
233
keyspace.assign(keyspace_arg);
236
protocolFactory= boost::shared_ptr<protocol::TProtocolFactory>(new protocol::TBinaryProtocolFactory());
237
clients.resize(hosts_list.size());
239
for (uint i= 0; i < hosts_list.size(); i++)
241
clients[i]= boost::shared_ptr<async::TAsioClient>(
242
new async::TAsioClient(io_service,
243
protocolFactory, protocolFactory));
245
boost::function<void (boost::shared_ptr<CassandraAsyncClient> c )> call_me;
246
call_me= boost::bind(&Cassandra_se_impl::on_connected2, this, _1);
248
clients[i]->connect(hosts_list[i], port, call_me);
253
while (connected !=hosts_list.size())
255
size_t cnt= io_service.run_one();
256
fprintf(stderr, "processed %d events\n", (int)cnt);
260
error_happened=false;
261
for (uint i= 0; i < hosts_list.size(); i++)
263
boost::function< void () > call_on_connect;
264
call_on_connect= boost::bind(&Cassandra_se_impl::on_set_keyspace, this);
265
async_clients[i]->set_keyspace(keyspace_arg).
266
setCallback(boost::bind(&Cassandra_se_impl::on_set_keyspace, this)).
267
setErrback(boost::bind(&Cassandra_se_impl::on_set_keyspace_fail, this));
270
while (keyspace_set != hosts_list.size())
272
size_t cnt= io_service.run_one();
273
fprintf(stderr, "processed %d events\n", (int)cnt);
281
print_error("Unknown exception");
145
286
boost::shared_ptr<TTransport> socket =
146
boost::shared_ptr<TSocket>(new TSocket(host, port));
287
boost::shared_ptr<TSocket>(new TSocket(hosts_list[0], port));
147
288
boost::shared_ptr<TTransport> tr =
148
289
boost::shared_ptr<TFramedTransport>(new TFramedTransport (socket));
149
290
boost::shared_ptr<TProtocol> p =
319
void Cassandra_se_impl::on_describe_keyspace(org::apache::cassandra::KsDef ks_def_arg)
321
fprintf(stderr, "=describe_keyspace_done");
323
describe_keyspace_done= true;
177
327
bool Cassandra_se_impl::setup_ddl_checks()
329
describe_keyspace_done= false;
330
async_clients[0]->describe_keyspace(keyspace).
331
setCallback(boost::bind(&Cassandra_se_impl::on_describe_keyspace,
334
while (!describe_keyspace_done)
336
size_t cnt= io_service.run_one();
337
fprintf(stderr, "processed %d events\n", (int)cnt);
180
342
cass->describe_keyspace(ks_def, keyspace);
182
std::vector<CfDef>::iterator it;
183
for (it= ks_def.cf_defs.begin(); it < ks_def.cf_defs.end(); it++)
186
if (!cf_def.name.compare(column_family))
190
print_error("describe_keyspace() didn't return our column family");
192
343
} catch (InvalidRequestException ire) {
193
344
print_error("%s [%s]", ire.what(), ire.why.c_str());
194
345
} catch (NotFoundException nfe) {
268
435
std::string key_to_insert;
269
436
key_to_insert.assign(key, key_len);
270
batch_mutation[key_to_insert]= ColumnFamilyToMutation();
271
ColumnFamilyToMutation& cf_mut= batch_mutation[key_to_insert];
437
(*batch_mutation)[key_to_insert]= ColumnFamilyToMutation();
438
ColumnFamilyToMutation& cf_mut= (*batch_mutation)[key_to_insert];
273
440
cf_mut[column_family]= std::vector<Mutation>();
274
441
insert_list= &cf_mut[column_family];
297
bool Cassandra_se_impl::do_insert()
464
void Cassandra_se_impl::on_batch_mutate_done()
466
batch_mutations_to_do--;
470
bool Cassandra_se_impl::do_insert(bool flush)
473
size_t size= hosts_list.size();
477
/* Switch to constructing another mutation object */
478
if (batch_mutation_idx != size - 1)
480
batch_mutation= &batch_mutations[++batch_mutation_idx];
485
for (uint i= 0; i < size; i++)
488
zero-size mutations are allowed by Cassandra's batch_mutate but lets not
489
do them (we may attempt to do it if there is a bulk insert that stores
490
exactly @@cassandra_insert_batch_size*n elements.
492
if (batch_mutations[i].empty())
495
batch_mutations_to_do++;
496
async_clients[i]->batch_mutate(batch_mutations[i], cur_consistency_level).
497
setCallback(boost::bind(&Cassandra_se_impl::on_batch_mutate_done, this));
499
cassandra_counters.row_inserts+= batch_mutations[i].size();
500
cassandra_counters.row_insert_batches++;
302
zero-size mutations are allowed by Cassandra's batch_mutate but lets not
303
do them (we may attempt to do it if there is a bulk insert that stores
304
exactly @@cassandra_insert_batch_size*n elements.
306
if (batch_mutation.empty())
503
while (batch_mutations_to_do != 0)
505
io_service.run_one();
506
//fprintf(stderr, "bm processed %d events\n", (int)cnt);
510
clear_insert_buffer();
311
515
cass->batch_mutate(batch_mutation, cur_consistency_level);
587
791
ColumnPath column_path;
588
792
column_path.column_family= column_family;
794
remove_op_done= false;
795
async_clients[0]->remove(rowkey, column_path, get_i64_timestamp(), cur_consistency_level).
796
setCallback(boost::bind(&Cassandra_se_impl::on_remove_done, this));
799
while (!remove_op_done)
801
io_service.run_one();
592
808
cass->remove(rowkey, column_path, get_i64_timestamp(), cur_consistency_level);