266
268
return stats.state;
271
void Connection::checkState()
273
// Check if we have become readable or writeable, and notify if necessary
274
bool r = local_wnd->size() > 0 || stats.state == CS_CLOSED;
275
bool w = remote_wnd->availableSpace() > 0 && stats.state == CS_CONNECTED;
276
bool r_changed = !stats.readable && r;
277
bool w_changed = !stats.writeable && w;
282
// Temporary unlock the mutex to avoid a deadlock
283
if (r_changed || w_changed)
284
transmitter->stateChanged(self.toStrongRef(),r_changed,w_changed);
269
289
void Connection::checkIfClosed()
271
291
// Check if we need to go to the closed state
272
292
// We can do this if all our packets have been acked and the local window
273
293
// has been fully read
274
if (remote_wnd->allPacketsAcked() && local_wnd->isEmpty())
294
if (stats.state == CS_FINISHED && remote_wnd->allPacketsAcked() && local_wnd->isEmpty())
276
296
stats.state = CS_CLOSED;
277
297
Out(SYS_UTP|LOG_NOTICE) << "UTP: Connection " << stats.recv_connection_id << "|" << stats.send_connection_id << " closed " << endl;
446
int Connection::sendDataPacket(const QByteArray& packet)
467
void Connection::sendDataPacket(const QByteArray& packet, Uint16 seq_nr, const utp::TimeValue& now)
448
bt::Uint32 to_send = packet.size();
451
469
bt::Uint32 extension_length = 0;
452
470
bt::Uint32 sack_bits = local_wnd->selectiveAckBits();
453
471
if (sack_bits > 0)
462
480
hdr.timestamp_microseconds = now.timestampMicroSeconds();
463
481
hdr.timestamp_difference_microseconds = stats.reply_micro;
464
482
hdr.wnd_size = stats.last_window_size_transmitted = local_wnd->availableSpace();
465
hdr.seq_nr = stats.seq_nr;
466
484
hdr.ack_nr = local_wnd->lastSeqNr();
467
485
hdr.write((bt::Uint8*)ba.data());
476
494
local_wnd->fillSelectiveAck(&sack);
479
memcpy(ba.data() + Header::size() + extension_length,packet.data(),to_send);
480
if (!transmitter->sendTo(ba,stats.remote,receiveConnectionID()))
497
memcpy(ba.data() + Header::size() + extension_length,packet.data(),packet.size());
498
if (!transmitter->sendTo(self.toStrongRef(),ba))
481
499
throw TransmissionError(__FILE__,__LINE__);
483
501
last_packet_sent = now;
484
502
stats.packets_sent++;
506
void Connection::sendDataPacket(const QByteArray& packet)
509
sendDataPacket(packet,stats.seq_nr,now);
485
511
remote_wnd->addPacket(packet,stats.seq_nr,bt::Now());
491
516
void Connection::retransmit(const QByteArray& packet, Uint16 p_seq_nr)
495
bt::Uint32 extension_length = 0;
496
bt::Uint32 sack_bits = local_wnd->selectiveAckBits();
498
extension_length += 2 + qMin(sack_bits / 8,(bt::Uint32)4);
500
QByteArray ba(Header::size() + extension_length + packet.size(),0);
504
hdr.extension = extension_length == 0 ? 0 : SELECTIVE_ACK_ID;
505
hdr.connection_id = stats.send_connection_id;
506
hdr.timestamp_microseconds = now.timestampMicroSeconds();
507
hdr.timestamp_difference_microseconds = stats.reply_micro;
508
hdr.wnd_size = stats.last_window_size_transmitted = local_wnd->availableSpace();
509
hdr.seq_nr = p_seq_nr;
510
hdr.ack_nr = local_wnd->lastSeqNr();
511
hdr.write((bt::Uint8*)ba.data());
513
if (extension_length > 0)
515
bt::Uint8* ptr = (bt::Uint8*)(ba.data() + Header::size());
517
sack.extension = ptr[0] = 0;
518
sack.length = ptr[1] = extension_length - 2;
519
sack.bitmask = ptr + 2;
520
local_wnd->fillSelectiveAck(&sack);
523
memcpy(ba.data() + Header::size() + extension_length,packet.data(),packet.size());
519
sendDataPacket(packet, p_seq_nr, now);
525
if (!transmitter->sendTo(ba,stats.remote,receiveConnectionID()))
526
throw TransmissionError(__FILE__,__LINE__);
528
last_packet_sent = now;
529
stats.packets_sent++;
532
523
bt::Uint32 Connection::bytesAvailable() const
558
549
stats.bytes_received += ret;
550
stats.readable = local_wnd->size() > 0;
563
555
bool Connection::waitUntilConnected()
557
QMutexLocker lock(&mutex);
558
if (stats.state == CS_CONNECTED)
566
561
connected.wait(&mutex);
567
bool ret = stats.state == CS_CONNECTED;
562
return stats.state == CS_CONNECTED;
573
bool Connection::waitForData()
566
bool Connection::waitForData(Uint32 timeout)
576
data_ready.wait(&mutex);
577
bool ret = local_wnd->size() > 0;
568
QMutexLocker lock(&mutex);
569
if (local_wnd->size() > 0)
572
data_ready.wait(&mutex,timeout == 0 ? ULONG_MAX : timeout);
573
return local_wnd->size() > 0;
669
668
if (QThread::currentThread() != thread())
670
669
emit doDelayedStartTimer();
672
timer.start(stats.timeout,this);
675
674
void Connection::delayedStartTimer()
677
timer.start(stats.timeout,this);
680
void Connection::timerEvent(QTimerEvent* event)
677
transmitter->cancelTimer(timer_id);
678
timer_id = transmitter->scheduleTimer(self.toStrongRef(),stats.timeout);