2
* Copyright (c) 2009, OmniTI Computer Consulting, Inc.
5
* Redistribution and use in source and binary forms, with or without
6
* modification, are permitted provided that the following conditions are
9
* * Redistributions of source code must retain the above copyright
10
* notice, this list of conditions and the following disclaimer.
11
* * Redistributions in binary form must reproduce the above
12
* copyright notice, this list of conditions and the following
13
* disclaimer in the documentation and/or other materials provided
14
* with the distribution.
15
* * Neither the name OmniTI Computer Consulting, Inc. nor the names
16
* of its contributors may be used to endorse or promote products
17
* derived from this software without specific prior written
20
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32
* Author: Theo Schlossnagle
42
#include "miscadmin.h"
44
#include "executor/spi.h"
45
#include "storage/lwlock.h"
46
#include "storage/shmem.h"
47
#include "storage/ipc.h"
48
#include "access/xact.h"
49
#include "utils/memutils.h"
50
#include "utils/builtins.h"
51
#include "librabbitmq/amqp.h"
52
#include "librabbitmq/amqp_framing.h"
54
/*
 * set_bytes_from_text(var, col)
 *
 * When function argument number `col` is not NULL, fetch it as a text
 * datum and make `var.bytes` point at the datum's data and `var.len`
 * hold the data length with the varlena header excluded.  `var.bytes`
 * points into the datum; nothing is copied by this macro.
 * In the lines visible here a NULL argument leaves `var` untouched, so
 * callers are expected to give `var` a usable value beforehand.
 */
#define set_bytes_from_text(var,col) do { \
55
if(!PG_ARGISNULL(col)) { \
56
text *txt = PG_GETARG_TEXT_PP(col); \
57
var.bytes = VARDATA_ANY(txt); \
58
var.len = VARSIZE_ANY_EXHDR(txt); \
62
#ifdef PG_MODULE_MAGIC
66
/* Forward declarations of the fmgr-style entry points defined further down in this file. */
Datum pg_amqp_exchange_declare(PG_FUNCTION_ARGS);
67
Datum pg_amqp_publish(PG_FUNCTION_ARGS);
68
Datum pg_amqp_autonomous_publish(PG_FUNCTION_ARGS);
69
Datum pg_amqp_disconnect(PG_FUNCTION_ARGS);
73
/* AMQP connection handle; local_amqp_get_bs treats a non-NULL value as "already connected". */
amqp_connection_state_t conn;
77
/* Link to the next entry of the list that starts at HEAD_BS. */
struct brokerstate *next;
80
/* Head of the brokerstate list: searched by local_amqp_get_a_bs and walked by amqp_local_phase2. */
static struct brokerstate *HEAD_BS = NULL;
83
/*
 * Tear down the broker connection held in `bs`: close the AMQP
 * connection, close the socket when its descriptor is >= 0, destroy the
 * connection object, then zero the whole struct and put back only the
 * saved `inerror` flag.
 *
 * NOTE(review): the memset clears every field of *bs, which includes
 * `next` and `broker_id`.  amqp_local_phase2 calls this function while
 * iterating the HEAD_BS list and afterwards advances through bs->next,
 * so a disconnect in the middle of the list looks like it drops the
 * remaining entries.  Confirm whether more fields need to be preserved.
 */
local_amqp_disconnect_bs(struct brokerstate *bs) {
85
/* remember the error flag so it survives the memset below */
int errorstate = bs->inerror;
86
amqp_connection_close(bs->conn, AMQP_REPLY_SUCCESS);
87
if(bs->sockfd >= 0) close(bs->sockfd);
88
amqp_destroy_connection(bs->conn);
89
memset(bs, 0, sizeof(*bs));
90
bs->inerror = errorstate;
93
/*
 * Transaction-event callback (installed with RegisterXactCallback).
 *
 * For XACT_EVENT_COMMIT and XACT_EVENT_ABORT it walks every broker on
 * the HEAD_BS list:
 *   - a broker flagged `inerror` is disconnected;
 *   - a broker with no `uncommitted` publishes is skipped;
 *   - otherwise the work on channel 2 is committed (COMMIT) or rolled
 *     back (ABORT); a reply other than AMQP_RESPONSE_NORMAL is logged
 *     as a WARNING and the broker is disconnected.
 *
 * `arg` is not used in the lines visible here.
 */
static void amqp_local_phase2(XactEvent event, void *arg) {
94
amqp_rpc_reply_t *reply;
95
struct brokerstate *bs;
97
case XACT_EVENT_COMMIT:
98
/*
 * NOTE(review): local_amqp_disconnect_bs zeroes *bs apart from inerror,
 * so the bs->next read by this loop after a disconnect may already be
 * cleared; verify that the rest of the list is still reached.
 */
for(bs = HEAD_BS; bs; bs = bs->next) {
99
if(bs->inerror) local_amqp_disconnect_bs(bs);
101
/* nothing was published transactionally on this broker */
if(!bs->uncommitted) continue;
102
if(bs->conn) amqp_tx_commit(bs->conn, 2, AMQP_EMPTY_TABLE);
103
/*
 * NOTE(review): the reply is examined even when bs->conn was NULL and
 * no commit was sent; presumably it then describes an earlier RPC.
 * Confirm.
 */
reply = amqp_get_rpc_reply();
104
if(reply->reply_type != AMQP_RESPONSE_NORMAL) {
105
elog(WARNING, "amqp could not commit tx mode on broker %d", bs->broker_id);
106
local_amqp_disconnect_bs(bs);
111
case XACT_EVENT_ABORT:
112
/* same walk as the COMMIT branch, but rolling back channel 2 instead */
for(bs = HEAD_BS; bs; bs = bs->next) {
113
if(bs->inerror) local_amqp_disconnect_bs(bs);
115
if(!bs->uncommitted) continue;
116
if(bs->conn) amqp_tx_rollback(bs->conn, 2, AMQP_EMPTY_TABLE);
117
reply = amqp_get_rpc_reply();
118
if(reply->reply_type != AMQP_RESPONSE_NORMAL) {
119
/*
 * NOTE(review): the message says "commit" although the call above is
 * amqp_tx_rollback; it looks copied from the COMMIT branch. Confirm
 * before changing the text.
 */
elog(WARNING, "amqp could not commit tx mode on broker %d", bs->broker_id);
120
local_amqp_disconnect_bs(bs);
125
case XACT_EVENT_PREPARE:
133
/* Install amqp_local_phase2 as a transaction-event callback; NULL is handed to it as `arg`. */
RegisterXactCallback(amqp_local_phase2, NULL);
136
/*
 * Find the brokerstate entry for `broker_id` on the HEAD_BS list and
 * return it.  When no entry matches, a zero-filled entry is allocated in
 * TopMemoryContext and tagged with `broker_id`; what happens to that new
 * entry afterwards is outside the lines visible here.
 *
 * This function only locates or creates the entry; no connection is
 * opened in the visible lines.
 *
 * NOTE(review): the parameter is given without a type (old-style
 * identifier list, implicitly int); declaring it as `int broker_id`
 * would be cleaner. Confirm against the build flags in use.
 */
static struct brokerstate *
137
local_amqp_get_a_bs(broker_id) {
138
struct brokerstate *bs;
139
for(bs = HEAD_BS; bs; bs = bs->next) {
140
if(bs->broker_id == broker_id) return bs;
142
/* no existing entry: create one whose fields all start out zero/NULL */
bs = MemoryContextAllocZero(TopMemoryContext, sizeof(*bs));
143
bs->broker_id = broker_id;
148
/*
 * Return the brokerstate for `broker_id` with an open connection,
 * connecting on first use.
 *
 * An entry that already has a connection is returned immediately.
 * Otherwise the broker's host, port, vhost, username and password are
 * read through SPI, a socket is opened, the login is performed, channels
 * 1 and 2 are opened and channel 2 is switched to tx mode.
 *
 * Returns NULL when SPI_connect fails or when amqp_new_connection
 * returns NULL.  A socket-open failure jumps to the `busted` label; that
 * label and the handling after each WARNING are outside the lines
 * visible here.
 *
 * NOTE(review): the parameter is given without a type (implicitly int);
 * `int broker_id` would be cleaner.
 */
static struct brokerstate *
149
local_amqp_get_bs(broker_id) {
151
struct brokerstate *bs = local_amqp_get_a_bs(broker_id);
152
/* already connected: reuse the existing connection */
if(bs->conn) return bs;
153
if(SPI_connect() == SPI_ERROR_CONNECT) return NULL;
154
/* the only value interpolated into the query is broker_id, formatted with %d */
snprintf(sql, sizeof(sql), "SELECT host, port, vhost, username, password "
156
" WHERE broker_id = %d", broker_id);
157
/* SPI_execute arguments: read_only = true, row limit = 1 */
if(SPI_OK_SELECT == SPI_execute(sql, true, 1)) {
158
if(1 == SPI_processed) {
159
amqp_rpc_reply_t *reply, s_reply;
160
char *host, *vhost, *user, *pass;
164
/* each NULL text column falls back to a built-in default */
host = SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1);
165
if(!host) host = "localhost";
166
/* port is read as a binary datum; its value for a NULL column is set outside the visible lines */
port_datum = SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 2, &is_null);
167
if(!is_null) port = DatumGetInt32(port_datum);
168
vhost = SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 3);
169
if(!vhost) vhost = "/";
170
user = SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 4);
171
if(!user) user = "guest";
172
pass = SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 5);
173
if(!pass) pass = "guest";
175
bs->conn = amqp_new_connection();
176
/*
 * NOTE(review): this early return closes SPI but leaves the entry made by
 * local_amqp_get_a_bs in place with conn == NULL, so the next call will
 * attempt the connection again.
 */
if(!bs->conn) { SPI_finish(); return NULL; }
177
bs->sockfd = amqp_open_socket(host, port);
178
if(bs->sockfd < 0) goto busted;
179
amqp_set_sockfd(bs->conn, bs->sockfd);
180
/*
 * Plain SASL login against `vhost`.  The numeric arguments are presumably
 * channel_max = 0, frame_max = 131072 and heartbeat = 0; confirm against
 * the bundled librabbitmq header.  The remaining arguments are on a line
 * that is not visible here.
 */
s_reply = amqp_login(bs->conn, vhost, 0, 131072,
181
0, AMQP_SASL_METHOD_PLAIN,
183
if(s_reply.reply_type != AMQP_RESPONSE_NORMAL) {
184
elog(WARNING, "amqp login failed on broker %d", broker_id);
187
/* channel 1: used by pg_amqp_exchange_declare and pg_amqp_autonomous_publish */
amqp_channel_open(bs->conn, 1);
188
reply = amqp_get_rpc_reply();
189
if(reply->reply_type != AMQP_RESPONSE_NORMAL) {
190
elog(WARNING, "amqp channel open failed on broker %d", broker_id);
193
/* channel 2: put into tx mode below and used by pg_amqp_publish */
amqp_channel_open(bs->conn, 2);
194
reply = amqp_get_rpc_reply();
195
if(reply->reply_type != AMQP_RESPONSE_NORMAL) {
196
elog(WARNING, "amqp channel open failed on broker %d", broker_id);
199
amqp_tx_select(bs->conn, 2, AMQP_EMPTY_TABLE);
200
reply = amqp_get_rpc_reply();
201
if(reply->reply_type != AMQP_RESPONSE_NORMAL) {
202
elog(WARNING, "amqp could not start tx mode on broker %d", broker_id);
206
/* reported when the lookup did not yield exactly one row */
elog(WARNING, "amqp can't find broker %d", broker_id);
209
/* reported when SPI_execute did not return SPI_OK_SELECT */
elog(WARNING, "amqp broker lookup query failed");
215
/* failure cleanup: drop whatever part of the connection was set up */
local_amqp_disconnect_bs(bs);
219
/*
 * Disconnect the broker identified by `broker_id`.
 *
 * NOTE(review): the lookup goes through local_amqp_get_a_bs, which
 * allocates a fresh entry when the id is unknown, so disconnecting an
 * id that was never used appears to create an entry first. Confirm this
 * is intended.
 */
local_amqp_disconnect(broker_id) {
220
struct brokerstate *bs = local_amqp_get_a_bs(broker_id);
221
local_amqp_disconnect_bs(bs);
225
/*
 * Declare an exchange on channel 1 of a broker.
 *
 * Arguments: 0 = broker id (int32), 1 = exchange name (text),
 * 2 = exchange type (text), 3 = passive, 4 = durable,
 * 5 = auto_delete (booleans).
 *
 * Returns true when the broker's reply is AMQP_RESPONSE_NORMAL; the
 * final return yields false.
 */
pg_amqp_exchange_declare(PG_FUNCTION_ARGS) {
226
struct brokerstate *bs;
227
if(!PG_ARGISNULL(0)) {
229
broker_id = PG_GETARG_INT32(0);
230
bs = local_amqp_get_bs(broker_id);
232
amqp_rpc_reply_t *reply;
233
/*
 * NOTE(review): these two values have no initializer, and
 * set_bytes_from_text assigns only when the argument is non-NULL, so a
 * NULL exchange name or type would hand indeterminate bytes to
 * amqp_exchange_declare, unless NULLs are excluded elsewhere (for
 * example by a STRICT SQL declaration). Confirm.
 */
amqp_bytes_t exchange_b;
234
amqp_bytes_t exchange_type_b;
235
amqp_boolean_t passive = 0;
236
amqp_boolean_t durable = 0;
237
amqp_boolean_t auto_delete = 0;
239
set_bytes_from_text(exchange_b,1);
240
set_bytes_from_text(exchange_type_b,2);
241
/* NOTE(review): the boolean arguments are read without a PG_ARGISNULL check. */
passive = PG_GETARG_BOOL(3);
242
durable = PG_GETARG_BOOL(4);
243
auto_delete = PG_GETARG_BOOL(5);
244
amqp_exchange_declare(bs->conn, 1,
245
exchange_b, exchange_type_b,
246
passive, durable, auto_delete, AMQP_EMPTY_TABLE);
247
reply = amqp_get_rpc_reply();
248
if(reply->reply_type == AMQP_RESPONSE_NORMAL)
249
/* (0 == 0) evaluates to true */
PG_RETURN_BOOL(0 == 0);
253
/* (0 != 0) evaluates to false */
PG_RETURN_BOOL(0 != 0);
256
/*
 * Shared implementation behind pg_amqp_publish (channel 2) and
 * pg_amqp_autonomous_publish (channel 1).
 *
 * Arguments: 0 = broker id (int32), 1 = exchange (text),
 * 2 = routing key (text), 3 = message body (text).  NULL text arguments
 * keep the defaults set below: exchange "amq.direct", empty routing key,
 * empty body.
 *
 * Returns true when amqp_basic_publish returned 0; the other visible
 * return statements yield false.
 */
pg_amqp_publish_opt(PG_FUNCTION_ARGS, int channel) {
257
struct brokerstate *bs;
258
if(!PG_ARGISNULL(0)) {
261
broker_id = PG_GETARG_INT32(0);
263
bs = local_amqp_get_bs(broker_id);
264
/* channel 1 may publish even while the broker is flagged inerror; other channels may not */
if(bs && bs->conn && (channel == 1 || !bs->inerror)) {
266
amqp_rpc_reply_t *reply;
267
amqp_boolean_t mandatory = 0;
268
amqp_boolean_t immediate = 0;
269
amqp_bytes_t exchange_b = amqp_cstring_bytes("amq.direct");
270
amqp_bytes_t routing_key_b = amqp_cstring_bytes("");
271
amqp_bytes_t body_b = amqp_cstring_bytes("");
273
/* override the defaults with whichever arguments are non-NULL */
set_bytes_from_text(exchange_b,1);
274
set_bytes_from_text(routing_key_b,2);
275
set_bytes_from_text(body_b,3);
276
/* NULL is passed in the message-properties position */
rv = amqp_basic_publish(bs->conn, channel, exchange_b, routing_key_b,
277
mandatory, immediate, NULL, body_b);
278
reply = amqp_get_rpc_reply();
279
if(rv || reply->reply_type != AMQP_RESPONSE_NORMAL) {
280
/*
 * Disconnect only while `once_more` is set and either the channel is 1
 * or nothing is pending in the open transaction.  This is presumably the
 * first step of a single retry; `once_more` and any retry jump are
 * outside the visible lines.
 */
if(once_more && (channel == 1 || bs->uncommitted == 0)) {
282
local_amqp_disconnect_bs(bs);
286
PG_RETURN_BOOL(0 != 0);
288
/* channel two is transactional */
289
/* count the publish so amqp_local_phase2 knows there is work to commit or roll back */
if(channel == 2) bs->uncommitted++;
290
PG_RETURN_BOOL(rv == 0);
293
PG_RETURN_BOOL(0 != 0);
296
/*
 * Entry point for a publish on channel 2, the channel that
 * local_amqp_get_bs switches to tx mode and that amqp_local_phase2
 * commits or rolls back.
 */
PG_FUNCTION_INFO_V1(pg_amqp_publish);
298
pg_amqp_publish(PG_FUNCTION_ARGS) {
299
return pg_amqp_publish_opt(fcinfo, 2);
302
/*
 * Entry point for a publish on channel 1.  Unlike channel 2, channel 1
 * is not put into tx mode by local_amqp_get_bs and is not tracked in
 * `uncommitted`.
 */
PG_FUNCTION_INFO_V1(pg_amqp_autonomous_publish);
304
pg_amqp_autonomous_publish(PG_FUNCTION_ARGS) {
305
return pg_amqp_publish_opt(fcinfo, 1);
308
/*
 * Entry point that disconnects the broker whose id is argument 0.
 * A NULL argument skips the disconnect; the return statement is outside
 * the lines visible here.
 */
PG_FUNCTION_INFO_V1(pg_amqp_disconnect);
310
pg_amqp_disconnect(PG_FUNCTION_ARGS) {
311
if(!PG_ARGISNULL(0)) {
313
broker_id = PG_GETARG_INT32(0);
314
local_amqp_disconnect(broker_id);