~ubuntu-branches/ubuntu/precise/corosync/precise-proposed

« back to all changes in this revision

Viewing changes to exec/coroipcs.c

  • Committer: Bazaar Package Importer
  • Author(s): Martin Loschwitz
  • Date: 2011-10-19 14:32:18 UTC
  • mfrom: (1.1.6 upstream) (5.1.16 sid)
  • Revision ID: james.westby@ubuntu.com-20111019143218-ew8phl0raqyog844
Tags: 1.4.2-1
* Changed my email address in debian/control
* Add corosync-blackbox to the corosync package
* Imported Upstream version 1.4.2

Show diffs side-by-side

added

removed

Lines of Context:
117
117
        size_t size;
118
118
};
119
119
 
120
 
#if _POSIX_THREAD_PROCESS_SHARED < 1
121
 
#if defined(_SEM_SEMUN_UNDEFINED)
122
 
union semun {
123
 
        int val;
124
 
        struct semid_ds *buf;
125
 
        unsigned short int *array;
126
 
        struct seminfo *__buf;
127
 
};
128
 
#endif
129
 
#endif
130
 
 
131
120
 
132
121
enum conn_state {
133
122
        CONN_STATE_THREAD_INACTIVE = 0,
357
346
        return (res);
358
347
}
359
348
 
360
 
static void flow_control_state_set (
 
349
static int32_t flow_control_state_set (
361
350
        struct conn_info *conn_info,
362
351
        int flow_control_state)
363
352
{
364
353
        if (conn_info->control_buffer->flow_control_enabled == flow_control_state) {
365
 
                return;
 
354
                return 0;
366
355
        }
367
356
        if (flow_control_state == 0) {
368
357
                log_printf (LOGSYS_LEVEL_DEBUG,
375
364
                        conn_info->client_pid);
376
365
        }
377
366
 
378
 
 
379
367
        conn_info->control_buffer->flow_control_enabled = flow_control_state;
380
 
        api->stats_update_value (conn_info->stats_handle,
381
 
                "flow_control",
382
 
                &flow_control_state,
383
 
                sizeof(flow_control_state));
384
 
        api->stats_increment_value (conn_info->stats_handle,
385
 
                "flow_control_count");
 
368
        return 1;
 
369
}
 
370
 
 
371
static void flow_control_stats_update (
 
372
        hdb_handle_t stats_handle,
 
373
        int flow_control_state)
 
374
{
 
375
        uint32_t fc_state = flow_control_state;
 
376
        api->stats_update_value (stats_handle, "flow_control",
 
377
                                 &fc_state, sizeof(fc_state));
 
378
        api->stats_increment_value (stats_handle, "flow_control_count");
386
379
}
387
380
 
388
381
static inline int zcb_free (struct zcb_mapped *zcb_mapped)
523
516
                return (0);
524
517
        }
525
518
 
526
 
        api->serialize_lock ();
527
519
        /*
528
520
         * Retry library exit function if busy
529
521
         */
530
522
        if (conn_info->state == CONN_STATE_THREAD_DESTROYED) {
 
523
                api->serialize_lock ();
 
524
                res = api->exit_fn_get (conn_info->service) (conn_info);
 
525
                api->serialize_unlock ();
531
526
                api->stats_destroy_connection (conn_info->stats_handle);
532
 
                res = api->exit_fn_get (conn_info->service) (conn_info);
533
527
                if (res == -1) {
534
 
                        api->serialize_unlock ();
535
528
                        return (0);
536
529
                } else {
537
530
                        conn_info->state = CONN_STATE_LIB_EXIT_CALLED;
541
534
        pthread_mutex_lock (&conn_info->mutex);
542
535
        if (conn_info->refcount > 0) {
543
536
                pthread_mutex_unlock (&conn_info->mutex);
544
 
                api->serialize_unlock ();
545
537
                return (0);
546
538
        }
547
539
        list_del (&conn_info->list);
579
571
        res = circular_memory_unmap (conn_info->dispatch_buffer, conn_info->dispatch_size);
580
572
        zcb_all_free (conn_info);
581
573
        api->free (conn_info);
582
 
        api->serialize_unlock ();
583
574
        return (-1);
584
575
}
585
576
 
714
705
                 * parameter, such as an invalid size
715
706
                 */
716
707
                if (send_ok == -1) {
 
708
                        api->stats_increment_value (conn_info->stats_handle, "invalid_request");
717
709
                        coroipc_response_header.size = sizeof (coroipc_response_header_t);
718
710
                        coroipc_response_header.id = 0;
719
711
                        coroipc_response_header.error = CS_ERR_INVALID_PARAM;
722
714
                                sizeof (coroipc_response_header_t));
723
715
                } else 
724
716
                if (send_ok) {
 
717
                        api->stats_increment_value (conn_info->stats_handle, "requests");
725
718
                        api->serialize_lock();
726
 
                        api->stats_increment_value (conn_info->stats_handle, "requests");
727
719
                        api->handler_fn_get (conn_info->service, header->id) (conn_info, header);
728
720
                        api->serialize_unlock();
729
721
                } else {
730
722
                        /*
731
723
                         * Overload, tell library to retry
732
724
                         */
 
725
                        api->stats_increment_value (conn_info->stats_handle, "overload");
733
726
                        coroipc_response_header.size = sizeof (coroipc_response_header_t);
734
727
                        coroipc_response_header.id = 0;
735
728
                        coroipc_response_header.error = CS_ERR_TRY_AGAIN;
1039
1032
 
1040
1033
        res = fcntl (server_fd, F_SETFL, O_NONBLOCK);
1041
1034
        if (res == -1) {
1042
 
                char error_str[100];
1043
 
                strerror_r (errno, error_str, 100);
1044
 
                log_printf (LOGSYS_LEVEL_CRIT, "Could not set non-blocking operation on server socket: %s\n", error_str);
 
1035
                LOGSYS_PERROR (errno, LOGSYS_LEVEL_CRIT,
 
1036
                        "Could not set non-blocking operation on server socket");
1045
1037
                api->fatal_error ("Could not set non-blocking operation on server socket");
1046
1038
        }
1047
1039
 
1068
1060
 
1069
1061
        res = bind (server_fd, (struct sockaddr *)&un_addr, COROSYNC_SUN_LEN(&un_addr));
1070
1062
        if (res) {
1071
 
                char error_str[100];
1072
 
                strerror_r (errno, error_str, 100);
1073
 
                log_printf (LOGSYS_LEVEL_CRIT, "Could not bind AF_UNIX (%s): %s.\n", un_addr.sun_path, error_str);
 
1063
                LOGSYS_PERROR (errno, LOGSYS_LEVEL_CRIT,
 
1064
                                "Could not bind AF_UNIX (%s)", un_addr.sun_path);
1074
1065
                api->fatal_error ("Could not bind to AF_UNIX socket\n");
1075
1066
        }
1076
1067
 
1240
1231
        write_idx = conn_info->control_buffer->write;
1241
1232
 
1242
1233
        memcpy (&conn_info->dispatch_buffer[write_idx], msg, len);
1243
 
        conn_info->control_buffer->write = (write_idx + len) % conn_info->dispatch_size;
 
1234
        conn_info->control_buffer->write = ((write_idx + len + 7) & 0xFFFFFFF8) % conn_info->dispatch_size;
1244
1235
}
1245
1236
 
1246
1237
static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
1267
1258
        }
1268
1259
 
1269
1260
        ipc_sem_post (conn_info->control_buffer, SEMAPHORE_DISPATCH);
1270
 
 
1271
 
        api->stats_increment_value (conn_info->stats_handle, "dispatched");
1272
1261
}
1273
1262
 
1274
1263
static void outq_flush (struct conn_info *conn_info) {
1276
1265
        struct outq_item *outq_item;
1277
1266
        unsigned int bytes_left;
1278
1267
        struct iovec iov;
 
1268
        int32_t q_size_dec = 0;
 
1269
        int32_t i;
 
1270
        int32_t fc_set;
1279
1271
 
1280
1272
        pthread_mutex_lock (&conn_info->mutex);
1281
1273
        if (list_empty (&conn_info->outq_head)) {
1282
 
                flow_control_state_set (conn_info, 0);
 
1274
                fc_set = flow_control_state_set (conn_info, 0);
1283
1275
                pthread_mutex_unlock (&conn_info->mutex);
 
1276
                if (fc_set) {
 
1277
                        flow_control_stats_update (conn_info->stats_handle, 0);
 
1278
                }
1284
1279
                return;
1285
1280
        }
1286
1281
        for (list = conn_info->outq_head.next;
1296
1291
                        list_del (list);
1297
1292
                        api->free (iov.iov_base);
1298
1293
                        api->free (outq_item);
1299
 
                        api->stats_decrement_value (conn_info->stats_handle, "queue_size");
 
1294
                        q_size_dec++;
1300
1295
                } else {
1301
1296
                        break;
1302
1297
                }
1303
1298
        }
1304
1299
        pthread_mutex_unlock (&conn_info->mutex);
 
1300
 
 
1301
        /*
 
1302
         * these need to be sent out of the conn_info->mutex
 
1303
         */
 
1304
        for (i = 0; i < q_size_dec; i++) {
 
1305
                api->stats_decrement_value (conn_info->stats_handle, "queue_size");
 
1306
                api->stats_increment_value (conn_info->stats_handle, "dispatched");
 
1307
        }
1305
1308
}
1306
1309
 
1307
1310
static int priv_change (struct conn_info *conn_info)
1375
1378
                bytes_msg += iov[i].iov_len;
1376
1379
        }
1377
1380
        if (bytes_left < bytes_msg || list_empty (&conn_info->outq_head) == 0) {
1378
 
                flow_control_state_set (conn_info, 1);
 
1381
                if (flow_control_state_set (conn_info, 1)) {
 
1382
                        flow_control_stats_update(conn_info->stats_handle, 1);
 
1383
                }
1379
1384
                outq_item = api->malloc (sizeof (struct outq_item));
1380
1385
                if (outq_item == NULL) {
1381
1386
                        ipc_disconnect (conn);
1402
1407
                return;
1403
1408
        }
1404
1409
        msg_send (conn, iov, iov_len, MSG_SEND_LOCKED);
 
1410
        api->stats_increment_value (conn_info->stats_handle, "dispatched");
1405
1411
}
1406
1412
 
1407
1413
void coroipcs_refcount_inc (void *conn)
1461
1467
        }
1462
1468
 
1463
1469
        if (new_fd == -1) {
1464
 
                char error_str[100];
1465
 
                strerror_r (errno, error_str, 100);
1466
 
                log_printf (LOGSYS_LEVEL_ERROR,
1467
 
                        "Could not accept Library connection: %s\n", error_str);
 
1470
                LOGSYS_PERROR (errno, LOGSYS_LEVEL_ERROR,
 
1471
                        "Could not accept Library connection");
1468
1472
                return (0); /* This is an error, but -1 would indicate disconnect from poll loop */
1469
1473
        }
1470
1474
 
1471
1475
        res = fcntl (new_fd, F_SETFL, O_NONBLOCK);
1472
1476
        if (res == -1) {
1473
 
                char error_str[100];
1474
 
                strerror_r (errno, error_str, 100);
1475
 
                log_printf (LOGSYS_LEVEL_ERROR,
1476
 
                        "Could not set non-blocking operation on library connection: %s\n",
1477
 
                        error_str);
 
1477
                LOGSYS_PERROR (errno, LOGSYS_LEVEL_ERROR,
 
1478
                        "Could not set non-blocking operation on library connection");
1478
1479
                close (new_fd);
1479
1480
                return (0); /* This is an error, but -1 would indicate disconnect from poll loop */
1480
1481
        }
1545
1546
static void coroipcs_init_conn_stats (
1546
1547
        struct conn_info *conn)
1547
1548
{
1548
 
        char conn_name[42];
1549
 
        char proc_name[32];
 
1549
        char conn_name[CS_MAX_NAME_LENGTH];
 
1550
        char proc_name[CS_MAX_NAME_LENGTH];
 
1551
        char int_str[4];
1550
1552
 
1551
1553
        if (conn->client_pid > 0) {
1552
 
                if (pid_to_name (conn->client_pid, proc_name, sizeof(proc_name)))
1553
 
                        snprintf (conn_name, sizeof(conn_name), "%s:%d:%d", proc_name, conn->client_pid, conn->fd);
1554
 
                else
1555
 
                        snprintf (conn_name, sizeof(conn_name), "%d:%d", conn->client_pid, conn->fd);
1556
 
        } else
1557
 
                snprintf (conn_name, sizeof(conn_name), "%d", conn->fd);
1558
 
 
 
1554
                if (pid_to_name (conn->client_pid, proc_name, sizeof(proc_name))) {
 
1555
                        snprintf (conn_name, sizeof(conn_name),
 
1556
                                "%s:%s:%d:%d", proc_name,
 
1557
                                short_service_name_get(conn->service, int_str, 4),
 
1558
                                conn->client_pid, conn->fd);
 
1559
                } else {
 
1560
                        snprintf (conn_name, sizeof(conn_name),
 
1561
                                "proc:%s:%d:%d",
 
1562
                                short_service_name_get(conn->service, int_str, 4),
 
1563
                                conn->client_pid,
 
1564
                                conn->fd);
 
1565
                }
 
1566
        } else {
 
1567
                snprintf (conn_name, sizeof(conn_name),
 
1568
                        "proc:%s:pid:%d",
 
1569
                        short_service_name_get(conn->service, int_str, 4),
 
1570
                        conn->fd);
 
1571
        }
1559
1572
        conn->stats_handle = api->stats_create_connection (conn_name, conn->client_pid, conn->fd);
1560
1573
        api->stats_update_value (conn->stats_handle, "service_id",
1561
1574
                &conn->service, sizeof(conn->service));
1569
1582
        mar_req_setup_t *req_setup;
1570
1583
        struct conn_info *conn_info = (struct conn_info *)context;
1571
1584
        int res;
1572
 
        char buf;
 
1585
        char buf = 0;
1573
1586
 
1574
1587
 
1575
1588
        if (ipc_thread_exiting (conn_info)) {
1605
1618
                req_setup = (mar_req_setup_t *)conn_info->setup_msg;
1606
1619
                /*
1607
1620
                 * Is the service registered ?
 
1621
                 * Has service init function ?
1608
1622
                 */
1609
 
                if (api->service_available (req_setup->service) == 0) {
 
1623
                if (api->service_available (req_setup->service) == 0 ||
 
1624
                    api->init_fn_get (req_setup->service) == NULL) {
1610
1625
                        req_setup_send (conn_info, CS_ERR_NOT_EXIST);
1611
1626
                        ipc_disconnect (conn_info);
1612
1627
                        return (0);
1613
1628
                }
1614
 
                req_setup_send (conn_info, CS_OK);
1615
 
 
1616
1629
#if _POSIX_THREAD_PROCESS_SHARED < 1
1617
1630
                conn_info->semkey = req_setup->semkey;
1618
1631
#endif
1620
1633
                        req_setup->control_file,
1621
1634
                        req_setup->control_size,
1622
1635
                        (void *)&conn_info->control_buffer);
 
1636
                if (res == -1) {
 
1637
                        goto send_setup_response;
 
1638
                }
1623
1639
                conn_info->control_size = req_setup->control_size;
1624
1640
 
1625
1641
                res = memory_map (
1626
1642
                        req_setup->request_file,
1627
1643
                        req_setup->request_size,
1628
1644
                        (void *)&conn_info->request_buffer);
 
1645
                if (res == -1) {
 
1646
                        goto send_setup_response;
 
1647
                }
1629
1648
                conn_info->request_size = req_setup->request_size;
1630
1649
 
1631
1650
                res = memory_map (
1632
1651
                        req_setup->response_file,
1633
1652
                        req_setup->response_size,
1634
1653
                        (void *)&conn_info->response_buffer);
 
1654
                if (res == -1) {
 
1655
                        goto send_setup_response;
 
1656
                }
1635
1657
                conn_info->response_size = req_setup->response_size;
1636
1658
 
1637
1659
                res = circular_memory_map (
1638
1660
                        req_setup->dispatch_file,
1639
1661
                        req_setup->dispatch_size,
1640
1662
                        (void *)&conn_info->dispatch_buffer);
 
1663
                if (res == -1) {
 
1664
                        goto send_setup_response;
 
1665
                }
1641
1666
                conn_info->dispatch_size = req_setup->dispatch_size;
1642
1667
 
 
1668
 send_setup_response:
 
1669
                if (res == 0) {
 
1670
                        req_setup_send (conn_info, CS_OK);
 
1671
                } else {
 
1672
                        req_setup_send (conn_info, CS_ERR_LIBRARY);
 
1673
                        ipc_disconnect (conn_info);
 
1674
                        return (0);
 
1675
                }
 
1676
 
1643
1677
                conn_info->service = req_setup->service;
1644
1678
                conn_info->refcount = 0;
1645
1679
                conn_info->setup_bytes_read = 0;
1685
1719
                 * the ipc connection
1686
1720
                 */
1687
1721
                if (conn_info->service == SOCKET_SERVICE_INIT) {
1688
 
                        conn_info->service = -1;
 
1722
                        conn_info->service = SOCKET_SERVICE_SECURITY_VIOLATION;
1689
1723
                }
1690
1724
        } else
1691
1725
        if (revent & POLLIN) {