3
* Copyright 2004--2005, Google Inc.
5
* Redistribution and use in source and binary forms, with or without
6
* modification, are permitted provided that the following conditions are met:
8
* 1. Redistributions of source code must retain the above copyright notice,
9
* this list of conditions and the following disclaimer.
10
* 2. Redistributions in binary form must reproduce the above copyright notice,
11
* this list of conditions and the following disclaimer in the documentation
12
* and/or other materials provided with the distribution.
13
* 3. The name of the author may not be used to endorse or promote products
14
* derived from this software without specific prior written permission.
16
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18
* MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19
* EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22
* OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23
* WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24
* OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28
#include "talk/p2p/base/p2ptransportchannel.h"
31
#include "talk/base/common.h"
32
#include "talk/base/logging.h"
33
#include "talk/p2p/base/common.h"
37
// messages for queuing up work for ourselves
38
const uint32 MSG_SORT = 1;
39
const uint32 MSG_PING = 2;
40
const uint32 MSG_ALLOCATE = 3;
42
// When the socket is unwritable, we will use 10 Kbps (ignoring IP+UDP headers)
43
// for pinging. When the socket is writable, we will use only 1 Kbps because
44
// we don't want to degrade the quality on a modem. These numbers should work
45
// well on a 28.8K modem, which is the slowest connection on which the voice
46
// quality is reasonable at all.
47
static const uint32 PING_PACKET_SIZE = 60 * 8;
48
static const uint32 WRITABLE_DELAY = 1000 * PING_PACKET_SIZE / 1000; // 480ms
49
static const uint32 UNWRITABLE_DELAY = 1000 * PING_PACKET_SIZE / 10000; // 50ms
51
// If there is a current writable connection, then we will also try hard to
52
// make sure it is pinged at this rate.
53
static const uint32 MAX_CURRENT_WRITABLE_DELAY = 900; // 2*WRITABLE_DELAY - bit
55
// The minimum improvement in RTT that justifies a switch.
56
static const double kMinImprovement = 10;
58
// Amount of time that we wait when *losing* writability before we try doing
59
// another allocation.
60
static const int kAllocateDelay = 1 * 1000; // 1 second
62
// We will try creating a new allocator from scratch after a delay of this
63
// length without becoming writable (or timing out).
64
static const int kAllocatePeriod = 20 * 1000; // 20 seconds
66
cricket::Port::CandidateOrigin GetOrigin(cricket::Port* port,
67
cricket::Port* origin_port) {
69
return cricket::Port::ORIGIN_MESSAGE;
70
else if (port == origin_port)
71
return cricket::Port::ORIGIN_THIS_PORT;
73
return cricket::Port::ORIGIN_OTHER_PORT;
76
// Compares two connections based only on static information about them.
77
int CompareConnectionCandidates(cricket::Connection* a,
78
cricket::Connection* b) {
79
// Combine local and remote preferences
80
ASSERT(a->local_candidate().preference() == a->port()->preference());
81
ASSERT(b->local_candidate().preference() == b->port()->preference());
82
double a_pref = a->local_candidate().preference()
83
* a->remote_candidate().preference();
84
double b_pref = b->local_candidate().preference()
85
* b->remote_candidate().preference();
87
// Now check combined preferences. Lower values get sorted last.
93
// If we're still tied at this point, prefer a younger generation.
94
return (a->remote_candidate().generation() + a->port()->generation()) -
95
(b->remote_candidate().generation() + b->port()->generation());
98
// Compare two connections based on their writability and static preferences.
99
int CompareConnections(cricket::Connection *a, cricket::Connection *b) {
100
// Sort based on write-state. Better states have lower values.
101
if (a->write_state() < b->write_state())
103
if (a->write_state() > b->write_state())
106
// Compare the candidate information.
107
return CompareConnectionCandidates(a, b);
110
// Wraps the comparison connection into a less than operator that puts higher
111
// priority writable connections first.
112
class ConnectionCompare {
114
bool operator()(const cricket::Connection *ca,
115
const cricket::Connection *cb) {
116
cricket::Connection* a = const_cast<cricket::Connection*>(ca);
117
cricket::Connection* b = const_cast<cricket::Connection*>(cb);
119
// Compare first on writability and static preferences.
120
int cmp = CompareConnections(a, b);
126
// Otherwise, sort based on latency estimate.
127
return a->rtt() < b->rtt();
129
// Should we bother checking for the last connection that last received
130
// data? It would help rendezvous on the connection that is also receiving
133
// TODO: Yes we should definitely do this. The TCP protocol gains
134
// efficiency by being used bidirectionally, as opposed to two separate
135
// unidirectional streams. This test should probably occur before
136
// comparison of local prefs (assuming combined prefs are the same). We
137
// need to be careful though, not to bounce back and forth with both sides
138
// trying to rendevous with the other.
142
// Determines whether we should switch between two connections, based first on
143
// static preferences and then (if those are equal) on latency estimates.
144
bool ShouldSwitch(cricket::Connection* a_conn, cricket::Connection* b_conn) {
145
if (a_conn == b_conn)
148
if (!a_conn || !b_conn) // don't think the latter should happen
151
int prefs_cmp = CompareConnections(a_conn, b_conn);
157
return b_conn->rtt() <= a_conn->rtt() + kMinImprovement;
160
} // unnamed namespace
164
P2PTransportChannel::P2PTransportChannel(const std::string &name,
165
const std::string &content_type,
166
P2PTransport* transport,
167
PortAllocator *allocator) :
168
TransportChannelImpl(name, content_type),
169
transport_(transport),
170
allocator_(allocator),
171
worker_thread_(talk_base::Thread::Current()),
172
incoming_only_(false),
173
waiting_for_signaling_(false),
175
best_connection_(NULL),
176
pinging_started_(false),
178
was_writable_(false),
179
was_timed_out_(true) {
182
P2PTransportChannel::~P2PTransportChannel() {
183
ASSERT(worker_thread_ == talk_base::Thread::Current());
185
for (uint32 i = 0; i < allocator_sessions_.size(); ++i)
186
delete allocator_sessions_[i];
189
// Add the allocator session to our list so that we know which sessions
191
void P2PTransportChannel::AddAllocatorSession(PortAllocatorSession* session) {
192
session->set_generation(static_cast<uint32>(allocator_sessions_.size()));
193
allocator_sessions_.push_back(session);
195
// We now only want to apply new candidates that we receive to the ports
196
// created by this new session because these are replacing those of the
197
// previous sessions.
200
session->SignalPortReady.connect(this, &P2PTransportChannel::OnPortReady);
201
session->SignalCandidatesReady.connect(
202
this, &P2PTransportChannel::OnCandidatesReady);
203
session->SignalCandidatesAllocationDone.connect(
204
this, &P2PTransportChannel::OnCandidatesAllocationDone);
205
session->GetInitialPorts();
206
if (pinging_started_)
207
session->StartGetAllPorts();
210
// Go into the state of processing candidates, and running in general
211
void P2PTransportChannel::Connect() {
212
ASSERT(worker_thread_ == talk_base::Thread::Current());
214
// Kick off an allocator session
217
// Start pinging as the ports come in.
218
thread()->Post(this, MSG_PING);
221
// Reset the socket, clear up any previous allocations and start over
222
void P2PTransportChannel::Reset() {
223
ASSERT(worker_thread_ == talk_base::Thread::Current());
225
// Get rid of all the old allocators. This should clean up everything.
226
for (uint32 i = 0; i < allocator_sessions_.size(); ++i)
227
delete allocator_sessions_[i];
229
allocator_sessions_.clear();
231
connections_.clear();
232
best_connection_ = NULL;
234
// Forget about all of the candidates we got before.
235
remote_candidates_.clear();
237
// Revert to the initial state.
241
// Reinitialize the rest of our state.
242
waiting_for_signaling_ = false;
243
pinging_started_ = false;
245
was_writable_ = false;
246
was_timed_out_ = true;
248
// If we allocated before, start a new one now.
249
if (transport_->connect_requested())
252
// Start pinging as the ports come in.
253
thread()->Clear(this);
254
thread()->Post(this, MSG_PING);
257
// A new port is available, attempt to make connections for it
258
void P2PTransportChannel::OnPortReady(PortAllocatorSession *session,
260
ASSERT(worker_thread_ == talk_base::Thread::Current());
262
// Set in-effect options on the new port
263
for (OptionMap::const_iterator it = options_.begin();
264
it != options_.end();
266
int val = port->SetOption(it->first, it->second);
268
LOG_J(LS_WARNING, port) << "SetOption(" << it->first
269
<< ", " << it->second
270
<< ") failed: " << port->GetError();
274
// Remember the ports and candidates, and signal that candidates are ready.
275
// The session will handle this, and send an initiate/accept/modify message
276
// if one is pending.
278
ports_.push_back(port);
279
port->SignalUnknownAddress.connect(
280
this, &P2PTransportChannel::OnUnknownAddress);
281
port->SignalDestroyed.connect(this, &P2PTransportChannel::OnPortDestroyed);
283
// Attempt to create a connection from this new port to all of the remote
284
// candidates that we were given so far.
286
std::vector<RemoteCandidate>::iterator iter;
287
for (iter = remote_candidates_.begin(); iter != remote_candidates_.end();
289
CreateConnection(port, *iter, iter->origin_port(), false);
295
// A new candidate is available, let listeners know
296
void P2PTransportChannel::OnCandidatesReady(
297
PortAllocatorSession *session, const std::vector<Candidate>& candidates) {
298
for (size_t i = 0; i < candidates.size(); ++i) {
299
SignalCandidateReady(this, candidates[i]);
303
void P2PTransportChannel::OnCandidatesAllocationDone(
304
PortAllocatorSession* session) {
305
SignalCandidatesAllocationDone(this);
308
// Handle stun packets
309
void P2PTransportChannel::OnUnknownAddress(
310
Port *port, const talk_base::SocketAddress &address, StunMessage *stun_msg,
311
const std::string &remote_username, bool port_muxed) {
312
ASSERT(worker_thread_ == talk_base::Thread::Current());
314
// Port has received a valid stun packet from an address that no Connection
315
// is currently available for. See if the remote user name is in the remote
316
// candidate list. If it isn't return error to the stun request.
318
const Candidate *candidate = NULL;
319
std::vector<RemoteCandidate>::iterator it;
320
for (it = remote_candidates_.begin(); it != remote_candidates_.end(); ++it) {
321
if ((*it).username() == remote_username) {
327
if (candidate == NULL) {
329
// When Ports are muxed, SignalUnknownAddress is delivered to all
330
// P2PTransportChannel belong to a session. Return from here will
331
// save us from sending stun binding error message from incorrect channel.
334
// Don't know about this username, the request is bogus
335
// This sometimes happens if a binding response comes in before the ACCEPT
336
// message. It is totally valid; the retry state machine will try again.
337
port->SendBindingErrorResponse(stun_msg, address,
338
STUN_ERROR_STALE_CREDENTIALS, STUN_ERROR_REASON_STALE_CREDENTIALS);
343
// Check for connectivity to this address. Create connections
344
// to this address across all local ports. First, add this as a new remote
347
Candidate new_remote_candidate = *candidate;
348
new_remote_candidate.set_address(address);
349
// new_remote_candidate.set_protocol(port->protocol());
351
// This remote username exists. Now create connections using this candidate,
354
if (CreateConnections(new_remote_candidate, port, true)) {
355
// Send the pinger a successful stun response.
356
port->SendBindingResponse(stun_msg, address);
358
// Update the list of connections since we just added another. We do this
359
// after sending the response since it could (in principle) delete the
360
// connection in question.
363
// Hopefully this won't occur, because changing a destination address
364
// shouldn't cause a new connection to fail
366
port->SendBindingErrorResponse(stun_msg, address, STUN_ERROR_SERVER_ERROR,
367
STUN_ERROR_REASON_SERVER_ERROR);
373
void P2PTransportChannel::OnCandidate(const Candidate& candidate) {
374
ASSERT(worker_thread_ == talk_base::Thread::Current());
376
// Create connections to this remote candidate.
377
CreateConnections(candidate, NULL, false);
379
// Resort the connections list, which may have new elements.
383
// Creates connections from all of the ports that we care about to the given
384
// remote candidate. The return value is true if we created a connection from
386
bool P2PTransportChannel::CreateConnections(const Candidate &remote_candidate,
389
ASSERT(worker_thread_ == talk_base::Thread::Current());
391
// Add a new connection for this candidate to every port that allows such a
392
// connection (i.e., if they have compatible protocols) and that does not
393
// already have a connection to an equivalent candidate. We must be careful
394
// to make sure that the origin port is included, even if it was pruned,
395
// since that may be the only port that can create this connection.
397
bool created = false;
399
std::vector<Port *>::reverse_iterator it;
400
for (it = ports_.rbegin(); it != ports_.rend(); ++it) {
401
if (CreateConnection(*it, remote_candidate, origin_port, readable)) {
402
if (*it == origin_port)
407
if ((origin_port != NULL) &&
408
std::find(ports_.begin(), ports_.end(), origin_port) == ports_.end()) {
409
if (CreateConnection(origin_port, remote_candidate, origin_port, readable))
413
// Remember this remote candidate so that we can add it to future ports.
414
RememberRemoteCandidate(remote_candidate, origin_port);
419
// Setup a connection object for the local and remote candidate combination.
420
// And then listen to connection object for changes.
421
bool P2PTransportChannel::CreateConnection(Port* port,
422
const Candidate& remote_candidate,
425
// Look for an existing connection with this remote address. If one is not
426
// found, then we can create a new connection for this address.
427
Connection* connection = port->GetConnection(remote_candidate.address());
428
if (connection != NULL) {
429
// It is not legal to try to change any of the parameters of an existing
430
// connection; however, the other side can send a duplicate candidate.
431
if (!remote_candidate.IsEquivalent(connection->remote_candidate())) {
432
LOG(INFO) << "Attempt to change a remote candidate";
436
Port::CandidateOrigin origin = GetOrigin(port, origin_port);
438
// Don't create connection if this is a candidate we received in a
439
// message and we are not allowed to make outgoing connections.
440
if (origin == cricket::Port::ORIGIN_MESSAGE && incoming_only_)
443
connection = port->CreateConnection(remote_candidate, origin);
447
connections_.push_back(connection);
448
connection->SignalReadPacket.connect(
449
this, &P2PTransportChannel::OnReadPacket);
450
connection->SignalStateChange.connect(
451
this, &P2PTransportChannel::OnConnectionStateChange);
452
connection->SignalDestroyed.connect(
453
this, &P2PTransportChannel::OnConnectionDestroyed);
455
LOG_J(LS_INFO, this) << "Created connection with origin=" << origin << ", ("
456
<< connections_.size() << " total)";
459
// If we are readable, it is because we are creating this in response to a
460
// ping from the other side. This will cause the state to become readable.
462
connection->ReceivedPing();
467
bool P2PTransportChannel::FindConnection(
468
cricket::Connection* connection) const {
469
std::vector<Connection*>::const_iterator citer =
470
std::find(connections_.begin(), connections_.end(), connection);
471
return citer != connections_.end();
474
// Maintain our remote candidate list, adding this new remote one.
475
void P2PTransportChannel::RememberRemoteCandidate(
476
const Candidate& remote_candidate, Port* origin_port) {
477
// Remove any candidates whose generation is older than this one. The
478
// presence of a new generation indicates that the old ones are not useful.
480
while (i < remote_candidates_.size()) {
481
if (remote_candidates_[i].generation() < remote_candidate.generation()) {
482
LOG(INFO) << "Pruning candidate from old generation: "
483
<< remote_candidates_[i].address().ToString();
484
remote_candidates_.erase(remote_candidates_.begin() + i);
490
// Make sure this candidate is not a duplicate.
491
for (uint32 i = 0; i < remote_candidates_.size(); ++i) {
492
if (remote_candidates_[i].IsEquivalent(remote_candidate)) {
493
LOG(INFO) << "Duplicate candidate: "
494
<< remote_candidate.address().ToString();
499
// Try this candidate for all future ports.
500
remote_candidates_.push_back(RemoteCandidate(remote_candidate, origin_port));
502
// We have some candidates from the other side, we are now serious about
503
// this connection. Let's do the StartGetAllPorts thing.
504
if (!pinging_started_) {
505
pinging_started_ = true;
506
for (size_t i = 0; i < allocator_sessions_.size(); ++i) {
507
if (!allocator_sessions_[i]->IsGettingAllPorts())
508
allocator_sessions_[i]->StartGetAllPorts();
513
// Send data to the other side, using our best connection
514
int P2PTransportChannel::SendPacket(const char *data, size_t len) {
515
// This can get called on any thread that is convenient to write from!
516
if (best_connection_ == NULL) {
517
error_ = EWOULDBLOCK;
520
int sent = best_connection_->Send(data, len);
523
error_ = best_connection_->GetError();
528
// Begin allocate (or immediately re-allocate, if MSG_ALLOCATE pending)
529
void P2PTransportChannel::Allocate() {
530
CancelPendingAllocate();
531
// Time for a new allocator, lets make sure we have a signalling channel
532
// to communicate candidates through first.
533
waiting_for_signaling_ = true;
534
SignalRequestSignaling(this);
537
// Cancels the pending allocate, if any.
538
void P2PTransportChannel::CancelPendingAllocate() {
539
thread()->Clear(this, MSG_ALLOCATE);
542
// Monitor connection states
543
void P2PTransportChannel::UpdateConnectionStates() {
544
uint32 now = talk_base::Time();
546
// We need to copy the list of connections since some may delete themselves
547
// when we call UpdateState.
548
for (uint32 i = 0; i < connections_.size(); ++i)
549
connections_[i]->UpdateState(now);
552
// Prepare for best candidate sorting
553
void P2PTransportChannel::RequestSort() {
555
worker_thread_->Post(this, MSG_SORT);
560
// Sort the available connections to find the best one. We also monitor
561
// the number of available connections and the current state so that we
562
// can possibly kick off more allocators (for more connections).
563
void P2PTransportChannel::SortConnections() {
564
ASSERT(worker_thread_ == talk_base::Thread::Current());
566
// Make sure the connection states are up-to-date since this affects how they
568
UpdateConnectionStates();
570
// Any changes after this point will require a re-sort.
573
// Get a list of the networks that we are using.
574
std::set<talk_base::Network*> networks;
575
for (uint32 i = 0; i < connections_.size(); ++i)
576
networks.insert(connections_[i]->port()->network());
578
// Find the best alternative connection by sorting. It is important to note
579
// that amongst equal preference, writable connections, this will choose the
580
// one whose estimated latency is lowest. So it is the only one that we
581
// need to consider switching to.
583
ConnectionCompare cmp;
584
std::stable_sort(connections_.begin(), connections_.end(), cmp);
585
LOG(LS_VERBOSE) << "Sorting available connections:";
586
for (uint32 i = 0; i < connections_.size(); ++i) {
587
LOG(LS_VERBOSE) << connections_[i]->ToString();
590
Connection* top_connection = NULL;
591
if (connections_.size() > 0)
592
top_connection = connections_[0];
594
// If necessary, switch to the new choice.
595
if (ShouldSwitch(best_connection_, top_connection))
596
SwitchBestConnectionTo(top_connection);
598
// We can prune any connection for which there is a writable connection on
599
// the same network with better or equal prefences. We leave those with
600
// better preference just in case they become writable later (at which point,
601
// we would prune out the current best connection). We leave connections on
602
// other networks because they may not be using the same resources and they
603
// may represent very distinct paths over which we can switch.
604
std::set<talk_base::Network*>::iterator network;
605
for (network = networks.begin(); network != networks.end(); ++network) {
606
Connection* primier = GetBestConnectionOnNetwork(*network);
607
if (!primier || (primier->write_state() != Connection::STATE_WRITABLE))
610
for (uint32 i = 0; i < connections_.size(); ++i) {
611
if ((connections_[i] != primier) &&
612
(connections_[i]->port()->network() == *network) &&
613
(CompareConnectionCandidates(primier, connections_[i]) >= 0)) {
614
connections_[i]->Prune();
619
// Count the number of connections in the various states.
622
int write_connect = 0;
623
int write_timeout = 0;
625
for (uint32 i = 0; i < connections_.size(); ++i) {
626
switch (connections_[i]->write_state()) {
627
case Connection::STATE_WRITABLE:
630
case Connection::STATE_WRITE_CONNECT:
633
case Connection::STATE_WRITE_TIMEOUT:
643
} else if (write_connect > 0) {
649
// Update the state of this channel. This method is called whenever the
650
// state of any connection changes, so this is a good place to do this.
651
UpdateChannelState();
653
// Notify of connection state change
654
SignalConnectionMonitor(this);
657
// Track the best connection, and let listeners know
658
void P2PTransportChannel::SwitchBestConnectionTo(Connection* conn) {
659
// Note: if conn is NULL, the previous best_connection_ has been destroyed,
662
Connection* old_best_connection = best_connection_;
663
best_connection_ = conn;
664
if (best_connection_) {
665
if (old_best_connection) {
666
LOG_J(LS_INFO, this) << "Previous best connection: "
667
<< old_best_connection->ToString();
669
LOG_J(LS_INFO, this) << "New best connection: "
670
<< best_connection_->ToString();
671
SignalRouteChange(this, best_connection_->remote_candidate());
673
LOG_J(LS_INFO, this) << "No best connection";
677
void P2PTransportChannel::UpdateChannelState() {
678
// The Handle* functions already set the writable state. We'll just double-
680
bool writable = ((best_connection_ != NULL) &&
681
(best_connection_->write_state() ==
682
Connection::STATE_WRITABLE));
683
ASSERT(writable == this->writable());
684
if (writable != this->writable())
685
LOG(LS_ERROR) << "UpdateChannelState: writable state mismatch";
687
bool readable = false;
688
for (uint32 i = 0; i < connections_.size(); ++i) {
689
if (connections_[i]->read_state() == Connection::STATE_READABLE)
692
set_readable(readable);
695
// We checked the status of our connections and we had at least one that
696
// was writable, go into the writable state.
697
void P2PTransportChannel::HandleWritable() {
699
// One or more connections writable!
702
for (uint32 i = 0; i < allocator_sessions_.size(); ++i) {
703
if (allocator_sessions_[i]->IsGettingAllPorts()) {
704
allocator_sessions_[i]->StopGetAllPorts();
708
// Stop further allocations.
709
CancelPendingAllocate();
712
// We're writable, obviously we aren't timed out
713
was_writable_ = true;
714
was_timed_out_ = false;
718
// We checked the status of our connections and we didn't have any that
719
// were writable, go into the connecting state (kick off a new allocator
721
void P2PTransportChannel::HandleNotWritable() {
723
// No connections are writable but not timed out!
726
// If we were writable, let's kick off an allocator session immediately
727
was_writable_ = false;
731
// We were connecting, obviously not ALL timed out.
732
was_timed_out_ = false;
736
// We checked the status of our connections and not only weren't they writable
737
// but they were also timed out, we really need a new allocator.
738
void P2PTransportChannel::HandleAllTimedOut() {
740
// No connections... all are timed out!
742
if (!was_timed_out_) {
743
// We weren't timed out before, so kick off an allocator now (we'll still
744
// be in the fully timed out state until the allocator actually gives back
749
// NOTE: we start was_timed_out_ in the true state so that we don't get
750
// another allocator created WHILE we are in the process of building up
751
// our first allocator.
752
was_timed_out_ = true;
753
was_writable_ = false;
757
// If we have a best connection, return it, otherwise return top one in the
758
// list (later we will mark it best).
759
Connection* P2PTransportChannel::GetBestConnectionOnNetwork(
760
talk_base::Network* network) {
761
// If the best connection is on this network, then it wins.
762
if (best_connection_ && (best_connection_->port()->network() == network))
763
return best_connection_;
765
// Otherwise, we return the top-most in sorted order.
766
for (uint32 i = 0; i < connections_.size(); ++i) {
767
if (connections_[i]->port()->network() == network)
768
return connections_[i];
774
// Handle any queued up requests
775
void P2PTransportChannel::OnMessage(talk_base::Message *pmsg) {
776
if (pmsg->message_id == MSG_SORT)
778
else if (pmsg->message_id == MSG_PING)
780
else if (pmsg->message_id == MSG_ALLOCATE)
786
// Handle queued up sort request
787
void P2PTransportChannel::OnSort() {
788
// Resort the connections based on the new statistics.
792
// Handle queued up ping request
793
void P2PTransportChannel::OnPing() {
794
// Make sure the states of the connections are up-to-date (since this affects
795
// which ones are pingable).
796
UpdateConnectionStates();
798
// Find the oldest pingable connection and have it do a ping.
799
Connection* conn = FindNextPingableConnection();
801
conn->Ping(talk_base::Time());
803
// Post ourselves a message to perform the next ping.
804
uint32 delay = writable() ? WRITABLE_DELAY : UNWRITABLE_DELAY;
805
thread()->PostDelayed(delay, this, MSG_PING);
808
// Is the connection in a state for us to even consider pinging the other side?
809
bool P2PTransportChannel::IsPingable(Connection* conn) {
810
// An unconnected connection cannot be written to at all, so pinging is out
812
if (!conn->connected())
816
// If we are writable, then we only want to ping connections that could be
817
// better than this one, i.e., the ones that were not pruned.
818
return (conn->write_state() != Connection::STATE_WRITE_TIMEOUT);
820
// If we are not writable, then we need to try everything that might work.
821
// This includes both connections that do not have write timeout as well as
822
// ones that do not have read timeout. A connection could be readable but
823
// be in write-timeout if we pruned it before. Since the other side is
824
// still pinging it, it very well might still work.
825
return (conn->write_state() != Connection::STATE_WRITE_TIMEOUT) ||
826
(conn->read_state() != Connection::STATE_READ_TIMEOUT);
830
// Returns the next pingable connection to ping. This will be the oldest
831
// pingable connection unless we have a writable connection that is past the
832
// maximum acceptable ping delay.
833
Connection* P2PTransportChannel::FindNextPingableConnection() {
834
uint32 now = talk_base::Time();
835
if (best_connection_ &&
836
(best_connection_->write_state() == Connection::STATE_WRITABLE) &&
837
(best_connection_->last_ping_sent()
838
+ MAX_CURRENT_WRITABLE_DELAY <= now)) {
839
return best_connection_;
842
Connection* oldest_conn = NULL;
843
uint32 oldest_time = 0xFFFFFFFF;
844
for (uint32 i = 0; i < connections_.size(); ++i) {
845
if (IsPingable(connections_[i])) {
846
if (connections_[i]->last_ping_sent() < oldest_time) {
847
oldest_time = connections_[i]->last_ping_sent();
848
oldest_conn = connections_[i];
855
// return the number of "pingable" connections
856
uint32 P2PTransportChannel::NumPingableConnections() {
858
for (uint32 i = 0; i < connections_.size(); ++i) {
859
if (IsPingable(connections_[i]))
865
// When a connection's state changes, we need to figure out who to use as
866
// the best connection again. It could have become usable, or become unusable.
867
void P2PTransportChannel::OnConnectionStateChange(Connection *connection) {
868
ASSERT(worker_thread_ == talk_base::Thread::Current());
870
// We have to unroll the stack before doing this because we may be changing
871
// the state of connections while sorting.
875
// When a connection is removed, edit it out, and then update our best
877
void P2PTransportChannel::OnConnectionDestroyed(Connection *connection) {
878
ASSERT(worker_thread_ == talk_base::Thread::Current());
880
// Note: the previous best_connection_ may be destroyed by now, so don't
883
// Remove this connection from the list.
884
std::vector<Connection*>::iterator iter =
885
std::find(connections_.begin(), connections_.end(), connection);
886
ASSERT(iter != connections_.end());
887
connections_.erase(iter);
889
LOG_J(LS_INFO, this) << "Removed connection ("
890
<< static_cast<int>(connections_.size()) << " remaining)";
892
// If this is currently the best connection, then we need to pick a new one.
893
// The call to SortConnections will pick a new one. It looks at the current
894
// best connection in order to avoid switching between fairly similar ones.
895
// Since this connection is no longer an option, we can just set best to NULL
896
// and re-choose a best assuming that there was no best connection.
897
if (best_connection_ == connection) {
898
SwitchBestConnectionTo(NULL);
903
// When a port is destroyed remove it from our list of ports to use for
904
// connection attempts.
905
void P2PTransportChannel::OnPortDestroyed(Port* port) {
906
ASSERT(worker_thread_ == talk_base::Thread::Current());
908
// Remove this port from the list (if we didn't drop it already).
909
std::vector<Port*>::iterator iter =
910
std::find(ports_.begin(), ports_.end(), port);
911
if (iter != ports_.end())
914
LOG(INFO) << "Removed port from p2p socket: "
915
<< static_cast<int>(ports_.size()) << " remaining";
918
// We data is available, let listeners know
919
void P2PTransportChannel::OnReadPacket(Connection *connection,
920
const char *data, size_t len) {
921
ASSERT(worker_thread_ == talk_base::Thread::Current());
923
// Do not deliver, if packet doesn't belong to the correct transport channel.
924
if (!FindConnection(connection))
927
// Let the client know of an incoming packet
928
SignalReadPacket(this, data, len);
931
// Set options on ourselves is simply setting options on all of our available
933
int P2PTransportChannel::SetOption(talk_base::Socket::Option opt, int value) {
934
OptionMap::iterator it = options_.find(opt);
935
if (it == options_.end()) {
936
options_.insert(std::make_pair(opt, value));
937
} else if (it->second == value) {
943
for (uint32 i = 0; i < ports_.size(); ++i) {
944
int val = ports_[i]->SetOption(opt, value);
946
// Because this also occurs deferred, probably no point in reporting an
948
LOG(WARNING) << "SetOption(" << opt << ", " << value << ") failed: "
949
<< ports_[i]->GetError();
955
// When the signalling channel is ready, we can really kick off the allocator
956
void P2PTransportChannel::OnSignalingReady() {
957
if (waiting_for_signaling_) {
958
waiting_for_signaling_ = false;
959
AddAllocatorSession(allocator_->CreateSession(
960
session_id(), name(), content_type()));
961
thread()->PostDelayed(kAllocatePeriod, this, MSG_ALLOCATE);
965
} // namespace cricket