~ubuntu-branches/ubuntu/maverick/evolution-data-server/maverick-proposed

« back to all changes in this revision

Viewing changes to libdb/examples_c/ex_repquote/ex_rq_net.c

  • Committer: Bazaar Package Importer
  • Author(s): Didier Roche
  • Date: 2010-05-17 17:02:06 UTC
  • mfrom: (1.1.79 upstream) (1.6.12 experimental)
  • Revision ID: james.westby@ubuntu.com-20100517170206-4ufr52vwrhh26yh0
Tags: 2.30.1-1ubuntu1
* Merge from debian experimental. Remaining change:
  (LP: #42199, #229669, #173703, #360344, #508494)
  + debian/control:
    - add Vcs-Bzr tag
    - don't use libgnome
    - Use Breaks instead of Conflicts against evolution 2.25 and earlier.
  + debian/evolution-data-server.install,
    debian/patches/45_libcamel_providers_version.patch:
    - use the upstream versioning, not a Debian-specific one 
  + debian/libedata-book1.2-dev.install, debian/libebackend-1.2-dev.install,
    debian/libcamel1.2-dev.install, debian/libedataserverui1.2-dev.install:
    - install html documentation
  + debian/rules:
    - don't build documentation it's shipped with the tarball

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/*-
2
 
 * See the file LICENSE for redistribution information.
3
 
 *
4
 
 * Copyright (c) 2001-2002
5
 
 *      Sleepycat Software.  All rights reserved.
6
 
 *
7
 
 * $Id$
8
 
 */
9
 
 
10
 
#include <sys/types.h>
11
 
 
12
 
#include <netinet/in.h>
13
 
#include <sys/socket.h>
14
 
#include <sys/wait.h>
15
 
 
16
 
#include <assert.h>
17
 
#include <errno.h>
18
 
#include <netdb.h>
19
 
#include <pthread.h>
20
 
#include <signal.h>
21
 
#include <stdio.h>
22
 
#include <stdlib.h>
23
 
#include <string.h>
24
 
#include <unistd.h>
25
 
 
26
 
#include <db.h>
27
 
#include <dbinc/queue.h>                /* !!!: for the LIST_XXX macros. */
28
 
 
29
 
#include "ex_repquote.h"
30
 
 
31
 
int machtab_add __P((machtab_t *, int, u_int32_t, int, int *));
32
 
ssize_t readn __P((int, void *, size_t));
33
 
 
34
 
/*
35
 
 * This file defines the communication infrastructure for the ex_repquote
36
 
 * sample application.
37
 
 *
38
 
 * This application uses TCP/IP for its communication.  In an N-site
39
 
 * replication group, this means that there are N * N communication
40
 
 * channels so that every site can communicate with every other site
41
 
 * (this allows elections to be held when the master fails).  We do
42
 
 * not require that anyone know about all sites when the application
43
 
 * starts up.  In order to communicate, the application should know
44
 
 * about someone, else it has no idea how to ever get in the game.
45
 
 *
46
 
 * Communication is handled via a number of different threads.  These
47
 
 * thread functions are implemented in rep_util.c  In this file, we
48
 
 * define the data structures that maintain the state that describes
49
 
 * the comm infrastructure, the functions that manipulates this state
50
 
 * and the routines used to actually send and receive data over the
51
 
 * sockets.
52
 
 */
53
 
 
54
 
/*
55
 
 * The communication infrastructure is represented by a machine table,
56
 
 * machtab_t, which is essentially a mutex-protected linked list of members
57
 
 * of the group.  The machtab also contains the parameters that are needed
58
 
 * to call for an election.  We hardwire values for these parameters in the
59
 
 * init function, but these could be set via some configuration setup in a
60
 
 * real application.  We reserve the machine-id 1 to refer to ourselves and
61
 
 * make the machine-id 0 be invalid.
62
 
 */
63
 
 
64
 
#define MACHID_INVALID  0
65
 
#define MACHID_SELF     1
66
 
 
67
 
struct __machtab {
68
 
        LIST_HEAD(__machlist, __member) machlist;
69
 
        int nextid;
70
 
        pthread_mutex_t mtmutex;
71
 
        u_int32_t timeout_time;
72
 
        int current;
73
 
        int max;
74
 
        int nsites;
75
 
        int priority;
76
 
};
77
 
 
78
 
/* Data structure that describes each entry in the machtab. */
79
 
struct __member {
80
 
        u_int32_t hostaddr;     /* Host IP address. */
81
 
        int port;               /* Port number. */
82
 
        int eid;                /* Application-specific machine id. */
83
 
        int fd;                 /* File descriptor for the socket. */
84
 
        LIST_ENTRY(__member) links;
85
 
                                /* For linked list of all members we know of. */
86
 
};
87
 
 
88
 
static int quote_send_broadcast __P((machtab_t *,
89
 
    const DBT *, const DBT *, u_int32_t));
90
 
static int quote_send_one __P((const DBT *, const DBT *, int, u_int32_t));
91
 
 
92
 
/*
93
 
 * machtab_init --
94
 
 *      Initialize the machine ID table.
95
 
 * XXX Right now we treat the number of sites as the maximum
96
 
 * number we've ever had on the list at one time.  We probably
97
 
 * want to make that smarter.
98
 
 */
99
 
int
100
 
machtab_init(machtabp, pri, nsites)
101
 
        machtab_t **machtabp;
102
 
        int pri, nsites;
103
 
{
104
 
        int ret;
105
 
        machtab_t *machtab;
106
 
 
107
 
        if ((machtab = malloc(sizeof(machtab_t))) == NULL)
108
 
                return (ENOMEM);
109
 
 
110
 
        LIST_INIT(&machtab->machlist);
111
 
 
112
 
        /* Reserve eid's 0 and 1. */
113
 
        machtab->nextid = 2;
114
 
        machtab->timeout_time = 2 * 1000000;            /* 2 seconds. */
115
 
        machtab->current = machtab->max = 0;
116
 
        machtab->priority = pri;
117
 
        machtab->nsites = nsites;
118
 
 
119
 
        ret = pthread_mutex_init(&machtab->mtmutex, NULL);
120
 
 
121
 
        *machtabp = machtab;
122
 
 
123
 
        return (ret);
124
 
}
125
 
 
126
 
/*
127
 
 * machtab_add --
128
 
 *      Add a file descriptor to the table of machines, returning
129
 
 *  a new machine ID.
130
 
 */
131
 
int
132
 
machtab_add(machtab, fd, hostaddr, port, idp)
133
 
        machtab_t *machtab;
134
 
        int fd;
135
 
        u_int32_t hostaddr;
136
 
        int port, *idp;
137
 
{
138
 
        int ret;
139
 
        member_t *m, *member;
140
 
 
141
 
        if ((member = malloc(sizeof(member_t))) == NULL)
142
 
                return (ENOMEM);
143
 
 
144
 
        member->fd = fd;
145
 
        member->hostaddr = hostaddr;
146
 
        member->port = port;
147
 
 
148
 
        if ((ret = pthread_mutex_lock(&machtab->mtmutex)) != 0)
149
 
                return (ret);
150
 
 
151
 
        for (m = LIST_FIRST(&machtab->machlist);
152
 
            m != NULL; m = LIST_NEXT(m, links))
153
 
                if (m->hostaddr == hostaddr && m->port == port)
154
 
                        break;
155
 
 
156
 
        if (m == NULL) {
157
 
                member->eid = machtab->nextid++;
158
 
                LIST_INSERT_HEAD(&machtab->machlist, member, links);
159
 
        } else
160
 
                member->eid = m->eid;
161
 
 
162
 
        ret = pthread_mutex_unlock(&machtab->mtmutex);
163
 
 
164
 
        if (idp != NULL)
165
 
                *idp = member->eid;
166
 
 
167
 
        if (m == NULL) {
168
 
                if (++machtab->current > machtab->max)
169
 
                        machtab->max = machtab->current;
170
 
        } else {
171
 
                free(member);
172
 
                ret = EEXIST;
173
 
        }
174
 
        return (ret);
175
 
}
176
 
 
177
 
/*
178
 
 * machtab_getinfo --
179
 
 *      Return host and port information for a particular machine id.
180
 
 */
181
 
int
182
 
machtab_getinfo(machtab, eid, hostp, portp)
183
 
        machtab_t *machtab;
184
 
        int eid;
185
 
        u_int32_t *hostp;
186
 
        int *portp;
187
 
{
188
 
        int ret;
189
 
        member_t *member;
190
 
 
191
 
        if ((ret = pthread_mutex_lock(&machtab->mtmutex)) != 0)
192
 
                return (ret);
193
 
 
194
 
        for (member = LIST_FIRST(&machtab->machlist);
195
 
            member != NULL;
196
 
            member = LIST_NEXT(member, links))
197
 
                if (member->eid == eid) {
198
 
                        *hostp = member->hostaddr;
199
 
                        *portp = member->port;
200
 
                        break;
201
 
                }
202
 
 
203
 
        if ((ret = pthread_mutex_unlock(&machtab->mtmutex)) != 0)
204
 
                return (ret);
205
 
 
206
 
        return (member != NULL ? 0 : EINVAL);
207
 
}
208
 
 
209
 
/*
210
 
 * machtab_rem --
211
 
 *      Remove a mapping from the table of machines.  Lock indicates
212
 
 * whether we need to lock the machtab or not (0 indicates we do not
213
 
 * need to lock; non-zero indicates that we do need to lock).
214
 
 */
215
 
int
216
 
machtab_rem(machtab, eid, lock)
217
 
        machtab_t *machtab;
218
 
        int eid;
219
 
        int lock;
220
 
{
221
 
        int found, ret;
222
 
        member_t *member;
223
 
 
224
 
        ret = 0;
225
 
        if (lock && (ret = pthread_mutex_lock(&machtab->mtmutex)) != 0)
226
 
                return (ret);
227
 
 
228
 
        for (found = 0, member = LIST_FIRST(&machtab->machlist);
229
 
            member != NULL;
230
 
            member = LIST_NEXT(member, links))
231
 
                if (member->eid == eid) {
232
 
                        found = 1;
233
 
                        LIST_REMOVE(member, links);
234
 
                        (void)close(member->fd);
235
 
                        free(member);
236
 
                        machtab->current--;
237
 
                        break;
238
 
                }
239
 
 
240
 
        if (LIST_FIRST(&machtab->machlist) == NULL)
241
 
                machtab->nextid = 2;
242
 
 
243
 
        if (lock)
244
 
                ret = pthread_mutex_unlock(&machtab->mtmutex);
245
 
 
246
 
        return (ret);
247
 
}
248
 
 
249
 
void
250
 
machtab_parm(machtab, nump, prip, timeoutp)
251
 
        machtab_t *machtab;
252
 
        int *nump, *prip;
253
 
        u_int32_t *timeoutp;
254
 
{
255
 
        if (machtab->nsites == 0)
256
 
                *nump = machtab->max;
257
 
        else
258
 
                *nump = machtab->nsites;
259
 
        *prip = machtab->priority;
260
 
        *timeoutp = machtab->timeout_time;
261
 
}
262
 
 
263
 
/*
264
 
 * listen_socket_init --
265
 
 *      Initialize a socket for listening on the specified port.  Returns
266
 
 *      a file descriptor for the socket, ready for an accept() call
267
 
 *      in a thread that we're happy to let block.
268
 
 */
269
 
int
270
 
listen_socket_init(progname, port)
271
 
        const char *progname;
272
 
        int port;
273
 
{
274
 
        int s;
275
 
        struct protoent *proto;
276
 
        struct sockaddr_in si;
277
 
 
278
 
        if ((proto = getprotobyname("tcp")) == NULL)
279
 
                return (-1);
280
 
 
281
 
        if ((s = socket(AF_INET, SOCK_STREAM, proto->p_proto)) < 0)
282
 
                return (-1);
283
 
 
284
 
        memset(&si, 0, sizeof(si));
285
 
        si.sin_family = AF_INET;
286
 
        si.sin_addr.s_addr = htonl(INADDR_ANY);
287
 
        si.sin_port = htons(port);
288
 
 
289
 
        if (bind(s, (struct sockaddr *)&si, sizeof(si)) != 0)
290
 
                goto err;
291
 
 
292
 
        if (listen(s, 5) != 0)
293
 
                goto err;
294
 
 
295
 
        return (s);
296
 
 
297
 
err:    fprintf(stderr, "%s: %s", progname, strerror(errno));
298
 
        close (s);
299
 
        return (-1);
300
 
}
301
 
 
302
 
/*
303
 
 * listen_socket_accept --
304
 
 *      Accept a connection on a socket.  This is essentially just a wrapper
305
 
 *      for accept(3).
306
 
 */
307
 
int
308
 
listen_socket_accept(machtab, progname, s, eidp)
309
 
        machtab_t *machtab;
310
 
        const char *progname;
311
 
        int s, *eidp;
312
 
{
313
 
        struct sockaddr_in si;
314
 
        int si_len;
315
 
        int host, ns, port, ret;
316
 
 
317
 
        COMPQUIET(progname, NULL);
318
 
 
319
 
wait:   memset(&si, 0, sizeof(si));
320
 
        si_len = sizeof(si);
321
 
        ns = accept(s, (struct sockaddr *)&si, &si_len);
322
 
        host = ntohl(si.sin_addr.s_addr);
323
 
        port = ntohs(si.sin_port);
324
 
        ret = machtab_add(machtab, ns, host, port, eidp);
325
 
        if (ret == EEXIST) {
326
 
                close(ns);
327
 
                goto wait;
328
 
        } else if (ret != 0)
329
 
                goto err;
330
 
 
331
 
        return (ns);
332
 
 
333
 
err:    close(ns);
334
 
        return (-1);
335
 
}
336
 
 
337
 
/*
338
 
 * get_accepted_socket --
339
 
 *      Listen on the specified port, and return a file descriptor
340
 
 *      when we have accepted a connection on it.
341
 
 */
342
 
int
343
 
get_accepted_socket(progname, port)
344
 
        const char *progname;
345
 
        int port;
346
 
{
347
 
        struct protoent *proto;
348
 
        struct sockaddr_in si;
349
 
        int si_len;
350
 
        int s, ns;
351
 
 
352
 
        if ((proto = getprotobyname("tcp")) == NULL)
353
 
                return (-1);
354
 
 
355
 
        if ((s = socket(AF_INET, SOCK_STREAM, proto->p_proto)) < 0)
356
 
                return (-1);
357
 
 
358
 
        memset(&si, 0, sizeof(si));
359
 
        si.sin_family = AF_INET;
360
 
        si.sin_addr.s_addr = htonl(INADDR_ANY);
361
 
        si.sin_port = htons(port);
362
 
 
363
 
        if (bind(s, (struct sockaddr *)&si, sizeof(si)) != 0)
364
 
                goto err;
365
 
 
366
 
        if (listen(s, 5) != 0)
367
 
                goto err;
368
 
 
369
 
        memset(&si, 0, sizeof(si));
370
 
        si_len = sizeof(si);
371
 
        ns = accept(s, (struct sockaddr *)&si, &si_len);
372
 
 
373
 
        return (ns);
374
 
 
375
 
err:    fprintf(stderr, "%s: %s", progname, strerror(errno));
376
 
        close (s);
377
 
        return (-1);
378
 
}
379
 
 
380
 
/*
381
 
 * get_connected_socket --
382
 
 *      Connect to the specified port of the specified remote machine,
383
 
 *      and return a file descriptor when we have accepted a connection on it.
384
 
 *      Add this connection to the machtab.  If we already have a connection
385
 
 *      open to this machine, then don't create another one, return the eid
386
 
 *      of the connection (in *eidp) and set is_open to 1.  Return 0.
387
 
 */
388
 
int
389
 
get_connected_socket(machtab, progname, remotehost, port, is_open, eidp)
390
 
        machtab_t *machtab;
391
 
        const char *progname, *remotehost;
392
 
        int port, *is_open, *eidp;
393
 
{
394
 
        int ret, s;
395
 
        struct hostent *hp;
396
 
        struct protoent *proto;
397
 
        struct sockaddr_in si;
398
 
        u_int32_t addr;
399
 
 
400
 
        *is_open = 0;
401
 
 
402
 
        if ((proto = getprotobyname("tcp")) == NULL)
403
 
                return (-1);
404
 
 
405
 
        if ((hp = gethostbyname(remotehost)) == NULL) {
406
 
                fprintf(stderr, "%s: host not found: %s\n", progname,
407
 
                    strerror(errno));
408
 
                return (-1);
409
 
        }
410
 
 
411
 
        if ((s = socket(AF_INET, SOCK_STREAM, proto->p_proto)) < 0)
412
 
                return (-1);
413
 
        memset(&si, 0, sizeof(si));
414
 
        memcpy((char *)&si.sin_addr, hp->h_addr, hp->h_length);
415
 
        addr = ntohl(si.sin_addr.s_addr);
416
 
        ret = machtab_add(machtab, s, addr, port, eidp);
417
 
        if (ret == EEXIST) {
418
 
                *is_open = 1;
419
 
                close(s);
420
 
                return (0);
421
 
        } else if (ret != 0) {
422
 
                close (s);
423
 
                return (-1);
424
 
        }
425
 
 
426
 
        si.sin_family = AF_INET;
427
 
        si.sin_port = htons(port);
428
 
        if (connect(s, (struct sockaddr *)&si, sizeof(si)) < 0) {
429
 
                fprintf(stderr, "%s: connection failed: %s",
430
 
                    progname, strerror(errno));
431
 
                (void)machtab_rem(machtab, *eidp, 1);
432
 
                return (-1);
433
 
        }
434
 
 
435
 
        return (s);
436
 
}
437
 
 
438
 
/*
439
 
 * get_next_message --
440
 
 *      Read a single message from the specified file descriptor, and
441
 
 * return it in the format used by rep functions (two DBTs and a type).
442
 
 *
443
 
 * This function is called in a loop by both clients and masters, and
444
 
 * the resulting DBTs are manually dispatched to DB_ENV->rep_process_message().
445
 
 */
446
 
int
447
 
get_next_message(fd, rec, control)
448
 
        int fd;
449
 
        DBT *rec, *control;
450
 
{
451
 
        size_t nr;
452
 
        u_int32_t rsize, csize;
453
 
        u_int8_t *recbuf, *controlbuf;
454
 
 
455
 
        /*
456
 
         * The protocol we use on the wire is dead simple:
457
 
         *
458
 
         *      4 bytes         - rec->size
459
 
         *      (# read above)  - rec->data
460
 
         *      4 bytes         - control->size
461
 
         *      (# read above)  - control->data
462
 
         */
463
 
 
464
 
        /* Read rec->size. */
465
 
        nr = readn(fd, &rsize, 4);
466
 
        if (nr != 4)
467
 
                return (1);
468
 
 
469
 
        /* Read the record itself. */
470
 
        if (rsize > 0) {
471
 
                if (rec->size < rsize)
472
 
                        rec->data = realloc(rec->data, rsize);
473
 
                recbuf = rec->data;
474
 
                nr = readn(fd, recbuf, rsize);
475
 
        } else {
476
 
                if (rec->data != NULL)
477
 
                        free(rec->data);
478
 
                rec->data = NULL;
479
 
        }
480
 
        rec->size = rsize;
481
 
 
482
 
        /* Read control->size. */
483
 
        nr = readn(fd, &csize, 4);
484
 
        if (nr != 4)
485
 
                return (1);
486
 
 
487
 
        /* Read the control struct itself. */
488
 
        if (csize > 0) {
489
 
                controlbuf = control->data;
490
 
                if (control->size < csize)
491
 
                        controlbuf = realloc(controlbuf, csize);
492
 
                nr = readn(fd, controlbuf, csize);
493
 
                if (nr != csize)
494
 
                        return (1);
495
 
        } else {
496
 
                if (control->data != NULL)
497
 
                        free(control->data);
498
 
                controlbuf = NULL;
499
 
        }
500
 
        control->data = controlbuf;
501
 
        control->size = csize;
502
 
 
503
 
        return (0);
504
 
}
505
 
 
506
 
/*
507
 
 * readn --
508
 
 *     Read a full n characters from a file descriptor, unless we get an error
509
 
 * or EOF.
510
 
 */
511
 
ssize_t
512
 
readn(fd, vptr, n)
513
 
        int fd;
514
 
        void *vptr;
515
 
        size_t n;
516
 
{
517
 
        size_t nleft;
518
 
        ssize_t nread;
519
 
        char *ptr;
520
 
 
521
 
        ptr = vptr;
522
 
        nleft = n;
523
 
        while (nleft > 0) {
524
 
                if ( (nread = read(fd, ptr, nleft)) < 0) {
525
 
                        /*
526
 
                         * Call read() again on interrupted system call;
527
 
                         * on other errors, bail.
528
 
                         */
529
 
                        if (errno == EINTR)
530
 
                                nread = 0;
531
 
                        else
532
 
                                return (-1);
533
 
                } else if (nread == 0)
534
 
                        break;  /* EOF */
535
 
 
536
 
                nleft -= nread;
537
 
                ptr   += nread;
538
 
        }
539
 
 
540
 
        return (n - nleft);
541
 
}
542
 
 
543
 
/*
544
 
 * quote_send --
545
 
 * The f_send function for DB_ENV->set_rep_transport.
546
 
 */
547
 
int
548
 
quote_send(dbenv, control, rec, eid, flags)
549
 
        DB_ENV *dbenv;
550
 
        const DBT *control, *rec;
551
 
        int eid;
552
 
        u_int32_t flags;
553
 
{
554
 
        int fd, n, ret, t_ret;
555
 
        machtab_t *machtab;
556
 
        member_t *m;
557
 
 
558
 
        machtab = (machtab_t *)dbenv->app_private;
559
 
 
560
 
        if (eid == DB_EID_BROADCAST) {
561
 
                /*
562
 
                 * Right now, we do not require successful transmission.
563
 
                 * I'd like to move this requiring at least one successful
564
 
                 * transmission on PERMANENT requests.
565
 
                 */
566
 
                n = quote_send_broadcast(machtab, rec, control, flags);
567
 
                if (n < 0 /*|| (n == 0 && LF_ISSET(DB_REP_PERMANENT))*/)
568
 
                        return (DB_REP_UNAVAIL);
569
 
                return (0);
570
 
        }
571
 
 
572
 
        if ((ret = pthread_mutex_lock(&machtab->mtmutex)) != 0)
573
 
                return (ret);
574
 
 
575
 
        fd = 0;
576
 
        for (m = LIST_FIRST(&machtab->machlist); m != NULL;
577
 
            m = LIST_NEXT(m, links)) {
578
 
                if (m->eid == eid) {
579
 
                        fd = m->fd;
580
 
                        break;
581
 
                }
582
 
        }
583
 
 
584
 
        if (fd == 0) {
585
 
                dbenv->err(dbenv, DB_REP_UNAVAIL,
586
 
                    "quote_send: cannot find machine ID %d", eid);
587
 
                return (DB_REP_UNAVAIL);
588
 
        }
589
 
 
590
 
        ret = quote_send_one(rec, control, fd, flags);
591
 
 
592
 
        if ((t_ret = (pthread_mutex_unlock(&machtab->mtmutex))) != 0 &&
593
 
            ret == 0)
594
 
                ret = t_ret;
595
 
 
596
 
        return (ret);
597
 
}
598
 
 
599
 
/*
600
 
 * quote_send_broadcast --
601
 
 *      Send a message to everybody.
602
 
 * Returns the number of sites to which this message was successfully
603
 
 * communicated.  A -1 indicates a fatal error.
604
 
 */
605
 
static int
606
 
quote_send_broadcast(machtab, rec, control, flags)
607
 
        machtab_t *machtab;
608
 
        const DBT *rec, *control;
609
 
        u_int32_t flags;
610
 
{
611
 
        int ret, sent;
612
 
        member_t *m, *next;
613
 
 
614
 
        if ((ret = pthread_mutex_lock(&machtab->mtmutex)) != 0)
615
 
                return (0);
616
 
 
617
 
        sent = 0;
618
 
        for (m = LIST_FIRST(&machtab->machlist); m != NULL; m = next) {
619
 
                next = LIST_NEXT(m, links);
620
 
                if ((ret = quote_send_one(rec, control, m->fd, flags)) != 0) {
621
 
                        (void)machtab_rem(machtab, m->eid, 0);
622
 
                } else
623
 
                        sent++;
624
 
        }
625
 
 
626
 
        if (pthread_mutex_unlock(&machtab->mtmutex) != 0)
627
 
                return (-1);
628
 
 
629
 
        return (sent);
630
 
}
631
 
 
632
 
/*
633
 
 * quote_send_one --
634
 
 *      Send a message to a single machine, given that machine's file
635
 
 * descriptor.
636
 
 *
637
 
 * !!!
638
 
 * Note that the machtab mutex should be held through this call.
639
 
 * It doubles as a synchronizer to make sure that two threads don't
640
 
 * intersperse writes that are part of two single messages.
641
 
 */
642
 
static int
643
 
quote_send_one(rec, control, fd, flags)
644
 
        const DBT *rec, *control;
645
 
        int fd;
646
 
        u_int32_t flags;
647
 
 
648
 
{
649
 
        int retry;
650
 
        ssize_t bytes_left, nw;
651
 
        u_int8_t *wp;
652
 
 
653
 
        COMPQUIET(flags, 0);
654
 
 
655
 
        /*
656
 
         * The protocol is simply: write rec->size, write rec->data,
657
 
         * write control->size, write control->data.
658
 
         */
659
 
        nw = write(fd, &rec->size, 4);
660
 
        if (nw != 4)
661
 
                return (DB_REP_UNAVAIL);
662
 
 
663
 
        if (rec->size > 0) {
664
 
                nw = write(fd, rec->data, rec->size);
665
 
                if (nw < 0)
666
 
                        return (DB_REP_UNAVAIL);
667
 
                if (nw != (ssize_t)rec->size) {
668
 
                        /* Try a couple of times to finish the write. */
669
 
                        wp = (u_int8_t *)rec->data + nw;
670
 
                        bytes_left = rec->size - nw;
671
 
                        for (retry = 0; bytes_left > 0 && retry < 3; retry++) {
672
 
                                nw = write(fd, wp, bytes_left);
673
 
                                if (nw < 0)
674
 
                                        return (DB_REP_UNAVAIL);
675
 
                                bytes_left -= nw;
676
 
                                wp += nw;
677
 
                        }
678
 
                        if (bytes_left > 0)
679
 
                                return (DB_REP_UNAVAIL);
680
 
                }
681
 
        }
682
 
 
683
 
        nw = write(fd, &control->size, 4);
684
 
        if (nw != 4)
685
 
                return (DB_REP_UNAVAIL);
686
 
        if (control->size > 0) {
687
 
                nw = write(fd, control->data, control->size);
688
 
                if (nw != (ssize_t)control->size)
689
 
                        return (DB_REP_UNAVAIL);
690
 
        }
691
 
        return (0);
692
 
}