~ubuntu-branches/ubuntu/wily/syslog-ng/wily-proposed

« back to all changes in this revision

Viewing changes to modules/afamqp/rabbitmq-c/tools/consume.c

  • Committer: Package Import Robot
  • Author(s): Gergely Nagy, Gergely Nagy
  • Date: 2013-11-04 15:27:37 UTC
  • mfrom: (1.3.12)
  • Revision ID: package-import@ubuntu.com-20131104152737-mqh6eqtna2xk97jq
Tags: 3.5.1-1
[ Gergely Nagy <algernon@madhouse-project.org> ]
* New upstream release.
  + Support auto-loading modules (Closes: #650814)
  + The SMTP module is available in syslog-ng-mod-smtp (Closes: #722746)
  + New modules: amqp, geoip, stomp, redis and smtp.
  + Multi-line input support (indented multiline and regexp-based)
  + Template type hinting for the MongoDB destination and $(format-json)
  + Support for unit suffixes in the configuration file
  + New filters, template functions and other miscellaneous changes
* New (team) maintainer, Laszlo Boszormenyi, Attila Szalay and myself
  added to Uploaders.
* Ship /var/lib/syslog-ng in the syslog-ng-core package, instead of
  creating it in the init script. Thanks Michael Biebl
  <biebl@debian.org> for the report & assistance. (Closes: #699942, #719910)
* Use dh-systemd for proper systemd-related maintainer scripts. Based on
  a patch by Michael Biebl <biebl@debian.org>. (Closes: #713982,
  #690067)
* Do not wait for syslog-ng to settle down during installation / update.
  This also fixes installing via debootstrap and a fake
  start-stop-daemon. (Closes: #714254)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 * ***** BEGIN LICENSE BLOCK *****
 
3
 * Version: MIT
 
4
 *
 
5
 * Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc.
 
6
 * All Rights Reserved.
 
7
 *
 
8
 * Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010
 
9
 * VMware, Inc. and Tony Garnock-Jones. All Rights Reserved.
 
10
 *
 
11
 * Permission is hereby granted, free of charge, to any person
 
12
 * obtaining a copy of this software and associated documentation
 
13
 * files (the "Software"), to deal in the Software without
 
14
 * restriction, including without limitation the rights to use, copy,
 
15
 * modify, merge, publish, distribute, sublicense, and/or sell copies
 
16
 * of the Software, and to permit persons to whom the Software is
 
17
 * furnished to do so, subject to the following conditions:
 
18
 *
 
19
 * The above copyright notice and this permission notice shall be
 
20
 * included in all copies or substantial portions of the Software.
 
21
 *
 
22
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 
23
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 
24
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 
25
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 
26
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 
27
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 
28
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 
29
 * SOFTWARE.
 
30
 * ***** END LICENSE BLOCK *****
 
31
 */
 
32
 
 
33
#ifdef HAVE_CONFIG_H
 
34
#include "config.h"
 
35
#endif
 
36
 
 
37
#include <stdio.h>
 
38
#include <stdlib.h>
 
39
 
 
40
#include "common.h"
 
41
#include "process.h"
 
42
 
 
43
/* Convert a amqp_bytes_t to an escaped string form for printing.  We
 
44
   use the same escaping conventions as rabbitmqctl. */
 
45
static char *stringify_bytes(amqp_bytes_t bytes)
 
46
{
 
47
        /* We will need up to 4 chars per byte, plus the terminating 0 */
 
48
        char *res = malloc(bytes.len * 4 + 1);
 
49
        uint8_t *data = bytes.bytes;
 
50
        char *p = res;
 
51
        size_t i;
 
52
 
 
53
        for (i = 0; i < bytes.len; i++) {
 
54
                if (data[i] >= 32 && data[i] != 127) {
 
55
                        *p++ = data[i];
 
56
                }
 
57
                else {
 
58
                        *p++ = '\\';
 
59
                        *p++ = '0' + (data[i] >> 6);
 
60
                        *p++ = '0' + (data[i] >> 3 & 0x7);
 
61
                        *p++ = '0' + (data[i] & 0x7);
 
62
                }
 
63
        }
 
64
 
 
65
        *p = 0;
 
66
        return res;
 
67
}
 
68
 
 
69
static amqp_bytes_t setup_queue(amqp_connection_state_t conn,
 
70
                                char *queue, char *exchange,
 
71
                                char *routing_key, int declare)
 
72
{
 
73
        amqp_bytes_t queue_bytes = cstring_bytes(queue);
 
74
 
 
75
        /* if an exchange name wasn't provided, check that we don't
 
76
           have options that require it. */
 
77
        if (!exchange && routing_key) {
 
78
                fprintf(stderr, "--routing-key option requires an exchange"
 
79
                        " name to be provided with --exchange\n");
 
80
                exit(1);
 
81
        }
 
82
 
 
83
        if (!queue || exchange || declare) {
 
84
                /* Declare the queue as auto-delete.  */
 
85
                amqp_queue_declare_ok_t *res = amqp_queue_declare(conn, 1,
 
86
                                                    queue_bytes, 0, 0, 1, 1,
 
87
                                                    amqp_empty_table);
 
88
                if (!res)
 
89
                        die_rpc(amqp_get_rpc_reply(conn), "queue.declare");
 
90
 
 
91
                if (!queue) {
 
92
                        /* the server should have provided a queue name */
 
93
                        char *sq;
 
94
                        queue_bytes = amqp_bytes_malloc_dup(res->queue);
 
95
                        sq = stringify_bytes(queue_bytes);
 
96
                        fprintf(stderr, "Server provided queue name: %s\n",
 
97
                                sq);
 
98
                        free(sq);
 
99
                }
 
100
 
 
101
                /* Bind to an exchange if requested */
 
102
                if (exchange) {
 
103
                        amqp_bytes_t eb = amqp_cstring_bytes(exchange);
 
104
                        if (!amqp_queue_bind(conn, 1, queue_bytes, eb,
 
105
                                             cstring_bytes(routing_key),
 
106
                                             amqp_empty_table))
 
107
                                die_rpc(amqp_get_rpc_reply(conn),
 
108
                                        "queue.bind");
 
109
                }
 
110
        }
 
111
 
 
112
        return queue_bytes;
 
113
}
 
114
 
 
115
static void do_consume(amqp_connection_state_t conn, amqp_bytes_t queue,
 
116
                       int no_ack, int count, const char * const *argv)
 
117
{
 
118
        int i;
 
119
 
 
120
        /* If there is a limit, set the qos to match */
 
121
        if (count > 0 && count <= 65535
 
122
            && !amqp_basic_qos(conn, 1, 0, count, 0))
 
123
                die_rpc(amqp_get_rpc_reply(conn), "basic.qos");
 
124
 
 
125
        if (!amqp_basic_consume(conn, 1, queue, amqp_empty_bytes, 0, no_ack,
 
126
                                0, amqp_empty_table))
 
127
                die_rpc(amqp_get_rpc_reply(conn), "basic.consume");
 
128
 
 
129
        for (i = 0; count < 0 || i < count; i++) {
 
130
                amqp_frame_t frame;
 
131
                struct pipeline pl;
 
132
                uint64_t delivery_tag;
 
133
                int res = amqp_simple_wait_frame(conn, &frame);
 
134
                die_amqp_error(res, "waiting for header frame");
 
135
 
 
136
                if (frame.frame_type != AMQP_FRAME_METHOD
 
137
                    || frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)
 
138
                        continue;
 
139
 
 
140
                amqp_basic_deliver_t *deliver
 
141
                        = (amqp_basic_deliver_t *)frame.payload.method.decoded;
 
142
                delivery_tag = deliver->delivery_tag;
 
143
 
 
144
                pipeline(argv, &pl);
 
145
                copy_body(conn, pl.infd);
 
146
 
 
147
                if (finish_pipeline(&pl) && !no_ack)
 
148
                        die_amqp_error(amqp_basic_ack(conn, 1, delivery_tag,
 
149
                                                      0),
 
150
                                       "basic.ack");
 
151
 
 
152
                amqp_maybe_release_buffers(conn);
 
153
        }
 
154
}
 
155
 
 
156
int main(int argc, const char **argv)
 
157
{
 
158
        poptContext opts;
 
159
        amqp_connection_state_t conn;
 
160
        const char * const *cmd_argv;
 
161
        char *queue = NULL;
 
162
        char *exchange = NULL;
 
163
        char *routing_key = NULL;
 
164
        int declare = 0;
 
165
        int no_ack = 0;
 
166
        int count = -1;
 
167
        amqp_bytes_t queue_bytes;
 
168
 
 
169
        struct poptOption options[] = {
 
170
                INCLUDE_OPTIONS(connect_options),
 
171
                {"queue", 'q', POPT_ARG_STRING, &queue, 0,
 
172
                 "the queue to consume from", "queue"},
 
173
                {"exchange", 'e', POPT_ARG_STRING, &exchange, 0,
 
174
                 "bind the queue to this exchange", "exchange"},
 
175
                {"routing-key", 'r', POPT_ARG_STRING, &routing_key, 0,
 
176
                 "the routing key to bind with", "routing key"},
 
177
                {"declare", 'd', POPT_ARG_NONE, &declare, 0,
 
178
                 "declare an exclusive queue", NULL},
 
179
                {"no-ack", 'A', POPT_ARG_NONE, &no_ack, 0,
 
180
                 "consume in no-ack mode", NULL},
 
181
                {"count", 'c', POPT_ARG_INT, &count, 0,
 
182
                 "stop consuming after this many messages are consumed",
 
183
                 "limit"},
 
184
                POPT_AUTOHELP
 
185
                { NULL, '\0', 0, NULL, 0, NULL, NULL }
 
186
        };
 
187
 
 
188
        opts = process_options(argc, argv, options,
 
189
                               "[OPTIONS]... <command> <args>");
 
190
 
 
191
        cmd_argv = poptGetArgs(opts);
 
192
        if (!cmd_argv || !cmd_argv[0]) {
 
193
                fprintf(stderr, "consuming command not specified\n");
 
194
                poptPrintUsage(opts, stderr, 0);
 
195
                goto error;
 
196
        }
 
197
 
 
198
        conn = make_connection();
 
199
        queue_bytes = setup_queue(conn, queue, exchange, routing_key, declare);
 
200
        do_consume(conn, queue_bytes, no_ack, count, cmd_argv);
 
201
        close_connection(conn);
 
202
        return 0;
 
203
 
 
204
error:
 
205
        poptFreeContext(opts);
 
206
        return 1;
 
207
}