~maria-captains/maria/5.5-cassandra-parallel

« back to all changes in this revision

Viewing changes to storage/cassandra/cassandra_se.cc

  • Committer: Sergey Petrunya
  • Date: 2012-09-20 14:32:37 UTC
  • Revision ID: psergey@askmonty.org-20120920143237-og4d9qh8uiojasmi
Casandra SE:
- Introduce asynchronous operations
- parallel INSERT operation.

Show diffs side-by-side

added added

removed removed

Lines of Context:
6
6
#include <stdio.h>
7
7
#include <stdarg.h>
8
8
 
 
9
#include <boost/asio.hpp>
 
10
#include <boost/bind.hpp>
 
11
 
 
12
#include <async/TAsioAsync.h>
 
13
 
9
14
#include "Thrift.h"
10
15
#include "transport/TSocket.h"
11
16
#include "transport/TTransport.h"
12
17
#include "transport/TBufferTransports.h"
13
18
#include "protocol/TProtocol.h"
14
19
#include "protocol/TBinaryProtocol.h"
15
 
#include "gen-cpp/Cassandra.h"
 
20
#include "gen-cpp-async/Cassandra.h"
16
21
// cassandra includes end
17
22
 
18
23
#include "cassandra_se.h"
37
42
*/
38
43
class Cassandra_se_impl: public Cassandra_se_interface
39
44
{
40
 
  CassandraClient *cass; /* Connection to cassandra */
 
45
  /* Parameters */ 
 
46
  std::vector<std::string> hosts_list;
41
47
  ConsistencyLevel::type cur_consistency_level;
42
 
 
43
48
  std::string column_family;
44
49
  std::string keyspace;
45
50
  
 
51
  /* Connection handlers */
 
52
  boost::asio::io_service io_service;
 
53
  boost::shared_ptr<protocol::TProtocolFactory> protocolFactory;
 
54
 
 
55
  //boost::shared_ptr<async::TAsioClient> client;
 
56
  std::vector<boost::shared_ptr<async::TAsioClient> > clients;
 
57
 
 
58
  std::vector<boost::shared_ptr<CassandraAsyncClient> > async_clients;
 
59
  
 
60
  // DELETE THIS: v
 
61
  CassandraClient *cass; /* Connection to cassandra */
 
62
 
 
63
 
 
64
  
46
65
  /* DDL data */
47
66
  KsDef ks_def; /* KeySpace we're using (TODO: put this in table->share) */
48
67
  CfDef cf_def; /* Column family we're using (TODO: put in table->share)*/
56
75
  typedef std::map<std::string, std::vector<Mutation> > ColumnFamilyToMutation;
57
76
  typedef std::map<std::string,  ColumnFamilyToMutation> KeyToCfMutationMap;
58
77
   
59
 
  KeyToCfMutationMap batch_mutation; /* Prepare operation here */
60
 
  int64_t insert_timestamp;
 
78
  std::vector<KeyToCfMutationMap> batch_mutations; /* Prepare operation here */
 
79
  KeyToCfMutationMap *batch_mutation; /* Prepare operation here */
 
80
  uint batch_mutation_idx;
 
81
  
 
82
  int64_t insert_timestamp; /* rows will be inserted with this timestamp */
 
83
  
 
84
  /* Current row mutation being filled */
61
85
  std::vector<Mutation>* insert_list;
62
86
   
63
87
  /* Resultset we're reading */
69
93
  SlicePredicate slice_pred;
70
94
  bool get_slices_returned_less;
71
95
public:
72
 
  Cassandra_se_impl() : cass(NULL) {}
73
 
  virtual ~Cassandra_se_impl(){ delete cass; }
 
96
  Cassandra_se_impl() /* : cass(NULL) */ {}
 
97
  virtual ~Cassandra_se_impl(){ /*delete cass; */ }
74
98
  
75
99
  /* Connection and DDL checks */
 
100
  bool make_hosts_list(const char *hosts);
76
101
  bool connect(const char *host, int port, const char *keyspace);
77
102
  void set_column_family(const char *cfname) { column_family.assign(cfname); }
78
103
 
85
110
  void clear_insert_buffer();
86
111
  void start_row_insert(const char *key, int key_len);
87
112
  void add_insert_column(const char *name, const char *value, int value_len);
88
 
  bool do_insert();
 
113
  bool do_insert(bool flush);
89
114
 
90
115
  /* Reads, point lookups */
91
116
  bool get_slice(char *key, size_t key_len, bool *found);
123
148
private:
124
149
  /* Non-inherited utility functions: */
125
150
  int64_t get_i64_timestamp();
 
151
 
 
152
  void on_connected2(boost::shared_ptr< CassandraAsyncClient> client);
 
153
  void on_set_keyspace();
 
154
  void on_set_keyspace_fail();
 
155
  void on_describe_keyspace(org::apache::cassandra::KsDef ks_def_arg);
 
156
  void on_batch_mutate_done();
 
157
 
 
158
  uint connected;
 
159
  uint keyspace_set;
 
160
  bool describe_keyspace_done;
 
161
  uint batch_mutations_to_do;
 
162
 
 
163
  bool remove_op_done;
 
164
  void on_remove_done() { remove_op_done= true; }
 
165
 
 
166
  bool error_happened;
126
167
};
127
168
 
128
169
 
135
176
}
136
177
 
137
178
 
 
179
void Cassandra_se_impl::on_connected2(boost::shared_ptr<CassandraAsyncClient> client)
 
180
{
 
181
  async_clients.push_back(client);
 
182
  connected++;
 
183
  fprintf(stderr, "=Connected!\n");
 
184
}
 
185
 
 
186
 
 
187
void Cassandra_se_impl::on_set_keyspace()
 
188
{
 
189
  keyspace_set++;
 
190
  fprintf(stderr, "=Keyspace set!\n");
 
191
}
 
192
 
 
193
void Cassandra_se_impl::on_set_keyspace_fail()
 
194
{
 
195
  keyspace_set++;
 
196
  error_happened= true;
 
197
}
 
198
 
 
199
bool Cassandra_se_impl::make_hosts_list(const char *hosts)
 
200
{
 
201
  const char* p= hosts;
 
202
  const char* name_start=p;
 
203
  hosts_list.clear();
 
204
  while (1)
 
205
  {
 
206
    if (*p==0 || *p==',')
 
207
    {
 
208
      // [name_start; p) is the name
 
209
      size_t len= p - name_start;
 
210
      if (len==0)
 
211
        return true;
 
212
      hosts_list.push_back(std::string(name_start, len));
 
213
      name_start= p + 1;
 
214
      if (*p == 0)
 
215
        break;
 
216
    }
 
217
    p++;
 
218
  }
 
219
  if (hosts_list.size() > 0)
 
220
    return false; /* OK */
 
221
  else
 
222
    return true; /* Empty list: error */
 
223
}
 
224
 
 
225
 
138
226
bool Cassandra_se_impl::connect(const char *host, int port, const char *keyspace_arg)
139
227
{
140
228
  bool res= true;
141
229
  
 
230
  if (make_hosts_list(host))
 
231
    return true;
 
232
 
142
233
  keyspace.assign(keyspace_arg);
143
 
  
 
234
 
 
235
  try {
 
236
    protocolFactory= boost::shared_ptr<protocol::TProtocolFactory>(new protocol::TBinaryProtocolFactory());
 
237
    clients.resize(hosts_list.size());
 
238
 
 
239
    for (uint i= 0; i < hosts_list.size(); i++)
 
240
    {
 
241
      clients[i]= boost::shared_ptr<async::TAsioClient>(
 
242
                             new async::TAsioClient(io_service, 
 
243
                              protocolFactory, protocolFactory));
 
244
 
 
245
      boost::function<void (boost::shared_ptr<CassandraAsyncClient> c )> call_me;
 
246
      call_me= boost::bind(&Cassandra_se_impl::on_connected2, this, _1);
 
247
 
 
248
      clients[i]->connect(hosts_list[i], port, call_me);
 
249
    }
 
250
    connected= 0;
 
251
 
 
252
    //io_service.run();
 
253
    while (connected !=hosts_list.size())
 
254
    {
 
255
      size_t cnt= io_service.run_one();
 
256
      fprintf(stderr, "processed %d events\n", (int)cnt);
 
257
    }
 
258
 
 
259
    keyspace_set= 0;
 
260
    error_happened=false;
 
261
    for (uint i= 0; i < hosts_list.size(); i++)
 
262
    {
 
263
      boost::function< void () > call_on_connect;
 
264
      call_on_connect= boost::bind(&Cassandra_se_impl::on_set_keyspace, this);
 
265
      async_clients[i]->set_keyspace(keyspace_arg).
 
266
        setCallback(boost::bind(&Cassandra_se_impl::on_set_keyspace, this)).
 
267
        setErrback(boost::bind(&Cassandra_se_impl::on_set_keyspace_fail, this));
 
268
    }
 
269
 
 
270
    while (keyspace_set != hosts_list.size())
 
271
    {
 
272
      size_t cnt= io_service.run_one();
 
273
      fprintf(stderr, "processed %d events\n", (int)cnt);
 
274
    }
 
275
    if (error_happened)
 
276
      res= true;
 
277
    else
 
278
      res= false;
 
279
 
 
280
  }catch (...) {
 
281
    print_error("Unknown exception");
 
282
  }
 
283
 
 
284
#if 1  
144
285
  try {
145
286
    boost::shared_ptr<TTransport> socket = 
146
 
      boost::shared_ptr<TSocket>(new TSocket(host, port));
 
287
      boost::shared_ptr<TSocket>(new TSocket(hosts_list[0], port));
147
288
    boost::shared_ptr<TTransport> tr = 
148
289
      boost::shared_ptr<TFramedTransport>(new TFramedTransport (socket));
149
290
    boost::shared_ptr<TProtocol> p = 
165
306
  }catch (...) {
166
307
    print_error("Unknown exception");
167
308
  }
 
309
#endif  
168
310
 
169
311
  cur_consistency_level= ConsistencyLevel::ONE;
170
312
 
174
316
}
175
317
 
176
318
 
 
319
void Cassandra_se_impl::on_describe_keyspace(org::apache::cassandra::KsDef ks_def_arg)
 
320
{
 
321
  fprintf(stderr, "=describe_keyspace_done");
 
322
  ks_def= ks_def_arg;
 
323
  describe_keyspace_done= true;
 
324
}
 
325
 
 
326
 
177
327
bool Cassandra_se_impl::setup_ddl_checks()
178
328
{
 
329
  describe_keyspace_done= false;
 
330
  async_clients[0]->describe_keyspace(keyspace).
 
331
                    setCallback(boost::bind(&Cassandra_se_impl::on_describe_keyspace,
 
332
                    this, _1));
 
333
 
 
334
  while (!describe_keyspace_done)
 
335
  {
 
336
    size_t cnt= io_service.run_one();
 
337
    fprintf(stderr, "processed %d events\n", (int)cnt);
 
338
  }
 
339
 
 
340
#if 0  
179
341
  try {
180
342
    cass->describe_keyspace(ks_def, keyspace);
181
 
 
182
 
    std::vector<CfDef>::iterator it;
183
 
    for (it= ks_def.cf_defs.begin(); it < ks_def.cf_defs.end(); it++)
184
 
    {
185
 
      cf_def= *it;
186
 
      if (!cf_def.name.compare(column_family))
187
 
        return false;
188
 
    }
189
 
 
190
 
    print_error("describe_keyspace() didn't return our column family");
191
 
 
192
343
  } catch (InvalidRequestException ire) {
193
344
    print_error("%s [%s]", ire.what(), ire.why.c_str());
194
345
  } catch (NotFoundException nfe) {
198
349
  } catch (...) {
199
350
    print_error("Unknown exception");
200
351
  }
 
352
#endif
 
353
 
 
354
  std::vector<CfDef>::iterator it;
 
355
  for (it= ks_def.cf_defs.begin(); it < ks_def.cf_defs.end(); it++)
 
356
  {
 
357
    cf_def= *it;
 
358
    if (!cf_def.name.compare(column_family))
 
359
      return false;
 
360
  }
 
361
 
 
362
  print_error("describe_keyspace() didn't return our column family");
201
363
 
202
364
  return true;
203
365
}
259
421
 
260
422
void Cassandra_se_impl::clear_insert_buffer()
261
423
{
262
 
  batch_mutation.clear();
 
424
  batch_mutations.resize(hosts_list.size());
 
425
  for (uint i=0; i < hosts_list.size(); i++)
 
426
    batch_mutations[i]= KeyToCfMutationMap();
 
427
 
 
428
  batch_mutation_idx= 0;
 
429
  batch_mutation= &batch_mutations[batch_mutation_idx];
263
430
}
264
431
 
265
432
 
267
434
{
268
435
  std::string key_to_insert;
269
436
  key_to_insert.assign(key, key_len);
270
 
  batch_mutation[key_to_insert]= ColumnFamilyToMutation();
271
 
  ColumnFamilyToMutation& cf_mut= batch_mutation[key_to_insert];
 
437
  (*batch_mutation)[key_to_insert]= ColumnFamilyToMutation();
 
438
  ColumnFamilyToMutation& cf_mut= (*batch_mutation)[key_to_insert];
272
439
 
273
440
  cf_mut[column_family]= std::vector<Mutation>();
274
441
  insert_list= &cf_mut[column_family];
294
461
}
295
462
 
296
463
 
297
 
bool Cassandra_se_impl::do_insert()
 
464
void Cassandra_se_impl::on_batch_mutate_done()
 
465
{
 
466
  batch_mutations_to_do--;
 
467
}
 
468
 
 
469
 
 
470
bool Cassandra_se_impl::do_insert(bool flush)
298
471
{
299
472
  bool res= true;
 
473
  size_t size= hosts_list.size();
 
474
 
 
475
  if (!flush)
 
476
  {
 
477
    /* Switch to constructing another mutation object */
 
478
    if (batch_mutation_idx != size - 1)
 
479
    {
 
480
      batch_mutation= &batch_mutations[++batch_mutation_idx];
 
481
      return false;
 
482
    }
 
483
  }
 
484
 
 
485
  for (uint i= 0; i < size; i++)
 
486
  {
 
487
    /*
 
488
      zero-size mutations are allowed by Cassandra's batch_mutate but lets not 
 
489
      do them (we may attempt to do it if there is a bulk insert that stores
 
490
      exactly @@cassandra_insert_batch_size*n elements.
 
491
    */
 
492
    if (batch_mutations[i].empty())
 
493
      continue;
 
494
 
 
495
    batch_mutations_to_do++;
 
496
    async_clients[i]->batch_mutate(batch_mutations[i], cur_consistency_level).
 
497
      setCallback(boost::bind(&Cassandra_se_impl::on_batch_mutate_done, this));
 
498
 
 
499
    cassandra_counters.row_inserts+= batch_mutations[i].size();
 
500
    cassandra_counters.row_insert_batches++;
 
501
  }
300
502
  
301
 
  /*
302
 
    zero-size mutations are allowed by Cassandra's batch_mutate but lets not 
303
 
    do them (we may attempt to do it if there is a bulk insert that stores
304
 
    exactly @@cassandra_insert_batch_size*n elements.
305
 
  */
306
 
  if (batch_mutation.empty())
307
 
    return false;
308
 
 
 
503
  while (batch_mutations_to_do != 0)
 
504
  {
 
505
    io_service.run_one();
 
506
    //fprintf(stderr, "bm processed %d events\n", (int)cnt);
 
507
  }
 
508
  res= false;
 
509
 
 
510
  clear_insert_buffer();
 
511
 
 
512
#if 0
309
513
  try {
310
514
    
311
515
    cass->batch_mutate(batch_mutation, cur_consistency_level);
327
531
  } catch (...) {
328
532
    print_error("Unknown exception");
329
533
  }
330
 
 
 
534
#endif
331
535
  return res;
332
536
}
333
537
 
587
791
  ColumnPath column_path;
588
792
  column_path.column_family= column_family;
589
793
 
 
794
  remove_op_done= false;
 
795
  async_clients[0]->remove(rowkey, column_path, get_i64_timestamp(), cur_consistency_level).
 
796
      setCallback(boost::bind(&Cassandra_se_impl::on_remove_done, this));
 
797
  
 
798
  
 
799
  while (!remove_op_done)
 
800
  {
 
801
    io_service.run_one();
 
802
  }
 
803
  res= false;
 
804
 
 
805
#if 0
590
806
  try {
591
807
    
592
808
    cass->remove(rowkey, column_path, get_i64_timestamp(), cur_consistency_level);
603
819
  } catch (...) {
604
820
    print_error("Unknown exception");
605
821
  }
606
 
 
 
822
#endif
607
823
  return res;
608
824
}
609
825