~verterok/+junk/postgresql-amqp

« back to all changes in this revision

Viewing changes to librabbitmq/amqp_api.c

  • Committer: Rodney Dawes
  • Date: 2010-08-19 14:35:20 UTC
  • Revision ID: rodney.dawes@canonical.com-20100819143520-25qfv1scbjt3p3xj
Tags: upstream-0.1+r180
Import upstream version 0.1+r180

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#include <stdlib.h>
 
2
#include <stdio.h>
 
3
#include <string.h>
 
4
#include <stdint.h>
 
5
#include <errno.h>
 
6
#ifndef DISABLE_THREADS
 
7
#include <pthread.h>
 
8
#endif
 
9
 
 
10
#include "amqp.h"
 
11
#include "amqp_framing.h"
 
12
#include "amqp_private.h"
 
13
 
 
14
#include <assert.h>
 
15
 
 
16
/*
 * Yield the decoded reply of the most recent RPC as a `replytype *`.
 *
 * Expands to NULL unless the record's reply_type is AMQP_RESPONSE_NORMAL.
 * The macro reads a variable named `amqp_rpc_reply` from the scope in which
 * it is expanded, so the caller must have such a pointer declared and set.
 */
#define RPC_REPLY(replytype)                                    \
  ((amqp_rpc_reply->reply_type != AMQP_RESPONSE_NORMAL)         \
   ? NULL                                                       \
   : (replytype *) amqp_rpc_reply->reply.decoded)
 
20
 
 
21
amqp_channel_open_ok_t *amqp_channel_open(amqp_connection_state_t state,
 
22
                                          amqp_channel_t channel)
 
23
{
 
24
  amqp_rpc_reply_t *amqp_rpc_reply;
 
25
  amqp_rpc_reply = amqp_get_rpc_reply();
 
26
  *amqp_rpc_reply =
 
27
    AMQP_SIMPLE_RPC(state, channel, CHANNEL, OPEN, OPEN_OK,
 
28
                    amqp_channel_open_t,
 
29
                    AMQP_EMPTY_BYTES);
 
30
  return RPC_REPLY(amqp_channel_open_ok_t);
 
31
}
 
32
 
 
33
int amqp_basic_publish(amqp_connection_state_t state,
 
34
                       amqp_channel_t channel,
 
35
                       amqp_bytes_t exchange,
 
36
                       amqp_bytes_t routing_key,
 
37
                       amqp_boolean_t mandatory,
 
38
                       amqp_boolean_t immediate,
 
39
                       amqp_basic_properties_t const *properties,
 
40
                       amqp_bytes_t body)
 
41
{
 
42
  amqp_frame_t f;
 
43
  size_t body_offset;
 
44
  size_t usable_body_payload_size = state->frame_max - (HEADER_SIZE + FOOTER_SIZE);
 
45
 
 
46
  amqp_basic_publish_t m =
 
47
    (amqp_basic_publish_t) {
 
48
      .exchange = exchange,
 
49
      .routing_key = routing_key,
 
50
      .mandatory = mandatory,
 
51
      .immediate = immediate
 
52
    };
 
53
 
 
54
  amqp_basic_properties_t default_properties;
 
55
 
 
56
  AMQP_CHECK_RESULT(amqp_send_method(state, channel, AMQP_BASIC_PUBLISH_METHOD, &m));
 
57
 
 
58
  if (properties == NULL) {
 
59
    memset(&default_properties, 0, sizeof(default_properties));
 
60
    properties = &default_properties;
 
61
  }
 
62
 
 
63
  f.frame_type = AMQP_FRAME_HEADER;
 
64
  f.channel = channel;
 
65
  f.payload.properties.class_id = AMQP_BASIC_CLASS;
 
66
  f.payload.properties.body_size = body.len;
 
67
  f.payload.properties.decoded = (void *) properties;
 
68
  AMQP_CHECK_RESULT(amqp_send_frame(state, &f));
 
69
 
 
70
  body_offset = 0;
 
71
  while (1) {
 
72
    int remaining = body.len - body_offset;
 
73
    assert(remaining >= 0);
 
74
 
 
75
    if (remaining == 0)
 
76
      break;
 
77
 
 
78
    f.frame_type = AMQP_FRAME_BODY;
 
79
    f.channel = channel;
 
80
    f.payload.body_fragment.bytes = BUF_AT(body, body_offset);
 
81
    if (remaining >= usable_body_payload_size) {
 
82
      f.payload.body_fragment.len = usable_body_payload_size;
 
83
    } else {
 
84
      f.payload.body_fragment.len = remaining;
 
85
    }
 
86
 
 
87
    body_offset += f.payload.body_fragment.len;
 
88
    AMQP_CHECK_RESULT(amqp_send_frame(state, &f));
 
89
  }
 
90
 
 
91
  return 0;
 
92
}
 
93
 
 
94
amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state,
 
95
                                    amqp_channel_t channel,
 
96
                                    int code)
 
97
{
 
98
  char codestr[13];
 
99
  snprintf(codestr, sizeof(codestr), "%d", code);
 
100
  return AMQP_SIMPLE_RPC(state, channel, CHANNEL, CLOSE, CLOSE_OK,
 
101
                         amqp_channel_close_t,
 
102
                         code, amqp_cstring_bytes(codestr), 0, 0);
 
103
}
 
104
 
 
105
/*
 * Issue CONNECTION CLOSE on channel 0 and wait for CLOSE_OK.
 *
 * The request is initialised with `code`, the decimal text of `code`
 * (formatted into a 13-byte stack buffer), and two zero fields.
 *
 * Returns the RPC result by value.
 */
amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state,
                                       int code)
{
  char code_text[13];

  snprintf(code_text, sizeof code_text, "%d", code);

  return AMQP_SIMPLE_RPC(state, 0,
                         CONNECTION, CLOSE, CLOSE_OK,
                         amqp_connection_close_t,
                         code, amqp_cstring_bytes(code_text), 0, 0);
}
 
114
 
 
115
amqp_exchange_declare_ok_t *amqp_exchange_declare(amqp_connection_state_t state,
 
116
                                                  amqp_channel_t channel,
 
117
                                                  amqp_bytes_t exchange,
 
118
                                                  amqp_bytes_t type,
 
119
                                                  amqp_boolean_t passive,
 
120
                                                  amqp_boolean_t durable,
 
121
                                                  amqp_boolean_t auto_delete,
 
122
                                                  amqp_table_t arguments)
 
123
{
 
124
  amqp_rpc_reply_t *amqp_rpc_reply;
 
125
  amqp_rpc_reply = amqp_get_rpc_reply();
 
126
  *amqp_rpc_reply =
 
127
    AMQP_SIMPLE_RPC(state, channel, EXCHANGE, DECLARE, DECLARE_OK,
 
128
                    amqp_exchange_declare_t,
 
129
                    0, exchange, type, passive, durable, auto_delete, 0, 0, arguments);
 
130
  return RPC_REPLY(amqp_exchange_declare_ok_t);
 
131
}
 
132
 
 
133
amqp_queue_declare_ok_t *amqp_queue_declare(amqp_connection_state_t state,
 
134
                                            amqp_channel_t channel,
 
135
                                            amqp_bytes_t queue,
 
136
                                            amqp_boolean_t passive,
 
137
                                            amqp_boolean_t durable,
 
138
                                            amqp_boolean_t exclusive,
 
139
                                            amqp_boolean_t auto_delete,
 
140
                                            amqp_table_t arguments)
 
141
{
 
142
  amqp_rpc_reply_t *amqp_rpc_reply;
 
143
  amqp_rpc_reply = amqp_get_rpc_reply();
 
144
  *amqp_rpc_reply =
 
145
    AMQP_SIMPLE_RPC(state, channel, QUEUE, DECLARE, DECLARE_OK,
 
146
                    amqp_queue_declare_t,
 
147
                    0, queue, passive, durable, exclusive, auto_delete, 0, arguments);
 
148
  return RPC_REPLY(amqp_queue_declare_ok_t);
 
149
}
 
150
 
 
151
amqp_queue_bind_ok_t *amqp_queue_bind(amqp_connection_state_t state,
 
152
                                      amqp_channel_t channel,
 
153
                                      amqp_bytes_t queue,
 
154
                                      amqp_bytes_t exchange,
 
155
                                      amqp_bytes_t routing_key,
 
156
                                      amqp_table_t arguments)
 
157
{
 
158
  amqp_rpc_reply_t *amqp_rpc_reply;
 
159
  amqp_rpc_reply = amqp_get_rpc_reply();
 
160
  *amqp_rpc_reply =
 
161
    AMQP_SIMPLE_RPC(state, channel, QUEUE, BIND, BIND_OK,
 
162
                    amqp_queue_bind_t,
 
163
                    0, queue, exchange, routing_key, 0, arguments);
 
164
  return RPC_REPLY(amqp_queue_bind_ok_t);
 
165
}
 
166
 
 
167
amqp_queue_unbind_ok_t *amqp_queue_unbind(amqp_connection_state_t state,
 
168
                                          amqp_channel_t channel,
 
169
                                          amqp_bytes_t queue,
 
170
                                          amqp_bytes_t exchange,
 
171
                                          amqp_bytes_t binding_key,
 
172
                                          amqp_table_t arguments)
 
173
{
 
174
  amqp_rpc_reply_t *amqp_rpc_reply;
 
175
  amqp_rpc_reply = amqp_get_rpc_reply();
 
176
  *amqp_rpc_reply =
 
177
    AMQP_SIMPLE_RPC(state, channel, QUEUE, UNBIND, UNBIND_OK,
 
178
                    amqp_queue_unbind_t,
 
179
                    0, queue, exchange, binding_key, arguments);
 
180
  return RPC_REPLY(amqp_queue_unbind_ok_t);
 
181
}
 
182
 
 
183
amqp_basic_consume_ok_t *amqp_basic_consume(amqp_connection_state_t state,
 
184
                                            amqp_channel_t channel,
 
185
                                            amqp_bytes_t queue,
 
186
                                            amqp_bytes_t consumer_tag,
 
187
                                            amqp_boolean_t no_local,
 
188
                                            amqp_boolean_t no_ack,
 
189
                                            amqp_boolean_t exclusive)
 
190
{
 
191
  amqp_rpc_reply_t *amqp_rpc_reply;
 
192
  amqp_rpc_reply = amqp_get_rpc_reply();
 
193
  *amqp_rpc_reply =
 
194
    AMQP_SIMPLE_RPC(state, channel, BASIC, CONSUME, CONSUME_OK,
 
195
                    amqp_basic_consume_t,
 
196
                    0, queue, consumer_tag, no_local, no_ack, exclusive, 0);
 
197
  return RPC_REPLY(amqp_basic_consume_ok_t);
 
198
}
 
199
 
 
200
int amqp_basic_ack(amqp_connection_state_t state,
 
201
                   amqp_channel_t channel,
 
202
                   uint64_t delivery_tag,
 
203
                   amqp_boolean_t multiple)
 
204
{
 
205
  amqp_basic_ack_t m =
 
206
    (amqp_basic_ack_t) {
 
207
      .delivery_tag = delivery_tag,
 
208
      .multiple = multiple
 
209
    };
 
210
  AMQP_CHECK_RESULT(amqp_send_method(state, channel, AMQP_BASIC_ACK_METHOD, &m));
 
211
  return 0;
 
212
}
 
213
 
 
214
amqp_queue_purge_ok_t *amqp_queue_purge(amqp_connection_state_t state,
 
215
                                                amqp_channel_t channel,
 
216
                                                amqp_bytes_t queue,
 
217
                                                amqp_boolean_t no_wait)
 
218
{
 
219
  amqp_rpc_reply_t *amqp_rpc_reply;
 
220
  amqp_rpc_reply = amqp_get_rpc_reply();
 
221
  *amqp_rpc_reply = AMQP_SIMPLE_RPC(state, channel, QUEUE, PURGE, PURGE_OK,
 
222
                        amqp_queue_purge_t, channel, queue, no_wait);
 
223
  return RPC_REPLY(amqp_queue_purge_ok_t);
 
224
}
 
225
 
 
226
amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state,
 
227
                        amqp_channel_t channel,
 
228
                        amqp_bytes_t queue,
 
229
                        amqp_boolean_t no_ack)
 
230
{
 
231
        amqp_method_number_t replies[] = { AMQP_BASIC_GET_OK_METHOD,
 
232
                                           AMQP_BASIC_GET_EMPTY_METHOD,
 
233
                                           0 };
 
234
        amqp_rpc_reply_t *amqp_rpc_reply;
 
235
        amqp_rpc_reply = amqp_get_rpc_reply();
 
236
        *amqp_rpc_reply =
 
237
                AMQP_MULTIPLE_RESPONSE_RPC(state, channel, BASIC, GET, replies,
 
238
                        amqp_basic_get_t,
 
239
                        channel, queue, no_ack);
 
240
        return *amqp_rpc_reply;
 
241
}
 
242
 
 
243
/*
 
244
 * Expose amqp_rpc_reply to dynamically linked libraries
 
245
 */
 
246
amqp_rpc_reply_t *amqp_get_rpc_reply(void)
 
247
{
 
248
#ifndef DISABLE_THREADS
 
249
  static int initialized = 0;
 
250
  static pthread_key_t reply_key;
 
251
  static pthread_mutex_t init_mutex = PTHREAD_MUTEX_INITIALIZER;
 
252
  amqp_rpc_reply_t *amqp_rpc_reply;
 
253
  if(!initialized) {
 
254
    pthread_mutex_lock(&init_mutex);
 
255
    if(!initialized) {
 
256
      pthread_key_create(&reply_key, free);
 
257
      initialized = 1;
 
258
    }
 
259
    pthread_mutex_unlock(&init_mutex);
 
260
  }
 
261
  amqp_rpc_reply = (amqp_rpc_reply_t *)pthread_getspecific(reply_key);
 
262
  if(!amqp_rpc_reply) {
 
263
    amqp_rpc_reply = calloc(1, sizeof(*amqp_rpc_reply));
 
264
    pthread_setspecific(reply_key, (void *)amqp_rpc_reply);
 
265
  }
 
266
  return amqp_rpc_reply;
 
267
#else
 
268
  static amqp_rpc_reply_t amqp_rpc_reply;
 
269
  return &amqp_rpc_reply;
 
270
#endif
 
271
}
 
272
 
 
273
/*
 
274
 * Expose tx-style transactions
 
275
 */
 
276
 
 
277
amqp_tx_select_ok_t *amqp_tx_select(amqp_connection_state_t state,
 
278
                                    amqp_channel_t channel,
 
279
                                    amqp_table_t arguments)
 
280
{
 
281
  amqp_rpc_reply_t *amqp_rpc_reply;
 
282
  amqp_rpc_reply = amqp_get_rpc_reply();
 
283
  *amqp_rpc_reply =
 
284
    AMQP_SIMPLE_RPC(state, channel, TX, SELECT, SELECT_OK,
 
285
                    amqp_tx_select_t,
 
286
                    channel);
 
287
  return RPC_REPLY(amqp_tx_select_ok_t);
 
288
}
 
289
 
 
290
amqp_tx_commit_ok_t *amqp_tx_commit(amqp_connection_state_t state,
 
291
                                    amqp_channel_t channel,
 
292
                                    amqp_table_t arguments)
 
293
{
 
294
  amqp_rpc_reply_t *amqp_rpc_reply;
 
295
  amqp_rpc_reply = amqp_get_rpc_reply();
 
296
  *amqp_rpc_reply =
 
297
    AMQP_SIMPLE_RPC(state, channel, TX, COMMIT, COMMIT_OK,
 
298
                    amqp_tx_commit_t, 0);
 
299
  return RPC_REPLY(amqp_tx_commit_ok_t);
 
300
}
 
301
 
 
302
amqp_tx_rollback_ok_t *amqp_tx_rollback(amqp_connection_state_t state,
 
303
                                    amqp_channel_t channel,
 
304
                                    amqp_table_t arguments)
 
305
{
 
306
  amqp_rpc_reply_t *amqp_rpc_reply;
 
307
  amqp_rpc_reply = amqp_get_rpc_reply();
 
308
  *amqp_rpc_reply =
 
309
    AMQP_SIMPLE_RPC(state, channel, TX, ROLLBACK, ROLLBACK_OK,
 
310
                    amqp_tx_rollback_t, 0);
 
311
  return RPC_REPLY(amqp_tx_rollback_ok_t);
 
312
}
 
313
 
 
314
amqp_basic_return_t *amqp_basic_return(amqp_connection_state_t state,
 
315
                                       amqp_channel_t channel,
 
316
                                       amqp_table_t arguments)
 
317
{
 
318
  amqp_rpc_reply_t *amqp_rpc_reply;
 
319
  amqp_rpc_reply = amqp_get_rpc_reply();
 
320
  *amqp_rpc_reply =
 
321
    AMQP_SIMPLE_RPC(state, channel, BASIC, RETURN, RETURN,
 
322
                    amqp_basic_return_t, 0);
 
323
  return RPC_REPLY(amqp_basic_return_t);
 
324
}