~ubuntu-branches/ubuntu/precise/corosync/precise-proposed

« back to all changes in this revision

Viewing changes to exec/coroipcs.c

  • Committer: Bazaar Package Importer
  • Author(s): Ante Karamatic
  • Date: 2009-08-21 09:29:56 UTC
  • mfrom: (1.1.2 upstream)
  • Revision ID: james.westby@ubuntu.com-20090821092956-w9qxxxx3zeoh8dem
Tags: 1.0.0-4ubuntu2
* debian/control:
  - 'Ubuntu Developers' instead of 'Ubuntu Core Developers'
    as maintainer
  - Bump debhelper dependecy to 7

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 * Copyright (c) 2006-2009 Red Hat, Inc.
 
3
 *
 
4
 * All rights reserved.
 
5
 *
 
6
 * Author: Steven Dake (sdake@redhat.com)
 
7
 *
 
8
 * This software licensed under BSD license, the text of which follows:
 
9
 *
 
10
 * Redistribution and use in source and binary forms, with or without
 
11
 * modification, are permitted provided that the following conditions are met:
 
12
 *
 
13
 * - Redistributions of source code must retain the above copyright notice,
 
14
 *   this list of conditions and the following disclaimer.
 
15
 * - Redistributions in binary form must reproduce the above copyright notice,
 
16
 *   this list of conditions and the following disclaimer in the documentation
 
17
 *   and/or other materials provided with the distribution.
 
18
 * - Neither the name of the MontaVista Software, Inc. nor the names of its
 
19
 *   contributors may be used to endorse or promote products derived from this
 
20
 *   software without specific prior written permission.
 
21
 *
 
22
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 
23
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 
24
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 
25
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 
26
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 
27
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 
28
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 
29
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 
30
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 
31
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
 
32
 * THE POSSIBILITY OF SUCH DAMAGE.
 
33
 */
 
34
 
 
35
#include <config.h>
 
36
 
 
37
#ifndef _GNU_SOURCE
 
38
#define _GNU_SOURCE 1
 
39
#endif
 
40
#include <pthread.h>
 
41
#include <assert.h>
 
42
#include <pwd.h>
 
43
#include <grp.h>
 
44
#include <sys/types.h>
 
45
#include <sys/poll.h>
 
46
#include <sys/uio.h>
 
47
#include <sys/mman.h>
 
48
#include <sys/socket.h>
 
49
#include <sys/un.h>
 
50
#include <sys/time.h>
 
51
#include <sys/resource.h>
 
52
#include <sys/wait.h>
 
53
#include <sys/stat.h>
 
54
#include <netinet/in.h>
 
55
#include <arpa/inet.h>
 
56
#include <unistd.h>
 
57
#include <fcntl.h>
 
58
#include <stdlib.h>
 
59
#include <stdio.h>
 
60
#include <errno.h>
 
61
#include <signal.h>
 
62
#include <sched.h>
 
63
#include <time.h>
 
64
#if defined(HAVE_GETPEERUCRED)
 
65
#include <ucred.h>
 
66
#endif
 
67
#include <string.h>
 
68
 
 
69
#include <sys/shm.h>
 
70
#include <sys/sem.h>
 
71
#include <corosync/corotypes.h>
 
72
#include <corosync/list.h>
 
73
 
 
74
#include <corosync/coroipc_types.h>
 
75
#include <corosync/coroipcs.h>
 
76
#include <corosync/coroipc_ipc.h>
 
77
 
 
78
#ifndef MSG_NOSIGNAL
 
79
#define MSG_NOSIGNAL 0
 
80
#endif
 
81
 
 
82
#define SERVER_BACKLOG 5
 
83
 
 
84
#define MSG_SEND_LOCKED         0
 
85
#define MSG_SEND_UNLOCKED       1
 
86
 
 
87
static struct coroipcs_init_state *api;
 
88
 
 
89
DECLARE_LIST_INIT (conn_info_list_head);
 
90
 
 
91
struct outq_item {
 
92
        void *msg;
 
93
        size_t mlen;
 
94
        struct list_head list;
 
95
};
 
96
 
 
97
struct zcb_mapped {
 
98
        struct list_head list;
 
99
        void *addr;
 
100
        size_t size;
 
101
};
 
102
 
 
103
#if defined(_SEM_SEMUN_UNDEFINED)
 
104
union semun {
 
105
        int val;
 
106
        struct semid_ds *buf;
 
107
        unsigned short int *array;
 
108
        struct seminfo *__buf;
 
109
};
 
110
#endif
 
111
 
 
112
enum conn_state {
 
113
        CONN_STATE_THREAD_INACTIVE = 0,
 
114
        CONN_STATE_THREAD_ACTIVE = 1,
 
115
        CONN_STATE_THREAD_REQUEST_EXIT = 2,
 
116
        CONN_STATE_THREAD_DESTROYED = 3,
 
117
        CONN_STATE_LIB_EXIT_CALLED = 4,
 
118
        CONN_STATE_DISCONNECT_INACTIVE = 5
 
119
};
 
120
 
 
121
struct conn_info {
 
122
        int fd;
 
123
        pthread_t thread;
 
124
        pthread_attr_t thread_attr;
 
125
        unsigned int service;
 
126
        enum conn_state state;
 
127
        int notify_flow_control_enabled;
 
128
        int refcount;
 
129
        key_t shmkey;
 
130
        key_t semkey;
 
131
        int semid;
 
132
        unsigned int pending_semops;
 
133
        pthread_mutex_t mutex;
 
134
        struct control_buffer *control_buffer;
 
135
        char *request_buffer;
 
136
        char *response_buffer;
 
137
        char *dispatch_buffer;
 
138
        size_t control_size;
 
139
        size_t request_size;
 
140
        size_t response_size;
 
141
        size_t dispatch_size;
 
142
        struct list_head outq_head;
 
143
        void *private_data;
 
144
        struct list_head list;
 
145
        char setup_msg[sizeof (mar_req_setup_t)];
 
146
        unsigned int setup_bytes_read;
 
147
        struct list_head zcb_mapped_list_head;
 
148
        char *sending_allowed_private_data[64];
 
149
};
 
150
 
 
151
static int shared_mem_dispatch_bytes_left (const struct conn_info *conn_info);
 
152
 
 
153
static void outq_flush (struct conn_info *conn_info);
 
154
 
 
155
static int priv_change (struct conn_info *conn_info);
 
156
 
 
157
static void ipc_disconnect (struct conn_info *conn_info);
 
158
 
 
159
static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
 
160
                      int locked);
 
161
 
 
162
static int
 
163
memory_map (
 
164
        const char *path,
 
165
        size_t bytes,
 
166
        void **buf)
 
167
{
 
168
        int fd;
 
169
        void *addr_orig;
 
170
        void *addr;
 
171
        int res;
 
172
 
 
173
        fd = open (path, O_RDWR, 0600);
 
174
 
 
175
        unlink (path);
 
176
 
 
177
        res = ftruncate (fd, bytes);
 
178
 
 
179
        addr_orig = mmap (NULL, bytes, PROT_NONE,
 
180
                MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
 
181
 
 
182
        if (addr_orig == MAP_FAILED) {
 
183
                return (-1);
 
184
        }
 
185
 
 
186
        addr = mmap (addr_orig, bytes, PROT_READ | PROT_WRITE,
 
187
                MAP_FIXED | MAP_SHARED, fd, 0);
 
188
 
 
189
        if (addr != addr_orig) {
 
190
                return (-1);
 
191
        }
 
192
#ifdef COROSYNC_BSD
 
193
        madvise(addr, bytes, MADV_NOSYNC);
 
194
#endif
 
195
 
 
196
        res = close (fd);
 
197
        if (res) {
 
198
                return (-1);
 
199
        }
 
200
        *buf = addr_orig;
 
201
        return (0);
 
202
}
 
203
 
 
204
static int
 
205
circular_memory_map (
 
206
        const char *path,
 
207
        size_t bytes,
 
208
        void **buf)
 
209
{
 
210
        int fd;
 
211
        void *addr_orig;
 
212
        void *addr;
 
213
        int res;
 
214
 
 
215
        fd = open (path, O_RDWR, 0600);
 
216
 
 
217
        unlink (path);
 
218
 
 
219
        res = ftruncate (fd, bytes);
 
220
 
 
221
        addr_orig = mmap (NULL, bytes << 1, PROT_NONE,
 
222
                MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
 
223
 
 
224
        if (addr_orig == MAP_FAILED) {
 
225
                return (-1);
 
226
        }
 
227
 
 
228
        addr = mmap (addr_orig, bytes, PROT_READ | PROT_WRITE,
 
229
                MAP_FIXED | MAP_SHARED, fd, 0);
 
230
 
 
231
        if (addr != addr_orig) {
 
232
                return (-1);
 
233
        }
 
234
#ifdef COROSYNC_BSD
 
235
        madvise(addr_orig, bytes, MADV_NOSYNC);
 
236
#endif
 
237
 
 
238
        addr = mmap (((char *)addr_orig) + bytes,
 
239
                  bytes, PROT_READ | PROT_WRITE,
 
240
                  MAP_FIXED | MAP_SHARED, fd, 0);
 
241
#ifdef COROSYNC_BSD
 
242
        madvise(((char *)addr_orig) + bytes, bytes, MADV_NOSYNC);
 
243
#endif
 
244
 
 
245
        res = close (fd);
 
246
        if (res) {
 
247
                return (-1);
 
248
        }
 
249
        *buf = addr_orig;
 
250
        return (0);
 
251
}
 
252
 
 
253
static inline int
 
254
circular_memory_unmap (void *buf, size_t bytes)
 
255
{
 
256
        int res;
 
257
 
 
258
        res = munmap (buf, bytes << 1);
 
259
 
 
260
        return (res);
 
261
}
 
262
 
 
263
static inline int zcb_free (struct zcb_mapped *zcb_mapped)
 
264
{
 
265
        unsigned int res;
 
266
 
 
267
        res = munmap (zcb_mapped->addr, zcb_mapped->size);
 
268
        list_del (&zcb_mapped->list);
 
269
        free (zcb_mapped);
 
270
        return (res);
 
271
}
 
272
 
 
273
static inline int zcb_by_addr_free (struct conn_info *conn_info, void *addr)
 
274
{
 
275
        struct list_head *list;
 
276
        struct zcb_mapped *zcb_mapped;
 
277
        unsigned int res = 0;
 
278
 
 
279
        for (list = conn_info->zcb_mapped_list_head.next;
 
280
                list != &conn_info->zcb_mapped_list_head; list = list->next) {
 
281
 
 
282
                zcb_mapped = list_entry (list, struct zcb_mapped, list);
 
283
 
 
284
                if (zcb_mapped->addr == addr) {
 
285
                        res = zcb_free (zcb_mapped);
 
286
                        break;
 
287
                }
 
288
 
 
289
        }
 
290
        return (res);
 
291
}
 
292
 
 
293
static inline int zcb_all_free (
 
294
        struct conn_info *conn_info)
 
295
{
 
296
        struct list_head *list;
 
297
        struct zcb_mapped *zcb_mapped;
 
298
 
 
299
        for (list = conn_info->zcb_mapped_list_head.next;
 
300
                list != &conn_info->zcb_mapped_list_head;) {
 
301
 
 
302
                zcb_mapped = list_entry (list, struct zcb_mapped, list);
 
303
 
 
304
                list = list->next;
 
305
 
 
306
                zcb_free (zcb_mapped);
 
307
        }
 
308
        return (0);
 
309
}
 
310
 
 
311
static inline int zcb_alloc (
 
312
        struct conn_info *conn_info,
 
313
        const char *path_to_file,
 
314
        size_t size,
 
315
        void **addr)
 
316
{
 
317
        struct zcb_mapped *zcb_mapped;
 
318
        unsigned int res;
 
319
 
 
320
        zcb_mapped = malloc (sizeof (struct zcb_mapped));
 
321
        if (zcb_mapped == NULL) {
 
322
                return (-1);
 
323
        }
 
324
 
 
325
        res = memory_map (
 
326
                path_to_file,
 
327
                size,
 
328
                addr);
 
329
        if (res == -1) {
 
330
                return (-1);
 
331
        }
 
332
 
 
333
        list_init (&zcb_mapped->list);
 
334
        zcb_mapped->addr = *addr;
 
335
        zcb_mapped->size = size;
 
336
        list_add_tail (&zcb_mapped->list, &conn_info->zcb_mapped_list_head);
 
337
        return (0);
 
338
}
 
339
 
 
340
static int ipc_thread_active (void *conn)
 
341
{
 
342
        struct conn_info *conn_info = (struct conn_info *)conn;
 
343
        int retval = 0;
 
344
 
 
345
        pthread_mutex_lock (&conn_info->mutex);
 
346
        if (conn_info->state == CONN_STATE_THREAD_ACTIVE) {
 
347
                retval = 1;
 
348
        }
 
349
        pthread_mutex_unlock (&conn_info->mutex);
 
350
        return (retval);
 
351
}
 
352
 
 
353
static int ipc_thread_exiting (void *conn)
 
354
{
 
355
        struct conn_info *conn_info = (struct conn_info *)conn;
 
356
        int retval = 1;
 
357
 
 
358
        pthread_mutex_lock (&conn_info->mutex);
 
359
        if (conn_info->state == CONN_STATE_THREAD_INACTIVE) {
 
360
                retval = 0;
 
361
        } else
 
362
        if (conn_info->state == CONN_STATE_THREAD_ACTIVE) {
 
363
                retval = 0;
 
364
        }
 
365
        pthread_mutex_unlock (&conn_info->mutex);
 
366
        return (retval);
 
367
}
 
368
 
 
369
/*
 
370
 * returns 0 if should be called again, -1 if finished
 
371
 */
 
372
static inline int conn_info_destroy (struct conn_info *conn_info)
 
373
{
 
374
        unsigned int res;
 
375
        void *retval;
 
376
 
 
377
        list_del (&conn_info->list);
 
378
        list_init (&conn_info->list);
 
379
 
 
380
        if (conn_info->state == CONN_STATE_THREAD_REQUEST_EXIT) {
 
381
                res = pthread_join (conn_info->thread, &retval);
 
382
                conn_info->state = CONN_STATE_THREAD_DESTROYED;
 
383
                return (0);
 
384
        }
 
385
 
 
386
        if (conn_info->state == CONN_STATE_THREAD_INACTIVE ||
 
387
                conn_info->state == CONN_STATE_DISCONNECT_INACTIVE) {
 
388
                list_del (&conn_info->list);
 
389
                close (conn_info->fd);
 
390
                api->free (conn_info);
 
391
                return (-1);
 
392
        }
 
393
 
 
394
        if (conn_info->state == CONN_STATE_THREAD_ACTIVE) {
 
395
                pthread_kill (conn_info->thread, SIGUSR1);
 
396
                return (0);
 
397
        }
 
398
 
 
399
        api->serialize_lock ();
 
400
        /*
 
401
         * Retry library exit function if busy
 
402
         */
 
403
        if (conn_info->state == CONN_STATE_THREAD_DESTROYED) {
 
404
                res = api->exit_fn_get (conn_info->service) (conn_info);
 
405
                if (res == -1) {
 
406
                        api->serialize_unlock ();
 
407
                        return (0);
 
408
                } else {
 
409
                        conn_info->state = CONN_STATE_LIB_EXIT_CALLED;
 
410
                }
 
411
        }
 
412
 
 
413
        pthread_mutex_lock (&conn_info->mutex);
 
414
        if (conn_info->refcount > 0) {
 
415
                pthread_mutex_unlock (&conn_info->mutex);
 
416
                api->serialize_unlock ();
 
417
                return (0);
 
418
        }
 
419
        list_del (&conn_info->list);
 
420
        pthread_mutex_unlock (&conn_info->mutex);
 
421
 
 
422
        /*
 
423
         * Destroy shared memory segment and semaphore
 
424
         */
 
425
        res = munmap ((void *)conn_info->control_buffer, conn_info->control_size);
 
426
        res = munmap ((void *)conn_info->request_buffer, conn_info->request_size);
 
427
        res = munmap ((void *)conn_info->response_buffer, conn_info->response_size);
 
428
        semctl (conn_info->semid, 0, IPC_RMID);
 
429
 
 
430
        /*
 
431
         * Free allocated data needed to retry exiting library IPC connection
 
432
         */
 
433
        if (conn_info->private_data) {
 
434
                api->free (conn_info->private_data);
 
435
        }
 
436
        close (conn_info->fd);
 
437
        res = circular_memory_unmap (conn_info->dispatch_buffer, conn_info->dispatch_size);
 
438
        zcb_all_free (conn_info);
 
439
        api->free (conn_info);
 
440
        api->serialize_unlock ();
 
441
        return (-1);
 
442
}
 
443
 
 
444
union u {
 
445
        uint64_t server_addr;
 
446
        void *server_ptr;
 
447
};
 
448
 
 
449
static uint64_t void2serveraddr (void *server_ptr)
 
450
{
 
451
        union u u;
 
452
 
 
453
        u.server_ptr = server_ptr;
 
454
        return (u.server_addr);
 
455
}
 
456
 
 
457
static void *serveraddr2void (uint64_t server_addr)
 
458
{
 
459
        union u u;
 
460
 
 
461
        u.server_addr = server_addr;
 
462
        return (u.server_ptr);
 
463
};
 
464
 
 
465
static inline void zerocopy_operations_process (
 
466
        struct conn_info *conn_info,
 
467
        coroipc_request_header_t **header_out,
 
468
        unsigned int *new_message)
 
469
{
 
470
        coroipc_request_header_t *header;
 
471
 
 
472
        header = (coroipc_request_header_t *)conn_info->request_buffer;
 
473
        if (header->id == ZC_ALLOC_HEADER) {
 
474
                mar_req_coroipcc_zc_alloc_t *hdr = (mar_req_coroipcc_zc_alloc_t *)header;
 
475
                coroipc_response_header_t res_header;
 
476
                void *addr = NULL;
 
477
                struct coroipcs_zc_header *zc_header;
 
478
                unsigned int res;
 
479
 
 
480
                res = zcb_alloc (conn_info, hdr->path_to_file, hdr->map_size,
 
481
                        &addr);
 
482
 
 
483
                zc_header = (struct coroipcs_zc_header *)addr;
 
484
                zc_header->server_address = void2serveraddr(addr);
 
485
 
 
486
                res_header.size = sizeof (coroipc_response_header_t);
 
487
                res_header.id = 0;
 
488
                coroipcs_response_send (
 
489
                        conn_info, &res_header,
 
490
                        res_header.size);
 
491
                *new_message = 0;
 
492
                return;
 
493
        } else
 
494
        if (header->id == ZC_FREE_HEADER) {
 
495
                mar_req_coroipcc_zc_free_t *hdr = (mar_req_coroipcc_zc_free_t *)header;
 
496
                coroipc_response_header_t res_header;
 
497
                void *addr = NULL;
 
498
 
 
499
                addr = serveraddr2void (hdr->server_address);
 
500
 
 
501
                zcb_by_addr_free (conn_info, addr);
 
502
 
 
503
                res_header.size = sizeof (coroipc_response_header_t);
 
504
                res_header.id = 0;
 
505
                coroipcs_response_send (
 
506
                        conn_info, &res_header,
 
507
                        res_header.size);
 
508
 
 
509
                *new_message = 0;
 
510
                return;
 
511
        } else
 
512
        if (header->id == ZC_EXECUTE_HEADER) {
 
513
                mar_req_coroipcc_zc_execute_t *hdr = (mar_req_coroipcc_zc_execute_t *)header;
 
514
 
 
515
                header = (coroipc_request_header_t *)(((char *)serveraddr2void(hdr->server_address) + sizeof (struct coroipcs_zc_header)));
 
516
        }
 
517
        *header_out = header;
 
518
        *new_message = 1;
 
519
}
 
520
 
 
521
static void *pthread_ipc_consumer (void *conn)
 
522
{
 
523
        struct conn_info *conn_info = (struct conn_info *)conn;
 
524
        struct sembuf sop;
 
525
        int res;
 
526
        coroipc_request_header_t *header;
 
527
        coroipc_response_header_t coroipc_response_header;
 
528
        int send_ok;
 
529
        unsigned int new_message;
 
530
 
 
531
#if defined(HAVE_PTHREAD_SETSCHEDPARAM) && defined(HAVE_SCHED_GET_PRIORITY_MAX)
 
532
        if (api->sched_policy != 0) {
 
533
                res = pthread_setschedparam (conn_info->thread,
 
534
                        api->sched_policy, api->sched_param);
 
535
        }
 
536
#endif
 
537
 
 
538
        for (;;) {
 
539
                sop.sem_num = 0;
 
540
                sop.sem_op = -1;
 
541
                sop.sem_flg = 0;
 
542
retry_semop:
 
543
                if (ipc_thread_active (conn_info) == 0) {
 
544
                        coroipcs_refcount_dec (conn_info);
 
545
                        pthread_exit (0);
 
546
                }
 
547
                res = semop (conn_info->semid, &sop, 1);
 
548
                if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
 
549
                        goto retry_semop;
 
550
                } else
 
551
                if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
 
552
                        coroipcs_refcount_dec (conn_info);
 
553
                        pthread_exit (0);
 
554
                }
 
555
 
 
556
                zerocopy_operations_process (conn_info, &header, &new_message);
 
557
                /*
 
558
                 * There is no new message to process, continue for loop
 
559
                 */
 
560
                if (new_message == 0) {
 
561
                        continue;
 
562
                }
 
563
 
 
564
                coroipcs_refcount_inc (conn);
 
565
 
 
566
                send_ok = api->sending_allowed (conn_info->service,
 
567
                        header->id,
 
568
                        header,
 
569
                        conn_info->sending_allowed_private_data);
 
570
 
 
571
                if (send_ok) {
 
572
                        api->serialize_lock();
 
573
                        api->handler_fn_get (conn_info->service, header->id) (conn_info, header);
 
574
                        api->serialize_unlock();
 
575
                } else {
 
576
                        /*
 
577
                         * Overload, tell library to retry
 
578
                         */
 
579
                        coroipc_response_header.size = sizeof (coroipc_response_header_t);
 
580
                        coroipc_response_header.id = 0;
 
581
                        coroipc_response_header.error = CS_ERR_TRY_AGAIN;
 
582
                        coroipcs_response_send (conn_info,
 
583
                                &coroipc_response_header,
 
584
                                sizeof (coroipc_response_header_t));
 
585
                }
 
586
 
 
587
                api->sending_allowed_release (conn_info->sending_allowed_private_data);
 
588
                coroipcs_refcount_dec (conn);
 
589
        }
 
590
        pthread_exit (0);
 
591
}
 
592
 
 
593
static int
 
594
req_setup_send (
 
595
        struct conn_info *conn_info,
 
596
        int error)
 
597
{
 
598
        mar_res_setup_t res_setup;
 
599
        unsigned int res;
 
600
 
 
601
        res_setup.error = error;
 
602
 
 
603
retry_send:
 
604
        res = send (conn_info->fd, &res_setup, sizeof (mar_res_setup_t), MSG_WAITALL);
 
605
        if (res == -1 && errno == EINTR) {
 
606
                goto retry_send;
 
607
        } else
 
608
        if (res == -1 && errno == EAGAIN) {
 
609
                goto retry_send;
 
610
        }
 
611
        return (0);
 
612
}
 
613
 
 
614
static int
 
615
req_setup_recv (
 
616
        struct conn_info *conn_info)
 
617
{
 
618
        int res;
 
619
        struct msghdr msg_recv;
 
620
        struct iovec iov_recv;
 
621
        int authenticated = 0;
 
622
 
 
623
#ifdef COROSYNC_LINUX
 
624
        struct cmsghdr *cmsg;
 
625
        char cmsg_cred[CMSG_SPACE (sizeof (struct ucred))];
 
626
        int off = 0;
 
627
        int on = 1;
 
628
        struct ucred *cred;
 
629
#endif
 
630
 
 
631
        msg_recv.msg_iov = &iov_recv;
 
632
        msg_recv.msg_iovlen = 1;
 
633
        msg_recv.msg_name = 0;
 
634
        msg_recv.msg_namelen = 0;
 
635
#ifdef COROSYNC_LINUX
 
636
        msg_recv.msg_control = (void *)cmsg_cred;
 
637
        msg_recv.msg_controllen = sizeof (cmsg_cred);
 
638
#endif
 
639
#ifdef COROSYNC_SOLARIS
 
640
        msg_recv.msg_accrights = 0;
 
641
        msg_recv.msg_accrightslen = 0;
 
642
#endif /* COROSYNC_SOLARIS */
 
643
 
 
644
        iov_recv.iov_base = &conn_info->setup_msg[conn_info->setup_bytes_read];
 
645
        iov_recv.iov_len = sizeof (mar_req_setup_t) - conn_info->setup_bytes_read;
 
646
#ifdef COROSYNC_LINUX
 
647
        setsockopt(conn_info->fd, SOL_SOCKET, SO_PASSCRED, &on, sizeof (on));
 
648
#endif
 
649
 
 
650
retry_recv:
 
651
        res = recvmsg (conn_info->fd, &msg_recv, MSG_NOSIGNAL);
 
652
        if (res == -1 && errno == EINTR) {
 
653
                goto retry_recv;
 
654
        } else
 
655
        if (res == -1 && errno != EAGAIN) {
 
656
                return (0);
 
657
        } else
 
658
        if (res == 0) {
 
659
#if defined(COROSYNC_SOLARIS) || defined(COROSYNC_BSD) || defined(COROSYNC_DARWIN)
 
660
                /* On many OS poll never return POLLHUP or POLLERR.
 
661
                 * EOF is detected when recvmsg return 0.
 
662
                 */
 
663
                ipc_disconnect (conn_info);
 
664
                return 0;
 
665
#else
 
666
                return (-1);
 
667
#endif
 
668
        }
 
669
        conn_info->setup_bytes_read += res;
 
670
 
 
671
/*
 
672
 * currently support getpeerucred, getpeereid, and SO_PASSCRED credential
 
673
 * retrieval mechanisms for various Platforms
 
674
 */
 
675
#ifdef HAVE_GETPEERUCRED
 
676
/*
 
677
 * Solaris and some BSD systems
 
678
 */
 
679
        {
 
680
                ucred_t *uc = NULL;
 
681
                uid_t euid = -1;
 
682
                gid_t egid = -1;
 
683
 
 
684
                if (getpeerucred (conn_info->fd, &uc) == 0) {
 
685
                        euid = ucred_geteuid (uc);
 
686
                        egid = ucred_getegid (uc);
 
687
                        if (api->security_valid (euid, egid)) {
 
688
                                authenticated = 1;
 
689
                        }
 
690
                        ucred_free(uc);
 
691
                }
 
692
        }
 
693
#elif HAVE_GETPEEREID
 
694
/*
 
695
 * Usually MacOSX systems
 
696
 */
 
697
 
 
698
        {
 
699
                uid_t euid;
 
700
                gid_t egid;
 
701
 
 
702
                euid = -1;
 
703
                egid = -1;
 
704
                if (getpeereid (conn_info->fd, &euid, &egid) == 0) {
 
705
                        if (api->security_valid (euid, egid)) {
 
706
                                authenticated = 1;
 
707
                        }
 
708
                }
 
709
        }
 
710
 
 
711
#elif SO_PASSCRED
 
712
/*
 
713
 * Usually Linux systems
 
714
 */
 
715
        cmsg = CMSG_FIRSTHDR (&msg_recv);
 
716
        assert (cmsg);
 
717
        cred = (struct ucred *)CMSG_DATA (cmsg);
 
718
        if (cred) {
 
719
                if (api->security_valid (cred->uid, cred->gid)) {
 
720
                        authenticated = 1;
 
721
                }
 
722
        }
 
723
 
 
724
#else /* no credentials */
 
725
        authenticated = 1;
 
726
        api->log_printf ("Platform does not support IPC authentication.  Using no authentication\n");
 
727
#endif /* no credentials */
 
728
 
 
729
        if (authenticated == 0) {
 
730
                api->log_printf ("Invalid IPC credentials.\n");
 
731
                ipc_disconnect (conn_info);
 
732
                return (-1);
 
733
        }
 
734
 
 
735
        if (conn_info->setup_bytes_read == sizeof (mar_req_setup_t)) {
 
736
#ifdef COROSYNC_LINUX
 
737
                setsockopt(conn_info->fd, SOL_SOCKET, SO_PASSCRED,
 
738
                        &off, sizeof (off));
 
739
#endif
 
740
                return (1);
 
741
        }
 
742
        return (0);
 
743
}
 
744
 
 
745
static void ipc_disconnect (struct conn_info *conn_info)
 
746
{
 
747
        if (conn_info->state == CONN_STATE_THREAD_INACTIVE) {
 
748
                conn_info->state = CONN_STATE_DISCONNECT_INACTIVE;
 
749
                return;
 
750
        }
 
751
        if (conn_info->state != CONN_STATE_THREAD_ACTIVE) {
 
752
                return;
 
753
        }
 
754
        pthread_mutex_lock (&conn_info->mutex);
 
755
        conn_info->state = CONN_STATE_THREAD_REQUEST_EXIT;
 
756
        pthread_mutex_unlock (&conn_info->mutex);
 
757
 
 
758
        pthread_kill (conn_info->thread, SIGUSR1);
 
759
}
 
760
 
 
761
static int conn_info_create (int fd)
 
762
{
 
763
        struct conn_info *conn_info;
 
764
 
 
765
        conn_info = api->malloc (sizeof (struct conn_info));
 
766
        if (conn_info == NULL) {
 
767
                return (-1);
 
768
        }
 
769
        memset (conn_info, 0, sizeof (struct conn_info));
 
770
 
 
771
        conn_info->fd = fd;
 
772
        conn_info->service = SOCKET_SERVICE_INIT;
 
773
        conn_info->state = CONN_STATE_THREAD_INACTIVE;
 
774
        list_init (&conn_info->outq_head);
 
775
        list_init (&conn_info->list);
 
776
        list_init (&conn_info->zcb_mapped_list_head);
 
777
        list_add (&conn_info->list, &conn_info_list_head);
 
778
 
 
779
        api->poll_dispatch_add (fd, conn_info);
 
780
 
 
781
        return (0);
 
782
}
 
783
 
 
784
#if defined(COROSYNC_LINUX) || defined(COROSYNC_SOLARIS)
 
785
/* SUN_LEN is broken for abstract namespace
 
786
 */
 
787
#define COROSYNC_SUN_LEN(a) sizeof(*(a))
 
788
#else
 
789
#define COROSYNC_SUN_LEN(a) SUN_LEN(a)
 
790
#endif
 
791
 
 
792
 
 
793
/*
 
794
 * Exported functions
 
795
 */
 
796
extern void coroipcs_ipc_init (
 
797
        struct coroipcs_init_state *init_state)
 
798
{
 
799
        int server_fd;
 
800
        struct sockaddr_un un_addr;
 
801
        int res;
 
802
 
 
803
        api = init_state;
 
804
 
 
805
        /*
 
806
         * Create socket for IPC clients, name socket, listen for connections
 
807
         */
 
808
#if defined(COROSYNC_SOLARIS)
 
809
        server_fd = socket (PF_UNIX, SOCK_STREAM, 0);
 
810
#else
 
811
        server_fd = socket (PF_LOCAL, SOCK_STREAM, 0);
 
812
#endif
 
813
        if (server_fd == -1) {
 
814
                api->log_printf ("Cannot create client connections socket.\n");
 
815
                api->fatal_error ("Can't create library listen socket");
 
816
        };
 
817
 
 
818
        res = fcntl (server_fd, F_SETFL, O_NONBLOCK);
 
819
        if (res == -1) {
 
820
                api->log_printf ("Could not set non-blocking operation on server socket: %s\n", strerror (errno));
 
821
                api->fatal_error ("Could not set non-blocking operation on server socket");
 
822
        }
 
823
 
 
824
        memset (&un_addr, 0, sizeof (struct sockaddr_un));
 
825
        un_addr.sun_family = AF_UNIX;
 
826
#if defined(COROSYNC_BSD) || defined(COROSYNC_DARWIN)
 
827
        un_addr.sun_len = SUN_LEN(&un_addr);
 
828
#endif
 
829
 
 
830
#if defined(COROSYNC_LINUX)
 
831
        sprintf (un_addr.sun_path + 1, "%s", api->socket_name);
 
832
#else
 
833
        {
 
834
                struct stat stat_out;
 
835
                res = stat (SOCKETDIR, &stat_out);
 
836
                if (res == -1 || (res == 0 && !S_ISDIR(stat_out.st_mode))) {
 
837
                        api->log_printf ("Required directory not present %s\n", SOCKETDIR);
 
838
                        api->fatal_error ("Please create required directory.");
 
839
                }
 
840
                sprintf (un_addr.sun_path, "%s/%s", SOCKETDIR, api->socket_name);
 
841
                unlink (un_addr.sun_path);
 
842
        }
 
843
#endif
 
844
 
 
845
        res = bind (server_fd, (struct sockaddr *)&un_addr, COROSYNC_SUN_LEN(&un_addr));
 
846
        if (res) {
 
847
                api->log_printf ("Could not bind AF_UNIX (%s): %s.\n", un_addr.sun_path, strerror (errno));
 
848
                api->fatal_error ("Could not bind to AF_UNIX socket\n");
 
849
        }
 
850
 
 
851
        /*
 
852
         * Allow eveyrone to write to the socket since the IPC layer handles
 
853
         * security automatically
 
854
         */
 
855
#if !defined(COROSYNC_LINUX)
 
856
        res = chmod (un_addr.sun_path, S_IRWXU|S_IRWXG|S_IRWXO);
 
857
#endif
 
858
        listen (server_fd, SERVER_BACKLOG);
 
859
 
 
860
        /*
 
861
         * Setup connection dispatch routine
 
862
         */
 
863
        api->poll_accept_add (server_fd);
 
864
}
 
865
 
 
866
void coroipcs_ipc_exit (void)
 
867
{
 
868
        struct list_head *list;
 
869
        struct conn_info *conn_info;
 
870
        unsigned int res;
 
871
 
 
872
        for (list = conn_info_list_head.next; list != &conn_info_list_head;
 
873
                list = list->next) {
 
874
 
 
875
                conn_info = list_entry (list, struct conn_info, list);
 
876
 
 
877
                /*
 
878
                 * Unmap memory segments
 
879
                 */
 
880
                res = munmap ((void *)conn_info->control_buffer,
 
881
                        conn_info->control_size);
 
882
                res = munmap ((void *)conn_info->request_buffer,
 
883
                        conn_info->request_size);
 
884
                res = munmap ((void *)conn_info->response_buffer,
 
885
                        conn_info->response_size);
 
886
                res = circular_memory_unmap (conn_info->dispatch_buffer,
 
887
                        conn_info->dispatch_size);
 
888
 
 
889
                semctl (conn_info->semid, 0, IPC_RMID);
 
890
 
 
891
                pthread_kill (conn_info->thread, SIGUSR1);
 
892
        }
 
893
}
 
894
 
 
895
/*
 
896
 * Get the conn info private data
 
897
 */
 
898
void *coroipcs_private_data_get (void *conn)
 
899
{
 
900
        struct conn_info *conn_info = (struct conn_info *)conn;
 
901
 
 
902
        return (conn_info->private_data);
 
903
}
 
904
 
 
905
int coroipcs_response_send (void *conn, const void *msg, size_t mlen)
 
906
{
 
907
        struct conn_info *conn_info = (struct conn_info *)conn;
 
908
        struct sembuf sop;
 
909
        int res;
 
910
 
 
911
        memcpy (conn_info->response_buffer, msg, mlen);
 
912
        sop.sem_num = 1;
 
913
        sop.sem_op = 1;
 
914
        sop.sem_flg = 0;
 
915
 
 
916
retry_semop:
 
917
        res = semop (conn_info->semid, &sop, 1);
 
918
        if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
 
919
                goto retry_semop;
 
920
        } else
 
921
        if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
 
922
                return (0);
 
923
        }
 
924
        return (0);
 
925
}
 
926
 
 
927
int coroipcs_response_iov_send (void *conn, const struct iovec *iov, unsigned int iov_len)
 
928
{
 
929
        struct conn_info *conn_info = (struct conn_info *)conn;
 
930
        struct sembuf sop;
 
931
        int res;
 
932
        int write_idx = 0;
 
933
        int i;
 
934
 
 
935
        for (i = 0; i < iov_len; i++) {
 
936
                memcpy (&conn_info->response_buffer[write_idx],
 
937
                        iov[i].iov_base, iov[i].iov_len);
 
938
                write_idx += iov[i].iov_len;
 
939
        }
 
940
 
 
941
        sop.sem_num = 1;
 
942
        sop.sem_op = 1;
 
943
        sop.sem_flg = 0;
 
944
 
 
945
retry_semop:
 
946
        res = semop (conn_info->semid, &sop, 1);
 
947
        if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
 
948
                goto retry_semop;
 
949
        } else
 
950
        if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
 
951
                return (0);
 
952
        }
 
953
        return (0);
 
954
}
 
955
 
 
956
static int shared_mem_dispatch_bytes_left (const struct conn_info *conn_info)
 
957
{
 
958
        unsigned int n_read;
 
959
        unsigned int n_write;
 
960
        unsigned int bytes_left;
 
961
 
 
962
        n_read = conn_info->control_buffer->read;
 
963
        n_write = conn_info->control_buffer->write;
 
964
 
 
965
        if (n_read <= n_write) {
 
966
                bytes_left = conn_info->dispatch_size - n_write + n_read;
 
967
        } else {
 
968
                bytes_left = n_read - n_write;
 
969
        }
 
970
        return (bytes_left);
 
971
}
 
972
 
 
973
static void memcpy_dwrap (struct conn_info *conn_info, void *msg, unsigned int len)
 
974
{
 
975
        unsigned int write_idx;
 
976
 
 
977
        write_idx = conn_info->control_buffer->write;
 
978
 
 
979
        memcpy (&conn_info->dispatch_buffer[write_idx], msg, len);
 
980
        conn_info->control_buffer->write = (write_idx + len) % conn_info->dispatch_size;
 
981
}
 
982
 
 
983
static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
 
984
                      int locked)
 
985
{
 
986
        struct conn_info *conn_info = (struct conn_info *)conn;
 
987
        struct sembuf sop;
 
988
        int res;
 
989
        int i;
 
990
        char buf;
 
991
 
 
992
        for (i = 0; i < iov_len; i++) {
 
993
                memcpy_dwrap (conn_info, iov[i].iov_base, iov[i].iov_len);
 
994
        }
 
995
 
 
996
        buf = !list_empty (&conn_info->outq_head);
 
997
        res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
 
998
        if (res == -1 && errno == EAGAIN) {
 
999
                if (locked == 0) {
 
1000
                        pthread_mutex_lock (&conn_info->mutex);
 
1001
                }
 
1002
                conn_info->pending_semops += 1;
 
1003
                if (locked == 0) {
 
1004
                        pthread_mutex_unlock (&conn_info->mutex);
 
1005
                }
 
1006
                api->poll_dispatch_modify (conn_info->fd,
 
1007
                        POLLIN|POLLOUT|POLLNVAL);
 
1008
        } else
 
1009
        if (res == -1) {
 
1010
                ipc_disconnect (conn_info);
 
1011
        }
 
1012
        sop.sem_num = 2;
 
1013
        sop.sem_op = 1;
 
1014
        sop.sem_flg = 0;
 
1015
 
 
1016
retry_semop:
 
1017
        res = semop (conn_info->semid, &sop, 1);
 
1018
        if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
 
1019
                goto retry_semop;
 
1020
        } else
 
1021
        if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
 
1022
                return;
 
1023
        }
 
1024
}
 
1025
 
 
1026
static void outq_flush (struct conn_info *conn_info) {
 
1027
        struct list_head *list, *list_next;
 
1028
        struct outq_item *outq_item;
 
1029
        unsigned int bytes_left;
 
1030
        struct iovec iov;
 
1031
        char buf;
 
1032
        int res;
 
1033
 
 
1034
        pthread_mutex_lock (&conn_info->mutex);
 
1035
        if (list_empty (&conn_info->outq_head)) {
 
1036
                buf = 3;
 
1037
                res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
 
1038
                pthread_mutex_unlock (&conn_info->mutex);
 
1039
                return;
 
1040
        }
 
1041
        for (list = conn_info->outq_head.next;
 
1042
                list != &conn_info->outq_head; list = list_next) {
 
1043
 
 
1044
                list_next = list->next;
 
1045
                outq_item = list_entry (list, struct outq_item, list);
 
1046
                bytes_left = shared_mem_dispatch_bytes_left (conn_info);
 
1047
                if (bytes_left > outq_item->mlen) {
 
1048
                        iov.iov_base = outq_item->msg;
 
1049
                        iov.iov_len = outq_item->mlen;
 
1050
                        msg_send (conn_info, &iov, 1, MSG_SEND_UNLOCKED);
 
1051
                        list_del (list);
 
1052
                        api->free (iov.iov_base);
 
1053
                        api->free (outq_item);
 
1054
                } else {
 
1055
                        break;
 
1056
                }
 
1057
        }
 
1058
        pthread_mutex_unlock (&conn_info->mutex);
 
1059
}
 
1060
 
 
1061
static int priv_change (struct conn_info *conn_info)
 
1062
{
 
1063
        mar_req_priv_change req_priv_change;
 
1064
        unsigned int res;
 
1065
        union semun semun;
 
1066
        struct semid_ds ipc_set;
 
1067
        int i;
 
1068
 
 
1069
retry_recv:
 
1070
        res = recv (conn_info->fd, &req_priv_change,
 
1071
                sizeof (mar_req_priv_change),
 
1072
                MSG_NOSIGNAL);
 
1073
        if (res == -1 && errno == EINTR) {
 
1074
                goto retry_recv;
 
1075
        }
 
1076
        if (res == -1 && errno == EAGAIN) {
 
1077
                goto retry_recv;
 
1078
        }
 
1079
        if (res == -1 && errno != EAGAIN) {
 
1080
                return (-1);
 
1081
        }
 
1082
#if defined(COROSYNC_SOLARIS) || defined(COROSYNC_BSD) || defined(COROSYNC_DARWIN)
 
1083
        /* Error on socket, EOF is detected when recv return 0
 
1084
         */
 
1085
        if (res == 0) {
 
1086
                return (-1);
 
1087
        }
 
1088
#endif
 
1089
 
 
1090
        ipc_set.sem_perm.uid = req_priv_change.euid;
 
1091
        ipc_set.sem_perm.gid = req_priv_change.egid;
 
1092
        ipc_set.sem_perm.mode = 0600;
 
1093
 
 
1094
        semun.buf = &ipc_set;
 
1095
 
 
1096
        for (i = 0; i < 3; i++) {
 
1097
                res = semctl (conn_info->semid, 0, IPC_SET, semun);
 
1098
                if (res == -1) {
 
1099
                        return (-1);
 
1100
                }
 
1101
        }
 
1102
        return (0);
 
1103
}
 
1104
 
 
1105
static void msg_send_or_queue (void *conn, const struct iovec *iov, unsigned int iov_len)
 
1106
{
 
1107
        struct conn_info *conn_info = (struct conn_info *)conn;
 
1108
        unsigned int bytes_left;
 
1109
        unsigned int bytes_msg = 0;
 
1110
        int i;
 
1111
        struct outq_item *outq_item;
 
1112
        char *write_buf = 0;
 
1113
 
 
1114
        /*
 
1115
         * Exit transmission if the connection is dead
 
1116
         */
 
1117
        if (ipc_thread_active (conn) == 0) {
 
1118
                return;
 
1119
        }
 
1120
 
 
1121
        bytes_left = shared_mem_dispatch_bytes_left (conn_info);
 
1122
        for (i = 0; i < iov_len; i++) {
 
1123
                bytes_msg += iov[i].iov_len;
 
1124
        }
 
1125
        if (bytes_left < bytes_msg || list_empty (&conn_info->outq_head) == 0) {
 
1126
                outq_item = api->malloc (sizeof (struct outq_item));
 
1127
                if (outq_item == NULL) {
 
1128
                        ipc_disconnect (conn);
 
1129
                        return;
 
1130
                }
 
1131
                outq_item->msg = api->malloc (bytes_msg);
 
1132
                if (outq_item->msg == 0) {
 
1133
                        api->free (outq_item);
 
1134
                        ipc_disconnect (conn);
 
1135
                        return;
 
1136
                }
 
1137
 
 
1138
                write_buf = outq_item->msg;
 
1139
                for (i = 0; i < iov_len; i++) {
 
1140
                        memcpy (write_buf, iov[i].iov_base, iov[i].iov_len);
 
1141
                        write_buf += iov[i].iov_len;
 
1142
                }
 
1143
                outq_item->mlen = bytes_msg;
 
1144
                list_init (&outq_item->list);
 
1145
                pthread_mutex_lock (&conn_info->mutex);
 
1146
                if (list_empty (&conn_info->outq_head)) {
 
1147
                        conn_info->notify_flow_control_enabled = 1;
 
1148
                        api->poll_dispatch_modify (conn_info->fd,
 
1149
                                POLLIN|POLLOUT|POLLNVAL);
 
1150
                }
 
1151
                list_add_tail (&outq_item->list, &conn_info->outq_head);
 
1152
                pthread_mutex_unlock (&conn_info->mutex);
 
1153
                return;
 
1154
        }
 
1155
        msg_send (conn, iov, iov_len, MSG_SEND_LOCKED);
 
1156
}
 
1157
 
 
1158
void coroipcs_refcount_inc (void *conn)
 
1159
{
 
1160
        struct conn_info *conn_info = (struct conn_info *)conn;
 
1161
 
 
1162
        pthread_mutex_lock (&conn_info->mutex);
 
1163
        conn_info->refcount++;
 
1164
        pthread_mutex_unlock (&conn_info->mutex);
 
1165
}
 
1166
 
 
1167
void coroipcs_refcount_dec (void *conn)
 
1168
{
 
1169
        struct conn_info *conn_info = (struct conn_info *)conn;
 
1170
 
 
1171
        pthread_mutex_lock (&conn_info->mutex);
 
1172
        conn_info->refcount--;
 
1173
        pthread_mutex_unlock (&conn_info->mutex);
 
1174
}
 
1175
 
 
1176
int coroipcs_dispatch_send (void *conn, const void *msg, size_t mlen)
 
1177
{
 
1178
        struct iovec iov;
 
1179
 
 
1180
        iov.iov_base = (void *)msg;
 
1181
        iov.iov_len = mlen;
 
1182
 
 
1183
        msg_send_or_queue (conn, &iov, 1);
 
1184
        return (0);
 
1185
}
 
1186
 
 
1187
int coroipcs_dispatch_iov_send (void *conn, const struct iovec *iov, unsigned int iov_len)
 
1188
{
 
1189
        msg_send_or_queue (conn, iov, iov_len);
 
1190
        return (0);
 
1191
}
 
1192
 
 
1193
int coroipcs_handler_accept (
 
1194
        int fd,
 
1195
        int revent,
 
1196
        void *data)
 
1197
{
 
1198
        socklen_t addrlen;
 
1199
        struct sockaddr_un un_addr;
 
1200
        int new_fd;
 
1201
#ifdef COROSYNC_LINUX
 
1202
        int on = 1;
 
1203
#endif
 
1204
        int res;
 
1205
 
 
1206
        addrlen = sizeof (struct sockaddr_un);
 
1207
 
 
1208
retry_accept:
 
1209
        new_fd = accept (fd, (struct sockaddr *)&un_addr, &addrlen);
 
1210
        if (new_fd == -1 && errno == EINTR) {
 
1211
                goto retry_accept;
 
1212
        }
 
1213
 
 
1214
        if (new_fd == -1) {
 
1215
                api->log_printf ("Could not accept Library connection: %s\n", strerror (errno));
 
1216
                return (0); /* This is an error, but -1 would indicate disconnect from poll loop */
 
1217
        }
 
1218
 
 
1219
        res = fcntl (new_fd, F_SETFL, O_NONBLOCK);
 
1220
        if (res == -1) {
 
1221
                api->log_printf ("Could not set non-blocking operation on library connection: %s\n", strerror (errno));
 
1222
                close (new_fd);
 
1223
                return (0); /* This is an error, but -1 would indicate disconnect from poll loop */
 
1224
        }
 
1225
 
 
1226
        /*
 
1227
         * Valid accept
 
1228
         */
 
1229
 
 
1230
        /*
 
1231
         * Request credentials of sender provided by kernel
 
1232
         */
 
1233
#ifdef COROSYNC_LINUX
 
1234
        setsockopt(new_fd, SOL_SOCKET, SO_PASSCRED, &on, sizeof (on));
 
1235
#endif
 
1236
 
 
1237
        res = conn_info_create (new_fd);
 
1238
        if (res != 0) {
 
1239
                close (new_fd);
 
1240
        }
 
1241
 
 
1242
        return (0);
 
1243
}
 
1244
 
 
1245
int coroipcs_handler_dispatch (
 
1246
        int fd,
 
1247
        int revent,
 
1248
        void *context)
 
1249
{
 
1250
        mar_req_setup_t *req_setup;
 
1251
        struct conn_info *conn_info = (struct conn_info *)context;
 
1252
        int res;
 
1253
        char buf;
 
1254
 
 
1255
 
 
1256
        if (ipc_thread_exiting (conn_info)) {
 
1257
                return conn_info_destroy (conn_info);
 
1258
        }
 
1259
 
 
1260
        /*
 
1261
         * If an error occurs, request exit
 
1262
         */
 
1263
        if (revent & (POLLERR|POLLHUP)) {
 
1264
                ipc_disconnect (conn_info);
 
1265
                return (0);
 
1266
        }
 
1267
 
 
1268
        /*
 
1269
         * Read the header and process it
 
1270
         */
 
1271
        if (conn_info->service == SOCKET_SERVICE_INIT && (revent & POLLIN)) {
 
1272
                /*
 
1273
                 * Receive in a nonblocking fashion the request
 
1274
                 * IF security invalid, send ERR_SECURITY, otherwise
 
1275
                 * send OK
 
1276
                 */
 
1277
                res = req_setup_recv (conn_info);
 
1278
                if (res == -1) {
 
1279
                        req_setup_send (conn_info, CS_ERR_SECURITY);
 
1280
                }
 
1281
                if (res != 1) {
 
1282
                        return (0);
 
1283
                }
 
1284
                req_setup_send (conn_info, CS_OK);
 
1285
 
 
1286
                pthread_mutex_init (&conn_info->mutex, NULL);
 
1287
                req_setup = (mar_req_setup_t *)conn_info->setup_msg;
 
1288
                /*
 
1289
                 * Is the service registered ?
 
1290
                 */
 
1291
                if (api->service_available (req_setup->service) == 0) {
 
1292
                        ipc_disconnect (conn_info);
 
1293
                        return (0);
 
1294
                }
 
1295
 
 
1296
                conn_info->semkey = req_setup->semkey;
 
1297
                res = memory_map (
 
1298
                        req_setup->control_file,
 
1299
                        req_setup->control_size,
 
1300
                        (void *)&conn_info->control_buffer);
 
1301
                conn_info->control_size = req_setup->control_size;
 
1302
 
 
1303
                res = memory_map (
 
1304
                        req_setup->request_file,
 
1305
                        req_setup->request_size,
 
1306
                        (void *)&conn_info->request_buffer);
 
1307
                conn_info->request_size = req_setup->request_size;
 
1308
 
 
1309
                res = memory_map (
 
1310
                        req_setup->response_file,
 
1311
                        req_setup->response_size,
 
1312
                        (void *)&conn_info->response_buffer);
 
1313
                conn_info->response_size = req_setup->response_size;
 
1314
 
 
1315
                res = circular_memory_map (
 
1316
                        req_setup->dispatch_file,
 
1317
                        req_setup->dispatch_size,
 
1318
                        (void *)&conn_info->dispatch_buffer);
 
1319
                conn_info->dispatch_size = req_setup->dispatch_size;
 
1320
 
 
1321
                conn_info->service = req_setup->service;
 
1322
                conn_info->refcount = 0;
 
1323
                conn_info->notify_flow_control_enabled = 0;
 
1324
                conn_info->setup_bytes_read = 0;
 
1325
 
 
1326
                conn_info->semid = semget (conn_info->semkey, 3, 0600);
 
1327
                conn_info->pending_semops = 0;
 
1328
 
 
1329
                /*
 
1330
                 * ipc thread is the only reference at startup
 
1331
                 */
 
1332
                conn_info->refcount = 1;
 
1333
                conn_info->state = CONN_STATE_THREAD_ACTIVE;
 
1334
 
 
1335
                conn_info->private_data = api->malloc (api->private_data_size_get (conn_info->service));
 
1336
                memset (conn_info->private_data, 0,
 
1337
                        api->private_data_size_get (conn_info->service));
 
1338
 
 
1339
                api->init_fn_get (conn_info->service) (conn_info);
 
1340
 
 
1341
                pthread_attr_init (&conn_info->thread_attr);
 
1342
                /*
 
1343
                * IA64 needs more stack space then other arches
 
1344
                */
 
1345
                #if defined(__ia64__)
 
1346
                pthread_attr_setstacksize (&conn_info->thread_attr, 400000);
 
1347
                #else
 
1348
                pthread_attr_setstacksize (&conn_info->thread_attr, 200000);
 
1349
                #endif
 
1350
 
 
1351
                pthread_attr_setdetachstate (&conn_info->thread_attr, PTHREAD_CREATE_JOINABLE);
 
1352
                res = pthread_create (&conn_info->thread,
 
1353
                        &conn_info->thread_attr,
 
1354
                        pthread_ipc_consumer,
 
1355
                        conn_info);
 
1356
 
 
1357
                /*
 
1358
                 * Security check - disallow multiple configurations of
 
1359
                 * the ipc connection
 
1360
                 */
 
1361
                if (conn_info->service == SOCKET_SERVICE_INIT) {
 
1362
                        conn_info->service = -1;
 
1363
                }
 
1364
        } else
 
1365
        if (revent & POLLIN) {
 
1366
                coroipcs_refcount_inc (conn_info);
 
1367
                res = recv (fd, &buf, 1, MSG_NOSIGNAL);
 
1368
                if (res == 1) {
 
1369
                        switch (buf) {
 
1370
                        case MESSAGE_REQ_OUTQ_FLUSH:
 
1371
                                outq_flush (conn_info);
 
1372
                                break;
 
1373
                        case MESSAGE_REQ_CHANGE_EUID:
 
1374
                                if (priv_change (conn_info) == -1) {
 
1375
                                        ipc_disconnect (conn_info);
 
1376
                                }
 
1377
                                break;
 
1378
                        default:
 
1379
                                res = 0;
 
1380
                                break;
 
1381
                        }
 
1382
                }
 
1383
#if defined(COROSYNC_SOLARIS) || defined(COROSYNC_BSD) || defined(COROSYNC_DARWIN)
 
1384
                /* On many OS poll never return POLLHUP or POLLERR.
 
1385
                 * EOF is detected when recvmsg return 0.
 
1386
                 */
 
1387
                if (res == 0) {
 
1388
                        ipc_disconnect (conn_info);
 
1389
                        coroipcs_refcount_dec (conn_info);
 
1390
                        return (0);
 
1391
                }
 
1392
#endif
 
1393
                coroipcs_refcount_dec (conn_info);
 
1394
        }
 
1395
 
 
1396
        coroipcs_refcount_inc (conn_info);
 
1397
        pthread_mutex_lock (&conn_info->mutex);
 
1398
        if ((conn_info->state == CONN_STATE_THREAD_ACTIVE) && (revent & POLLOUT)) {
 
1399
                buf = !list_empty (&conn_info->outq_head);
 
1400
                for (; conn_info->pending_semops;) {
 
1401
                        res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
 
1402
                        if (res == 1) {
 
1403
                                conn_info->pending_semops--;
 
1404
                        } else {
 
1405
                                break;
 
1406
                        }
 
1407
                }
 
1408
                if (conn_info->notify_flow_control_enabled) {
 
1409
                        buf = 2;
 
1410
                        res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
 
1411
                        if (res == 1) {
 
1412
                                conn_info->notify_flow_control_enabled = 0;
 
1413
                        }
 
1414
                }
 
1415
                if (conn_info->notify_flow_control_enabled == 0 &&
 
1416
                        conn_info->pending_semops == 0) {
 
1417
 
 
1418
                        api->poll_dispatch_modify (conn_info->fd,
 
1419
                                POLLIN|POLLNVAL);
 
1420
                }
 
1421
        }
 
1422
        pthread_mutex_unlock (&conn_info->mutex);
 
1423
        coroipcs_refcount_dec (conn_info);
 
1424
 
 
1425
        return (0);
 
1426
}