~vcs-imports/mammoth-replicator/trunk

« back to all changes in this revision

Viewing changes to src/backend/libpq/pqcomm.c

  • Committer: alvherre
  • Date: 2005-12-16 21:24:52 UTC
  • Revision ID: svn-v4:db760fc0-0f08-0410-9d63-cc6633f64896:trunk:1
Initial import of the REL8_0_3 sources from the Pgsql CVS repository.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*-------------------------------------------------------------------------
 
2
 *
 
3
 * pqcomm.c
 
4
 *        Communication functions between the Frontend and the Backend
 
5
 *
 
6
 * These routines handle the low-level details of communication between
 
7
 * frontend and backend.  They just shove data across the communication
 
8
 * channel, and are ignorant of the semantics of the data --- or would be,
 
9
 * except for major brain damage in the design of the old COPY OUT protocol.
 
10
 * Unfortunately, COPY OUT was designed to commandeer the communication
 
11
 * channel (it just transfers data without wrapping it into messages).
 
12
 * No other messages can be sent while COPY OUT is in progress; and if the
 
13
 * copy is aborted by an ereport(ERROR), we need to close out the copy so that
 
14
 * the frontend gets back into sync.  Therefore, these routines have to be
 
15
 * aware of COPY OUT state.  (New COPY-OUT is message-based and does *not*
 
16
 * set the DoingCopyOut flag.)
 
17
 *
 
18
 * NOTE: generally, it's a bad idea to emit outgoing messages directly with
 
19
 * pq_putbytes(), especially if the message would require multiple calls
 
20
 * to send.  Instead, use the routines in pqformat.c to construct the message
 
21
 * in a buffer and then emit it in one call to pq_putmessage.  This ensures
 
22
 * that the channel will not be clogged by an incomplete message if execution
 
23
 * is aborted by ereport(ERROR) partway through the message.  The only
 
24
 * non-libpq code that should call pq_putbytes directly is old-style COPY OUT.
 
25
 *
 
26
 * At one time, libpq was shared between frontend and backend, but now
 
27
 * the backend's "backend/libpq" is quite separate from "interfaces/libpq".
 
28
 * All that remains is similarities of names to trap the unwary...
 
29
 *
 
30
 * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
 
31
 * Portions Copyright (c) 1994, Regents of the University of California
 
32
 *
 
33
 *      $PostgreSQL: pgsql/src/backend/libpq/pqcomm.c,v 1.175 2005-01-12 16:38:17 tgl Exp $
 
34
 *
 
35
 *-------------------------------------------------------------------------
 
36
 */
 
37
 
 
38
/*------------------------
 
39
 * INTERFACE ROUTINES
 
40
 *
 
41
 * setup/teardown:
 
42
 *              StreamServerPort        - Open postmaster's server port
 
43
 *              StreamConnection        - Create new connection with client
 
44
 *              StreamClose                     - Close a client/backend connection
 
45
 *              TouchSocketFile         - Protect socket file against /tmp cleaners
 
46
 *              pq_init                 - initialize libpq at backend startup
 
47
 *              pq_comm_reset   - reset libpq during error recovery
 
48
 *              pq_close                - shutdown libpq at backend exit
 
49
 *
 
50
 * low-level I/O:
 
51
 *              pq_getbytes             - get a known number of bytes from connection
 
52
 *              pq_getstring    - get a null terminated string from connection
 
53
 *              pq_getmessage   - get a message with length word from connection
 
54
 *              pq_getbyte              - get next byte from connection
 
55
 *              pq_peekbyte             - peek at next byte from connection
 
56
 *              pq_putbytes             - send bytes to connection (not flushed until pq_flush)
 
57
 *              pq_flush                - flush pending output
 
58
 *
 
59
 * message-level I/O (and old-style-COPY-OUT cruft):
 
60
 *              pq_putmessage   - send a normal message (suppressed in COPY OUT mode)
 
61
 *              pq_startcopyout - inform libpq that a COPY OUT transfer is beginning
 
62
 *              pq_endcopyout   - end a COPY OUT transfer
 
63
 *
 
64
 *------------------------
 
65
 */
 
66
#include "postgres.h"
 
67
 
 
68
#include <signal.h>
 
69
#include <errno.h>
 
70
#include <fcntl.h>
 
71
#include <grp.h>
 
72
#include <unistd.h>
 
73
#include <sys/file.h>
 
74
#include <sys/socket.h>
 
75
#include <sys/stat.h>
 
76
#include <sys/time.h>
 
77
#include <netdb.h>
 
78
#include <netinet/in.h>
 
79
#ifdef HAVE_NETINET_TCP_H
 
80
#include <netinet/tcp.h>
 
81
#endif
 
82
#include <arpa/inet.h>
 
83
#ifdef HAVE_UTIME_H
 
84
#include <utime.h>
 
85
#endif
 
86
 
 
87
#include "libpq/libpq.h"
 
88
#include "miscadmin.h"
 
89
#include "storage/ipc.h"
 
90
 
 
91
 
 
92
/*
 
93
 * Configuration options
 
94
 */
 
95
int                     Unix_socket_permissions;
 
96
char       *Unix_socket_group;
 
97
 
 
98
 
 
99
/* Where the Unix socket file is */
 
100
static char sock_path[MAXPGPATH];
 
101
 
 
102
 
 
103
/*
 
104
 * Buffers for low-level I/O
 
105
 */
 
106
 
 
107
#define PQ_BUFFER_SIZE 8192
 
108
 
 
109
static unsigned char PqSendBuffer[PQ_BUFFER_SIZE];
 
110
static int      PqSendPointer;          /* Next index to store a byte in
 
111
                                                                 * PqSendBuffer */
 
112
 
 
113
static unsigned char PqRecvBuffer[PQ_BUFFER_SIZE];
 
114
static int      PqRecvPointer;          /* Next index to read a byte from
 
115
                                                                 * PqRecvBuffer */
 
116
static int      PqRecvLength;           /* End of data available in PqRecvBuffer */
 
117
 
 
118
/*
 
119
 * Message status
 
120
 */
 
121
static bool PqCommBusy;
 
122
static bool DoingCopyOut;
 
123
 
 
124
 
 
125
/* Internal functions */
 
126
static void pq_close(int code, Datum arg);
 
127
static int      internal_putbytes(const char *s, size_t len);
 
128
static int      internal_flush(void);
 
129
#ifdef HAVE_UNIX_SOCKETS
 
130
static int      Lock_AF_UNIX(unsigned short portNumber, char *unixSocketName);
 
131
static int      Setup_AF_UNIX(void);
 
132
#endif   /* HAVE_UNIX_SOCKETS */
 
133
 
 
134
 
 
135
/* --------------------------------
 
136
 *              pq_init - initialize libpq at backend startup
 
137
 * --------------------------------
 
138
 */
 
139
void
 
140
pq_init(void)
 
141
{
 
142
        PqSendPointer = PqRecvPointer = PqRecvLength = 0;
 
143
        PqCommBusy = false;
 
144
        DoingCopyOut = false;
 
145
        on_proc_exit(pq_close, 0);
 
146
}
 
147
 
 
148
/* --------------------------------
 
149
 *              pq_comm_reset - reset libpq during error recovery
 
150
 *
 
151
 * This is called from error recovery at the outer idle loop.  It's
 
152
 * just to get us out of trouble if we somehow manage to elog() from
 
153
 * inside a pqcomm.c routine (which ideally will never happen, but...)
 
154
 * --------------------------------
 
155
 */
 
156
void
 
157
pq_comm_reset(void)
 
158
{
 
159
        /* Do not throw away pending data, but do reset the busy flag */
 
160
        PqCommBusy = false;
 
161
        /* We can abort any old-style COPY OUT, too */
 
162
        pq_endcopyout(true);
 
163
}
 
164
 
 
165
/* --------------------------------
 
166
 *              pq_close - shutdown libpq at backend exit
 
167
 *
 
168
 * Note: in a standalone backend MyProcPort will be null,
 
169
 * don't crash during exit...
 
170
 * --------------------------------
 
171
 */
 
172
static void
 
173
pq_close(int code, Datum arg)
 
174
{
 
175
        if (MyProcPort != NULL)
 
176
        {
 
177
                /* Cleanly shut down SSL layer */
 
178
                secure_close(MyProcPort);
 
179
 
 
180
                /*
 
181
                 * Formerly we did an explicit close() here, but it seems better
 
182
                 * to leave the socket open until the process dies.  This allows
 
183
                 * clients to perform a "synchronous close" if they care --- wait
 
184
                 * till the transport layer reports connection closure, and you
 
185
                 * can be sure the backend has exited.
 
186
                 *
 
187
                 * We do set sock to -1 to prevent any further I/O, though.
 
188
                 */
 
189
                MyProcPort->sock = -1;
 
190
        }
 
191
}
 
192
 
 
193
 
 
194
 
 
195
/*
 
196
 * Streams -- wrapper around Unix socket system calls
 
197
 *
 
198
 *
 
199
 *              Stream functions are used for vanilla TCP connection protocol.
 
200
 */
 
201
 
 
202
 
 
203
/* StreamDoUnlink()
 
204
 * Shutdown routine for backend connection
 
205
 * If a Unix socket is used for communication, explicitly close it.
 
206
 */
 
207
#ifdef HAVE_UNIX_SOCKETS
 
208
static void
 
209
StreamDoUnlink(int code, Datum arg)
 
210
{
 
211
        Assert(sock_path[0]);
 
212
        unlink(sock_path);
 
213
}
 
214
#endif   /* HAVE_UNIX_SOCKETS */
 
215
 
 
216
/*
 
217
 * StreamServerPort -- open a "listening" port to accept connections.
 
218
 *
 
219
 * Successfully opened sockets are added to the ListenSocket[] array,
 
220
 * at the first position that isn't -1.
 
221
 *
 
222
 * RETURNS: STATUS_OK or STATUS_ERROR
 
223
 */
 
224
 
 
225
int
 
226
StreamServerPort(int family, char *hostName, unsigned short portNumber,
 
227
                                 char *unixSocketName,
 
228
                                 int ListenSocket[], int MaxListen)
 
229
{
 
230
        int                     fd,
 
231
                                err;
 
232
        int                     maxconn;
 
233
        int                     one = 1;
 
234
        int                     ret;
 
235
        char            portNumberStr[32];
 
236
        const char *familyDesc;
 
237
        char            familyDescBuf[64];
 
238
        char       *service;
 
239
        struct addrinfo *addrs = NULL,
 
240
                           *addr;
 
241
        struct addrinfo hint;
 
242
        int                     listen_index = 0;
 
243
        int                     added = 0;
 
244
 
 
245
        /* Initialize hint structure */
 
246
        MemSet(&hint, 0, sizeof(hint));
 
247
        hint.ai_family = family;
 
248
        hint.ai_flags = AI_PASSIVE;
 
249
        hint.ai_socktype = SOCK_STREAM;
 
250
 
 
251
#ifdef HAVE_UNIX_SOCKETS
 
252
        if (family == AF_UNIX)
 
253
        {
 
254
                /* Lock_AF_UNIX will also fill in sock_path. */
 
255
                if (Lock_AF_UNIX(portNumber, unixSocketName) != STATUS_OK)
 
256
                        return STATUS_ERROR;
 
257
                service = sock_path;
 
258
        }
 
259
        else
 
260
#endif   /* HAVE_UNIX_SOCKETS */
 
261
        {
 
262
                snprintf(portNumberStr, sizeof(portNumberStr), "%d", portNumber);
 
263
                service = portNumberStr;
 
264
        }
 
265
 
 
266
        ret = getaddrinfo_all(hostName, service, &hint, &addrs);
 
267
        if (ret || !addrs)
 
268
        {
 
269
                if (hostName)
 
270
                        ereport(LOG,
 
271
                                        (errmsg("could not translate host name \"%s\", service \"%s\" to address: %s",
 
272
                                                        hostName, service, gai_strerror(ret))));
 
273
                else
 
274
                        ereport(LOG,
 
275
                         (errmsg("could not translate service \"%s\" to address: %s",
 
276
                                         service, gai_strerror(ret))));
 
277
                if (addrs)
 
278
                        freeaddrinfo_all(hint.ai_family, addrs);
 
279
                return STATUS_ERROR;
 
280
        }
 
281
 
 
282
        for (addr = addrs; addr; addr = addr->ai_next)
 
283
        {
 
284
                if (!IS_AF_UNIX(family) && IS_AF_UNIX(addr->ai_family))
 
285
                {
 
286
                        /*
 
287
                         * Only set up a unix domain socket when they really asked for
 
288
                         * it.  The service/port is different in that case.
 
289
                         */
 
290
                        continue;
 
291
                }
 
292
 
 
293
                /* See if there is still room to add 1 more socket. */
 
294
                for (; listen_index < MaxListen; listen_index++)
 
295
                {
 
296
                        if (ListenSocket[listen_index] == -1)
 
297
                                break;
 
298
                }
 
299
                if (listen_index >= MaxListen)
 
300
                {
 
301
                        ereport(LOG,
 
302
                                        (errmsg("could not bind to all requested addresses: MAXLISTEN (%d) exceeded",
 
303
                                                        MaxListen)));
 
304
                        break;
 
305
                }
 
306
 
 
307
                /* set up family name for possible error messages */
 
308
                switch (addr->ai_family)
 
309
                {
 
310
                        case AF_INET:
 
311
                                familyDesc = gettext("IPv4");
 
312
                                break;
 
313
#ifdef HAVE_IPV6
 
314
                        case AF_INET6:
 
315
                                familyDesc = gettext("IPv6");
 
316
                                break;
 
317
#endif
 
318
#ifdef HAVE_UNIX_SOCKETS
 
319
                        case AF_UNIX:
 
320
                                familyDesc = gettext("Unix");
 
321
                                break;
 
322
#endif
 
323
                        default:
 
324
                                snprintf(familyDescBuf, sizeof(familyDescBuf),
 
325
                                                 gettext("unrecognized address family %d"),
 
326
                                                 addr->ai_family);
 
327
                                familyDesc = familyDescBuf;
 
328
                                break;
 
329
                }
 
330
 
 
331
                if ((fd = socket(addr->ai_family, SOCK_STREAM, 0)) < 0)
 
332
                {
 
333
                        ereport(LOG,
 
334
                                        (errcode_for_socket_access(),
 
335
                        /* translator: %s is IPv4, IPv6, or Unix */
 
336
                                         errmsg("could not create %s socket: %m",
 
337
                                                        familyDesc)));
 
338
                        continue;
 
339
                }
 
340
 
 
341
                if (!IS_AF_UNIX(addr->ai_family))
 
342
                {
 
343
                        if ((setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,
 
344
                                                        (char *) &one, sizeof(one))) == -1)
 
345
                        {
 
346
                                ereport(LOG,
 
347
                                                (errcode_for_socket_access(),
 
348
                                                 errmsg("setsockopt(SO_REUSEADDR) failed: %m")));
 
349
                                closesocket(fd);
 
350
                                continue;
 
351
                        }
 
352
                }
 
353
 
 
354
#ifdef IPV6_V6ONLY
 
355
                if (addr->ai_family == AF_INET6)
 
356
                {
 
357
                        if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY,
 
358
                                                   (char *) &one, sizeof(one)) == -1)
 
359
                        {
 
360
                                ereport(LOG,
 
361
                                                (errcode_for_socket_access(),
 
362
                                                 errmsg("setsockopt(IPV6_V6ONLY) failed: %m")));
 
363
                                closesocket(fd);
 
364
                                continue;
 
365
                        }
 
366
                }
 
367
#endif
 
368
 
 
369
                /*
 
370
                 * Note: This might fail on some OS's, like Linux older than
 
371
                 * 2.4.21-pre3, that don't have the IPV6_V6ONLY socket option, and
 
372
                 * map ipv4 addresses to ipv6.  It will show ::ffff:ipv4 for all
 
373
                 * ipv4 connections.
 
374
                 */
 
375
                err = bind(fd, addr->ai_addr, addr->ai_addrlen);
 
376
                if (err < 0)
 
377
                {
 
378
                        ereport(LOG,
 
379
                                        (errcode_for_socket_access(),
 
380
                        /* translator: %s is IPv4, IPv6, or Unix */
 
381
                                         errmsg("could not bind %s socket: %m",
 
382
                                                        familyDesc),
 
383
                                         (IS_AF_UNIX(addr->ai_family)) ?
 
384
                          errhint("Is another postmaster already running on port %d?"
 
385
                                          " If not, remove socket file \"%s\" and retry.",
 
386
                                          (int) portNumber, sock_path) :
 
387
                          errhint("Is another postmaster already running on port %d?"
 
388
                                          " If not, wait a few seconds and retry.",
 
389
                                          (int) portNumber)));
 
390
                        closesocket(fd);
 
391
                        continue;
 
392
                }
 
393
 
 
394
#ifdef HAVE_UNIX_SOCKETS
 
395
                if (addr->ai_family == AF_UNIX)
 
396
                {
 
397
                        if (Setup_AF_UNIX() != STATUS_OK)
 
398
                        {
 
399
                                closesocket(fd);
 
400
                                break;
 
401
                        }
 
402
                }
 
403
#endif
 
404
 
 
405
                /*
 
406
                 * Select appropriate accept-queue length limit.  PG_SOMAXCONN is
 
407
                 * only intended to provide a clamp on the request on platforms
 
408
                 * where an overly large request provokes a kernel error (are
 
409
                 * there any?).
 
410
                 */
 
411
                maxconn = MaxBackends * 2;
 
412
                if (maxconn > PG_SOMAXCONN)
 
413
                        maxconn = PG_SOMAXCONN;
 
414
 
 
415
                err = listen(fd, maxconn);
 
416
                if (err < 0)
 
417
                {
 
418
                        ereport(LOG,
 
419
                                        (errcode_for_socket_access(),
 
420
                        /* translator: %s is IPv4, IPv6, or Unix */
 
421
                                         errmsg("could not listen on %s socket: %m",
 
422
                                                        familyDesc)));
 
423
                        closesocket(fd);
 
424
                        continue;
 
425
                }
 
426
                ListenSocket[listen_index] = fd;
 
427
                added++;
 
428
        }
 
429
 
 
430
        freeaddrinfo_all(hint.ai_family, addrs);
 
431
 
 
432
        if (!added)
 
433
                return STATUS_ERROR;
 
434
 
 
435
        return STATUS_OK;
 
436
}
 
437
 
 
438
 
 
439
#ifdef HAVE_UNIX_SOCKETS
 
440
 
 
441
/*
 
442
 * Lock_AF_UNIX -- configure unix socket file path
 
443
 */
 
444
static int
 
445
Lock_AF_UNIX(unsigned short portNumber, char *unixSocketName)
 
446
{
 
447
        UNIXSOCK_PATH(sock_path, portNumber, unixSocketName);
 
448
 
 
449
        /*
 
450
         * Grab an interlock file associated with the socket file.
 
451
         */
 
452
        CreateSocketLockFile(sock_path, true);
 
453
 
 
454
        /*
 
455
         * Once we have the interlock, we can safely delete any pre-existing
 
456
         * socket file to avoid failure at bind() time.
 
457
         */
 
458
        unlink(sock_path);
 
459
 
 
460
        return STATUS_OK;
 
461
}
 
462
 
 
463
 
 
464
/*
 
465
 * Setup_AF_UNIX -- configure unix socket permissions
 
466
 */
 
467
static int
 
468
Setup_AF_UNIX(void)
 
469
{
 
470
        /* Arrange to unlink the socket file at exit */
 
471
        on_proc_exit(StreamDoUnlink, 0);
 
472
 
 
473
        /*
 
474
         * Fix socket ownership/permission if requested.  Note we must do this
 
475
         * before we listen() to avoid a window where unwanted connections
 
476
         * could get accepted.
 
477
         */
 
478
        Assert(Unix_socket_group);
 
479
        if (Unix_socket_group[0] != '\0')
 
480
        {
 
481
#ifdef WIN32
 
482
                elog(WARNING, "configuration item unix_socket_group is not supported on this platform");
 
483
#else
 
484
                char       *endptr;
 
485
                unsigned long int val;
 
486
                gid_t           gid;
 
487
 
 
488
                val = strtoul(Unix_socket_group, &endptr, 10);
 
489
                if (*endptr == '\0')
 
490
                {                                               /* numeric group id */
 
491
                        gid = val;
 
492
                }
 
493
                else
 
494
                {                                               /* convert group name to id */
 
495
                        struct group *gr;
 
496
 
 
497
                        gr = getgrnam(Unix_socket_group);
 
498
                        if (!gr)
 
499
                        {
 
500
                                ereport(LOG,
 
501
                                                (errmsg("group \"%s\" does not exist",
 
502
                                                                Unix_socket_group)));
 
503
                                return STATUS_ERROR;
 
504
                        }
 
505
                        gid = gr->gr_gid;
 
506
                }
 
507
                if (chown(sock_path, -1, gid) == -1)
 
508
                {
 
509
                        ereport(LOG,
 
510
                                        (errcode_for_file_access(),
 
511
                                         errmsg("could not set group of file \"%s\": %m",
 
512
                                                        sock_path)));
 
513
                        return STATUS_ERROR;
 
514
                }
 
515
#endif
 
516
        }
 
517
 
 
518
        if (chmod(sock_path, Unix_socket_permissions) == -1)
 
519
        {
 
520
                ereport(LOG,
 
521
                                (errcode_for_file_access(),
 
522
                                 errmsg("could not set permissions of file \"%s\": %m",
 
523
                                                sock_path)));
 
524
                return STATUS_ERROR;
 
525
        }
 
526
        return STATUS_OK;
 
527
}
 
528
#endif   /* HAVE_UNIX_SOCKETS */
 
529
 
 
530
 
 
531
/*
 
532
 * StreamConnection -- create a new connection with client using
 
533
 *              server port.
 
534
 *
 
535
 * ASSUME: that this doesn't need to be non-blocking because
 
536
 *              the Postmaster uses select() to tell when the server master
 
537
 *              socket is ready for accept().
 
538
 *
 
539
 * RETURNS: STATUS_OK or STATUS_ERROR
 
540
 */
 
541
int
 
542
StreamConnection(int server_fd, Port *port)
 
543
{
 
544
        /* accept connection and fill in the client (remote) address */
 
545
        port->raddr.salen = sizeof(port->raddr.addr);
 
546
        if ((port->sock = accept(server_fd,
 
547
                                                         (struct sockaddr *) & port->raddr.addr,
 
548
                                                         &port->raddr.salen)) < 0)
 
549
        {
 
550
                ereport(LOG,
 
551
                                (errcode_for_socket_access(),
 
552
                                 errmsg("could not accept new connection: %m")));
 
553
                return STATUS_ERROR;
 
554
        }
 
555
 
 
556
#ifdef SCO_ACCEPT_BUG
 
557
 
 
558
        /*
 
559
         * UnixWare 7+ and OpenServer 5.0.4 are known to have this bug, but it
 
560
         * shouldn't hurt to catch it for all versions of those platforms.
 
561
         */
 
562
        if (port->raddr.addr.ss_family == 0)
 
563
                port->raddr.addr.ss_family = AF_UNIX;
 
564
#endif
 
565
 
 
566
        /* fill in the server (local) address */
 
567
        port->laddr.salen = sizeof(port->laddr.addr);
 
568
        if (getsockname(port->sock,
 
569
                                        (struct sockaddr *) & port->laddr.addr,
 
570
                                        &port->laddr.salen) < 0)
 
571
        {
 
572
                elog(LOG, "getsockname() failed: %m");
 
573
                return STATUS_ERROR;
 
574
        }
 
575
 
 
576
        /* select NODELAY and KEEPALIVE options if it's a TCP connection */
 
577
        if (!IS_AF_UNIX(port->laddr.addr.ss_family))
 
578
        {
 
579
                int                     on;
 
580
 
 
581
#ifdef  TCP_NODELAY
 
582
                on = 1;
 
583
                if (setsockopt(port->sock, IPPROTO_TCP, TCP_NODELAY,
 
584
                                           (char *) &on, sizeof(on)) < 0)
 
585
                {
 
586
                        elog(LOG, "setsockopt(TCP_NODELAY) failed: %m");
 
587
                        return STATUS_ERROR;
 
588
                }
 
589
#endif
 
590
                on = 1;
 
591
                if (setsockopt(port->sock, SOL_SOCKET, SO_KEEPALIVE,
 
592
                                           (char *) &on, sizeof(on)) < 0)
 
593
                {
 
594
                        elog(LOG, "setsockopt(SO_KEEPALIVE) failed: %m");
 
595
                        return STATUS_ERROR;
 
596
                }
 
597
        }
 
598
 
 
599
        return STATUS_OK;
 
600
}
 
601
 
 
602
/*
 
603
 * StreamClose -- close a client/backend connection
 
604
 *
 
605
 * NOTE: this is NOT used to terminate a session; it is just used to release
 
606
 * the file descriptor in a process that should no longer have the socket
 
607
 * open.  (For example, the postmaster calls this after passing ownership
 
608
 * of the connection to a child process.)  It is expected that someone else
 
609
 * still has the socket open.  So, we only want to close the descriptor,
 
610
 * we do NOT want to send anything to the far end.
 
611
 */
 
612
void
 
613
StreamClose(int sock)
 
614
{
 
615
        closesocket(sock);
 
616
}
 
617
 
 
618
/*
 
619
 * TouchSocketFile -- mark socket file as recently accessed
 
620
 *
 
621
 * This routine should be called every so often to ensure that the socket
 
622
 * file has a recent mod date (ordinary operations on sockets usually won't
 
623
 * change the mod date).  That saves it from being removed by
 
624
 * overenthusiastic /tmp-directory-cleaner daemons.  (Another reason we should
 
625
 * never have put the socket file in /tmp...)
 
626
 */
 
627
void
 
628
TouchSocketFile(void)
 
629
{
 
630
        /* Do nothing if we did not create a socket... */
 
631
        if (sock_path[0] != '\0')
 
632
        {
 
633
                /*
 
634
                 * utime() is POSIX standard, utimes() is a common alternative. If
 
635
                 * we have neither, there's no way to affect the mod or access
 
636
                 * time of the socket :-(
 
637
                 *
 
638
                 * In either path, we ignore errors; there's no point in complaining.
 
639
                 */
 
640
#ifdef HAVE_UTIME
 
641
                utime(sock_path, NULL);
 
642
#else                                                   /* !HAVE_UTIME */
 
643
#ifdef HAVE_UTIMES
 
644
                utimes(sock_path, NULL);
 
645
#endif   /* HAVE_UTIMES */
 
646
#endif   /* HAVE_UTIME */
 
647
        }
 
648
}
 
649
 
 
650
 
 
651
/* --------------------------------
 
652
 * Low-level I/O routines begin here.
 
653
 *
 
654
 * These routines communicate with a frontend client across a connection
 
655
 * already established by the preceding routines.
 
656
 * --------------------------------
 
657
 */
 
658
 
 
659
 
 
660
/* --------------------------------
 
661
 *              pq_recvbuf - load some bytes into the input buffer
 
662
 *
 
663
 *              returns 0 if OK, EOF if trouble
 
664
 * --------------------------------
 
665
 */
 
666
static int
 
667
pq_recvbuf(void)
 
668
{
 
669
        if (PqRecvPointer > 0)
 
670
        {
 
671
                if (PqRecvLength > PqRecvPointer)
 
672
                {
 
673
                        /* still some unread data, left-justify it in the buffer */
 
674
                        memmove(PqRecvBuffer, PqRecvBuffer + PqRecvPointer,
 
675
                                        PqRecvLength - PqRecvPointer);
 
676
                        PqRecvLength -= PqRecvPointer;
 
677
                        PqRecvPointer = 0;
 
678
                }
 
679
                else
 
680
                        PqRecvLength = PqRecvPointer = 0;
 
681
        }
 
682
 
 
683
        /* Can fill buffer from PqRecvLength and upwards */
 
684
        for (;;)
 
685
        {
 
686
                int                     r;
 
687
 
 
688
                r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
 
689
                                                PQ_BUFFER_SIZE - PqRecvLength);
 
690
 
 
691
                if (r < 0)
 
692
                {
 
693
                        if (errno == EINTR)
 
694
                                continue;               /* Ok if interrupted */
 
695
 
 
696
                        /*
 
697
                         * Careful: an ereport() that tries to write to the client
 
698
                         * would cause recursion to here, leading to stack overflow
 
699
                         * and core dump!  This message must go *only* to the
 
700
                         * postmaster log.
 
701
                         */
 
702
                        ereport(COMMERROR,
 
703
                                        (errcode_for_socket_access(),
 
704
                                         errmsg("could not receive data from client: %m")));
 
705
                        return EOF;
 
706
                }
 
707
                if (r == 0)
 
708
                {
 
709
                        /*
 
710
                         * EOF detected.  We used to write a log message here, but
 
711
                         * it's better to expect the ultimate caller to do that.
 
712
                         */
 
713
                        return EOF;
 
714
                }
 
715
                /* r contains number of bytes read, so just incr length */
 
716
                PqRecvLength += r;
 
717
                return 0;
 
718
        }
 
719
}
 
720
 
 
721
/* --------------------------------
 
722
 *              pq_getbyte      - get a single byte from connection, or return EOF
 
723
 * --------------------------------
 
724
 */
 
725
int
 
726
pq_getbyte(void)
 
727
{
 
728
        while (PqRecvPointer >= PqRecvLength)
 
729
        {
 
730
                if (pq_recvbuf())               /* If nothing in buffer, then recv some */
 
731
                        return EOF;                     /* Failed to recv data */
 
732
        }
 
733
        return PqRecvBuffer[PqRecvPointer++];
 
734
}
 
735
 
 
736
/* --------------------------------
 
737
 *              pq_peekbyte             - peek at next byte from connection
 
738
 *
 
739
 *       Same as pq_getbyte() except we don't advance the pointer.
 
740
 * --------------------------------
 
741
 */
 
742
int
 
743
pq_peekbyte(void)
 
744
{
 
745
        while (PqRecvPointer >= PqRecvLength)
 
746
        {
 
747
                if (pq_recvbuf())               /* If nothing in buffer, then recv some */
 
748
                        return EOF;                     /* Failed to recv data */
 
749
        }
 
750
        return PqRecvBuffer[PqRecvPointer];
 
751
}
 
752
 
 
753
/* --------------------------------
 
754
 *              pq_getbytes             - get a known number of bytes from connection
 
755
 *
 
756
 *              returns 0 if OK, EOF if trouble
 
757
 * --------------------------------
 
758
 */
 
759
int
 
760
pq_getbytes(char *s, size_t len)
 
761
{
 
762
        size_t          amount;
 
763
 
 
764
        while (len > 0)
 
765
        {
 
766
                while (PqRecvPointer >= PqRecvLength)
 
767
                {
 
768
                        if (pq_recvbuf())       /* If nothing in buffer, then recv some */
 
769
                                return EOF;             /* Failed to recv data */
 
770
                }
 
771
                amount = PqRecvLength - PqRecvPointer;
 
772
                if (amount > len)
 
773
                        amount = len;
 
774
                memcpy(s, PqRecvBuffer + PqRecvPointer, amount);
 
775
                PqRecvPointer += amount;
 
776
                s += amount;
 
777
                len -= amount;
 
778
        }
 
779
        return 0;
 
780
}
 
781
 
 
782
/* --------------------------------
 
783
 *              pq_discardbytes         - throw away a known number of bytes
 
784
 *
 
785
 *              same as pq_getbytes except we do not copy the data to anyplace.
 
786
 *              this is used for resynchronizing after read errors.
 
787
 *
 
788
 *              returns 0 if OK, EOF if trouble
 
789
 * --------------------------------
 
790
 */
 
791
static int
 
792
pq_discardbytes(size_t len)
 
793
{
 
794
        size_t          amount;
 
795
 
 
796
        while (len > 0)
 
797
        {
 
798
                while (PqRecvPointer >= PqRecvLength)
 
799
                {
 
800
                        if (pq_recvbuf())       /* If nothing in buffer, then recv some */
 
801
                                return EOF;             /* Failed to recv data */
 
802
                }
 
803
                amount = PqRecvLength - PqRecvPointer;
 
804
                if (amount > len)
 
805
                        amount = len;
 
806
                PqRecvPointer += amount;
 
807
                len -= amount;
 
808
        }
 
809
        return 0;
 
810
}
 
811
 
 
812
/* --------------------------------
 
813
 *              pq_getstring    - get a null terminated string from connection
 
814
 *
 
815
 *              The return value is placed in an expansible StringInfo, which has
 
816
 *              already been initialized by the caller.
 
817
 *
 
818
 *              This is used only for dealing with old-protocol clients.  The idea
 
819
 *              is to produce a StringInfo that looks the same as we would get from
 
820
 *              pq_getmessage() with a newer client; we will then process it with
 
821
 *              pq_getmsgstring.  Therefore, no character set conversion is done here,
 
822
 *              even though this is presumably useful only for text.
 
823
 *
 
824
 *              returns 0 if OK, EOF if trouble
 
825
 * --------------------------------
 
826
 */
 
827
int
 
828
pq_getstring(StringInfo s)
 
829
{
 
830
        int                     i;
 
831
 
 
832
        /* Reset string to empty */
 
833
        s->len = 0;
 
834
        s->data[0] = '\0';
 
835
        s->cursor = 0;
 
836
 
 
837
        /* Read until we get the terminating '\0' */
 
838
        for (;;)
 
839
        {
 
840
                while (PqRecvPointer >= PqRecvLength)
 
841
                {
 
842
                        if (pq_recvbuf())       /* If nothing in buffer, then recv some */
 
843
                                return EOF;             /* Failed to recv data */
 
844
                }
 
845
 
 
846
                for (i = PqRecvPointer; i < PqRecvLength; i++)
 
847
                {
 
848
                        if (PqRecvBuffer[i] == '\0')
 
849
                        {
 
850
                                /* include the '\0' in the copy */
 
851
                                appendBinaryStringInfo(s, PqRecvBuffer + PqRecvPointer,
 
852
                                                                           i - PqRecvPointer + 1);
 
853
                                PqRecvPointer = i + 1;  /* advance past \0 */
 
854
                                return 0;
 
855
                        }
 
856
                }
 
857
 
 
858
                /* If we're here we haven't got the \0 in the buffer yet. */
 
859
                appendBinaryStringInfo(s, PqRecvBuffer + PqRecvPointer,
 
860
                                                           PqRecvLength - PqRecvPointer);
 
861
                PqRecvPointer = PqRecvLength;
 
862
        }
 
863
}
 
864
 
 
865
 
 
866
/* --------------------------------
 
867
 *              pq_getmessage   - get a message with length word from connection
 
868
 *
 
869
 *              The return value is placed in an expansible StringInfo, which has
 
870
 *              already been initialized by the caller.
 
871
 *              Only the message body is placed in the StringInfo; the length word
 
872
 *              is removed.  Also, s->cursor is initialized to zero for convenience
 
873
 *              in scanning the message contents.
 
874
 *
 
875
 *              If maxlen is not zero, it is an upper limit on the length of the
 
876
 *              message we are willing to accept.  We abort the connection (by
 
877
 *              returning EOF) if client tries to send more than that.
 
878
 *
 
879
 *              returns 0 if OK, EOF if trouble
 
880
 * --------------------------------
 
881
 */
 
882
int
 
883
pq_getmessage(StringInfo s, int maxlen)
 
884
{
 
885
        int32           len;
 
886
 
 
887
        /* Reset message buffer to empty */
 
888
        s->len = 0;
 
889
        s->data[0] = '\0';
 
890
        s->cursor = 0;
 
891
 
 
892
        /* Read message length word */
 
893
        if (pq_getbytes((char *) &len, 4) == EOF)
 
894
        {
 
895
                ereport(COMMERROR,
 
896
                                (errcode(ERRCODE_PROTOCOL_VIOLATION),
 
897
                                 errmsg("unexpected EOF within message length word")));
 
898
                return EOF;
 
899
        }
 
900
 
 
901
        len = ntohl(len);
 
902
 
 
903
        if (len < 4 ||
 
904
                (maxlen > 0 && len > maxlen))
 
905
        {
 
906
                ereport(COMMERROR,
 
907
                                (errcode(ERRCODE_PROTOCOL_VIOLATION),
 
908
                                 errmsg("invalid message length")));
 
909
                return EOF;
 
910
        }
 
911
 
 
912
        len -= 4;                                       /* discount length itself */
 
913
 
 
914
        if (len > 0)
 
915
        {
 
916
                /*
 
917
                 * Allocate space for message.  If we run out of room (ridiculously
 
918
                 * large message), we will elog(ERROR), but we want to discard the
 
919
                 * message body so as not to lose communication sync.
 
920
                 */
 
921
                PG_TRY();
 
922
                {
 
923
                        enlargeStringInfo(s, len);
 
924
                }
 
925
                PG_CATCH();
 
926
                {
 
927
                        if (pq_discardbytes(len) == EOF)
 
928
                                ereport(COMMERROR,
 
929
                                                (errcode(ERRCODE_PROTOCOL_VIOLATION),
 
930
                                                 errmsg("incomplete message from client")));
 
931
                        PG_RE_THROW();
 
932
                }
 
933
                PG_END_TRY();
 
934
 
 
935
                /* And grab the message */
 
936
                if (pq_getbytes(s->data, len) == EOF)
 
937
                {
 
938
                        ereport(COMMERROR,
 
939
                                        (errcode(ERRCODE_PROTOCOL_VIOLATION),
 
940
                                         errmsg("incomplete message from client")));
 
941
                        return EOF;
 
942
                }
 
943
                s->len = len;
 
944
                /* Place a trailing null per StringInfo convention */
 
945
                s->data[len] = '\0';
 
946
        }
 
947
 
 
948
        return 0;
 
949
}
 
950
 
 
951
 
 
952
/* --------------------------------
 
953
 *              pq_putbytes             - send bytes to connection (not flushed until pq_flush)
 
954
 *
 
955
 *              returns 0 if OK, EOF if trouble
 
956
 * --------------------------------
 
957
 */
 
958
int
 
959
pq_putbytes(const char *s, size_t len)
 
960
{
 
961
        int                     res;
 
962
 
 
963
        /* Should only be called by old-style COPY OUT */
 
964
        Assert(DoingCopyOut);
 
965
        /* No-op if reentrant call */
 
966
        if (PqCommBusy)
 
967
                return 0;
 
968
        PqCommBusy = true;
 
969
        res = internal_putbytes(s, len);
 
970
        PqCommBusy = false;
 
971
        return res;
 
972
}
 
973
 
 
974
static int
 
975
internal_putbytes(const char *s, size_t len)
 
976
{
 
977
        size_t          amount;
 
978
 
 
979
        while (len > 0)
 
980
        {
 
981
                /* If buffer is full, then flush it out */
 
982
                if (PqSendPointer >= PQ_BUFFER_SIZE)
 
983
                        if (internal_flush())
 
984
                                return EOF;
 
985
                amount = PQ_BUFFER_SIZE - PqSendPointer;
 
986
                if (amount > len)
 
987
                        amount = len;
 
988
                memcpy(PqSendBuffer + PqSendPointer, s, amount);
 
989
                PqSendPointer += amount;
 
990
                s += amount;
 
991
                len -= amount;
 
992
        }
 
993
        return 0;
 
994
}
 
995
 
 
996
/* --------------------------------
 
997
 *              pq_flush                - flush pending output
 
998
 *
 
999
 *              returns 0 if OK, EOF if trouble
 
1000
 * --------------------------------
 
1001
 */
 
1002
int
 
1003
pq_flush(void)
 
1004
{
 
1005
        int                     res;
 
1006
 
 
1007
        /* No-op if reentrant call */
 
1008
        if (PqCommBusy)
 
1009
                return 0;
 
1010
        PqCommBusy = true;
 
1011
        res = internal_flush();
 
1012
        PqCommBusy = false;
 
1013
        return res;
 
1014
}
 
1015
 
 
1016
static int
 
1017
internal_flush(void)
 
1018
{
 
1019
        static int      last_reported_send_errno = 0;
 
1020
 
 
1021
        unsigned char *bufptr = PqSendBuffer;
 
1022
        unsigned char *bufend = PqSendBuffer + PqSendPointer;
 
1023
 
 
1024
        while (bufptr < bufend)
 
1025
        {
 
1026
                int                     r;
 
1027
 
 
1028
                r = secure_write(MyProcPort, bufptr, bufend - bufptr);
 
1029
 
 
1030
                if (r <= 0)
 
1031
                {
 
1032
                        if (errno == EINTR)
 
1033
                                continue;               /* Ok if we were interrupted */
 
1034
 
 
1035
                        /*
 
1036
                         * Careful: an ereport() that tries to write to the client
 
1037
                         * would cause recursion to here, leading to stack overflow
 
1038
                         * and core dump!  This message must go *only* to the
 
1039
                         * postmaster log.
 
1040
                         *
 
1041
                         * If a client disconnects while we're in the midst of output, we
 
1042
                         * might write quite a bit of data before we get to a safe
 
1043
                         * query abort point.  So, suppress duplicate log messages.
 
1044
                         */
 
1045
                        if (errno != last_reported_send_errno)
 
1046
                        {
 
1047
                                last_reported_send_errno = errno;
 
1048
                                ereport(COMMERROR,
 
1049
                                                (errcode_for_socket_access(),
 
1050
                                                 errmsg("could not send data to client: %m")));
 
1051
                        }
 
1052
 
 
1053
                        /*
 
1054
                         * We drop the buffered data anyway so that processing can
 
1055
                         * continue, even though we'll probably quit soon.
 
1056
                         */
 
1057
                        PqSendPointer = 0;
 
1058
                        return EOF;
 
1059
                }
 
1060
 
 
1061
                last_reported_send_errno = 0;   /* reset after any successful send */
 
1062
                bufptr += r;
 
1063
        }
 
1064
 
 
1065
        PqSendPointer = 0;
 
1066
        return 0;
 
1067
}
 
1068
 
 
1069
 
 
1070
/* --------------------------------
 
1071
 * Message-level I/O routines begin here.
 
1072
 *
 
1073
 * These routines understand about the old-style COPY OUT protocol.
 
1074
 * --------------------------------
 
1075
 */
 
1076
 
 
1077
 
 
1078
/* --------------------------------
 
1079
 *              pq_putmessage   - send a normal message (suppressed in COPY OUT mode)
 
1080
 *
 
1081
 *              If msgtype is not '\0', it is a message type code to place before
 
1082
 *              the message body.  If msgtype is '\0', then the message has no type
 
1083
 *              code (this is only valid in pre-3.0 protocols).
 
1084
 *
 
1085
 *              len is the length of the message body data at *s.  In protocol 3.0
 
1086
 *              and later, a message length word (equal to len+4 because it counts
 
1087
 *              itself too) is inserted by this routine.
 
1088
 *
 
1089
 *              All normal messages are suppressed while old-style COPY OUT is in
 
1090
 *              progress.  (In practice only a few notice messages might get emitted
 
1091
 *              then; dropping them is annoying, but at least they will still appear
 
1092
 *              in the postmaster log.)
 
1093
 *
 
1094
 *              We also suppress messages generated while pqcomm.c is busy.  This
 
1095
 *              avoids any possibility of messages being inserted within other
 
1096
 *              messages.  The only known trouble case arises if SIGQUIT occurs
 
1097
 *              during a pqcomm.c routine --- quickdie() will try to send a warning
 
1098
 *              message, and the most reasonable approach seems to be to drop it.
 
1099
 *
 
1100
 *              returns 0 if OK, EOF if trouble
 
1101
 * --------------------------------
 
1102
 */
 
1103
int
 
1104
pq_putmessage(char msgtype, const char *s, size_t len)
 
1105
{
 
1106
        if (DoingCopyOut || PqCommBusy)
 
1107
                return 0;
 
1108
        PqCommBusy = true;
 
1109
        if (msgtype)
 
1110
                if (internal_putbytes(&msgtype, 1))
 
1111
                        goto fail;
 
1112
        if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
 
1113
        {
 
1114
                uint32          n32;
 
1115
 
 
1116
                n32 = htonl((uint32) (len + 4));
 
1117
                if (internal_putbytes((char *) &n32, 4))
 
1118
                        goto fail;
 
1119
        }
 
1120
        if (internal_putbytes(s, len))
 
1121
                goto fail;
 
1122
        PqCommBusy = false;
 
1123
        return 0;
 
1124
 
 
1125
fail:
 
1126
        PqCommBusy = false;
 
1127
        return EOF;
 
1128
}
 
1129
 
 
1130
/* --------------------------------
 
1131
 *              pq_startcopyout - inform libpq that an old-style COPY OUT transfer
 
1132
 *                      is beginning
 
1133
 * --------------------------------
 
1134
 */
 
1135
void
 
1136
pq_startcopyout(void)
 
1137
{
 
1138
        DoingCopyOut = true;
 
1139
}
 
1140
 
 
1141
/* --------------------------------
 
1142
 *              pq_endcopyout   - end an old-style COPY OUT transfer
 
1143
 *
 
1144
 *              If errorAbort is indicated, we are aborting a COPY OUT due to an error,
 
1145
 *              and must send a terminator line.  Since a partial data line might have
 
1146
 *              been emitted, send a couple of newlines first (the first one could
 
1147
 *              get absorbed by a backslash...)  Note that old-style COPY OUT does
 
1148
 *              not allow binary transfers, so a textual terminator is always correct.
 
1149
 * --------------------------------
 
1150
 */
 
1151
void
 
1152
pq_endcopyout(bool errorAbort)
 
1153
{
 
1154
        if (!DoingCopyOut)
 
1155
                return;
 
1156
        if (errorAbort)
 
1157
                pq_putbytes("\n\n\\.\n", 5);
 
1158
        /* in non-error case, copy.c will have emitted the terminator line */
 
1159
        DoingCopyOut = false;
 
1160
}