3
* $Header: /cvsroot/pgpool/pgpool-II/pool_proto_modules.c,v 1.6.2.7 2009/08/06 07:55:17 t-ishii Exp $
3
* $Header: /cvsroot/pgpool/pgpool-II/pool_proto_modules.c,v 1.35 2010/02/07 03:05:17 t-ishii Exp $
5
5
* pgpool: a language independent connection pool server for PostgreSQL
6
6
* written by Tatsuo Ishii
8
* Copyright (c) 2003-2009 PgPool Global Development Group
8
* Copyright (c) 2003-2010 PgPool Global Development Group
10
10
* Permission to use, copy, modify, and distribute this software and
11
11
* its documentation for any purpose and without fee is hereby
232
240
if (pool_config->parallel_mode)
234
/* The Query is analyzed first in a parallel mode(in_parallel_query),
242
/* The Query is analyzed first in a parallel mode(in_parallel_query),
235
243
* and, next, the Query is rewritten(rewrite_query_stmt).
238
246
/* analyze the query */
239
247
RewriteQuery *r_query = is_parallel_query(node,backend);
241
249
if(r_query->is_loadbalance)
243
/* Usual processing of pgpool is done by using the rewritten Query
244
* if judged a possible load-balancing as a result of analyzing
246
* Of course, the load is distributed only for load_balance_mode=true.
251
/* Usual processing of pgpool is done by using the rewritten Query
252
* if judged a possible load-balancing as a result of analyzing
254
* Of course, the load is distributed only for load_balance_mode=true.
248
256
if(r_query->r_code == SEND_LOADBALANCE_ENGINE)
479
488
/* start a transaction if needed */
480
if (start_internal_transaction(backend, (Node *)node) != POOL_CONTINUE)
489
if (start_internal_transaction(frontend, backend, (Node *)node) != POOL_CONTINUE)
483
492
/* check if need lock */
484
493
if (need_insert_lock(backend, string, node))
486
495
/* if so, issue lock command */
487
status = insert_lock(backend, string, (InsertStmt *)node);
496
status = insert_lock(frontend, backend, string, (InsertStmt *)node);
488
497
if (status != POOL_CONTINUE)
519
528
/* check if query is "COMMIT" or "ROLLBACK" */
520
529
commit = is_commit_query(node);
524
532
* Query is not commit/rollback
540
Portal *portal = NULL;
542
if (IsA(node, PrepareStmt))
544
portal = pending_prepared_portal;
545
portal->num_tsparams = 0;
547
else if (IsA(node, ExecuteStmt))
548
portal = lookup_prepared_statement_by_statement(
549
&prepared_list, ((ExecuteStmt *) node)->name);
551
/* rewrite `now()' to timestamp literal */
552
rewrite_query = rewrite_timestamp(backend, node, false, portal);
553
if (rewrite_query != NULL)
555
string = rewrite_query;
556
len = strlen(string) + 1;
562
* Optimization effort: If there's only one session, we do
563
* not need to wait for the master node's response, and
564
* could execute a query concurrently.
566
if (pool_config->num_init_children == 1)
568
/* Send query to DB nodes */
569
for (i=0;i<NUM_BACKENDS;i++)
571
if (!VALID_BACKEND(i))
574
per_node_statement_log(backend, i, string);
576
if (send_simplequery_message(CONNECTION(backend, i), len, string, MAJOR(backend)) != POOL_CONTINUE)
583
/* Wait for response from DB nodes */
584
for (i=0;i<NUM_BACKENDS;i++)
586
if (!VALID_BACKEND(i))
589
if (wait_for_query_response(frontend, CONNECTION(backend, i), string, MAJOR(backend)) != POOL_CONTINUE)
591
/* Cancel current transaction */
592
CancelPacket cancel_packet;
594
cancel_packet.protoVersion = htonl(PROTO_CANCEL);
595
cancel_packet.pid = MASTER_CONNECTION(backend)->pid;
596
cancel_packet.key= MASTER_CONNECTION(backend)->key;
597
cancel_request(&cancel_packet);
604
* Check if some error detected. If so, emit
605
* log. This is useful when invalid encoding error
606
* occurs. In this case, PostgreSQL does not report
607
* what statement caused that error and make users
610
per_node_error_log(backend, i, string, "SimpleQuery: Error or notice message from backend: ", true);
615
TSTATE(backend) = 'I';
618
return POOL_CONTINUE;
528
621
/* Send the query to master node */
623
per_node_statement_log(backend, MASTER_NODE_ID, string);
530
625
if (send_simplequery_message(MASTER(backend), len, string, MAJOR(backend)) != POOL_CONTINUE)
533
if (wait_for_query_response(frontend, MASTER(backend), string) != POOL_CONTINUE)
631
if (wait_for_query_response(frontend, MASTER(backend), string, MAJOR(backend)) != POOL_CONTINUE)
535
633
/* Cancel current transaction */
536
634
CancelPacket cancel_packet;
554
653
string = POOL_ERROR_QUERY;
555
654
len = strlen(string) + 1;
659
* Check if some error detected. If so, emit
660
* log. This is useful when invalid encoding error
661
* occurs. In this case, PostgreSQL does not report
662
* what statement caused that error and make users
665
per_node_error_log(backend, MASTER_NODE_ID, string, "SimpleQuery: Error or notice message from backend: ", true);
559
669
/* send query to other than master nodes */
582
697
cancel_packet.key= MASTER_CONNECTION(backend)->key;
583
698
cancel_request(&cancel_packet);
705
* Check if some error detected. If so, emit
706
* log. This is useful when invalid encoding error
707
* occurs. In this case, PostgreSQL does not report
708
* what statement caused that error and make users
711
per_node_error_log(backend, i, string, "SimpleQuery: Error or notice message from backend: ", true);
589
715
/* send "COMMIT" or "ROLLBACK" to only master node if query is "COMMIT" or "ROLLBACK" */
718
per_node_statement_log(backend, MASTER_NODE_ID, string);
592
720
if (send_simplequery_message(MASTER(backend), len, string, MAJOR(backend)) != POOL_CONTINUE)
595
if (wait_for_query_response(frontend, MASTER(backend), string) != POOL_CONTINUE)
726
if (wait_for_query_response(frontend, MASTER(backend), string, MAJOR(backend)) != POOL_CONTINUE)
597
728
/* Cancel current transaction */
598
729
CancelPacket cancel_packet;
602
733
cancel_packet.key= MASTER_CONNECTION(backend)->key;
603
734
cancel_request(&cancel_packet);
741
* Check if some error detected. If so, emit
742
* log. This is useful when invalid encoding error
743
* occurs. In this case, PostgreSQL does not report
744
* what statement caused that error and make users
747
per_node_error_log(backend, MASTER_NODE_ID, string, "SimpleQuery: Error or notice message from backend: ", true);
609
749
TSTATE(backend) = 'I';
755
per_node_statement_log(backend, MASTER_NODE_ID, string);
615
757
if (send_simplequery_message(MASTER(backend), len, string, MAJOR(backend)) != POOL_CONTINUE)
618
if (wait_for_query_response(frontend, MASTER(backend), string) != POOL_CONTINUE)
760
if (wait_for_query_response(frontend, MASTER(backend), string, MAJOR(backend)) != POOL_CONTINUE)
620
762
/* Cancel current transaction */
621
763
CancelPacket cancel_packet;
669
823
p_stmt = (PrepareStmt *)portal->stmt;
671
825
string1 = portal->sql_string;
826
pool_debug("Execute: query: %s", string1);
672
827
node = (Node *)p_stmt->query;
673
828
strncpy(query_string_buffer, string1, sizeof(query_string_buffer));
675
if ((IsA(node, PrepareStmt) || IsA(node, DeallocateStmt) ||
676
IsA(node, VariableSetStmt)) &&
677
MASTER_SLAVE && TSTATE(backend) != 'E')
830
if ((IsA(node, PrepareStmt) || IsA(node, DeallocateStmt) ||
831
IsA(node, VariableSetStmt)) &&
832
MASTER_SLAVE && TSTATE(backend) != 'E')
835
* PREPARE, DEALLOCATE, SET, DISCARD
836
* should be executed on all nodes. So we set
679
839
force_replication = 1;
682
* JDBC driver sends "BEGIN" query internally if setAutoCommit(false).
683
* But it does not send Sync message after "BEGIN" query.
684
* In extended query protocol, PostgreSQL returns
685
* ReadyForQuery when a client sends Sync message.
686
* We can't know a transaction state...
687
* So pgpool send Sync message internally.
842
* JDBC driver sends "BEGIN" query internally if
843
* setAutoCommit(false). But it does not send Sync message
844
* after "BEGIN" query. In extended query protocol,
845
* PostgreSQL returns ReadyForQuery when a client sends Sync
846
* message. Problem is, pgpool can't know the transaction
847
* state without receiving ReadyForQuery. So we remember that
848
* we need to send Sync message internally afterward, whenever
849
* we receive BEGIN in extended protocol.
689
851
else if (IsA(node, TransactionStmt) && MASTER_SLAVE)
742
905
/* Send the query to master node */
906
per_node_statement_log(backend, MASTER_NODE_ID, string1);
744
907
if (send_execute_message(backend, MASTER_NODE_ID, len, string) != POOL_CONTINUE)
747
if (wait_for_query_response(frontend, MASTER(backend), string) != POOL_CONTINUE)
910
if (wait_for_query_response(frontend, MASTER(backend), string, MAJOR(backend)) != POOL_CONTINUE)
749
912
/* Cancel current transaction */
750
913
CancelPacket cancel_packet;
809
976
/* send "COMMIT" or "ROLLBACK" to only master node if query is "COMMIT" or "ROLLBACK" */
979
per_node_statement_log(backend, MASTER_NODE_ID, string1);
812
981
if (send_execute_message(backend, MASTER_NODE_ID, len, string) != POOL_CONTINUE)
815
if (wait_for_query_response(frontend, MASTER(backend), string) != POOL_CONTINUE)
984
if (wait_for_query_response(frontend, MASTER(backend), string, MAJOR(backend)) != POOL_CONTINUE)
817
986
/* Cancel current transaction */
818
987
CancelPacket cancel_packet;
1000
per_node_statement_log(backend, MASTER_NODE_ID, string1);
831
1002
if (send_execute_message(backend, MASTER_NODE_ID, len, string) != POOL_CONTINUE)
832
1003
return POOL_END;
834
if (wait_for_query_response(frontend, MASTER(backend), string) != POOL_CONTINUE)
1005
if (wait_for_query_response(frontend, MASTER(backend), string, MAJOR(backend)) != POOL_CONTINUE)
836
1007
/* Cancel current transaction */
837
1008
CancelPacket cancel_packet;
903
1075
parse_tree_list = raw_parser(stmt);
904
1076
if (parse_tree_list == NIL)
1078
/* free_parser(); */
1083
/* Save last query string for logging purpose */
1084
snprintf(query_string_buffer, sizeof(query_string_buffer), "Parse: %s", stmt);
910
1086
node = (Node *) lfirst(list_head(parse_tree_list));
912
1088
insert_stmt_with_lock = need_insert_lock(backend, stmt, node);
1090
/* Special treatment for master/slave + temp tables */
1093
/* Is there "NO LOAD BALANCE" comment? */
1094
if (!strncasecmp(stmt, NO_LOAD_BALANCE, NO_LOAD_BALANCE_COMMENT_SZ) ||
1095
/* or the table used in a query is a temporary one ? */
1096
is_temp_table(backend, node))
1099
* From now on, let only master handle queries. This is
1100
* typically useful for using temp tables in master/slave
1103
master_slave_was_enabled = 1;
1105
master_slave_dml = 1;
914
1109
portal = create_portal();
915
1110
if (portal == NULL)
917
1112
pool_error("Parse: create_portal() failed");
918
1114
return POOL_END;
944
1145
pending_prepared_portal = portal;
947
/* switch old memory context */
1149
* Switch to old memory context. Caution. Now we are in parser
1151
* Palloced memories will be gone if free_parser() called!
948
1153
pool_memory = old_context;
950
1155
if (REPLICATION)
1157
char *rewrite_query;
1158
bool rewrite_to_params = true;
1162
* if stmt is unnamed, we rewrite `now()' to timestamp constant.
1163
* else we rewrite `now()' to params and expand that at Bind
1167
rewrite_to_params = false;
1168
portal->num_tsparams = 0;
1169
rewrite_query = rewrite_timestamp(backend, node, rewrite_to_params, portal);
1170
if (rewrite_query != NULL)
1172
string = palloc(strlen(name) + strlen(rewrite_query) + 2);
1173
strcpy(string, name);
1174
strcpy(string + strlen(name) + 1, rewrite_query);
1175
memcpy(string + strlen(name) + strlen(rewrite_query) + 2,
1176
stmt + strlen(stmt) + 1,
1177
len - (strlen(name) + strlen(stmt) + 2));
1179
len = len - strlen(stmt) + strlen(rewrite_query);
1181
stmt = string + strlen(name) + 1;
1182
pool_debug("rewrite query %s %s len=%d", name, stmt, len);
954
1190
if (TSTATE(backend) != 'T')
966
1202
kind = pool_read_kind(backend);
967
1203
if (kind != 'Z')
968
1206
return POOL_END;
969
1209
if (ReadyForQuery(frontend, backend, 0) != POOL_CONTINUE)
970
1212
return POOL_END;
973
1216
if (is_strict_query(node))
974
start_internal_transaction(backend, node);
1217
start_internal_transaction(frontend, backend, node);
976
1219
if (insert_stmt_with_lock)
978
1221
/* start a transaction if needed and lock the table */
979
status = insert_lock(backend, stmt, (InsertStmt *)node);
1222
status = insert_lock(frontend, backend, stmt, (InsertStmt *)node);
980
1223
if (status != POOL_CONTINUE)
989
1232
/* send to master node */
1233
snprintf(per_node_statement_log_buffer, sizeof(per_node_statement_log_buffer), "Parse: %s", stmt);
1234
per_node_statement_log(backend, MASTER_NODE_ID, per_node_statement_log_buffer);
990
1235
if (send_extended_protocol_message(backend, MASTER_NODE_ID,
991
1236
"P", len, string))
992
1239
return POOL_END;
1243
* Cannot call free_parser() here. Since "string" might be allocated in parser context.
994
1247
if (REPLICATION || PARALLEL_MODE || MASTER_SLAVE)
996
/* We must synchronize because Parse message acquires table
1250
* We must synchronize because Parse message acquires table
999
pool_debug("waiting for master completing the query");
1000
if (synchronize(MASTER(backend)))
1253
pool_debug("Parse: waiting for master completing the query");
1254
if (wait_for_query_response(frontend, MASTER(backend), string, MAJOR(backend)) != POOL_CONTINUE)
1256
/* Cancel current transaction */
1257
CancelPacket cancel_packet;
1259
cancel_packet.protoVersion = htonl(PROTO_CANCEL);
1260
cancel_packet.pid = MASTER_CONNECTION(backend)->pid;
1261
cancel_packet.key= MASTER_CONNECTION(backend)->key;
1262
cancel_request(&cancel_packet);
1001
1264
return POOL_END;
1004
1268
* We must check deadlock error because an aborted transaction
1009
1273
deadlock_detected = detect_deadlock_error(MASTER(backend), MAJOR(backend));
1010
1274
if (deadlock_detected < 0)
1011
1277
return POOL_END;
1282
* Check if other than deadlock error detected. If so, emit
1283
* log. This is useful when invalid encoding error occurs. In
1284
* this case, PostgreSQL does not report what statement caused
1285
* that error and make users confused.
1287
per_node_error_log(backend, MASTER_NODE_ID, stmt, "Parse(): Error or notice message from backend: ", true);
1013
1290
for (i=0;i<NUM_BACKENDS;i++)
1019
1296
pool_log("Parse: received deadlock error message from master node");
1298
per_node_statement_log(backend, i, POOL_ERROR_QUERY);
1021
1300
if (send_simplequery_message(CONNECTION(backend, i),
1022
1301
strlen(POOL_ERROR_QUERY)+1,
1023
1302
POOL_ERROR_QUERY,
1024
1303
MAJOR(backend)))
1027
else if (send_extended_protocol_message(backend, i,
1311
snprintf(per_node_statement_log_buffer, sizeof(per_node_statement_log_buffer), "Parse: %s", stmt);
1312
per_node_statement_log(backend, i, per_node_statement_log_buffer);
1314
if (send_extended_protocol_message(backend, i,"P", len, string))
1036
1326
if (!VALID_BACKEND(i) || IS_MASTER_NODE_ID(i))
1039
pool_debug("waiting for %dth backend completing the query", i);
1040
if (synchronize(CONNECTION(backend, i)))
1329
pool_debug("Parse: waiting for %dth backend completing the query", i);
1330
if (wait_for_query_response(frontend, CONNECTION(backend, i), string, MAJOR(backend)) != POOL_CONTINUE)
1332
/* Cancel current transaction */
1333
CancelPacket cancel_packet;
1335
cancel_packet.protoVersion = htonl(PROTO_CANCEL);
1336
cancel_packet.pid = MASTER_CONNECTION(backend)->pid;
1337
cancel_packet.key= MASTER_CONNECTION(backend)->key;
1338
cancel_request(&cancel_packet);
1041
1340
return POOL_END;
1344
* Check if error (or notice response) from backend is
1345
* detected. If so, emit log. This is useful when
1346
* invalid encoding error occurs. In this case, PostgreSQL
1347
* does not report what statement caused that error and
1348
* make users confused.
1350
per_node_error_log(backend, i, stmt, "Parse(): Error or notice message from backend: ", true);
1355
* Ok. we are safe to call free_parser();
1047
1361
POOL_STATUS ret;
1257
1568
sp = MASTER_CONNECTION(backend)->sp;
1258
1569
if (MASTER(backend)->tstate == 'T')
1259
snprintf(psbuf, sizeof(psbuf), "%s %s %s idle in transaction",
1570
snprintf(psbuf, sizeof(psbuf), "%s %s %s idle in transaction",
1260
1571
sp->user, sp->database, remote_ps_data);
1262
snprintf(psbuf, sizeof(psbuf), "%s %s %s idle",
1573
snprintf(psbuf, sizeof(psbuf), "%s %s %s idle",
1263
1574
sp->user, sp->database, remote_ps_data);
1264
1575
set_ps_display(psbuf, false);
1484
1799
case 'P': /* Parse message */
1485
1800
allow_close_transaction = 0;
1803
(TSTATE(backend) != 'I' || receive_extended_begin))
1805
pool_debug("kind: %c master_slave_dml enabled", fkind);
1806
master_slave_was_enabled = 1;
1808
master_slave_dml = 1;
1486
1811
status = Parse(frontend, backend);
1814
case 'S': /* Sync message */
1490
1815
receive_extended_begin = 0;
1491
1816
/* fall through */
1494
if (MAJOR(backend) == PROTO_MAJOR_V3)
1819
if ((MAJOR(backend) == PROTO_MAJOR_V3) &&
1820
(fkind == 'S' || fkind == 'H' || fkind == 'D' || fkind == 'f'||
1821
fkind == 'C' || fkind == 'B' || fkind == 'F' || fkind == 'd' || fkind == 'c'))
1496
1823
if (MASTER_SLAVE &&
1497
1824
(TSTATE(backend) != 'I' || receive_extended_begin))
2374
2701
pool_error(msg->data, query);
2375
2702
free_string(msg);
2706
* Judge the table used in a query represented by node is a temporary
2709
static int is_temp_table(POOL_CONNECTION_POOL *backend, Node *node)
2712
* Query to know if pg_class has relistemp column or not.
2713
* PostgreSQL 8.4 or later has this.
2715
#define HASRELITEMPPQUERY "SELECT count(*) FROM pg_catalog.pg_class AS c, pg_attribute AS a WHERE c.relname = 'pg_class' AND a.attrelid = c.oid AND a.attname = 'relistemp'"
2718
* Query to know if the target table is a temporary one.
2719
* This query is valid through PostgreSQL 7.3 to 8.3.
2721
#define ISTEMPQUERY83 "SELECT count(*) FROM pg_class AS c, pg_namespace AS n WHERE c.relname = '%s' AND c.relnamespace = n.oid AND n.nspname ~ '^pg_temp_'"
2724
* Query to know if the target table is a temporary one.
2725
* This query is valid for PostgreSQL 8.4 or later.
2727
#define ISTEMPQUERY84 "SELECT count(*) FROM pg_catalog.pg_class AS c WHERE c.relname = '%s' AND c.relistemp"
2732
static POOL_RELCACHE *hasrelistemp_cache;
2733
static POOL_RELCACHE *relcache;
2737
* For version 2 protocol, we cannot support the checking since
2738
* the underlying infrastructure (do_query) does not support the
2739
* protocol. So we just return false.
2741
if (MAJOR(backend) == PROTO_MAJOR_V2)
2744
/* For SELECT, it's hard to extract table names. So we always return 0 */
2745
if (IsA(node, SelectStmt))
2750
/* Obtain table name */
2751
if (IsA(node, InsertStmt))
2752
str = nodeToString(((InsertStmt *)node)->relation);
2753
else if (IsA(node, UpdateStmt))
2754
str = nodeToString(((UpdateStmt *)node)->relation);
2755
else if (IsA(node, DeleteStmt))
2756
str = nodeToString(((DeleteStmt *)node)->relation);
2757
else /* Unknown statement */
2766
* Check backend version
2768
if (!hasrelistemp_cache)
2770
hasrelistemp_cache = pool_create_relcache(32, HASRELITEMPPQUERY,
2771
int_register_func, int_unregister_func,
2773
if (hasrelistemp_cache == NULL)
2775
pool_error("is_temp_table: pool_create_relcache error");
2780
hasrelistemp = pool_search_relcache(hasrelistemp_cache, backend, "pg_class")==0?0:1;
2782
query = ISTEMPQUERY84;
2784
query = ISTEMPQUERY83;
2787
* If relcache does not exist, create it.
2791
relcache = pool_create_relcache(32, query,
2792
int_register_func, int_unregister_func,
2794
if (relcache == NULL)
2796
pool_error("is_temp_table: pool_create_relcache error");
2804
result = pool_search_relcache(relcache, backend, str)==0?0:1;
2809
* Make per DB node statement log
2811
void per_node_statement_log(POOL_CONNECTION_POOL *backend, int node_id, char *query)
2813
POOL_CONNECTION_POOL_SLOT *slot = backend->slots[node_id];
2815
if (pool_config->log_per_node_statement)
2816
pool_log("DB node id: %d backend pid: %d statement: %s", node_id, ntohl(slot->pid), query);
2820
* Check kind and produce error message
2821
* All data read in this function is returned to stream.
2823
void per_node_error_log(POOL_CONNECTION_POOL *backend, int node_id, char *query, char *prefix, bool unread)
2825
POOL_CONNECTION_POOL_SLOT *slot = backend->slots[node_id];
2828
if (pool_extract_error_message(true, CONNECTION(backend, node_id), MAJOR(backend), true, &message) > 0)
2830
pool_log("%s: DB node id: %d backend pid: %d statement: %s message: %s",
2831
prefix, node_id, ntohl(slot->pid), query, message);