417
/*
 * Format the time-ordered key prefix for an epoch: the epoch is printed as
 * an unsigned decimal, zero-padded to at least 11 digits ("%011llu").
 *
 * NOTE(review): the declaration of buf and the assignment to key are not
 * visible in this excerpt -- presumably key receives buf; confirm.
 */
static void usage_record_prefix_by_time(uint64_t epoch, string& key)
420
snprintf(buf, sizeof(buf), "%011llu", (long long unsigned)epoch);
424
/*
 * Format the time-indexed record key "<epoch>_<user>_<bucket>", with the
 * epoch zero-padded to at least 11 digits so that it starts with the same
 * text usage_record_prefix_by_time() produces for that epoch.
 *
 * NOTE(review): the assignment of buf to key is not visible in this
 * excerpt -- presumably key receives buf; confirm.
 */
static void usage_record_name_by_time(uint64_t epoch, string& user, string& bucket, string& key)
426
/*
 * 32 bytes of slack cover the epoch digits (at most 20 for a 64-bit value),
 * the two '_' separators and the terminating NUL. This is a runtime-sized
 * array, which is a compiler extension rather than standard C++.
 */
char buf[32 + user.size() + bucket.size()];
427
snprintf(buf, sizeof(buf), "%011llu_%s_%s", (long long unsigned)epoch, user.c_str(), bucket.c_str());
431
/*
 * Format the user-indexed record key "<user>_<epoch>_<bucket>", with the
 * epoch zero-padded to at least 11 digits.
 *
 * NOTE(review): the assignment of buf to key is not visible in this
 * excerpt -- presumably key receives buf; confirm.
 */
static void usage_record_name_by_user(string& user, uint64_t epoch, string& bucket, string& key)
433
/*
 * 32 bytes of slack cover the epoch digits (at most 20 for a 64-bit value),
 * the two '_' separators and the terminating NUL. This is a runtime-sized
 * array, which is a compiler extension rather than standard C++.
 */
char buf[32 + user.size() + bucket.size()];
434
snprintf(buf, sizeof(buf), "%s_%011llu_%s", user.c_str(), (long long unsigned)epoch, bucket.c_str());
438
/*
 * Decode a usage log entry from record_bl into e, reading from the start of
 * the buffer. A buffer::error raised while decoding is caught and logged at
 * level 1.
 *
 * NOTE(review): the decode call itself and the return values are not
 * visible in this excerpt; callers treat the int result as a status code.
 */
static int usage_record_decode(bufferlist& record_bl, rgw_usage_log_entry& e)
440
bufferlist::iterator kiter = record_bl.begin();
443
} catch (buffer::error& err) {
444
CLS_LOG(1, "ERROR: usage_record_decode(): failed to decode record_bl\n");
451
/*
 * Object-class method: add usage log entries to the object's omap.
 *
 * The request in `in` is decoded as rgw_cls_usage_log_add_op. For every
 * entry in op.info.entries the record is stored twice, once under the
 * time-indexed key and once under the user-indexed key, so it can be looked
 * up either by time range or by user.
 *
 * NOTE(review): several lines of the body (error returns, the aggregation
 * step, the final return) are not visible in this excerpt.
 */
int rgw_user_usage_log_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
453
CLS_LOG(10, "rgw_user_usage_log_add()");
455
bufferlist::iterator in_iter = in->begin();
456
rgw_cls_usage_log_add_op op;
459
::decode(op, in_iter);
460
} catch (buffer::error& err) {
461
CLS_LOG(1, "ERROR: rgw_user_usage_log_add(): failed to decode request\n");
465
rgw_usage_log_info& info = op.info;
466
vector<rgw_usage_log_entry>::iterator iter;
468
for (iter = info.entries.begin(); iter != info.entries.end(); ++iter) {
469
rgw_usage_log_entry& entry = *iter;
471
usage_record_name_by_time(entry.epoch, entry.owner, entry.bucket, key_by_time);
473
CLS_LOG(10, "rgw_user_usage_log_add user=%s bucket=%s\n", entry.owner.c_str(), entry.bucket.c_str());
475
/* look for a record already stored under this epoch/user/bucket key */
bufferlist record_bl;
476
int ret = cls_cxx_map_get_val(hctx, key_by_time, &record_bl);
477
/* -ENOENT (no existing record) is not treated as an error here */
if (ret < 0 && ret != -ENOENT) {
478
/* NOTE(review): the message names cls_cxx_map_read_key, but the call above is cls_cxx_map_get_val */
CLS_LOG(1, "ERROR: rgw_user_usage_log_add(): cls_cxx_map_read_key returned %d\n", ret);
482
/* decode the existing record so it can be combined with the new entry */
rgw_usage_log_entry e;
483
ret = usage_record_decode(record_bl, e);
486
/* NOTE(review): the statement that merges e into entry is not visible in this excerpt */
CLS_LOG(10, "rgw_user_usage_log_add aggregating existing bucket\n");
490
/* the same encoded record is written under both index keys below */
bufferlist new_record_bl;
491
::encode(entry, new_record_bl);
492
ret = cls_cxx_map_set_val(hctx, key_by_time, &new_record_bl);
497
usage_record_name_by_user(entry.owner, entry.epoch, entry.bucket, key_by_user);
498
ret = cls_cxx_map_set_val(hctx, key_by_user, &new_record_bl);
506
/*
 * Walk the stored usage records and invoke cb on each decoded entry.
 *
 * When user is non-empty the walk runs in "by user" mode and keys are
 * matched against a user prefix; otherwise keys are compared against the
 * time prefix built from end. key_iter, when non-empty, is used as the
 * starting key instead of the one derived from start.
 *
 * NOTE(review): the end of the parameter list, the outer loop and the
 * actions taken on several of the conditions below are not visible in this
 * excerpt; comments on those lines describe only the visible test.
 */
static int usage_iterate_range(cls_method_context_t hctx, uint64_t start, uint64_t end,
507
string& user, string& key_iter, uint32_t max_entries, bool *truncated,
508
int (*cb)(cls_method_context_t, const string&, rgw_usage_log_entry&, void *),
511
CLS_LOG(10, "usage_iterate_range");
513
map<string, bufferlist> keys;
515
string filter_prefix;
516
string start_key, end_key;
517
/* a non-empty user selects the user-indexed keys */
bool by_user = !user.empty();
525
usage_record_prefix_by_time(end, end_key);
528
user_key.append("_");
531
/* an empty key_iter means this is a fresh walk rather than a continuation */
if (key_iter.empty()) {
535
usage_record_prefix_by_time(start, start_key);
538
start_key = key_iter;
542
/* fetch up to NUM_KEYS entries starting at start_key */
int ret = cls_cxx_map_get_vals(hctx, start_key, filter_prefix, NUM_KEYS, &keys);
547
map<string, bufferlist>::iterator iter = keys.begin();
548
if (iter == keys.end())
551
for (; iter != keys.end(); ++iter) {
552
const string& key = iter->first;
553
rgw_usage_log_entry e;
555
/* time mode: key has reached or passed the end-of-range prefix */
if (!by_user && key.compare(end_key) >= 0)
558
/* user mode: key no longer begins with the "<user>_" prefix */
if (by_user && key.compare(0, user_key.size(), user_key) != 0)
561
ret = usage_record_decode(iter->second, e);
568
/* keys are sorted by epoch, so once we're past end we're done */
572
ret = cb(hctx, key, e, param);
578
/* max_entries == 0 disables the limit */
if (max_entries && (i > max_entries)) {
585
/* remember the last key seen (presumably where the next fetch resumes -- confirm) */
start_key = iter->first;
590
/*
 * Callback for usage_iterate_range() used by the read path. param is the
 * map<rgw_user_bucket, rgw_usage_log_entry> being filled in; the slot for
 * the entry's (owner, bucket) pair is looked up, and created if absent, via
 * operator[].
 *
 * NOTE(review): what is done with the slot `le` (presumably the entry is
 * accumulated into it) and the return value are not visible in this excerpt.
 */
static int usage_log_read_cb(cls_method_context_t hctx, const string& key, rgw_usage_log_entry& entry, void *param)
592
map<rgw_user_bucket, rgw_usage_log_entry> *usage = (map<rgw_user_bucket, rgw_usage_log_entry> *)param;
593
rgw_user_bucket ub(entry.owner, entry.bucket);
594
rgw_usage_log_entry& le = (*usage)[ub];
600
/*
 * Object-class method: read usage log entries.
 *
 * The request in `in` is decoded as rgw_cls_usage_log_read_op. Records in
 * [op.start_epoch, op.end_epoch] (optionally for op.owner) are collected
 * into ret_info.usage through usage_log_read_cb, and ret_info is encoded
 * into `out`.
 *
 * NOTE(review): the error returns and the final return are not visible in
 * this excerpt.
 */
int rgw_user_usage_log_read(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
602
CLS_LOG(10, "rgw_user_usage_log_read()");
604
bufferlist::iterator in_iter = in->begin();
605
rgw_cls_usage_log_read_op op;
608
::decode(op, in_iter);
609
} catch (buffer::error& err) {
610
CLS_LOG(1, "ERROR: rgw_user_usage_log_read(): failed to decode request\n");
614
rgw_cls_usage_log_read_ret ret_info;
615
map<rgw_user_bucket, rgw_usage_log_entry> *usage = &ret_info.usage;
616
/* local copy of the caller's continuation marker; passed by reference below */
string iter = op.iter;
617
#define MAX_ENTRIES 1000
618
/* a request with max_entries == 0 falls back to MAX_ENTRIES */
uint32_t max_entries = (op.max_entries ? op.max_entries : MAX_ENTRIES);
619
int ret = usage_iterate_range(hctx, op.start_epoch, op.end_epoch, op.owner, iter, max_entries, &ret_info.truncated, usage_log_read_cb, (void *)usage);
623
/* when the listing was cut short, hand iter back as the next marker */
if (ret_info.truncated)
624
ret_info.next_iter = iter;
626
::encode(ret_info, *out);
630
/*
 * Callback for usage_iterate_range() used by the trim path. Both index keys
 * (time-indexed and user-indexed) are rebuilt from the entry's own fields
 * and removed from the omap; the result of the second removal is returned.
 *
 * NOTE(review): the handling of ret after the first removal is not visible
 * in this excerpt.
 */
static int usage_log_trim_cb(cls_method_context_t hctx, const string& key, rgw_usage_log_entry& entry, void *param)
635
usage_record_name_by_time(entry.epoch, entry.owner, entry.bucket, key_by_time);
636
usage_record_name_by_user(entry.owner, entry.epoch, entry.bucket, key_by_user);
638
int ret = cls_cxx_map_remove_key(hctx, key_by_time);
642
return cls_cxx_map_remove_key(hctx, key_by_user);
645
/*
 * Object-class method: trim usage log entries.
 *
 * After a cls_cxx_stat() existence check, the request in `in` is decoded as
 * rgw_cls_usage_log_trim_op and every record in
 * [op.start_epoch, op.end_epoch] (optionally for op.user) is passed to
 * usage_log_trim_cb.
 *
 * NOTE(review): the declaration of iter, the error returns and the final
 * return are not visible in this excerpt.
 */
int rgw_user_usage_log_trim(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
647
CLS_LOG(10, "rgw_user_usage_log_trim()");
649
/* only continue if object exists! */
650
int ret = cls_cxx_stat(hctx, NULL, NULL);
654
bufferlist::iterator in_iter = in->begin();
655
rgw_cls_usage_log_trim_op op;
658
::decode(op, in_iter);
659
} catch (buffer::error& err) {
660
/* NOTE(review): the message says rgw_user_log_usage_log_trim, which differs from this function's name */
CLS_LOG(1, "ERROR: rgw_user_log_usage_log_trim(): failed to decode request\n");
665
/* max_entries of 0 means no entry limit; no truncation flag is requested */
ret = usage_iterate_range(hctx, op.start_epoch, op.end_epoch, op.user, iter, 0, NULL, usage_log_trim_cb, NULL);
411
672
void __cls_init()
413
CLS_LOG("Loaded rgw class!");
674
CLS_LOG(1, "Loaded rgw class!");
415
676
cls_register("rgw", &h_class);
416
677
cls_register_cxx_method(h_class, "bucket_init_index", CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, rgw_bucket_init_index, &h_rgw_bucket_init_index);