~ubuntu-branches/ubuntu/oneiric/libktorrent/oneiric

« back to all changes in this revision

Viewing changes to src/utp/connection.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Modestas Vainius
  • Date: 2011-05-26 00:33:38 UTC
  • mfrom: (5.1.7 experimental)
  • Revision ID: james.westby@ubuntu.com-20110526003338-2r4zwyzn8bovsxay
Tags: 1.1.1-2
Release to unstable.

Show diffs side-by-side

added

removed

Lines of Context:
44
44
        }
45
45
        
46
46
        Connection::Connection(bt::Uint16 recv_connection_id, Type type, const net::Address& remote, Transmitter* transmitter) 
47
 
                : transmitter(transmitter)
 
47
                : transmitter(transmitter),timer_id(-1)
48
48
        {
49
49
                stats.type = type;
50
50
                stats.remote = remote;
77
77
                stats.packets_sent = 0;
78
78
                stats.bytes_lost = 0;
79
79
                stats.packets_lost = 0;
 
80
                stats.readable = stats.writeable = false;
80
81
                
81
82
                connect(this,SIGNAL(doDelayedStartTimer()),this,SLOT(delayedStartTimer()),Qt::QueuedConnection);
82
83
                if (type == INCOMING)
262
263
                                break;
263
264
                }
264
265
                
 
266
                checkState();
265
267
                startTimer();
266
268
                return stats.state;
267
269
        }
268
270
        
 
271
        void Connection::checkState()
 
272
        {
 
273
                // Check if we have become readable or writeable, and notify if necessary
 
274
                bool r = local_wnd->size() > 0 || stats.state == CS_CLOSED;
 
275
                bool w = remote_wnd->availableSpace() > 0 && stats.state == CS_CONNECTED;
 
276
                bool r_changed = !stats.readable && r;
 
277
                bool w_changed = !stats.writeable && w;
 
278
                stats.readable = r;
 
279
                stats.writeable = w;
 
280
                
 
281
                mutex.unlock();
 
282
                // Temporarily unlock the mutex to avoid a deadlock
 
283
                if (r_changed || w_changed)
 
284
                        transmitter->stateChanged(self.toStrongRef(),r_changed,w_changed);
 
285
                mutex.lock();
 
286
        }
 
287
 
 
288
        
269
289
        void Connection::checkIfClosed()
270
290
        {
271
291
                // Check if we need to go to the closed state
272
292
                // We can do this if all our packets have been acked and the local window
273
293
                // has been fully read
274
 
                if (remote_wnd->allPacketsAcked() && local_wnd->isEmpty())
 
294
                if (stats.state == CS_FINISHED && remote_wnd->allPacketsAcked() && local_wnd->isEmpty())
275
295
                {
276
296
                        stats.state = CS_CLOSED;
277
297
                        Out(SYS_UTP|LOG_NOTICE) << "UTP: Connection " << stats.recv_connection_id << "|" << stats.send_connection_id << " closed " << endl;
323
343
                }
324
344
                
325
345
        
326
 
                if (!transmitter->sendTo(ba,stats.remote,receiveConnectionID()))
 
346
                if (!transmitter->sendTo(self.toStrongRef(),ba))
327
347
                        throw TransmissionError(__FILE__,__LINE__);
328
348
                
329
349
                last_packet_sent = tv;
407
427
                // first put data in the output buffer then send packets
408
428
                bt::Uint32 ret = output_buffer.write(data,len);
409
429
                sendPackets();
 
430
                stats.writeable = !output_buffer.full();
410
431
                return ret;
411
432
        }
412
433
        
442
463
                else
443
464
                        sendState();
444
465
        }
445
 
 
446
 
        int Connection::sendDataPacket(const QByteArray& packet)
 
466
        
 
467
        void Connection::sendDataPacket(const QByteArray& packet, Uint16 seq_nr, const utp::TimeValue& now)
447
468
        {
448
 
                bt::Uint32 to_send = packet.size();
449
 
                TimeValue now;
450
 
                
451
469
                bt::Uint32 extension_length = 0;
452
470
                bt::Uint32 sack_bits = local_wnd->selectiveAckBits();
453
471
                if (sack_bits > 0)
462
480
                hdr.timestamp_microseconds = now.timestampMicroSeconds();
463
481
                hdr.timestamp_difference_microseconds = stats.reply_micro;
464
482
                hdr.wnd_size = stats.last_window_size_transmitted = local_wnd->availableSpace();
465
 
                hdr.seq_nr = stats.seq_nr;
 
483
                hdr.seq_nr = seq_nr;
466
484
                hdr.ack_nr = local_wnd->lastSeqNr();
467
485
                hdr.write((bt::Uint8*)ba.data());
468
486
                
476
494
                        local_wnd->fillSelectiveAck(&sack);
477
495
                }
478
496
                
479
 
                memcpy(ba.data() + Header::size() + extension_length,packet.data(),to_send);
480
 
                if (!transmitter->sendTo(ba,stats.remote,receiveConnectionID()))
 
497
                memcpy(ba.data() + Header::size() + extension_length,packet.data(),packet.size());
 
498
                if (!transmitter->sendTo(self.toStrongRef(),ba))
481
499
                        throw TransmissionError(__FILE__,__LINE__);
482
500
                
483
501
                last_packet_sent = now;
484
502
                stats.packets_sent++;
 
503
        }
 
504
 
 
505
 
 
506
        void Connection::sendDataPacket(const QByteArray& packet)
 
507
        {
 
508
                TimeValue now;
 
509
                sendDataPacket(packet,stats.seq_nr,now);
 
510
                
485
511
                remote_wnd->addPacket(packet,stats.seq_nr,bt::Now());
486
512
                stats.seq_nr++;
487
513
                startTimer();
488
 
                return to_send;
489
514
        }
490
515
 
491
516
        void Connection::retransmit(const QByteArray& packet, Uint16 p_seq_nr)
492
517
        {
493
518
                TimeValue now;
494
 
                
495
 
                bt::Uint32 extension_length = 0;
496
 
                bt::Uint32 sack_bits = local_wnd->selectiveAckBits();
497
 
                if (sack_bits > 0)
498
 
                        extension_length += 2 + qMin(sack_bits / 8,(bt::Uint32)4);
499
 
                
500
 
                QByteArray ba(Header::size() + extension_length + packet.size(),0);
501
 
                Header hdr;
502
 
                hdr.version = 1;
503
 
                hdr.type = ST_DATA;
504
 
                hdr.extension = extension_length == 0 ? 0 : SELECTIVE_ACK_ID;
505
 
                hdr.connection_id = stats.send_connection_id;
506
 
                hdr.timestamp_microseconds = now.timestampMicroSeconds();
507
 
                hdr.timestamp_difference_microseconds = stats.reply_micro;
508
 
                hdr.wnd_size = stats.last_window_size_transmitted = local_wnd->availableSpace();
509
 
                hdr.seq_nr = p_seq_nr;
510
 
                hdr.ack_nr = local_wnd->lastSeqNr();
511
 
                hdr.write((bt::Uint8*)ba.data());
512
 
                
513
 
                if (extension_length > 0)
514
 
                {
515
 
                        bt::Uint8* ptr = (bt::Uint8*)(ba.data() + Header::size());
516
 
                        SelectiveAck sack;
517
 
                        sack.extension = ptr[0] = 0;
518
 
                        sack.length = ptr[1] = extension_length - 2;
519
 
                        sack.bitmask = ptr + 2;
520
 
                        local_wnd->fillSelectiveAck(&sack);
521
 
                }
522
 
                
523
 
                memcpy(ba.data() + Header::size() + extension_length,packet.data(),packet.size());
 
519
                sendDataPacket(packet, p_seq_nr, now);
524
520
                startTimer();
525
 
                if (!transmitter->sendTo(ba,stats.remote,receiveConnectionID()))
526
 
                        throw TransmissionError(__FILE__,__LINE__);
527
 
                
528
 
                last_packet_sent = now;
529
 
                stats.packets_sent++;
530
521
        }
531
522
 
532
523
        bt::Uint32 Connection::bytesAvailable() const
556
547
                        sendState();
557
548
                
558
549
                stats.bytes_received += ret;
 
550
                stats.readable = local_wnd->size() > 0;
559
551
                return ret;
560
552
        }
561
553
 
562
554
        
563
555
        bool Connection::waitUntilConnected()
564
556
        {
565
 
                mutex.lock();
 
557
                QMutexLocker lock(&mutex);
 
558
                if (stats.state == CS_CONNECTED)
 
559
                        return true;
 
560
                
566
561
                connected.wait(&mutex);
567
 
                bool ret = stats.state == CS_CONNECTED;
568
 
                mutex.unlock();
569
 
                return ret;
 
562
                return stats.state == CS_CONNECTED;
570
563
        }
571
564
 
572
565
 
573
 
        bool Connection::waitForData()
 
566
        bool Connection::waitForData(Uint32 timeout)
574
567
        {
575
 
                mutex.lock();
576
 
                data_ready.wait(&mutex);
577
 
                bool ret = local_wnd->size() > 0;
578
 
                mutex.unlock();
579
 
                return ret;
 
568
                QMutexLocker lock(&mutex);
 
569
                if (local_wnd->size() > 0)
 
570
                        return true;
 
571
                
 
572
                data_ready.wait(&mutex,timeout == 0 ? ULONG_MAX : timeout);
 
573
                return local_wnd->size() > 0;
580
574
        }
581
575
 
582
576
 
637
631
                                        sendState();
638
632
                                }
639
633
                                break;
640
 
                        case CS_CLOSED:
641
634
                        case CS_IDLE:
642
635
                                startTimer();
643
636
                                break;
 
637
                        case CS_CLOSED:
 
638
                                break;
644
639
                }
 
640
                
 
641
                checkState();
 
642
                if (stats.state == CS_CLOSED)
 
643
                        transmitter->closed(self.toStrongRef());
645
644
        }
646
645
 
647
646
        void Connection::dumpStats()
669
668
                if (QThread::currentThread() != thread())
670
669
                        emit doDelayedStartTimer();
671
670
                else
672
 
                        timer.start(stats.timeout,this);
 
671
                        delayedStartTimer();
673
672
        }
674
673
        
675
674
        void Connection::delayedStartTimer()
676
675
        {
677
 
                timer.start(stats.timeout,this);
678
 
        }
679
 
 
680
 
        void Connection::timerEvent(QTimerEvent* event)
681
 
        {
682
 
                Q_UNUSED(event);
683
 
                handleTimeout();
684
 
        }
685
 
 
 
676
                if (timer_id != -1)
 
677
                        transmitter->cancelTimer(timer_id);
 
678
                timer_id = transmitter->scheduleTimer(self.toStrongRef(),stats.timeout);
 
679
        }
686
680
}
687
681