3
* ocf_ipc.c: IPC abstraction implementation.
6
* Copyright (c) 2002 Xiaoxiang Liu <xiliu@ncsa.uiuc.edu>
8
* This library is free software; you can redistribute it and/or
9
* modify it under the terms of the GNU Lesser General Public
10
* License as published by the Free Software Foundation; either
11
* version 2.1 of the License, or (at your option) any later version.
13
* This library is distributed in the hope that it will be useful,
14
* but WITHOUT ANY WARRANTY; without even the implied warranty of
15
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16
* Lesser General Public License for more details.
18
* You should have received a copy of the GNU Lesser General Public
19
* License along with this library; if not, write to the Free Software
20
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23
#include <lha_internal.h>
24
#include <clplumbing/ipc.h>
29
#include <clplumbing/cl_log.h>
30
#include <sys/types.h>
36
/* Number of buffer pools handed out so far; incremented in ipc_bufpool_new(). */
static int num_pool_allocated = 0;
37
/* Reported next to num_pool_allocated by ipc_bufpool_dump_stats().
 * Presumably counts released pools -- TODO confirm where it is incremented. */
static int num_pool_freed = 0;
41
/* Prototypes for helpers implemented elsewhere.  ipc_time_debug() is called
 * from ipc_bufpool_update() when IPC_TIME_DEBUG is defined. */
void cl_log_message (int log_level, const struct ha_msg *m);
42
int timediff(longclock_t t1, longclock_t t2);
43
void ha_msg_del(struct ha_msg* msg);
44
void ipc_time_debug(IPC_Channel* ch, IPC_Message* ipcmsg, int whichpos);
48
/* Socket-backed constructors that ipc_wait_conn_constructor() and
 * ipc_channel_constructor() dispatch to. */
struct IPC_WAIT_CONNECTION * socket_wait_conn_new(GHashTable* ch_attrs);
49
struct IPC_CHANNEL * socket_client_channel_new(GHashTable* ch_attrs);
51
/* Replaceable poll hook: starts out pointing at poll() and is reassigned
 * by ipc_set_pollfunc(). */
int (*ipc_pollfunc_ptr)(struct pollfd*, unsigned int, int)
52
= (int (*)(struct pollfd*, unsigned int, int)) poll;
54
/* Set the IPC poll function to the given function.
 * pf is stored in ipc_pollfunc_ptr unchanged. */
56
ipc_set_pollfunc(int (*pf)(struct pollfd*, unsigned int, int))
58
ipc_pollfunc_ptr = pf;
61
/* Build a wait-connection object for the requested channel type.
 *   ch_type  - compared with strcmp(), so it must be a non-NULL string;
 *              "domain_socket", IPC_ANYTYPE and IPC_DOMAIN_SOCKET are accepted.
 *   ch_attrs - passed through untouched to socket_wait_conn_new().
 * For an accepted type the result of socket_wait_conn_new() is returned. */
struct IPC_WAIT_CONNECTION *
62
ipc_wait_conn_constructor(const char * ch_type, GHashTable* ch_attrs)
64
if (strcmp(ch_type, "domain_socket") == 0
65
|| strcmp(ch_type, IPC_ANYTYPE) == 0
66
|| strcmp(ch_type, IPC_DOMAIN_SOCKET) == 0) {
67
return socket_wait_conn_new(ch_attrs);
73
/* Build a client channel for the requested channel type.
 *   ch_type  - compared with strcmp(), so it must be a non-NULL string;
 *              "domain_socket", IPC_ANYTYPE and IPC_DOMAIN_SOCKET are accepted.
 *   ch_attrs - passed through untouched to socket_client_channel_new().
 * For an accepted type the result of socket_client_channel_new() is returned. */
ipc_channel_constructor(const char * ch_type, GHashTable* ch_attrs)
75
if (strcmp(ch_type, "domain_socket") == 0
76
|| strcmp(ch_type, IPC_ANYTYPE) == 0
77
|| strcmp(ch_type, IPC_DOMAIN_SOCKET) == 0) {
79
return socket_client_channel_new(ch_attrs);
84
/* Translate a group, given as the first gnlen characters of gname, into a gid.
 *   gname - group name or number; make_id_table() passes a pointer into the
 *           middle of its list, so it need not be terminated at gnlen.
 *   gnlen - number of characters of gname to use.
 * Non-numeric names are copied into the local grpname buffer and resolved
 * with getgrnam(); the gr_gid of the matching entry is returned.  Names that
 * do not fit in grpname, and names getgrnam() does not know, take the error
 * paths. */
gnametonum(const char * gname, int gnlen)
89
/* Leading digit: presumably treated as a numeric gid, the way unametonum()
 * uses atoi() -- TODO confirm.
 * NOTE(review): isdigit() is only defined for unsigned char values and EOF;
 * casting a possibly negative char straight to int does not guarantee that. */
if (isdigit((int) gname[0])) {
92
/* Refuse names that would not fit in grpname together with a terminator. */
if (gnlen >= (int)sizeof(grpname)) {
95
/* NOTE(review): strncpy() adds no terminator when gname holds gnlen or more
 * characters; grpname[gnlen] has to be set to EOS before getgrnam() -- verify. */
strncpy(grpname, gname, gnlen);
97
if ((grp = getgrnam(grpname)) == NULL) {
99
, "Invalid group name [%s]", grpname);
102
return (int)grp->gr_gid;
106
/* Translate a user, given as the first llen characters of lname, into a uid.
 *   lname - login name or number; make_id_table() passes a pointer into the
 *           middle of its list, so it need not be terminated at llen.
 *   llen  - number of characters of lname to use.
 * The characters are copied into the local loginname buffer and terminated.
 * A value starting with a digit is converted with atoi(); anything else is
 * resolved with getpwnam() and the entry's pw_uid is returned.  Over-long and
 * unknown names take the error paths. */
unametonum(const char * lname, int llen)
111
/* Refuse names that would not fit in loginname together with a terminator. */
if (llen >= (int)sizeof(loginname)) {
113
/* NOTE(review): loginname is only filled in further down (strncpy), so the
 * "%s" here presumably prints an unset buffer; lname/llen look like the
 * intended arguments -- verify. */
, "user id name [%s] is too long", loginname);
116
strncpy(loginname, lname, llen);
117
/* strncpy() does not terminate when lname is longer than llen, hence the
 * explicit EOS. */
loginname[llen] = EOS;
119
/* NOTE(review): isdigit() is only defined for unsigned char values and EOF;
 * a negative char cast to int is outside that range. */
if (isdigit((int) loginname[0])) {
120
/* atoi() reports no errors: "12abc" yields 12. */
return atoi(loginname);
122
if ((pwd = getpwnam(loginname)) == NULL) {
124
, "Invalid user id name [%s]", loginname);
127
return (int)pwd->pw_uid;
131
/* Turn a comma-separated list of ids into a GHashTable keyed by numeric id.
 *   list    - start of the list text.
 *   listlen - number of bytes of list to consider.
 *   map     - converter called as map(token, token_length) for each token;
 *             ipc_str_to_auth() passes unametonum or gnametonum.
 * The table uses g_direct_hash/g_direct_equal with GUINT_TO_POINTER(idval)
 * as the key; every key maps to the address of `one`. */
make_id_table(const char * list, int listlen, int (*map)(const char *, int))
135
/* One past the last byte that belongs to the list. */
const char * lastid = list + listlen;
140
ret = g_hash_table_new(g_direct_hash, g_direct_equal);
143
/* Walk the tokens until the end of the list or an EOS is reached. */
while (id < lastid && *id != EOS) {
144
/* strcspn() runs to the next ',' or NUL, so it can look beyond lastid;
 * the length is clamped right below.
 * NOTE(review): this relies on a NUL existing somewhere after list --
 * verify against callers that pass unterminated buffers. */
idlen = strcspn(id, ",");
145
if (id+idlen >= lastid) {
146
idlen = (lastid - id);
148
idval = map(id, idlen);
150
/* The table is torn down again here -- presumably when map() reports a
 * failure; TODO confirm the condition. */
g_hash_table_destroy(ret);
155
, "Adding [ug]id %*s [%d] to authorization g_hash_table"
158
/* NOTE(review): the stored value is the address of `one`, which must stay
 * valid for as long as the table lives -- presumably a static; verify. */
g_hash_table_insert(ret, GUINT_TO_POINTER(idval), &one);
161
/* Skip the separator(s) in front of the next token. */
id += strspn(id, ",");
170
/* Build a struct IPC_AUTH from textual uid and gid lists.
 *   uidlist/uidlen - comma-separated users, converted with unametonum().
 *   gidlist/gidlen - comma-separated groups, converted with gnametonum().
 * The struct is malloc()ed and zeroed; auth->uid and auth->gid receive the
 * tables built by make_id_table().  A NULL table is treated as a bad list. */
ipc_str_to_auth(const char* uidlist, int uidlen, const char* gidlist, int gidlen)
172
struct IPC_AUTH* auth;
174
auth = malloc(sizeof(struct IPC_AUTH));
176
cl_log(LOG_ERR, "Out of memory for IPC_AUTH");
180
/* Start with both table pointers NULL. */
memset(auth, 0, sizeof(*auth));
183
auth->uid = make_id_table(uidlist, uidlen, unametonum);
184
if (auth->uid == NULL) {
186
/* NOTE(review): in "%*s" the '*' is a minimum field width, not a length
 * limit; if the aim is to print only uidlen characters of an unterminated
 * list this would need "%.*s" -- verify against the arguments passed. */
"Bad uid list [%*s]",
192
auth->gid = make_id_table(gidlist, gidlen, gnametonum);
193
if (auth->gid == NULL) {
195
/* NOTE(review): same "%*s" width-versus-precision question as for the uid
 * list message. */
"Bad gid list [%*s]",
206
/* Table teardown -- presumably the failure path; TODO confirm. */
g_hash_table_destroy(auth->uid);
211
g_hash_table_destroy(auth->gid);
222
/* Build a struct IPC_AUTH from arrays of numeric ids.
 *   a_uid/num_uid - user ids and how many of them to add.
 *   a_gid/num_gid - group ids and how many of them to add.
 * The struct is malloc()ed; a uid table and a gid table are created with
 * g_direct_hash/g_direct_equal and each id is inserted as a key, converted
 * with GINT_TO_POINTER((gint)id). */
ipc_set_auth(uid_t * a_uid, gid_t * a_gid, int num_uid, int num_gid)
224
struct IPC_AUTH *temp_auth;
228
temp_auth = malloc(sizeof(struct IPC_AUTH));
229
if (temp_auth == NULL){
230
cl_log(LOG_ERR, "%s: memory allocation failed",__FUNCTION__);
233
temp_auth->uid = g_hash_table_new(g_direct_hash, g_direct_equal);
234
temp_auth->gid = g_hash_table_new(g_direct_hash, g_direct_equal);
237
/* A count of zero or less simply leaves the corresponding table empty. */
for (i=0; i<num_uid; i++) {
238
g_hash_table_insert(temp_auth->uid, GINT_TO_POINTER((gint)a_uid[i])
244
for (i=0; i<num_gid; i++) {
245
g_hash_table_insert(temp_auth->gid, GINT_TO_POINTER((gint)a_gid[i])
254
/* Tear down an IPC_AUTH: destroys the auth->uid and auth->gid hash tables.
 * Presumably also releases auth itself -- TODO confirm. */
ipc_destroy_auth(struct IPC_AUTH *auth)
258
g_hash_table_destroy(auth->uid);
261
g_hash_table_destroy(auth->gid);
268
/* Debugging aid: log the bookkeeping fields of pool (refcount, the four
 * position pointers and size) at LOG_INFO.  "pool is NULL" is logged at
 * LOG_ERR instead -- presumably when pool == NULL. */
ipc_bufpool_display(struct ipc_bufpool* pool)
272
cl_log(LOG_ERR, "pool is NULL");
277
cl_log(LOG_INFO, "pool: refcount=%d, startpos=%p, currpos=%p,"
278
"consumepos=%p, endpos=%p, size=%d",
287
/* Declared right here so the definition below has a prototype in scope. */
void ipc_bufpool_dump_stats(void);
289
/* Log the pool counters at LOG_INFO: pools allocated, pools freed, and their
 * difference (the last argument below). */
ipc_bufpool_dump_stats(void)
291
cl_log(LOG_INFO, "num_pool_allocated=%d, num_pool_freed=%d, diff=%d",
294
num_pool_allocated - num_pool_freed);
300
/* Allocate a buffer pool with room for `size` bytes of message data.
 *   size - payload bytes the caller wants to be able to store.
 * The allocation covers size + sizeof(struct ipc_bufpool) + two
 * struct SOCKET_MSG_HEAD, and is raised to POOL_SIZE when that sum is
 * smaller.  A total above MAXMSG is logged at LOG_INFO.
 * After zeroing, startpos, currpos and consumepos all point just past the
 * struct ipc_bufpool at the front of the allocation, endpos points totalsize
 * bytes from the start, and size records totalsize.  num_pool_allocated is
 * incremented for every pool created.
 * NOTE(review): malloc() requests totalsize+1 bytes, but only totalsize bytes
 * are cleared and accounted for by endpos/size; the extra byte looks unused
 * -- confirm intent. */
ipc_bufpool_new(int size)
302
struct ipc_bufpool* pool;
306
/* there are memories for two struct SOCKET_MSG_HEAD
307
* one for the big message, the other one for the next
308
* message. This code prevents allocating
309
* <big memory> <4k> <big memory><4k> ...
310
* from happening when a client sends big messages
313
totalsize = size + sizeof(struct ipc_bufpool)
314
+ sizeof(struct SOCKET_MSG_HEAD) * 2 ;
316
if (totalsize < POOL_SIZE){
317
totalsize = POOL_SIZE;
320
if (totalsize > MAXMSG){
321
cl_log(LOG_INFO, "ipc_bufpool_new: "
322
"asking for buffer with size %d"
323
"corrupted data len???", totalsize);
327
pool = (struct ipc_bufpool*)malloc(totalsize+1);
329
cl_log(LOG_ERR, "%s: memory allocation failed", __FUNCTION__);
332
memset(pool, 0, totalsize);
334
pool->startpos = pool->currpos = pool->consumepos =
335
((char*)pool) + sizeof(struct ipc_bufpool);
337
pool->endpos = ((char*)pool) + totalsize;
338
pool->size = totalsize;
340
num_pool_allocated ++ ;
346
/* Dispose of a buffer pool; called from ipc_bufpool_unref() once the
 * reference count has dropped to zero or below. */
ipc_bufpool_del(struct ipc_bufpool* pool)
353
/* A pool that is still referenced is reported as an error. */
if (pool->refcount > 0){
354
cl_log(LOG_ERR," ipc_bufpool_del:"
355
" IPC buffer pool reference count"
360
/* Wipe the whole pool (pool->size bytes, header included) -- presumably
 * right before it is freed; TODO confirm. */
memset(pool, 0, pool->size);
367
/* Return the number of bytes still free in pool, i.e. the distance from
 * currpos to endpos.  An invalid argument is logged at LOG_ERR. */
ipc_bufpool_spaceleft(struct ipc_bufpool* pool)
371
cl_log(LOG_ERR, "ipc_bufpool_spacelft:"
372
"invalid input argument");
376
return pool->endpos - pool->currpos;
382
/* Free the memory space allocated to msg and destroy msg.
 * Installed as the msg_done callback of the messages ipc_bufpool_update()
 * creates; msg->msg_private holds the owning pool, whose reference is
 * dropped through ipc_bufpool_unref(). */
385
ipc_bufpool_msg_done(struct IPC_MESSAGE * msg) {
387
struct ipc_bufpool* pool;
390
cl_log(LOG_ERR, "ipc_bufpool_msg_done:"
395
pool = (struct ipc_bufpool*)msg->msg_private;
397
ipc_bufpool_unref(pool);
402
/* Allocate a zero-filled struct IPC_MESSAGE with malloc(); the caller
 * (ipc_bufpool_update()) fills in the fields.  An allocation failure is
 * logged at LOG_ERR. */
static struct IPC_MESSAGE*
403
ipc_bufpool_msg_new(void)
405
struct IPC_MESSAGE * temp_msg;
407
temp_msg = malloc(sizeof(struct IPC_MESSAGE));
408
if (temp_msg == NULL){
409
cl_log(LOG_ERR, "ipc_bufpool_msg_new:"
410
"allocating new msg failed");
414
memset(temp_msg, 0, sizeof(struct IPC_MESSAGE));
421
/* Debugging aid: log the fields of ipcmsg at LOG_INFO.  "ipcmsg is NULL" is
 * logged at LOG_ERR instead -- presumably when ipcmsg == NULL. */
ipcmsg_display(IPC_Message* ipcmsg)
424
cl_log(LOG_ERR, "ipcmsg is NULL");
428
cl_log(LOG_INFO, "ipcmsg: msg_len=%lu, msg_buf=%p, msg_body=%p,"
429
"msg_done=%p, msg_private=%p, msg_ch=%p",
430
/* msg_len is cast to match the %lu conversion. */
(unsigned long)ipcmsg->msg_len,
441
/* after a recv call, we have new data
442
* in the pool buf, we need to update our
443
* pool struct to consume it
448
ipc_bufpool_update(struct ipc_bufpool* pool,
449
struct IPC_CHANNEL * ch,
454
struct SOCKET_MSG_HEAD localhead;
455
struct SOCKET_MSG_HEAD* head = &localhead;
460
cl_log(LOG_ERR, "ipc_update_bufpool:"
465
pool->currpos += msg_len;
468
/* not enough data received yet for a complete head */
469
if ((int)(pool->currpos - pool->consumepos) < (int)ch->msgpad){
473
/* The head is copied into a local struct rather than read in place. */
memcpy(head, pool->consumepos, sizeof(struct SOCKET_MSG_HEAD));
475
/* A head without the expected magic value means the stream is corrupt;
 * the pool and queue state are logged for diagnosis. */
if (head->magic != HEADMAGIC){
476
GList* last = g_list_last(rqueue->queue);
477
cl_log(LOG_ERR, "ipc_bufpool_update: "
478
"magic number in head does not match."
479
"Something very bad happened, abort now, farside pid =%d",
481
cl_log(LOG_ERR, "magic=%x, expected value=%x", head->magic, HEADMAGIC);
482
ipc_bufpool_display(pool);
483
cl_log(LOG_INFO, "nmsgs=%d", nmsgs);
484
/* print out the last message in queue */
486
/* NOTE(review): `last` is a GList node (from g_list_last()), not a message;
 * the queued IPC_Message is presumably last->data -- verify. */
IPC_Message* m = (IPC_Message*)last;
492
/* Reject a length field larger than MAXMSG. */
if ( head->msg_len > MAXMSG){
493
cl_log(LOG_ERR, "ipc_update_bufpool:"
494
"msg length is corruptted(%d)",
499
if (pool->consumepos + ch->msgpad + head->msg_len
504
ipcmsg = ipc_bufpool_msg_new();
506
cl_log(LOG_ERR, "ipc_update_bufpool:"
507
"allocating memory for new ipcmsg failed");
511
/* The message points into the pool: msg_buf at the head, msg_body
 * ch->msgpad bytes further on.  Nothing is copied. */
ipcmsg->msg_buf = pool->consumepos;
512
ipcmsg->msg_body = pool->consumepos + ch->msgpad;
513
ipcmsg->msg_len = head->msg_len;
514
/* msg_private carries the pool so that ipc_bufpool_msg_done() can drop
 * the reference taken at the end of this sequence. */
ipcmsg->msg_private = pool;
515
ipcmsg->msg_done = ipc_bufpool_msg_done;
517
#ifdef IPC_TIME_DEBUG
518
ipc_time_debug(ch,ipcmsg, MSGPOS_RECV);
522
/* Queue the message on the receive queue. */
rqueue->queue = g_list_append(rqueue->queue, ipcmsg);
523
rqueue->current_qlen ++;
526
/* Step over the message just queued and take one pool reference for it. */
pool->consumepos += ch->msgpad + head->msg_len;
527
ipc_bufpool_ref(pool);
539
/* Decide whether pool can still take the message that starts at consumepos.
 *   pool            - pool to inspect.
 *   ch              - channel; ch->msgpad is used as the size of a head.
 *   dataspaceneeded - out parameter; reset to 0 on entry and set to the
 *                     message's msg_len when its data would not fit.
 * Three cases are examined in order: no room left for a head, a head already
 * received whose data would reach or pass endpos, and everything else. */
ipc_bufpool_full(struct ipc_bufpool* pool,
540
struct IPC_CHANNEL* ch,
541
int* dataspaceneeded)
544
struct SOCKET_MSG_HEAD localhead;
545
struct SOCKET_MSG_HEAD* head = &localhead;
547
*dataspaceneeded = 0;
548
/* not enough space for head */
549
if ((int)(pool->endpos - pool->consumepos) < (int)ch->msgpad){
553
/* enough bytes between consumepos and currpos to hold a head */
554
if ((int)(pool->currpos - pool->consumepos) >= (int)ch->msgpad){
555
memcpy(head, pool->consumepos, sizeof(struct SOCKET_MSG_HEAD));
557
/* not enough space for data */
558
if ( pool->consumepos + ch->msgpad + head->msg_len >= pool->endpos){
559
*dataspaceneeded = head->msg_len;
565
/* Either we are sure we have enough space
566
* or we cannot tell because we have not received
567
* head yet. But we are sure we have enough space
578
/* Move the not-yet-consumed bytes of srcpool (consumepos up to currpos)
 * into dstpool.
 *   dstpool - receives the bytes at its consumepos; its currpos is set to
 *             consumepos + number of bytes copied.
 *   srcpool - source; its currpos is pulled back to consumepos afterwards,
 *             so the moved bytes no longer count as received there.
 * When srcpool already holds a complete head, the space needed for the whole
 * message is checked against ipc_bufpool_spaceleft(dstpool) first. */
ipc_bufpool_partial_copy(struct ipc_bufpool* dstpool,
579
struct ipc_bufpool* srcpool)
581
struct SOCKET_MSG_HEAD localhead;
582
struct SOCKET_MSG_HEAD *head = &localhead;
588
cl_log(LOG_ERR, "ipc_bufpool_partial_ipcmsg_cp:"
593
if (srcpool->currpos - srcpool->consumepos >=
594
(ssize_t)sizeof(struct SOCKET_MSG_HEAD)){
596
memcpy(head, srcpool->consumepos, sizeof(struct SOCKET_MSG_HEAD));
597
/* NOTE(review): the head size is taken as sizeof(*head) here, while
 * ipc_bufpool_update()/ipc_bufpool_full() use ch->msgpad -- confirm the
 * two always agree. */
space_needed = head->msg_len + sizeof(*head);
599
if (space_needed > ipc_bufpool_spaceleft(dstpool)){
600
cl_log(LOG_ERR, "ipc_bufpool_partial_ipcmsg_cp:"
601
" not enough space left in dst pool,spaced needed=%d",
607
nbytes = srcpool->currpos - srcpool->consumepos;
608
/* NOTE(review): the copy lands at dstpool->consumepos, whereas the space
 * check above measures from dstpool->currpos; these match only for a pool
 * with nothing pending -- presumably always a fresh pool; verify. */
memcpy(dstpool->consumepos, srcpool->consumepos,nbytes);
611
srcpool->currpos = srcpool->consumepos;
612
dstpool->currpos = dstpool->consumepos + nbytes;
619
/* Take a reference on pool; ipc_bufpool_update() calls this once for every
 * message it queues.  An error is logged under the "ref_pool:" prefix --
 * presumably for a NULL pool; TODO confirm. */
ipc_bufpool_ref(struct ipc_bufpool* pool)
622
cl_log(LOG_ERR, "ref_pool:"
632
/* Drop a reference on pool; called from ipc_bufpool_msg_done().  Once
 * refcount is zero or below the pool is handed to ipc_bufpool_del().
 * Presumably refcount is decremented before that test -- TODO confirm. */
ipc_bufpool_unref(struct ipc_bufpool* pool){
635
cl_log(LOG_ERR, "unref_pool:"
642
if (pool->refcount <= 0){
643
ipc_bufpool_del(pool);