~stub/ubuntu/wily/librdkafka/hack

« back to all changes in this revision

Viewing changes to src/rdthread.c

  • Committer: Package Import Robot
  • Author(s): Adam Conrad
  • Date: 2015-03-11 05:55:33 UTC
  • mfrom: (1.1.3) (2.1.5 vivid-proposed)
  • Revision ID: package-import@ubuntu.com-20150311055533-z5577gnoy49y0679
Tags: 0.8.5-2ubuntu1
as-needed-fix.patch: Break up libs from cflags in the configure compile
tests so we can put the libs at the end of the linker line, fixing test
detection issues when building with -Wl,--as-needed passed to binutils.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 * librd - Rapid Development C library
 
3
 *
 
4
 * Copyright (c) 2012-2013, Magnus Edenhill
 
5
 * All rights reserved.
 
6
 * 
 
7
 * Redistribution and use in source and binary forms, with or without
 
8
 * modification, are permitted provided that the following conditions are met: 
 
9
 * 
 
10
 * 1. Redistributions of source code must retain the above copyright notice,
 
11
 *    this list of conditions and the following disclaimer. 
 
12
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 
13
 *    this list of conditions and the following disclaimer in the documentation
 
14
 *    and/or other materials provided with the distribution. 
 
15
 * 
 
16
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 
17
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
 
18
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
 
19
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
 
20
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
 
21
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
 
22
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
 
23
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
 
24
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 
25
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 
26
 * POSSIBILITY OF SUCH DAMAGE.
 
27
 */
 
28
 
 
29
#include "rd.h"
 
30
#include "rdthread.h"
 
31
#include "rdqueue.h"
 
32
#include "rdevent.h"
 
33
#include "rdlog.h"
 
34
 
 
35
#ifdef __linux__
 
36
#include <sys/prctl.h>
 
37
#endif
 
38
#include <stdarg.h>
 
39
 
 
40
rd_thread_t *rd_mainthread;
 
41
__thread rd_thread_t *rd_currthread;
 
42
 
 
43
 
 
44
void rd_thread_init (void) {
 
45
        pthread_t thr = pthread_self();
 
46
        rd_mainthread = rd_thread_create0("main", &thr);
 
47
        rd_currthread = rd_mainthread;
 
48
}
 
49
 
 
50
int rd_thread_poll (int timeout_ms) {
 
51
        rd_fifoq_elm_t *rfqe;
 
52
        int cnt = 0;
 
53
        int nowait = timeout_ms == 0;
 
54
 
 
55
        while ((rfqe = rd_fifoq_pop0(&rd_currthread->rdt_eventq,
 
56
                                     nowait, timeout_ms))) {
 
57
                rd_thread_event_t *rte = rfqe->rfqe_ptr;
 
58
                
 
59
                rd_thread_event_call(rte);
 
60
                
 
61
                rd_fifoq_elm_release(&rd_currthread->rdt_eventq, rfqe);
 
62
 
 
63
                cnt++;
 
64
        }
 
65
 
 
66
        return cnt;
 
67
}
 
68
 
 
69
 
 
70
static void rd_thread_destroy (rd_thread_t *rdt) {
 
71
        assert(rdt->rdt_state != RD_THREAD_S_RUNNING);
 
72
        if (rdt->rdt_name)
 
73
                free(rdt->rdt_name);
 
74
        rd_fifoq_destroy(&rdt->rdt_eventq);
 
75
        free(rdt);
 
76
}
 
77
 
 
78
void rd_thread_cleanup (void) {
 
79
}
 
80
 
 
81
 
 
82
void rd_thread_dispatch (void) {
 
83
 
 
84
        while (rd_currthread->rdt_state == RD_THREAD_S_RUNNING) {
 
85
                /* FIXME: Proper conding for all thread inputs. */
 
86
                rd_thread_poll(100);
 
87
        }
 
88
 
 
89
        rd_thread_cleanup();
 
90
}
 
91
 
 
92
 
 
93
static void *rd_thread_start_routine (void *arg) {
 
94
        rd_thread_t *rdt = arg;
 
95
        void *ret;
 
96
        
 
97
        /* By default with block the user-defined signals. */
 
98
        rd_thread_sigmask(SIG_BLOCK, SIGUSR1, SIGUSR2, RD_SIG_END);
 
99
 
 
100
        rd_currthread = rdt;
 
101
 
 
102
        ret = rdt->rdt_start(rdt->rdt_start_arg);
 
103
 
 
104
        rd_thread_cleanup();
 
105
        rd_thread_destroy(rdt);
 
106
 
 
107
        return ret;
 
108
}
 
109
 
 
110
 
 
111
rd_thread_t *rd_thread_create0 (const char *name, pthread_t *pthread) {
 
112
        rd_thread_t *rdt;
 
113
 
 
114
        rdt = calloc(1, sizeof(*rdt));
 
115
  
 
116
        if (name)
 
117
                rdt->rdt_name = strdup(name);
 
118
 
 
119
        rdt->rdt_state = RD_THREAD_S_RUNNING;
 
120
 
 
121
        rd_fifoq_init(&rdt->rdt_eventq);
 
122
 
 
123
        if (pthread)
 
124
                rdt->rdt_thread = *pthread;
 
125
 
 
126
        return rdt;
 
127
}
 
128
 
 
129
 
 
130
int rd_thread_create (rd_thread_t **rdt,
 
131
                      const char *name,
 
132
                      const pthread_attr_t *attr,
 
133
                      void *(*start_routine)(void*),
 
134
                      void *arg) {
 
135
        rd_thread_t *rdt0;
 
136
 
 
137
        rdt0 = rd_thread_create0(name, NULL);
 
138
 
 
139
        rdt0->rdt_start = start_routine;
 
140
        rdt0->rdt_start_arg = arg;
 
141
 
 
142
        if (rdt)
 
143
                *rdt = rdt0;
 
144
        
 
145
        /* FIXME: We should block all signals until pthread_create returns. */
 
146
        if (pthread_create(&rdt0->rdt_thread, attr,
 
147
                           rd_thread_start_routine, rdt0)) {
 
148
                int errno_save = errno;
 
149
                rd_thread_destroy(rdt0);
 
150
                if (rdt)
 
151
                        *rdt = NULL;
 
152
                errno = errno_save;
 
153
                return -1;
 
154
        }
 
155
 
 
156
#ifdef PR_SET_NAME
 
157
        prctl(PR_SET_NAME, (char *)rdt0->rdt_name, 0, 0, 0);
 
158
#endif
 
159
        
 
160
        return 0;
 
161
}
 
162
 
 
163
 
 
164
int rd_threads_create (const char *nameprefix, int threadcount,
 
165
                       const pthread_attr_t *attr,
 
166
                       void *(*start_routine)(void*),
 
167
                       void *arg) {
 
168
        int i;
 
169
        char *name = alloca(strlen(nameprefix) + 4);
 
170
        int failed = 0;
 
171
 
 
172
        if (threadcount >= 1000) {
 
173
                errno = E2BIG;
 
174
                return -1;
 
175
        }
 
176
                
 
177
        for (i = 0 ; i < threadcount ; i++) {
 
178
                sprintf(name, "%s%i", nameprefix, i);
 
179
                if (!rd_thread_create(NULL, name, attr, start_routine, arg))
 
180
                        failed++;
 
181
        }
 
182
 
 
183
        if (failed == threadcount)
 
184
                return -1;
 
185
 
 
186
        return threadcount - failed;
 
187
}
 
188
 
 
189
 
 
190
int rd_thread_sigmask (int how, ...) {
 
191
        va_list ap;
 
192
        sigset_t set;
 
193
        int sig;
 
194
 
 
195
        sigemptyset(&set);
 
196
 
 
197
        va_start(ap, how);
 
198
        while ((sig = va_arg(ap, int)) != RD_SIG_END) {
 
199
                if (sig == RD_SIG_ALL)
 
200
                        sigfillset(&set);
 
201
                else
 
202
                        sigaddset(&set, sig);
 
203
        }
 
204
        va_end(ap);
 
205
 
 
206
        return pthread_sigmask(how, &set, NULL);
 
207
}
 
208