~ubuntu-branches/debian/experimental/kopete/experimental

« back to all changes in this revision

Viewing changes to protocols/jabber/libjingle/talk/p2p/base/p2ptransportchannel.cc

  • Committer: Package Import Robot
  • Author(s): Maximiliano Curia
  • Date: 2015-02-24 11:32:57 UTC
  • mfrom: (1.1.41 vivid)
  • Revision ID: package-import@ubuntu.com-20150224113257-gnupg4v7lzz18ij0
Tags: 4:14.12.2-1
* New upstream release (14.12.2).
* Bump Standards-Version to 3.9.6, no changes needed.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 * libjingle
 
3
 * Copyright 2004--2005, Google Inc.
 
4
 *
 
5
 * Redistribution and use in source and binary forms, with or without
 
6
 * modification, are permitted provided that the following conditions are met:
 
7
 *
 
8
 *  1. Redistributions of source code must retain the above copyright notice,
 
9
 *     this list of conditions and the following disclaimer.
 
10
 *  2. Redistributions in binary form must reproduce the above copyright notice,
 
11
 *     this list of conditions and the following disclaimer in the documentation
 
12
 *     and/or other materials provided with the distribution.
 
13
 *  3. The name of the author may not be used to endorse or promote products
 
14
 *     derived from this software without specific prior written permission.
 
15
 *
 
16
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
 
17
 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 
18
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
 
19
 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 
20
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 
21
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
 
22
 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
 
23
 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
 
24
 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
 
25
 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
26
 */
 
27
 
 
28
#include "talk/p2p/base/p2ptransportchannel.h"
 
29
 
 
30
#include <set>
 
31
#include "talk/base/common.h"
 
32
#include "talk/base/logging.h"
 
33
#include "talk/p2p/base/common.h"
 
34
 
 
35
namespace {
 
36
 
 
37
// messages for queuing up work for ourselves
 
38
const uint32 MSG_SORT = 1;
 
39
const uint32 MSG_PING = 2;
 
40
const uint32 MSG_ALLOCATE = 3;
 
41
 
 
42
// When the socket is unwritable, we will use 10 Kbps (ignoring IP+UDP headers)
 
43
// for pinging.  When the socket is writable, we will use only 1 Kbps because
 
44
// we don't want to degrade the quality on a modem.  These numbers should work
 
45
// well on a 28.8K modem, which is the slowest connection on which the voice
 
46
// quality is reasonable at all.
 
47
static const uint32 PING_PACKET_SIZE = 60 * 8;
 
48
static const uint32 WRITABLE_DELAY = 1000 * PING_PACKET_SIZE / 1000;  // 480ms
 
49
static const uint32 UNWRITABLE_DELAY = 1000 * PING_PACKET_SIZE / 10000;  // 50ms
 
50
 
 
51
// If there is a current writable connection, then we will also try hard to
 
52
// make sure it is pinged at this rate.
 
53
static const uint32 MAX_CURRENT_WRITABLE_DELAY = 900;  // 2*WRITABLE_DELAY - bit
 
54
 
 
55
// The minimum improvement in RTT that justifies a switch.
 
56
static const double kMinImprovement = 10;
 
57
 
 
58
// Amount of time that we wait when *losing* writability before we try doing
 
59
// another allocation.
 
60
static const int kAllocateDelay = 1 * 1000;  // 1 second
 
61
 
 
62
// We will try creating a new allocator from scratch after a delay of this
 
63
// length without becoming writable (or timing out).
 
64
static const int kAllocatePeriod = 20 * 1000;  // 20 seconds
 
65
 
 
66
cricket::Port::CandidateOrigin GetOrigin(cricket::Port* port,
 
67
                                         cricket::Port* origin_port) {
 
68
  if (!origin_port)
 
69
    return cricket::Port::ORIGIN_MESSAGE;
 
70
  else if (port == origin_port)
 
71
    return cricket::Port::ORIGIN_THIS_PORT;
 
72
  else
 
73
    return cricket::Port::ORIGIN_OTHER_PORT;
 
74
}
 
75
 
 
76
// Compares two connections based only on static information about them.
 
77
int CompareConnectionCandidates(cricket::Connection* a,
 
78
                                cricket::Connection* b) {
 
79
  // Combine local and remote preferences
 
80
  ASSERT(a->local_candidate().preference() == a->port()->preference());
 
81
  ASSERT(b->local_candidate().preference() == b->port()->preference());
 
82
  double a_pref = a->local_candidate().preference()
 
83
                * a->remote_candidate().preference();
 
84
  double b_pref = b->local_candidate().preference()
 
85
                * b->remote_candidate().preference();
 
86
 
 
87
  // Now check combined preferences. Lower values get sorted last.
 
88
  if (a_pref > b_pref)
 
89
    return 1;
 
90
  if (a_pref < b_pref)
 
91
    return -1;
 
92
 
 
93
  // If we're still tied at this point, prefer a younger generation.
 
94
  return (a->remote_candidate().generation() + a->port()->generation()) -
 
95
         (b->remote_candidate().generation() + b->port()->generation());
 
96
}
 
97
 
 
98
// Compare two connections based on their writability and static preferences.
 
99
int CompareConnections(cricket::Connection *a, cricket::Connection *b) {
 
100
  // Sort based on write-state.  Better states have lower values.
 
101
  if (a->write_state() < b->write_state())
 
102
    return 1;
 
103
  if (a->write_state() > b->write_state())
 
104
    return -1;
 
105
 
 
106
  // Compare the candidate information.
 
107
  return CompareConnectionCandidates(a, b);
 
108
}
 
109
 
 
110
// Wraps the comparison connection into a less than operator that puts higher
 
111
// priority writable connections first.
 
112
class ConnectionCompare {
 
113
 public:
 
114
  bool operator()(const cricket::Connection *ca,
 
115
                  const cricket::Connection *cb) {
 
116
    cricket::Connection* a = const_cast<cricket::Connection*>(ca);
 
117
    cricket::Connection* b = const_cast<cricket::Connection*>(cb);
 
118
 
 
119
    // Compare first on writability and static preferences.
 
120
    int cmp = CompareConnections(a, b);
 
121
    if (cmp > 0)
 
122
      return true;
 
123
    if (cmp < 0)
 
124
      return false;
 
125
 
 
126
    // Otherwise, sort based on latency estimate.
 
127
    return a->rtt() < b->rtt();
 
128
 
 
129
    // Should we bother checking for the last connection that last received
 
130
    // data? It would help rendezvous on the connection that is also receiving
 
131
    // packets.
 
132
    //
 
133
    // TODO: Yes we should definitely do this.  The TCP protocol gains
 
134
    // efficiency by being used bidirectionally, as opposed to two separate
 
135
    // unidirectional streams.  This test should probably occur before
 
136
    // comparison of local prefs (assuming combined prefs are the same).  We
 
137
    // need to be careful though, not to bounce back and forth with both sides
 
138
    // trying to rendevous with the other.
 
139
  }
 
140
};
 
141
 
 
142
// Determines whether we should switch between two connections, based first on
 
143
// static preferences and then (if those are equal) on latency estimates.
 
144
bool ShouldSwitch(cricket::Connection* a_conn, cricket::Connection* b_conn) {
 
145
  if (a_conn == b_conn)
 
146
    return false;
 
147
 
 
148
  if (!a_conn || !b_conn)  // don't think the latter should happen
 
149
    return true;
 
150
 
 
151
  int prefs_cmp = CompareConnections(a_conn, b_conn);
 
152
  if (prefs_cmp < 0)
 
153
    return true;
 
154
  if (prefs_cmp > 0)
 
155
    return false;
 
156
 
 
157
  return b_conn->rtt() <= a_conn->rtt() + kMinImprovement;
 
158
}
 
159
 
 
160
}  // unnamed namespace
 
161
 
 
162
namespace cricket {
 
163
 
 
164
P2PTransportChannel::P2PTransportChannel(const std::string &name,
 
165
                                         const std::string &content_type,
 
166
                                         P2PTransport* transport,
 
167
                                         PortAllocator *allocator) :
 
168
    TransportChannelImpl(name, content_type),
 
169
    transport_(transport),
 
170
    allocator_(allocator),
 
171
    worker_thread_(talk_base::Thread::Current()),
 
172
    incoming_only_(false),
 
173
    waiting_for_signaling_(false),
 
174
    error_(0),
 
175
    best_connection_(NULL),
 
176
    pinging_started_(false),
 
177
    sort_dirty_(false),
 
178
    was_writable_(false),
 
179
    was_timed_out_(true) {
 
180
}
 
181
 
 
182
P2PTransportChannel::~P2PTransportChannel() {
 
183
  ASSERT(worker_thread_ == talk_base::Thread::Current());
 
184
 
 
185
  for (uint32 i = 0; i < allocator_sessions_.size(); ++i)
 
186
    delete allocator_sessions_[i];
 
187
}
 
188
 
 
189
// Add the allocator session to our list so that we know which sessions
 
190
// are still active.
 
191
void P2PTransportChannel::AddAllocatorSession(PortAllocatorSession* session) {
 
192
  session->set_generation(static_cast<uint32>(allocator_sessions_.size()));
 
193
  allocator_sessions_.push_back(session);
 
194
 
 
195
  // We now only want to apply new candidates that we receive to the ports
 
196
  // created by this new session because these are replacing those of the
 
197
  // previous sessions.
 
198
  ports_.clear();
 
199
 
 
200
  session->SignalPortReady.connect(this, &P2PTransportChannel::OnPortReady);
 
201
  session->SignalCandidatesReady.connect(
 
202
      this, &P2PTransportChannel::OnCandidatesReady);
 
203
  session->SignalCandidatesAllocationDone.connect(
 
204
      this, &P2PTransportChannel::OnCandidatesAllocationDone);
 
205
  session->GetInitialPorts();
 
206
  if (pinging_started_)
 
207
    session->StartGetAllPorts();
 
208
}
 
209
 
 
210
// Go into the state of processing candidates, and running in general
 
211
void P2PTransportChannel::Connect() {
 
212
  ASSERT(worker_thread_ == talk_base::Thread::Current());
 
213
 
 
214
  // Kick off an allocator session
 
215
  Allocate();
 
216
 
 
217
  // Start pinging as the ports come in.
 
218
  thread()->Post(this, MSG_PING);
 
219
}
 
220
 
 
221
// Reset the socket, clear up any previous allocations and start over
 
222
void P2PTransportChannel::Reset() {
 
223
  ASSERT(worker_thread_ == talk_base::Thread::Current());
 
224
 
 
225
  // Get rid of all the old allocators.  This should clean up everything.
 
226
  for (uint32 i = 0; i < allocator_sessions_.size(); ++i)
 
227
    delete allocator_sessions_[i];
 
228
 
 
229
  allocator_sessions_.clear();
 
230
  ports_.clear();
 
231
  connections_.clear();
 
232
  best_connection_ = NULL;
 
233
 
 
234
  // Forget about all of the candidates we got before.
 
235
  remote_candidates_.clear();
 
236
 
 
237
  // Revert to the initial state.
 
238
  set_readable(false);
 
239
  set_writable(false);
 
240
 
 
241
  // Reinitialize the rest of our state.
 
242
  waiting_for_signaling_ = false;
 
243
  pinging_started_ = false;
 
244
  sort_dirty_ = false;
 
245
  was_writable_ = false;
 
246
  was_timed_out_ = true;
 
247
 
 
248
  // If we allocated before, start a new one now.
 
249
  if (transport_->connect_requested())
 
250
    Allocate();
 
251
 
 
252
  // Start pinging as the ports come in.
 
253
  thread()->Clear(this);
 
254
  thread()->Post(this, MSG_PING);
 
255
}
 
256
 
 
257
// A new port is available, attempt to make connections for it
 
258
void P2PTransportChannel::OnPortReady(PortAllocatorSession *session,
 
259
                                      Port* port) {
 
260
  ASSERT(worker_thread_ == talk_base::Thread::Current());
 
261
 
 
262
  // Set in-effect options on the new port
 
263
  for (OptionMap::const_iterator it = options_.begin();
 
264
       it != options_.end();
 
265
       ++it) {
 
266
    int val = port->SetOption(it->first, it->second);
 
267
    if (val < 0) {
 
268
      LOG_J(LS_WARNING, port) << "SetOption(" << it->first
 
269
                              << ", " << it->second
 
270
                              << ") failed: " << port->GetError();
 
271
    }
 
272
  }
 
273
 
 
274
  // Remember the ports and candidates, and signal that candidates are ready.
 
275
  // The session will handle this, and send an initiate/accept/modify message
 
276
  // if one is pending.
 
277
 
 
278
  ports_.push_back(port);
 
279
  port->SignalUnknownAddress.connect(
 
280
      this, &P2PTransportChannel::OnUnknownAddress);
 
281
  port->SignalDestroyed.connect(this, &P2PTransportChannel::OnPortDestroyed);
 
282
 
 
283
  // Attempt to create a connection from this new port to all of the remote
 
284
  // candidates that we were given so far.
 
285
 
 
286
  std::vector<RemoteCandidate>::iterator iter;
 
287
  for (iter = remote_candidates_.begin(); iter != remote_candidates_.end();
 
288
       ++iter) {
 
289
    CreateConnection(port, *iter, iter->origin_port(), false);
 
290
  }
 
291
 
 
292
  SortConnections();
 
293
}
 
294
 
 
295
// A new candidate is available, let listeners know
 
296
void P2PTransportChannel::OnCandidatesReady(
 
297
    PortAllocatorSession *session, const std::vector<Candidate>& candidates) {
 
298
  for (size_t i = 0; i < candidates.size(); ++i) {
 
299
    SignalCandidateReady(this, candidates[i]);
 
300
  }
 
301
}
 
302
 
 
303
void P2PTransportChannel::OnCandidatesAllocationDone(
 
304
    PortAllocatorSession* session) {
 
305
  SignalCandidatesAllocationDone(this);
 
306
}
 
307
 
 
308
// Handle stun packets
 
309
void P2PTransportChannel::OnUnknownAddress(
 
310
    Port *port, const talk_base::SocketAddress &address, StunMessage *stun_msg,
 
311
    const std::string &remote_username, bool port_muxed) {
 
312
  ASSERT(worker_thread_ == talk_base::Thread::Current());
 
313
 
 
314
  // Port has received a valid stun packet from an address that no Connection
 
315
  // is currently available for. See if the remote user name is in the remote
 
316
  // candidate list. If it isn't return error to the stun request.
 
317
 
 
318
  const Candidate *candidate = NULL;
 
319
  std::vector<RemoteCandidate>::iterator it;
 
320
  for (it = remote_candidates_.begin(); it != remote_candidates_.end(); ++it) {
 
321
    if ((*it).username() == remote_username) {
 
322
      candidate = &(*it);
 
323
      break;
 
324
    }
 
325
  }
 
326
 
 
327
  if (candidate == NULL) {
 
328
    if (port_muxed) {
 
329
      // When Ports are muxed, SignalUnknownAddress is delivered to all
 
330
      // P2PTransportChannel belong to a session. Return from here will
 
331
      // save us from sending stun binding error message from incorrect channel.
 
332
      return;
 
333
    }
 
334
    // Don't know about this username, the request is bogus
 
335
    // This sometimes happens if a binding response comes in before the ACCEPT
 
336
    // message.  It is totally valid; the retry state machine will try again.
 
337
    port->SendBindingErrorResponse(stun_msg, address,
 
338
        STUN_ERROR_STALE_CREDENTIALS, STUN_ERROR_REASON_STALE_CREDENTIALS);
 
339
    delete stun_msg;
 
340
    return;
 
341
  }
 
342
 
 
343
  // Check for connectivity to this address. Create connections
 
344
  // to this address across all local ports. First, add this as a new remote
 
345
  // address
 
346
 
 
347
  Candidate new_remote_candidate = *candidate;
 
348
  new_remote_candidate.set_address(address);
 
349
  // new_remote_candidate.set_protocol(port->protocol());
 
350
 
 
351
  // This remote username exists. Now create connections using this candidate,
 
352
  // and resort
 
353
 
 
354
  if (CreateConnections(new_remote_candidate, port, true)) {
 
355
    // Send the pinger a successful stun response.
 
356
    port->SendBindingResponse(stun_msg, address);
 
357
 
 
358
    // Update the list of connections since we just added another.  We do this
 
359
    // after sending the response since it could (in principle) delete the
 
360
    // connection in question.
 
361
    SortConnections();
 
362
  } else {
 
363
    // Hopefully this won't occur, because changing a destination address
 
364
    // shouldn't cause a new connection to fail
 
365
    ASSERT(false);
 
366
    port->SendBindingErrorResponse(stun_msg, address, STUN_ERROR_SERVER_ERROR,
 
367
        STUN_ERROR_REASON_SERVER_ERROR);
 
368
  }
 
369
 
 
370
  delete stun_msg;
 
371
}
 
372
 
 
373
void P2PTransportChannel::OnCandidate(const Candidate& candidate) {
 
374
  ASSERT(worker_thread_ == talk_base::Thread::Current());
 
375
 
 
376
  // Create connections to this remote candidate.
 
377
  CreateConnections(candidate, NULL, false);
 
378
 
 
379
  // Resort the connections list, which may have new elements.
 
380
  SortConnections();
 
381
}
 
382
 
 
383
// Creates connections from all of the ports that we care about to the given
 
384
// remote candidate.  The return value is true if we created a connection from
 
385
// the origin port.
 
386
bool P2PTransportChannel::CreateConnections(const Candidate &remote_candidate,
 
387
                                            Port* origin_port,
 
388
                                            bool readable) {
 
389
  ASSERT(worker_thread_ == talk_base::Thread::Current());
 
390
 
 
391
  // Add a new connection for this candidate to every port that allows such a
 
392
  // connection (i.e., if they have compatible protocols) and that does not
 
393
  // already have a connection to an equivalent candidate.  We must be careful
 
394
  // to make sure that the origin port is included, even if it was pruned,
 
395
  // since that may be the only port that can create this connection.
 
396
 
 
397
  bool created = false;
 
398
 
 
399
  std::vector<Port *>::reverse_iterator it;
 
400
  for (it = ports_.rbegin(); it != ports_.rend(); ++it) {
 
401
    if (CreateConnection(*it, remote_candidate, origin_port, readable)) {
 
402
      if (*it == origin_port)
 
403
        created = true;
 
404
    }
 
405
  }
 
406
 
 
407
  if ((origin_port != NULL) &&
 
408
      std::find(ports_.begin(), ports_.end(), origin_port) == ports_.end()) {
 
409
    if (CreateConnection(origin_port, remote_candidate, origin_port, readable))
 
410
      created = true;
 
411
  }
 
412
 
 
413
  // Remember this remote candidate so that we can add it to future ports.
 
414
  RememberRemoteCandidate(remote_candidate, origin_port);
 
415
 
 
416
  return created;
 
417
}
 
418
 
 
419
// Setup a connection object for the local and remote candidate combination.
 
420
// And then listen to connection object for changes.
 
421
bool P2PTransportChannel::CreateConnection(Port* port,
 
422
                                           const Candidate& remote_candidate,
 
423
                                           Port* origin_port,
 
424
                                           bool readable) {
 
425
  // Look for an existing connection with this remote address.  If one is not
 
426
  // found, then we can create a new connection for this address.
 
427
  Connection* connection = port->GetConnection(remote_candidate.address());
 
428
  if (connection != NULL) {
 
429
    // It is not legal to try to change any of the parameters of an existing
 
430
    // connection; however, the other side can send a duplicate candidate.
 
431
    if (!remote_candidate.IsEquivalent(connection->remote_candidate())) {
 
432
      LOG(INFO) << "Attempt to change a remote candidate";
 
433
      return false;
 
434
    }
 
435
  } else {
 
436
    Port::CandidateOrigin origin = GetOrigin(port, origin_port);
 
437
 
 
438
    // Don't create connection if this is a candidate we received in a
 
439
    // message and we are not allowed to make outgoing connections.
 
440
    if (origin == cricket::Port::ORIGIN_MESSAGE && incoming_only_)
 
441
      return false;
 
442
 
 
443
    connection = port->CreateConnection(remote_candidate, origin);
 
444
    if (!connection)
 
445
      return false;
 
446
 
 
447
    connections_.push_back(connection);
 
448
    connection->SignalReadPacket.connect(
 
449
        this, &P2PTransportChannel::OnReadPacket);
 
450
    connection->SignalStateChange.connect(
 
451
        this, &P2PTransportChannel::OnConnectionStateChange);
 
452
    connection->SignalDestroyed.connect(
 
453
        this, &P2PTransportChannel::OnConnectionDestroyed);
 
454
 
 
455
    LOG_J(LS_INFO, this) << "Created connection with origin=" << origin << ", ("
 
456
                         << connections_.size() << " total)";
 
457
  }
 
458
 
 
459
  // If we are readable, it is because we are creating this in response to a
 
460
  // ping from the other side.  This will cause the state to become readable.
 
461
  if (readable)
 
462
    connection->ReceivedPing();
 
463
 
 
464
  return true;
 
465
}
 
466
 
 
467
bool P2PTransportChannel::FindConnection(
 
468
    cricket::Connection* connection) const {
 
469
  std::vector<Connection*>::const_iterator citer =
 
470
      std::find(connections_.begin(), connections_.end(), connection);
 
471
  return citer != connections_.end();
 
472
}
 
473
 
 
474
// Maintain our remote candidate list, adding this new remote one.
 
475
void P2PTransportChannel::RememberRemoteCandidate(
 
476
    const Candidate& remote_candidate, Port* origin_port) {
 
477
  // Remove any candidates whose generation is older than this one.  The
 
478
  // presence of a new generation indicates that the old ones are not useful.
 
479
  uint32 i = 0;
 
480
  while (i < remote_candidates_.size()) {
 
481
    if (remote_candidates_[i].generation() < remote_candidate.generation()) {
 
482
      LOG(INFO) << "Pruning candidate from old generation: "
 
483
                << remote_candidates_[i].address().ToString();
 
484
      remote_candidates_.erase(remote_candidates_.begin() + i);
 
485
    } else {
 
486
      i += 1;
 
487
    }
 
488
  }
 
489
 
 
490
  // Make sure this candidate is not a duplicate.
 
491
  for (uint32 i = 0; i < remote_candidates_.size(); ++i) {
 
492
    if (remote_candidates_[i].IsEquivalent(remote_candidate)) {
 
493
      LOG(INFO) << "Duplicate candidate: "
 
494
                << remote_candidate.address().ToString();
 
495
      return;
 
496
    }
 
497
  }
 
498
 
 
499
  // Try this candidate for all future ports.
 
500
  remote_candidates_.push_back(RemoteCandidate(remote_candidate, origin_port));
 
501
 
 
502
  // We have some candidates from the other side, we are now serious about
 
503
  // this connection.  Let's do the StartGetAllPorts thing.
 
504
  if (!pinging_started_) {
 
505
    pinging_started_ = true;
 
506
    for (size_t i = 0; i < allocator_sessions_.size(); ++i) {
 
507
      if (!allocator_sessions_[i]->IsGettingAllPorts())
 
508
        allocator_sessions_[i]->StartGetAllPorts();
 
509
    }
 
510
  }
 
511
}
 
512
 
 
513
// Send data to the other side, using our best connection
 
514
int P2PTransportChannel::SendPacket(const char *data, size_t len) {
 
515
  // This can get called on any thread that is convenient to write from!
 
516
  if (best_connection_ == NULL) {
 
517
    error_ = EWOULDBLOCK;
 
518
    return SOCKET_ERROR;
 
519
  }
 
520
  int sent = best_connection_->Send(data, len);
 
521
  if (sent <= 0) {
 
522
    ASSERT(sent < 0);
 
523
    error_ = best_connection_->GetError();
 
524
  }
 
525
  return sent;
 
526
}
 
527
 
 
528
// Begin allocate (or immediately re-allocate, if MSG_ALLOCATE pending)
 
529
void P2PTransportChannel::Allocate() {
 
530
  CancelPendingAllocate();
 
531
  // Time for a new allocator, lets make sure we have a signalling channel
 
532
  // to communicate candidates through first.
 
533
  waiting_for_signaling_ = true;
 
534
  SignalRequestSignaling(this);
 
535
}
 
536
 
 
537
// Cancels the pending allocate, if any.
 
538
void P2PTransportChannel::CancelPendingAllocate() {
 
539
  thread()->Clear(this, MSG_ALLOCATE);
 
540
}
 
541
 
 
542
// Monitor connection states
 
543
void P2PTransportChannel::UpdateConnectionStates() {
 
544
  uint32 now = talk_base::Time();
 
545
 
 
546
  // We need to copy the list of connections since some may delete themselves
 
547
  // when we call UpdateState.
 
548
  for (uint32 i = 0; i < connections_.size(); ++i)
 
549
    connections_[i]->UpdateState(now);
 
550
}
 
551
 
 
552
// Prepare for best candidate sorting
 
553
void P2PTransportChannel::RequestSort() {
 
554
  if (!sort_dirty_) {
 
555
    worker_thread_->Post(this, MSG_SORT);
 
556
    sort_dirty_ = true;
 
557
  }
 
558
}
 
559
 
 
560
// Sort the available connections to find the best one.  We also monitor
 
561
// the number of available connections and the current state so that we
 
562
// can possibly kick off more allocators (for more connections).
 
563
void P2PTransportChannel::SortConnections() {
 
564
  ASSERT(worker_thread_ == talk_base::Thread::Current());
 
565
 
 
566
  // Make sure the connection states are up-to-date since this affects how they
 
567
  // will be sorted.
 
568
  UpdateConnectionStates();
 
569
 
 
570
  // Any changes after this point will require a re-sort.
 
571
  sort_dirty_ = false;
 
572
 
 
573
  // Get a list of the networks that we are using.
 
574
  std::set<talk_base::Network*> networks;
 
575
  for (uint32 i = 0; i < connections_.size(); ++i)
 
576
    networks.insert(connections_[i]->port()->network());
 
577
 
 
578
  // Find the best alternative connection by sorting.  It is important to note
 
579
  // that amongst equal preference, writable connections, this will choose the
 
580
  // one whose estimated latency is lowest.  So it is the only one that we
 
581
  // need to consider switching to.
 
582
 
 
583
  ConnectionCompare cmp;
 
584
  std::stable_sort(connections_.begin(), connections_.end(), cmp);
 
585
  LOG(LS_VERBOSE) << "Sorting available connections:";
 
586
  for (uint32 i = 0; i < connections_.size(); ++i) {
 
587
    LOG(LS_VERBOSE) << connections_[i]->ToString();
 
588
  }
 
589
 
 
590
  Connection* top_connection = NULL;
 
591
  if (connections_.size() > 0)
 
592
    top_connection = connections_[0];
 
593
 
 
594
  // If necessary, switch to the new choice.
 
595
  if (ShouldSwitch(best_connection_, top_connection))
 
596
    SwitchBestConnectionTo(top_connection);
 
597
 
 
598
  // We can prune any connection for which there is a writable connection on
 
599
  // the same network with better or equal prefences.  We leave those with
 
600
  // better preference just in case they become writable later (at which point,
 
601
  // we would prune out the current best connection).  We leave connections on
 
602
  // other networks because they may not be using the same resources and they
 
603
  // may represent very distinct paths over which we can switch.
 
604
  std::set<talk_base::Network*>::iterator network;
 
605
  for (network = networks.begin(); network != networks.end(); ++network) {
 
606
    Connection* primier = GetBestConnectionOnNetwork(*network);
 
607
    if (!primier || (primier->write_state() != Connection::STATE_WRITABLE))
 
608
      continue;
 
609
 
 
610
    for (uint32 i = 0; i < connections_.size(); ++i) {
 
611
      if ((connections_[i] != primier) &&
 
612
          (connections_[i]->port()->network() == *network) &&
 
613
          (CompareConnectionCandidates(primier, connections_[i]) >= 0)) {
 
614
        connections_[i]->Prune();
 
615
      }
 
616
    }
 
617
  }
 
618
 
 
619
  // Count the number of connections in the various states.
 
620
 
 
621
  int writable = 0;
 
622
  int write_connect = 0;
 
623
  int write_timeout = 0;
 
624
 
 
625
  for (uint32 i = 0; i < connections_.size(); ++i) {
 
626
    switch (connections_[i]->write_state()) {
 
627
    case Connection::STATE_WRITABLE:
 
628
      ++writable;
 
629
      break;
 
630
    case Connection::STATE_WRITE_CONNECT:
 
631
      ++write_connect;
 
632
      break;
 
633
    case Connection::STATE_WRITE_TIMEOUT:
 
634
      ++write_timeout;
 
635
      break;
 
636
    default:
 
637
      ASSERT(false);
 
638
    }
 
639
  }
 
640
 
 
641
  if (writable > 0) {
 
642
    HandleWritable();
 
643
  } else if (write_connect > 0) {
 
644
    HandleNotWritable();
 
645
  } else {
 
646
    HandleAllTimedOut();
 
647
  }
 
648
 
 
649
  // Update the state of this channel.  This method is called whenever the
 
650
  // state of any connection changes, so this is a good place to do this.
 
651
  UpdateChannelState();
 
652
 
 
653
  // Notify of connection state change
 
654
  SignalConnectionMonitor(this);
 
655
}
 
656
 
 
657
// Track the best connection, and let listeners know
 
658
void P2PTransportChannel::SwitchBestConnectionTo(Connection* conn) {
 
659
  // Note: if conn is NULL, the previous best_connection_ has been destroyed,
 
660
  // so don't use it.
 
661
  // use it.
 
662
  Connection* old_best_connection = best_connection_;
 
663
  best_connection_ = conn;
 
664
  if (best_connection_) {
 
665
    if (old_best_connection) {
 
666
      LOG_J(LS_INFO, this) << "Previous best connection: "
 
667
                           << old_best_connection->ToString();
 
668
    }
 
669
    LOG_J(LS_INFO, this) << "New best connection: "
 
670
                         << best_connection_->ToString();
 
671
    SignalRouteChange(this, best_connection_->remote_candidate());
 
672
  } else {
 
673
    LOG_J(LS_INFO, this) << "No best connection";
 
674
  }
 
675
}
 
676
 
 
677
void P2PTransportChannel::UpdateChannelState() {
 
678
  // The Handle* functions already set the writable state.  We'll just double-
 
679
  // check it here.
 
680
  bool writable = ((best_connection_ != NULL)  &&
 
681
      (best_connection_->write_state() ==
 
682
      Connection::STATE_WRITABLE));
 
683
  ASSERT(writable == this->writable());
 
684
  if (writable != this->writable())
 
685
    LOG(LS_ERROR) << "UpdateChannelState: writable state mismatch";
 
686
 
 
687
  bool readable = false;
 
688
  for (uint32 i = 0; i < connections_.size(); ++i) {
 
689
    if (connections_[i]->read_state() == Connection::STATE_READABLE)
 
690
      readable = true;
 
691
  }
 
692
  set_readable(readable);
 
693
}
 
694
 
 
695
// We checked the status of our connections and we had at least one that
 
696
// was writable, go into the writable state.
 
697
void P2PTransportChannel::HandleWritable() {
 
698
  //
 
699
  // One or more connections writable!
 
700
  //
 
701
  if (!writable()) {
 
702
    for (uint32 i = 0; i < allocator_sessions_.size(); ++i) {
 
703
      if (allocator_sessions_[i]->IsGettingAllPorts()) {
 
704
        allocator_sessions_[i]->StopGetAllPorts();
 
705
      }
 
706
    }
 
707
 
 
708
    // Stop further allocations.
 
709
    CancelPendingAllocate();
 
710
  }
 
711
 
 
712
  // We're writable, obviously we aren't timed out
 
713
  was_writable_ = true;
 
714
  was_timed_out_ = false;
 
715
  set_writable(true);
 
716
}
 
717
 
 
718
// We checked the status of our connections and we didn't have any that
 
719
// were writable, go into the connecting state (kick off a new allocator
 
720
// session).
 
721
void P2PTransportChannel::HandleNotWritable() {
 
722
  //
 
723
  // No connections are writable but not timed out!
 
724
  //
 
725
  if (was_writable_) {
 
726
    // If we were writable, let's kick off an allocator session immediately
 
727
    was_writable_ = false;
 
728
    Allocate();
 
729
  }
 
730
 
 
731
  // We were connecting, obviously not ALL timed out.
 
732
  was_timed_out_ = false;
 
733
  set_writable(false);
 
734
}
 
735
 
 
736
// We checked the status of our connections and not only weren't they writable
 
737
// but they were also timed out, we really need a new allocator.
 
738
void P2PTransportChannel::HandleAllTimedOut() {
 
739
  //
 
740
  // No connections... all are timed out!
 
741
  //
 
742
  if (!was_timed_out_) {
 
743
    // We weren't timed out before, so kick off an allocator now (we'll still
 
744
    // be in the fully timed out state until the allocator actually gives back
 
745
    // new ports)
 
746
    Allocate();
 
747
  }
 
748
 
 
749
  // NOTE: we start was_timed_out_ in the true state so that we don't get
 
750
  // another allocator created WHILE we are in the process of building up
 
751
  // our first allocator.
 
752
  was_timed_out_ = true;
 
753
  was_writable_ = false;
 
754
  set_writable(false);
 
755
}
 
756
 
 
757
// If we have a best connection, return it, otherwise return top one in the
 
758
// list (later we will mark it best).
 
759
Connection* P2PTransportChannel::GetBestConnectionOnNetwork(
 
760
    talk_base::Network* network) {
 
761
  // If the best connection is on this network, then it wins.
 
762
  if (best_connection_ && (best_connection_->port()->network() == network))
 
763
    return best_connection_;
 
764
 
 
765
  // Otherwise, we return the top-most in sorted order.
 
766
  for (uint32 i = 0; i < connections_.size(); ++i) {
 
767
    if (connections_[i]->port()->network() == network)
 
768
      return connections_[i];
 
769
  }
 
770
 
 
771
  return NULL;
 
772
}
 
773
 
 
774
// Handle any queued up requests
 
775
void P2PTransportChannel::OnMessage(talk_base::Message *pmsg) {
 
776
  if (pmsg->message_id == MSG_SORT)
 
777
    OnSort();
 
778
  else if (pmsg->message_id == MSG_PING)
 
779
    OnPing();
 
780
  else if (pmsg->message_id == MSG_ALLOCATE)
 
781
    Allocate();
 
782
  else
 
783
    ASSERT(false);
 
784
}
 
785
 
 
786
// Handle queued up sort request
 
787
void P2PTransportChannel::OnSort() {
 
788
  // Resort the connections based on the new statistics.
 
789
  SortConnections();
 
790
}
 
791
 
 
792
// Handle queued up ping request
 
793
void P2PTransportChannel::OnPing() {
 
794
  // Make sure the states of the connections are up-to-date (since this affects
 
795
  // which ones are pingable).
 
796
  UpdateConnectionStates();
 
797
 
 
798
  // Find the oldest pingable connection and have it do a ping.
 
799
  Connection* conn = FindNextPingableConnection();
 
800
  if (conn)
 
801
    conn->Ping(talk_base::Time());
 
802
 
 
803
  // Post ourselves a message to perform the next ping.
 
804
  uint32 delay = writable() ? WRITABLE_DELAY : UNWRITABLE_DELAY;
 
805
  thread()->PostDelayed(delay, this, MSG_PING);
 
806
}
 
807
 
 
808
// Is the connection in a state for us to even consider pinging the other side?
 
809
bool P2PTransportChannel::IsPingable(Connection* conn) {
 
810
  // An unconnected connection cannot be written to at all, so pinging is out
 
811
  // of the question.
 
812
  if (!conn->connected())
 
813
    return false;
 
814
 
 
815
  if (writable()) {
 
816
    // If we are writable, then we only want to ping connections that could be
 
817
    // better than this one, i.e., the ones that were not pruned.
 
818
    return (conn->write_state() != Connection::STATE_WRITE_TIMEOUT);
 
819
  } else {
 
820
    // If we are not writable, then we need to try everything that might work.
 
821
    // This includes both connections that do not have write timeout as well as
 
822
    // ones that do not have read timeout.  A connection could be readable but
 
823
    // be in write-timeout if we pruned it before.  Since the other side is
 
824
    // still pinging it, it very well might still work.
 
825
    return (conn->write_state() != Connection::STATE_WRITE_TIMEOUT) ||
 
826
           (conn->read_state() != Connection::STATE_READ_TIMEOUT);
 
827
  }
 
828
}
 
829
 
 
830
// Returns the next pingable connection to ping.  This will be the oldest
 
831
// pingable connection unless we have a writable connection that is past the
 
832
// maximum acceptable ping delay.
 
833
Connection* P2PTransportChannel::FindNextPingableConnection() {
 
834
  uint32 now = talk_base::Time();
 
835
  if (best_connection_ &&
 
836
      (best_connection_->write_state() == Connection::STATE_WRITABLE) &&
 
837
      (best_connection_->last_ping_sent()
 
838
       + MAX_CURRENT_WRITABLE_DELAY <= now)) {
 
839
    return best_connection_;
 
840
  }
 
841
 
 
842
  Connection* oldest_conn = NULL;
 
843
  uint32 oldest_time = 0xFFFFFFFF;
 
844
  for (uint32 i = 0; i < connections_.size(); ++i) {
 
845
    if (IsPingable(connections_[i])) {
 
846
      if (connections_[i]->last_ping_sent() < oldest_time) {
 
847
        oldest_time = connections_[i]->last_ping_sent();
 
848
        oldest_conn = connections_[i];
 
849
      }
 
850
    }
 
851
  }
 
852
  return oldest_conn;
 
853
}
 
854
 
 
855
// return the number of "pingable" connections
 
856
uint32 P2PTransportChannel::NumPingableConnections() {
 
857
  uint32 count = 0;
 
858
  for (uint32 i = 0; i < connections_.size(); ++i) {
 
859
    if (IsPingable(connections_[i]))
 
860
      count += 1;
 
861
  }
 
862
  return count;
 
863
}
 
864
 
 
865
// When a connection's state changes, we need to figure out who to use as
 
866
// the best connection again.  It could have become usable, or become unusable.
 
867
void P2PTransportChannel::OnConnectionStateChange(Connection *connection) {
 
868
  ASSERT(worker_thread_ == talk_base::Thread::Current());
 
869
 
 
870
  // We have to unroll the stack before doing this because we may be changing
 
871
  // the state of connections while sorting.
 
872
  RequestSort();
 
873
}
 
874
 
 
875
// When a connection is removed, edit it out, and then update our best
 
876
// connection.
 
877
void P2PTransportChannel::OnConnectionDestroyed(Connection *connection) {
 
878
  ASSERT(worker_thread_ == talk_base::Thread::Current());
 
879
 
 
880
  // Note: the previous best_connection_ may be destroyed by now, so don't
 
881
  // use it.
 
882
 
 
883
  // Remove this connection from the list.
 
884
  std::vector<Connection*>::iterator iter =
 
885
      std::find(connections_.begin(), connections_.end(), connection);
 
886
  ASSERT(iter != connections_.end());
 
887
  connections_.erase(iter);
 
888
 
 
889
  LOG_J(LS_INFO, this) << "Removed connection ("
 
890
    << static_cast<int>(connections_.size()) << " remaining)";
 
891
 
 
892
  // If this is currently the best connection, then we need to pick a new one.
 
893
  // The call to SortConnections will pick a new one.  It looks at the current
 
894
  // best connection in order to avoid switching between fairly similar ones.
 
895
  // Since this connection is no longer an option, we can just set best to NULL
 
896
  // and re-choose a best assuming that there was no best connection.
 
897
  if (best_connection_ == connection) {
 
898
    SwitchBestConnectionTo(NULL);
 
899
    RequestSort();
 
900
  }
 
901
}
 
902
 
 
903
// When a port is destroyed remove it from our list of ports to use for
 
904
// connection attempts.
 
905
void P2PTransportChannel::OnPortDestroyed(Port* port) {
 
906
  ASSERT(worker_thread_ == talk_base::Thread::Current());
 
907
 
 
908
  // Remove this port from the list (if we didn't drop it already).
 
909
  std::vector<Port*>::iterator iter =
 
910
      std::find(ports_.begin(), ports_.end(), port);
 
911
  if (iter != ports_.end())
 
912
    ports_.erase(iter);
 
913
 
 
914
  LOG(INFO) << "Removed port from p2p socket: "
 
915
            << static_cast<int>(ports_.size()) << " remaining";
 
916
}
 
917
 
 
918
// We data is available, let listeners know
 
919
void P2PTransportChannel::OnReadPacket(Connection *connection,
 
920
                                       const char *data, size_t len) {
 
921
  ASSERT(worker_thread_ == talk_base::Thread::Current());
 
922
 
 
923
  // Do not deliver, if packet doesn't belong to the correct transport channel.
 
924
  if (!FindConnection(connection))
 
925
    return;
 
926
 
 
927
  // Let the client know of an incoming packet
 
928
  SignalReadPacket(this, data, len);
 
929
}
 
930
 
 
931
// Set options on ourselves is simply setting options on all of our available
 
932
// port objects.
 
933
int P2PTransportChannel::SetOption(talk_base::Socket::Option opt, int value) {
 
934
  OptionMap::iterator it = options_.find(opt);
 
935
  if (it == options_.end()) {
 
936
    options_.insert(std::make_pair(opt, value));
 
937
  } else if (it->second == value) {
 
938
    return 0;
 
939
  } else {
 
940
    it->second = value;
 
941
  }
 
942
 
 
943
  for (uint32 i = 0; i < ports_.size(); ++i) {
 
944
    int val = ports_[i]->SetOption(opt, value);
 
945
    if (val < 0) {
 
946
      // Because this also occurs deferred, probably no point in reporting an
 
947
      // error
 
948
      LOG(WARNING) << "SetOption(" << opt << ", " << value << ") failed: "
 
949
                   << ports_[i]->GetError();
 
950
    }
 
951
  }
 
952
  return 0;
 
953
}
 
954
 
 
955
// When the signalling channel is ready, we can really kick off the allocator
 
956
void P2PTransportChannel::OnSignalingReady() {
 
957
  if (waiting_for_signaling_) {
 
958
    waiting_for_signaling_ = false;
 
959
    AddAllocatorSession(allocator_->CreateSession(
 
960
        session_id(), name(), content_type()));
 
961
    thread()->PostDelayed(kAllocatePeriod, this, MSG_ALLOCATE);
 
962
  }
 
963
}
 
964
 
 
965
}  // namespace cricket