58
* Current number of live rd_kafka_t handles.
59
* This is used by rd_kafka_wait_destroyed() to know when the library
60
* has fully cleaned up after itself.
62
static int rd_kafka_handle_cnt_curr = 0; /* atomic */
57
65
* Wait for all rd_kafka_t objects to be destroyed.
58
66
* Returns 0 if all kafka objects are now destroyed, or -1 if the
59
67
* timeout was reached.
61
69
int rd_kafka_wait_destroyed (int timeout_ms) {
62
70
rd_ts_t timeout = rd_clock() + (timeout_ms * 1000);
64
while (rd_kafka_thread_cnt() > 0) {
72
while (rd_kafka_thread_cnt() > 0 ||
73
rd_kafka_handle_cnt_curr > 0) {
65
74
if (rd_clock() >= timeout) {
111
void rd_kafka_log_buf (const rd_kafka_t *rk, int level,
112
const char *fac, const char *buf) {
114
if (!rk->rk_log_cb || level > rk->rk_log_level)
117
rk->rk_log_cb(rk, level, fac, buf);
102
120
void rd_kafka_log0 (const rd_kafka_t *rk, const char *extra, int level,
103
121
const char *fac, const char *fmt, ...) {
241
259
mcnt = srcq->rkq_qlen;
242
260
TAILQ_CONCAT(&dstq->rkq_q, &srcq->rkq_q, rko_link);
243
261
TAILQ_INIT(&srcq->rkq_q);
244
rd_atomic_set(&srcq->rkq_qlen, 0);
245
rd_atomic_add(&dstq->rkq_qlen, mcnt);
262
(void)rd_atomic_set(&srcq->rkq_qlen, 0);
263
(void)rd_atomic_add(&dstq->rkq_qlen, mcnt);
247
265
while (mcnt < cnt && (rko = TAILQ_FIRST(&srcq->rkq_q))) {
248
266
TAILQ_REMOVE(&srcq->rkq_q, rko, rko_link);
249
267
TAILQ_INSERT_TAIL(&dstq->rkq_q, rko, rko_link);
250
rd_atomic_sub(&dstq->rkq_qlen, 1);
251
rd_atomic_add(&dstq->rkq_qlen, 1);
268
(void)rd_atomic_sub(&dstq->rkq_qlen, 1);
269
(void)rd_atomic_add(&dstq->rkq_qlen, 1);
348
366
/* Reset real queue */
349
367
TAILQ_INIT(&rkq->rkq_q);
350
rd_atomic_set(&rkq->rkq_qlen, 0);
368
(void)rd_atomic_set(&rkq->rkq_qlen, 0);
351
369
pthread_mutex_unlock(&rkq->rkq_lock);
353
371
rd_kafka_dbg(rk, QUEUE, "QSERVE", "Serving %i ops", localq.rkq_qlen);
441
* Propogate an error event to the application.
442
* If no error_cb has been set by the application the error will
445
void rd_kafka_op_err (rd_kafka_t *rk, rd_kafka_resp_err_t err,
446
const char *fmt, ...) {
451
vsnprintf(buf, sizeof(buf), fmt, ap);
454
if (rk->rk_conf.error_cb)
455
rd_kafka_op_reply(rk, RD_KAFKA_OP_ERR, err,
456
strdup(buf), strlen(buf));
458
rd_kafka_log_buf(rk, LOG_ERR, "ERROR", buf);
423
462
static const char *rd_kafka_type2str (rd_kafka_type_t type) {
424
463
static const char *types[] = {
450
489
return "Local: Message timed out";
451
490
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
452
491
return "Broker: No more messages";
492
case RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION:
493
return "Local: Unknown partition";
494
case RD_KAFKA_RESP_ERR__FS:
495
return "Local: File or filesystem error";
496
case RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC:
497
return "Local: Unknown topic";
498
case RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN:
499
return "Local: All broker connections are down";
500
case RD_KAFKA_RESP_ERR__INVALID_ARG:
501
return "Local: Invalid argument or configuration";
502
case RD_KAFKA_RESP_ERR__TIMED_OUT:
503
return "Local: Timed out";
504
case RD_KAFKA_RESP_ERR__QUEUE_FULL:
505
return "Local: Queue full";
453
507
case RD_KAFKA_RESP_ERR_UNKNOWN:
454
508
return "Unknown error";
455
509
case RD_KAFKA_RESP_ERR_NO_ERROR:
542
rd_kafka_resp_err_t rd_kafka_errno2err (int errnox) {
546
return RD_KAFKA_RESP_ERR__INVALID_ARG;
549
return RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
552
return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
555
return RD_KAFKA_RESP_ERR__TIMED_OUT;
558
return RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE;
561
return RD_KAFKA_RESP_ERR__QUEUE_FULL;
564
return RD_KAFKA_RESP_ERR__FAIL;
492
569
void rd_kafka_destroy0 (rd_kafka_t *rk) {
497
574
rd_kafka_q_purge(&rk->rk_rep);
499
576
rd_kafkap_str_destroy(rk->rk_clientid);
500
rd_kafka_conf_destroy(&rk->rk_conf);
577
rd_kafka_anyconf_destroy(_RK_GLOBAL, &rk->rk_conf);
502
579
pthread_mutex_destroy(&rk->rk_lock);
583
rd_atomic_sub(&rd_kafka_handle_cnt_curr, 1);
511
590
rd_kafka_topic_t *rkt, *rkt_tmp;
513
592
rd_kafka_dbg(rk, GENERIC, "DESTROY", "Terminating instance");
514
rd_atomic_add(&rk->rk_terminate, 1);
593
(void)rd_atomic_add(&rk->rk_terminate, 1);
516
595
/* Decommission all topics */
517
596
rd_kafka_lock(rk);
663
743
rkb->rkb_c.rx_bytes,
664
744
rkb->rkb_c.rx_err,
745
rkb->rkb_c.rx_corrid_err,
665
746
rkb->rkb_rtt_last.ra_min,
666
747
rkb->rkb_rtt_last.ra_max,
667
748
rkb->rkb_rtt_last.ra_avg,
815
static void rd_kafka_topic_scan_tmr_cb (rd_kafka_t *rk, void *arg) {
816
rd_kafka_topic_scan_all(rk, rd_clock());
819
static void rd_kafka_stats_emit_tmr_cb (rd_kafka_t *rk, void *arg) {
820
rd_kafka_stats_emit_all(rk);
735
824
* Main loop for Kafka handler thread.
737
826
static void *rd_kafka_thread_main (void *arg) {
738
827
rd_kafka_t *rk = arg;
739
rd_ts_t last_topic_scan = rd_clock();
740
rd_ts_t last_stats_emit = last_topic_scan;
742
rd_atomic_add(&rd_kafka_thread_cnt_curr, 1);
828
rd_kafka_timer_t tmr_topic_scan = {};
829
rd_kafka_timer_t tmr_stats_emit = {};
831
(void)rd_atomic_add(&rd_kafka_thread_cnt_curr, 1);
833
rd_kafka_timer_start(rk, &tmr_topic_scan, 1000000,
834
rd_kafka_topic_scan_tmr_cb, NULL);
835
rd_kafka_timer_start(rk, &tmr_stats_emit,
836
rk->rk_conf.stats_interval_ms * 1000,
837
rd_kafka_stats_emit_tmr_cb, NULL);
744
839
while (likely(rk->rk_terminate == 0)) {
745
rd_ts_t now = rd_clock();
747
if (last_topic_scan + 1000000 <= now) {
748
rd_kafka_topic_scan_all(rk, now);
749
last_topic_scan = now;
753
last_stats_emit + (rk->rk_conf.stats_interval_ms*1000)) {
754
rd_kafka_stats_emit_all(rk);
755
last_stats_emit = now;
840
rd_kafka_timers_run(rk, 1000000);
761
843
rd_kafka_destroy0(rk); /* destroy handler thread's refcnt */
763
rd_atomic_sub(&rd_kafka_thread_cnt_curr, 1);
845
(void)rd_atomic_sub(&rd_kafka_thread_cnt_curr, 1);
800
881
TAILQ_INIT(&rk->rk_brokers);
801
882
TAILQ_INIT(&rk->rk_topics);
883
TAILQ_INIT(&rk->rk_timers);
884
pthread_mutex_init(&rk->rk_timers_lock, NULL);
885
pthread_cond_init(&rk->rk_timers_cond, NULL);
803
887
rk->rk_log_cb = rd_kafka_log_print;
810
894
/* Construct a client id if none is given. */
811
895
if (!rk->rk_conf.clientid)
812
896
rk->rk_conf.clientid = strdup("rdkafka");
814
898
snprintf(rk->rk_name, sizeof(rk->rk_name), "%s#%s-%i",
815
899
rk->rk_conf.clientid, rd_kafka_type2str(rk->rk_type), rkid++);
862
948
int rd_kafka_produce (rd_kafka_topic_t *rkt, int32_t partition,
864
char *payload, size_t len,
950
void *payload, size_t len,
865
951
const void *key, size_t keylen,
866
952
void *msg_opaque) {
886
972
rd_kafka_topic_unlock(rkt);
888
974
rd_kafka_toppar_lock(rktp);
977
case RD_KAFKA_OFFSET_BEGINNING:
978
case RD_KAFKA_OFFSET_END:
890
979
rktp->rktp_query_offset = offset;
891
980
rktp->rktp_fetch_state = RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY;
982
case RD_KAFKA_OFFSET_STORED:
983
if (!rkt->rkt_conf.auto_commit) {
984
rd_kafka_toppar_unlock(rktp);
985
rd_kafka_toppar_destroy(rktp);
989
rd_kafka_offset_store_init(rktp);
893
992
rktp->rktp_next_offset = offset;
894
993
rktp->rktp_fetch_state = RD_KAFKA_TOPPAR_FETCH_ACTIVE;
896
996
rd_kafka_toppar_unlock(rktp);
898
998
rd_kafka_dbg(rkt->rkt_rk, TOPIC, "CONSUMER",
917
1017
if (!(rktp = rd_kafka_toppar_get(rkt, partition, 0)) &&
918
1018
!(rktp = rd_kafka_toppar_desired_get(rkt, partition))) {
919
1019
rd_kafka_topic_unlock(rkt);
927
1027
rd_kafka_toppar_lock(rktp);
928
1028
rktp->rktp_fetch_state = RD_KAFKA_TOPPAR_FETCH_NONE;
1030
if (rktp->rktp_offset_path)
1031
rd_kafka_offset_store_term(rktp);
930
1033
/* Purge receive queue. */
931
1034
rd_kafka_q_purge(&rktp->rktp_fetchq);
1023
1126
TAILQ_REMOVE(&rktp->rktp_fetchq.rkq_q, rko, rko_link);
1024
rd_atomic_sub(&rktp->rktp_fetchq.rkq_qlen, 1);
1127
(void)rd_atomic_sub(&rktp->rktp_fetchq.rkq_qlen, 1);
1026
1129
pthread_mutex_unlock(&rktp->rktp_fetchq.rkq_lock);
1029
1132
rkmessages[cnt++] = rd_kafka_message_get(rko);
1135
/* Auto store offset of last message in batch, if enabled */
1136
if (cnt > 0 && rkt->rkt_conf.auto_commit)
1137
rd_kafka_offset_store0(rktp, rkmessages[cnt-1]->offset,
1032
1140
rd_kafka_toppar_destroy(rktp); /* refcnt from .._get() */
1049
1158
rd_kafka_message_t *rkmessage;
1051
1160
rkmessage = rd_kafka_message_get(rko);
1161
if (ctx->rktp->rktp_rkt->rkt_conf.auto_commit)
1162
rd_kafka_offset_store0(ctx->rktp, rkmessage->offset, 1/*lock*/);
1052
1163
ctx->consume_cb(rkmessage, ctx->opaque);
1074
1185
if (unlikely(!rktp)) {
1075
1186
/* No such toppar known */
1080
1193
r = rd_kafka_q_serve(rkt->rkt_rk, &rktp->rktp_fetchq, timeout_ms,
1081
1194
rd_kafka_consume_cb, &ctx);
1117
1230
/* Get rkmessage from rko */
1118
1231
rkmessage = rd_kafka_message_get(rko);
1234
if (rktp->rktp_rkt->rkt_conf.auto_commit)
1235
rd_kafka_offset_store0(rktp, rkmessage->offset, 1/*lock*/);
1237
rd_kafka_toppar_destroy(rktp); /* refcnt from .._get() */
1120
1239
return rkmessage;
1269
1387
rkt->rkt_partition_cnt, rkt->rkt_refcnt);
1270
1388
if (rkt->rkt_ua)
1271
1389
rd_kafka_toppar_dump(fp, " ", rkt->rkt_ua);
1390
if (!TAILQ_EMPTY(&rkt->rkt_desp)) {
1391
fprintf(fp, " desired partitions:");
1392
TAILQ_FOREACH(rktp, &rkt->rkt_desp, rktp_rktlink)
1393
fprintf(fp, " %"PRId32, rktp->rktp_partition);
1273
1397
rd_kafka_unlock(rk);