~ubuntu-branches/ubuntu/quantal/zeroc-ice/quantal

« back to all changes in this revision

Viewing changes to cpp/src/Ice/TcpTransceiver.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Cleto Martin Angelina
  • Date: 2011-04-25 18:44:24 UTC
  • mfrom: (6.1.14 sid)
  • Revision ID: james.westby@ubuntu.com-20110425184424-sep9i9euu434vq4c
Tags: 3.4.1-7
* Bug fix: "libdb5.1-java.jar was renamed to db.jar", thanks to Ondřej
  Surý (Closes: #623555).
* Bug fix: "causes noise in php5", thanks to Jayen Ashar (Closes:
  #623533).

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
// **********************************************************************
2
2
//
3
 
// Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved.
 
3
// Copyright (c) 2003-2010 ZeroC, Inc. All rights reserved.
4
4
//
5
5
// This copy of Ice is licensed to you under the terms described in the
6
6
// ICE_LICENSE file included in this distribution.
8
8
// **********************************************************************
9
9
 
10
10
#include <Ice/TcpTransceiver.h>
 
11
#include <Ice/Connection.h>
11
12
#include <Ice/Instance.h>
12
13
#include <Ice/TraceLevels.h>
13
14
#include <Ice/LoggerUtil.h>
20
21
using namespace Ice;
21
22
using namespace IceInternal;
22
23
 
23
 
SOCKET
24
 
IceInternal::TcpTransceiver::fd()
25
 
{
26
 
    assert(_fd != INVALID_SOCKET);
27
 
    return _fd;
 
24
NativeInfoPtr
 
25
IceInternal::TcpTransceiver::getNativeInfo()
 
26
{
 
27
    return this;
 
28
}
 
29
 
 
30
#ifdef ICE_USE_IOCP
 
31
AsyncInfo*
 
32
IceInternal::TcpTransceiver::getAsyncInfo(SocketOperation status)
 
33
{
 
34
    switch(status)
 
35
    {
 
36
    case SocketOperationRead:
 
37
        return &_read;
 
38
    case SocketOperationWrite:
 
39
        return &_write;
 
40
    default:
 
41
        assert(false);
 
42
        return 0;
 
43
    }
 
44
}
 
45
#endif
 
46
 
 
47
SocketOperation
 
48
IceInternal::TcpTransceiver::initialize()
 
49
{
 
50
    if(_state == StateNeedConnect)
 
51
    {
 
52
        _state = StateConnectPending;
 
53
        return SocketOperationConnect;
 
54
    }
 
55
    else if(_state <= StateConnectPending)
 
56
    {
 
57
        try
 
58
        {
 
59
#ifndef ICE_USE_IOCP
 
60
            doFinishConnect(_fd);
 
61
#else
 
62
            doFinishConnectAsync(_fd, _write);
 
63
#endif
 
64
            _state = StateConnected;
 
65
            _desc = fdToString(_fd);
 
66
        }
 
67
        catch(const Ice::LocalException& ex)
 
68
        {
 
69
            if(_traceLevels->network >= 2)
 
70
            {
 
71
                Trace out(_logger, _traceLevels->networkCat);
 
72
                out << "failed to establish tcp connection\n" << _desc << "\n" << ex;
 
73
            }
 
74
            throw;
 
75
        }
 
76
 
 
77
        if(_traceLevels->network >= 1)
 
78
        {
 
79
            Trace out(_logger, _traceLevels->networkCat);
 
80
            out << "tcp connection established\n" << _desc;
 
81
        }
 
82
    }
 
83
    assert(_state == StateConnected);
 
84
    return SocketOperationNone;
28
85
}
29
86
 
30
87
void
31
88
IceInternal::TcpTransceiver::close()
32
89
{
33
 
    if(_traceLevels->network >= 1)
 
90
    if(_state == StateConnected && _traceLevels->network >= 1)
34
91
    {
35
92
        Trace out(_logger, _traceLevels->networkCat);
36
93
        out << "closing tcp connection\n" << toString();
55
112
    // Its impossible for the packetSize to be more than an Int.
56
113
    int packetSize = static_cast<int>(buf.b.end() - buf.i);
57
114
    
58
 
#ifdef _WIN32
 
115
#ifdef ICE_USE_IOCP
59
116
    //
60
117
    // Limit packet size to avoid performance problems on WIN32
61
118
    //
62
 
    if(packetSize > _maxPacketSize)
 
119
    if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize)
63
120
    { 
64
 
        packetSize = _maxPacketSize;
 
121
        packetSize = _maxSendPacketSize;
65
122
    }
66
123
#endif
67
124
 
68
125
    while(buf.i != buf.b.end())
69
126
    {
70
127
        assert(_fd != INVALID_SOCKET);
 
128
 
71
129
        ssize_t ret = ::send(_fd, reinterpret_cast<const char*>(&*buf.i), packetSize, 0);
72
 
 
73
130
        if(ret == 0)
74
131
        {
75
132
            ConnectionLostException ex(__FILE__, __LINE__);
213
270
 
214
271
        buf.i += ret;
215
272
 
216
 
        if(packetSize > buf.b.end() - buf.i)
217
 
        {
218
 
            packetSize = static_cast<int>(buf.b.end() - buf.i);
219
 
        }
 
273
        packetSize = static_cast<int>(buf.b.end() - buf.i);
220
274
    }
221
275
 
222
276
    return true;
223
277
}
224
278
 
 
279
#ifdef ICE_USE_IOCP
 
280
bool
 
281
IceInternal::TcpTransceiver::startWrite(Buffer& buf)
 
282
{
 
283
    if(_state < StateConnected)
 
284
    {
 
285
        doConnectAsync(_fd, _connectAddr, _write);
 
286
        _desc = fdToString(_fd);
 
287
        return false;
 
288
    }
 
289
 
 
290
    assert(!buf.b.empty() && buf.i != buf.b.end());
 
291
 
 
292
    int packetSize = static_cast<int>(buf.b.end() - buf.i);
 
293
    if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize)
 
294
    { 
 
295
        packetSize = _maxSendPacketSize;
 
296
    }
 
297
 
 
298
    _write.buf.len = packetSize;
 
299
    _write.buf.buf = reinterpret_cast<char*>(&*buf.i);
 
300
    int err = WSASend(_fd, &_write.buf, 1, &_write.count, 0, &_write, NULL);
 
301
    if(err == SOCKET_ERROR)
 
302
    {
 
303
        if(!wouldBlock())
 
304
        {
 
305
            if(connectionLost())
 
306
            {
 
307
                ConnectionLostException ex(__FILE__, __LINE__);
 
308
                ex.error = getSocketErrno();
 
309
                throw ex;
 
310
            }
 
311
            else
 
312
            {
 
313
                SocketException ex(__FILE__, __LINE__);
 
314
                ex.error = getSocketErrno();
 
315
                throw ex;
 
316
            }
 
317
        }
 
318
    }
 
319
    return packetSize == static_cast<int>(buf.b.end() - buf.i);
 
320
}
 
321
 
 
322
void
 
323
IceInternal::TcpTransceiver::finishWrite(Buffer& buf)
 
324
{
 
325
    if(_state < StateConnected)
 
326
    {
 
327
        return;
 
328
    }
 
329
 
 
330
    if(_write.count == SOCKET_ERROR)
 
331
    {
 
332
        WSASetLastError(_write.error);
 
333
        if(connectionLost())
 
334
        {
 
335
            ConnectionLostException ex(__FILE__, __LINE__);
 
336
            ex.error = getSocketErrno();
 
337
            throw ex;
 
338
        }
 
339
        else
 
340
        {
 
341
            SocketException ex(__FILE__, __LINE__);
 
342
            ex.error = getSocketErrno();
 
343
            throw ex;
 
344
        }
 
345
    }
 
346
 
 
347
    if(_traceLevels->network >= 3)
 
348
    {
 
349
        int packetSize = static_cast<int>(buf.b.end() - buf.i);
 
350
        if(_maxSendPacketSize > 0 && packetSize > _maxSendPacketSize)
 
351
        { 
 
352
            packetSize = _maxSendPacketSize;
 
353
        }
 
354
        Trace out(_logger, _traceLevels->networkCat);
 
355
        out << "sent " << _write.count << " of " << packetSize << " bytes via tcp\n" << toString();
 
356
    }
 
357
    
 
358
    if(_stats)
 
359
    {
 
360
        _stats->bytesSent(type(), _write.count);
 
361
    }
 
362
 
 
363
    buf.i += _write.count;
 
364
}
 
365
 
 
366
void
 
367
IceInternal::TcpTransceiver::startRead(Buffer& buf)
 
368
{
 
369
    int packetSize = static_cast<int>(buf.b.end() - buf.i);
 
370
    if(_maxReceivePacketSize > 0 && packetSize > _maxReceivePacketSize)
 
371
    {
 
372
        packetSize = _maxReceivePacketSize;
 
373
    }
 
374
 
 
375
    assert(!buf.b.empty() && buf.i != buf.b.end());
 
376
 
 
377
    _read.buf.len = packetSize;
 
378
    _read.buf.buf = reinterpret_cast<char*>(&*buf.i);
 
379
    int err = WSARecv(_fd, &_read.buf, 1, &_read.count, &_read.flags, &_read, NULL);
 
380
    if(err == SOCKET_ERROR)
 
381
    {
 
382
        if(!wouldBlock())
 
383
        {
 
384
            if(connectionLost())
 
385
            {
 
386
                ConnectionLostException ex(__FILE__, __LINE__);
 
387
                ex.error = getSocketErrno();
 
388
                throw ex;
 
389
            }
 
390
            else
 
391
            {
 
392
                SocketException ex(__FILE__, __LINE__);
 
393
                ex.error = getSocketErrno();
 
394
                throw ex;
 
395
            }
 
396
        }
 
397
    }
 
398
}
 
399
 
 
400
void
 
401
IceInternal::TcpTransceiver::finishRead(Buffer& buf)
 
402
{
 
403
    if(_read.count == SOCKET_ERROR)
 
404
    {
 
405
        WSASetLastError(_read.error);
 
406
        if(connectionLost())
 
407
        {
 
408
            ConnectionLostException ex(__FILE__, __LINE__);
 
409
            ex.error = getSocketErrno();
 
410
            throw ex;
 
411
        }
 
412
        else
 
413
        {
 
414
            SocketException ex(__FILE__, __LINE__);
 
415
            ex.error = getSocketErrno();
 
416
            throw ex;
 
417
        }
 
418
    }
 
419
    else if(_read.count == 0)
 
420
    {
 
421
        ConnectionLostException ex(__FILE__, __LINE__);
 
422
        ex.error = 0;
 
423
        throw ex;
 
424
    }
 
425
 
 
426
    if(_traceLevels->network >= 3)
 
427
    {
 
428
        int packetSize = static_cast<int>(buf.b.end() - buf.i);
 
429
        if(_maxReceivePacketSize > 0 && packetSize > _maxReceivePacketSize)
 
430
        {
 
431
            packetSize = _maxReceivePacketSize;
 
432
        }
 
433
        Trace out(_logger, _traceLevels->networkCat);
 
434
        out << "received " << _read.count << " of " << packetSize << " bytes via tcp\n" << toString();
 
435
    }
 
436
 
 
437
    if(_stats)
 
438
    {
 
439
        _stats->bytesReceived(type(), static_cast<Int>(_read.count));
 
440
    }
 
441
 
 
442
    buf.i += _read.count;
 
443
}
 
444
#endif
 
445
 
225
446
string
226
447
IceInternal::TcpTransceiver::type() const
227
448
{
234
455
    return _desc;
235
456
}
236
457
 
237
 
SocketStatus
238
 
IceInternal::TcpTransceiver::initialize()
 
458
Ice::ConnectionInfoPtr 
 
459
IceInternal::TcpTransceiver::getInfo() const
239
460
{
240
 
    if(_state == StateNeedConnect)
241
 
    {
242
 
        _state = StateConnectPending;
243
 
        return NeedConnect;
244
 
    }
245
 
    else if(_state <= StateConnectPending)
246
 
    {
247
 
        try
248
 
        {
249
 
            doFinishConnect(_fd);
250
 
            _state = StateConnected;
251
 
            _desc = fdToString(_fd);
252
 
        }
253
 
        catch(const Ice::LocalException& ex)
254
 
        {
255
 
            if(_traceLevels->network >= 2)
256
 
            {
257
 
                Trace out(_logger, _traceLevels->networkCat);
258
 
                out << "failed to establish tcp connection\n" << _desc << "\n" << ex;
259
 
            }
260
 
            throw;
261
 
        }
262
 
 
263
 
        if(_traceLevels->network >= 1)
264
 
        {
265
 
            Trace out(_logger, _traceLevels->networkCat);
266
 
            out << "tcp connection established\n" << _desc;
267
 
        }
268
 
    }
269
 
    assert(_state == StateConnected);
270
 
    return Finished;
 
461
    assert(_fd != INVALID_SOCKET);
 
462
    Ice::TCPConnectionInfoPtr info = new Ice::TCPConnectionInfo();
 
463
    fdToAddressAndPort(_fd, info->localAddress, info->localPort, info->remoteAddress, info->remotePort);
 
464
    return info;
271
465
}
272
466
 
273
467
void
275
469
{
276
470
    if(buf.b.size() > messageSizeMax)
277
471
    {
278
 
        throw MemoryLimitException(__FILE__, __LINE__);
 
472
        Ex::throwMemoryLimitException(__FILE__, __LINE__, buf.b.size(), messageSizeMax);
279
473
    }
280
474
}
281
475
 
282
476
IceInternal::TcpTransceiver::TcpTransceiver(const InstancePtr& instance, SOCKET fd, bool connected) :
 
477
    NativeInfo(fd),
283
478
    _traceLevels(instance->traceLevels()),
284
479
    _logger(instance->initializationData().logger),
285
480
    _stats(instance->initializationData().stats),
286
 
    _fd(fd),
287
481
    _state(connected ? StateConnected : StateNeedConnect),
288
 
    _desc(fdToString(_fd))
 
482
    _desc(connected ? fdToString(_fd) : string())
 
483
#ifdef ICE_USE_IOCP
 
484
    , _read(SocketOperationRead), 
 
485
    _write(SocketOperationWrite)
 
486
#endif
289
487
{
290
 
#ifdef _WIN32
 
488
    setBlock(_fd, false);
 
489
    setTcpBufSize(_fd, instance->initializationData().properties, _logger);
 
490
 
 
491
#ifdef ICE_USE_IOCP
291
492
    //
292
493
    // On Windows, limiting the buffer size is important to prevent
293
494
    // poor throughput performances when transfering large amount of
294
495
    // data. See Microsoft KB article KB823764.
295
496
    //
296
 
    _maxPacketSize = IceInternal::getSendBufferSize(_fd) / 2;
297
 
    if(_maxPacketSize < 512)
298
 
    {
299
 
        _maxPacketSize = 0;
 
497
    _maxSendPacketSize = IceInternal::getSendBufferSize(fd) / 2;
 
498
    if(_maxSendPacketSize < 512)
 
499
    {
 
500
        _maxSendPacketSize = 0;
 
501
    }
 
502
 
 
503
    _maxReceivePacketSize = IceInternal::getRecvBufferSize(fd);
 
504
    if(_maxReceivePacketSize < 512)
 
505
    {
 
506
        _maxReceivePacketSize = 0;
300
507
    }
301
508
#endif
302
509
}
305
512
{
306
513
    assert(_fd == INVALID_SOCKET);
307
514
}
 
515
 
 
516
void
 
517
IceInternal::TcpTransceiver::connect(const struct sockaddr_storage& addr)
 
518
{
 
519
#ifndef ICE_USE_IOCP
 
520
    try
 
521
    {
 
522
        if(doConnect(_fd, addr))
 
523
        {
 
524
            _state = StateConnected;
 
525
            _desc = fdToString(_fd);
 
526
            if(_traceLevels->network >= 1)
 
527
            {
 
528
                Trace out(_logger, _traceLevels->networkCat);
 
529
                out << "tcp connection established\n" << _desc;
 
530
            }
 
531
        }
 
532
        else
 
533
        {
 
534
            _desc = fdToString(_fd);
 
535
        }
 
536
    }
 
537
    catch(...)
 
538
    {
 
539
        _fd = INVALID_SOCKET;
 
540
        throw;
 
541
    }
 
542
#else
 
543
    _connectAddr = addr;
 
544
#endif
 
545
}