6
#ifndef DISABLE_THREADS
11
#include "amqp_framing.h"
12
#include "amqp_private.h"
16
#define RPC_REPLY(replytype) \
17
(amqp_rpc_reply->reply_type == AMQP_RESPONSE_NORMAL \
18
? (replytype *) amqp_rpc_reply->reply.decoded \
21
amqp_channel_open_ok_t *amqp_channel_open(amqp_connection_state_t state,
22
amqp_channel_t channel)
24
amqp_rpc_reply_t *amqp_rpc_reply;
25
amqp_rpc_reply = amqp_get_rpc_reply();
27
AMQP_SIMPLE_RPC(state, channel, CHANNEL, OPEN, OPEN_OK,
30
return RPC_REPLY(amqp_channel_open_ok_t);
33
int amqp_basic_publish(amqp_connection_state_t state,
34
amqp_channel_t channel,
35
amqp_bytes_t exchange,
36
amqp_bytes_t routing_key,
37
amqp_boolean_t mandatory,
38
amqp_boolean_t immediate,
39
amqp_basic_properties_t const *properties,
44
size_t usable_body_payload_size = state->frame_max - (HEADER_SIZE + FOOTER_SIZE);
46
amqp_basic_publish_t m =
47
(amqp_basic_publish_t) {
49
.routing_key = routing_key,
50
.mandatory = mandatory,
51
.immediate = immediate
54
amqp_basic_properties_t default_properties;
56
AMQP_CHECK_RESULT(amqp_send_method(state, channel, AMQP_BASIC_PUBLISH_METHOD, &m));
58
if (properties == NULL) {
59
memset(&default_properties, 0, sizeof(default_properties));
60
properties = &default_properties;
63
f.frame_type = AMQP_FRAME_HEADER;
65
f.payload.properties.class_id = AMQP_BASIC_CLASS;
66
f.payload.properties.body_size = body.len;
67
f.payload.properties.decoded = (void *) properties;
68
AMQP_CHECK_RESULT(amqp_send_frame(state, &f));
72
int remaining = body.len - body_offset;
73
assert(remaining >= 0);
78
f.frame_type = AMQP_FRAME_BODY;
80
f.payload.body_fragment.bytes = BUF_AT(body, body_offset);
81
if (remaining >= usable_body_payload_size) {
82
f.payload.body_fragment.len = usable_body_payload_size;
84
f.payload.body_fragment.len = remaining;
87
body_offset += f.payload.body_fragment.len;
88
AMQP_CHECK_RESULT(amqp_send_frame(state, &f));
94
amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state,
95
amqp_channel_t channel,
99
snprintf(codestr, sizeof(codestr), "%d", code);
100
return AMQP_SIMPLE_RPC(state, channel, CHANNEL, CLOSE, CLOSE_OK,
101
amqp_channel_close_t,
102
code, amqp_cstring_bytes(codestr), 0, 0);
105
amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state,
109
snprintf(codestr, sizeof(codestr), "%d", code);
110
return AMQP_SIMPLE_RPC(state, 0, CONNECTION, CLOSE, CLOSE_OK,
111
amqp_connection_close_t,
112
code, amqp_cstring_bytes(codestr), 0, 0);
115
amqp_exchange_declare_ok_t *amqp_exchange_declare(amqp_connection_state_t state,
116
amqp_channel_t channel,
117
amqp_bytes_t exchange,
119
amqp_boolean_t passive,
120
amqp_boolean_t durable,
121
amqp_boolean_t auto_delete,
122
amqp_table_t arguments)
124
amqp_rpc_reply_t *amqp_rpc_reply;
125
amqp_rpc_reply = amqp_get_rpc_reply();
127
AMQP_SIMPLE_RPC(state, channel, EXCHANGE, DECLARE, DECLARE_OK,
128
amqp_exchange_declare_t,
129
0, exchange, type, passive, durable, auto_delete, 0, 0, arguments);
130
return RPC_REPLY(amqp_exchange_declare_ok_t);
133
amqp_queue_declare_ok_t *amqp_queue_declare(amqp_connection_state_t state,
134
amqp_channel_t channel,
136
amqp_boolean_t passive,
137
amqp_boolean_t durable,
138
amqp_boolean_t exclusive,
139
amqp_boolean_t auto_delete,
140
amqp_table_t arguments)
142
amqp_rpc_reply_t *amqp_rpc_reply;
143
amqp_rpc_reply = amqp_get_rpc_reply();
145
AMQP_SIMPLE_RPC(state, channel, QUEUE, DECLARE, DECLARE_OK,
146
amqp_queue_declare_t,
147
0, queue, passive, durable, exclusive, auto_delete, 0, arguments);
148
return RPC_REPLY(amqp_queue_declare_ok_t);
151
amqp_queue_bind_ok_t *amqp_queue_bind(amqp_connection_state_t state,
152
amqp_channel_t channel,
154
amqp_bytes_t exchange,
155
amqp_bytes_t routing_key,
156
amqp_table_t arguments)
158
amqp_rpc_reply_t *amqp_rpc_reply;
159
amqp_rpc_reply = amqp_get_rpc_reply();
161
AMQP_SIMPLE_RPC(state, channel, QUEUE, BIND, BIND_OK,
163
0, queue, exchange, routing_key, 0, arguments);
164
return RPC_REPLY(amqp_queue_bind_ok_t);
167
amqp_queue_unbind_ok_t *amqp_queue_unbind(amqp_connection_state_t state,
168
amqp_channel_t channel,
170
amqp_bytes_t exchange,
171
amqp_bytes_t binding_key,
172
amqp_table_t arguments)
174
amqp_rpc_reply_t *amqp_rpc_reply;
175
amqp_rpc_reply = amqp_get_rpc_reply();
177
AMQP_SIMPLE_RPC(state, channel, QUEUE, UNBIND, UNBIND_OK,
179
0, queue, exchange, binding_key, arguments);
180
return RPC_REPLY(amqp_queue_unbind_ok_t);
183
amqp_basic_consume_ok_t *amqp_basic_consume(amqp_connection_state_t state,
184
amqp_channel_t channel,
186
amqp_bytes_t consumer_tag,
187
amqp_boolean_t no_local,
188
amqp_boolean_t no_ack,
189
amqp_boolean_t exclusive)
191
amqp_rpc_reply_t *amqp_rpc_reply;
192
amqp_rpc_reply = amqp_get_rpc_reply();
194
AMQP_SIMPLE_RPC(state, channel, BASIC, CONSUME, CONSUME_OK,
195
amqp_basic_consume_t,
196
0, queue, consumer_tag, no_local, no_ack, exclusive, 0);
197
return RPC_REPLY(amqp_basic_consume_ok_t);
200
int amqp_basic_ack(amqp_connection_state_t state,
201
amqp_channel_t channel,
202
uint64_t delivery_tag,
203
amqp_boolean_t multiple)
207
.delivery_tag = delivery_tag,
210
AMQP_CHECK_RESULT(amqp_send_method(state, channel, AMQP_BASIC_ACK_METHOD, &m));
214
amqp_queue_purge_ok_t *amqp_queue_purge(amqp_connection_state_t state,
215
amqp_channel_t channel,
217
amqp_boolean_t no_wait)
219
amqp_rpc_reply_t *amqp_rpc_reply;
220
amqp_rpc_reply = amqp_get_rpc_reply();
221
*amqp_rpc_reply = AMQP_SIMPLE_RPC(state, channel, QUEUE, PURGE, PURGE_OK,
222
amqp_queue_purge_t, channel, queue, no_wait);
223
return RPC_REPLY(amqp_queue_purge_ok_t);
226
amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state,
227
amqp_channel_t channel,
229
amqp_boolean_t no_ack)
231
amqp_method_number_t replies[] = { AMQP_BASIC_GET_OK_METHOD,
232
AMQP_BASIC_GET_EMPTY_METHOD,
234
amqp_rpc_reply_t *amqp_rpc_reply;
235
amqp_rpc_reply = amqp_get_rpc_reply();
237
AMQP_MULTIPLE_RESPONSE_RPC(state, channel, BASIC, GET, replies,
239
channel, queue, no_ack);
240
return *amqp_rpc_reply;
244
* Expose amqp_rpc_reply to dynamically linked libraries
246
amqp_rpc_reply_t *amqp_get_rpc_reply(void)
248
#ifndef DISABLE_THREADS
249
static int initialized = 0;
250
static pthread_key_t reply_key;
251
static pthread_mutex_t init_mutex = PTHREAD_MUTEX_INITIALIZER;
252
amqp_rpc_reply_t *amqp_rpc_reply;
254
pthread_mutex_lock(&init_mutex);
256
pthread_key_create(&reply_key, free);
259
pthread_mutex_unlock(&init_mutex);
261
amqp_rpc_reply = (amqp_rpc_reply_t *)pthread_getspecific(reply_key);
262
if(!amqp_rpc_reply) {
263
amqp_rpc_reply = calloc(1, sizeof(*amqp_rpc_reply));
264
pthread_setspecific(reply_key, (void *)amqp_rpc_reply);
266
return amqp_rpc_reply;
268
static amqp_rpc_reply_t amqp_rpc_reply;
269
return &amqp_rpc_reply;
274
* Expose tx-style transactions
277
amqp_tx_select_ok_t *amqp_tx_select(amqp_connection_state_t state,
278
amqp_channel_t channel,
279
amqp_table_t arguments)
281
amqp_rpc_reply_t *amqp_rpc_reply;
282
amqp_rpc_reply = amqp_get_rpc_reply();
284
AMQP_SIMPLE_RPC(state, channel, TX, SELECT, SELECT_OK,
287
return RPC_REPLY(amqp_tx_select_ok_t);
290
amqp_tx_commit_ok_t *amqp_tx_commit(amqp_connection_state_t state,
291
amqp_channel_t channel,
292
amqp_table_t arguments)
294
amqp_rpc_reply_t *amqp_rpc_reply;
295
amqp_rpc_reply = amqp_get_rpc_reply();
297
AMQP_SIMPLE_RPC(state, channel, TX, COMMIT, COMMIT_OK,
298
amqp_tx_commit_t, 0);
299
return RPC_REPLY(amqp_tx_commit_ok_t);
302
amqp_tx_rollback_ok_t *amqp_tx_rollback(amqp_connection_state_t state,
303
amqp_channel_t channel,
304
amqp_table_t arguments)
306
amqp_rpc_reply_t *amqp_rpc_reply;
307
amqp_rpc_reply = amqp_get_rpc_reply();
309
AMQP_SIMPLE_RPC(state, channel, TX, ROLLBACK, ROLLBACK_OK,
310
amqp_tx_rollback_t, 0);
311
return RPC_REPLY(amqp_tx_rollback_ok_t);
314
amqp_basic_return_t *amqp_basic_return(amqp_connection_state_t state,
315
amqp_channel_t channel,
316
amqp_table_t arguments)
318
amqp_rpc_reply_t *amqp_rpc_reply;
319
amqp_rpc_reply = amqp_get_rpc_reply();
321
AMQP_SIMPLE_RPC(state, channel, BASIC, RETURN, RETURN,
322
amqp_basic_return_t, 0);
323
return RPC_REPLY(amqp_basic_return_t);