3
* Copyright 2004--2005, Google Inc.
5
* Redistribution and use in source and binary forms, with or without
6
* modification, are permitted provided that the following conditions are met:
8
* 1. Redistributions of source code must retain the above copyright notice,
9
* this list of conditions and the following disclaimer.
10
* 2. Redistributions in binary form must reproduce the above copyright notice,
11
* this list of conditions and the following disclaimer in the documentation
12
* and/or other materials provided with the distribution.
13
* 3. The name of the author may not be used to endorse or promote products
14
* derived from this software without specific prior written permission.
16
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18
* MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19
* EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22
* OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23
* WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24
* OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28
#include "talk/p2p/base/port.h"
33
#include "talk/base/helpers.h"
34
#include "talk/base/logging.h"
35
#include "talk/base/messagedigest.h"
36
#include "talk/base/scoped_ptr.h"
37
#include "talk/base/stringutils.h"
38
#include "talk/p2p/base/common.h"
42
// The length of time we wait before timing out readability on a connection.
43
const uint32 CONNECTION_READ_TIMEOUT = 30 * 1000; // 30 seconds
45
// The length of time we wait before timing out writability on a connection.
46
const uint32 CONNECTION_WRITE_TIMEOUT = 15 * 1000; // 15 seconds
48
// The length of time we wait before we become unwritable.
49
const uint32 CONNECTION_WRITE_CONNECT_TIMEOUT = 5 * 1000; // 5 seconds
51
// The number of pings that must fail to respond before we become unwritable.
52
const uint32 CONNECTION_WRITE_CONNECT_FAILURES = 5;
54
// This is the length of time that we wait for a ping response to come back.
55
const int CONNECTION_RESPONSE_TIMEOUT = 5 * 1000; // 5 seconds
57
// Determines whether we have seen at least the given maximum number of
58
// pings fail to have a response.
59
inline bool TooManyFailures(
60
const std::vector<uint32>& pings_since_last_response,
61
uint32 maximum_failures,
65
// If we haven't sent that many pings, then we can't have failed that many.
66
if (pings_since_last_response.size() < maximum_failures)
69
// Check if the window in which we would expect a response to the ping has
71
return pings_since_last_response[maximum_failures - 1] + rtt_estimate < now;
74
// Determines whether we have gone too long without seeing any response.
75
inline bool TooLongWithoutResponse(
76
const std::vector<uint32>& pings_since_last_response,
80
if (pings_since_last_response.size() == 0)
83
return pings_since_last_response[0] + maximum_time < now;
86
// We will restrict RTT estimates (when used for determining state) to be
87
// within a reasonable range.
88
const uint32 MINIMUM_RTT = 100; // 0.1 seconds
89
const uint32 MAXIMUM_RTT = 3000; // 3 seconds
91
// When we don't have any RTT data, we have to pick something reasonable. We
92
// use a large value just in case the connection is really slow.
93
const uint32 DEFAULT_RTT = MAXIMUM_RTT;
95
// Computes our estimate of the RTT given the current estimate.
96
inline uint32 ConservativeRTTEstimate(uint32 rtt) {
97
return talk_base::_max(MINIMUM_RTT, talk_base::_min(MAXIMUM_RTT, 2 * rtt));
100
// Weighting of the old rtt value to new data.
101
const int RTT_RATIO = 3; // 3 : 1
103
// The delay before we begin checking if this port is useless.
104
const int kPortTimeoutDelay = 30 * 1000; // 30 seconds
106
const uint32 MSG_CHECKTIMEOUT = 1;
107
const uint32 MSG_DELETE = 1;
109
// Mediaproxy expects username to be 16 bytes.
110
const int kUsernameLength = 16;
111
// Minimum password length of 22 characters as per RFC5245.
112
const int kPasswordLength = 22;
117
static const char* const PROTO_NAMES[] = { "udp", "tcp", "ssltcp" };
119
const char* ProtoToString(ProtocolType proto) {
120
return PROTO_NAMES[proto];
123
bool StringToProto(const char* value, ProtocolType* proto) {
124
for (size_t i = 0; i <= PROTO_LAST; ++i) {
125
if (strcmp(PROTO_NAMES[i], value) == 0) {
126
*proto = static_cast<ProtocolType>(i);
133
Port::Port(talk_base::Thread* thread, const std::string& type,
134
talk_base::PacketSocketFactory* factory, talk_base::Network* network,
135
const talk_base::IPAddress& ip, int min_port, int max_port)
145
lifetime_(LT_PRESTART),
146
enable_port_packets_(false),
147
enable_message_integrity_(false) {
148
ASSERT(factory_ != NULL);
150
set_username_fragment(talk_base::CreateRandomString(kUsernameLength));
151
set_password(talk_base::CreateRandomString(kPasswordLength));
152
LOG_J(LS_INFO, this) << "Port created";
156
// Delete all of the remaining connections. We copy the list up front
157
// because each deletion will cause it to be modified.
159
std::vector<Connection*> list;
161
AddressMap::iterator iter = connections_.begin();
162
while (iter != connections_.end()) {
163
list.push_back(iter->second);
167
for (uint32 i = 0; i < list.size(); i++)
171
Connection* Port::GetConnection(const talk_base::SocketAddress& remote_addr) {
172
AddressMap::const_iterator iter = connections_.find(remote_addr);
173
if (iter != connections_.end())
179
void Port::AddAddress(const talk_base::SocketAddress& address,
180
const std::string& protocol,
185
c.set_protocol(protocol);
186
c.set_address(address);
187
c.set_preference(preference_);
188
c.set_username(username_frag_);
189
c.set_password(password_);
190
c.set_network_name(network_->name());
191
c.set_generation(generation_);
192
candidates_.push_back(c);
195
SignalAddressReady(this);
198
void Port::AddConnection(Connection* conn) {
199
connections_[conn->remote_candidate().address()] = conn;
200
conn->SignalDestroyed.connect(this, &Port::OnConnectionDestroyed);
201
SignalConnectionCreated(this, conn);
204
void Port::OnReadPacket(
205
const char* data, size_t size, const talk_base::SocketAddress& addr) {
206
// If the user has enabled port packets, just hand this over.
207
if (enable_port_packets_) {
208
SignalReadPacket(this, data, size, addr);
212
// If this is an authenticated STUN request, then signal unknown address and
213
// send back a proper binding response.
215
std::string remote_username;
216
if (!GetStunMessage(data, size, addr, &msg, &remote_username)) {
217
LOG_J(LS_ERROR, this) << "Received non-STUN packet from unknown address ("
218
<< addr.ToString() << ")";
220
// STUN message handled already
221
} else if (msg->type() == STUN_BINDING_REQUEST) {
222
SignalUnknownAddress(this, addr, msg, remote_username, false);
224
// NOTE(tschmelcher): STUN_BINDING_RESPONSE is benign. It occurs if we
225
// pruned a connection for this port while it had STUN requests in flight,
226
// because we then get back responses for them, which this code correctly
228
if (msg->type() != STUN_BINDING_RESPONSE) {
229
LOG_J(LS_ERROR, this) << "Received unexpected STUN message type ("
230
<< msg->type() << ") from unknown address ("
231
<< addr.ToString() << ")";
237
bool Port::GetStunMessage(const char* data, size_t size,
238
const talk_base::SocketAddress& addr,
239
StunMessage** out_msg, std::string* out_username) {
240
// NOTE: This could clearly be optimized to avoid allocating any memory.
241
// However, at the data rates we'll be looking at on the client side,
242
// this probably isn't worth worrying about.
243
ASSERT(out_msg != NULL);
244
ASSERT(out_username != NULL);
246
out_username->clear();
248
// Parse the request message. If the packet is not a complete and correct
249
// STUN message, then ignore it.
250
talk_base::scoped_ptr<StunMessage> stun_msg(new StunMessage());
251
talk_base::ByteBuffer buf(data, size);
252
if (!stun_msg->Read(&buf) || (buf.Length() > 0)) {
256
// The packet must include a username that either begins or ends with our
257
// fragment. It should begin with our fragment if it is a request and it
258
// should end with our fragment if it is a response.
259
const StunByteStringAttribute* username_attr =
260
stun_msg->GetByteString(STUN_ATTR_USERNAME);
262
int remote_frag_len = (username_attr ? username_attr->length() : 0);
263
remote_frag_len -= static_cast<int>(username_frag_.size());
265
if (stun_msg->type() == STUN_BINDING_REQUEST) {
266
if (remote_frag_len < 0) {
267
// Username not present or corrupted, don't reply.
268
LOG_J(LS_ERROR, this) << "Received STUN request without username from "
271
} else if (std::memcmp(username_attr->bytes(), username_frag_.c_str(),
272
username_frag_.size()) != 0) {
273
LOG_J(LS_ERROR, this) << "Received STUN request with bad local username "
274
<< std::string(username_attr->bytes(),
275
username_attr->length())
278
SendBindingErrorResponse(stun_msg.get(), addr, STUN_ERROR_UNAUTHORIZED,
279
STUN_ERROR_REASON_UNAUTHORIZED);
283
if (enable_message_integrity_ &&
284
!stun_msg->ValidateMessageIntegrity(data, size, password_)) {
285
LOG_J(LS_ERROR, this) << "Message Integrity check failed for request, "
288
SendBindingErrorResponse(stun_msg.get(), addr, STUN_ERROR_UNAUTHORIZED,
289
STUN_ERROR_REASON_UNAUTHORIZED);
293
out_username->assign(username_attr->bytes() + username_frag_.size(),
294
username_attr->bytes() + username_attr->length());
295
} else if ((stun_msg->type() == STUN_BINDING_RESPONSE)
296
|| (stun_msg->type() == STUN_BINDING_ERROR_RESPONSE)) {
297
if (remote_frag_len < 0) {
298
LOG_J(LS_ERROR, this) << "Received STUN response without username from "
300
// Do not send error response to a response
302
} else if (std::memcmp(username_attr->bytes() + remote_frag_len,
303
username_frag_.c_str(),
304
username_frag_.size()) != 0) {
305
LOG_J(LS_ERROR, this) << "Received STUN response with bad local username "
306
<< std::string(username_attr->bytes(),
307
username_attr->length())
310
// Do not send error response to a response
313
out_username->assign(username_attr->bytes(),
314
username_attr->bytes() + remote_frag_len);
316
if (stun_msg->type() == STUN_BINDING_ERROR_RESPONSE) {
317
if (const StunErrorCodeAttribute* error_code = stun_msg->GetErrorCode()) {
318
LOG_J(LS_ERROR, this) << "Received STUN binding error:"
320
<< static_cast<int>(error_code->error_class())
322
<< static_cast<int>(error_code->number())
323
<< " reason='" << error_code->reason() << "'"
324
<< " from " << addr.ToString();
325
// Return message to allow error-specific processing
327
LOG_J(LS_ERROR, this) << "Received STUN binding error without a error "
328
<< "code from " << addr.ToString();
329
// Drop corrupt message
334
LOG_J(LS_ERROR, this) << "Received STUN packet with invalid type ("
335
<< stun_msg->type() << ") from " << addr.ToString();
339
// Return the STUN message found.
340
*out_msg = stun_msg.release();
344
void Port::SendBindingResponse(StunMessage* request,
345
const talk_base::SocketAddress& addr) {
346
ASSERT(request->type() == STUN_BINDING_REQUEST);
348
// Retrieve the username from the request.
349
const StunByteStringAttribute* username_attr =
350
request->GetByteString(STUN_ATTR_USERNAME);
351
ASSERT(username_attr != NULL);
352
if (username_attr == NULL) {
353
// No valid username, skip the response.
357
// Fill in the response message.
358
StunMessage response;
359
response.SetType(STUN_BINDING_RESPONSE);
360
response.SetTransactionID(request->transaction_id());
362
StunByteStringAttribute* username2_attr =
363
StunAttribute::CreateByteString(STUN_ATTR_USERNAME);
364
username2_attr->CopyBytes(username_attr->bytes(), username_attr->length());
365
response.AddAttribute(username2_attr);
367
StunAddressAttribute* addr_attr =
368
StunAttribute::CreateAddress(STUN_ATTR_MAPPED_ADDRESS);
369
addr_attr->SetPort(addr.port());
370
addr_attr->SetIP(addr.ipaddr());
371
response.AddAttribute(addr_attr);
373
// Adding MESSAGE-INTEGRITY attribute to the response message.
374
if (enable_message_integrity_)
375
response.AddMessageIntegrity(password_);
377
// Send the response message.
378
talk_base::ByteBuffer buf;
379
response.Write(&buf);
380
if (SendTo(buf.Data(), buf.Length(), addr, false) < 0) {
381
LOG_J(LS_ERROR, this) << "Failed to send STUN ping response to "
385
// The fact that we received a successful request means that this connection
386
// (if one exists) should now be readable.
387
Connection* conn = GetConnection(addr);
388
ASSERT(conn != NULL);
390
conn->ReceivedPing();
393
void Port::SendBindingErrorResponse(StunMessage* request,
394
const talk_base::SocketAddress& addr,
395
int error_code, const std::string& reason) {
396
ASSERT(request->type() == STUN_BINDING_REQUEST);
398
// Retrieve the username from the request. If it didn't have one, we
399
// shouldn't be responding at all.
400
const StunByteStringAttribute* username_attr =
401
request->GetByteString(STUN_ATTR_USERNAME);
402
ASSERT(username_attr != NULL);
403
if (username_attr == NULL) {
404
// No valid username, skip the response.
408
// Fill in the response message.
409
StunMessage response;
410
response.SetType(STUN_BINDING_ERROR_RESPONSE);
411
response.SetTransactionID(request->transaction_id());
413
StunByteStringAttribute* username2_attr =
414
StunAttribute::CreateByteString(STUN_ATTR_USERNAME);
415
username2_attr->CopyBytes(username_attr->bytes(), username_attr->length());
416
response.AddAttribute(username2_attr);
418
StunErrorCodeAttribute* error_attr = StunAttribute::CreateErrorCode();
419
error_attr->SetErrorCode(error_code);
420
error_attr->SetReason(reason);
421
response.AddAttribute(error_attr);
423
// Send the response message.
424
// NOTE: If we wanted to, this is where we would add the HMAC.
425
talk_base::ByteBuffer buf;
426
response.Write(&buf);
427
SendTo(buf.Data(), buf.Length(), addr, false);
428
LOG_J(LS_INFO, this) << "Sending STUN binding error: reason=" << reason
429
<< " to " << addr.ToString();
432
void Port::OnMessage(talk_base::Message *pmsg) {
433
ASSERT(pmsg->message_id == MSG_CHECKTIMEOUT);
434
ASSERT(lifetime_ == LT_PRETIMEOUT);
435
lifetime_ = LT_POSTTIMEOUT;
439
std::string Port::ToString() const {
440
std::stringstream ss;
441
ss << "Port[" << name_ << ":" << generation_ << ":" << type_
442
<< ":" << network_->ToString() << "]";
446
void Port::EnablePortPackets() {
447
enable_port_packets_ = true;
451
// The port sticks around for a minimum lifetime, after which
452
// we destroy it when it drops to zero connections.
453
if (lifetime_ == LT_PRESTART) {
454
lifetime_ = LT_PRETIMEOUT;
455
thread_->PostDelayed(kPortTimeoutDelay, this, MSG_CHECKTIMEOUT);
457
LOG_J(LS_WARNING, this) << "Port restart attempted";
461
void Port::OnConnectionDestroyed(Connection* conn) {
462
AddressMap::iterator iter =
463
connections_.find(conn->remote_candidate().address());
464
ASSERT(iter != connections_.end());
465
connections_.erase(iter);
470
void Port::Destroy() {
471
ASSERT(connections_.empty());
472
LOG_J(LS_INFO, this) << "Port deleted";
473
SignalDestroyed(this);
477
void Port::CheckTimeout() {
478
// If this port has no connections, then there's no reason to keep it around.
479
// When the connections time out (both read and write), they will delete
480
// themselves, so if we have any connections, they are either readable or
481
// writable (or still connecting).
482
if ((lifetime_ == LT_POSTTIMEOUT) && connections_.empty()) {
487
// A ConnectionRequest is a simple STUN ping used to determine writability.
488
class ConnectionRequest : public StunRequest {
490
explicit ConnectionRequest(Connection* connection) : connection_(connection) {
493
virtual ~ConnectionRequest() {
496
virtual void Prepare(StunMessage* request) {
497
request->SetType(STUN_BINDING_REQUEST);
498
StunByteStringAttribute* username_attr =
499
StunAttribute::CreateByteString(STUN_ATTR_USERNAME);
500
std::string username = connection_->remote_candidate().username();
501
username.append(connection_->port()->username_fragment());
502
username_attr->CopyBytes(username.c_str(), username.size());
503
request->AddAttribute(username_attr);
505
// Adding Message Integrity to the STUN request message.
506
request->AddMessageIntegrity(connection_->remote_candidate().password());
509
virtual void OnResponse(StunMessage* response) {
510
connection_->OnConnectionRequestResponse(this, response);
513
virtual void OnErrorResponse(StunMessage* response) {
514
connection_->OnConnectionRequestErrorResponse(this, response);
517
virtual void OnTimeout() {
518
connection_->OnConnectionRequestTimeout(this);
521
virtual int GetNextDelay() {
522
// Each request is sent only once. After a single delay , the request will
525
return CONNECTION_RESPONSE_TIMEOUT;
529
Connection* connection_;
536
Connection::Connection(Port* port, size_t index,
537
const Candidate& remote_candidate)
538
: port_(port), local_candidate_index_(index),
539
remote_candidate_(remote_candidate), read_state_(STATE_READ_TIMEOUT),
540
write_state_(STATE_WRITE_CONNECT), connected_(true), pruned_(false),
541
requests_(port->thread()), rtt_(DEFAULT_RTT),
542
last_ping_sent_(0), last_ping_received_(0), last_data_received_(0),
544
// Wire up to send stun packets
545
requests_.SignalSendPacket.connect(this, &Connection::OnSendStunPacket);
546
LOG_J(LS_INFO, this) << "Connection created";
549
Connection::~Connection() {
552
const Candidate& Connection::local_candidate() const {
553
ASSERT(local_candidate_index_ < port_->candidates().size());
554
return port_->candidates()[local_candidate_index_];
557
void Connection::set_read_state(ReadState value) {
558
ReadState old_value = read_state_;
560
if (value != old_value) {
561
LOG_J(LS_VERBOSE, this) << "set_read_state";
562
SignalStateChange(this);
567
void Connection::set_write_state(WriteState value) {
568
WriteState old_value = write_state_;
569
write_state_ = value;
570
if (value != old_value) {
571
LOG_J(LS_VERBOSE, this) << "set_write_state";
572
SignalStateChange(this);
577
void Connection::set_connected(bool value) {
578
bool old_value = connected_;
580
if (value != old_value) {
581
LOG_J(LS_VERBOSE, this) << "set_connected";
585
void Connection::OnSendStunPacket(const void* data, size_t size,
587
if (port_->SendTo(data, size, remote_candidate_.address(), false) < 0) {
588
LOG_J(LS_WARNING, this) << "Failed to send STUN ping " << req->id();
592
void Connection::OnReadPacket(const char* data, size_t size) {
594
std::string remote_username;
595
const talk_base::SocketAddress& addr(remote_candidate_.address());
596
if (!port_->GetStunMessage(data, size, addr, &msg, &remote_username)) {
597
// The packet did not parse as a valid STUN message
599
// If this connection is readable, then pass along the packet.
600
if (read_state_ == STATE_READABLE) {
601
// readable means data from this address is acceptable
604
last_data_received_ = talk_base::Time();
605
recv_rate_tracker_.Update(size);
606
SignalReadPacket(this, data, size);
608
// If timed out sending writability checks, start up again
609
if (!pruned_ && (write_state_ == STATE_WRITE_TIMEOUT))
610
set_write_state(STATE_WRITE_CONNECT);
612
// Not readable means the remote address hasn't sent a valid
613
// binding request yet.
615
LOG_J(LS_WARNING, this)
616
<< "Received non-STUN packet from an unreadable connection.";
619
// The packet was STUN, but was already handled internally.
620
} else if (remote_username != remote_candidate_.username()) {
621
// The packet had the right local username, but the remote username was
622
// not the right one for the remote address.
623
if (msg->type() == STUN_BINDING_REQUEST) {
624
LOG_J(LS_ERROR, this) << "Received STUN request with bad remote username "
626
port_->SendBindingErrorResponse(msg, addr, STUN_ERROR_BAD_REQUEST,
627
STUN_ERROR_REASON_BAD_REQUEST);
628
} else if (msg->type() == STUN_BINDING_RESPONSE ||
629
msg->type() == STUN_BINDING_ERROR_RESPONSE) {
630
LOG_J(LS_ERROR, this) << "Received STUN response with bad remote username"
631
" " << remote_username;
635
// The packet is STUN, with the right username.
636
// If this is a STUN request, then update the readable bit and respond.
637
// If this is a STUN response, then update the writable bit.
639
switch (msg->type()) {
640
case STUN_BINDING_REQUEST:
641
// Incoming, validated stun request from remote peer.
642
// This call will also set the connection readable.
644
port_->SendBindingResponse(msg, addr);
646
// If timed out sending writability checks, start up again
647
if (!pruned_ && (write_state_ == STATE_WRITE_TIMEOUT))
648
set_write_state(STATE_WRITE_CONNECT);
651
// Response from remote peer. Does it match request sent?
652
// This doesn't just check, it makes callbacks if transaction
654
case STUN_BINDING_RESPONSE:
655
if (!port_->enable_message_integrity() ||
656
msg->ValidateMessageIntegrity(
657
data, size, remote_candidate().password())) {
658
requests_.CheckResponse(msg);
660
// Otherwise silently discard the response message.
662
case STUN_BINDING_ERROR_RESPONSE:
663
requests_.CheckResponse(msg);
671
// Done with the message; delete
677
void Connection::Prune() {
679
LOG_J(LS_VERBOSE, this) << "Connection pruned";
682
set_write_state(STATE_WRITE_TIMEOUT);
686
void Connection::Destroy() {
687
LOG_J(LS_VERBOSE, this) << "Connection destroyed";
688
set_read_state(STATE_READ_TIMEOUT);
689
set_write_state(STATE_WRITE_TIMEOUT);
692
void Connection::UpdateState(uint32 now) {
693
uint32 rtt = ConservativeRTTEstimate(rtt_);
696
for (size_t i = 0; i < pings_since_last_response_.size(); ++i) {
698
talk_base::sprintfn(buf, sizeof(buf), "%u",
699
pings_since_last_response_[i]);
700
pings.append(buf).append(" ");
702
LOG_J(LS_VERBOSE, this) << "UpdateState(): pings_since_last_response_=" <<
703
pings << ", rtt=" << rtt << ", now=" << now;
705
// Check the readable state.
707
// Since we don't know how many pings the other side has attempted, the best
708
// test we can do is a simple window.
710
if ((read_state_ == STATE_READABLE) &&
711
(last_ping_received_ + CONNECTION_READ_TIMEOUT <= now)) {
712
LOG_J(LS_INFO, this) << "Unreadable after "
713
<< now - last_ping_received_
714
<< " ms without a ping, rtt=" << rtt;
715
set_read_state(STATE_READ_TIMEOUT);
718
// Check the writable state. (The order of these checks is important.)
720
// Before becoming unwritable, we allow for a fixed number of pings to fail
721
// (i.e., receive no response). We also have to give the response time to
722
// get back, so we include a conservative estimate of this.
724
// Before timing out writability, we give a fixed amount of time. This is to
725
// allow for changes in network conditions.
727
if ((write_state_ == STATE_WRITABLE) &&
728
TooManyFailures(pings_since_last_response_,
729
CONNECTION_WRITE_CONNECT_FAILURES,
732
TooLongWithoutResponse(pings_since_last_response_,
733
CONNECTION_WRITE_CONNECT_TIMEOUT,
735
uint32 max_pings = CONNECTION_WRITE_CONNECT_FAILURES;
736
LOG_J(LS_INFO, this) << "Unwritable after " << max_pings
737
<< " ping failures and "
738
<< now - pings_since_last_response_[0]
739
<< " ms without a response,"
740
<< " ms since last received ping="
741
<< now - last_ping_received_
742
<< " ms since last received data="
743
<< now - last_data_received_
745
set_write_state(STATE_WRITE_CONNECT);
748
if ((write_state_ == STATE_WRITE_CONNECT) &&
749
TooLongWithoutResponse(pings_since_last_response_,
750
CONNECTION_WRITE_TIMEOUT,
752
LOG_J(LS_INFO, this) << "Timed out after "
753
<< now - pings_since_last_response_[0]
754
<< " ms without a response, rtt=" << rtt;
755
set_write_state(STATE_WRITE_TIMEOUT);
759
void Connection::Ping(uint32 now) {
761
last_ping_sent_ = now;
762
pings_since_last_response_.push_back(now);
763
ConnectionRequest *req = new ConnectionRequest(this);
764
LOG_J(LS_VERBOSE, this) << "Sending STUN ping " << req->id() << " at " << now;
768
void Connection::ReceivedPing() {
769
last_ping_received_ = talk_base::Time();
770
set_read_state(STATE_READABLE);
773
std::string Connection::ToString() const {
774
const char CONNECT_STATE_ABBREV[2] = {
775
'-', // not connected (false)
776
'C', // connected (true)
778
const char READ_STATE_ABBREV[2] = {
779
'R', // STATE_READABLE
780
'-', // STATE_READ_TIMEOUT
782
const char WRITE_STATE_ABBREV[3] = {
783
'W', // STATE_WRITABLE
784
'w', // STATE_WRITE_CONNECT
785
'-', // STATE_WRITE_TIMEOUT
787
const Candidate& local = local_candidate();
788
const Candidate& remote = remote_candidate();
789
std::stringstream ss;
790
ss << "Conn[" << local.name() << ":" << local.generation()
791
<< ":" << local.type() << ":" << local.protocol()
792
<< ":" << local.address().ToString()
793
<< "->" << remote.name() << ":" << remote.generation()
794
<< ":" << remote.type() << ":"
795
<< remote.protocol() << ":" << remote.address().ToString()
797
<< CONNECT_STATE_ABBREV[connected()]
798
<< READ_STATE_ABBREV[read_state()]
799
<< WRITE_STATE_ABBREV[write_state()]
801
if (rtt_ < DEFAULT_RTT) {
809
void Connection::OnConnectionRequestResponse(ConnectionRequest* request,
810
StunMessage* response) {
811
// We've already validated that this is a STUN binding response with
812
// the correct local and remote username for this connection.
813
// So if we're not already, become writable. We may be bringing a pruned
814
// connection back to life, but if we don't really want it, we can always
816
uint32 rtt = request->Elapsed();
817
set_write_state(STATE_WRITABLE);
820
for (size_t i = 0; i < pings_since_last_response_.size(); ++i) {
822
talk_base::sprintfn(buf, sizeof(buf), "%u",
823
pings_since_last_response_[i]);
824
pings.append(buf).append(" ");
827
LOG_J(LS_VERBOSE, this) << "Received STUN ping response " << request->id()
828
<< ", pings_since_last_response_=" << pings
831
pings_since_last_response_.clear();
832
rtt_ = (RTT_RATIO * rtt_ + rtt) / (RTT_RATIO + 1);
835
void Connection::OnConnectionRequestErrorResponse(ConnectionRequest* request,
836
StunMessage* response) {
837
const StunErrorCodeAttribute* error = response->GetErrorCode();
838
uint32 error_code = error ?
839
error->error_code() : static_cast<uint32>(STUN_ERROR_GLOBAL_FAILURE);
841
if ((error_code == STUN_ERROR_UNKNOWN_ATTRIBUTE)
842
|| (error_code == STUN_ERROR_SERVER_ERROR)
843
|| (error_code == STUN_ERROR_UNAUTHORIZED)) {
844
// Recoverable error, retry
845
} else if (error_code == STUN_ERROR_STALE_CREDENTIALS) {
846
// Race failure, retry
848
// This is not a valid connection.
849
LOG_J(LS_ERROR, this) << "Received STUN error response, code="
850
<< error_code << "; killing connection";
851
set_write_state(STATE_WRITE_TIMEOUT);
855
void Connection::OnConnectionRequestTimeout(ConnectionRequest* request) {
856
// Log at LS_INFO if we miss a ping on a writable connection.
857
talk_base::LoggingSeverity sev = (write_state_ == STATE_WRITABLE) ?
858
talk_base::LS_INFO : talk_base::LS_VERBOSE;
859
LOG_JV(sev, this) << "Timing-out STUN ping " << request->id()
860
<< " after " << request->Elapsed() << " ms";
863
void Connection::CheckTimeout() {
864
// If both read and write have timed out, then this connection can contribute
865
// no more to p2p socket unless at some later date readability were to come
866
// back. However, we gave readability a long time to timeout, so at this
867
// point, it seems fair to get rid of this connection.
868
if ((read_state_ == STATE_READ_TIMEOUT) &&
869
(write_state_ == STATE_WRITE_TIMEOUT)) {
870
port_->thread()->Post(this, MSG_DELETE);
874
void Connection::OnMessage(talk_base::Message *pmsg) {
875
ASSERT(pmsg->message_id == MSG_DELETE);
877
LOG_J(LS_INFO, this) << "Connection deleted";
878
SignalDestroyed(this);
882
size_t Connection::recv_bytes_second() {
883
return recv_rate_tracker_.units_second();
886
size_t Connection::recv_total_bytes() {
887
return recv_rate_tracker_.total_units();
890
size_t Connection::sent_bytes_second() {
891
return send_rate_tracker_.units_second();
894
size_t Connection::sent_total_bytes() {
895
return send_rate_tracker_.total_units();
898
ProxyConnection::ProxyConnection(Port* port, size_t index,
899
const Candidate& candidate)
900
: Connection(port, index, candidate), error_(0) {
903
int ProxyConnection::Send(const void* data, size_t size) {
904
if (write_state() != STATE_WRITABLE) {
905
error_ = EWOULDBLOCK;
908
int sent = port_->SendTo(data, size, remote_candidate_.address(), true);
911
error_ = port_->GetError();
913
send_rate_tracker_.Update(sent);
918
} // namespace cricket