~stub/ubuntu/wily/librdkafka/hack

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
/*
 * librd - Rapid Development C library
 *
 * Copyright (c) 2012-2013, Magnus Edenhill
 * All rights reserved.
 * 
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met: 
 * 
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer. 
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution. 
 * 
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#pragma once

#include "rd.h"
#include "rdtypes.h"
#include "rdsysqueue.h"




/**
 * Thread-safe FIFO queues.
 * Typical usage is for work-queues (see tests/0001-fifoq.c for an example)
 *
 *
 * It is up the object code to properly lock the object itself.
 *
 *
 * Usage:
 *
 *   Caller thread:
 *       -- Add to fifoq:
 *       rd_fifoq_add(&my_fifoq, myobj);
 *
 *   In worker thread:
 *       -- Dequeue and process:
 *       while (1) {
 *         rfqe = rd_fifoq_pop_wait(&my_fifoq);
 *         myobj = rfqe->ptr;
 *         perform_work(myobj);
 *         rd_fifoq_elm_release(rfqe);
 *       }
 */

typedef struct rd_fifoq_elm_s {
	TAILQ_ENTRY(rd_fifoq_elm_s) rfqe_link;
	int         rfqe_refcnt;
	void       *rfqe_ptr;
} rd_fifoq_elm_t;


TAILQ_HEAD(rd_fifoq_elm_head_s, rd_fifoq_elm_s);

typedef struct rd_fifoq_s {
	TAILQ_HEAD(, rd_fifoq_elm_s) rfq_q;
	rd_mutex_t  rfq_lock;
	rd_cond_t   rfq_cond;
	int         rfq_cnt;
	int         rfq_max_size;
	int         rfq_taildrop;
	int         rfq_inited;
} rd_fifoq_t;

void        rd_fifoq_destroy (rd_fifoq_t *rfg);
rd_fifoq_t *rd_fifoq_init (rd_fifoq_t *rfq);

#define RD_FIFOQ_INITIALIZER(rfq)			\
	{						\
	.rfq_q = TAILQ_HEAD_INITIALIZER((rfq).rfq_q),	\
		.rfq_lock = RD_MUTEX_INITIALIZER,	\
		.rfq_cond = RD_COND_INITIALIZER,	\
		.rfq_inited = 1				\
			}
		

void rd_fifoq_set_max_size (rd_fifoq_t *rfq, int max_size, int taildrop);

void rd_fifoq_add0 (rd_fifoq_t *rfq, void *ptr, void **ptr_purged);
#define rd_fifoq_add(rfq,ptr) rd_fifoq_add0(rfq,ptr,NULL)
#define rd_fifoq_add_purge(rfq,ptr,ptr_purged) \
	rd_fifoq_add0(rfq,ptr,(void **)ptr_purged)

rd_fifoq_elm_t *rd_fifoq_pop0 (rd_fifoq_t *rfq, int no_wait, int timeout_ms);
#define rd_fifoq_pop_wait(rfq) rd_fifoq_pop0(rfq, 0, 0)
#define rd_fifoq_pop_timedwait(rfq,tmo) rd_fifoq_pop0(rfq, 0, tmo)
#define rd_fifoq_pop(rfq) rd_fifoq_pop0(rfq, 1, 0)

static inline void rd_fifoq_elm_release0 (rd_fifoq_t *rfq,
					  rd_fifoq_elm_t *rfqe) {
	if (rd_atomic_sub(&rfqe->rfqe_refcnt, 1) > 0)
		return;

	free(rfqe);
}

#define rd_fifoq_elm_release(RFQ,RFQE) do {   \
	rd_mutex_lock(&(RFQ)->rfq_lock);      \
	rd_fifoq_elm_release0(RFQ, RFQE);     \
	rd_mutex_unlock(&(RFQ)->rfq_lock);    \
	} while (0)