41
45
#include "amqp_framing.h"
42
46
#include <string.h>
44
/* Error numbering: Because of differences in error numbering on
45
* different platforms, we want to keep error numbers opaque for
46
* client code. Internally, we encode the category of an error
47
* (i.e. where its number comes from) in the top bits of the number
48
* (assuming that an int has at least 32 bits).
50
#define ERROR_CATEGORY_MASK (1 << 29)
52
#define ERROR_CATEGORY_CLIENT (0 << 29) /* librabbitmq error codes */
53
#define ERROR_CATEGORY_OS (1 << 29) /* OS-specific error codes */
55
/* librabbitmq error codes */
56
#define ERROR_NO_MEMORY 1
57
#define ERROR_BAD_AMQP_DATA 2
58
#define ERROR_UNKNOWN_CLASS 3
59
#define ERROR_UNKNOWN_METHOD 4
60
#define ERROR_GETHOSTBYNAME_FAILED 5
61
#define ERROR_INCOMPATIBLE_AMQP_VERSION 6
62
#define ERROR_CONNECTION_CLOSED 7
63
#define ERROR_BAD_AMQP_URL 8
49
# include <Winsock2.h>
51
# include <arpa/inet.h>
66
54
/* GCC attributes */
67
55
/* True for GCC 2.5 and later (and compilers that report such a version via
 * __GNUC__ / __GNUC_MINOR__). Uses logical OR: the operands are boolean
 * comparisons, so bitwise '|' gave the same result only by accident. */
#if __GNUC__ > 2 || (__GNUC__ == 2 && __GNUC_MINOR__ > 4)
68
56
#define AMQP_NORETURN \
69
57
__attribute__ ((__noreturn__))
59
__attribute__ ((__unused__))
71
61
#define AMQP_NORETURN
123
/* Number of slots in the pool_table array held by each connection state. */
#define POOL_TABLE_SIZE 16
125
typedef struct amqp_pool_table_entry_t_ {
126
struct amqp_pool_table_entry_t_ *next;
128
amqp_channel_t channel;
129
} amqp_pool_table_entry_t;
126
131
struct amqp_connection_state_t_ {
127
amqp_pool_t frame_pool;
128
amqp_pool_t decoding_pool;
132
amqp_pool_table_entry_t *pool_table[POOL_TABLE_SIZE];
130
134
amqp_connection_state_enum state;
140
/* buffer for holding frame headers. Allows us to delay allocating
141
* the raw frame buffer until the type, channel, and size are all known
143
char header_buffer[HEADER_SIZE + 1];
135
144
amqp_bytes_t inbound_buffer;
137
146
size_t inbound_offset;
148
158
amqp_link_t *last_queued_frame;
150
160
amqp_rpc_reply_t most_recent_api_result;
162
uint64_t next_recv_heartbeat;
163
uint64_t next_send_heartbeat;
166
/* NOTE(review): definition not visible here. Presumably returns the memory
 * pool associated with `channel` on `connection`, creating it when absent;
 * confirm the failure return value (likely NULL) against the definition. */
amqp_pool_t *amqp_get_or_create_channel_pool(amqp_connection_state_t connection, amqp_channel_t channel);
167
/* NOTE(review): definition not visible here. Presumably looks up the memory
 * pool for `channel` without creating one; confirm what is returned when no
 * pool exists for that channel. */
amqp_pool_t *amqp_get_channel_pool(amqp_connection_state_t state, amqp_channel_t channel);
169
static inline amqp_boolean_t amqp_heartbeat_enabled(amqp_connection_state_t state)
171
return (state->heartbeat > 0);
174
static inline uint64_t amqp_calc_next_send_heartbeat(amqp_connection_state_t state, uint64_t cur)
176
return cur + ((uint64_t)state->heartbeat * AMQP_NS_PER_S);
179
static inline uint64_t amqp_calc_next_recv_heartbeat(amqp_connection_state_t state, uint64_t cur)
181
return cur + ((uint64_t)state->heartbeat * 2 * AMQP_NS_PER_S);
184
/* NOTE(review): definition not visible here. Takes the connection state and
 * a caller-supplied timestamp; presumably `current_time` is in the same
 * nanosecond units as the heartbeat deadlines, and the int result is a
 * status code. Confirm both against the definition. */
int amqp_try_recv(amqp_connection_state_t state, uint64_t current_time);
153
186
static inline void *amqp_offset(void *data, size_t offset)
155
188
return (char *)data + offset;
158
191
/* This macro defines the encoding and decoding functions associated with a
161
#define DECLARE_CODEC_BASE_TYPE(bits, htonx, ntohx) \
163
static inline void amqp_e##bits(void *data, size_t offset, \
164
uint##bits##_t val) \
166
/* The AMQP data might be unaligned. So we encode and then copy the \
167
result into place. */ \
168
uint##bits##_t res = htonx(val); \
169
memcpy(amqp_offset(data, offset), &res, bits/8); \
172
static inline uint##bits##_t amqp_d##bits(void *data, size_t offset) \
174
/* The AMQP data might be unaligned. So we copy the source value \
175
into a variable and then decode it. */ \
176
uint##bits##_t val; \
177
memcpy(&val, amqp_offset(data, offset), bits/8); \
181
static inline int amqp_encode_##bits(amqp_bytes_t encoded, size_t *offset, \
182
uint##bits##_t input) \
185
size_t o = *offset; \
186
if ((*offset = o + bits / 8) <= encoded.len) { \
187
amqp_e##bits(encoded.bytes, o, input); \
195
static inline int amqp_decode_##bits(amqp_bytes_t encoded, size_t *offset, \
196
uint##bits##_t *output) \
199
size_t o = *offset; \
200
if ((*offset = o + bits / 8) <= encoded.len) { \
201
*output = amqp_d##bits(encoded.bytes, o); \
194
#define DECLARE_CODEC_BASE_TYPE(bits, htonx, ntohx) \
196
static inline void amqp_e##bits(void *data, size_t offset, \
197
uint##bits##_t val) \
199
/* The AMQP data might be unaligned. So we encode and then copy the \
200
result into place. */ \
201
uint##bits##_t res = htonx(val); \
202
memcpy(amqp_offset(data, offset), &res, bits/8); \
205
static inline uint##bits##_t amqp_d##bits(void *data, size_t offset) \
207
/* The AMQP data might be unaligned. So we copy the source value \
208
into a variable and then decode it. */ \
209
uint##bits##_t val; \
210
memcpy(&val, amqp_offset(data, offset), bits/8); \
214
static inline int amqp_encode_##bits(amqp_bytes_t encoded, size_t *offset, \
215
uint##bits##_t input) \
218
size_t o = *offset; \
219
if ((*offset = o + bits / 8) <= encoded.len) { \
220
amqp_e##bits(encoded.bytes, o, input); \
228
static inline int amqp_decode_##bits(amqp_bytes_t encoded, size_t *offset, \
229
uint##bits##_t *output) \
232
size_t o = *offset; \
233
if ((*offset = o + bits / 8) <= encoded.len) { \
234
*output = amqp_d##bits(encoded.bytes, o); \
209
242
/* Determine byte order */
210
243
#if defined(__GLIBC__)
234
267
defined(__i386__) || defined(_M_IX86)
235
268
# define AMQP_LITTLE_ENDIAN
237
/* Don't define anything */
270
/* Don't define anything */
240
273
#if defined(AMQP_LITTLE_ENDIAN)
242
#define DECLARE_XTOXLL(func) \
243
static inline uint64_t func##ll(uint64_t val) \
247
uint32_t halves[2]; \
252
u.halves[0] = func##l(u.halves[1]); \
253
u.halves[1] = func##l(t); \
275
#define DECLARE_XTOXLL(func) \
276
static inline uint64_t func##ll(uint64_t val) \
280
uint32_t halves[2]; \
285
u.halves[0] = func##l(u.halves[1]); \
286
u.halves[1] = func##l(t); \
257
290
#elif defined(AMQP_BIG_ENDIAN)
259
#define DECLARE_XTOXLL(func) \
260
static inline uint64_t func##ll(uint64_t val) \
264
uint32_t halves[2]; \
267
u.halves[0] = func##l(u.halves[0]); \
268
u.halves[1] = func##l(u.halves[1]); \
292
#define DECLARE_XTOXLL(func) \
293
static inline uint64_t func##ll(uint64_t val) \
297
uint32_t halves[2]; \
300
u.halves[0] = func##l(u.halves[0]); \
301
u.halves[1] = func##l(u.halves[1]); \
273
306
# error Endianness not known
284
317
DECLARE_CODEC_BASE_TYPE(64, htonll, ntohll)
286
319
static inline int amqp_encode_bytes(amqp_bytes_t encoded, size_t *offset,
289
322
size_t o = *offset;
290
323
if ((*offset = o + input.len) <= encoded.len) {
291
324
memcpy(amqp_offset(encoded.bytes, o), input.bytes, input.len);
299
331
static inline int amqp_decode_bytes(amqp_bytes_t encoded, size_t *offset,
300
amqp_bytes_t *output, size_t len)
332
amqp_bytes_t *output, size_t len)
302
334
size_t o = *offset;
303
335
if ((*offset = o + len) <= encoded.len) {
304
336
output->bytes = amqp_offset(encoded.bytes, o);
305
337
output->len = len;