~ubuntu-branches/ubuntu/oneiric/pgpool2/oneiric

« back to all changes in this revision

Viewing changes to pool_rewrite_query.c

  • Committer: Bazaar Package Importer
  • Author(s): Marc Gariepy
  • Date: 2010-02-17 13:58:08 UTC
  • mfrom: (1.1.5 upstream)
  • Revision ID: james.westby@ubuntu.com-20100217135808-vqxtfe80r5z8toje
Tags: 2.3.2.1-0ubuntu1
* New upstream release (2.3.2.1)
 * Lots of bug fixes
 * Add SSL support
 * Add support for large object replication
 * Enhanced replication (TIMESTAMP, DATES)
 * Save node status on restart
 * Some other minor changes

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
/* -*-pgsql-c-*- */
2
2
/*
3
 
 * $Header: /cvsroot/pgpool/pgpool-II/pool_rewrite_query.c,v 1.13 2009/01/30 00:06:53 y-mori Exp $
 
3
 * $Header: /cvsroot/pgpool/pgpool-II/pool_rewrite_query.c,v 1.14 2009/08/22 04:04:21 t-ishii Exp $
4
4
 *
5
 
 * pgpool: a language independent connection pool server for PostgreSQL 
 
5
 * pgpool: a language independent connection pool server for PostgreSQL
6
6
 * written by Tatsuo Ishii
7
7
 *
8
8
 * Copyright (c) 2003-2009      PgPool Global Development Group
50
50
}
51
51
 
52
52
/*
53
 
 *  search DistDefInfo(this info is build in starting process 
 
53
 *  search DistDefInfo(this info is build in starting process
54
54
 *  and get node id where a query send.
55
 
 */ 
 
55
 */
56
56
static int getInsertRule(ListCell *lc,List *list_t ,DistDefInfo *info,int div_key_num)
57
57
{
58
58
        int loop_counter = 0;
118
118
}
119
119
 
120
120
/*
121
 
 * This function processes the decision whether to 
122
 
 * distribute the insert sentence to the node. 
 
121
 * This function processes the decision whether to
 
122
 * distribute the insert sentence to the node.
123
123
 */
124
124
static void examInsertStmt(Node *node,POOL_CONNECTION_POOL *backend, RewriteQuery *message)
125
125
{
180
180
                message->rewrite_query = pool_error_message("cannot find target List");
181
181
                return;
182
182
        }
183
 
        
 
183
 
184
184
        /* number of target list */
185
185
 
186
186
        if(list_t->length == 1 && IsA(lfirst(list_head(list_t)),List))
187
187
        {
188
188
                cell_num = ((List *) lfirst(list_head(list_t)))->length;
189
 
        } 
190
 
        else 
 
189
        }
 
190
        else
191
191
        {
192
192
                        /* send  error message to frontend */
193
193
                        message->r_code = INSERT_SQL_RESTRICTION;
215
215
                }
216
216
 
217
217
  }
218
 
        else 
 
218
        else
219
219
        {
220
220
                List *list_cols = (List *) insert->cols;
221
221
 
263
263
                message->rewrite_query = pool_error_message("cannot get node_id from system db");
264
264
                return;
265
265
        }
266
 
        
 
266
 
267
267
        pool_debug("insert node_number =%d",node_number);
268
268
        message->r_code = 0;
269
269
        message->r_node = node_number;
333
333
 
334
334
        initdblink(&dblink,backend);
335
335
 
336
 
        if(message.is_pg_catalog) 
 
336
        if(message.is_pg_catalog)
337
337
        {
338
338
                pool_debug("Isselectpgcatalog %d",message.is_pg_catalog);
339
339
                return 1;
344
344
        }
345
345
}
346
346
 
347
 
/* 
348
 
 *  SELECT statement or INSERT statement is special, 
 
347
/*
 
348
 *  SELECT statement or INSERT statement is special,
349
349
 *  peculiar process is needed in parallel mode.
350
350
 */
351
351
RewriteQuery *rewrite_query_stmt(Node *node,POOL_CONNECTION *frontend,POOL_CONNECTION_POOL *backend,RewriteQuery *message)
355
355
                case T_SelectStmt:
356
356
                {
357
357
                        SelectStmt *stmt = (SelectStmt *)node;
358
 
 
359
 
                         /* Because "SELECT INTO" cannot be used in a parallel mode, 
360
 
                          * the error message is generated and send "ready for query" to frontend. 
361
 
                          */    
 
358
 
 
359
                         /* Because "SELECT INTO" cannot be used in a parallel mode,
 
360
                          * the error message is generated and send "ready for query" to frontend.
 
361
                          */
362
362
                        if(stmt->intoClause)
363
363
                        {
364
364
                                pool_send_error_message(frontend, MAJOR(backend), "XX000",
380
380
                        if (message->r_code != SELECT_PGCATALOG &&
381
381
                                message->r_code != SELECT_RELATION_ERROR)
382
382
                        {
383
 
                                /* 
 
383
                                /*
384
384
                                 * The rewritten Query is transmitted to system db,
385
385
                                 * and execution status is received.
386
386
                                 */
387
387
                                POOL_CONNECTION_POOL_SLOT *system_db = pool_system_db_connection();
388
 
                                message->status = OneNode_do_command(frontend, 
389
 
                                                                                                        system_db->con, 
 
388
                                message->status = OneNode_do_command(frontend,
 
389
                                                                                                        system_db->con,
390
390
                                                                                                        message->rewrite_query,
391
391
                                                                                                        backend->info->database);
392
392
                        }
396
396
                                   message->r_code == SELECT_RELATION_ERROR)
397
397
                                {
398
398
                                        /*
399
 
                                         * In the case of message->r_code == SELECT_RELATION_ERROR and in the transaction, 
 
399
                                         * In the case of message->r_code == SELECT_RELATION_ERROR and in the transaction,
400
400
                                         * Transmit the Query to all back ends, and to abort transaction.
401
401
                                         */
402
402
                                        pool_debug("pool_rewrite_stmt(select): Inside transaction. abort transaction");
403
403
                                        message->rewrite_query = nodeToString(node);
404
404
                                        message->status = pool_parallel_exec(frontend,backend,message->rewrite_query,node,true);
405
 
                                } 
 
405
                                }
406
406
                                else
407
 
                                { 
 
407
                                {
408
408
                                        /*
409
409
                                         * Ohter cases of message->r_code == SELECT_RELATION_ERROR
410
410
                                         * or SELECT_PG_CATALOG,
412
412
                                         */
413
413
                                        pool_debug("pool_rewrite_stmt: executed by Master");
414
414
                                        message->rewrite_query = nodeToString(node);
415
 
                                        message->status = OneNode_do_command(frontend, 
 
415
                                        message->status = OneNode_do_command(frontend,
416
416
                                                                                                                MASTER(backend),
417
417
                                                                                                                message->rewrite_query,
418
418
                                                                                                                backend->info->database);
421
421
                        pool_debug("pool_rewrite_stmt: select message_code %d",message->r_code);
422
422
                }
423
423
                break;
424
 
                        
 
424
 
425
425
                case T_InsertStmt:
426
426
 
427
427
                  /* The distribution of the INSERT sentence. */
430
430
                        if(message->r_code == 0 )
431
431
                        {
432
432
                                /* send the INSERT sentence */
433
 
                                message->status = OneNode_do_command(frontend, 
 
433
                                message->status = OneNode_do_command(frontend,
434
434
                                                                                                        CONNECTION(backend,message->r_node),
435
435
                                                                                                        message->rewrite_query,
436
436
                                                                                                        backend->info->database);
469
469
        }
470
470
 
471
471
        pool_debug("pool_rewrite_stmt: query rule %d",node->type);
472
 
        
 
472
 
473
473
        return message;
474
474
}
475
475
 
478
478
 
479
479
/*
480
480
 * After analyzing query, check the analyze[0]->state.
481
 
 * if the analyze[0]->state ==`P`, this query can be executed 
 
481
 * if the analyze[0]->state ==`P`, this query can be executed
482
482
 * on parallel engine.
483
483
 */
484
484
static int direct_parallel_query(RewriteQuery *message)
496
496
        char *result;
497
497
        int i,j = 0;
498
498
        int len = strlen(str);
499
 
        
 
499
 
500
500
        result = palloc(len -1);
501
501
 
502
502
        for(i = 0; i < len; i++)
504
504
                char c = (unsigned char) str[i];
505
505
                if((i != 0) && (i != len -1))
506
506
                {
507
 
                        if(c=='\'' && (char) str[i+1]=='\'') 
 
507
                        if(c=='\'' && (char) str[i+1]=='\'')
508
508
                                i++;
509
509
                        result[j] = c;
510
510
                        j++;
527
527
                AnalyzeSelect *analyze = message->analyze[i];
528
528
                pool_debug("analyze_debug :select no(%d), last select(%d), last_part(%d), state(%c)",
529
529
             analyze->now_select,analyze->last_select,analyze->call_part,analyze->state);
530
 
        }       
 
530
        }
531
531
}
532
532
 
533
 
/* 
 
533
/*
534
534
 * This function checks the KEYWORD(POOL_PARALLEL,POOL_LOADBALANCE)
535
535
 * if the special function(like pool_parallel() or pool_loadbalance())
536
 
 * is used, mark the r_code,is_parallel and is_loadbalance. 
537
 
 * In othe cases, It is necessary to analyze the Query. 
 
536
 * is used, mark the r_code,is_parallel and is_loadbalance.
 
537
 * In othe cases, It is necessary to analyze the Query.
538
538
 */
539
539
RewriteQuery *is_parallel_query(Node *node, POOL_CONNECTION_POOL *backend)
540
540
{
555
555
                if (!(stmt->distinctClause || stmt->intoClause ||
556
556
                        stmt->fromClause || stmt->groupClause || stmt->havingClause ||
557
557
                        stmt->sortClause || stmt->limitOffset || stmt->limitCount ||
558
 
                        stmt->lockingClause || stmt->larg || stmt->rarg) && 
 
558
                        stmt->lockingClause || stmt->larg || stmt->rarg) &&
559
559
                        (n = lfirst(list_head(stmt->targetList))) && IsA(n, ResTarget))
560
560
                {
561
561
                        ResTarget *target = (ResTarget *) n;
562
 
                
 
562
 
563
563
                        if (target->val && IsA(target->val, FuncCall))
564
564
                        {
565
565
                                FuncCall *func = (FuncCall *) target->val;
575
575
                                                message.r_code = SEND_PARALLEL_ENGINE;
576
576
                                                message.is_parallel = true;
577
577
                                                message.is_loadbalance = false;
578
 
                                                pool_debug("can pool_parallel_exec %s",message.rewrite_query);  
 
578
                                                pool_debug("can pool_parallel_exec %s",message.rewrite_query);
579
579
                                                return &message;
580
 
                                        } 
581
 
                                        else /* pool_loadbalance() is used in this query */ 
 
580
                                        }
 
581
                                        else /* pool_loadbalance() is used in this query */
582
582
                                        if(strcmp(strVal(lfirst(list_head(func->funcname))),
583
583
                                                                                                POOL_LOADBALANCE) == 0)
584
584
                                        {
585
585
                                                message.r_code = SEND_LOADBALANCE_ENGINE;
586
586
                                                message.is_loadbalance = true;
587
587
                                                message.is_parallel = false;
588
 
                                                pool_debug("can loadbalance_mode %s",message.rewrite_query);    
 
588
                                                pool_debug("can loadbalance_mode %s",message.rewrite_query);
589
589
                                                return &message;
590
590
                                        }
591
 
                                } 
 
591
                                }
592
592
                        }
593
593
                }
594
 
                
 
594
 
595
595
    /* ANALYZE QUERY */
596
596
                message.r_code = SELECT_ANALYZE;
597
597
                message.is_loadbalance = true;
617
617
                /* Analyzing Query Start */
618
618
                analyze_debug(&message);
619
619
 
620
 
                /* After the analyzing query, 
 
620
                /* After the analyzing query,
621
621
                 * this query can be executed as parallel exec, is_parallel flag is turned on
622
622
                 */
623
623
                direct_ok = direct_parallel_query(&message);
626
626
                        message.rewrite_query = nodeToString(node);
627
627
                        message.is_parallel = true;
628
628
                        message.is_loadbalance = false;
629
 
                        pool_debug("can pool_parallel_exec %s",message.rewrite_query);  
 
629
                        pool_debug("can pool_parallel_exec %s",message.rewrite_query);
630
630
                        return &message;
631
631
                }
632
632
        }