~ubuntu-branches/ubuntu/oneiric/pgpool2/oneiric

« back to all changes in this revision

Viewing changes to pool_proto_modules.c

  • Committer: Bazaar Package Importer
  • Author(s): Marc Gariepy
  • Date: 2010-02-17 13:58:08 UTC
  • mfrom: (1.1.5 upstream)
  • Revision ID: james.westby@ubuntu.com-20100217135808-vqxtfe80r5z8toje
Tags: 2.3.2.1-0ubuntu1
* New upstream release (2.3.2.1)
 * Lots of bug fixes
 * Add SSL support
 * Add support for large object replication
 * Enhanced replication (TIMESTAMP, DATES)
 * Save node status on restart
 * Some other minor changes

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
/* -*-pgsql-c-*- */
2
2
/*
3
 
 * $Header: /cvsroot/pgpool/pgpool-II/pool_proto_modules.c,v 1.6.2.7 2009/08/06 07:55:17 t-ishii Exp $
 
3
 * $Header: /cvsroot/pgpool/pgpool-II/pool_proto_modules.c,v 1.35 2010/02/07 03:05:17 t-ishii Exp $
4
4
 * 
5
5
 * pgpool: a language independent connection pool server for PostgreSQL 
6
6
 * written by Tatsuo Ishii
7
7
 *
8
 
 * Copyright (c) 2003-2009      PgPool Global Development Group
 
8
 * Copyright (c) 2003-2010      PgPool Global Development Group
9
9
 *
10
10
 * Permission to use, copy, modify, and distribute this software and
11
11
 * its documentation for any purpose and without fee is hereby
45
45
 
46
46
#include "pool.h"
47
47
#include "pool_signal.h"
 
48
#include "pool_timestamp.h"
48
49
#include "pool_proto_modules.h"
49
50
#include "parser/pool_string.h"
50
51
 
68
69
 
69
70
/* non 0 if "BEGIN" query with extended query protocol received */
70
71
int receive_extended_begin = 0;
71
 
/* non 0 if allow to close internal transaction */
72
 
int allow_close_transaction = 1;
 
72
 
 
73
/*
 
74
 * Non 0 if allow to close internal transaction.  This variable was
 
75
 * introduced on 2008/4/3 not to close an internal transaction when
 
76
 * Sync message is received after receiving Parse message. This hack
 
77
 * is for PHP-PDO.
 
78
 */
 
79
static int allow_close_transaction = 1;
73
80
 
74
81
PreparedStatementList prepared_list; /* prepared statement name list */
75
82
 
90
97
 
91
98
static int check_errors(POOL_CONNECTION_POOL *backend, int backend_id);
92
99
static void generate_error_message(char *prefix, int specific_error, char *query);
 
100
static int is_temp_table(POOL_CONNECTION_POOL *backend, Node *node);
93
101
 
94
 
POOL_STATUS NotificationResponse(POOL_CONNECTION *frontend, 
 
102
POOL_STATUS NotificationResponse(POOL_CONNECTION *frontend,
95
103
                                                                                POOL_CONNECTION_POOL *backend)
96
104
{
97
105
        int pid, pid1;
131
139
 * Process Query('Q') message
132
140
 * Query messages include a SQL string.
133
141
 */
134
 
 POOL_STATUS SimpleQuery(POOL_CONNECTION *frontend, 
 
142
 POOL_STATUS SimpleQuery(POOL_CONNECTION *frontend,
135
143
                                                 POOL_CONNECTION_POOL *backend, char *query)
136
144
{
137
145
        char *string, *string1;
231
239
 
232
240
                if (pool_config->parallel_mode)
233
241
                {
234
 
      /* The Query is analyzed first in a parallel mode(in_parallel_query), 
 
242
      /* The Query is analyzed first in a parallel mode(in_parallel_query),
235
243
       * and, next, the Query is rewritten(rewrite_query_stmt).
236
244
       */
237
 
 
 
245
 
238
246
                        /* analyze the query */
239
247
                        RewriteQuery *r_query = is_parallel_query(node,backend);
240
248
 
241
249
                        if(r_query->is_loadbalance)
242
250
                        {
243
 
        /* Usual processing of pgpool is done by using the rewritten Query 
244
 
         * if judged a possible load-balancing as a result of analyzing 
245
 
         * the Query. 
246
 
         * Of course, the load is distributed only for load_balance_mode=true. 
 
251
        /* Usual processing of pgpool is done by using the rewritten Query
 
252
         * if judged a possible load-balancing as a result of analyzing
 
253
         * the Query.
 
254
         * Of course, the load is distributed only for load_balance_mode=true.
247
255
         */
248
256
                                if(r_query->r_code ==  SEND_LOADBALANCE_ENGINE)
249
257
                                {
255
263
                                pool_debug("SimpleQuery: loadbalance_query =%s",string);
256
264
                        }
257
265
                        else if (r_query->is_parallel)
258
 
                        { 
259
 
                                /* 
 
266
                        {
 
267
                                /*
260
268
                                 * For the Query that the parallel processing is possible.
261
269
                                 * Call parallel exe engine and return status to the upper layer.
262
270
                                 */
343
351
                        IsA(node, VariableSetStmt) || IsA(node, DiscardStmt))
344
352
                {
345
353
                        /*
346
 
                         * PREPARE, DEALLOCATE and SET statements must be replicated.
 
354
                         * PREPARE, SET, DEALLOCATE and DISCARD statements must be
 
355
                         * replicated even if we are in master/slave mode.
347
356
                         */
348
357
                        if (MASTER_SLAVE && TSTATE(backend) != 'E')
349
358
                                force_replication = 1;
477
486
                if (REPLICATION)
478
487
                {
479
488
                        /* start a transaction if needed */
480
 
                        if (start_internal_transaction(backend, (Node *)node) != POOL_CONTINUE)
 
489
                        if (start_internal_transaction(frontend, backend, (Node *)node) != POOL_CONTINUE)
481
490
                                return POOL_END;
482
491
 
483
492
                        /* check if need lock */
484
493
                        if (need_insert_lock(backend, string, node))
485
494
                        {
486
495
                                /* if so, issue lock command */
487
 
                                status = insert_lock(backend, string, (InsertStmt *)node);
 
496
                                status = insert_lock(frontend, backend, string, (InsertStmt *)node);
488
497
                                if (status != POOL_CONTINUE)
489
498
                                {
490
499
                                        free_parser();
492
501
                                }
493
502
                        }
494
503
                }
495
 
                else if (REPLICATION && query == NULL && start_internal_transaction(backend, node))
 
504
                else if (REPLICATION && query == NULL && start_internal_transaction(frontend, backend, node))
496
505
                {
497
506
                        free_parser();
498
507
                        return POOL_ERROR;
506
515
                        master_slave_was_enabled = 1;
507
516
                        MASTER_SLAVE = 0;
508
517
                        master_slave_dml = 1;
509
 
                }               
 
518
                }
510
519
        }
511
520
 
512
521
        if (MAJOR(backend) == PROTO_MAJOR_V2 && is_start_transaction_query(node))
518
527
        {
519
528
                /* check if query is "COMMIT" or "ROLLBACK" */
520
529
                commit = is_commit_query(node);
521
 
                free_parser();
522
530
 
523
531
                /*
524
532
                 * Query is not commit/rollback
525
533
                 */
526
534
                if (!commit)
527
535
                {
 
536
                        char            *rewrite_query;
 
537
 
 
538
                        if (node)
 
539
                        {
 
540
                                Portal *portal = NULL;
 
541
 
 
542
                                if (IsA(node, PrepareStmt))
 
543
                                {
 
544
                                        portal = pending_prepared_portal;
 
545
                                        portal->num_tsparams = 0;
 
546
                                }
 
547
                                else if (IsA(node, ExecuteStmt))
 
548
                                        portal = lookup_prepared_statement_by_statement(
 
549
                                                        &prepared_list, ((ExecuteStmt *) node)->name);
 
550
 
 
551
                                /* rewrite `now()' to timestamp literal */
 
552
                                rewrite_query = rewrite_timestamp(backend, node, false, portal);
 
553
                                if (rewrite_query != NULL)
 
554
                                {
 
555
                                        string = rewrite_query;
 
556
                                        len = strlen(string) + 1;
 
557
                                }
 
558
 
 
559
                        }
 
560
 
 
561
                        /*
 
562
                         * Optimization effort: If there's only one session, we do
 
563
                         * not need to wait for the master node's response, and
 
564
                         * could execute a query concurrently.
 
565
                         */
 
566
                        if (pool_config->num_init_children == 1)
 
567
                        {
 
568
                                /* Send query to DB nodes */
 
569
                                for (i=0;i<NUM_BACKENDS;i++)
 
570
                                {
 
571
                                        if (!VALID_BACKEND(i))
 
572
                                                continue;
 
573
 
 
574
                                        per_node_statement_log(backend, i, string);
 
575
 
 
576
                                        if (send_simplequery_message(CONNECTION(backend, i), len, string, MAJOR(backend)) != POOL_CONTINUE)
 
577
                                        {
 
578
                                                free_parser();
 
579
                                                return POOL_END;
 
580
                                        }
 
581
                                }
 
582
 
 
583
                                /* Wait for response from DB nodes */
 
584
                                for (i=0;i<NUM_BACKENDS;i++)
 
585
                                {
 
586
                                        if (!VALID_BACKEND(i))
 
587
                                                continue;
 
588
 
 
589
                                        if (wait_for_query_response(frontend, CONNECTION(backend, i), string, MAJOR(backend)) != POOL_CONTINUE)
 
590
                                        {
 
591
                                                /* Cancel current transaction */
 
592
                                                CancelPacket cancel_packet;
 
593
 
 
594
                                                cancel_packet.protoVersion = htonl(PROTO_CANCEL);
 
595
                                                cancel_packet.pid = MASTER_CONNECTION(backend)->pid;
 
596
                                                cancel_packet.key= MASTER_CONNECTION(backend)->key;
 
597
                                                cancel_request(&cancel_packet);
 
598
 
 
599
                                                free_parser();
 
600
                                                return POOL_END;
 
601
                                        }
 
602
 
 
603
                                        /*
 
604
                                         * Check if some error detected.  If so, emit
 
605
                                         * log. This is useful when invalid encoding error
 
606
                                         * occurs. In this case, PostgreSQL does not report
 
607
                                         * what statement caused that error and makes users
 
608
                                         * confused.
 
609
                                         */
 
610
                                        per_node_error_log(backend, i, string, "SimpleQuery: Error or notice message from backend: ", true);
 
611
 
 
612
                                }
 
613
                                if (commit)
 
614
                                {
 
615
                                        TSTATE(backend) = 'I';
 
616
                                }
 
617
                                free_parser();
 
618
                                return POOL_CONTINUE;
 
619
                        }
 
620
 
528
621
                        /* Send the query to master node */
529
622
 
 
623
                        per_node_statement_log(backend, MASTER_NODE_ID, string);
 
624
 
530
625
                        if (send_simplequery_message(MASTER(backend), len, string, MAJOR(backend)) != POOL_CONTINUE)
 
626
                        {
 
627
                                free_parser();
531
628
                                return POOL_END;
 
629
                        }
532
630
 
533
 
                        if (wait_for_query_response(frontend, MASTER(backend), string) != POOL_CONTINUE)
 
631
                        if (wait_for_query_response(frontend, MASTER(backend), string, MAJOR(backend)) != POOL_CONTINUE)
534
632
                        {
535
633
                                /* Cancel current transaction */
536
634
                                CancelPacket cancel_packet;
540
638
                                cancel_packet.key= MASTER_CONNECTION(backend)->key;
541
639
                                cancel_request(&cancel_packet);
542
640
 
 
641
                                free_parser();
543
642
                                return POOL_END;
544
643
                        }
545
644
 
554
653
                                string = POOL_ERROR_QUERY;
555
654
                                len = strlen(string) + 1;
556
655
                        }
 
656
                        else
 
657
                        {
 
658
                                /*
 
659
                                 * Check if some error detected.  If so, emit
 
660
                                 * log. This is useful when invalid encoding error
 
661
                                 * occurs. In this case, PostgreSQL does not report
 
662
                                 * what statement caused that error and makes users
 
663
                                 * confused.
 
664
                                 */
 
665
                                per_node_error_log(backend, MASTER_NODE_ID, string, "SimpleQuery: Error or notice message from backend: ", true);
 
666
                        }
557
667
                }
558
668
 
559
669
                /* send query to other than master nodes */
562
672
                        if (!VALID_BACKEND(i) || IS_MASTER_NODE_ID(i))
563
673
                                continue;
564
674
 
 
675
                        per_node_statement_log(backend, i, string);
 
676
 
565
677
                        if (send_simplequery_message(CONNECTION(backend, i), len, string, MAJOR(backend)) != POOL_CONTINUE)
 
678
                        {
 
679
                                free_parser();
566
680
                                return POOL_END;
 
681
                        }
567
682
                }
568
683
 
569
684
                /* Wait for nodes other than the master node */
572
687
                        if (!VALID_BACKEND(i) || IS_MASTER_NODE_ID(i))
573
688
                                continue;
574
689
 
575
 
                        if (wait_for_query_response(frontend, CONNECTION(backend, i), string) != POOL_CONTINUE)
 
690
                        if (wait_for_query_response(frontend, CONNECTION(backend, i), string, MAJOR(backend)) != POOL_CONTINUE)
576
691
                        {
577
692
                                /* Cancel current transaction */
578
693
                                CancelPacket cancel_packet;
582
697
                                cancel_packet.key= MASTER_CONNECTION(backend)->key;
583
698
                                cancel_request(&cancel_packet);
584
699
 
 
700
                                free_parser();
585
701
                                return POOL_END;
586
702
                        }
 
703
 
 
704
                        /*
 
705
                         * Check if some error detected.  If so, emit
 
706
                         * log. This is useful when invalid encoding error
 
707
                         * occurs. In this case, PostgreSQL does not report
 
708
                         * what statement caused that error and makes users
 
709
                         * confused.
 
710
                         */
 
711
                        per_node_error_log(backend, i, string, "SimpleQuery: Error or notice message from backend: ", true);
 
712
 
587
713
                }
588
714
 
589
715
                /* send "COMMIT" or "ROLLBACK" to only master node if query is "COMMIT" or "ROLLBACK" */
590
716
                if (commit)
591
717
                {
 
718
                        per_node_statement_log(backend, MASTER_NODE_ID, string);
 
719
 
592
720
                        if (send_simplequery_message(MASTER(backend), len, string, MAJOR(backend)) != POOL_CONTINUE)
 
721
                        {
 
722
                                free_parser();
593
723
                                return POOL_END;
 
724
                        }
594
725
 
595
 
                        if (wait_for_query_response(frontend, MASTER(backend), string) != POOL_CONTINUE)
 
726
                        if (wait_for_query_response(frontend, MASTER(backend), string, MAJOR(backend)) != POOL_CONTINUE)
596
727
                        {
597
728
                                /* Cancel current transaction */
598
729
                                CancelPacket cancel_packet;
602
733
                                cancel_packet.key= MASTER_CONNECTION(backend)->key;
603
734
                                cancel_request(&cancel_packet);
604
735
 
 
736
                                free_parser();
605
737
                                return POOL_END;
606
738
                        }
607
739
 
 
740
                        /*
 
741
                         * Check if some error detected.  If so, emit
 
742
                         * log. This is useful when invalid encoding error
 
743
                         * occurs. In this case, PostgreSQL does not report
 
744
                         * what statement caused that error and makes users
 
745
                         * confused.
 
746
                         */
 
747
                        per_node_error_log(backend, MASTER_NODE_ID, string, "SimpleQuery: Error or notice message from backend: ", true);
608
748
 
609
749
                        TSTATE(backend) = 'I';
610
750
                }
 
751
                free_parser();
611
752
        }
612
753
        else
613
754
        {
614
 
                free_parser();
 
755
                per_node_statement_log(backend, MASTER_NODE_ID, string);
 
756
 
615
757
                if (send_simplequery_message(MASTER(backend), len, string, MAJOR(backend)) != POOL_CONTINUE)
616
758
                        return POOL_END;
617
759
 
618
 
                if (wait_for_query_response(frontend, MASTER(backend), string) != POOL_CONTINUE)
 
760
                if (wait_for_query_response(frontend, MASTER(backend), string, MAJOR(backend)) != POOL_CONTINUE)
619
761
                {
620
762
                                /* Cancel current transaction */
621
763
                                CancelPacket cancel_packet;
625
767
                                cancel_packet.key= MASTER_CONNECTION(backend)->key;
626
768
                                cancel_request(&cancel_packet);
627
769
 
 
770
                                free_parser();
628
771
                                return POOL_END;
629
772
                }
 
773
 
 
774
                /*
 
775
                 * Check if some error detected.  If so, emit
 
776
                 * log. This is useful when invalid encoding error
 
777
                 * occurs. In this case, PostgreSQL does not report
 
778
                 * what statement caused that error and makes users
 
779
                 * confused.
 
780
                 */
 
781
                per_node_error_log(backend, MASTER_NODE_ID, string, "SimpleQuery: Error or notice message from backend: ", true);
 
782
 
 
783
                free_parser();
630
784
        }
631
785
 
632
786
        return POOL_CONTINUE;
635
789
/*
636
790
 * process EXECUTE (V3 only)
637
791
 */
638
 
POOL_STATUS Execute(POOL_CONNECTION *frontend, 
 
792
POOL_STATUS Execute(POOL_CONNECTION *frontend,
639
793
                                                   POOL_CONNECTION_POOL *backend)
640
794
{
641
795
        char *string;           /* portal name + null terminate + max_tobe_returned_rows */
644
798
        char kind;
645
799
        int status, commit = 0;
646
800
        Portal *portal;
647
 
        char *string1;
 
801
        char *string1 = NULL;
648
802
        PrepareStmt *p_stmt;
649
803
        POOL_STATUS ret;
650
804
        int specific_error = 0;
669
823
                p_stmt = (PrepareStmt *)portal->stmt;
670
824
 
671
825
                string1 = portal->sql_string;
 
826
                pool_debug("Execute: query: %s", string1);
672
827
                node = (Node *)p_stmt->query;
673
828
                strncpy(query_string_buffer, string1, sizeof(query_string_buffer));
674
829
 
675
 
                if ((IsA(node, PrepareStmt) || IsA(node, DeallocateStmt) ||
676
 
                         IsA(node, VariableSetStmt)) &&
677
 
                        MASTER_SLAVE && TSTATE(backend) != 'E')
 
830
                if ((IsA(node, PrepareStmt) || IsA(node, DeallocateStmt) ||
 
831
                         IsA(node, VariableSetStmt)) &&
 
832
                        MASTER_SLAVE && TSTATE(backend) != 'E')
678
833
                {
 
834
                        /*
 
835
                         * PREPARE, DEALLOCATE, SET, DISCARD
 
836
                         * should be executed on all nodes.  So we set
 
837
                         * force_replication.
 
838
                         */
679
839
                        force_replication = 1;
680
840
                }
681
841
                /*
682
 
                 * JDBC driver sends "BEGIN" query internally if setAutoCommit(false).
683
 
                 * But it does not send Sync message after "BEGIN" query.
684
 
                 * In extended query protocol, PostgreSQL returns
685
 
                 * ReadyForQuery when a client sends Sync message.
686
 
                 * We can't know a transaction state...
687
 
                 * So pgpool send Sync message internally.
 
842
                 * JDBC driver sends "BEGIN" query internally if
 
843
                 * setAutoCommit(false).  But it does not send Sync message
 
844
                 * after "BEGIN" query.  In extended query protocol,
 
845
                 * PostgreSQL returns ReadyForQuery when a client sends Sync
 
846
                 * message.  Problem is, pgpool can't know the transaction
 
847
                 * state without receiving ReadyForQuery. So we remember that
 
848
                 * we need to send Sync message internally afterward, whenever
 
849
                 * we receive BEGIN in extended protocol.
688
850
                 */
689
851
                else if (IsA(node, TransactionStmt) && MASTER_SLAVE)
690
852
                {
692
854
 
693
855
                        if (stmt->kind == TRANS_STMT_BEGIN ||
694
856
                                stmt->kind == TRANS_STMT_START)
695
 
                                receive_extended_begin = true;
 
857
                                /* Remember we need to send sync later in extended protocol */
 
858
                                receive_extended_begin = 1;
696
859
                }
697
860
 
698
861
                if (load_balance_enabled(backend, node, string1))
740
903
                if (!commit)
741
904
                {
742
905
                        /* Send the query to master node */
743
 
 
 
906
                        per_node_statement_log(backend, MASTER_NODE_ID, string1);
744
907
                        if (send_execute_message(backend, MASTER_NODE_ID, len, string) != POOL_CONTINUE)
745
908
                                return POOL_END;
746
909
 
747
 
                        if (wait_for_query_response(frontend, MASTER(backend), string) != POOL_CONTINUE)
 
910
                        if (wait_for_query_response(frontend, MASTER(backend), string, MAJOR(backend)) != POOL_CONTINUE)
748
911
                        {
749
912
                                /* Cancel current transaction */
750
913
                                CancelPacket cancel_packet;
782
945
                                if (send_execute_message(backend, i, len + 5, msg))
783
946
                                        return POOL_END;
784
947
                        }
785
 
                        else if (send_execute_message(backend, i, len, string) != POOL_CONTINUE)
786
 
                                return POOL_END;
 
948
                        else
 
949
                        {
 
950
                                per_node_statement_log(backend, i, string1);
 
951
                                if (send_execute_message(backend, i, len, string) != POOL_CONTINUE)
 
952
                                        return POOL_END;
 
953
                        }
787
954
                }
788
955
 
789
956
                /* Wait for nodes other than the master node */
792
959
                        if (!VALID_BACKEND(i) || IS_MASTER_NODE_ID(i))
793
960
                                continue;
794
961
 
795
 
                        if (wait_for_query_response(frontend, CONNECTION(backend, i), string) != POOL_CONTINUE)
 
962
                        if (wait_for_query_response(frontend, CONNECTION(backend, i), string, MAJOR(backend)) != POOL_CONTINUE)
796
963
                        {
797
964
                                /* Cancel current transaction */
798
965
                                CancelPacket cancel_packet;
809
976
                /* send "COMMIT" or "ROLLBACK" to only master node if query is "COMMIT" or "ROLLBACK" */
810
977
                if (commit)
811
978
                {
 
979
                        per_node_statement_log(backend, MASTER_NODE_ID, string1);
 
980
 
812
981
                        if (send_execute_message(backend, MASTER_NODE_ID, len, string) != POOL_CONTINUE)
813
982
                                return POOL_END;
814
983
 
815
 
                        if (wait_for_query_response(frontend, MASTER(backend), string) != POOL_CONTINUE)
 
984
                        if (wait_for_query_response(frontend, MASTER(backend), string, MAJOR(backend)) != POOL_CONTINUE)
816
985
                        {
817
986
                                /* Cancel current transaction */
818
987
                                CancelPacket cancel_packet;
828
997
        }
829
998
        else
830
999
        {
 
1000
                per_node_statement_log(backend, MASTER_NODE_ID, string1);
 
1001
 
831
1002
                if (send_execute_message(backend, MASTER_NODE_ID, len, string) != POOL_CONTINUE)
832
1003
                        return POOL_END;
833
1004
 
834
 
                if (wait_for_query_response(frontend, MASTER(backend), string) != POOL_CONTINUE)
 
1005
                if (wait_for_query_response(frontend, MASTER(backend), string, MAJOR(backend)) != POOL_CONTINUE)
835
1006
                {
836
1007
                                /* Cancel current transaction */
837
1008
                                CancelPacket cancel_packet;
871
1042
/*
872
1043
 * process Parse (V3 only)
873
1044
 */
874
 
POOL_STATUS Parse(POOL_CONNECTION *frontend, 
 
1045
POOL_STATUS Parse(POOL_CONNECTION *frontend,
875
1046
                                                 POOL_CONNECTION_POOL *backend)
876
1047
{
877
1048
        char kind;
887
1058
        int deadlock_detected = 0;
888
1059
        int insert_stmt_with_lock = 0;
889
1060
        POOL_STATUS status;
 
1061
        char per_node_statement_log_buffer[1024];
890
1062
 
891
1063
        /* read Parse packet */
892
1064
        if (pool_read(frontend, &len, sizeof(len)) < 0)
895
1067
        len = ntohl(len) - 4;
896
1068
        string = pool_read2(frontend, len);
897
1069
 
898
 
        pool_debug("Parse: portal name <%s>", string);
 
1070
        pool_debug("Parse: statement name <%s>", string);
899
1071
 
900
1072
        name = string;
901
1073
        stmt = string + strlen(string) + 1;
903
1075
        parse_tree_list = raw_parser(stmt);
904
1076
        if (parse_tree_list == NIL)
905
1077
        {
906
 
                free_parser();
 
1078
                /* free_parser(); */
 
1079
                ;
907
1080
        }
908
1081
        else
909
1082
        {
 
1083
                /* Save last query string for logging purpose */
 
1084
                snprintf(query_string_buffer, sizeof(query_string_buffer), "Parse: %s", stmt);
 
1085
 
910
1086
                node = (Node *) lfirst(list_head(parse_tree_list));
911
1087
 
912
1088
                insert_stmt_with_lock = need_insert_lock(backend, stmt, node);
913
1089
 
 
1090
                /* Special treatment for master/slave + temp tables */
 
1091
                if (MASTER_SLAVE)
 
1092
                {
 
1093
                        /* Is there "NO LOAD BALANCE" comment? */
 
1094
                        if (!strncasecmp(stmt, NO_LOAD_BALANCE, NO_LOAD_BALANCE_COMMENT_SZ) ||
 
1095
                                /* or the table used in a query is a temporary one ? */
 
1096
                                is_temp_table(backend, node))
 
1097
                        {
 
1098
                                /*
 
1099
                                 * From now on, let only master handle queries.  This is
 
1100
                                 * typically useful for using temp tables in master/slave
 
1101
                                 * mode
 
1102
                                 */
 
1103
                                master_slave_was_enabled = 1;
 
1104
                                MASTER_SLAVE = 0;
 
1105
                                master_slave_dml = 1;
 
1106
                        }
 
1107
                }
 
1108
 
914
1109
                portal = create_portal();
915
1110
                if (portal == NULL)
916
1111
                {
917
1112
                        pool_error("Parse: create_portal() failed");
 
1113
                        free_parser();
918
1114
                        return POOL_END;
919
1115
                }
920
1116
 
925
1121
                /* translate Parse message to PrepareStmt */
926
1122
                p_stmt = palloc(sizeof(PrepareStmt));
927
1123
                p_stmt->type = T_PrepareStmt;
 
1124
 
 
1125
                /* XXX: there's a confusion here. Someone mixed up statement
 
1126
                 * name with portal name. It is regarded that statement name ==
 
1127
                 * portal name. Someday we should fix this. Sigh.
 
1128
                 */
928
1129
                p_stmt->name = pstrdup(name);
929
1130
                p_stmt->query = copyObject(node);
930
1131
                portal->stmt = (Node *)p_stmt;
944
1145
                        pending_prepared_portal = portal;
945
1146
                }
946
1147
 
947
 
                /* switch old memory context */
 
1148
                /*
 
1149
                 * Switch to old memory context. Caution. Now we are in parser
 
1150
                 * memory context.
 
1151
                 * Palloced memories will be gone if free_parser() is called!
 
1152
                 */
948
1153
                pool_memory = old_context;
949
1154
 
950
1155
                if (REPLICATION)
951
1156
                {
 
1157
                        char            *rewrite_query;
 
1158
                        bool             rewrite_to_params = true;
 
1159
 
 
1160
                        /*
 
1161
                         * rewrite `now()'.
 
1162
                         * if stmt is unnamed, we rewrite `now()' to timestamp constant.
 
1163
                         * else we rewrite `now()' to params and expand that at Bind
 
1164
                         * message.
 
1165
                         */
 
1166
                        if (*name == '\0')
 
1167
                                rewrite_to_params = false;
 
1168
                        portal->num_tsparams = 0;
 
1169
                        rewrite_query = rewrite_timestamp(backend, node, rewrite_to_params, portal);
 
1170
                        if (rewrite_query != NULL)
 
1171
                        {
 
1172
                                string = palloc(strlen(name) + strlen(rewrite_query) + 2);
 
1173
                                strcpy(string, name);
 
1174
                                strcpy(string + strlen(name) + 1, rewrite_query);
 
1175
                                memcpy(string + strlen(name) + strlen(rewrite_query) + 2,
 
1176
                                                stmt + strlen(stmt) + 1,
 
1177
                                                len - (strlen(name) + strlen(stmt) + 2));
 
1178
 
 
1179
                                len = len - strlen(stmt) + strlen(rewrite_query);
 
1180
                                name = string;
 
1181
                                stmt = string + strlen(name) + 1;
 
1182
                                pool_debug("rewrite query  %s %s len=%d", name, stmt, len);
 
1183
                        }
 
1184
                }
 
1185
 
 
1186
                if (REPLICATION)
 
1187
                {
952
1188
                        char kind;
953
1189
 
954
1190
                        if (TSTATE(backend) != 'T')
965
1201
 
966
1202
                                kind = pool_read_kind(backend);
967
1203
                                if (kind != 'Z')
 
1204
                                {
 
1205
                                        free_parser();
968
1206
                                        return POOL_END;
 
1207
                                }
 
1208
 
969
1209
                                if (ReadyForQuery(frontend, backend, 0) != POOL_CONTINUE)
 
1210
                                {
 
1211
                                        free_parser();
970
1212
                                        return POOL_END;
 
1213
                                }
971
1214
                        }
972
1215
 
973
1216
                        if (is_strict_query(node))
974
 
                                start_internal_transaction(backend, node);
 
1217
                                start_internal_transaction(frontend, backend, node);
975
1218
 
976
1219
                        if (insert_stmt_with_lock)
977
1220
                        {
978
1221
                                /* start a transaction if needed and lock the table */
979
 
                                status = insert_lock(backend, stmt, (InsertStmt *)node);
 
1222
                                status = insert_lock(frontend, backend, stmt, (InsertStmt *)node);
980
1223
                                if (status != POOL_CONTINUE)
981
1224
                                {
 
1225
                                        free_parser();
982
1226
                                        return status;
983
1227
                                }
984
1228
                        }
985
1229
                }
986
 
                free_parser();
987
1230
        }
988
1231
 
989
1232
        /* send to master node */
 
1233
        snprintf(per_node_statement_log_buffer, sizeof(per_node_statement_log_buffer), "Parse: %s", stmt);
 
1234
        per_node_statement_log(backend, MASTER_NODE_ID, per_node_statement_log_buffer);
990
1235
        if (send_extended_protocol_message(backend, MASTER_NODE_ID,
991
1236
                                                                           "P", len, string))
 
1237
        {
 
1238
                free_parser();
992
1239
                return POOL_END;
 
1240
        }
 
1241
 
 
1242
        /*
 
1243
         * Cannot call free_parser() here. Since "string" might be allocated in parser context.
 
1244
         * free_parser();
 
1245
         */
993
1246
 
994
1247
        if (REPLICATION || PARALLEL_MODE || MASTER_SLAVE)
995
1248
        {
996
 
                /* We must synchronize because Parse message acquires table
 
1249
                /*
 
1250
                 * We must synchronize because Parse message acquires table
997
1251
                 * locks.
998
1252
                 */
999
 
                pool_debug("waiting for master completing the query");
1000
 
                if (synchronize(MASTER(backend)))
 
1253
                pool_debug("Parse: waiting for master completing the query");
 
1254
                if (wait_for_query_response(frontend, MASTER(backend), string, MAJOR(backend)) != POOL_CONTINUE)
 
1255
                {
 
1256
                        /* Cancel current transaction */
 
1257
                        CancelPacket cancel_packet;
 
1258
 
 
1259
                        cancel_packet.protoVersion = htonl(PROTO_CANCEL);
 
1260
                        cancel_packet.pid = MASTER_CONNECTION(backend)->pid;
 
1261
                        cancel_packet.key= MASTER_CONNECTION(backend)->key;
 
1262
                        cancel_request(&cancel_packet);
 
1263
                        free_parser();
1001
1264
                        return POOL_END;
 
1265
                }
1002
1266
 
1003
1267
                /*
1004
1268
                 * We must check deadlock error because an aborted transaction
1008
1272
                 */
1009
1273
                deadlock_detected = detect_deadlock_error(MASTER(backend), MAJOR(backend));
1010
1274
                if (deadlock_detected < 0)
 
1275
                {
 
1276
                        free_parser();
1011
1277
                        return POOL_END;
 
1278
                }
 
1279
                else
 
1280
                {
 
1281
                        /*
 
1282
                         * Check if other than deadlock error detected.  If so, emit
 
1283
                         * log. This is useful when invalid encoding error occurs. In
 
1284
                         * this case, PostgreSQL does not report what statement caused
 
1285
                         * that error and make users confused.
 
1286
                         */
 
1287
                        per_node_error_log(backend, MASTER_NODE_ID, stmt, "Parse(): Error or notice message from backend: ", true);
 
1288
                }
1012
1289
 
1013
1290
                for (i=0;i<NUM_BACKENDS;i++)
1014
1291
                {
1018
1295
                                {
1019
1296
                                        pool_log("Parse: received deadlock error message from master node");
1020
1297
 
 
1298
                                        per_node_statement_log(backend, i, POOL_ERROR_QUERY);
 
1299
 
1021
1300
                                        if (send_simplequery_message(CONNECTION(backend, i),
1022
1301
                                                                                                 strlen(POOL_ERROR_QUERY)+1,
1023
1302
                                                                                                 POOL_ERROR_QUERY,
1024
1303
                                                                                                 MAJOR(backend)))
1025
 
                                                return POOL_END;
1026
 
                                }
1027
 
                                else if (send_extended_protocol_message(backend, i,
1028
 
                                                                                                                "P", len, string))
1029
 
                                        return POOL_END;
 
1304
                                        {
 
1305
                                                free_parser();
 
1306
                                                return POOL_END;
 
1307
                                        }
 
1308
                                }
 
1309
                                else
 
1310
                                {
 
1311
                                        snprintf(per_node_statement_log_buffer, sizeof(per_node_statement_log_buffer), "Parse: %s", stmt);
 
1312
                                        per_node_statement_log(backend, i, per_node_statement_log_buffer);
 
1313
 
 
1314
                                        if (send_extended_protocol_message(backend, i,"P", len, string))
 
1315
                                        {
 
1316
                                                free_parser();
 
1317
                                                return POOL_END;
 
1318
                                        }
 
1319
                                }
1030
1320
                        }
1031
1321
                }
1032
1322
 
1036
1326
                        if (!VALID_BACKEND(i) || IS_MASTER_NODE_ID(i))
1037
1327
                                continue;
1038
1328
 
1039
 
                        pool_debug("waiting for %dth backend completing the query", i);
1040
 
                        if (synchronize(CONNECTION(backend, i)))
 
1329
                        pool_debug("Parse: waiting for %dth backend completing the query", i);
 
1330
                        if (wait_for_query_response(frontend, CONNECTION(backend, i), string, MAJOR(backend)) != POOL_CONTINUE)
 
1331
                        {
 
1332
                                /* Cancel current transaction */
 
1333
                                CancelPacket cancel_packet;
 
1334
 
 
1335
                                cancel_packet.protoVersion = htonl(PROTO_CANCEL);
 
1336
                                cancel_packet.pid = MASTER_CONNECTION(backend)->pid;
 
1337
                                cancel_packet.key= MASTER_CONNECTION(backend)->key;
 
1338
                                cancel_request(&cancel_packet);
 
1339
                                free_parser();
1041
1340
                                return POOL_END;
 
1341
                        }
 
1342
 
 
1343
                        /*
 
1344
                         * Check if error (or notice response) from backend is
 
1345
                         * detected.  If so, emit log. This is useful when
 
1346
                         * invalid encoding error occurs. In this case, PostgreSQL
 
1347
                         * does not report what statement caused that error and
 
1348
                         * make users confused.
 
1349
                         */
 
1350
                        per_node_error_log(backend, i, stmt, "Parse(): Error or notice message from backend: ", true);
1042
1351
                }
1043
1352
        }
1044
1353
 
 
1354
        /*
 
1355
         * Ok. we are safe to call free_parser();
 
1356
         */
 
1357
        free_parser();
 
1358
 
1045
1359
        for (;;)
1046
1360
        {
1047
1361
                POOL_STATUS ret;
1049
1363
 
1050
1364
                if (ret != POOL_CONTINUE)
1051
1365
                        return ret;
1052
 
                        
 
1366
 
1053
1367
                SimpleForwardToFrontend(kind, frontend, backend);
1054
 
                if (pool_flush(frontend) < 0)
1055
 
                        return POOL_ERROR;
 
1368
                pool_flush(frontend);
1056
1369
 
1057
1370
                /* Ignore warning messages */
1058
1371
                if (kind != 'N')
1068
1381
 *       to all DB nodes to abort transaction.
1069
1382
 * - internal transaction is closed
1070
1383
 */
1071
 
POOL_STATUS ReadyForQuery(POOL_CONNECTION *frontend, 
 
1384
POOL_STATUS ReadyForQuery(POOL_CONNECTION *frontend,
1072
1385
                                                                 POOL_CONNECTION_POOL *backend, int send_ready)
1073
1386
{
1074
1387
        StartupPacket *sp;
1137
1450
                mismatch_ntuples = 0;
1138
1451
        }
1139
1452
 
1140
 
        /* 
 
1453
        /*
1141
1454
         * if a transaction is started for insert lock, we need to close
1142
1455
         * the transaction.
1143
1456
         */
1163
1476
                        pool_debug("ReadyForQuery: transaction state: %c", state);
1164
1477
                }
1165
1478
 
1166
 
                if (end_internal_transaction(backend) != POOL_CONTINUE)
 
1479
                if (end_internal_transaction(frontend, backend) != POOL_CONTINUE)
1167
1480
                        return POOL_ERROR;
1168
1481
        }
1169
1482
 
1191
1504
                        {
1192
1505
                                if (!VALID_BACKEND(i) || IS_MASTER_NODE_ID(i))
1193
1506
                                        continue;
1194
 
                                
 
1507
 
1195
1508
                                if (pool_read(CONNECTION(backend, i), &kind1, sizeof(kind)))
1196
1509
                                        return POOL_END;
1197
1510
                        }
1226
1539
                        pool_write(frontend, &len, sizeof(len));
1227
1540
                        pool_write(frontend, &state, 1);
1228
1541
                }
1229
 
 
1230
 
                if (pool_flush(frontend))
1231
 
                        return POOL_END;
 
1542
                pool_flush(frontend);
1232
1543
        }
1233
1544
 
1234
1545
        in_progress = 0;
1235
1546
 
1236
1547
        /* end load balance mode */
1237
1548
        if (in_load_balance)
1238
 
                end_load_balance(backend);
 
1549
                end_load_balance();
1239
1550
 
1240
1551
        if (master_slave_dml)
1241
1552
        {
1256
1567
 
1257
1568
        sp = MASTER_CONNECTION(backend)->sp;
1258
1569
        if (MASTER(backend)->tstate == 'T')
1259
 
                snprintf(psbuf, sizeof(psbuf), "%s %s %s idle in transaction", 
 
1570
                snprintf(psbuf, sizeof(psbuf), "%s %s %s idle in transaction",
1260
1571
                                 sp->user, sp->database, remote_ps_data);
1261
1572
        else
1262
 
                snprintf(psbuf, sizeof(psbuf), "%s %s %s idle", 
 
1573
                snprintf(psbuf, sizeof(psbuf), "%s %s %s idle",
1263
1574
                                 sp->user, sp->database, remote_ps_data);
1264
1575
        set_ps_display(psbuf, false);
1265
1576
 
1267
1578
}
1268
1579
 
1269
1580
 
1270
 
POOL_STATUS FunctionCall(POOL_CONNECTION *frontend, 
 
1581
POOL_STATUS FunctionCall(POOL_CONNECTION *frontend,
1271
1582
                                                                POOL_CONNECTION_POOL *backend)
1272
1583
{
1273
1584
        char dummy[2];
1364
1675
        return POOL_CONTINUE;
1365
1676
}
1366
1677
 
1367
 
POOL_STATUS FunctionResultResponse(POOL_CONNECTION *frontend, 
 
1678
POOL_STATUS FunctionResultResponse(POOL_CONNECTION *frontend,
1368
1679
                                                                                  POOL_CONNECTION_POOL *backend)
1369
1680
{
1370
1681
        char dummy;
1426
1737
        return pool_flush(frontend);
1427
1738
}
1428
1739
 
1429
 
POOL_STATUS ProcessFrontendResponse(POOL_CONNECTION *frontend, 
 
1740
POOL_STATUS ProcessFrontendResponse(POOL_CONNECTION *frontend,
1430
1741
                                                                                   POOL_CONNECTION_POOL *backend)
1431
1742
{
1432
1743
        char fkind;
1445
1756
 
1446
1757
        pool_debug("read kind from frontend %c(%02x)", fkind, fkind);
1447
1758
 
 
1759
        /*
 
1760
         * If we have received BEGIN in extended protocol before, we need
 
1761
         * to send a sync message to know the transaction state.
 
1762
         */
1448
1763
        if (receive_extended_begin)
1449
1764
        {
1450
1765
                receive_extended_begin = 0;
1483
1798
 
1484
1799
                case 'P':  /* Parse message */
1485
1800
                        allow_close_transaction = 0;
 
1801
 
 
1802
                        if (MASTER_SLAVE &&
 
1803
                                (TSTATE(backend) != 'I' || receive_extended_begin))
 
1804
                        {
 
1805
                                pool_debug("kind: %c master_slave_dml enabled", fkind);
 
1806
                                master_slave_was_enabled = 1;
 
1807
                                MASTER_SLAVE = 0;
 
1808
                                master_slave_dml = 1;
 
1809
                        }
 
1810
 
1486
1811
                        status = Parse(frontend, backend);
1487
1812
                        break;
1488
1813
 
1489
 
                case 'S':
 
1814
                case 'S':  /* Sync message */
1490
1815
                        receive_extended_begin = 0;
1491
1816
                        /* fall through */
1492
1817
 
1493
1818
                default:
1494
 
                        if (MAJOR(backend) == PROTO_MAJOR_V3)
 
1819
                        if ((MAJOR(backend) == PROTO_MAJOR_V3) &&
 
1820
                            (fkind == 'S' || fkind == 'H' || fkind == 'D' || fkind == 'f'||
 
1821
                                 fkind == 'C' || fkind == 'B' || fkind == 'F' || fkind == 'd' || fkind == 'c'))
1495
1822
                        {
1496
1823
                                if (MASTER_SLAVE &&
1497
1824
                                        (TSTATE(backend) != 'I' || receive_extended_begin))
1501
1828
                                        MASTER_SLAVE = 0;
1502
1829
                                        master_slave_dml = 1;
1503
1830
                                }
1504
 
 
 
1831
        
1505
1832
                                status = SimpleForwardToBackend(fkind, frontend, backend);
1506
1833
                                for (i=0;i<NUM_BACKENDS;i++)
1507
1834
                                {
1527
1854
        return status;
1528
1855
}
1529
1856
 
1530
 
POOL_STATUS CompleteCommandResponse(POOL_CONNECTION *frontend, 
 
1857
POOL_STATUS CompleteCommandResponse(POOL_CONNECTION *frontend,
1531
1858
                                                                                   POOL_CONNECTION_POOL *backend)
1532
1859
{
1533
1860
        int i;
1556
1883
                {
1557
1884
                        pool_debug("Complete Command Response: message length does not match between master(%d \"%s\",) and %d th server (%d \"%s\",)",
1558
1885
                                           len, string, i, len1, string1);
1559
 
                        
 
1886
 
1560
1887
                        free(string1);
1561
1888
                        return POOL_END;
1562
1889
                }
1574
1901
        return pool_flush(frontend);
1575
1902
}
1576
1903
 
1577
 
int RowDescription(POOL_CONNECTION *frontend, 
 
1904
int RowDescription(POOL_CONNECTION *frontend,
1578
1905
                                                  POOL_CONNECTION_POOL *backend,
1579
1906
                                                  short *result)
1580
1907
{
1701
2028
        return pool_flush(frontend);
1702
2029
}
1703
2030
 
1704
 
POOL_STATUS AsciiRow(POOL_CONNECTION *frontend, 
 
2031
POOL_STATUS AsciiRow(POOL_CONNECTION *frontend,
1705
2032
                                                        POOL_CONNECTION_POOL *backend,
1706
2033
                                                        short num_fields)
1707
2034
{
1813
2140
        return POOL_CONTINUE;
1814
2141
}
1815
2142
 
1816
 
POOL_STATUS BinaryRow(POOL_CONNECTION *frontend, 
 
2143
POOL_STATUS BinaryRow(POOL_CONNECTION *frontend,
1817
2144
                                                         POOL_CONNECTION_POOL *backend,
1818
2145
                                                         short num_fields)
1819
2146
{
1865
2192
                {
1866
2193
                        /* field size */
1867
2194
                        if (pool_read(MASTER(backend), &size, sizeof(int)) < 0)
1868
 
                                return POOL_END;                        
 
2195
                                return POOL_END;
1869
2196
                        for (j=0;j<NUM_BACKENDS;j++)
1870
2197
                        {
1871
2198
                                if (VALID_BACKEND(j) && !IS_MASTER_NODE_ID(j))
1872
2199
                                {
1873
2200
                                        /* field size */
1874
2201
                                        if (pool_read(CONNECTION(backend, i), &size, sizeof(int)) < 0)
1875
 
                                                return POOL_END;                        
 
2202
                                                return POOL_END;
1876
2203
 
1877
2204
                                        /* XXX: field size maybe different among
1878
2205
                                           backends. If we were a paranoid, we have to treat
1915
2242
        return POOL_CONTINUE;
1916
2243
}
1917
2244
 
1918
 
POOL_STATUS CursorResponse(POOL_CONNECTION *frontend, 
 
2245
POOL_STATUS CursorResponse(POOL_CONNECTION *frontend,
1919
2246
                                                                  POOL_CONNECTION_POOL *backend)
1920
2247
{
1921
2248
        char *string = NULL;
1964
2291
        return POOL_CONTINUE;
1965
2292
}
1966
2293
 
1967
 
POOL_STATUS ErrorResponse(POOL_CONNECTION *frontend, 
 
2294
POOL_STATUS ErrorResponse(POOL_CONNECTION *frontend,
1968
2295
                                                  POOL_CONNECTION_POOL *backend)
1969
2296
{
1970
2297
        char *string = NULL;
1992
2319
                TSTATE(backend) = 'E';
1993
2320
        else
1994
2321
                TSTATE(backend) = 'I';
1995
 
                        
 
2322
 
1996
2323
        return POOL_CONTINUE;
1997
2324
}
1998
2325
 
1999
 
POOL_STATUS NoticeResponse(POOL_CONNECTION *frontend, 
 
2326
POOL_STATUS NoticeResponse(POOL_CONNECTION *frontend,
2000
2327
                                                                  POOL_CONNECTION_POOL *backend)
2001
2328
{
2002
2329
        char *string = NULL;
2023
2350
        return POOL_CONTINUE;
2024
2351
}
2025
2352
 
2026
 
POOL_STATUS CopyInResponse(POOL_CONNECTION *frontend, 
 
2353
POOL_STATUS CopyInResponse(POOL_CONNECTION *frontend,
2027
2354
                                                                  POOL_CONNECTION_POOL *backend)
2028
2355
{
2029
2356
        POOL_STATUS status;
2044
2371
        return status;
2045
2372
}
2046
2373
 
2047
 
POOL_STATUS CopyOutResponse(POOL_CONNECTION *frontend, 
 
2374
POOL_STATUS CopyOutResponse(POOL_CONNECTION *frontend,
2048
2375
                                                                   POOL_CONNECTION_POOL *backend)
2049
2376
{
2050
2377
        POOL_STATUS status;
2234
2561
                        }
2235
2562
                }
2236
2563
                else
2237
 
                        pool_write(frontend, string, len);              
 
2564
                        pool_write(frontend, string, len);
2238
2565
 
2239
2566
                if (len == PROTO_MAJOR_V3)
2240
2567
                {
2331
2658
        /*
2332
2659
         * check "SET TRANSACTION ISOLATION LEVEL must be called before any query" error.
2333
2660
         * This happens in following scenario:
2334
 
         * 
 
2661
         *
2335
2662
         * M:S1:BEGIN;
2336
2663
         * S:S1:BEGIN;
2337
2664
         * M:S1:SELECT 1; <-- only sent to MASTER
2374
2701
        pool_error(msg->data, query);
2375
2702
        free_string(msg);
2376
2703
}
 
2704
 
 
2705
/*
 
2706
 * Judge whether the table used in a query represented by node is a temporary
 
2707
 * table or not.
 
2708
 */
 
2709
static int is_temp_table(POOL_CONNECTION_POOL *backend, Node *node)
{
/*
 * Query used to find out whether pg_class has a relistemp column.
 * PostgreSQL 8.4 or later has this.
 */
#define HASRELITEMPPQUERY "SELECT count(*) FROM pg_catalog.pg_class AS c, pg_attribute AS a WHERE c.relname = 'pg_class' AND a.attrelid = c.oid AND a.attname = 'relistemp'"

/*
 * Query used to find out whether the target table is a temporary one.
 * This query is valid for PostgreSQL 7.3 through 8.3.
 */
#define ISTEMPQUERY83 "SELECT count(*) FROM pg_class AS c, pg_namespace AS n WHERE c.relname = '%s' AND c.relnamespace = n.oid AND n.nspname ~ '^pg_temp_'"

/*
 * Query used to find out whether the target table is a temporary one.
 * This query is valid for PostgreSQL 8.4 or later.
 */
#define ISTEMPQUERY84 "SELECT count(*) FROM pg_catalog.pg_class AS c WHERE c.relname = '%s' AND c.relistemp"

        /* Both caches are created on first use and kept across calls. */
        static POOL_RELCACHE *relistemp_probe_cache;
        static POOL_RELCACHE *temp_table_cache;
        char *table_name;
        char *temp_query;
        int has_relistemp;

        /*
         * The version 2 protocol is not supported by the underlying
         * infrastructure (do_query), so no check can be made; report
         * "not a temp table".
         */
        if (MAJOR(backend) == PROTO_MAJOR_V2)
                return 0;

        /* Table names are hard to extract from a SELECT; always answer 0. */
        if (IsA(node, SelectStmt))
                return 0;

        /* Only INSERT, UPDATE and DELETE carry a target relation we look at. */
        if (IsA(node, InsertStmt))
                table_name = nodeToString(((InsertStmt *)node)->relation);
        else if (IsA(node, UpdateStmt))
                table_name = nodeToString(((UpdateStmt *)node)->relation);
        else if (IsA(node, DeleteStmt))
                table_name = nodeToString(((DeleteStmt *)node)->relation);
        else
                return 0;       /* unknown statement */

        if (table_name == NULL)
                return 0;

        /*
         * Check backend version: build the cache that tells us whether
         * pg_class has a relistemp column, if it does not exist yet.
         */
        if (relistemp_probe_cache == NULL)
        {
                relistemp_probe_cache = pool_create_relcache(32, HASRELITEMPPQUERY,
                                                             int_register_func, int_unregister_func,
                                                             false);
                if (relistemp_probe_cache == NULL)
                {
                        pool_error("is_temp_table: pool_create_relcache error");
                        return 0;
                }
        }

        /* This lookup is performed on every call that reaches this point. */
        has_relistemp = (pool_search_relcache(relistemp_probe_cache, backend, "pg_class") != 0);
        temp_query = has_relistemp ? ISTEMPQUERY84 : ISTEMPQUERY83;

        /*
         * Create the temp-table relcache if it does not exist.  The query is
         * only handed over here, i.e. the one selected on the call that
         * creates the cache is the one the cache is built with.
         */
        if (temp_table_cache == NULL)
        {
                temp_table_cache = pool_create_relcache(32, temp_query,
                                                        int_register_func, int_unregister_func,
                                                        true);
                if (temp_table_cache == NULL)
                {
                        pool_error("is_temp_table: pool_create_relcache error");
                        return 0;
                }
        }

        /* Search the relcache; normalize the answer to 0 or 1. */
        return pool_search_relcache(temp_table_cache, backend, table_name) != 0;
}
 
2807
 
 
2808
/*
 
2809
 * Make per DB node statement log
 
2810
 */
 
2811
void per_node_statement_log(POOL_CONNECTION_POOL *backend, int node_id, char *query)
{
        POOL_CONNECTION_POOL_SLOT *node_slot = backend->slots[node_id];

        /* Stay silent unless log_per_node_statement is set in the config. */
        if (!pool_config->log_per_node_statement)
                return;

        /* slot->pid is passed through ntohl() before being printed. */
        pool_log("DB node id: %d backend pid: %d statement: %s",
                         node_id, ntohl(node_slot->pid), query);
}
 
2818
 
 
2819
/*
 
2820
 * Check kind and produce error message
 
2821
 * All data read in this function is returned to stream.
 
2822
 */
 
2823
void per_node_error_log(POOL_CONNECTION_POOL *backend, int node_id, char *query, char *prefix, bool unread)
{
        POOL_CONNECTION_POOL_SLOT *node_slot = backend->slots[node_id];
        char *backend_message;

        /*
         * Ask pool_extract_error_message() for a message from this node's
         * connection; only a positive return value leads to a log entry.
         *
         * NOTE(review): the "unread" parameter is not referenced anywhere in
         * this body; literal true values are passed to the callee instead.
         * Confirm whether that is intended.
         */
        if (pool_extract_error_message(true, CONNECTION(backend, node_id), MAJOR(backend), true, &backend_message) <= 0)
                return;

        pool_log("%s: DB node id: %d backend pid: %d statement: %s message: %s",
                         prefix, node_id, ntohl(node_slot->pid), query, backend_message);
}