~verterok/+junk/postgresql-amqp

« back to all changes in this revision

Viewing changes to librabbitmq/amqp_connection.c

  • Committer: Rodney Dawes
  • Date: 2010-08-19 14:35:20 UTC
  • Revision ID: rodney.dawes@canonical.com-20100819143520-25qfv1scbjt3p3xj
Tags: upstream-0.1+r180
Import upstream version 0.1+r180

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#include <stdlib.h>
 
2
#include <stdio.h>
 
3
#include <string.h>
 
4
#include <stdint.h>
 
5
#include <errno.h>
 
6
 
 
7
#include <unistd.h>
 
8
#include <sys/uio.h>
 
9
 
 
10
#include "amqp.h"
 
11
#include "amqp_framing.h"
 
12
#include "amqp_private.h"
 
13
 
 
14
#include <assert.h>
 
15
 
 
16
/* Page size, in bytes, used when the frame pool is first initialised (64 KiB).
   The same value is passed as the initial frame_max by amqp_new_connection(). */
#define INITIAL_FRAME_POOL_PAGE_SIZE (64 * 1024)

/* Page size, in bytes, of the pool handed to the method/properties decoders
   (128 KiB). */
#define INITIAL_DECODING_POOL_PAGE_SIZE (128 * 1024)

/* Size, in bytes, of the sock_inbound_buffer allocated for a new connection
   (128 KiB). */
#define INITIAL_INBOUND_SOCK_BUFFER_SIZE (128 * 1024)
 
19
 
 
20
/*
 * Assert that the connection `statevec` is currently in state `statenum`.
 *
 * Each argument is evaluated exactly once. On a mismatch amqp_assert() is
 * invoked with a message that reports the expected and the actual state.
 *
 * The body is wrapped in do { ... } while (0) so that `ENFORCE_STATE(s, n);`
 * is a single statement and can safely be used as the unbraced body of an
 * if/else.
 */
#define ENFORCE_STATE(statevec, statenum)                               \
  do {                                                                  \
    amqp_connection_state_t _check_state = (statevec);                  \
    int _wanted_state = (statenum);                                     \
    amqp_assert(_check_state->state == _wanted_state,                   \
                "Programming error: invalid AMQP connection state: expected %d, got %d", \
                _wanted_state,                                          \
                _check_state->state);                                   \
  } while (0)
 
29
 
 
30
amqp_connection_state_t amqp_new_connection(void) {
 
31
  amqp_connection_state_t state =
 
32
    (amqp_connection_state_t) calloc(1, sizeof(struct amqp_connection_state_t_));
 
33
 
 
34
  if (state == NULL) {
 
35
    return NULL;
 
36
  }
 
37
 
 
38
  init_amqp_pool(&state->frame_pool, INITIAL_FRAME_POOL_PAGE_SIZE);
 
39
  init_amqp_pool(&state->decoding_pool, INITIAL_DECODING_POOL_PAGE_SIZE);
 
40
 
 
41
  state->state = CONNECTION_STATE_IDLE;
 
42
 
 
43
  state->inbound_buffer.bytes = NULL;
 
44
  state->outbound_buffer.bytes = NULL;
 
45
  if (amqp_tune_connection(state, 0, INITIAL_FRAME_POOL_PAGE_SIZE, 0) != 0) {
 
46
    empty_amqp_pool(&state->frame_pool);
 
47
    empty_amqp_pool(&state->decoding_pool);
 
48
    free(state);
 
49
    return NULL;
 
50
  }
 
51
 
 
52
  state->inbound_offset = 0;
 
53
  state->target_size = HEADER_SIZE;
 
54
 
 
55
  state->sockfd = -1;
 
56
  state->sock_inbound_buffer.len = INITIAL_INBOUND_SOCK_BUFFER_SIZE;
 
57
  state->sock_inbound_buffer.bytes = malloc(INITIAL_INBOUND_SOCK_BUFFER_SIZE);
 
58
  if (state->sock_inbound_buffer.bytes == NULL) {
 
59
    amqp_destroy_connection(state);
 
60
    return NULL;
 
61
  }
 
62
 
 
63
  state->sock_inbound_offset = 0;
 
64
  state->sock_inbound_limit = 0;
 
65
 
 
66
  state->first_queued_frame = NULL;
 
67
  state->last_queued_frame = NULL;
 
68
 
 
69
  return state;
 
70
}
 
71
 
 
72
/* Return the socket descriptor currently stored in the connection state
   (amqp_new_connection() initialises it to -1). */
int amqp_get_sockfd(amqp_connection_state_t state) {
  int fd = state->sockfd;

  return fd;
}
 
75
 
 
76
/* Store `sockfd` in the connection state. The descriptor is only recorded:
   it is not validated and no I/O is performed here. */
void amqp_set_sockfd(amqp_connection_state_t state, int sockfd) {
  state->sockfd = sockfd;
}
 
81
 
 
82
/*
 * Record the tuning parameters on the connection and resize its buffers.
 *
 * The connection must be in CONNECTION_STATE_IDLE (enforced through
 * ENFORCE_STATE). The frame pool is emptied and re-initialised with
 * `frame_max` as its page size, the inbound and outbound buffer lengths are
 * set to `frame_max`, and the outbound buffer is reallocated to that size.
 *
 * Returns 0 on success. If the outbound buffer cannot be reallocated the
 * connection is DESTROYED via amqp_destroy_connection() and -ENOMEM is
 * returned; the caller must not touch `state` again in that case.
 */
int amqp_tune_connection(amqp_connection_state_t state,
                         int channel_max,
                         int frame_max,
                         int heartbeat)
{
  void *resized;

  ENFORCE_STATE(state, CONNECTION_STATE_IDLE);

  state->channel_max = channel_max;
  state->frame_max = frame_max;
  state->heartbeat = heartbeat;

  /* Rebuild the frame pool so that its pages match the new frame size. */
  empty_amqp_pool(&state->frame_pool);
  init_amqp_pool(&state->frame_pool, frame_max);

  state->inbound_buffer.len = frame_max;
  state->outbound_buffer.len = frame_max;

  resized = realloc(state->outbound_buffer.bytes, frame_max);
  if (resized != NULL) {
    state->outbound_buffer.bytes = resized;
    return 0;
  }

  /* Allocation failed: tear the whole connection down before reporting. */
  amqp_destroy_connection(state);
  return -ENOMEM;
}
 
109
 
 
110
/* Return the channel_max value last recorded by amqp_tune_connection(). */
int amqp_get_channel_max(amqp_connection_state_t state) {
  int channel_limit = state->channel_max;

  return channel_limit;
}
 
113
 
 
114
/*
 * Release every resource owned by the connection state, then the state
 * object itself.
 *
 * Both memory pools are emptied and the outbound and socket-inbound buffers
 * are freed. inbound_buffer.bytes is not freed separately because it is
 * allocated from frame_pool. The socket descriptor is not closed here.
 *
 * Passing NULL is a no-op, mirroring free(NULL).
 */
void amqp_destroy_connection(amqp_connection_state_t state) {
  if (state == NULL) {
    return;
  }

  empty_amqp_pool(&state->frame_pool);
  empty_amqp_pool(&state->decoding_pool);
  free(state->outbound_buffer.bytes);
  free(state->sock_inbound_buffer.bytes);
  free(state);
}
 
121
 
 
122
/* Reset the inbound parsing fields so that the next call to
   amqp_handle_input() starts a fresh frame: the state becomes IDLE, the next
   read target is one frame header, and the inbound buffer pointer is dropped
   (it points into frame_pool, so it is not freed here). */
static void return_to_idle(amqp_connection_state_t state) {
  state->state = CONNECTION_STATE_IDLE;
  state->target_size = HEADER_SIZE;
  state->inbound_offset = 0;
  state->inbound_buffer.bytes = NULL;
}
 
128
 
 
129
int amqp_handle_input(amqp_connection_state_t state,
 
130
                      amqp_bytes_t received_data,
 
131
                      amqp_frame_t *decoded_frame)
 
132
{
 
133
  int total_bytes_consumed = 0;
 
134
  int bytes_consumed;
 
135
 
 
136
  /* Returning frame_type of zero indicates either insufficient input,
 
137
     or a complete, ignored frame was read. */
 
138
  decoded_frame->frame_type = 0;
 
139
 
 
140
 read_more:
 
141
  if (received_data.len == 0) {
 
142
    return total_bytes_consumed;
 
143
  }
 
144
 
 
145
  if (state->state == CONNECTION_STATE_IDLE) {
 
146
    state->inbound_buffer.bytes = amqp_pool_alloc(&state->frame_pool, state->inbound_buffer.len);
 
147
    state->state = CONNECTION_STATE_WAITING_FOR_HEADER;
 
148
  }
 
149
 
 
150
  bytes_consumed = state->target_size - state->inbound_offset;
 
151
  if (received_data.len < bytes_consumed) {
 
152
    bytes_consumed = received_data.len;
 
153
  }
 
154
 
 
155
  E_BYTES(state->inbound_buffer, state->inbound_offset, bytes_consumed, received_data.bytes);
 
156
  state->inbound_offset += bytes_consumed;
 
157
  total_bytes_consumed += bytes_consumed;
 
158
 
 
159
  assert(state->inbound_offset <= state->target_size);
 
160
 
 
161
  if (state->inbound_offset < state->target_size) {
 
162
    return total_bytes_consumed;
 
163
  }
 
164
 
 
165
  switch (state->state) {
 
166
    case CONNECTION_STATE_WAITING_FOR_HEADER:
 
167
      if (D_8(state->inbound_buffer, 0) == AMQP_PSEUDOFRAME_PROTOCOL_HEADER &&
 
168
          D_16(state->inbound_buffer, 1) == AMQP_PSEUDOFRAME_PROTOCOL_CHANNEL)
 
169
      {
 
170
        state->target_size = 8;
 
171
        state->state = CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER;
 
172
      } else {
 
173
        state->target_size = D_32(state->inbound_buffer, 3) + HEADER_SIZE + FOOTER_SIZE;
 
174
        state->state = CONNECTION_STATE_WAITING_FOR_BODY;
 
175
      }
 
176
 
 
177
      /* Wind buffer forward, and try to read some body out of it. */
 
178
      received_data.len -= bytes_consumed;
 
179
      received_data.bytes = ((char *) received_data.bytes) + bytes_consumed;
 
180
      goto read_more;
 
181
 
 
182
    case CONNECTION_STATE_WAITING_FOR_BODY: {
 
183
      int frame_type = D_8(state->inbound_buffer, 0);
 
184
 
 
185
#if 0
 
186
      printf("recving:\n");
 
187
      amqp_dump(state->inbound_buffer.bytes, state->target_size);
 
188
#endif
 
189
 
 
190
      /* Check frame end marker (footer) */
 
191
      if (D_8(state->inbound_buffer, state->target_size - 1) != AMQP_FRAME_END) {
 
192
        return -EINVAL;
 
193
      }
 
194
 
 
195
      decoded_frame->channel = D_16(state->inbound_buffer, 1);
 
196
 
 
197
      switch (frame_type) {
 
198
        case AMQP_FRAME_METHOD: {
 
199
          amqp_bytes_t encoded;
 
200
 
 
201
          /* Four bytes of method ID before the method args. */
 
202
          encoded.len = state->target_size - (HEADER_SIZE + 4 + FOOTER_SIZE);
 
203
          encoded.bytes = D_BYTES(state->inbound_buffer, HEADER_SIZE + 4, encoded.len);
 
204
 
 
205
          decoded_frame->frame_type = AMQP_FRAME_METHOD;
 
206
          decoded_frame->payload.method.id = D_32(state->inbound_buffer, HEADER_SIZE);
 
207
          AMQP_CHECK_RESULT(amqp_decode_method(decoded_frame->payload.method.id,
 
208
                                               &state->decoding_pool,
 
209
                                               encoded,
 
210
                                               &decoded_frame->payload.method.decoded));
 
211
          break;
 
212
        }
 
213
 
 
214
        case AMQP_FRAME_HEADER: {
 
215
          amqp_bytes_t encoded;
 
216
 
 
217
          /* 12 bytes for properties header. */
 
218
          encoded.len = state->target_size - (HEADER_SIZE + 12 + FOOTER_SIZE);
 
219
          encoded.bytes = D_BYTES(state->inbound_buffer, HEADER_SIZE + 12, encoded.len);
 
220
 
 
221
          decoded_frame->frame_type = AMQP_FRAME_HEADER;
 
222
          decoded_frame->payload.properties.class_id = D_16(state->inbound_buffer, HEADER_SIZE);
 
223
          decoded_frame->payload.properties.body_size = D_64(state->inbound_buffer, HEADER_SIZE+4);
 
224
          AMQP_CHECK_RESULT(amqp_decode_properties(decoded_frame->payload.properties.class_id,
 
225
                                                   &state->decoding_pool,
 
226
                                                   encoded,
 
227
                                                   &decoded_frame->payload.properties.decoded));
 
228
          break;
 
229
        }
 
230
 
 
231
        case AMQP_FRAME_BODY: {
 
232
          size_t fragment_len = state->target_size - (HEADER_SIZE + FOOTER_SIZE);
 
233
 
 
234
          decoded_frame->frame_type = AMQP_FRAME_BODY;
 
235
          decoded_frame->payload.body_fragment.len = fragment_len;
 
236
          decoded_frame->payload.body_fragment.bytes =
 
237
            D_BYTES(state->inbound_buffer, HEADER_SIZE, fragment_len);
 
238
          break;
 
239
        }
 
240
 
 
241
        case AMQP_FRAME_HEARTBEAT:
 
242
          decoded_frame->frame_type = AMQP_FRAME_HEARTBEAT;
 
243
          break;
 
244
 
 
245
        default:
 
246
          /* Ignore the frame by not changing frame_type away from 0. */
 
247
          break;
 
248
      }
 
249
 
 
250
      return_to_idle(state);
 
251
      return total_bytes_consumed;
 
252
    }
 
253
 
 
254
    case CONNECTION_STATE_WAITING_FOR_PROTOCOL_HEADER:
 
255
      decoded_frame->frame_type = AMQP_PSEUDOFRAME_PROTOCOL_HEADER;
 
256
      decoded_frame->channel = AMQP_PSEUDOFRAME_PROTOCOL_CHANNEL;
 
257
      amqp_assert(D_8(state->inbound_buffer, 3) == (uint8_t) 'P',
 
258
                  "Invalid protocol header received");
 
259
      decoded_frame->payload.protocol_header.transport_high = D_8(state->inbound_buffer, 4);
 
260
      decoded_frame->payload.protocol_header.transport_low = D_8(state->inbound_buffer, 5);
 
261
      decoded_frame->payload.protocol_header.protocol_version_major = D_8(state->inbound_buffer, 6);
 
262
      decoded_frame->payload.protocol_header.protocol_version_minor = D_8(state->inbound_buffer, 7);
 
263
 
 
264
      return_to_idle(state);
 
265
      return total_bytes_consumed;
 
266
 
 
267
    default:
 
268
      amqp_assert(0, "Internal error: invalid amqp_connection_state_t->state %d", state->state);
 
269
  }
 
270
}
 
271
 
 
272
/* Report whether amqp_release_buffers() may be called right now: the
   connection must be idle and no frames may be waiting in the queue. */
amqp_boolean_t amqp_release_buffers_ok(amqp_connection_state_t state) {
  if (state->state != CONNECTION_STATE_IDLE) {
    return 0;
  }

  return state->first_queued_frame == NULL;
}
 
275
 
 
276
/*
 * Recycle the frame pool and the decoding pool.
 *
 * Preconditions, both checked with assertions: the connection is in
 * CONNECTION_STATE_IDLE and the frame queue is empty. Use
 * amqp_release_buffers_ok() to test them without asserting.
 */
void amqp_release_buffers(amqp_connection_state_t state)
{
  ENFORCE_STATE(state, CONNECTION_STATE_IDLE);

  amqp_assert(state->first_queued_frame == NULL,
              "Programming error: attempt to amqp_release_buffers while waiting events enqueued");

  /* Both preconditions hold: hand the pool memory back for reuse. */
  recycle_amqp_pool(&state->frame_pool);
  recycle_amqp_pool(&state->decoding_pool);
}
 
285
 
 
286
/* Recycle the connection's pools if, and only if, it is currently safe to do
   so; otherwise do nothing. */
void amqp_maybe_release_buffers(amqp_connection_state_t state) {
  if (!amqp_release_buffers_ok(state)) {
    return;
  }

  amqp_release_buffers(state);
}
 
291
 
 
292
static int inner_send_frame(amqp_connection_state_t state,
 
293
                            amqp_frame_t const *frame,
 
294
                            amqp_bytes_t *encoded,
 
295
                            int *payload_len)
 
296
{
 
297
  int separate_body;
 
298
 
 
299
  E_8(state->outbound_buffer, 0, frame->frame_type);
 
300
  E_16(state->outbound_buffer, 1, frame->channel);
 
301
  switch (frame->frame_type) {
 
302
    case AMQP_FRAME_METHOD:
 
303
      E_32(state->outbound_buffer, HEADER_SIZE, frame->payload.method.id);
 
304
      encoded->len = state->outbound_buffer.len - (HEADER_SIZE + 4 + FOOTER_SIZE);
 
305
      encoded->bytes = D_BYTES(state->outbound_buffer, HEADER_SIZE + 4, encoded->len);
 
306
      *payload_len = AMQP_CHECK_RESULT(amqp_encode_method(frame->payload.method.id,
 
307
                                                          frame->payload.method.decoded,
 
308
                                                          *encoded)) + 4;
 
309
      separate_body = 0;
 
310
      break;
 
311
 
 
312
    case AMQP_FRAME_HEADER:
 
313
      E_16(state->outbound_buffer, HEADER_SIZE, frame->payload.properties.class_id);
 
314
      E_16(state->outbound_buffer, HEADER_SIZE+2, 0); /* "weight" */
 
315
      E_64(state->outbound_buffer, HEADER_SIZE+4, frame->payload.properties.body_size);
 
316
      encoded->len = state->outbound_buffer.len - (HEADER_SIZE + 12 + FOOTER_SIZE);
 
317
      encoded->bytes = D_BYTES(state->outbound_buffer, HEADER_SIZE + 12, encoded->len);
 
318
      *payload_len = AMQP_CHECK_RESULT(amqp_encode_properties(frame->payload.properties.class_id,
 
319
                                                              frame->payload.properties.decoded,
 
320
                                                              *encoded)) + 12;
 
321
      separate_body = 0;
 
322
      break;
 
323
 
 
324
    case AMQP_FRAME_BODY:
 
325
      *encoded = frame->payload.body_fragment;
 
326
      *payload_len = encoded->len;
 
327
      separate_body = 1;
 
328
      break;
 
329
 
 
330
    case AMQP_FRAME_HEARTBEAT:
 
331
      *encoded = AMQP_EMPTY_BYTES;
 
332
      *payload_len = 0;
 
333
      separate_body = 0;
 
334
      break;
 
335
 
 
336
    default:
 
337
      return -EINVAL;
 
338
  }
 
339
 
 
340
  E_32(state->outbound_buffer, 3, *payload_len);
 
341
  if (!separate_body) {
 
342
    E_8(state->outbound_buffer, *payload_len + HEADER_SIZE, AMQP_FRAME_END);
 
343
  }
 
344
 
 
345
#if 0
 
346
  if (separate_body) {
 
347
    printf("sending body frame (header):\n");
 
348
    amqp_dump(state->outbound_buffer.bytes, HEADER_SIZE);
 
349
    printf("sending body frame (payload):\n");
 
350
    amqp_dump(encoded->bytes, *payload_len);
 
351
  } else {
 
352
    printf("sending:\n");
 
353
    amqp_dump(state->outbound_buffer.bytes, *payload_len + HEADER_SIZE + FOOTER_SIZE);
 
354
  }
 
355
#endif
 
356
 
 
357
  return separate_body;
 
358
}
 
359
 
 
360
int amqp_send_frame(amqp_connection_state_t state,
 
361
                    amqp_frame_t const *frame)
 
362
{
 
363
  amqp_bytes_t encoded;
 
364
  int payload_len;
 
365
  int separate_body;
 
366
 
 
367
  separate_body = inner_send_frame(state, frame, &encoded, &payload_len);
 
368
  switch (separate_body) {
 
369
    case 0:
 
370
      AMQP_CHECK_RESULT(write(state->sockfd,
 
371
                              state->outbound_buffer.bytes,
 
372
                              payload_len + (HEADER_SIZE + FOOTER_SIZE)));
 
373
      return 0;
 
374
 
 
375
    case 1:
 
376
      AMQP_CHECK_RESULT(write(state->sockfd, state->outbound_buffer.bytes, HEADER_SIZE));
 
377
      AMQP_CHECK_RESULT(write(state->sockfd, encoded.bytes, payload_len));
 
378
      {
 
379
        assert(FOOTER_SIZE == 1);
 
380
        char frame_end_byte = AMQP_FRAME_END;
 
381
        AMQP_CHECK_RESULT(write(state->sockfd, &frame_end_byte, FOOTER_SIZE));
 
382
      }
 
383
      return 0;
 
384
 
 
385
    default:
 
386
      return separate_body;
 
387
  }
 
388
}
 
389
 
 
390
int amqp_send_frame_to(amqp_connection_state_t state,
 
391
                       amqp_frame_t const *frame,
 
392
                       amqp_output_fn_t fn,
 
393
                       void *context)
 
394
{
 
395
  amqp_bytes_t encoded;
 
396
  int payload_len;
 
397
  int separate_body;
 
398
 
 
399
  separate_body = inner_send_frame(state, frame, &encoded, &payload_len);
 
400
  switch (separate_body) {
 
401
    case 0:
 
402
      AMQP_CHECK_RESULT(fn(context,
 
403
                           state->outbound_buffer.bytes,
 
404
                           payload_len + (HEADER_SIZE + FOOTER_SIZE)));
 
405
      return 0;
 
406
 
 
407
    case 1:
 
408
      AMQP_CHECK_RESULT(fn(context, state->outbound_buffer.bytes, HEADER_SIZE));
 
409
      AMQP_CHECK_RESULT(fn(context, encoded.bytes, payload_len));
 
410
      {
 
411
        assert(FOOTER_SIZE == 1);
 
412
        char frame_end_byte = AMQP_FRAME_END;
 
413
        AMQP_CHECK_RESULT(fn(context, &frame_end_byte, FOOTER_SIZE));
 
414
      }
 
415
      return 0;
 
416
 
 
417
    default:
 
418
      return separate_body;
 
419
  }
 
420
}