2
* Copyright (c) 2006-2009 Red Hat, Inc.
6
* Author: Steven Dake (sdake@redhat.com)
8
* This software licensed under BSD license, the text of which follows:
10
* Redistribution and use in source and binary forms, with or without
11
* modification, are permitted provided that the following conditions are met:
13
* - Redistributions of source code must retain the above copyright notice,
14
* this list of conditions and the following disclaimer.
15
* - Redistributions in binary form must reproduce the above copyright notice,
16
* this list of conditions and the following disclaimer in the documentation
17
* and/or other materials provided with the distribution.
18
* - Neither the name of the MontaVista Software, Inc. nor the names of its
19
* contributors may be used to endorse or promote products derived from this
20
* software without specific prior written permission.
22
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
23
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
26
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
27
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
28
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
29
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
30
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
31
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
32
* THE POSSIBILITY OF SUCH DAMAGE.
44
#include <sys/types.h>
48
#include <sys/socket.h>
51
#include <sys/resource.h>
54
#include <netinet/in.h>
55
#include <arpa/inet.h>
64
#if defined(HAVE_GETPEERUCRED)
71
#include <corosync/corotypes.h>
72
#include <corosync/list.h>
74
#include <corosync/coroipc_types.h>
75
#include <corosync/coroipcs.h>
76
#include <corosync/coroipc_ipc.h>
79
#define MSG_NOSIGNAL 0
82
#define SERVER_BACKLOG 5
84
#define MSG_SEND_LOCKED 0
85
#define MSG_SEND_UNLOCKED 1
87
static struct coroipcs_init_state *api;
89
DECLARE_LIST_INIT (conn_info_list_head);
94
struct list_head list;
98
struct list_head list;
103
#if defined(_SEM_SEMUN_UNDEFINED)
106
struct semid_ds *buf;
107
unsigned short int *array;
108
struct seminfo *__buf;
113
CONN_STATE_THREAD_INACTIVE = 0,
114
CONN_STATE_THREAD_ACTIVE = 1,
115
CONN_STATE_THREAD_REQUEST_EXIT = 2,
116
CONN_STATE_THREAD_DESTROYED = 3,
117
CONN_STATE_LIB_EXIT_CALLED = 4,
118
CONN_STATE_DISCONNECT_INACTIVE = 5
124
pthread_attr_t thread_attr;
125
unsigned int service;
126
enum conn_state state;
127
int notify_flow_control_enabled;
132
unsigned int pending_semops;
133
pthread_mutex_t mutex;
134
struct control_buffer *control_buffer;
135
char *request_buffer;
136
char *response_buffer;
137
char *dispatch_buffer;
140
size_t response_size;
141
size_t dispatch_size;
142
struct list_head outq_head;
144
struct list_head list;
145
char setup_msg[sizeof (mar_req_setup_t)];
146
unsigned int setup_bytes_read;
147
struct list_head zcb_mapped_list_head;
148
char *sending_allowed_private_data[64];
151
static int shared_mem_dispatch_bytes_left (const struct conn_info *conn_info);
153
static void outq_flush (struct conn_info *conn_info);
155
static int priv_change (struct conn_info *conn_info);
157
static void ipc_disconnect (struct conn_info *conn_info);
159
static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
173
fd = open (path, O_RDWR, 0600);
177
res = ftruncate (fd, bytes);
179
addr_orig = mmap (NULL, bytes, PROT_NONE,
180
MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
182
if (addr_orig == MAP_FAILED) {
186
addr = mmap (addr_orig, bytes, PROT_READ | PROT_WRITE,
187
MAP_FIXED | MAP_SHARED, fd, 0);
189
if (addr != addr_orig) {
193
madvise(addr, bytes, MADV_NOSYNC);
205
circular_memory_map (
215
fd = open (path, O_RDWR, 0600);
219
res = ftruncate (fd, bytes);
221
addr_orig = mmap (NULL, bytes << 1, PROT_NONE,
222
MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
224
if (addr_orig == MAP_FAILED) {
228
addr = mmap (addr_orig, bytes, PROT_READ | PROT_WRITE,
229
MAP_FIXED | MAP_SHARED, fd, 0);
231
if (addr != addr_orig) {
235
madvise(addr_orig, bytes, MADV_NOSYNC);
238
addr = mmap (((char *)addr_orig) + bytes,
239
bytes, PROT_READ | PROT_WRITE,
240
MAP_FIXED | MAP_SHARED, fd, 0);
242
madvise(((char *)addr_orig) + bytes, bytes, MADV_NOSYNC);
254
circular_memory_unmap (void *buf, size_t bytes)
258
res = munmap (buf, bytes << 1);
263
static inline int zcb_free (struct zcb_mapped *zcb_mapped)
267
res = munmap (zcb_mapped->addr, zcb_mapped->size);
268
list_del (&zcb_mapped->list);
273
static inline int zcb_by_addr_free (struct conn_info *conn_info, void *addr)
275
struct list_head *list;
276
struct zcb_mapped *zcb_mapped;
277
unsigned int res = 0;
279
for (list = conn_info->zcb_mapped_list_head.next;
280
list != &conn_info->zcb_mapped_list_head; list = list->next) {
282
zcb_mapped = list_entry (list, struct zcb_mapped, list);
284
if (zcb_mapped->addr == addr) {
285
res = zcb_free (zcb_mapped);
293
static inline int zcb_all_free (
294
struct conn_info *conn_info)
296
struct list_head *list;
297
struct zcb_mapped *zcb_mapped;
299
for (list = conn_info->zcb_mapped_list_head.next;
300
list != &conn_info->zcb_mapped_list_head;) {
302
zcb_mapped = list_entry (list, struct zcb_mapped, list);
306
zcb_free (zcb_mapped);
311
static inline int zcb_alloc (
312
struct conn_info *conn_info,
313
const char *path_to_file,
317
struct zcb_mapped *zcb_mapped;
320
zcb_mapped = malloc (sizeof (struct zcb_mapped));
321
if (zcb_mapped == NULL) {
333
list_init (&zcb_mapped->list);
334
zcb_mapped->addr = *addr;
335
zcb_mapped->size = size;
336
list_add_tail (&zcb_mapped->list, &conn_info->zcb_mapped_list_head);
340
static int ipc_thread_active (void *conn)
342
struct conn_info *conn_info = (struct conn_info *)conn;
345
pthread_mutex_lock (&conn_info->mutex);
346
if (conn_info->state == CONN_STATE_THREAD_ACTIVE) {
349
pthread_mutex_unlock (&conn_info->mutex);
353
static int ipc_thread_exiting (void *conn)
355
struct conn_info *conn_info = (struct conn_info *)conn;
358
pthread_mutex_lock (&conn_info->mutex);
359
if (conn_info->state == CONN_STATE_THREAD_INACTIVE) {
362
if (conn_info->state == CONN_STATE_THREAD_ACTIVE) {
365
pthread_mutex_unlock (&conn_info->mutex);
370
* returns 0 if should be called again, -1 if finished
372
static inline int conn_info_destroy (struct conn_info *conn_info)
377
list_del (&conn_info->list);
378
list_init (&conn_info->list);
380
if (conn_info->state == CONN_STATE_THREAD_REQUEST_EXIT) {
381
res = pthread_join (conn_info->thread, &retval);
382
conn_info->state = CONN_STATE_THREAD_DESTROYED;
386
if (conn_info->state == CONN_STATE_THREAD_INACTIVE ||
387
conn_info->state == CONN_STATE_DISCONNECT_INACTIVE) {
388
list_del (&conn_info->list);
389
close (conn_info->fd);
390
api->free (conn_info);
394
if (conn_info->state == CONN_STATE_THREAD_ACTIVE) {
395
pthread_kill (conn_info->thread, SIGUSR1);
399
api->serialize_lock ();
401
* Retry library exit function if busy
403
if (conn_info->state == CONN_STATE_THREAD_DESTROYED) {
404
res = api->exit_fn_get (conn_info->service) (conn_info);
406
api->serialize_unlock ();
409
conn_info->state = CONN_STATE_LIB_EXIT_CALLED;
413
pthread_mutex_lock (&conn_info->mutex);
414
if (conn_info->refcount > 0) {
415
pthread_mutex_unlock (&conn_info->mutex);
416
api->serialize_unlock ();
419
list_del (&conn_info->list);
420
pthread_mutex_unlock (&conn_info->mutex);
423
* Destroy shared memory segment and semaphore
425
res = munmap ((void *)conn_info->control_buffer, conn_info->control_size);
426
res = munmap ((void *)conn_info->request_buffer, conn_info->request_size);
427
res = munmap ((void *)conn_info->response_buffer, conn_info->response_size);
428
semctl (conn_info->semid, 0, IPC_RMID);
431
* Free allocated data needed to retry exiting library IPC connection
433
if (conn_info->private_data) {
434
api->free (conn_info->private_data);
436
close (conn_info->fd);
437
res = circular_memory_unmap (conn_info->dispatch_buffer, conn_info->dispatch_size);
438
zcb_all_free (conn_info);
439
api->free (conn_info);
440
api->serialize_unlock ();
445
uint64_t server_addr;
449
static uint64_t void2serveraddr (void *server_ptr)
453
u.server_ptr = server_ptr;
454
return (u.server_addr);
457
static void *serveraddr2void (uint64_t server_addr)
461
u.server_addr = server_addr;
462
return (u.server_ptr);
465
static inline void zerocopy_operations_process (
466
struct conn_info *conn_info,
467
coroipc_request_header_t **header_out,
468
unsigned int *new_message)
470
coroipc_request_header_t *header;
472
header = (coroipc_request_header_t *)conn_info->request_buffer;
473
if (header->id == ZC_ALLOC_HEADER) {
474
mar_req_coroipcc_zc_alloc_t *hdr = (mar_req_coroipcc_zc_alloc_t *)header;
475
coroipc_response_header_t res_header;
477
struct coroipcs_zc_header *zc_header;
480
res = zcb_alloc (conn_info, hdr->path_to_file, hdr->map_size,
483
zc_header = (struct coroipcs_zc_header *)addr;
484
zc_header->server_address = void2serveraddr(addr);
486
res_header.size = sizeof (coroipc_response_header_t);
488
coroipcs_response_send (
489
conn_info, &res_header,
494
if (header->id == ZC_FREE_HEADER) {
495
mar_req_coroipcc_zc_free_t *hdr = (mar_req_coroipcc_zc_free_t *)header;
496
coroipc_response_header_t res_header;
499
addr = serveraddr2void (hdr->server_address);
501
zcb_by_addr_free (conn_info, addr);
503
res_header.size = sizeof (coroipc_response_header_t);
505
coroipcs_response_send (
506
conn_info, &res_header,
512
if (header->id == ZC_EXECUTE_HEADER) {
513
mar_req_coroipcc_zc_execute_t *hdr = (mar_req_coroipcc_zc_execute_t *)header;
515
header = (coroipc_request_header_t *)(((char *)serveraddr2void(hdr->server_address) + sizeof (struct coroipcs_zc_header)));
517
*header_out = header;
521
static void *pthread_ipc_consumer (void *conn)
523
struct conn_info *conn_info = (struct conn_info *)conn;
526
coroipc_request_header_t *header;
527
coroipc_response_header_t coroipc_response_header;
529
unsigned int new_message;
531
#if defined(HAVE_PTHREAD_SETSCHEDPARAM) && defined(HAVE_SCHED_GET_PRIORITY_MAX)
532
if (api->sched_policy != 0) {
533
res = pthread_setschedparam (conn_info->thread,
534
api->sched_policy, api->sched_param);
543
if (ipc_thread_active (conn_info) == 0) {
544
coroipcs_refcount_dec (conn_info);
547
res = semop (conn_info->semid, &sop, 1);
548
if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
551
if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
552
coroipcs_refcount_dec (conn_info);
556
zerocopy_operations_process (conn_info, &header, &new_message);
558
* There is no new message to process, continue for loop
560
if (new_message == 0) {
564
coroipcs_refcount_inc (conn);
566
send_ok = api->sending_allowed (conn_info->service,
569
conn_info->sending_allowed_private_data);
572
api->serialize_lock();
573
api->handler_fn_get (conn_info->service, header->id) (conn_info, header);
574
api->serialize_unlock();
577
* Overload, tell library to retry
579
coroipc_response_header.size = sizeof (coroipc_response_header_t);
580
coroipc_response_header.id = 0;
581
coroipc_response_header.error = CS_ERR_TRY_AGAIN;
582
coroipcs_response_send (conn_info,
583
&coroipc_response_header,
584
sizeof (coroipc_response_header_t));
587
api->sending_allowed_release (conn_info->sending_allowed_private_data);
588
coroipcs_refcount_dec (conn);
595
struct conn_info *conn_info,
598
mar_res_setup_t res_setup;
601
res_setup.error = error;
604
res = send (conn_info->fd, &res_setup, sizeof (mar_res_setup_t), MSG_WAITALL);
605
if (res == -1 && errno == EINTR) {
608
if (res == -1 && errno == EAGAIN) {
616
struct conn_info *conn_info)
619
struct msghdr msg_recv;
620
struct iovec iov_recv;
621
int authenticated = 0;
623
#ifdef COROSYNC_LINUX
624
struct cmsghdr *cmsg;
625
char cmsg_cred[CMSG_SPACE (sizeof (struct ucred))];
631
msg_recv.msg_iov = &iov_recv;
632
msg_recv.msg_iovlen = 1;
633
msg_recv.msg_name = 0;
634
msg_recv.msg_namelen = 0;
635
#ifdef COROSYNC_LINUX
636
msg_recv.msg_control = (void *)cmsg_cred;
637
msg_recv.msg_controllen = sizeof (cmsg_cred);
639
#ifdef COROSYNC_SOLARIS
640
msg_recv.msg_accrights = 0;
641
msg_recv.msg_accrightslen = 0;
642
#endif /* COROSYNC_SOLARIS */
644
iov_recv.iov_base = &conn_info->setup_msg[conn_info->setup_bytes_read];
645
iov_recv.iov_len = sizeof (mar_req_setup_t) - conn_info->setup_bytes_read;
646
#ifdef COROSYNC_LINUX
647
setsockopt(conn_info->fd, SOL_SOCKET, SO_PASSCRED, &on, sizeof (on));
651
res = recvmsg (conn_info->fd, &msg_recv, MSG_NOSIGNAL);
652
if (res == -1 && errno == EINTR) {
655
if (res == -1 && errno != EAGAIN) {
659
#if defined(COROSYNC_SOLARIS) || defined(COROSYNC_BSD) || defined(COROSYNC_DARWIN)
660
/* On many OS poll never return POLLHUP or POLLERR.
661
* EOF is detected when recvmsg return 0.
663
ipc_disconnect (conn_info);
669
conn_info->setup_bytes_read += res;
672
* currently support getpeerucred, getpeereid, and SO_PASSCRED credential
673
* retrieval mechanisms for various Platforms
675
#ifdef HAVE_GETPEERUCRED
677
* Solaris and some BSD systems
684
if (getpeerucred (conn_info->fd, &uc) == 0) {
685
euid = ucred_geteuid (uc);
686
egid = ucred_getegid (uc);
687
if (api->security_valid (euid, egid)) {
693
#elif HAVE_GETPEEREID
695
* Usually MacOSX systems
704
if (getpeereid (conn_info->fd, &euid, &egid) == 0) {
705
if (api->security_valid (euid, egid)) {
713
* Usually Linux systems
715
cmsg = CMSG_FIRSTHDR (&msg_recv);
717
cred = (struct ucred *)CMSG_DATA (cmsg);
719
if (api->security_valid (cred->uid, cred->gid)) {
724
#else /* no credentials */
726
api->log_printf ("Platform does not support IPC authentication. Using no authentication\n");
727
#endif /* no credentials */
729
if (authenticated == 0) {
730
api->log_printf ("Invalid IPC credentials.\n");
731
ipc_disconnect (conn_info);
735
if (conn_info->setup_bytes_read == sizeof (mar_req_setup_t)) {
736
#ifdef COROSYNC_LINUX
737
setsockopt(conn_info->fd, SOL_SOCKET, SO_PASSCRED,
745
static void ipc_disconnect (struct conn_info *conn_info)
747
if (conn_info->state == CONN_STATE_THREAD_INACTIVE) {
748
conn_info->state = CONN_STATE_DISCONNECT_INACTIVE;
751
if (conn_info->state != CONN_STATE_THREAD_ACTIVE) {
754
pthread_mutex_lock (&conn_info->mutex);
755
conn_info->state = CONN_STATE_THREAD_REQUEST_EXIT;
756
pthread_mutex_unlock (&conn_info->mutex);
758
pthread_kill (conn_info->thread, SIGUSR1);
761
static int conn_info_create (int fd)
763
struct conn_info *conn_info;
765
conn_info = api->malloc (sizeof (struct conn_info));
766
if (conn_info == NULL) {
769
memset (conn_info, 0, sizeof (struct conn_info));
772
conn_info->service = SOCKET_SERVICE_INIT;
773
conn_info->state = CONN_STATE_THREAD_INACTIVE;
774
list_init (&conn_info->outq_head);
775
list_init (&conn_info->list);
776
list_init (&conn_info->zcb_mapped_list_head);
777
list_add (&conn_info->list, &conn_info_list_head);
779
api->poll_dispatch_add (fd, conn_info);
784
#if defined(COROSYNC_LINUX) || defined(COROSYNC_SOLARIS)
785
/* SUN_LEN is broken for abstract namespace
787
#define COROSYNC_SUN_LEN(a) sizeof(*(a))
789
#define COROSYNC_SUN_LEN(a) SUN_LEN(a)
796
extern void coroipcs_ipc_init (
797
struct coroipcs_init_state *init_state)
800
struct sockaddr_un un_addr;
806
* Create socket for IPC clients, name socket, listen for connections
808
#if defined(COROSYNC_SOLARIS)
809
server_fd = socket (PF_UNIX, SOCK_STREAM, 0);
811
server_fd = socket (PF_LOCAL, SOCK_STREAM, 0);
813
if (server_fd == -1) {
814
api->log_printf ("Cannot create client connections socket.\n");
815
api->fatal_error ("Can't create library listen socket");
818
res = fcntl (server_fd, F_SETFL, O_NONBLOCK);
820
api->log_printf ("Could not set non-blocking operation on server socket: %s\n", strerror (errno));
821
api->fatal_error ("Could not set non-blocking operation on server socket");
824
memset (&un_addr, 0, sizeof (struct sockaddr_un));
825
un_addr.sun_family = AF_UNIX;
826
#if defined(COROSYNC_BSD) || defined(COROSYNC_DARWIN)
827
un_addr.sun_len = SUN_LEN(&un_addr);
830
#if defined(COROSYNC_LINUX)
831
sprintf (un_addr.sun_path + 1, "%s", api->socket_name);
834
struct stat stat_out;
835
res = stat (SOCKETDIR, &stat_out);
836
if (res == -1 || (res == 0 && !S_ISDIR(stat_out.st_mode))) {
837
api->log_printf ("Required directory not present %s\n", SOCKETDIR);
838
api->fatal_error ("Please create required directory.");
840
sprintf (un_addr.sun_path, "%s/%s", SOCKETDIR, api->socket_name);
841
unlink (un_addr.sun_path);
845
res = bind (server_fd, (struct sockaddr *)&un_addr, COROSYNC_SUN_LEN(&un_addr));
847
api->log_printf ("Could not bind AF_UNIX (%s): %s.\n", un_addr.sun_path, strerror (errno));
848
api->fatal_error ("Could not bind to AF_UNIX socket\n");
852
* Allow eveyrone to write to the socket since the IPC layer handles
853
* security automatically
855
#if !defined(COROSYNC_LINUX)
856
res = chmod (un_addr.sun_path, S_IRWXU|S_IRWXG|S_IRWXO);
858
listen (server_fd, SERVER_BACKLOG);
861
* Setup connection dispatch routine
863
api->poll_accept_add (server_fd);
866
void coroipcs_ipc_exit (void)
868
struct list_head *list;
869
struct conn_info *conn_info;
872
for (list = conn_info_list_head.next; list != &conn_info_list_head;
875
conn_info = list_entry (list, struct conn_info, list);
878
* Unmap memory segments
880
res = munmap ((void *)conn_info->control_buffer,
881
conn_info->control_size);
882
res = munmap ((void *)conn_info->request_buffer,
883
conn_info->request_size);
884
res = munmap ((void *)conn_info->response_buffer,
885
conn_info->response_size);
886
res = circular_memory_unmap (conn_info->dispatch_buffer,
887
conn_info->dispatch_size);
889
semctl (conn_info->semid, 0, IPC_RMID);
891
pthread_kill (conn_info->thread, SIGUSR1);
896
* Get the conn info private data
898
void *coroipcs_private_data_get (void *conn)
900
struct conn_info *conn_info = (struct conn_info *)conn;
902
return (conn_info->private_data);
905
int coroipcs_response_send (void *conn, const void *msg, size_t mlen)
907
struct conn_info *conn_info = (struct conn_info *)conn;
911
memcpy (conn_info->response_buffer, msg, mlen);
917
res = semop (conn_info->semid, &sop, 1);
918
if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
921
if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
927
int coroipcs_response_iov_send (void *conn, const struct iovec *iov, unsigned int iov_len)
929
struct conn_info *conn_info = (struct conn_info *)conn;
935
for (i = 0; i < iov_len; i++) {
936
memcpy (&conn_info->response_buffer[write_idx],
937
iov[i].iov_base, iov[i].iov_len);
938
write_idx += iov[i].iov_len;
946
res = semop (conn_info->semid, &sop, 1);
947
if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
950
if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
956
static int shared_mem_dispatch_bytes_left (const struct conn_info *conn_info)
959
unsigned int n_write;
960
unsigned int bytes_left;
962
n_read = conn_info->control_buffer->read;
963
n_write = conn_info->control_buffer->write;
965
if (n_read <= n_write) {
966
bytes_left = conn_info->dispatch_size - n_write + n_read;
968
bytes_left = n_read - n_write;
973
static void memcpy_dwrap (struct conn_info *conn_info, void *msg, unsigned int len)
975
unsigned int write_idx;
977
write_idx = conn_info->control_buffer->write;
979
memcpy (&conn_info->dispatch_buffer[write_idx], msg, len);
980
conn_info->control_buffer->write = (write_idx + len) % conn_info->dispatch_size;
983
static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
986
struct conn_info *conn_info = (struct conn_info *)conn;
992
for (i = 0; i < iov_len; i++) {
993
memcpy_dwrap (conn_info, iov[i].iov_base, iov[i].iov_len);
996
buf = !list_empty (&conn_info->outq_head);
997
res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
998
if (res == -1 && errno == EAGAIN) {
1000
pthread_mutex_lock (&conn_info->mutex);
1002
conn_info->pending_semops += 1;
1004
pthread_mutex_unlock (&conn_info->mutex);
1006
api->poll_dispatch_modify (conn_info->fd,
1007
POLLIN|POLLOUT|POLLNVAL);
1010
ipc_disconnect (conn_info);
1017
res = semop (conn_info->semid, &sop, 1);
1018
if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
1021
if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
1026
static void outq_flush (struct conn_info *conn_info) {
1027
struct list_head *list, *list_next;
1028
struct outq_item *outq_item;
1029
unsigned int bytes_left;
1034
pthread_mutex_lock (&conn_info->mutex);
1035
if (list_empty (&conn_info->outq_head)) {
1037
res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
1038
pthread_mutex_unlock (&conn_info->mutex);
1041
for (list = conn_info->outq_head.next;
1042
list != &conn_info->outq_head; list = list_next) {
1044
list_next = list->next;
1045
outq_item = list_entry (list, struct outq_item, list);
1046
bytes_left = shared_mem_dispatch_bytes_left (conn_info);
1047
if (bytes_left > outq_item->mlen) {
1048
iov.iov_base = outq_item->msg;
1049
iov.iov_len = outq_item->mlen;
1050
msg_send (conn_info, &iov, 1, MSG_SEND_UNLOCKED);
1052
api->free (iov.iov_base);
1053
api->free (outq_item);
1058
pthread_mutex_unlock (&conn_info->mutex);
1061
static int priv_change (struct conn_info *conn_info)
1063
mar_req_priv_change req_priv_change;
1066
struct semid_ds ipc_set;
1070
res = recv (conn_info->fd, &req_priv_change,
1071
sizeof (mar_req_priv_change),
1073
if (res == -1 && errno == EINTR) {
1076
if (res == -1 && errno == EAGAIN) {
1079
if (res == -1 && errno != EAGAIN) {
1082
#if defined(COROSYNC_SOLARIS) || defined(COROSYNC_BSD) || defined(COROSYNC_DARWIN)
1083
/* Error on socket, EOF is detected when recv return 0
1090
ipc_set.sem_perm.uid = req_priv_change.euid;
1091
ipc_set.sem_perm.gid = req_priv_change.egid;
1092
ipc_set.sem_perm.mode = 0600;
1094
semun.buf = &ipc_set;
1096
for (i = 0; i < 3; i++) {
1097
res = semctl (conn_info->semid, 0, IPC_SET, semun);
1105
static void msg_send_or_queue (void *conn, const struct iovec *iov, unsigned int iov_len)
1107
struct conn_info *conn_info = (struct conn_info *)conn;
1108
unsigned int bytes_left;
1109
unsigned int bytes_msg = 0;
1111
struct outq_item *outq_item;
1112
char *write_buf = 0;
1115
* Exit transmission if the connection is dead
1117
if (ipc_thread_active (conn) == 0) {
1121
bytes_left = shared_mem_dispatch_bytes_left (conn_info);
1122
for (i = 0; i < iov_len; i++) {
1123
bytes_msg += iov[i].iov_len;
1125
if (bytes_left < bytes_msg || list_empty (&conn_info->outq_head) == 0) {
1126
outq_item = api->malloc (sizeof (struct outq_item));
1127
if (outq_item == NULL) {
1128
ipc_disconnect (conn);
1131
outq_item->msg = api->malloc (bytes_msg);
1132
if (outq_item->msg == 0) {
1133
api->free (outq_item);
1134
ipc_disconnect (conn);
1138
write_buf = outq_item->msg;
1139
for (i = 0; i < iov_len; i++) {
1140
memcpy (write_buf, iov[i].iov_base, iov[i].iov_len);
1141
write_buf += iov[i].iov_len;
1143
outq_item->mlen = bytes_msg;
1144
list_init (&outq_item->list);
1145
pthread_mutex_lock (&conn_info->mutex);
1146
if (list_empty (&conn_info->outq_head)) {
1147
conn_info->notify_flow_control_enabled = 1;
1148
api->poll_dispatch_modify (conn_info->fd,
1149
POLLIN|POLLOUT|POLLNVAL);
1151
list_add_tail (&outq_item->list, &conn_info->outq_head);
1152
pthread_mutex_unlock (&conn_info->mutex);
1155
msg_send (conn, iov, iov_len, MSG_SEND_LOCKED);
1158
void coroipcs_refcount_inc (void *conn)
1160
struct conn_info *conn_info = (struct conn_info *)conn;
1162
pthread_mutex_lock (&conn_info->mutex);
1163
conn_info->refcount++;
1164
pthread_mutex_unlock (&conn_info->mutex);
1167
void coroipcs_refcount_dec (void *conn)
1169
struct conn_info *conn_info = (struct conn_info *)conn;
1171
pthread_mutex_lock (&conn_info->mutex);
1172
conn_info->refcount--;
1173
pthread_mutex_unlock (&conn_info->mutex);
1176
int coroipcs_dispatch_send (void *conn, const void *msg, size_t mlen)
1180
iov.iov_base = (void *)msg;
1183
msg_send_or_queue (conn, &iov, 1);
1187
int coroipcs_dispatch_iov_send (void *conn, const struct iovec *iov, unsigned int iov_len)
1189
msg_send_or_queue (conn, iov, iov_len);
1193
int coroipcs_handler_accept (
1199
struct sockaddr_un un_addr;
1201
#ifdef COROSYNC_LINUX
1206
addrlen = sizeof (struct sockaddr_un);
1209
new_fd = accept (fd, (struct sockaddr *)&un_addr, &addrlen);
1210
if (new_fd == -1 && errno == EINTR) {
1215
api->log_printf ("Could not accept Library connection: %s\n", strerror (errno));
1216
return (0); /* This is an error, but -1 would indicate disconnect from poll loop */
1219
res = fcntl (new_fd, F_SETFL, O_NONBLOCK);
1221
api->log_printf ("Could not set non-blocking operation on library connection: %s\n", strerror (errno));
1223
return (0); /* This is an error, but -1 would indicate disconnect from poll loop */
1231
* Request credentials of sender provided by kernel
1233
#ifdef COROSYNC_LINUX
1234
setsockopt(new_fd, SOL_SOCKET, SO_PASSCRED, &on, sizeof (on));
1237
res = conn_info_create (new_fd);
1245
int coroipcs_handler_dispatch (
1250
mar_req_setup_t *req_setup;
1251
struct conn_info *conn_info = (struct conn_info *)context;
1256
if (ipc_thread_exiting (conn_info)) {
1257
return conn_info_destroy (conn_info);
1261
* If an error occurs, request exit
1263
if (revent & (POLLERR|POLLHUP)) {
1264
ipc_disconnect (conn_info);
1269
* Read the header and process it
1271
if (conn_info->service == SOCKET_SERVICE_INIT && (revent & POLLIN)) {
1273
* Receive in a nonblocking fashion the request
1274
* IF security invalid, send ERR_SECURITY, otherwise
1277
res = req_setup_recv (conn_info);
1279
req_setup_send (conn_info, CS_ERR_SECURITY);
1284
req_setup_send (conn_info, CS_OK);
1286
pthread_mutex_init (&conn_info->mutex, NULL);
1287
req_setup = (mar_req_setup_t *)conn_info->setup_msg;
1289
* Is the service registered ?
1291
if (api->service_available (req_setup->service) == 0) {
1292
ipc_disconnect (conn_info);
1296
conn_info->semkey = req_setup->semkey;
1298
req_setup->control_file,
1299
req_setup->control_size,
1300
(void *)&conn_info->control_buffer);
1301
conn_info->control_size = req_setup->control_size;
1304
req_setup->request_file,
1305
req_setup->request_size,
1306
(void *)&conn_info->request_buffer);
1307
conn_info->request_size = req_setup->request_size;
1310
req_setup->response_file,
1311
req_setup->response_size,
1312
(void *)&conn_info->response_buffer);
1313
conn_info->response_size = req_setup->response_size;
1315
res = circular_memory_map (
1316
req_setup->dispatch_file,
1317
req_setup->dispatch_size,
1318
(void *)&conn_info->dispatch_buffer);
1319
conn_info->dispatch_size = req_setup->dispatch_size;
1321
conn_info->service = req_setup->service;
1322
conn_info->refcount = 0;
1323
conn_info->notify_flow_control_enabled = 0;
1324
conn_info->setup_bytes_read = 0;
1326
conn_info->semid = semget (conn_info->semkey, 3, 0600);
1327
conn_info->pending_semops = 0;
1330
* ipc thread is the only reference at startup
1332
conn_info->refcount = 1;
1333
conn_info->state = CONN_STATE_THREAD_ACTIVE;
1335
conn_info->private_data = api->malloc (api->private_data_size_get (conn_info->service));
1336
memset (conn_info->private_data, 0,
1337
api->private_data_size_get (conn_info->service));
1339
api->init_fn_get (conn_info->service) (conn_info);
1341
pthread_attr_init (&conn_info->thread_attr);
1343
* IA64 needs more stack space then other arches
1345
#if defined(__ia64__)
1346
pthread_attr_setstacksize (&conn_info->thread_attr, 400000);
1348
pthread_attr_setstacksize (&conn_info->thread_attr, 200000);
1351
pthread_attr_setdetachstate (&conn_info->thread_attr, PTHREAD_CREATE_JOINABLE);
1352
res = pthread_create (&conn_info->thread,
1353
&conn_info->thread_attr,
1354
pthread_ipc_consumer,
1358
* Security check - disallow multiple configurations of
1359
* the ipc connection
1361
if (conn_info->service == SOCKET_SERVICE_INIT) {
1362
conn_info->service = -1;
1365
if (revent & POLLIN) {
1366
coroipcs_refcount_inc (conn_info);
1367
res = recv (fd, &buf, 1, MSG_NOSIGNAL);
1370
case MESSAGE_REQ_OUTQ_FLUSH:
1371
outq_flush (conn_info);
1373
case MESSAGE_REQ_CHANGE_EUID:
1374
if (priv_change (conn_info) == -1) {
1375
ipc_disconnect (conn_info);
1383
#if defined(COROSYNC_SOLARIS) || defined(COROSYNC_BSD) || defined(COROSYNC_DARWIN)
1384
/* On many OS poll never return POLLHUP or POLLERR.
1385
* EOF is detected when recvmsg return 0.
1388
ipc_disconnect (conn_info);
1389
coroipcs_refcount_dec (conn_info);
1393
coroipcs_refcount_dec (conn_info);
1396
coroipcs_refcount_inc (conn_info);
1397
pthread_mutex_lock (&conn_info->mutex);
1398
if ((conn_info->state == CONN_STATE_THREAD_ACTIVE) && (revent & POLLOUT)) {
1399
buf = !list_empty (&conn_info->outq_head);
1400
for (; conn_info->pending_semops;) {
1401
res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
1403
conn_info->pending_semops--;
1408
if (conn_info->notify_flow_control_enabled) {
1410
res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
1412
conn_info->notify_flow_control_enabled = 0;
1415
if (conn_info->notify_flow_control_enabled == 0 &&
1416
conn_info->pending_semops == 0) {
1418
api->poll_dispatch_modify (conn_info->fd,
1422
pthread_mutex_unlock (&conn_info->mutex);
1423
coroipcs_refcount_dec (conn_info);