~verterok/+junk/postgresql-amqp

« back to all changes in this revision

Viewing changes to pg_amqp.c

  • Committer: Rodney Dawes
  • Date: 2010-08-19 14:35:20 UTC
  • Revision ID: rodney.dawes@canonical.com-20100819143520-25qfv1scbjt3p3xj
Tags: upstream-0.1+r180
ImportĀ upstreamĀ versionĀ 0.1+r180

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 * Copyright (c) 2009, OmniTI Computer Consulting, Inc.
 
3
 * All rights reserved.
 
4
 *
 
5
 * Redistribution and use in source and binary forms, with or without
 
6
 * modification, are permitted provided that the following conditions are
 
7
 * met:
 
8
 *
 
9
 *     * Redistributions of source code must retain the above copyright
 
10
 *       notice, this list of conditions and the following disclaimer.
 
11
 *     * Redistributions in binary form must reproduce the above
 
12
 *       copyright notice, this list of conditions and the following
 
13
 *       disclaimer in the documentation and/or other materials provided
 
14
 *       with the distribution.
 
15
 *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
 
16
 *       of its contributors may be used to endorse or promote products
 
17
 *       derived from this software without specific prior written
 
18
 *       permission.
 
19
 *
 
20
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 
21
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 
22
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 
23
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 
24
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 
25
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 
26
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 
27
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 
28
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 
29
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 
30
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
31
 *
 
32
 * Author: Theo Schlossnagle
 
33
 *
 
34
 */
 
35
 
 
36
#include <time.h>
 
37
#include <sys/time.h>
 
38
 
 
39
#include "postgres.h"
 
40
#include "funcapi.h"
 
41
#include "fmgr.h"
 
42
#include "miscadmin.h"
 
43
#include "pgstat.h"
 
44
#include "executor/spi.h"
 
45
#include "storage/lwlock.h"
 
46
#include "storage/shmem.h"
 
47
#include "storage/ipc.h"
 
48
#include "access/xact.h"
 
49
#include "utils/memutils.h"
 
50
#include "utils/builtins.h"
 
51
#include "librabbitmq/amqp.h"
 
52
#include "librabbitmq/amqp_framing.h"
 
53
 
 
54
#define set_bytes_from_text(var,col) do { \
 
55
  if(!PG_ARGISNULL(col)) { \
 
56
    text *txt = PG_GETARG_TEXT_PP(col); \
 
57
    var.bytes = VARDATA_ANY(txt); \
 
58
    var.len = VARSIZE_ANY_EXHDR(txt); \
 
59
  } \
 
60
} while(0)
 
61
 
 
62
#ifdef PG_MODULE_MAGIC
 
63
PG_MODULE_MAGIC;
 
64
#endif
 
65
void _PG_init(void);
 
66
Datum pg_amqp_exchange_declare(PG_FUNCTION_ARGS);
 
67
Datum pg_amqp_publish(PG_FUNCTION_ARGS);
 
68
Datum pg_amqp_autonomous_publish(PG_FUNCTION_ARGS);
 
69
Datum pg_amqp_disconnect(PG_FUNCTION_ARGS);
 
70
 
 
71
struct brokerstate {
 
72
  int broker_id;
 
73
  amqp_connection_state_t conn;
 
74
  int sockfd;
 
75
  int uncommitted;
 
76
  int inerror;
 
77
  struct brokerstate *next;
 
78
};
 
79
 
 
80
static struct brokerstate *HEAD_BS = NULL;
 
81
 
 
82
static void
 
83
local_amqp_disconnect_bs(struct brokerstate *bs) {
 
84
  if(bs && bs->conn) {
 
85
    int errorstate = bs->inerror;
 
86
    amqp_connection_close(bs->conn, AMQP_REPLY_SUCCESS);
 
87
    if(bs->sockfd >= 0) close(bs->sockfd);
 
88
    amqp_destroy_connection(bs->conn);
 
89
    memset(bs, 0, sizeof(*bs));
 
90
    bs->inerror = errorstate;
 
91
  }
 
92
}
 
93
static void amqp_local_phase2(XactEvent event, void *arg) {
 
94
  amqp_rpc_reply_t *reply;
 
95
  struct brokerstate *bs;
 
96
  switch(event) {
 
97
    case XACT_EVENT_COMMIT:
 
98
      for(bs = HEAD_BS; bs; bs = bs->next) {
 
99
        if(bs->inerror) local_amqp_disconnect_bs(bs);
 
100
        bs->inerror = 0;
 
101
        if(!bs->uncommitted) continue;
 
102
        if(bs->conn) amqp_tx_commit(bs->conn, 2, AMQP_EMPTY_TABLE);
 
103
        reply = amqp_get_rpc_reply();
 
104
        if(reply->reply_type != AMQP_RESPONSE_NORMAL) {
 
105
          elog(WARNING, "amqp could not commit tx mode on broker %d", bs->broker_id);
 
106
          local_amqp_disconnect_bs(bs);
 
107
        }
 
108
        bs->uncommitted = 0;
 
109
      }
 
110
      break;
 
111
    case XACT_EVENT_ABORT:
 
112
      for(bs = HEAD_BS; bs; bs = bs->next) {
 
113
        if(bs->inerror) local_amqp_disconnect_bs(bs);
 
114
        bs->inerror = 0;
 
115
        if(!bs->uncommitted) continue;
 
116
        if(bs->conn) amqp_tx_rollback(bs->conn, 2, AMQP_EMPTY_TABLE);
 
117
        reply = amqp_get_rpc_reply();
 
118
        if(reply->reply_type != AMQP_RESPONSE_NORMAL) {
 
119
          elog(WARNING, "amqp could not commit tx mode on broker %d", bs->broker_id);
 
120
          local_amqp_disconnect_bs(bs);
 
121
        }
 
122
        bs->uncommitted = 0;
 
123
      }
 
124
      break;
 
125
    case XACT_EVENT_PREPARE:
 
126
      /* nothin' */
 
127
      return;
 
128
      break;
 
129
  }
 
130
}
 
131
 
 
132
void _PG_init() {
 
133
  RegisterXactCallback(amqp_local_phase2, NULL);
 
134
}
 
135
 
 
136
static struct brokerstate *
 
137
local_amqp_get_a_bs(broker_id) {
 
138
  struct brokerstate *bs;
 
139
  for(bs = HEAD_BS; bs; bs = bs->next) {
 
140
    if(bs->broker_id == broker_id) return bs;
 
141
  }
 
142
  bs = MemoryContextAllocZero(TopMemoryContext, sizeof(*bs));
 
143
  bs->broker_id = broker_id;
 
144
  bs->next = HEAD_BS;
 
145
  HEAD_BS = bs;
 
146
  return bs;
 
147
}
 
148
static struct brokerstate *
 
149
local_amqp_get_bs(broker_id) {
 
150
  char sql[1024];
 
151
  struct brokerstate *bs = local_amqp_get_a_bs(broker_id);
 
152
  if(bs->conn) return bs;
 
153
  if(SPI_connect() == SPI_ERROR_CONNECT) return NULL;
 
154
  snprintf(sql, sizeof(sql), "SELECT host, port, vhost, username, password "
 
155
                             "  FROM amqp.broker "
 
156
                             " WHERE broker_id = %d", broker_id);
 
157
  if(SPI_OK_SELECT == SPI_execute(sql, true, 1)) {
 
158
    if(1 == SPI_processed) {
 
159
      amqp_rpc_reply_t *reply, s_reply;
 
160
      char *host, *vhost, *user, *pass;
 
161
      Datum port_datum;
 
162
      bool is_null;
 
163
      int port = 5672;
 
164
      host = SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1);
 
165
      if(!host) host = "localhost";
 
166
      port_datum = SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 2, &is_null);
 
167
      if(!is_null) port = DatumGetInt32(port_datum);
 
168
      vhost = SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 3);
 
169
      if(!vhost) vhost = "/";
 
170
      user = SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 4);
 
171
      if(!user) user = "guest";
 
172
      pass = SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 5);
 
173
      if(!pass) pass = "guest";
 
174
 
 
175
      bs->conn = amqp_new_connection();
 
176
      if(!bs->conn) { SPI_finish(); return NULL; }
 
177
      bs->sockfd = amqp_open_socket(host, port);
 
178
      if(bs->sockfd < 0) goto busted;
 
179
      amqp_set_sockfd(bs->conn, bs->sockfd);
 
180
      s_reply = amqp_login(bs->conn, vhost, 0, 131072,
 
181
                           0, AMQP_SASL_METHOD_PLAIN,
 
182
                           user, pass);
 
183
      if(s_reply.reply_type != AMQP_RESPONSE_NORMAL) {
 
184
        elog(WARNING, "amqp login failed on broker %d", broker_id);
 
185
        goto busted;
 
186
      }
 
187
      amqp_channel_open(bs->conn, 1);
 
188
      reply = amqp_get_rpc_reply();
 
189
      if(reply->reply_type != AMQP_RESPONSE_NORMAL) {
 
190
        elog(WARNING, "amqp channel open failed on broker %d", broker_id);
 
191
        goto busted;
 
192
      }
 
193
      amqp_channel_open(bs->conn, 2);
 
194
      reply = amqp_get_rpc_reply();
 
195
      if(reply->reply_type != AMQP_RESPONSE_NORMAL) {
 
196
        elog(WARNING, "amqp channel open failed on broker %d", broker_id);
 
197
        goto busted;
 
198
      }
 
199
      amqp_tx_select(bs->conn, 2, AMQP_EMPTY_TABLE);
 
200
      reply = amqp_get_rpc_reply();
 
201
      if(reply->reply_type != AMQP_RESPONSE_NORMAL) {
 
202
        elog(WARNING, "amqp could not start tx mode on broker %d", broker_id);
 
203
        goto busted;
 
204
      }
 
205
    } else {
 
206
      elog(WARNING, "amqp can't find broker %d", broker_id);
 
207
    }
 
208
  } else {
 
209
    elog(WARNING, "amqp broker lookup query failed");
 
210
  }
 
211
  SPI_finish();
 
212
  return bs;
 
213
 busted:
 
214
  SPI_finish();
 
215
  local_amqp_disconnect_bs(bs);
 
216
  return bs;
 
217
}
 
218
static void
 
219
local_amqp_disconnect(broker_id) {
 
220
  struct brokerstate *bs = local_amqp_get_a_bs(broker_id);
 
221
  local_amqp_disconnect_bs(bs);
 
222
}
 
223
 
 
224
Datum
 
225
pg_amqp_exchange_declare(PG_FUNCTION_ARGS) {
 
226
  struct brokerstate *bs;
 
227
  if(!PG_ARGISNULL(0)) {
 
228
    int broker_id;
 
229
    broker_id = PG_GETARG_INT32(0);
 
230
    bs = local_amqp_get_bs(broker_id);
 
231
    if(bs && bs->conn) {
 
232
      amqp_rpc_reply_t *reply;
 
233
      amqp_bytes_t exchange_b;
 
234
      amqp_bytes_t exchange_type_b;
 
235
      amqp_boolean_t passive = 0;
 
236
      amqp_boolean_t durable = 0;
 
237
      amqp_boolean_t auto_delete = 0;
 
238
 
 
239
      set_bytes_from_text(exchange_b,1);
 
240
      set_bytes_from_text(exchange_type_b,2);
 
241
      passive = PG_GETARG_BOOL(3);
 
242
      durable = PG_GETARG_BOOL(4);
 
243
      auto_delete = PG_GETARG_BOOL(5);
 
244
      amqp_exchange_declare(bs->conn, 1,
 
245
                            exchange_b, exchange_type_b,
 
246
                            passive, durable, auto_delete, AMQP_EMPTY_TABLE);
 
247
      reply = amqp_get_rpc_reply();
 
248
      if(reply->reply_type == AMQP_RESPONSE_NORMAL)
 
249
        PG_RETURN_BOOL(0 == 0);
 
250
      bs->inerror = 1;
 
251
    }
 
252
  }
 
253
  PG_RETURN_BOOL(0 != 0);
 
254
}
 
255
static Datum
 
256
pg_amqp_publish_opt(PG_FUNCTION_ARGS, int channel) {
 
257
  struct brokerstate *bs;
 
258
  if(!PG_ARGISNULL(0)) {
 
259
    int broker_id;
 
260
    int once_more = 1;
 
261
    broker_id = PG_GETARG_INT32(0);
 
262
  redo:
 
263
    bs = local_amqp_get_bs(broker_id);
 
264
    if(bs && bs->conn && (channel == 1 || !bs->inerror)) {
 
265
      int rv;
 
266
      amqp_rpc_reply_t *reply;
 
267
      amqp_boolean_t mandatory = 0;
 
268
      amqp_boolean_t immediate = 0;
 
269
      amqp_bytes_t exchange_b = amqp_cstring_bytes("amq.direct");
 
270
      amqp_bytes_t routing_key_b = amqp_cstring_bytes("");
 
271
      amqp_bytes_t body_b = amqp_cstring_bytes("");
 
272
 
 
273
      set_bytes_from_text(exchange_b,1);
 
274
      set_bytes_from_text(routing_key_b,2);
 
275
      set_bytes_from_text(body_b,3);
 
276
      rv = amqp_basic_publish(bs->conn, channel, exchange_b, routing_key_b,
 
277
                              mandatory, immediate, NULL, body_b);
 
278
      reply = amqp_get_rpc_reply();
 
279
      if(rv || reply->reply_type != AMQP_RESPONSE_NORMAL) {
 
280
        if(once_more && (channel == 1 || bs->uncommitted == 0)) {
 
281
          once_more = 0;
 
282
          local_amqp_disconnect_bs(bs);
 
283
          goto redo;
 
284
        }
 
285
        bs->inerror = 1;
 
286
        PG_RETURN_BOOL(0 != 0);
 
287
      }
 
288
      /* channel two is transactional */
 
289
      if(channel == 2) bs->uncommitted++;
 
290
      PG_RETURN_BOOL(rv == 0);
 
291
    }
 
292
  }
 
293
  PG_RETURN_BOOL(0 != 0);
 
294
}
 
295
 
 
296
PG_FUNCTION_INFO_V1(pg_amqp_publish);
 
297
Datum
 
298
pg_amqp_publish(PG_FUNCTION_ARGS) {
 
299
  return pg_amqp_publish_opt(fcinfo, 2);
 
300
}
 
301
 
 
302
PG_FUNCTION_INFO_V1(pg_amqp_autonomous_publish);
 
303
Datum
 
304
pg_amqp_autonomous_publish(PG_FUNCTION_ARGS) {
 
305
  return pg_amqp_publish_opt(fcinfo, 1);
 
306
}
 
307
 
 
308
PG_FUNCTION_INFO_V1(pg_amqp_disconnect);
 
309
Datum
 
310
pg_amqp_disconnect(PG_FUNCTION_ARGS) {
 
311
  if(!PG_ARGISNULL(0)) {
 
312
    int broker_id;
 
313
    broker_id = PG_GETARG_INT32(0);
 
314
    local_amqp_disconnect(broker_id);
 
315
  }
 
316
  PG_RETURN_VOID();
 
317
}
 
318