11
#include "amqp_framing.h"
12
#include "amqp_private.h"
16
/* Size argument given to init_amqp_pool() for the frame pool in
   amqp_new_connection(); the same value is also passed to
   amqp_tune_connection() there as the initial frame size. */
#define INITIAL_FRAME_POOL_PAGE_SIZE 65536
17
/* Size argument given to init_amqp_pool() for the decoding pool. */
#define INITIAL_DECODING_POOL_PAGE_SIZE 131072
18
/* Size in bytes of the malloc()ed socket inbound buffer created by
   amqp_new_connection(). */
#define INITIAL_INBOUND_SOCK_BUFFER_SIZE 131072
20
/*
 * ENFORCE_STATE(statevec, statenum)
 *
 * Checks, via amqp_assert(), that the connection `statevec` is currently in
 * state `statenum`.  Each macro argument is copied into a local first
 * (_check_state / _wanted_state), so the visible expansion evaluates each
 * argument once.  On mismatch the assertion message reports the expected and
 * the actual state number.
 */
#define ENFORCE_STATE(statevec, statenum) \
22
amqp_connection_state_t _check_state = (statevec); \
23
int _wanted_state = (statenum); \
24
amqp_assert(_check_state->state == _wanted_state, \
25
"Programming error: invalid AMQP connection state: expected %d, got %d", \
27
_check_state->state); \
30
/*
 * Creates a new connection state object.
 *
 * The object is calloc()ed (so it starts zero-filled), its frame and decoding
 * pools are initialised, it is put in CONNECTION_STATE_IDLE, tuned with an
 * initial frame size of INITIAL_FRAME_POOL_PAGE_SIZE, given a malloc()ed
 * socket inbound buffer, and its inbound bookkeeping and queued-frame list
 * are reset.
 *
 * NOTE(review): no NULL check on the calloc() result and no return
 * statements are visible in this view -- confirm that a failed allocation is
 * handled before `state` is dereferenced, and what is returned on each path.
 */
amqp_connection_state_t amqp_new_connection(void) {
31
amqp_connection_state_t state =
32
(amqp_connection_state_t) calloc(1, sizeof(struct amqp_connection_state_t_));
38
init_amqp_pool(&state->frame_pool, INITIAL_FRAME_POOL_PAGE_SIZE);
39
init_amqp_pool(&state->decoding_pool, INITIAL_DECODING_POOL_PAGE_SIZE);
41
state->state = CONNECTION_STATE_IDLE;
43
/* Both buffers start out unallocated; amqp_tune_connection() realloc()s the
   outbound one, and the inbound one is taken from the frame pool on demand
   by amqp_handle_input(). */
state->inbound_buffer.bytes = NULL;
44
state->outbound_buffer.bytes = NULL;
45
/* A non-zero result from amqp_tune_connection() is treated as failure.
   NOTE(review): on its realloc failure path amqp_tune_connection() already
   calls amqp_destroy_connection(state); emptying the pools again here may
   then be a double release / use after destroy -- confirm. */
if (amqp_tune_connection(state, 0, INITIAL_FRAME_POOL_PAGE_SIZE, 0) != 0) {
46
empty_amqp_pool(&state->frame_pool);
47
empty_amqp_pool(&state->decoding_pool);
52
/* Nothing buffered yet; the parser first waits for HEADER_SIZE bytes. */
state->inbound_offset = 0;
53
state->target_size = HEADER_SIZE;
56
state->sock_inbound_buffer.len = INITIAL_INBOUND_SOCK_BUFFER_SIZE;
57
state->sock_inbound_buffer.bytes = malloc(INITIAL_INBOUND_SOCK_BUFFER_SIZE);
58
/* Allocation failed: tear down everything built so far. */
if (state->sock_inbound_buffer.bytes == NULL) {
59
amqp_destroy_connection(state);
63
/* Socket buffer holds no data yet. */
state->sock_inbound_offset = 0;
64
state->sock_inbound_limit = 0;
66
/* Empty queue of frames. */
state->first_queued_frame = NULL;
67
state->last_queued_frame = NULL;
72
/* Accessor for the connection's socket descriptor.
   NOTE(review): the body is not visible in this view; presumably it returns
   state->sockfd -- confirm. */
int amqp_get_sockfd(amqp_connection_state_t state) {
76
/* Records `sockfd` in state->sockfd.  amqp_send_frame() later write()s
   outgoing frames to this descriptor. */
void amqp_set_sockfd(amqp_connection_state_t state,
79
state->sockfd = sockfd;
82
/*
 * Records the negotiated channel_max, frame_max and heartbeat values on the
 * connection and resizes the per-frame buffers to frame_max.
 *
 * The connection must be in CONNECTION_STATE_IDLE (checked by ENFORCE_STATE).
 * The frame pool is emptied and re-initialised with frame_max, both buffer
 * lengths are set to frame_max, and the outbound buffer is realloc()ed to
 * frame_max bytes.
 *
 * NOTE(review): the return statements are not visible in this view;
 * amqp_new_connection() treats a non-zero result as failure.
 */
int amqp_tune_connection(amqp_connection_state_t state,
89
ENFORCE_STATE(state, CONNECTION_STATE_IDLE);
91
state->channel_max = channel_max;
92
state->frame_max = frame_max;
93
state->heartbeat = heartbeat;
95
/* Rebuild the frame pool for the new frame size. */
empty_amqp_pool(&state->frame_pool);
96
init_amqp_pool(&state->frame_pool, frame_max);
98
/* Only the length is recorded for the inbound buffer here; its storage is
   obtained from the frame pool in amqp_handle_input(). */
state->inbound_buffer.len = frame_max;
99
state->outbound_buffer.len = frame_max;
100
/* realloc() into a temporary so the old pointer is not overwritten when the
   call fails. */
newbuf = realloc(state->outbound_buffer.bytes, frame_max);
101
if (newbuf == NULL) {
102
/* NOTE(review): the whole connection is destroyed on allocation failure, so
   callers must not touch `state` afterwards -- confirm every caller
   (including amqp_new_connection()) respects this. */
amqp_destroy_connection(state);
105
state->outbound_buffer.bytes = newbuf;
110
/* Returns the channel_max value stored on the connection by
   amqp_tune_connection(). */
int amqp_get_channel_max(amqp_connection_state_t state) {
111
return state->channel_max;
114
/*
 * Releases the resources held by a connection: empties the frame and decoding
 * pools and free()s the outbound buffer and the socket inbound buffer.
 *
 * NOTE(review): whether `state` itself is also freed is not visible in this
 * view -- confirm before reusing the pointer after this call.
 */
void amqp_destroy_connection(amqp_connection_state_t state) {
115
empty_amqp_pool(&state->frame_pool);
116
empty_amqp_pool(&state->decoding_pool);
117
free(state->outbound_buffer.bytes);
118
free(state->sock_inbound_buffer.bytes);
122
/*
 * Resets the inbound parser so the next amqp_handle_input() call starts a new
 * frame: forgets the current inbound buffer pointer (it is not freed here; it
 * was obtained from the frame pool), rewinds the write offset, expects
 * HEADER_SIZE bytes next, and marks the connection CONNECTION_STATE_IDLE.
 */
static void return_to_idle(amqp_connection_state_t state) {
123
state->inbound_buffer.bytes = NULL;
124
state->inbound_offset = 0;
125
state->target_size = HEADER_SIZE;
126
state->state = CONNECTION_STATE_IDLE;
129
/*
 * Feeds received bytes into the connection's inbound frame parser.
 *
 * Parameters:
 *   state          connection whose parser state is advanced.
 *   received_data  bytes just read from the peer (may be empty).
 *   decoded_frame  output; frame_type is set to 0 first and only changed when
 *                  a complete frame (or protocol-header pseudo-frame) has
 *                  been decoded.
 *
 * Bytes are copied into state->inbound_buffer until target_size bytes have
 * accumulated; the action then depends on state->state (waiting for header,
 * body, or protocol header).  The visible return paths return the number of
 * input bytes consumed.
 *
 * NOTE(review): several lines of this function (braces, else-arms, some
 * returns, the declaration of bytes_consumed) are not visible in this view;
 * the comments below describe only what the visible statements do.
 */
int amqp_handle_input(amqp_connection_state_t state,
130
amqp_bytes_t received_data,
131
amqp_frame_t *decoded_frame)
133
int total_bytes_consumed = 0;
136
/* Returning frame_type of zero indicates either insufficient input,
137
or a complete, ignored frame was read. */
138
decoded_frame->frame_type = 0;
141
/* Nothing to parse: report zero bytes consumed. */
if (received_data.len == 0) {
142
return total_bytes_consumed;
145
/* Start of a new frame: take a buffer of inbound_buffer.len bytes (set to
   frame_max by amqp_tune_connection()) from the frame pool.
   NOTE(review): the amqp_pool_alloc() result is not visibly checked. */
if (state->state == CONNECTION_STATE_IDLE) {
146
state->inbound_buffer.bytes = amqp_pool_alloc(&state->frame_pool, state->inbound_buffer.len);
147
state->state = CONNECTION_STATE_WAITING_FOR_HEADER;
150
/* Take only what is still missing to reach target_size, capped by the amount
   of input available. */
bytes_consumed = state->target_size - state->inbound_offset;
151
if (received_data.len < bytes_consumed) {
152
bytes_consumed = received_data.len;
155
/* Append the chunk to the inbound buffer at the current offset. */
E_BYTES(state->inbound_buffer, state->inbound_offset, bytes_consumed, received_data.bytes);
156
state->inbound_offset += bytes_consumed;
157
total_bytes_consumed += bytes_consumed;
159
assert(state->inbound_offset <= state->target_size);
161
/* Still short of target_size: wait for more input. */
if (state->inbound_offset < state->target_size) {
162
return total_bytes_consumed;
165
switch (state->state) {
166
case CONNECTION_STATE_WAITING_FOR_HEADER:
167
/* A protocol-header pseudo-frame is recognised by its first byte and the
   16-bit field at offset 1; it is 8 bytes long in total. */
if (D_8(state->inbound_buffer, 0) == AMQP_PSEUDOFRAME_PROTOCOL_HEADER &&
168
D_16(state->inbound_buffer, 1) == AMQP_PSEUDOFRAME_PROTOCOL_CHANNEL)
170
state->target_size = 8;
171
state->state = CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER;
173
/* Ordinary frame: total size is the 32-bit payload size at offset 3 plus
   header and footer.
   NOTE(review): this size comes from the peer and is not visibly checked
   against inbound_buffer.len -- confirm that E_BYTES or other code bounds
   it before more data is copied in. */
state->target_size = D_32(state->inbound_buffer, 3) + HEADER_SIZE + FOOTER_SIZE;
174
state->state = CONNECTION_STATE_WAITING_FOR_BODY;
177
/* Wind buffer forward, and try to read some body out of it. */
178
received_data.len -= bytes_consumed;
179
received_data.bytes = ((char *) received_data.bytes) + bytes_consumed;
182
case CONNECTION_STATE_WAITING_FOR_BODY: {
183
/* The frame type is the first byte of the buffered frame. */
int frame_type = D_8(state->inbound_buffer, 0);
186
/* Debug dump of the complete frame.
   NOTE(review): presumably guarded by a debug #if that is not visible in
   this view -- confirm it is not emitted in production builds. */
printf("recving:\n");
187
amqp_dump(state->inbound_buffer.bytes, state->target_size);
190
/* Check frame end marker (footer) */
191
if (D_8(state->inbound_buffer, state->target_size - 1) != AMQP_FRAME_END) {
195
/* The channel number is the 16-bit field at offset 1. */
decoded_frame->channel = D_16(state->inbound_buffer, 1);
197
switch (frame_type) {
198
case AMQP_FRAME_METHOD: {
199
amqp_bytes_t encoded;
201
/* Four bytes of method ID before the method args. */
202
/* NOTE(review): if target_size is smaller than HEADER_SIZE + 4 + FOOTER_SIZE
   this subtraction could wrap if the operands are unsigned -- confirm
   a minimum length is enforced. */
encoded.len = state->target_size - (HEADER_SIZE + 4 + FOOTER_SIZE);
203
encoded.bytes = D_BYTES(state->inbound_buffer, HEADER_SIZE + 4, encoded.len);
205
decoded_frame->frame_type = AMQP_FRAME_METHOD;
206
decoded_frame->payload.method.id = D_32(state->inbound_buffer, HEADER_SIZE);
207
AMQP_CHECK_RESULT(amqp_decode_method(decoded_frame->payload.method.id,
208
&state->decoding_pool,
210
&decoded_frame->payload.method.decoded));
214
case AMQP_FRAME_HEADER: {
215
amqp_bytes_t encoded;
217
/* 12 bytes for properties header. */
218
/* NOTE(review): same wrap-around concern as in the method case for frames
   shorter than HEADER_SIZE + 12 + FOOTER_SIZE -- confirm. */
encoded.len = state->target_size - (HEADER_SIZE + 12 + FOOTER_SIZE);
219
encoded.bytes = D_BYTES(state->inbound_buffer, HEADER_SIZE + 12, encoded.len);
221
decoded_frame->frame_type = AMQP_FRAME_HEADER;
222
/* class_id is the 16-bit field at HEADER_SIZE; body_size is the 64-bit field
   at HEADER_SIZE + 4 (the two bytes in between are not read here). */
decoded_frame->payload.properties.class_id = D_16(state->inbound_buffer, HEADER_SIZE);
223
decoded_frame->payload.properties.body_size = D_64(state->inbound_buffer, HEADER_SIZE+4);
224
AMQP_CHECK_RESULT(amqp_decode_properties(decoded_frame->payload.properties.class_id,
225
&state->decoding_pool,
227
&decoded_frame->payload.properties.decoded));
231
case AMQP_FRAME_BODY: {
232
/* The fragment is everything between header and footer; the decoded frame
   points into the inbound buffer rather than copying the bytes. */
size_t fragment_len = state->target_size - (HEADER_SIZE + FOOTER_SIZE);
234
decoded_frame->frame_type = AMQP_FRAME_BODY;
235
decoded_frame->payload.body_fragment.len = fragment_len;
236
decoded_frame->payload.body_fragment.bytes =
237
D_BYTES(state->inbound_buffer, HEADER_SIZE, fragment_len);
241
case AMQP_FRAME_HEARTBEAT:
242
decoded_frame->frame_type = AMQP_FRAME_HEARTBEAT;
246
/* Ignore the frame by not changing frame_type away from 0. */
250
/* Frame handled: reset the parser for the next frame. */
return_to_idle(state);
251
return total_bytes_consumed;
254
case CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER:
255
/* All 8 bytes of the protocol header are buffered: report it as a
   pseudo-frame whose fields are read from byte offsets 4..7. */
decoded_frame->frame_type = AMQP_PSEUDOFRAME_PROTOCOL_HEADER;
256
decoded_frame->channel = AMQP_PSEUDOFRAME_PROTOCOL_CHANNEL;
257
amqp_assert(D_8(state->inbound_buffer, 3) == (uint8_t) 'P',
258
"Invalid protocol header received");
259
decoded_frame->payload.protocol_header.transport_high = D_8(state->inbound_buffer, 4);
260
decoded_frame->payload.protocol_header.transport_low = D_8(state->inbound_buffer, 5);
261
decoded_frame->payload.protocol_header.protocol_version_major = D_8(state->inbound_buffer, 6);
262
decoded_frame->payload.protocol_header.protocol_version_minor = D_8(state->inbound_buffer, 7);
264
return_to_idle(state);
265
return total_bytes_consumed;
268
/* Any other state value is an internal error. */
amqp_assert(0, "Internal error: invalid amqp_connection_state_t->state %d", state->state);
272
/* Returns true when it is safe to call amqp_release_buffers(): the connection
   is in CONNECTION_STATE_IDLE and no frames are queued. */
amqp_boolean_t amqp_release_buffers_ok(amqp_connection_state_t state) {
273
return (state->state == CONNECTION_STATE_IDLE) && (state->first_queued_frame == NULL);
276
/*
 * Recycles the frame pool and the decoding pool.
 *
 * Preconditions, both enforced by assertion: the connection is in
 * CONNECTION_STATE_IDLE and no frames are queued.  Use
 * amqp_release_buffers_ok() to test these conditions without asserting.
 */
void amqp_release_buffers(amqp_connection_state_t state) {
277
ENFORCE_STATE(state, CONNECTION_STATE_IDLE);
279
amqp_assert(state->first_queued_frame == NULL,
280
"Programming error: attempt to amqp_release_buffers while waiting events enqueued");
282
recycle_amqp_pool(&state->frame_pool);
283
recycle_amqp_pool(&state->decoding_pool);
286
/* Calls amqp_release_buffers() only when amqp_release_buffers_ok() reports
   that its preconditions hold; otherwise does nothing. */
void amqp_maybe_release_buffers(amqp_connection_state_t state) {
287
if (amqp_release_buffers_ok(state)) {
288
amqp_release_buffers(state);
292
/*
 * Encodes `frame` for transmission, using state->outbound_buffer.
 *
 * The frame type goes at offset 0, the channel at offset 1 and the payload
 * length at offset 3.  On return *encoded describes the payload bytes and
 * *payload_len is the payload length.  The return value is separate_body;
 * when it is zero the frame-end byte has also been written into the outbound
 * buffer directly after the payload.
 *
 * NOTE(review): the remaining parameter(s), the declaration and assignments
 * of separate_body, and the `break`/default lines are not visible in this
 * view.  From the callers, a non-zero separate_body appears to mean the
 * payload lives outside the outbound buffer and must be sent separately --
 * confirm.
 */
static int inner_send_frame(amqp_connection_state_t state,
293
amqp_frame_t const *frame,
294
amqp_bytes_t *encoded,
299
E_8(state->outbound_buffer, 0, frame->frame_type);
300
E_16(state->outbound_buffer, 1, frame->channel);
301
switch (frame->frame_type) {
302
case AMQP_FRAME_METHOD:
303
/* The 32-bit method id goes right after the frame header; `encoded` is set to
   span the rest of the outbound buffer (minus footer) before the encoder is
   called. */
E_32(state->outbound_buffer, HEADER_SIZE, frame->payload.method.id);
304
encoded->len = state->outbound_buffer.len - (HEADER_SIZE + 4 + FOOTER_SIZE);
305
encoded->bytes = D_BYTES(state->outbound_buffer, HEADER_SIZE + 4, encoded->len);
306
*payload_len = AMQP_CHECK_RESULT(amqp_encode_method(frame->payload.method.id,
307
frame->payload.method.decoded,
312
case AMQP_FRAME_HEADER:
313
/* 12 bytes precede the encoded properties: class id (16 bits), a zero
   "weight" (16 bits) and the body size (64 bits). */
E_16(state->outbound_buffer, HEADER_SIZE, frame->payload.properties.class_id);
314
E_16(state->outbound_buffer, HEADER_SIZE+2, 0); /* "weight" */
315
E_64(state->outbound_buffer, HEADER_SIZE+4, frame->payload.properties.body_size);
316
encoded->len = state->outbound_buffer.len - (HEADER_SIZE + 12 + FOOTER_SIZE);
317
encoded->bytes = D_BYTES(state->outbound_buffer, HEADER_SIZE + 12, encoded->len);
318
*payload_len = AMQP_CHECK_RESULT(amqp_encode_properties(frame->payload.properties.class_id,
319
frame->payload.properties.decoded,
324
case AMQP_FRAME_BODY:
325
/* Body fragments are not copied: `encoded` simply aliases the caller's
   fragment. */
*encoded = frame->payload.body_fragment;
326
*payload_len = encoded->len;
330
case AMQP_FRAME_HEARTBEAT:
331
*encoded = AMQP_EMPTY_BYTES;
340
/* Payload length goes into the frame header at offset 3. */
E_32(state->outbound_buffer, 3, *payload_len);
341
/* When the payload sits in the outbound buffer, the frame-end byte is
   appended right behind it.
   NOTE(review): *payload_len + HEADER_SIZE is not visibly checked against
   outbound_buffer.len here -- confirm E_8 or the encoders bound it. */
if (!separate_body) {
342
E_8(state->outbound_buffer, *payload_len + HEADER_SIZE, AMQP_FRAME_END);
347
/* Debug dumps of what is about to be sent.
   NOTE(review): presumably inside a debug #if / if-else on separate_body that
   is not visible in this view -- confirm. */
printf("sending body frame (header):\n");
348
amqp_dump(state->outbound_buffer.bytes, HEADER_SIZE);
349
printf("sending body frame (payload):\n");
350
amqp_dump(encoded->bytes, *payload_len);
352
printf("sending:\n");
353
amqp_dump(state->outbound_buffer.bytes, *payload_len + HEADER_SIZE + FOOTER_SIZE);
357
return separate_body;
360
/*
 * Encodes `frame` with inner_send_frame() and writes it to state->sockfd.
 *
 * Depending on the separate_body value returned by inner_send_frame(), the
 * frame is sent either as one write() of header + payload + footer from the
 * outbound buffer, or as three write()s: header from the outbound buffer,
 * payload from `encoded`, then a single frame-end byte.  Returns
 * separate_body on the visible return path.
 *
 * NOTE(review): the `case` labels and local declarations are not visible in
 * this view.  write() may transfer fewer bytes than requested; each result
 * is only passed to AMQP_CHECK_RESULT -- confirm short writes are handled.
 */
int amqp_send_frame(amqp_connection_state_t state,
361
amqp_frame_t const *frame)
363
amqp_bytes_t encoded;
367
separate_body = inner_send_frame(state, frame, &encoded, &payload_len);
368
switch (separate_body) {
370
/* Whole frame is contiguous in the outbound buffer. */
AMQP_CHECK_RESULT(write(state->sockfd,
371
state->outbound_buffer.bytes,
372
payload_len + (HEADER_SIZE + FOOTER_SIZE)));
376
/* Header, external payload and footer are sent as three separate writes. */
AMQP_CHECK_RESULT(write(state->sockfd, state->outbound_buffer.bytes, HEADER_SIZE));
377
AMQP_CHECK_RESULT(write(state->sockfd, encoded.bytes, payload_len));
379
/* The footer is sent from a one-byte local, so it must be exactly 1 byte. */
assert(FOOTER_SIZE == 1);
380
char frame_end_byte = AMQP_FRAME_END;
381
AMQP_CHECK_RESULT(write(state->sockfd, &frame_end_byte, FOOTER_SIZE));
386
return separate_body;
390
/*
 * Like amqp_send_frame(), but hands the encoded bytes to the callback `fn`
 * (called as fn(context, bytes, length)) instead of calling write() on the
 * socket.
 *
 * Depending on the separate_body value returned by inner_send_frame(), `fn`
 * is called either once with the whole frame from the outbound buffer, or
 * three times: header, payload from `encoded`, then a single frame-end byte.
 * Returns separate_body on the visible return path.
 *
 * NOTE(review): the `fn` and `context` parameter declarations, the `case`
 * labels and the local declarations are not visible in this view.
 */
int amqp_send_frame_to(amqp_connection_state_t state,
391
amqp_frame_t const *frame,
395
amqp_bytes_t encoded;
399
separate_body = inner_send_frame(state, frame, &encoded, &payload_len);
400
switch (separate_body) {
402
/* Whole frame is contiguous in the outbound buffer. */
AMQP_CHECK_RESULT(fn(context,
403
state->outbound_buffer.bytes,
404
payload_len + (HEADER_SIZE + FOOTER_SIZE)));
408
/* Header, external payload and footer are delivered as three separate calls. */
AMQP_CHECK_RESULT(fn(context, state->outbound_buffer.bytes, HEADER_SIZE));
409
AMQP_CHECK_RESULT(fn(context, encoded.bytes, payload_len));
411
/* The footer is sent from a one-byte local, so it must be exactly 1 byte. */
assert(FOOTER_SIZE == 1);
412
char frame_end_byte = AMQP_FRAME_END;
413
AMQP_CHECK_RESULT(fn(context, &frame_end_byte, FOOTER_SIZE));
418
return separate_body;