~stub/ubuntu/wily/librdkafka/hack

« back to all changes in this revision

Viewing changes to rdkafka.c

  • Committer: Package Import Robot
  • Author(s): Faidon Liambotis
  • Date: 2014-02-18 02:21:43 UTC
  • mfrom: (1.1.2) (2.1.1 sid)
  • Revision ID: package-import@ubuntu.com-20140218022143-3g5lbea2mtwjkk37
Tags: 0.8.3-1
* New upstream release.
  - Multiple internal symbols hidden; breaks ABI without a SONAME bump, but
    these were internal and should not break any applications, packaged or
    not.
* Update Standards-Version to 3.9.5, no changes needed.

Show diffs side-by-side

added added

removed removed

Lines of Context:
38
38
#include "rdkafka_msg.h"
39
39
#include "rdkafka_broker.h"
40
40
#include "rdkafka_topic.h"
 
41
#include "rdkafka_offset.h"
41
42
 
42
43
#include "rdtime.h"
43
44
 
54
55
}
55
56
 
56
57
/**
 
58
 * Current number of live rd_kafka_t handles.
 
59
 * This is used by rd_kafka_wait_destroyed() to know when the library
 
60
 * has fully cleaned up after itself.
 
61
 */
 
62
static int rd_kafka_handle_cnt_curr = 0; /* atomic */
 
63
 
 
64
/**
57
65
 * Wait for all rd_kafka_t objects to be destroyed.
58
66
 * Returns 0 if all kafka objects are now destroyed, or -1 if the
59
67
 * timeout was reached.
61
69
int rd_kafka_wait_destroyed (int timeout_ms) {
62
70
        rd_ts_t timeout = rd_clock() + (timeout_ms * 1000);
63
71
 
64
 
        while (rd_kafka_thread_cnt() > 0) {
 
72
        while (rd_kafka_thread_cnt() > 0 ||
 
73
               rd_kafka_handle_cnt_curr > 0) {
65
74
                if (rd_clock() >= timeout) {
66
75
                        errno = ETIMEDOUT;
67
76
                        return -1;
78
87
 * for delta timeouts.
79
88
 * `timeout_ms' is the delta timeout in milliseconds.
80
89
 */
81
 
static int pthread_cond_timedwait_ms (pthread_cond_t *cond,
82
 
                                      pthread_mutex_t *mutex,
83
 
                                      int timeout_ms) {
 
90
int pthread_cond_timedwait_ms (pthread_cond_t *cond,
 
91
                               pthread_mutex_t *mutex,
 
92
                               int timeout_ms) {
84
93
        struct timeval tv;
85
94
        struct timespec ts;
86
95
 
99
108
}
100
109
 
101
110
 
 
111
void rd_kafka_log_buf (const rd_kafka_t *rk, int level,
 
112
                       const char *fac, const char *buf) {
 
113
 
 
114
        if (!rk->rk_log_cb || level > rk->rk_log_level)
 
115
                return;
 
116
 
 
117
        rk->rk_log_cb(rk, level, fac, buf);
 
118
}
 
119
 
102
120
void rd_kafka_log0 (const rd_kafka_t *rk, const char *extra, int level,
103
121
                   const char *fac, const char *fmt, ...) {
104
122
        char buf[2048];
217
235
        }
218
236
 
219
237
        TAILQ_INIT(&rkq->rkq_q);
220
 
        rd_atomic_set(&rkq->rkq_qlen, 0);
 
238
        (void)rd_atomic_set(&rkq->rkq_qlen, 0);
221
239
 
222
240
        pthread_mutex_unlock(&rkq->rkq_lock);
223
241
}
241
259
                mcnt = srcq->rkq_qlen;
242
260
                TAILQ_CONCAT(&dstq->rkq_q, &srcq->rkq_q, rko_link);
243
261
                TAILQ_INIT(&srcq->rkq_q);
244
 
                rd_atomic_set(&srcq->rkq_qlen, 0);
245
 
                rd_atomic_add(&dstq->rkq_qlen, mcnt);
 
262
                (void)rd_atomic_set(&srcq->rkq_qlen, 0);
 
263
                (void)rd_atomic_add(&dstq->rkq_qlen, mcnt);
246
264
        } else {
247
265
                while (mcnt < cnt && (rko = TAILQ_FIRST(&srcq->rkq_q))) {
248
266
                        TAILQ_REMOVE(&srcq->rkq_q, rko, rko_link);
249
267
                        TAILQ_INSERT_TAIL(&dstq->rkq_q, rko, rko_link);
250
 
                        rd_atomic_sub(&dstq->rkq_qlen, 1);
251
 
                        rd_atomic_add(&dstq->rkq_qlen, 1);
 
268
                        (void)rd_atomic_sub(&dstq->rkq_qlen, 1);
 
269
                        (void)rd_atomic_add(&dstq->rkq_qlen, 1);
252
270
                        mcnt++;
253
271
                }
254
272
        }
347
365
 
348
366
        /* Reset real queue */
349
367
        TAILQ_INIT(&rkq->rkq_q);
350
 
        rd_atomic_set(&rkq->rkq_qlen, 0);
 
368
        (void)rd_atomic_set(&rkq->rkq_qlen, 0);
351
369
        pthread_mutex_unlock(&rkq->rkq_lock);
352
370
 
353
371
        rd_kafka_dbg(rk, QUEUE, "QSERVE", "Serving %i ops", localq.rkq_qlen);
419
437
}
420
438
 
421
439
 
 
440
/**
 
441
 * Propogate an error event to the application.
 
442
 * If no error_cb has been set by the application the error will
 
443
 * be logged instead.
 
444
 */
 
445
void rd_kafka_op_err (rd_kafka_t *rk, rd_kafka_resp_err_t err,
 
446
                      const char *fmt, ...) {
 
447
        va_list ap;
 
448
        char buf[2048];
 
449
 
 
450
        va_start(ap, fmt);
 
451
        vsnprintf(buf, sizeof(buf), fmt, ap);
 
452
        va_end(ap);
 
453
 
 
454
        if (rk->rk_conf.error_cb)
 
455
                rd_kafka_op_reply(rk, RD_KAFKA_OP_ERR, err,
 
456
                                  strdup(buf), strlen(buf));
 
457
        else
 
458
                rd_kafka_log_buf(rk, LOG_ERR, "ERROR", buf);
 
459
}
 
460
 
422
461
 
423
462
static const char *rd_kafka_type2str (rd_kafka_type_t type) {
424
463
        static const char *types[] = {
450
489
                return "Local: Message timed out";
451
490
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
452
491
                return "Broker: No more messages";
 
492
        case RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION:
 
493
                return "Local: Unknown partition";
 
494
        case RD_KAFKA_RESP_ERR__FS:
 
495
                return "Local: File or filesystem error";
 
496
        case RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC:
 
497
                return "Local: Unknown topic";
 
498
        case RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN:
 
499
                return "Local: All broker connections are down";
 
500
        case RD_KAFKA_RESP_ERR__INVALID_ARG:
 
501
                return "Local: Invalid argument or configuration";
 
502
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
 
503
                return "Local: Timed out";
 
504
        case RD_KAFKA_RESP_ERR__QUEUE_FULL:
 
505
                return "Local: Queue full";
 
506
 
453
507
        case RD_KAFKA_RESP_ERR_UNKNOWN:
454
508
                return "Unknown error";
455
509
        case RD_KAFKA_RESP_ERR_NO_ERROR:
485
539
}
486
540
 
487
541
 
488
 
 
489
 
 
 
542
rd_kafka_resp_err_t rd_kafka_errno2err (int errnox) {
 
543
        switch (errnox)
 
544
        {
 
545
        case EINVAL:
 
546
                return RD_KAFKA_RESP_ERR__INVALID_ARG;
 
547
 
 
548
        case ENOENT:
 
549
                return RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
 
550
 
 
551
        case ESRCH:
 
552
                return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
 
553
 
 
554
        case ETIMEDOUT:
 
555
                return RD_KAFKA_RESP_ERR__TIMED_OUT;
 
556
 
 
557
        case EMSGSIZE:
 
558
                return RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE;
 
559
 
 
560
        case ENOBUFS:
 
561
                return RD_KAFKA_RESP_ERR__QUEUE_FULL;
 
562
 
 
563
        default:
 
564
                return RD_KAFKA_RESP_ERR__FAIL;
 
565
        }
 
566
}
490
567
 
491
568
 
492
569
void rd_kafka_destroy0 (rd_kafka_t *rk) {
497
574
        rd_kafka_q_purge(&rk->rk_rep);
498
575
 
499
576
        rd_kafkap_str_destroy(rk->rk_clientid);
500
 
        rd_kafka_conf_destroy(&rk->rk_conf);
 
577
        rd_kafka_anyconf_destroy(_RK_GLOBAL, &rk->rk_conf);
501
578
 
502
579
        pthread_mutex_destroy(&rk->rk_lock);
503
580
 
504
581
        free(rk);
 
582
 
 
583
        rd_atomic_sub(&rd_kafka_handle_cnt_curr, 1);
505
584
}
506
585
 
507
586
 
511
590
        rd_kafka_topic_t *rkt, *rkt_tmp;
512
591
 
513
592
        rd_kafka_dbg(rk, GENERIC, "DESTROY", "Terminating instance");
514
 
        rd_atomic_add(&rk->rk_terminate, 1);
 
593
        (void)rd_atomic_add(&rk->rk_terminate, 1);
515
594
 
516
595
        /* Decommission all topics */
517
596
        rd_kafka_lock(rk);
641
720
                           "\"rx\":%"PRIu64", "
642
721
                           "\"rxbytes\":%"PRIu64", "
643
722
                           "\"rxerrs\":%"PRIu64", "
 
723
                           "\"rxcorriderrs\":%"PRIu64", "
644
724
                           "\"rtt\": {"
645
725
                           " \"min\":%"PRIu64","
646
726
                           " \"max\":%"PRIu64","
662
742
                           rkb->rkb_c.rx,
663
743
                           rkb->rkb_c.rx_bytes,
664
744
                           rkb->rkb_c.rx_err,
 
745
                           rkb->rkb_c.rx_corrid_err,
665
746
                           rkb->rkb_rtt_last.ra_min,
666
747
                           rkb->rkb_rtt_last.ra_max,
667
748
                           rkb->rkb_rtt_last.ra_avg,
731
812
 
732
813
 
733
814
 
 
815
/* Timer callback: periodically scan all topics (e.g. for message
 * timeouts). `arg` is unused. */
static void rd_kafka_topic_scan_tmr_cb (rd_kafka_t *rk, void *arg) {
        rd_kafka_topic_scan_all(rk, rd_clock());
}
 
818
 
 
819
/* Timer callback: emit statistics for the whole handle. `arg` is unused. */
static void rd_kafka_stats_emit_tmr_cb (rd_kafka_t *rk, void *arg) {
        rd_kafka_stats_emit_all(rk);
}
 
822
 
734
823
/**
735
824
 * Main loop for Kafka handler thread.
736
825
 */
737
826
static void *rd_kafka_thread_main (void *arg) {
738
827
        rd_kafka_t *rk = arg;
739
 
        rd_ts_t last_topic_scan = rd_clock();
740
 
        rd_ts_t last_stats_emit = last_topic_scan;
741
 
 
742
 
        rd_atomic_add(&rd_kafka_thread_cnt_curr, 1);
 
828
        rd_kafka_timer_t tmr_topic_scan = {};
 
829
        rd_kafka_timer_t tmr_stats_emit = {};
 
830
 
 
831
        (void)rd_atomic_add(&rd_kafka_thread_cnt_curr, 1);
 
832
 
 
833
        rd_kafka_timer_start(rk, &tmr_topic_scan, 1000000,
 
834
                             rd_kafka_topic_scan_tmr_cb, NULL);
 
835
        rd_kafka_timer_start(rk, &tmr_stats_emit,
 
836
                             rk->rk_conf.stats_interval_ms * 1000,
 
837
                             rd_kafka_stats_emit_tmr_cb, NULL);
743
838
 
744
839
        while (likely(rk->rk_terminate == 0)) {
745
 
                rd_ts_t now = rd_clock();
746
 
 
747
 
                if (last_topic_scan + 1000000 <= now) {
748
 
                        rd_kafka_topic_scan_all(rk, now);
749
 
                        last_topic_scan = now;
750
 
                }
751
 
 
752
 
                if (now >
753
 
                    last_stats_emit + (rk->rk_conf.stats_interval_ms*1000)) {
754
 
                        rd_kafka_stats_emit_all(rk);
755
 
                        last_stats_emit = now;
756
 
                }
757
 
 
758
 
                sleep(1);
 
840
                rd_kafka_timers_run(rk, 1000000);
759
841
        }
760
842
 
761
843
        rd_kafka_destroy0(rk); /* destroy handler thread's refcnt */
762
844
 
763
 
        rd_atomic_sub(&rd_kafka_thread_cnt_curr, 1);
 
845
        (void)rd_atomic_sub(&rd_kafka_thread_cnt_curr, 1);
764
846
 
765
847
        return NULL;
766
848
}
787
869
 
788
870
        if (!conf)
789
871
                conf = rd_kafka_conf_new();
790
 
 
791
872
        rk->rk_conf = *conf;
792
873
        free(conf);
793
874
 
799
880
 
800
881
        TAILQ_INIT(&rk->rk_brokers);
801
882
        TAILQ_INIT(&rk->rk_topics);
 
883
        TAILQ_INIT(&rk->rk_timers);
 
884
        pthread_mutex_init(&rk->rk_timers_lock, NULL);
 
885
        pthread_cond_init(&rk->rk_timers_cond, NULL);
802
886
 
803
887
        rk->rk_log_cb = rd_kafka_log_print;
804
888
 
810
894
        /* Construct a client id if none is given. */
811
895
        if (!rk->rk_conf.clientid)
812
896
                rk->rk_conf.clientid = strdup("rdkafka");
813
 
        
 
897
 
814
898
        snprintf(rk->rk_name, sizeof(rk->rk_name), "%s#%s-%i",
815
899
                 rk->rk_conf.clientid, rd_kafka_type2str(rk->rk_type), rkid++);
816
900
 
848
932
        if (rk->rk_conf.brokerlist)
849
933
                rd_kafka_brokers_add(rk, rk->rk_conf.brokerlist);
850
934
 
 
935
        rd_atomic_add(&rd_kafka_handle_cnt_curr, 1);
 
936
 
851
937
        return rk;
852
938
}
853
939
 
861
947
 */
862
948
int rd_kafka_produce (rd_kafka_topic_t *rkt, int32_t partition,
863
949
                      int msgflags,
864
 
                      char *payload, size_t len,
 
950
                      void *payload, size_t len,
865
951
                      const void *key, size_t keylen,
866
952
                      void *msg_opaque) {
867
953
 
876
962
                            int64_t offset) {
877
963
        rd_kafka_toppar_t *rktp;
878
964
 
879
 
        if (partition == RD_KAFKA_PARTITION_UA) {
880
 
                errno = EINVAL;
 
965
        if (partition < 0) {
 
966
                errno = ESRCH;
881
967
                return -1;
882
968
        }
883
969
 
886
972
        rd_kafka_topic_unlock(rkt);
887
973
 
888
974
        rd_kafka_toppar_lock(rktp);
889
 
        if (offset < 0) {
 
975
        switch (offset)
 
976
        {
 
977
        case RD_KAFKA_OFFSET_BEGINNING:
 
978
        case RD_KAFKA_OFFSET_END:
890
979
                rktp->rktp_query_offset = offset;
891
980
                rktp->rktp_fetch_state = RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY;
892
 
        } else {
 
981
                break;
 
982
        case RD_KAFKA_OFFSET_STORED:
 
983
                if (!rkt->rkt_conf.auto_commit) {
 
984
                        rd_kafka_toppar_unlock(rktp);
 
985
                        rd_kafka_toppar_destroy(rktp);
 
986
                        errno = EINVAL;
 
987
                        return -1;
 
988
                }
 
989
                rd_kafka_offset_store_init(rktp);
 
990
                break;
 
991
        default:
893
992
                rktp->rktp_next_offset = offset;
894
993
                rktp->rktp_fetch_state = RD_KAFKA_TOPPAR_FETCH_ACTIVE;
895
994
        }
 
995
 
896
996
        rd_kafka_toppar_unlock(rktp);
897
997
 
898
998
        rd_kafka_dbg(rkt->rkt_rk, TOPIC, "CONSUMER",
917
1017
        if (!(rktp = rd_kafka_toppar_get(rkt, partition, 0)) &&
918
1018
            !(rktp = rd_kafka_toppar_desired_get(rkt, partition))) {
919
1019
                rd_kafka_topic_unlock(rkt);
920
 
                errno = ENOENT;
 
1020
                errno = ESRCH;
921
1021
                return -1;
922
1022
        }
923
1023
 
927
1027
        rd_kafka_toppar_lock(rktp);
928
1028
        rktp->rktp_fetch_state = RD_KAFKA_TOPPAR_FETCH_NONE;
929
1029
 
 
1030
        if (rktp->rktp_offset_path)
 
1031
                rd_kafka_offset_store_term(rktp);
 
1032
 
930
1033
        /* Purge receive queue. */
931
1034
        rd_kafka_q_purge(&rktp->rktp_fetchq);
932
1035
 
997
1100
 
998
1101
        if (unlikely(!rktp)) {
999
1102
                /* No such toppar known */
1000
 
                errno = ENOENT;
 
1103
                errno = ESRCH;
1001
1104
                return -1;
1002
1105
        }
1003
1106
 
1021
1124
                }
1022
1125
 
1023
1126
                TAILQ_REMOVE(&rktp->rktp_fetchq.rkq_q, rko, rko_link);
1024
 
                rd_atomic_sub(&rktp->rktp_fetchq.rkq_qlen, 1);
 
1127
                (void)rd_atomic_sub(&rktp->rktp_fetchq.rkq_qlen, 1);
1025
1128
 
1026
1129
                pthread_mutex_unlock(&rktp->rktp_fetchq.rkq_lock);
1027
1130
 
1029
1132
                rkmessages[cnt++] = rd_kafka_message_get(rko);
1030
1133
        }
1031
1134
 
 
1135
        /* Auto store offset of last message in batch, if enabled */
 
1136
        if (cnt > 0 && rkt->rkt_conf.auto_commit)
 
1137
                rd_kafka_offset_store0(rktp, rkmessages[cnt-1]->offset,
 
1138
                                       1/*lock*/);
 
1139
 
1032
1140
        rd_kafka_toppar_destroy(rktp); /* refcnt from .._get() */
1033
1141
 
1034
1142
        return cnt;
1038
1146
struct consume_ctx {
1039
1147
        void (*consume_cb) (rd_kafka_message_t *rkmessage, void *opaque);
1040
1148
        void *opaque;
 
1149
        rd_kafka_toppar_t *rktp;
1041
1150
};
1042
1151
 
1043
1152
 
1049
1158
        rd_kafka_message_t *rkmessage;
1050
1159
 
1051
1160
        rkmessage = rd_kafka_message_get(rko);
 
1161
        if (ctx->rktp->rktp_rkt->rkt_conf.auto_commit)
 
1162
                rd_kafka_offset_store0(ctx->rktp, rkmessage->offset, 1/*lock*/);
1052
1163
        ctx->consume_cb(rkmessage, ctx->opaque);
1053
1164
}
1054
1165
 
1061
1172
                                                   void *opaque),
1062
1173
                               void *opaque) {
1063
1174
        rd_kafka_toppar_t *rktp;
1064
 
        struct consume_ctx ctx = { consume_cb: consume_cb, opaque: opaque };
 
1175
        struct consume_ctx ctx = { .consume_cb = consume_cb, .opaque = opaque };
1065
1176
        int r;
1066
1177
 
1067
1178
        /* Get toppar */
1073
1184
 
1074
1185
        if (unlikely(!rktp)) {
1075
1186
                /* No such toppar known */
1076
 
                errno = ENOENT;
 
1187
                errno = ESRCH;
1077
1188
                return -1;
1078
1189
        }
1079
1190
 
 
1191
        ctx.rktp = rktp;
 
1192
 
1080
1193
        r = rd_kafka_q_serve(rkt->rkt_rk, &rktp->rktp_fetchq, timeout_ms,
1081
1194
                             rd_kafka_consume_cb, &ctx);
1082
1195
 
1101
1214
 
1102
1215
        if (unlikely(!rktp)) {
1103
1216
                /* No such toppar known */
1104
 
                errno = ENOENT;
 
1217
                errno = ESRCH;
1105
1218
                return NULL;
1106
1219
        }
1107
1220
 
1117
1230
        /* Get rkmessage from rko */
1118
1231
        rkmessage = rd_kafka_message_get(rko);
1119
1232
 
 
1233
        /* Store offset */
 
1234
        if (rktp->rktp_rkt->rkt_conf.auto_commit)
 
1235
                rd_kafka_offset_store0(rktp, rkmessage->offset, 1/*lock*/);
 
1236
 
 
1237
        rd_kafka_toppar_destroy(rktp); /* refcnt from .._get() */
 
1238
 
1120
1239
        return rkmessage;
1121
1240
}
1122
1241
 
1175
1294
                if (!(dcnt % 1000))
1176
1295
                        rd_kafka_dbg(rk, MSG, "POLL",
1177
1296
                                     "Now %i messages delivered to app", dcnt);
1178
 
                                                  
1179
1297
                break;
1180
1298
 
1181
1299
        case RD_KAFKA_OP_STATS:
1269
1387
                        rkt->rkt_partition_cnt, rkt->rkt_refcnt);
1270
1388
                if (rkt->rkt_ua)
1271
1389
                        rd_kafka_toppar_dump(fp, "   ", rkt->rkt_ua);
 
1390
                if (!TAILQ_EMPTY(&rkt->rkt_desp)) {
 
1391
                        fprintf(fp, "   desired partitions:");
 
1392
                        TAILQ_FOREACH(rktp, &rkt->rkt_desp, rktp_rktlink)
 
1393
                                fprintf(fp, " %"PRId32, rktp->rktp_partition);
 
1394
                        fprintf(fp, "\n");
 
1395
                }
1272
1396
        }
1273
1397
        rd_kafka_unlock(rk);
1274
 
        
1275
1398
}
1276
1399
 
1277
1400