~ubuntu-branches/debian/stretch/spice/stretch

« back to all changes in this revision

Viewing changes to server/red_tunnel_worker.c

  • Committer: Package Import Robot
  • Author(s): Liang Guo, Michael Tokarev
  • Date: 2011-11-29 14:37:08 UTC
  • mfrom: (0.6.1) (0.4.2) (2.1.3 sid)
  • Revision ID: package-import@ubuntu.com-20111129143708-jptkxyjl3a4rds2r
Tags: 0.10.0-1
[ Liang Guo ]
* New upstream release (Closes: #651262)
* Refresh debian/copyright
* Remove fix-typo-in-cmd_line_parser-cpp.patch, applied upstream
* Remove fix-typo-in-record-cpp.patch, applied upstream
* Remove use-requires-private-for-libspice-pkgconfig.patch, applied upstream
* Change Build-Depends on libspice-protocol-dev to (>= 0.9.1~)
* Refresh libspice-server1.symbols
* Update debian/rules clean target
* Ignore common/win/my_getopt-1.5/Makefile change when building package
* debian/control: set DMUA

[ Michael Tokarev ]
* use `rm -f' instead of `-rm' in debian/rules clean targets
* remove python_modules/*.pyc in clean target

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
    Author:
19
19
        yhalperi@redhat.com
20
20
*/
 
21
#ifdef HAVE_CONFIG_H
 
22
#include <config.h>
 
23
#endif
21
24
 
22
25
#include <stdio.h>
23
26
#include <stdint.h>
67
70
typedef struct TunnelWorker TunnelWorker;
68
71
 
69
72
enum {
70
 
    PIPE_ITEM_TYPE_SET_ACK,
71
 
    PIPE_ITEM_TYPE_MIGRATE,
 
73
    PIPE_ITEM_TYPE_MIGRATE = PIPE_ITEM_TYPE_CHANNEL_BASE,
72
74
    PIPE_ITEM_TYPE_MIGRATE_DATA,
73
75
    PIPE_ITEM_TYPE_TUNNEL_INIT,
74
76
    PIPE_ITEM_TYPE_SERVICE_IP_MAP,
341
343
/****************************************************
342
344
*   Migration data
343
345
****************************************************/
344
 
typedef struct TunnelChannel TunnelChannel;
 
346
typedef struct TunnelChannelClient TunnelChannelClient;
345
347
 
346
348
#define TUNNEL_MIGRATE_DATA_MAGIC (*(uint32_t *)"TMDA")
347
349
#define TUNNEL_MIGRATE_DATA_VERSION 1
469
471
    TunnelMigrateSocketItem sockets_data[MAX_SOCKETS_NUM];
470
472
} TunnelMigrateItem;
471
473
 
472
 
static inline void tunnel_channel_activate_migrated_sockets(TunnelChannel *channel);
 
474
static inline void tunnel_channel_activate_migrated_sockets(TunnelChannelClient *channel);
473
475
 
474
476
/*******************************************************************************************/
475
477
 
483
485
/* should be checked after each subroutine that may cause error or after calls to slirp routines */
484
486
#define CHECK_TUNNEL_ERROR(channel) (channel->tunnel_error)
485
487
 
486
 
struct TunnelChannel {
487
 
    RedChannel base;
 
488
struct TunnelChannelClient {
 
489
    RedChannelClient base;
488
490
    TunnelWorker *worker;
489
491
    int mig_inprogress;
490
492
    int expect_migrate_mark;
492
494
 
493
495
    int tunnel_error;
494
496
 
 
497
    /* TODO: this needs to be RCC specific (or bad things will happen) */
495
498
    struct {
496
499
        union {
497
500
            SpiceMsgTunnelInit init;
503
506
            SpiceMsgTunnelSocketData socket_data;
504
507
            SpiceMsgTunnelSocketTokens socket_token;
505
508
            TunnelMigrateData migrate_data;
 
509
            SpiceMsgMigrate migrate;
506
510
        } u;
507
511
    } send_data;
508
512
 
533
537
} TunnelPrintService;
534
538
 
535
539
struct TunnelWorker {
536
 
    Channel channel_interface; // for reds
537
 
    TunnelChannel *channel;
 
540
    RedChannel *channel;
 
541
    TunnelChannelClient *channel_client;
538
542
 
539
543
    SpiceCoreInterface *core_interface;
540
544
    SpiceNetWireInstance *sin;
560
564
/*********************************************************************
561
565
 * Tunnel interface
562
566
 *********************************************************************/
563
 
static void tunnel_channel_disconnect(RedChannel *channel);
 
567
static void tunnel_channel_on_disconnect(RedChannel *channel);
564
568
 
565
569
/* networking interface for slirp */
566
570
static int  qemu_can_output(SlirpUsrNetworkInterface *usr_interface);
597
601
static void arm_timer(SlirpUsrNetworkInterface *usr_interface, UserTimer *timer, uint32_t ms);
598
602
 
599
603
 
600
 
/* reds interface */
601
 
static void handle_tunnel_channel_link(Channel *channel, RedsStream *stream, int migration,
602
 
                                       int num_common_caps, uint32_t *common_caps, int num_caps,
 
604
/* RedChannel interface */
 
605
 
 
606
static void handle_tunnel_channel_link(RedChannel *channel, RedClient *client,
 
607
                                       RedsStream *stream, int migration,
 
608
                                       int num_common_caps,
 
609
                                       uint32_t *common_caps, int num_caps,
603
610
                                       uint32_t *caps);
604
 
static void handle_tunnel_channel_shutdown(struct Channel *channel);
605
 
static void handle_tunnel_channel_migrate(struct Channel *channel);
606
 
 
 
611
static void handle_tunnel_channel_client_migrate(RedChannelClient *rcc);
 
612
static void red_tunnel_channel_create(TunnelWorker *worker);
607
613
 
608
614
static void tunnel_shutdown(TunnelWorker *worker)
609
615
{
610
616
    int i;
611
617
    red_printf("");
612
618
    /* shutdown input from channel */
613
 
    if (worker->channel) {
614
 
        red_channel_shutdown(&worker->channel->base);
 
619
    if (worker->channel_client) {
 
620
        red_channel_client_shutdown(&worker->channel_client->base);
615
621
    }
616
622
 
617
623
    /* shutdown socket pipe items */
720
726
    return ret;
721
727
}
722
728
 
723
 
static inline void __process_rcv_buf_tokens(TunnelChannel *channel, RedSocket *sckt)
 
729
static inline void __process_rcv_buf_tokens(TunnelChannelClient *channel, RedSocket *sckt)
724
730
{
725
 
    if ((sckt->client_status != CLIENT_SCKT_STATUS_OPEN) || red_channel_pipe_item_is_linked(
 
731
    if ((sckt->client_status != CLIENT_SCKT_STATUS_OPEN) || red_channel_client_pipe_item_is_linked(
726
732
            &channel->base, &sckt->out_data.token_pipe_item) || channel->mig_inprogress) {
727
733
        return;
728
734
    }
730
736
    if ((sckt->in_data.num_tokens >= SOCKET_TOKENS_TO_SEND) ||
731
737
        (!sckt->in_data.client_total_num_tokens && !sckt->in_data.ready_chunks_queue.head)) {
732
738
        sckt->out_data.token_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_TOKEN;
733
 
        red_channel_pipe_add(&channel->base, &sckt->out_data.token_pipe_item);
 
739
        red_channel_client_pipe_add(&channel->base, &sckt->out_data.token_pipe_item);
734
740
    }
735
741
}
736
742
 
739
745
    --sckt->in_data.num_buffers;
740
746
    __tunnel_worker_free_socket_rcv_buf(sckt->worker, rcv_buf);
741
747
    ++sckt->in_data.num_tokens;
742
 
    __process_rcv_buf_tokens(sckt->worker->channel, sckt);
 
748
    __process_rcv_buf_tokens(sckt->worker->channel_client, sckt);
743
749
}
744
750
 
745
751
static inline void __tunnel_worker_free_socket_rcv_buf(TunnelWorker *worker,
967
973
    TunnelWorker *worker = sin->st->worker;
968
974
    ASSERT(worker);
969
975
 
970
 
    if (worker->channel && worker->channel->base.migrate) {
 
976
    if (worker->channel_client && worker->channel_client->base.channel->migrate) {
971
977
        return; // during migration and the tunnel state hasn't been restored yet.
972
978
    }
973
979
 
1010
1016
 
1011
1017
    worker->null_interface.worker = worker;
1012
1018
 
1013
 
    worker->channel_interface.type = SPICE_CHANNEL_TUNNEL;
1014
 
    worker->channel_interface.id = 0;
1015
 
    worker->channel_interface.link = handle_tunnel_channel_link;
1016
 
    worker->channel_interface.shutdown = handle_tunnel_channel_shutdown;
1017
 
    worker->channel_interface.migrate = handle_tunnel_channel_migrate;
1018
 
    worker->channel_interface.data = worker;
 
1019
    red_tunnel_channel_create(worker);
1019
1020
 
1020
 
    ring_init(&worker->services);
1021
 
    reds_register_channel(&worker->channel_interface);
 
1021
   ring_init(&worker->services);
1022
1022
 
1023
1023
    net_slirp_init(worker->sif->get_ip(worker->sin),
1024
1024
                   TRUE,
1090
1090
#endif
1091
1091
    if (!virt_ip) {
1092
1092
        new_service->pipe_item.type = PIPE_ITEM_TYPE_SERVICE_IP_MAP;
1093
 
        red_channel_pipe_add(&worker->channel->base, &new_service->pipe_item);
 
1093
        red_channel_client_pipe_add(&worker->channel_client->base, &new_service->pipe_item);
1094
1094
    }
1095
1095
 
1096
1096
    return new_service;
1148
1148
    return service;
1149
1149
}
1150
1150
 
1151
 
static int tunnel_channel_handle_service_add(TunnelChannel *channel,
 
1151
static int tunnel_channel_handle_service_add(TunnelChannelClient *channel,
1152
1152
                                             SpiceMsgcTunnelAddGenericService *service_msg)
1153
1153
{
1154
1154
    TunnelService *out_service = NULL;
1286
1286
 
1287
1287
static void tunnel_worker_free_socket(TunnelWorker *worker, RedSocket *sckt)
1288
1288
{
1289
 
    if (worker->channel) {
1290
 
        if (red_channel_pipe_item_is_linked(&worker->channel->base,
 
1289
    if (worker->channel_client) {
 
1290
        if (red_channel_client_pipe_item_is_linked(&worker->channel_client->base,
1291
1291
                                            &sckt->out_data.data_pipe_item)) {
1292
 
            red_channel_pipe_item_remove(&worker->channel->base,
 
1292
            red_channel_client_pipe_remove_and_release(&worker->channel_client->base,
1293
1293
                                         &sckt->out_data.data_pipe_item);
1294
1294
            return;
1295
1295
        }
1296
1296
 
1297
 
        if (red_channel_pipe_item_is_linked(&worker->channel->base,
 
1297
        if (red_channel_client_pipe_item_is_linked(&worker->channel_client->base,
1298
1298
                                            &sckt->out_data.status_pipe_item)) {
1299
 
            red_channel_pipe_item_remove(&worker->channel->base,
 
1299
            red_channel_client_pipe_remove_and_release(&worker->channel_client->base,
1300
1300
                                         &sckt->out_data.status_pipe_item);
1301
1301
            return;
1302
1302
        }
1303
1303
 
1304
 
        if (red_channel_pipe_item_is_linked(&worker->channel->base,
 
1304
        if (red_channel_client_pipe_item_is_linked(&worker->channel_client->base,
1305
1305
                                            &sckt->out_data.token_pipe_item)) {
1306
 
            red_channel_pipe_item_remove(&worker->channel->base,
 
1306
            red_channel_client_pipe_remove_and_release(&worker->channel_client->base,
1307
1307
                                         &sckt->out_data.token_pipe_item);
1308
1308
            return;
1309
1309
        }
1339
1339
    return NULL;
1340
1340
}
1341
1341
 
1342
 
static inline void __tunnel_socket_add_fin_to_pipe(TunnelChannel *channel, RedSocket *sckt)
 
1342
static inline void __tunnel_socket_add_fin_to_pipe(TunnelChannelClient *channel, RedSocket *sckt)
1343
1343
{
1344
 
    ASSERT(!red_channel_pipe_item_is_linked(&channel->base, &sckt->out_data.status_pipe_item));
 
1344
    ASSERT(!red_channel_client_pipe_item_is_linked(&channel->base, &sckt->out_data.status_pipe_item));
1345
1345
    sckt->out_data.status_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_FIN;
1346
 
    red_channel_pipe_add(&channel->base, &sckt->out_data.status_pipe_item);
 
1346
    red_channel_client_pipe_add(&channel->base, &sckt->out_data.status_pipe_item);
1347
1347
}
1348
1348
 
1349
 
static inline void __tunnel_socket_add_close_to_pipe(TunnelChannel *channel, RedSocket *sckt)
 
1349
static inline void __tunnel_socket_add_close_to_pipe(TunnelChannelClient *channel, RedSocket *sckt)
1350
1350
{
1351
1351
    ASSERT(!channel->mig_inprogress);
1352
1352
 
1353
 
    if (red_channel_pipe_item_is_linked(&channel->base, &sckt->out_data.status_pipe_item)) {
 
1353
    if (red_channel_client_pipe_item_is_linked(&channel->base, &sckt->out_data.status_pipe_item)) {
1354
1354
        ASSERT(sckt->out_data.status_pipe_item.type == PIPE_ITEM_TYPE_SOCKET_FIN);
1355
1355
        // close is stronger than FIN
1356
 
        red_channel_pipe_item_remove(&channel->base, &sckt->out_data.status_pipe_item);
 
1356
        red_channel_client_pipe_remove_and_release(&channel->base,
 
1357
                                        &sckt->out_data.status_pipe_item);
1357
1358
    }
1358
1359
    sckt->pushed_close = TRUE;
1359
1360
    sckt->out_data.status_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_CLOSE;
1360
 
    red_channel_pipe_add(&channel->base, &sckt->out_data.status_pipe_item);
 
1361
    red_channel_client_pipe_add(&channel->base, &sckt->out_data.status_pipe_item);
1361
1362
}
1362
1363
 
1363
 
static inline void __tunnel_socket_add_close_ack_to_pipe(TunnelChannel *channel, RedSocket *sckt)
 
1364
static inline void __tunnel_socket_add_close_ack_to_pipe(TunnelChannelClient *channel, RedSocket *sckt)
1364
1365
{
1365
1366
    ASSERT(!channel->mig_inprogress);
1366
1367
 
1367
 
    if (red_channel_pipe_item_is_linked(&channel->base, &sckt->out_data.status_pipe_item)) {
 
1368
    if (red_channel_client_pipe_item_is_linked(&channel->base, &sckt->out_data.status_pipe_item)) {
1368
1369
        ASSERT(sckt->out_data.status_pipe_item.type == PIPE_ITEM_TYPE_SOCKET_FIN);
1369
1370
        // close is stronger than FIN
1370
 
        red_channel_pipe_item_remove(&channel->base, &sckt->out_data.status_pipe_item);
 
1371
        red_channel_client_pipe_remove_and_release(&channel->base,
 
1372
                                    &sckt->out_data.status_pipe_item);
1371
1373
    }
1372
1374
 
1373
1375
    sckt->out_data.status_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_CLOSED_ACK;
1374
 
    red_channel_pipe_add(&channel->base, &sckt->out_data.status_pipe_item);
 
1376
    red_channel_client_pipe_add(&channel->base, &sckt->out_data.status_pipe_item);
1375
1377
}
1376
1378
 
1377
1379
/*
1379
1381
    If possible, notify slirp to recv data (which will return 0)
1380
1382
    When close ack is received from client, we notify slirp (maybe again) if needed.
1381
1383
*/
1382
 
static void tunnel_socket_force_close(TunnelChannel *channel, RedSocket *sckt)
 
1384
static void tunnel_socket_force_close(TunnelChannelClient *channel, RedSocket *sckt)
1383
1385
{
1384
 
    if (red_channel_pipe_item_is_linked(&channel->base, &sckt->out_data.token_pipe_item)) {
1385
 
        red_channel_pipe_item_remove(&channel->base, &sckt->out_data.token_pipe_item);
 
1386
    if (red_channel_client_pipe_item_is_linked(&channel->base, &sckt->out_data.token_pipe_item)) {
 
1387
        red_channel_client_pipe_remove_and_release(&channel->base, &sckt->out_data.token_pipe_item);
1386
1388
    }
1387
1389
 
1388
 
    if (red_channel_pipe_item_is_linked(&channel->base, &sckt->out_data.data_pipe_item)) {
1389
 
        red_channel_pipe_item_remove(&channel->base, &sckt->out_data.data_pipe_item);
 
1390
    if (red_channel_client_pipe_item_is_linked(&channel->base, &sckt->out_data.data_pipe_item)) {
 
1391
        red_channel_client_pipe_remove_and_release(&channel->base, &sckt->out_data.data_pipe_item);
1390
1392
    }
1391
1393
 
1392
1394
 
1408
1410
    }
1409
1411
}
1410
1412
 
1411
 
static int tunnel_channel_handle_socket_connect_ack(TunnelChannel *channel, RedSocket *sckt,
 
1413
static int tunnel_channel_handle_socket_connect_ack(TunnelChannelClient *channel, RedSocket *sckt,
1412
1414
                                                    uint32_t tokens)
1413
1415
{
1414
1416
#ifdef DEBUG_NETWORK
1415
1417
    red_printf("TUNNEL_DBG");
1416
1418
#endif
1417
 
    if (channel->mig_inprogress || channel->base.migrate) {
 
1419
    if (channel->mig_inprogress || channel->base.channel->migrate) {
1418
1420
        sckt->mig_client_status_msg = SPICE_MSGC_TUNNEL_SOCKET_OPEN_ACK;
1419
1421
        sckt->mig_open_ack_tokens = tokens;
1420
1422
        return TRUE;
1442
1444
    return (!CHECK_TUNNEL_ERROR(channel));
1443
1445
}
1444
1446
 
1445
 
static int tunnel_channel_handle_socket_connect_nack(TunnelChannel *channel, RedSocket *sckt)
 
1447
static int tunnel_channel_handle_socket_connect_nack(TunnelChannelClient *channel, RedSocket *sckt)
1446
1448
{
1447
1449
#ifdef DEBUG_NETWORK
1448
1450
    PRINT_SCKT(sckt);
1449
1451
#endif
1450
 
    if (channel->mig_inprogress || channel->base.migrate) {
 
1452
    if (channel->mig_inprogress || channel->base.channel->migrate) {
1451
1453
        sckt->mig_client_status_msg = SPICE_MSGC_TUNNEL_SOCKET_OPEN_NACK;
1452
1454
        return TRUE;
1453
1455
    }
1467
1469
    return (!CHECK_TUNNEL_ERROR(channel));
1468
1470
}
1469
1471
 
1470
 
static int tunnel_channel_handle_socket_fin(TunnelChannel *channel, RedSocket *sckt)
 
1472
static int tunnel_channel_handle_socket_fin(TunnelChannelClient *channel, RedSocket *sckt)
1471
1473
{
1472
1474
#ifdef DEBUG_NETWORK
1473
1475
    PRINT_SCKT(sckt);
1474
1476
#endif
1475
 
    if (channel->mig_inprogress || channel->base.migrate) {
 
1477
    if (channel->mig_inprogress || channel->base.channel->migrate) {
1476
1478
        sckt->mig_client_status_msg = SPICE_MSGC_TUNNEL_SOCKET_FIN;
1477
1479
        return TRUE;
1478
1480
    }
1502
1504
    return (!CHECK_TUNNEL_ERROR(channel));
1503
1505
}
1504
1506
 
1505
 
static int tunnel_channel_handle_socket_closed(TunnelChannel *channel, RedSocket *sckt)
 
1507
static int tunnel_channel_handle_socket_closed(TunnelChannelClient *channel, RedSocket *sckt)
1506
1508
{
1507
1509
    int prev_client_status = sckt->client_status;
1508
1510
 
1510
1512
    PRINT_SCKT(sckt);
1511
1513
#endif
1512
1514
 
1513
 
    if (channel->mig_inprogress || channel->base.migrate) {
 
1515
    if (channel->mig_inprogress || channel->base.channel->migrate) {
1514
1516
        sckt->mig_client_status_msg = SPICE_MSGC_TUNNEL_SOCKET_CLOSED;
1515
1517
        return TRUE;
1516
1518
    }
1551
1553
    return (!CHECK_TUNNEL_ERROR(channel));
1552
1554
}
1553
1555
 
1554
 
static int tunnel_channel_handle_socket_closed_ack(TunnelChannel *channel, RedSocket *sckt)
 
1556
static int tunnel_channel_handle_socket_closed_ack(TunnelChannelClient *channel, RedSocket *sckt)
1555
1557
{
1556
1558
#ifdef DEBUG_NETWORK
1557
1559
    PRINT_SCKT(sckt);
1558
1560
#endif
1559
 
    if (channel->mig_inprogress || channel->base.migrate) {
 
1561
    if (channel->mig_inprogress || channel->base.channel->migrate) {
1560
1562
        sckt->mig_client_status_msg = SPICE_MSGC_TUNNEL_SOCKET_CLOSED_ACK;
1561
1563
        return TRUE;
1562
1564
    }
1578
1580
    return (!CHECK_TUNNEL_ERROR(channel));
1579
1581
}
1580
1582
 
1581
 
static int tunnel_channel_handle_socket_receive_data(TunnelChannel *channel, RedSocket *sckt,
 
1583
static int tunnel_channel_handle_socket_receive_data(TunnelChannelClient *channel, RedSocket *sckt,
1582
1584
                                                     RedSocketRawRcvBuf *recv_data, int buf_size)
1583
1585
{
1584
1586
    if ((sckt->client_status == CLIENT_SCKT_STATUS_SHUTDOWN_SEND) ||
1594
1596
        __tunnel_worker_free_socket_rcv_buf(sckt->worker, recv_data);
1595
1597
        return (!CHECK_TUNNEL_ERROR(channel));
1596
1598
    } else if ((sckt->in_data.num_buffers == MAX_SOCKET_IN_BUFFERS) &&
1597
 
               !channel->mig_inprogress && !channel->base.migrate) {
 
1599
               !channel->mig_inprogress && !channel->base.channel->migrate) {
1598
1600
        red_printf("socket in buffers overflow, socket will be closed"
1599
1601
                   " (local_port=%d, service_id=%d)",
1600
1602
                   ntohs(sckt->local_port), sckt->far_service->id);
1623
1625
{
1624
1626
    return (((sckt->client_status == CLIENT_SCKT_STATUS_OPEN) ||
1625
1627
             (sckt->client_status == CLIENT_SCKT_STATUS_SHUTDOWN_SEND)) &&
1626
 
            !sckt->worker->channel->mig_inprogress);
 
1628
            !sckt->worker->channel_client->mig_inprogress);
1627
1629
}
1628
1630
 
1629
 
static int tunnel_channel_handle_socket_token(TunnelChannel *channel, RedSocket *sckt,
 
1631
static int tunnel_channel_handle_socket_token(TunnelChannelClient *channel, RedSocket *sckt,
1630
1632
                                              SpiceMsgcTunnelSocketTokens *message)
1631
1633
{
1632
1634
    sckt->out_data.num_tokens += message->num_tokens;
1633
1635
 
1634
1636
    if (__client_socket_can_receive(sckt) && sckt->out_data.ready_chunks_queue.head &&
1635
 
        !red_channel_pipe_item_is_linked(&channel->base, &sckt->out_data.data_pipe_item)) {
 
1637
        !red_channel_client_pipe_item_is_linked(&channel->base, &sckt->out_data.data_pipe_item)) {
1636
1638
        // data is pending to be sent
1637
1639
        sckt->out_data.data_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_DATA;
1638
 
        red_channel_pipe_add(&channel->base, &sckt->out_data.data_pipe_item);
 
1640
        red_channel_client_pipe_add(&channel->base, &sckt->out_data.data_pipe_item);
1639
1641
    }
1640
1642
 
1641
1643
    return TRUE;
1642
1644
}
1643
1645
 
1644
 
static uint8_t *tunnel_channel_alloc_msg_rcv_buf(RedChannel *channel, SpiceDataHeader *msg_header)
 
1646
static uint8_t *tunnel_channel_alloc_msg_rcv_buf(RedChannelClient *rcc,
 
1647
                                                 SpiceDataHeader *msg_header)
1645
1648
{
1646
 
    TunnelChannel *tunnel_channel = (TunnelChannel *)channel;
 
1649
    TunnelChannelClient *tunnel_channel = (TunnelChannelClient *)rcc->channel;
 
1650
 
1647
1651
    if (msg_header->type == SPICE_MSGC_TUNNEL_SOCKET_DATA) {
1648
1652
        return (__tunnel_worker_alloc_socket_rcv_buf(tunnel_channel->worker)->buf);
1649
1653
    } else if ((msg_header->type == SPICE_MSGC_MIGRATE_DATA) ||
1655
1659
}
1656
1660
 
1657
1661
// called by the receive routine of the channel, before the buffer was assigned to a socket
1658
 
static void tunnel_channel_release_msg_rcv_buf(RedChannel *channel, SpiceDataHeader *msg_header,
 
1662
static void tunnel_channel_release_msg_rcv_buf(RedChannelClient *rcc, SpiceDataHeader *msg_header,
1659
1663
                                               uint8_t *msg)
1660
1664
{
1661
 
    TunnelChannel *tunnel_channel = (TunnelChannel *)channel;
 
1665
    TunnelChannelClient *tunnel_channel = (TunnelChannelClient *)rcc->channel;
 
1666
 
1662
1667
    if (msg_header->type == SPICE_MSGC_TUNNEL_SOCKET_DATA) {
1663
1668
        ASSERT(!(SPICE_CONTAINEROF(msg, RedSocketRawRcvBuf, buf)->base.usr_opaque));
1664
1669
        __tunnel_worker_free_socket_rcv_buf(tunnel_channel->worker,
1666
1671
    }
1667
1672
}
1668
1673
 
1669
 
static void __tunnel_channel_fill_service_migrate_item(TunnelChannel *channel,
 
1674
static void __tunnel_channel_fill_service_migrate_item(TunnelChannelClient *channel,
1670
1675
                                                       TunnelService *service,
1671
1676
                                                       TunnelMigrateServiceItem *migrate_item)
1672
1677
{
1688
1693
    memcpy(general_data->virt_ip, &service->virt_ip.s_addr, 4);
1689
1694
}
1690
1695
 
1691
 
static void __tunnel_channel_fill_socket_migrate_item(TunnelChannel *channel, RedSocket *sckt,
 
1696
static void __tunnel_channel_fill_socket_migrate_item(TunnelChannelClient *channel, RedSocket *sckt,
1692
1697
                                                      TunnelMigrateSocketItem *migrate_item)
1693
1698
{
1694
1699
    TunnelMigrateSocket *mig_sckt = &migrate_item->mig_socket;
1740
1745
}
1741
1746
 
1742
1747
static void release_migrate_item(TunnelMigrateItem *item);
1743
 
static int tunnel_channel_handle_migrate_mark(TunnelChannel *channel)
 
1748
static int tunnel_channel_handle_migrate_mark(RedChannelClient *base)
1744
1749
{
 
1750
    TunnelChannelClient *channel = SPICE_CONTAINEROF(base->channel, TunnelChannelClient, base);
1745
1751
    TunnelMigrateItem *migrate_item = NULL;
1746
1752
    TunnelService *service;
1747
1753
    TunnelMigrateServiceItem *mig_service;
1801
1807
        }
1802
1808
    }
1803
1809
 
1804
 
    red_channel_pipe_add((RedChannel *)channel, &migrate_item->base);
 
1810
    red_channel_client_pipe_add(&channel->base, &migrate_item->base);
1805
1811
 
1806
1812
    return TRUE;
1807
1813
error:
1858
1864
    --sckt->in_data.num_buffers;
1859
1865
    __tunnel_worker_free_socket_rcv_buf(sckt->worker, (RedSocketRawRcvBuf *)buf);
1860
1866
    // for case that ready queue is empty and the client has no tokens
1861
 
    __process_rcv_buf_tokens(sckt->worker->channel, sckt);
 
1867
    __process_rcv_buf_tokens(sckt->worker->channel_client, sckt);
1862
1868
}
1863
1869
 
1864
1870
RawTunneledBuffer *tunnel_socket_alloc_restored_rcv_buf(RedSocket *sckt)
1877
1883
    RedSocket *sckt = (RedSocket *)buf->usr_opaque;
1878
1884
 
1879
1885
    sckt->in_data.num_tokens += tokens_buf->num_tokens;
1880
 
    __process_rcv_buf_tokens(sckt->worker->channel, sckt);
 
1886
    __process_rcv_buf_tokens(sckt->worker->channel_client, sckt);
1881
1887
 
1882
1888
    free(tokens_buf);
1883
1889
}
1931
1937
    }
1932
1938
}
1933
1939
 
1934
 
static void tunnel_channel_restore_migrated_service(TunnelChannel *channel,
 
1940
static void tunnel_channel_restore_migrated_service(TunnelChannelClient *channel,
1935
1941
                                                    TunnelMigrateService *mig_service,
1936
1942
                                                    uint8_t *data_buf)
1937
1943
{
1966
1972
    }
1967
1973
}
1968
1974
 
1969
 
static void tunnel_channel_restore_migrated_socket(TunnelChannel *channel,
 
1975
static void tunnel_channel_restore_migrated_socket(TunnelChannelClient *channel,
1970
1976
                                                   TunnelMigrateSocket *mig_socket,
1971
1977
                                                   uint8_t *data_buf)
1972
1978
{
2052
2058
    }
2053
2059
}
2054
2060
 
2055
 
static void tunnel_channel_restore_socket_state(TunnelChannel *channel, RedSocket *sckt)
 
2061
static void tunnel_channel_restore_socket_state(TunnelChannelClient *channel, RedSocket *sckt)
2056
2062
{
2057
2063
    int ret = TRUE;
2058
2064
    red_printf("");
2102
2108
 
2103
2109
    // handling data transfer
2104
2110
    if (__client_socket_can_receive(sckt) && sckt->out_data.ready_chunks_queue.head) {
2105
 
        if (!red_channel_pipe_item_is_linked(
 
2111
        if (!red_channel_client_pipe_item_is_linked(
2106
2112
                &channel->base, &sckt->out_data.data_pipe_item)) {
2107
2113
            sckt->out_data.data_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_DATA;
2108
 
            red_channel_pipe_add(&channel->base, &sckt->out_data.data_pipe_item);
 
2114
            red_channel_client_pipe_add(&channel->base, &sckt->out_data.data_pipe_item);
2109
2115
        }
2110
2116
    }
2111
2117
 
2131
2137
    __process_rcv_buf_tokens(channel, sckt);
2132
2138
}
2133
2139
 
2134
 
static inline void tunnel_channel_activate_migrated_sockets(TunnelChannel *channel)
 
2140
static inline void tunnel_channel_activate_migrated_sockets(TunnelChannelClient *channel)
2135
2141
{
2136
2142
    // if we are overgoing migration again, no need to restore the state, we will wait
2137
2143
    // for the next host.
2154
2160
    }
2155
2161
}
2156
2162
 
2157
 
static int tunnel_channel_handle_migrate_data(TunnelChannel *channel,
2158
 
                                              TunnelMigrateData *migrate_data)
2159
 
{
 
2163
static uint64_t tunnel_channel_handle_migrate_data_get_serial(RedChannelClient *base,
 
2164
                                              uint32_t size, void *msg)
 
2165
{
 
2166
    TunnelMigrateData *migrate_data = msg;
 
2167
 
 
2168
    if (size < sizeof(TunnelMigrateData)
 
2169
        || migrate_data->magic != TUNNEL_MIGRATE_DATA_MAGIC
 
2170
        || migrate_data->version != TUNNEL_MIGRATE_DATA_VERSION) {
 
2171
        return 0;
 
2172
    }
 
2173
    return migrate_data->message_serial;
 
2174
}
 
2175
 
 
2176
static uint64_t tunnel_channel_handle_migrate_data(RedChannelClient *base,
 
2177
                                              uint32_t size, void *msg)
 
2178
{
 
2179
    TunnelChannelClient *channel = SPICE_CONTAINEROF(base, TunnelChannelClient, base);
2160
2180
    TunnelMigrateSocketList *sockets_list;
2161
2181
    TunnelMigrateServicesList *services_list;
 
2182
    TunnelMigrateData *migrate_data = msg;
2162
2183
    int i;
2163
2184
 
 
2185
    if (size < sizeof(TunnelMigrateData)) {
 
2186
        red_printf("bad message size");
 
2187
        goto error;
 
2188
    }
2164
2189
    if (!channel->expect_migrate_data) {
2165
2190
        red_printf("unexpected");
2166
2191
        goto error;
2173
2198
        goto error;
2174
2199
    }
2175
2200
 
2176
 
    ASSERT(channel->base.send_data.header.serial == 0);
2177
 
    channel->base.send_data.header.serial = migrate_data->message_serial;
2178
 
 
2179
2201
    net_slirp_state_restore(migrate_data->data + migrate_data->slirp_state);
2180
2202
 
2181
2203
    services_list = (TunnelMigrateServicesList *)(migrate_data->data +
2205
2227
    }
2206
2228
 
2207
2229
    // activate channel
2208
 
    channel->base.migrate = FALSE;
2209
 
    red_channel_init_outgoing_messages_window(&channel->base);
 
2230
    channel->base.channel->migrate = FALSE;
 
2231
    red_channel_init_outgoing_messages_window(channel->base.channel);
2210
2232
 
2211
2233
    tunnel_channel_activate_migrated_sockets(channel);
2212
2234
 
2221
2243
}
2222
2244
 
2223
2245
//  msg was allocated by tunnel_channel_alloc_msg_rcv_buf
2224
 
static int tunnel_channel_handle_message(RedChannel *channel, SpiceDataHeader *header, uint8_t *msg)
 
2246
static int tunnel_channel_handle_message(RedChannelClient *rcc, SpiceDataHeader *header, uint8_t *msg)
2225
2247
{
2226
 
    TunnelChannel *tunnel_channel = (TunnelChannel *)channel;
 
2248
    TunnelChannelClient *tunnel_channel = (TunnelChannelClient *)rcc->channel;
2227
2249
    RedSocket *sckt = NULL;
2228
2250
    // retrieve the sckt
2229
2251
    switch (header->type) {
2247
2269
        }
2248
2270
        break;
2249
2271
    default:
2250
 
        return red_channel_handle_message(channel, header, msg);
 
2272
        return red_channel_client_handle_message(rcc, header->size, header->type, msg);
2251
2273
    }
2252
2274
 
2253
2275
    switch (header->type) {
2315
2337
 
2316
2338
        return tunnel_channel_handle_socket_token(tunnel_channel, sckt,
2317
2339
                                                  (SpiceMsgcTunnelSocketTokens *)msg);
2318
 
    case SPICE_MSGC_MIGRATE_FLUSH_MARK:
2319
 
        return tunnel_channel_handle_migrate_mark(tunnel_channel);
2320
 
    case SPICE_MSGC_MIGRATE_DATA:
2321
 
        if (header->size < sizeof(TunnelMigrateData)) {
2322
 
            red_printf("bad message size");
2323
 
            free(msg);
2324
 
            return FALSE;
2325
 
        }
2326
 
        return tunnel_channel_handle_migrate_data(tunnel_channel, (TunnelMigrateData *)msg);
2327
2340
    default:
2328
 
        return red_channel_handle_message(channel, header, msg);
 
2341
        return red_channel_client_handle_message(rcc, header->size, header->type, msg);
2329
2342
    }
2330
2343
    return TRUE;
2331
2344
}
2334
2347
/* outgoing msgs
2335
2348
********************************/
2336
2349
 
2337
 
static void tunnel_channel_send_set_ack(TunnelChannel *channel, PipeItem *item)
2338
 
{
2339
 
    ASSERT(channel);
2340
 
 
2341
 
    channel->base.send_data.u.ack.generation = ++channel->base.ack_data.generation;
2342
 
    channel->base.send_data.u.ack.window = CLIENT_ACK_WINDOW;
2343
 
 
2344
 
    red_channel_init_send_data(&channel->base, SPICE_MSG_SET_ACK, item);
2345
 
    red_channel_add_buf(&channel->base, &channel->base.send_data.u.ack, sizeof(SpiceMsgSetAck));
2346
 
 
2347
 
    red_channel_begin_send_message(&channel->base);
2348
 
}
2349
 
 
2350
 
static void tunnel_channel_send_migrate(TunnelChannel *channel, PipeItem *item)
2351
 
{
2352
 
    ASSERT(channel);
2353
 
    channel->base.send_data.u.migrate.flags = SPICE_MIGRATE_NEED_FLUSH |
2354
 
        SPICE_MIGRATE_NEED_DATA_TRANSFER;
2355
 
    channel->expect_migrate_mark = TRUE;
2356
 
    red_channel_init_send_data(&channel->base, SPICE_MSG_MIGRATE, item);
2357
 
    red_channel_add_buf(&channel->base, &channel->base.send_data.u.migrate, sizeof(SpiceMsgMigrate));
2358
 
    red_channel_begin_send_message(&channel->base);
2359
 
}
2360
 
 
2361
 
static int __tunnel_channel_send_process_bufs_migrate_data(TunnelChannel *channel,
2362
 
                                                           TunneledBufferProcessQueue *queue)
 
2350
/*
 * Marshall a SPICE_MSG_MIGRATE message.
 *
 * Sets the NEED_FLUSH and NEED_DATA_TRANSFER flags in the per-client send
 * buffer, records that a migrate mark is now expected from the client, and
 * hands the SpiceMsgMigrate body to the marshaller `m`.
 */
static void tunnel_channel_marshall_migrate(RedChannelClient *rcc, SpiceMarshaller *m, PipeItem *item)
{
    TunnelChannelClient *tunnel_channel;

    ASSERT(rcc);
    /* `rcc` itself is the `base` member of TunnelChannelClient; rcc->channel
     * is a different object and must not be fed to SPICE_CONTAINEROF here. */
    tunnel_channel = SPICE_CONTAINEROF(rcc, TunnelChannelClient, base);
    tunnel_channel->send_data.u.migrate.flags =
        SPICE_MIGRATE_NEED_FLUSH | SPICE_MIGRATE_NEED_DATA_TRANSFER;
    tunnel_channel->expect_migrate_mark = TRUE;
    red_channel_client_init_send_data(rcc, SPICE_MSG_MIGRATE, item);
    spice_marshaller_add_ref(m,
        (uint8_t*)&tunnel_channel->send_data.u.migrate,
        sizeof(SpiceMsgMigrate));
}
 
2364
 
 
2365
static int __tunnel_channel_marshall_process_bufs_migrate_data(TunnelChannelClient *channel,
 
2366
                                    SpiceMarshaller *m, TunneledBufferProcessQueue *queue)
2363
2367
{
2364
2368
    int buf_offset = queue->head_offset;
2365
2369
    RawTunneledBuffer *buf = queue->head;
2366
2370
    int size = 0;
2367
2371
 
2368
2372
    while (buf) {
2369
 
        red_channel_add_buf(&channel->base, buf->data + buf_offset, buf->size - buf_offset);
 
2373
        spice_marshaller_add_ref(m, (uint8_t*)buf->data + buf_offset, buf->size - buf_offset);
2370
2374
        size += buf->size - buf_offset;
2371
2375
        buf_offset = 0;
2372
2376
        buf = buf->next;
2375
2379
    return size;
2376
2380
}
2377
2381
 
2378
 
static int __tunnel_channel_send_ready_bufs_migrate_data(TunnelChannel *channel,
2379
 
                                                         ReadyTunneledChunkQueue *queue)
 
2382
static int __tunnel_channel_marshall_ready_bufs_migrate_data(TunnelChannelClient *channel,
 
2383
                                    SpiceMarshaller *m, ReadyTunneledChunkQueue *queue)
2380
2384
{
2381
2385
    int offset = queue->offset;
2382
2386
    ReadyTunneledChunk *chunk = queue->head;
2383
2387
    int size = 0;
2384
2388
 
2385
2389
    while (chunk) {
2386
 
        red_channel_add_buf(&channel->base, chunk->data + offset, chunk->size - offset);
 
2390
        spice_marshaller_add_ref(m, (uint8_t*)chunk->data + offset, chunk->size - offset);
2387
2391
        size += chunk->size - offset;
2388
2392
        offset = 0;
2389
2393
        chunk = chunk->next;
2392
2396
}
2393
2397
 
2394
2398
// returns the size to send
2395
 
static int __tunnel_channel_send_service_migrate_data(TunnelChannel *channel,
 
2399
static int __tunnel_channel_marshall_service_migrate_data(TunnelChannelClient *channel,
 
2400
                                                      SpiceMarshaller *m,
2396
2401
                                                      TunnelMigrateServiceItem *item,
2397
2402
                                                      int offset)
2398
2403
{
2402
2407
 
2403
2408
    if (service->type == SPICE_TUNNEL_SERVICE_TYPE_GENERIC) {
2404
2409
        generic_data = &item->u.generic_service;
2405
 
        red_channel_add_buf(&channel->base, &item->u.generic_service,
 
2410
        spice_marshaller_add_ref(m, (uint8_t*)&item->u.generic_service,
2406
2411
                            sizeof(item->u.generic_service));
2407
2412
        cur_offset += sizeof(item->u.generic_service);
2408
2413
    } else if (service->type == SPICE_TUNNEL_SERVICE_TYPE_IPP) {
2409
2414
        generic_data = &item->u.print_service.base;
2410
 
        red_channel_add_buf(&channel->base, &item->u.print_service,
 
2415
        spice_marshaller_add_ref(m, (uint8_t*)&item->u.print_service,
2411
2416
                            sizeof(item->u.print_service));
2412
2417
        cur_offset += sizeof(item->u.print_service);
2413
2418
    } else {
2415
2420
    }
2416
2421
 
2417
2422
    generic_data->name = cur_offset;
2418
 
    red_channel_add_buf(&channel->base, service->name, strlen(service->name) + 1);
 
2423
    spice_marshaller_add_ref(m, (uint8_t*)service->name, strlen(service->name) + 1);
2419
2424
    cur_offset += strlen(service->name) + 1;
2420
2425
 
2421
2426
    generic_data->description = cur_offset;
2422
 
    red_channel_add_buf(&channel->base, service->description, strlen(service->description) + 1);
 
2427
    spice_marshaller_add_ref(m, (uint8_t*)service->description, strlen(service->description) + 1);
2423
2428
    cur_offset += strlen(service->description) + 1;
2424
2429
 
2425
2430
    return (cur_offset - offset);
2426
2431
}
2427
2432
 
2428
2433
// returns the size to send
2429
 
static int __tunnel_channel_send_socket_migrate_data(TunnelChannel *channel,
2430
 
                                                     TunnelMigrateSocketItem *item, int offset)
 
2434
static int __tunnel_channel_marshall_socket_migrate_data(TunnelChannelClient *channel,
 
2435
                                SpiceMarshaller *m, TunnelMigrateSocketItem *item, int offset)
2431
2436
{
2432
2437
    RedSocket *sckt = item->socket;
2433
2438
    TunnelMigrateSocket *mig_sckt = &item->mig_socket;
2434
2439
    int cur_offset = offset;
2435
 
    red_channel_add_buf(&channel->base, mig_sckt, sizeof(*mig_sckt));
 
2440
    spice_marshaller_add_ref(m, (uint8_t*)mig_sckt, sizeof(*mig_sckt));
2436
2441
    cur_offset += sizeof(*mig_sckt);
2437
2442
 
2438
2443
    if ((sckt->client_status != CLIENT_SCKT_STATUS_CLOSED) &&
2440
2445
        (sckt->mig_client_status_msg != SPICE_MSGC_TUNNEL_SOCKET_CLOSED_ACK)) {
2441
2446
        mig_sckt->out_data.process_buf = cur_offset;
2442
2447
        mig_sckt->out_data.process_buf_size =
2443
 
            __tunnel_channel_send_process_bufs_migrate_data(channel,
 
2448
            __tunnel_channel_marshall_process_bufs_migrate_data(channel, m,
2444
2449
                                                            sckt->out_data.process_queue);
2445
2450
        cur_offset += mig_sckt->out_data.process_buf_size;
2446
2451
        if (mig_sckt->out_data.process_queue_size) {
2447
2452
            mig_sckt->out_data.process_queue = cur_offset;
2448
 
            red_channel_add_buf(&channel->base, item->out_process_queue,
 
2453
            spice_marshaller_add_ref(m, (uint8_t*)item->out_process_queue,
2449
2454
                                mig_sckt->out_data.process_queue_size);
2450
2455
            cur_offset += mig_sckt->out_data.process_queue_size;
2451
2456
        }
2452
2457
        mig_sckt->out_data.ready_buf = cur_offset;
2453
2458
        mig_sckt->out_data.ready_buf_size =
2454
 
            __tunnel_channel_send_ready_bufs_migrate_data(channel,
 
2459
            __tunnel_channel_marshall_ready_bufs_migrate_data(channel, m,
2455
2460
                                                          &sckt->out_data.ready_chunks_queue);
2456
2461
        cur_offset += mig_sckt->out_data.ready_buf_size;
2457
2462
    } else {
2464
2469
        (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_SEND)) {
2465
2470
        mig_sckt->in_data.process_buf = cur_offset;
2466
2471
        mig_sckt->in_data.process_buf_size =
2467
 
            __tunnel_channel_send_process_bufs_migrate_data(channel,
 
2472
            __tunnel_channel_marshall_process_bufs_migrate_data(channel, m,
2468
2473
                                                            sckt->in_data.process_queue);
2469
2474
        cur_offset += mig_sckt->in_data.process_buf_size;
2470
2475
        if (mig_sckt->in_data.process_queue_size) {
2471
2476
            mig_sckt->in_data.process_queue = cur_offset;
2472
 
            red_channel_add_buf(&channel->base, item->in_process_queue,
 
2477
            spice_marshaller_add_ref(m, (uint8_t*)item->in_process_queue,
2473
2478
                                mig_sckt->in_data.process_queue_size);
2474
2479
            cur_offset += mig_sckt->in_data.process_queue_size;
2475
2480
        }
2476
2481
        mig_sckt->in_data.ready_buf = cur_offset;
2477
2482
        mig_sckt->in_data.ready_buf_size =
2478
 
            __tunnel_channel_send_ready_bufs_migrate_data(channel,
 
2483
            __tunnel_channel_marshall_ready_bufs_migrate_data(channel, m,
2479
2484
                                                          &sckt->in_data.ready_chunks_queue);
2480
2485
        cur_offset += mig_sckt->in_data.ready_buf_size;
2481
2486
    } else {
2484
2489
    }
2485
2490
 
2486
2491
    if (item->slirp_socket_size) { // zero if socket is closed
2487
 
        red_channel_add_buf(&channel->base, item->slirp_socket, item->slirp_socket_size);
 
2492
        spice_marshaller_add_ref(m, (uint8_t*)item->slirp_socket, item->slirp_socket_size);
2488
2493
        mig_sckt->slirp_sckt = cur_offset;
2489
2494
        cur_offset += item->slirp_socket_size;
2490
2495
    }
2491
2496
    return (cur_offset - offset);
2492
2497
}
2493
2498
 
2494
 
static void tunnel_channel_send_migrate_data(TunnelChannel *channel, PipeItem *item)
 
2499
static void tunnel_channel_marshall_migrate_data(RedChannelClient *rcc,
 
2500
                                        SpiceMarshaller *m, PipeItem *item)
2495
2501
{
2496
 
    TunnelMigrateData *migrate_data = &channel->send_data.u.migrate_data;
 
2502
    TunnelChannelClient *tunnel_channel;
 
2503
    TunnelMigrateData *migrate_data;
2497
2504
    TunnelMigrateItem *migrate_item = (TunnelMigrateItem *)item;
2498
2505
    int i;
2499
2506
 
2500
2507
    uint32_t data_buf_offset = 0; // current location in data[0] field
2501
 
    ASSERT(channel);
 
2508
    ASSERT(rcc);
 
2509
    tunnel_channel = SPICE_CONTAINEROF(rcc->channel, TunnelChannelClient, base);
 
2510
    migrate_data = &tunnel_channel->send_data.u.migrate_data;
2502
2511
 
2503
2512
    migrate_data->magic = TUNNEL_MIGRATE_DATA_MAGIC;
2504
2513
    migrate_data->version = TUNNEL_MIGRATE_DATA_VERSION;
2505
 
    migrate_data->message_serial = red_channel_get_message_serial(&channel->base);
2506
 
    red_channel_init_send_data(&channel->base, SPICE_MSG_MIGRATE_DATA, item);
2507
 
    red_channel_add_buf(&channel->base, migrate_data, sizeof(*migrate_data));
 
2514
    migrate_data->message_serial = red_channel_client_get_message_serial(rcc);
 
2515
    red_channel_client_init_send_data(rcc, SPICE_MSG_MIGRATE_DATA, item);
 
2516
    spice_marshaller_add_ref(m, (uint8_t*)migrate_data, sizeof(*migrate_data));
2508
2517
 
2509
2518
    migrate_data->slirp_state = data_buf_offset;
2510
 
    red_channel_add_buf(&channel->base, migrate_item->slirp_state, migrate_item->slirp_state_size);
 
2519
    spice_marshaller_add_ref(m, (uint8_t*)migrate_item->slirp_state, migrate_item->slirp_state_size);
2511
2520
    data_buf_offset += migrate_item->slirp_state_size;
2512
2521
 
2513
2522
    migrate_data->services_list = data_buf_offset;
2514
 
    red_channel_add_buf(&channel->base, migrate_item->services_list,
 
2523
    spice_marshaller_add_ref(m, (uint8_t*)migrate_item->services_list,
2515
2524
                        migrate_item->services_list_size);
2516
2525
    data_buf_offset += migrate_item->services_list_size;
2517
2526
 
2518
2527
    for (i = 0; i < migrate_item->services_list->num_services; i++) {
2519
2528
        migrate_item->services_list->services[i] = data_buf_offset;
2520
 
        data_buf_offset += __tunnel_channel_send_service_migrate_data(channel,
 
2529
        data_buf_offset += __tunnel_channel_marshall_service_migrate_data(tunnel_channel, m,
2521
2530
                                                                      migrate_item->services + i,
2522
2531
                                                                      data_buf_offset);
2523
2532
    }
2524
2533
 
2525
2534
 
2526
2535
    migrate_data->sockets_list = data_buf_offset;
2527
 
    red_channel_add_buf(&channel->base, migrate_item->sockets_list,
 
2536
    spice_marshaller_add_ref(m, (uint8_t*)migrate_item->sockets_list,
2528
2537
                        migrate_item->sockets_list_size);
2529
2538
    data_buf_offset += migrate_item->sockets_list_size;
2530
2539
 
2531
2540
    for (i = 0; i < migrate_item->sockets_list->num_sockets; i++) {
2532
2541
        migrate_item->sockets_list->sockets[i] = data_buf_offset;
2533
 
        data_buf_offset += __tunnel_channel_send_socket_migrate_data(channel,
 
2542
        data_buf_offset += __tunnel_channel_marshall_socket_migrate_data(tunnel_channel, m,
2534
2543
                                                                     migrate_item->sockets_data + i,
2535
2544
                                                                     data_buf_offset);
2536
2545
    }
2537
 
 
2538
 
    red_channel_begin_send_message(&channel->base);
2539
2546
}
2540
2547
 
2541
 
static void tunnel_channel_send_init(TunnelChannel *channel, PipeItem *item)
 
2548
/*
 * Marshall a SPICE_MSG_TUNNEL_INIT message advertising MAX_SOCKET_DATA_SIZE
 * and MAX_SOCKETS_NUM to the client.
 */
static void tunnel_channel_marshall_init(RedChannelClient *rcc, SpiceMarshaller *m, PipeItem *item)
{
    TunnelChannelClient *channel;

    ASSERT(rcc);
    /* `rcc` itself is the `base` member of TunnelChannelClient; rcc->channel
     * is a different object and must not be fed to SPICE_CONTAINEROF here. */
    channel = SPICE_CONTAINEROF(rcc, TunnelChannelClient, base);
    channel->send_data.u.init.max_socket_data_size = MAX_SOCKET_DATA_SIZE;
    channel->send_data.u.init.max_num_of_sockets = MAX_SOCKETS_NUM;

    red_channel_client_init_send_data(rcc, SPICE_MSG_TUNNEL_INIT, item);
    spice_marshaller_add_ref(m, (uint8_t*)&channel->send_data.u.init, sizeof(SpiceMsgTunnelInit));
}
2553
2560
 
2554
 
static void tunnel_channel_send_service_ip_map(TunnelChannel *channel, PipeItem *item)
 
2561
/*
 * Marshall a SPICE_MSG_TUNNEL_SERVICE_IP_MAP message for the TunnelService
 * that embeds `item` as its pipe_item.
 *
 * The message consists of the SpiceMsgTunnelServiceIpMap header (service id,
 * IPv4 address type) followed by the service's virtual IPv4 address.
 */
static void tunnel_channel_marshall_service_ip_map(RedChannelClient *rcc, SpiceMarshaller *m, PipeItem *item)
{
    TunnelChannelClient *tunnel_channel;
    TunnelService *service = SPICE_CONTAINEROF(item, TunnelService, pipe_item);

    /* `rcc` itself is the `base` member of TunnelChannelClient; rcc->channel
     * is a different object and must not be fed to SPICE_CONTAINEROF here. */
    tunnel_channel = SPICE_CONTAINEROF(rcc, TunnelChannelClient, base);
    tunnel_channel->send_data.u.service_ip.service_id = service->id;
    tunnel_channel->send_data.u.service_ip.virtual_ip.type = SPICE_TUNNEL_IP_TYPE_IPv4;

    red_channel_client_init_send_data(rcc, SPICE_MSG_TUNNEL_SERVICE_IP_MAP, item);
    spice_marshaller_add_ref(m, (uint8_t*)&tunnel_channel->send_data.u.service_ip,
                        sizeof(SpiceMsgTunnelServiceIpMap));
    spice_marshaller_add_ref(m, (uint8_t*)&service->virt_ip.s_addr, sizeof(SpiceTunnelIPv4));
}
2567
2575
 
2568
 
static void tunnel_channel_send_socket_open(TunnelChannel *channel, PipeItem *item)
 
2576
/*
 * Marshall a SPICE_MSG_TUNNEL_SOCKET_OPEN message for the socket whose
 * out_data.status_pipe_item is `item`.
 *
 * The message carries the socket's connection id, the id of its far service
 * and an initial grant of SOCKET_WINDOW_SIZE tokens. The socket's in_data
 * token counters are reset to match that grant.
 */
static void tunnel_channel_marshall_socket_open(RedChannelClient *rcc, SpiceMarshaller *m, PipeItem *item)
{
    TunnelChannelClient *tunnel_channel;
    RedSocketOutData *sckt_out_data = SPICE_CONTAINEROF(item, RedSocketOutData, status_pipe_item);
    RedSocket *sckt = SPICE_CONTAINEROF(sckt_out_data, RedSocket, out_data);

    /* `rcc` itself is the `base` member of TunnelChannelClient; rcc->channel
     * is a different object and must not be fed to SPICE_CONTAINEROF here. */
    tunnel_channel = SPICE_CONTAINEROF(rcc, TunnelChannelClient, base);
    tunnel_channel->send_data.u.socket_open.connection_id = sckt->connection_id;
    tunnel_channel->send_data.u.socket_open.service_id = sckt->far_service->id;
    tunnel_channel->send_data.u.socket_open.tokens = SOCKET_WINDOW_SIZE;

    sckt->in_data.client_total_num_tokens = SOCKET_WINDOW_SIZE;
    sckt->in_data.num_tokens = 0;
    red_channel_client_init_send_data(rcc, SPICE_MSG_TUNNEL_SOCKET_OPEN, item);
    spice_marshaller_add_ref(m, (uint8_t*)&tunnel_channel->send_data.u.socket_open,
                        sizeof(tunnel_channel->send_data.u.socket_open));
#ifdef DEBUG_NETWORK
    PRINT_SCKT(sckt);
#endif
}
2588
2596
 
2589
 
static void tunnel_channel_send_socket_fin(TunnelChannel *channel, PipeItem *item)
 
2597
/*
 * Marshall a SPICE_MSG_TUNNEL_SOCKET_FIN message for the socket whose
 * out_data.status_pipe_item is `item`.
 *
 * Asserts that the socket's outgoing ready-chunks queue is empty, and logs a
 * warning if its outgoing process queue still holds buffers.
 */
static void tunnel_channel_marshall_socket_fin(RedChannelClient *rcc, SpiceMarshaller *m, PipeItem *item)
{
    TunnelChannelClient *tunnel_channel;
    RedSocketOutData *sckt_out_data = SPICE_CONTAINEROF(item, RedSocketOutData, status_pipe_item);
    RedSocket *sckt = SPICE_CONTAINEROF(sckt_out_data, RedSocket, out_data);

    ASSERT(!sckt->out_data.ready_chunks_queue.head);

    /* `rcc` itself is the `base` member of TunnelChannelClient; rcc->channel
     * is a different object and must not be fed to SPICE_CONTAINEROF here. */
    tunnel_channel = SPICE_CONTAINEROF(rcc, TunnelChannelClient, base);
    if (sckt->out_data.process_queue->head) {
        red_printf("socket sent FIN but there are still buffers in outgoing process queue"
                   "(local_port=%d, service_id=%d)",
                   ntohs(sckt->local_port), sckt->far_service->id);
    }

    tunnel_channel->send_data.u.socket_fin.connection_id = sckt->connection_id;

    red_channel_client_init_send_data(rcc, SPICE_MSG_TUNNEL_SOCKET_FIN, item);
    spice_marshaller_add_ref(m, (uint8_t*)&tunnel_channel->send_data.u.socket_fin,
                        sizeof(tunnel_channel->send_data.u.socket_fin));
#ifdef DEBUG_NETWORK
    PRINT_SCKT(sckt);
#endif
}
2614
2621
 
2615
 
static void tunnel_channel_send_socket_close(TunnelChannel *channel, PipeItem *item)
 
2622
static void tunnel_channel_marshall_socket_close(RedChannelClient *rcc, SpiceMarshaller *m, PipeItem *item)
2616
2623
{
 
2624
    TunnelChannelClient *tunnel_channel;
2617
2625
    RedSocketOutData *sckt_out_data = SPICE_CONTAINEROF(item, RedSocketOutData, status_pipe_item);
2618
2626
    RedSocket *sckt = SPICE_CONTAINEROF(sckt_out_data, RedSocket, out_data);
2619
2627
 
 
2628
    tunnel_channel = SPICE_CONTAINEROF(rcc->channel, TunnelChannelClient, base);
2620
2629
    // can happen when it is a forced close
2621
2630
    if (sckt->out_data.ready_chunks_queue.head) {
2622
2631
        red_printf("socket closed but there are still buffers in outgoing ready queue"
2631
2640
                   ntohs(sckt->local_port), sckt->far_service->id);
2632
2641
    }
2633
2642
 
2634
 
    channel->send_data.u.socket_close.connection_id = sckt->connection_id;
2635
 
 
2636
 
    red_channel_init_send_data(&channel->base, SPICE_MSG_TUNNEL_SOCKET_CLOSE, item);
2637
 
    red_channel_add_buf(&channel->base, &channel->send_data.u.socket_close,
2638
 
                        sizeof(channel->send_data.u.socket_close));
2639
 
 
2640
 
    red_channel_begin_send_message(&channel->base);
2641
 
 
 
2643
    tunnel_channel->send_data.u.socket_close.connection_id = sckt->connection_id;
 
2644
 
 
2645
    red_channel_client_init_send_data(rcc, SPICE_MSG_TUNNEL_SOCKET_CLOSE, item);
 
2646
    spice_marshaller_add_ref(m, (uint8_t*)&tunnel_channel->send_data.u.socket_close,
 
2647
                        sizeof(tunnel_channel->send_data.u.socket_close));
2642
2648
#ifdef DEBUG_NETWORK
2643
2649
    PRINT_SCKT(sckt);
2644
2650
#endif
2645
2651
}
2646
2652
 
2647
 
static void tunnel_channel_send_socket_closed_ack(TunnelChannel *channel, PipeItem *item)
 
2653
/*
 * Marshall a SPICE_MSG_TUNNEL_SOCKET_CLOSED_ACK message for the socket whose
 * out_data.status_pipe_item is `item`, then free that socket.
 *
 * The message body lives in the per-client send buffer, not in the socket, so
 * it stays valid after the socket is freed. If CHECK_TUNNEL_ERROR reports an
 * error afterwards, the tunnel worker is shut down.
 */
static void tunnel_channel_marshall_socket_closed_ack(RedChannelClient *rcc, SpiceMarshaller *m, PipeItem *item)
{
    TunnelChannelClient *tunnel_channel;
    RedSocketOutData *sckt_out_data = SPICE_CONTAINEROF(item, RedSocketOutData, status_pipe_item);
    RedSocket *sckt = SPICE_CONTAINEROF(sckt_out_data, RedSocket, out_data);

    /* `rcc` itself is the `base` member of TunnelChannelClient; rcc->channel
     * is a different object and must not be fed to SPICE_CONTAINEROF here. */
    tunnel_channel = SPICE_CONTAINEROF(rcc, TunnelChannelClient, base);
    tunnel_channel->send_data.u.socket_close_ack.connection_id = sckt->connection_id;

    // pipe item is null because we free the sckt.
    red_channel_client_init_send_data(rcc, SPICE_MSG_TUNNEL_SOCKET_CLOSED_ACK, NULL);
    spice_marshaller_add_ref(m, (uint8_t*)&tunnel_channel->send_data.u.socket_close_ack,
                        sizeof(tunnel_channel->send_data.u.socket_close_ack));
#ifdef DEBUG_NETWORK
    PRINT_SCKT(sckt);
#endif

    ASSERT(sckt->client_waits_close_ack && (sckt->client_status == CLIENT_SCKT_STATUS_CLOSED));
    tunnel_worker_free_socket(tunnel_channel->worker, sckt);
    if (CHECK_TUNNEL_ERROR(tunnel_channel)) {
        tunnel_shutdown(tunnel_channel->worker);
    }
}
2671
2676
 
2672
 
static void tunnel_channel_send_socket_token(TunnelChannel *channel, PipeItem *item)
 
2677
/*
 * Marshall a SPICE_MSG_TUNNEL_SOCKET_TOKEN message for the socket whose
 * out_data.token_pipe_item is `item`.
 *
 * Grants the client every token accumulated in in_data.num_tokens. When none
 * are accumulated, grants SOCKET_TOKENS_TO_SEND_FOR_PROCESS instead; that path
 * asserts the client holds no tokens and the incoming ready queue is empty.
 * The socket's counters are updated to reflect the grant, and the client's
 * total must not exceed SOCKET_WINDOW_SIZE.
 */
static void tunnel_channel_marshall_socket_token(RedChannelClient *rcc, SpiceMarshaller *m, PipeItem *item)
{
    TunnelChannelClient *tunnel_channel;
    RedSocketOutData *sckt_out_data = SPICE_CONTAINEROF(item, RedSocketOutData, token_pipe_item);
    RedSocket *sckt = SPICE_CONTAINEROF(sckt_out_data, RedSocket, out_data);

    /* notice that the num of tokens sent can be > SOCKET_TOKENS_TO_SEND, since
       the sending is performed after the pipe item was pushed */

    /* `rcc` itself is the `base` member of TunnelChannelClient; rcc->channel
     * is a different object and must not be fed to SPICE_CONTAINEROF here. */
    tunnel_channel = SPICE_CONTAINEROF(rcc, TunnelChannelClient, base);
    tunnel_channel->send_data.u.socket_token.connection_id = sckt->connection_id;

    if (sckt->in_data.num_tokens > 0) {
        tunnel_channel->send_data.u.socket_token.num_tokens = sckt->in_data.num_tokens;
    } else {
        ASSERT(!sckt->in_data.client_total_num_tokens && !sckt->in_data.ready_chunks_queue.head);
        tunnel_channel->send_data.u.socket_token.num_tokens = SOCKET_TOKENS_TO_SEND_FOR_PROCESS;
    }
    sckt->in_data.num_tokens -= tunnel_channel->send_data.u.socket_token.num_tokens;
    sckt->in_data.client_total_num_tokens += tunnel_channel->send_data.u.socket_token.num_tokens;
    ASSERT(sckt->in_data.client_total_num_tokens <= SOCKET_WINDOW_SIZE);

    red_channel_client_init_send_data(rcc, SPICE_MSG_TUNNEL_SOCKET_TOKEN, item);
    spice_marshaller_add_ref(m, (uint8_t*)&tunnel_channel->send_data.u.socket_token,
                        sizeof(tunnel_channel->send_data.u.socket_token));
}
2698
2703
 
2699
 
static void tunnel_channel_send_socket_out_data(TunnelChannel *channel, PipeItem *item)
 
2704
static void tunnel_channel_marshall_socket_out_data(RedChannelClient *rcc, SpiceMarshaller *m, PipeItem *item)
2700
2705
{
 
2706
    TunnelChannelClient *tunnel_channel;
 
2707
    tunnel_channel = SPICE_CONTAINEROF(rcc->channel, TunnelChannelClient, base);
2701
2708
    RedSocketOutData *sckt_out_data = SPICE_CONTAINEROF(item, RedSocketOutData, data_pipe_item);
2702
2709
    RedSocket *sckt = SPICE_CONTAINEROF(sckt_out_data, RedSocket, out_data);
2703
2710
    ReadyTunneledChunk *chunk;
2717
2724
    ASSERT(!sckt->out_data.push_tail);
2718
2725
    ASSERT(sckt->out_data.ready_chunks_queue.head->size <= MAX_SOCKET_DATA_SIZE);
2719
2726
 
2720
 
    channel->send_data.u.socket_data.connection_id = sckt->connection_id;
 
2727
    tunnel_channel->send_data.u.socket_data.connection_id = sckt->connection_id;
2721
2728
 
2722
 
    red_channel_init_send_data(&channel->base, SPICE_MSG_TUNNEL_SOCKET_DATA, item);
2723
 
    red_channel_add_buf(&channel->base, &channel->send_data.u.socket_data,
2724
 
                        sizeof(channel->send_data.u.socket_data));
 
2729
    red_channel_client_init_send_data(rcc, SPICE_MSG_TUNNEL_SOCKET_DATA, item);
 
2730
    spice_marshaller_add_ref(m, (uint8_t*)&tunnel_channel->send_data.u.socket_data,
 
2731
                        sizeof(tunnel_channel->send_data.u.socket_data));
2725
2732
    pushed_bufs_num++;
2726
2733
 
2727
2734
    // the first chunk is in a valid size
2728
2735
    chunk = sckt->out_data.ready_chunks_queue.head;
2729
2736
    total_push_size = chunk->size - sckt->out_data.ready_chunks_queue.offset;
2730
 
    red_channel_add_buf(&channel->base, chunk->data + sckt->out_data.ready_chunks_queue.offset,
 
2737
    spice_marshaller_add_ref(m, (uint8_t*)chunk->data + sckt->out_data.ready_chunks_queue.offset,
2731
2738
                        total_push_size);
2732
2739
    pushed_bufs_num++;
2733
2740
    sckt->out_data.push_tail = chunk;
2737
2744
 
2738
2745
    while (chunk && (total_push_size < MAX_SOCKET_DATA_SIZE) && (pushed_bufs_num < MAX_SEND_BUFS)) {
2739
2746
        uint32_t cur_push_size = MIN(chunk->size, MAX_SOCKET_DATA_SIZE - total_push_size);
2740
 
        red_channel_add_buf(&channel->base, chunk->data, cur_push_size);
 
2747
        spice_marshaller_add_ref(m, (uint8_t*)chunk->data, cur_push_size);
2741
2748
        pushed_bufs_num++;
2742
2749
 
2743
2750
        sckt->out_data.push_tail = chunk;
2748
2755
    }
2749
2756
 
2750
2757
    sckt->out_data.num_tokens--;
2751
 
 
2752
 
    red_channel_begin_send_message(&channel->base);
2753
2758
}
2754
2759
 
2755
2760
static void tunnel_worker_release_socket_out_data(TunnelWorker *worker, PipeItem *item)
2779
2784
    sckt_out_data->push_tail = NULL;
2780
2785
    sckt_out_data->push_tail_size = 0;
2781
2786
 
2782
 
    if (worker->channel) {
 
2787
    if (worker->channel_client) {
2783
2788
        // can still send data to socket
2784
2789
        if (__client_socket_can_receive(sckt)) {
2785
2790
            if (sckt_out_data->ready_chunks_queue.head) {
2786
2791
                // the pipe item may already be linked, if for example the send was
2787
2792
                // blocked and before it finished and called release, tunnel_socket_send was called
2788
 
                if (!red_channel_pipe_item_is_linked(
2789
 
                        &worker->channel->base, &sckt_out_data->data_pipe_item)) {
 
2793
                if (!red_channel_client_pipe_item_is_linked(
 
2794
                        &worker->channel_client->base, &sckt_out_data->data_pipe_item)) {
2790
2795
                    sckt_out_data->data_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_DATA;
2791
 
                    red_channel_pipe_add(&worker->channel->base, &sckt_out_data->data_pipe_item);
 
2796
                    red_channel_client_pipe_add(&worker->channel_client->base, &sckt_out_data->data_pipe_item);
2792
2797
                }
2793
2798
            } else if ((sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_SEND) ||
2794
2799
                       (sckt->slirp_status == SLIRP_SCKT_STATUS_WAIT_CLOSE)) {
2795
 
                __tunnel_socket_add_fin_to_pipe(worker->channel, sckt);
 
2800
                __tunnel_socket_add_fin_to_pipe(worker->channel_client, sckt);
2796
2801
            } else if (sckt->slirp_status == SLIRP_SCKT_STATUS_CLOSED) {
2797
 
                __tunnel_socket_add_close_to_pipe(worker->channel, sckt);
 
2802
                __tunnel_socket_add_close_to_pipe(worker->channel_client, sckt);
2798
2803
            }
2799
2804
        }
2800
2805
    }
2802
2807
 
2803
2808
    if (((sckt->slirp_status == SLIRP_SCKT_STATUS_OPEN) ||
2804
2809
         (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_RECV)) &&
2805
 
        !sckt->in_slirp_send && !worker->channel->mig_inprogress) {
 
2810
        !sckt->in_slirp_send && !worker->channel_client->mig_inprogress) {
2806
2811
        // for cases where slirp couldn't write all of its data to our socket buffer
2807
2812
        net_slirp_socket_can_send_notify(sckt->slirp_sckt);
2808
2813
    }
2809
2814
}
2810
2815
 
2811
 
static void tunnel_channel_send_item(RedChannel *channel, PipeItem *item)
 
2816
static void tunnel_channel_send_item(RedChannelClient *rcc, PipeItem *item)
2812
2817
{
2813
 
    TunnelChannel *tunnel_channel = (TunnelChannel *)channel;
 
2818
    SpiceMarshaller *m = red_channel_client_get_marshaller(rcc);
2814
2819
 
2815
 
    red_channel_reset_send_data(channel);
2816
2820
    switch (item->type) {
2817
 
    case PIPE_ITEM_TYPE_SET_ACK:
2818
 
        tunnel_channel_send_set_ack(tunnel_channel, item);
2819
 
        break;
2820
2821
    case PIPE_ITEM_TYPE_TUNNEL_INIT:
2821
 
        tunnel_channel_send_init(tunnel_channel, item);
 
2822
        tunnel_channel_marshall_init(rcc, m, item);
2822
2823
        break;
2823
2824
    case PIPE_ITEM_TYPE_SERVICE_IP_MAP:
2824
 
        tunnel_channel_send_service_ip_map(tunnel_channel, item);
 
2825
        tunnel_channel_marshall_service_ip_map(rcc, m, item);
2825
2826
        break;
2826
2827
    case PIPE_ITEM_TYPE_SOCKET_OPEN:
2827
 
        tunnel_channel_send_socket_open(tunnel_channel, item);
 
2828
        tunnel_channel_marshall_socket_open(rcc, m, item);
2828
2829
        break;
2829
2830
    case PIPE_ITEM_TYPE_SOCKET_DATA:
2830
 
        tunnel_channel_send_socket_out_data(tunnel_channel, item);
 
2831
        tunnel_channel_marshall_socket_out_data(rcc, m, item);
2831
2832
        break;
2832
2833
    case PIPE_ITEM_TYPE_SOCKET_FIN:
2833
 
        tunnel_channel_send_socket_fin(tunnel_channel, item);
 
2834
        tunnel_channel_marshall_socket_fin(rcc, m, item);
2834
2835
        break;
2835
2836
    case PIPE_ITEM_TYPE_SOCKET_CLOSE:
2836
 
        tunnel_channel_send_socket_close(tunnel_channel, item);
 
2837
        tunnel_channel_marshall_socket_close(rcc, m, item);
2837
2838
        break;
2838
2839
    case PIPE_ITEM_TYPE_SOCKET_CLOSED_ACK:
2839
 
        tunnel_channel_send_socket_closed_ack(tunnel_channel, item);
 
2840
        tunnel_channel_marshall_socket_closed_ack(rcc, m, item);
2840
2841
        break;
2841
2842
    case PIPE_ITEM_TYPE_SOCKET_TOKEN:
2842
 
        tunnel_channel_send_socket_token(tunnel_channel, item);
 
2843
        tunnel_channel_marshall_socket_token(rcc, m, item);
2843
2844
        break;
2844
2845
    case PIPE_ITEM_TYPE_MIGRATE:
2845
 
        tunnel_channel_send_migrate(tunnel_channel, item);
 
2846
        tunnel_channel_marshall_migrate(rcc, m, item);
2846
2847
        break;
2847
2848
    case PIPE_ITEM_TYPE_MIGRATE_DATA:
2848
 
        tunnel_channel_send_migrate_data(tunnel_channel, item);
 
2849
        tunnel_channel_marshall_migrate_data(rcc, m, item);
2849
2850
        break;
2850
2851
    default:
2851
2852
        red_error("invalid pipe item type");
2852
2853
    }
 
2854
    red_channel_client_begin_send_message(rcc);
2853
2855
}
2854
2856
 
2855
2857
/* param item_pushed: distinguishes between a pipe item that was pushed for sending, and
2856
2858
   a pipe item that is still in the pipe and is released due to disconnection.
2857
2859
   see red_pipe_item_clear */
2858
 
static void tunnel_channel_release_pipe_item(RedChannel *channel, PipeItem *item, int item_pushed)
 
2860
static void tunnel_channel_release_pipe_item(RedChannelClient *rcc, PipeItem *item, int item_pushed)
2859
2861
{
2860
2862
    if (!item) { // e.g. when acking closed socket
2861
2863
        return;
2862
2864
    }
2863
2865
    switch (item->type) {
2864
 
    case PIPE_ITEM_TYPE_SET_ACK:
2865
2866
    case PIPE_ITEM_TYPE_TUNNEL_INIT:
2866
2867
        free(item);
2867
2868
        break;
2873
2874
        break;
2874
2875
    case PIPE_ITEM_TYPE_SOCKET_DATA:
2875
2876
        if (item_pushed) {
2876
 
            tunnel_worker_release_socket_out_data(((TunnelChannel *)channel)->worker, item);
 
2877
            tunnel_worker_release_socket_out_data(
 
2878
                SPICE_CONTAINEROF(rcc, TunnelChannelClient, base)->worker, item);
2877
2879
        }
2878
2880
        break;
2879
2881
    case PIPE_ITEM_TYPE_MIGRATE:
2927
2929
    red_printf("TUNNEL_DBG");
2928
2930
#endif
2929
2931
    worker = ((RedSlirpNetworkInterface *)usr_interface)->worker;
2930
 
    ASSERT(worker->channel);
2931
 
    ASSERT(!worker->channel->mig_inprogress);
 
2932
    ASSERT(worker->channel_client);
 
2933
    ASSERT(!worker->channel_client->mig_inprogress);
2932
2934
 
2933
2935
    far_service = tunnel_worker_find_service_by_addr(worker, &dst_addr, (uint32_t)ntohs(dst_port));
2934
2936
 
2956
2958
#endif
2957
2959
    *o_usr_s = sckt;
2958
2960
    sckt->out_data.status_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_OPEN;
2959
 
    red_channel_pipe_add(&worker->channel->base, &sckt->out_data.status_pipe_item);
 
2961
    red_channel_client_pipe_add(&worker->channel_client->base, &sckt->out_data.status_pipe_item);
2960
2962
 
2961
2963
    errno = EINPROGRESS;
2962
2964
    return -1;
2981
2983
 
2982
2984
    worker = ((RedSlirpNetworkInterface *)usr_interface)->worker;
2983
2985
 
2984
 
    ASSERT(!worker->channel->mig_inprogress);
 
2986
    ASSERT(!worker->channel_client->mig_inprogress);
2985
2987
 
2986
2988
    sckt = (RedSocket *)opaque;
2987
2989
 
3006
3008
    }
3007
3009
 
3008
3010
    if (urgent) {
3009
 
        SET_TUNNEL_ERROR(worker->channel, "urgent msgs not supported");
 
3011
        SET_TUNNEL_ERROR(worker->channel_client, "urgent msgs not supported");
3010
3012
        tunnel_shutdown(worker);
3011
3013
        errno = ECONNRESET;
3012
3014
        return -1;
3029
3031
                red_printf("socket out buffers overflow, socket will be closed"
3030
3032
                           " (local_port=%d, service_id=%d)",
3031
3033
                           ntohs(sckt->local_port), sckt->far_service->id);
3032
 
                tunnel_socket_force_close(worker->channel, sckt);
 
3034
                tunnel_socket_force_close(worker->channel_client, sckt);
3033
3035
                size_to_send = 0;
3034
3036
            } else {
3035
3037
                size_to_send = len;
3042
3044
        sckt->out_data.data_size += size_to_send;
3043
3045
 
3044
3046
        if (sckt->out_data.ready_chunks_queue.head &&
3045
 
            !red_channel_pipe_item_is_linked(&worker->channel->base,
 
3047
            !red_channel_client_pipe_item_is_linked(&worker->channel_client->base,
3046
3048
                                             &sckt->out_data.data_pipe_item)) {
3047
3049
            sckt->out_data.data_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_DATA;
3048
 
            red_channel_pipe_add(&worker->channel->base, &sckt->out_data.data_pipe_item);
 
3050
            red_channel_client_pipe_add(&worker->channel_client->base, &sckt->out_data.data_pipe_item);
3049
3051
        }
3050
3052
    }
3051
3053
 
3085
3087
    ASSERT(opaque);
3086
3088
    worker = ((RedSlirpNetworkInterface *)usr_interface)->worker;
3087
3089
 
3088
 
    ASSERT(!worker->channel->mig_inprogress);
 
3090
    ASSERT(!worker->channel_client->mig_inprogress);
3089
3091
 
3090
3092
    sckt = (RedSocket *)opaque;
3091
3093
 
3096
3098
 
3097
3099
    if ((sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_RECV) ||
3098
3100
        (sckt->slirp_status == SLIRP_SCKT_STATUS_WAIT_CLOSE)) {
3099
 
        SET_TUNNEL_ERROR(worker->channel, "receive was shutdown");
 
3101
        SET_TUNNEL_ERROR(worker->channel_client, "receive was shutdown");
3100
3102
        tunnel_shutdown(worker);
3101
3103
        errno = ECONNRESET;
3102
3104
        return -1;
3103
3105
    }
3104
3106
 
3105
3107
    if (sckt->slirp_status == SLIRP_SCKT_STATUS_CLOSED) {
3106
 
        SET_TUNNEL_ERROR(worker->channel, "slirp socket not connected");
 
3108
        SET_TUNNEL_ERROR(worker->channel_client, "slirp socket not connected");
3107
3109
        tunnel_shutdown(worker);
3108
3110
        errno = ECONNRESET;
3109
3111
        return -1;
3169
3171
#ifdef DEBUG_NETWORK
3170
3172
    PRINT_SCKT(sckt);
3171
3173
#endif
3172
 
    ASSERT(!worker->channel->mig_inprogress);
 
3174
    ASSERT(!worker->channel_client->mig_inprogress);
3173
3175
 
3174
3176
    if (sckt->slirp_status == SLIRP_SCKT_STATUS_DELAY_ABORT) {
3175
3177
        return;
3181
3183
        ASSERT(sckt->client_status == CLIENT_SCKT_STATUS_SHUTDOWN_SEND);
3182
3184
        sckt->slirp_status = SLIRP_SCKT_STATUS_WAIT_CLOSE;
3183
3185
    } else {
3184
 
        SET_TUNNEL_ERROR(worker->channel, "unexpected tunnel_socket_shutdown_send slirp_status=%d",
 
3186
        SET_TUNNEL_ERROR(worker->channel_client, "unexpected tunnel_socket_shutdown_send slirp_status=%d",
3185
3187
                         sckt->slirp_status);
3186
3188
        tunnel_shutdown(worker);
3187
3189
        return;
3192
3194
        // check if there is still data to send. the fin will be sent after data is released
3193
3195
        // channel is alive, otherwise the sockets would have been aborted
3194
3196
        if (!sckt->out_data.ready_chunks_queue.head) {
3195
 
            __tunnel_socket_add_fin_to_pipe(worker->channel, sckt);
 
3197
            __tunnel_socket_add_fin_to_pipe(worker->channel_client, sckt);
3196
3198
        }
3197
3199
    } else { // if client is closed, it means the connection was aborted since we didn't
3198
3200
             // receive fin from guest
3199
 
        SET_TUNNEL_ERROR(worker->channel,
 
3201
        SET_TUNNEL_ERROR(worker->channel_client,
3200
3202
                         "unexpected tunnel_socket_shutdown_send client_status=%d",
3201
3203
                         sckt->client_status);
3202
3204
        tunnel_shutdown(worker);
3221
3223
#ifdef DEBUG_NETWORK
3222
3224
    PRINT_SCKT(sckt);
3223
3225
#endif
3224
 
    ASSERT(!worker->channel->mig_inprogress);
 
3226
    ASSERT(!worker->channel_client->mig_inprogress);
3225
3227
 
3226
3228
    /* failure in recv can happen after the client sckt was shutdown
3227
3229
      (after client sent FIN, or after slirp sent FIN and client socket was closed) */
3228
3230
    if (!__should_send_fin_to_guest(sckt)) {
3229
 
        SET_TUNNEL_ERROR(worker->channel,
 
3231
        SET_TUNNEL_ERROR(worker->channel_client,
3230
3232
                         "unexpected tunnel_socket_shutdown_recv client_status=%d slirp_status=%d",
3231
3233
                         sckt->client_status, sckt->slirp_status);
3232
3234
        tunnel_shutdown(worker);
3238
3240
    } else if (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_SEND) {
3239
3241
        sckt->slirp_status = SLIRP_SCKT_STATUS_WAIT_CLOSE;
3240
3242
    } else {
3241
 
        SET_TUNNEL_ERROR(worker->channel,
 
3243
        SET_TUNNEL_ERROR(worker->channel_client,
3242
3244
                         "unexpected tunnel_socket_shutdown_recv slirp_status=%d",
3243
3245
                         sckt->slirp_status);
3244
3246
        tunnel_shutdown(worker);
3294
3296
        // check if there is still data to send. the close will be sent after data is released.
3295
3297
        // close may already have been pushed if it is a forced close
3296
3298
        if (!sckt->out_data.ready_chunks_queue.head && !sckt->pushed_close) {
3297
 
            __tunnel_socket_add_close_to_pipe(worker->channel, sckt);
 
3299
            __tunnel_socket_add_close_to_pipe(worker->channel_client, sckt);
3298
3300
        }
3299
3301
    } else if (sckt->client_status == CLIENT_SCKT_STATUS_CLOSED) {
3300
3302
        if (sckt->client_waits_close_ack) {
3301
 
            __tunnel_socket_add_close_ack_to_pipe(worker->channel, sckt);
 
3303
            __tunnel_socket_add_close_ack_to_pipe(worker->channel_client, sckt);
3302
3304
        } else {
3303
3305
            tunnel_worker_free_socket(worker, sckt);
3304
3306
        }
3325
3327
 
3326
3328
    worker = ((RedSlirpNetworkInterface *)usr_interface)->worker;
3327
3329
#ifdef DEBUG_NETWORK
3328
 
    if (!worker->channel) {
 
3330
    if (!worker->channel_client) {
3329
3331
        red_printf("channel not connected");
3330
3332
    }
3331
3333
#endif
3332
 
    if (worker->channel && worker->channel->mig_inprogress) {
3333
 
        SET_TUNNEL_ERROR(worker->channel, "during migration");
 
3334
    if (worker->channel_client && worker->channel_client->mig_inprogress) {
 
3335
        SET_TUNNEL_ERROR(worker->channel_client, "during migration");
3334
3336
        tunnel_shutdown(worker);
3335
3337
        return;
3336
3338
    }
3342
3344
* channel interface and other related procedures
3343
3345
************************************************/
3344
3346
 
3345
 
static int tunnel_channel_config_socket(RedChannel *channel)
 
3347
static int tunnel_channel_config_socket(RedChannelClient *rcc)
3346
3348
{
3347
3349
    int flags;
3348
3350
    int delay_val;
 
3351
    RedsStream *stream = red_channel_client_get_stream(rcc);
3349
3352
 
3350
 
    if ((flags = fcntl(channel->stream->socket, F_GETFL)) == -1) {
 
3353
    if ((flags = fcntl(stream->socket, F_GETFL)) == -1) {
3351
3354
        red_printf("accept failed, %s", strerror(errno)); // can't we just use red_error?
3352
3355
        return FALSE;
3353
3356
    }
3354
3357
 
3355
 
    if (fcntl(channel->stream->socket, F_SETFL, flags | O_NONBLOCK) == -1) {
 
3358
    if (fcntl(stream->socket, F_SETFL, flags | O_NONBLOCK) == -1) {
3356
3359
        red_printf("accept failed, %s", strerror(errno));
3357
3360
        return FALSE;
3358
3361
    }
3359
3362
 
3360
3363
    delay_val = 1;
3361
3364
 
3362
 
    if (setsockopt(channel->stream->socket, IPPROTO_TCP, TCP_NODELAY, &delay_val,
 
3365
    if (setsockopt(stream->socket, IPPROTO_TCP, TCP_NODELAY, &delay_val,
3363
3366
                   sizeof(delay_val)) == -1) {
3364
3367
        red_printf("setsockopt failed, %s", strerror(errno));
3365
3368
    }
3389
3392
 
3390
3393
/* don't call disconnect from functions that might be called by slirp
3391
3394
   since it closes all its sockets and slirp is not aware of it */
3392
 
static void tunnel_channel_disconnect(RedChannel *channel)
 
3395
static void tunnel_channel_on_disconnect(RedChannel *channel)
3393
3396
{
3394
 
    TunnelChannel *tunnel_channel = (TunnelChannel *)channel;
3395
3397
    TunnelWorker *worker;
3396
3398
    if (!channel) {
3397
3399
        return;
3398
3400
    }
3399
3401
    red_printf("");
3400
 
    worker = tunnel_channel->worker;
 
3402
    worker = (TunnelWorker *)channel->data;
3401
3403
 
3402
3404
    tunnel_worker_disconnect_slirp(worker);
3403
3405
 
3404
3406
    tunnel_worker_clear_routed_network(worker);
3405
 
    red_channel_destroy(channel);
3406
 
    worker->channel = NULL;
 
3407
    worker->channel_client = NULL;
 
3408
}
 
3409
 
 
3410
// TODO - not MC friendly, remove
 
3411
static void tunnel_channel_client_on_disconnect(RedChannelClient *rcc)
 
3412
{
 
3413
    tunnel_channel_on_disconnect(rcc->channel);
3407
3414
}
3408
3415
 
3409
3416
/* interface for reds */
3410
3417
 
3411
 
static void on_new_tunnel_channel(TunnelChannel *channel)
 
3418
static void on_new_tunnel_channel(TunnelChannelClient *tcc)
3412
3419
{
3413
 
    red_channel_pipe_add_type(&channel->base, PIPE_ITEM_TYPE_SET_ACK);
 
3420
    red_channel_client_push_set_ack(&tcc->base);
3414
3421
 
3415
 
    if (channel->base.migrate) {
3416
 
        channel->expect_migrate_data = TRUE;
 
3422
    if (tcc->base.channel->migrate) {
 
3423
        tcc->expect_migrate_data = TRUE;
3417
3424
    } else {
3418
 
        red_channel_init_outgoing_messages_window(&channel->base);
3419
 
        red_channel_pipe_add_type(&channel->base, PIPE_ITEM_TYPE_TUNNEL_INIT);
 
3425
        red_channel_init_outgoing_messages_window(tcc->base.channel);
 
3426
        red_channel_client_pipe_add_type(&tcc->base, PIPE_ITEM_TYPE_TUNNEL_INIT);
3420
3427
    }
3421
3428
}
3422
3429
 
3423
 
static void handle_tunnel_channel_link(Channel *channel, RedsStream *stream, int migration,
3424
 
                                       int num_common_caps, uint32_t *common_caps, int num_caps,
 
3430
static void tunnel_channel_hold_pipe_item(RedChannelClient *rcc, PipeItem *item)
 
3431
{
 
3432
}
 
3433
 
 
3434
static void handle_tunnel_channel_link(RedChannel *channel, RedClient *client,
 
3435
                                       RedsStream *stream, int migration,
 
3436
                                       int num_common_caps,
 
3437
                                       uint32_t *common_caps, int num_caps,
3425
3438
                                       uint32_t *caps)
3426
3439
{
3427
 
    TunnelChannel *tunnel_channel;
 
3440
    TunnelChannelClient *tcc;
3428
3441
    TunnelWorker *worker = (TunnelWorker *)channel->data;
3429
 
    if (worker->channel) {
3430
 
        tunnel_channel_disconnect(&worker->channel->base);
3431
 
    }
3432
 
 
3433
 
    tunnel_channel =
3434
 
        (TunnelChannel *)red_channel_create(sizeof(*tunnel_channel), stream, worker->core_interface,
3435
 
                                            migration, TRUE,
3436
 
                                            tunnel_channel_config_socket,
3437
 
                                            tunnel_channel_disconnect,
3438
 
                                            tunnel_channel_handle_message,
3439
 
                                            tunnel_channel_alloc_msg_rcv_buf,
3440
 
                                            tunnel_channel_release_msg_rcv_buf,
3441
 
                                            tunnel_channel_send_item,
3442
 
                                            tunnel_channel_release_pipe_item);
3443
 
 
3444
 
    if (!tunnel_channel) {
3445
 
        return;
3446
 
    }
3447
 
 
3448
 
 
3449
 
    tunnel_channel->worker = worker;
3450
 
    tunnel_channel->worker->channel = tunnel_channel;
 
3442
 
 
3443
    if (worker->channel_client) {
 
3444
        red_error("tunnel does not support multiple client");
 
3445
    }
 
3446
 
 
3447
    tcc = (TunnelChannelClient*)red_channel_client_create(sizeof(TunnelChannelClient),
 
3448
                                                          channel, client, stream,
 
3449
                                                          0, NULL, 0, NULL);
 
3450
 
 
3451
    tcc->worker = worker;
 
3452
    tcc->worker->channel_client = tcc;
3451
3453
    net_slirp_set_net_interface(&worker->tunnel_interface.base);
3452
3454
 
3453
 
    on_new_tunnel_channel(tunnel_channel);
3454
 
}
3455
 
 
3456
 
static void handle_tunnel_channel_shutdown(struct Channel *channel)
3457
 
{
3458
 
    tunnel_channel_disconnect(&((TunnelWorker *)channel->data)->channel->base);
3459
 
}
3460
 
 
3461
 
static void handle_tunnel_channel_migrate(struct Channel *channel)
3462
 
{
 
3455
    on_new_tunnel_channel(tcc);
 
3456
}
 
3457
 
 
3458
static void handle_tunnel_channel_client_migrate(RedChannelClient *rcc)
 
3459
{
 
3460
    TunnelChannelClient *tunnel_channel;
3463
3461
#ifdef DEBUG_NETWORK
3464
3462
    red_printf("TUNNEL_DBG: MIGRATE STARTED");
3465
3463
#endif
3466
 
    TunnelChannel *tunnel_channel = ((TunnelWorker *)channel->data)->channel;
 
3464
    tunnel_channel = (TunnelChannelClient *)rcc;
 
3465
    ASSERT(tunnel_channel == tunnel_channel->worker->channel_client);
3467
3466
    tunnel_channel->mig_inprogress = TRUE;
3468
3467
    net_slirp_freeze();
3469
 
    red_channel_pipe_add_type(&tunnel_channel->base, PIPE_ITEM_TYPE_MIGRATE);
 
3468
    red_channel_client_pipe_add_type(rcc, PIPE_ITEM_TYPE_MIGRATE);
 
3469
}
 
3470
 
 
3471
static void red_tunnel_channel_create(TunnelWorker *worker)
 
3472
{
 
3473
    RedChannel *channel;
 
3474
    ChannelCbs channel_cbs;
 
3475
    ClientCbs client_cbs = {0,};
 
3476
 
 
3477
    channel_cbs.config_socket = tunnel_channel_config_socket;
 
3478
    channel_cbs.on_disconnect = tunnel_channel_client_on_disconnect;
 
3479
    channel_cbs.alloc_recv_buf = tunnel_channel_alloc_msg_rcv_buf;
 
3480
    channel_cbs.release_recv_buf = tunnel_channel_release_msg_rcv_buf;
 
3481
    channel_cbs.hold_item = tunnel_channel_hold_pipe_item;
 
3482
    channel_cbs.send_item = tunnel_channel_send_item;
 
3483
    channel_cbs.release_item = tunnel_channel_release_pipe_item;
 
3484
    channel_cbs.handle_migrate_flush_mark = tunnel_channel_handle_migrate_mark;
 
3485
    channel_cbs.handle_migrate_data = tunnel_channel_handle_migrate_data;
 
3486
    channel_cbs.handle_migrate_data_get_serial = tunnel_channel_handle_migrate_data_get_serial;
 
3487
 
 
3488
    channel = red_channel_create(sizeof(RedChannel),
 
3489
                                 worker->core_interface,
 
3490
                                 SPICE_CHANNEL_TUNNEL, 0,
 
3491
                                 FALSE, // TODO: handle migration=TRUE
 
3492
                                 TRUE,
 
3493
                                 tunnel_channel_handle_message,
 
3494
                                 &channel_cbs);
 
3495
    if (!channel) {
 
3496
        return;
 
3497
    }
 
3498
 
 
3499
    client_cbs.connect = handle_tunnel_channel_link;
 
3500
    client_cbs.migrate = handle_tunnel_channel_client_migrate;
 
3501
    red_channel_register_client_cbs(channel, &client_cbs);
 
3502
 
 
3503
    worker->channel = channel;
 
3504
    red_channel_set_data(channel, worker);
 
3505
    reds_register_channel(worker->channel);
3470
3506
}
3471
3507