3
* Copyright 2004--2005, Google Inc.
5
* Redistribution and use in source and binary forms, with or without
6
* modification, are permitted provided that the following conditions are met:
8
* 1. Redistributions of source code must retain the above copyright notice,
9
* this list of conditions and the following disclaimer.
10
* 2. Redistributions in binary form must reproduce the above copyright notice,
11
* this list of conditions and the following disclaimer in the documentation
12
* and/or other materials provided with the distribution.
13
* 3. The name of the author may not be used to endorse or promote products
14
* derived from this software without specific prior written permission.
16
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18
* MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19
* EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22
* OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23
* WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24
* OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28
#include "talk/p2p/base/transport.h"
30
#include "talk/base/common.h"
31
#include "talk/base/logging.h"
32
#include "talk/p2p/base/candidate.h"
33
#include "talk/p2p/base/constants.h"
34
#include "talk/p2p/base/sessionmanager.h"
35
#include "talk/p2p/base/parsing.h"
36
#include "talk/p2p/base/transportchannelimpl.h"
37
#include "talk/xmllite/xmlelement.h"
38
#include "talk/xmpp/constants.h"
42
struct ChannelParams {
43
ChannelParams() : channel(NULL), candidate(NULL) {}
44
explicit ChannelParams(const std::string& name)
45
: name(name), channel(NULL), candidate(NULL) {}
46
ChannelParams(const std::string& name,
47
const std::string& content_type)
48
: name(name), content_type(content_type),
49
channel(NULL), candidate(NULL) {}
50
explicit ChannelParams(cricket::Candidate* candidate) :
51
channel(NULL), candidate(candidate) {
52
name = candidate->name();
60
std::string content_type;
61
cricket::TransportChannelImpl* channel;
62
cricket::Candidate* candidate;
64
// TODO: Merge ChannelParams and ChannelMessage.
65
typedef talk_base::ScopedMessageData<ChannelParams> ChannelMessage;
68
MSG_CREATECHANNEL = 1,
69
MSG_DESTROYCHANNEL = 2,
70
MSG_DESTROYALLCHANNELS = 3,
71
MSG_CONNECTCHANNELS = 4,
72
MSG_RESETCHANNELS = 5,
73
MSG_ONSIGNALINGREADY = 6,
74
MSG_ONREMOTECANDIDATE = 7,
77
MSG_REQUESTSIGNALING = 10,
78
MSG_CANDIDATEREADY = 11,
81
MSG_CANDIDATEALLOCATIONCOMPLETE = 14,
84
Transport::Transport(talk_base::Thread* signaling_thread,
85
talk_base::Thread* worker_thread,
86
const std::string& type,
87
PortAllocator* allocator)
88
: signaling_thread_(signaling_thread),
89
worker_thread_(worker_thread), type_(type), allocator_(allocator),
90
destroyed_(false), readable_(false), writable_(false),
91
connect_requested_(false), allow_local_ips_(false) {
94
Transport::~Transport() {
95
ASSERT(signaling_thread_->IsCurrent());
99
TransportChannelImpl* Transport::CreateChannel(
100
const std::string& name, const std::string& content_type) {
101
ChannelMessage msg(new ChannelParams(name, content_type));
102
worker_thread()->Send(this, MSG_CREATECHANNEL, &msg);
103
return msg.data()->channel;
106
TransportChannelImpl* Transport::CreateChannel_w(
107
const std::string& name, const std::string& content_type) {
108
ASSERT(worker_thread()->IsCurrent());
109
TransportChannelImpl *impl;
110
talk_base::CritScope cs(&crit_);
112
// Create the entry if it does not exist
113
if (channels_.find(name) == channels_.end()) {
114
impl = CreateTransportChannel(name, content_type);
115
channels_[name] = ChannelMapEntry(impl);
117
impl = channels_[name].get();
120
// Increase the ref count
121
channels_[name].AddRef();
124
impl->SignalReadableState.connect(this, &Transport::OnChannelReadableState);
125
impl->SignalWritableState.connect(this, &Transport::OnChannelWritableState);
126
impl->SignalRequestSignaling.connect(
127
this, &Transport::OnChannelRequestSignaling);
128
impl->SignalCandidateReady.connect(this, &Transport::OnChannelCandidateReady);
129
impl->SignalRouteChange.connect(this, &Transport::OnChannelRouteChange);
131
if (connect_requested_) {
133
if (channels_.size() == 1) {
134
// If this is the first channel, then indicate that we have started
136
signaling_thread()->Post(this, MSG_CONNECTING, NULL);
142
TransportChannelImpl* Transport::GetChannel(const std::string& name) {
143
talk_base::CritScope cs(&crit_);
144
ChannelMap::iterator iter = channels_.find(name);
145
return (iter != channels_.end()) ? iter->second.get() : NULL;
148
bool Transport::HasChannels() {
149
talk_base::CritScope cs(&crit_);
150
return !channels_.empty();
153
void Transport::DestroyChannel(const std::string& name) {
154
ChannelMessage msg(new ChannelParams(name));
155
worker_thread()->Send(this, MSG_DESTROYCHANNEL, &msg);
158
void Transport::DestroyChannel_w(const std::string& name) {
159
ASSERT(worker_thread()->IsCurrent());
161
TransportChannelImpl* impl = NULL;
163
talk_base::CritScope cs(&crit_);
164
ChannelMap::iterator iter = channels_.find(name);
165
if (iter == channels_.end())
168
iter->second.DecRef();
169
if (!iter->second.ref()) {
170
impl = iter->second.get();
171
channels_.erase(iter);
175
if (connect_requested_ && channels_.empty()) {
176
// We're no longer attempting to connect.
177
signaling_thread()->Post(this, MSG_CONNECTING, NULL);
181
// Check in case the deleted channel was the only non-writable channel.
182
OnChannelWritableState(impl);
183
DestroyTransportChannel(impl);
187
void Transport::ConnectChannels() {
188
ASSERT(signaling_thread()->IsCurrent());
189
worker_thread()->Send(this, MSG_CONNECTCHANNELS, NULL);
192
void Transport::ConnectChannels_w() {
193
ASSERT(worker_thread()->IsCurrent());
194
if (connect_requested_ || channels_.empty())
196
connect_requested_ = true;
197
signaling_thread()->Post(
198
this, MSG_CANDIDATEREADY, NULL);
199
CallChannels_w(&TransportChannelImpl::Connect);
200
if (!channels_.empty()) {
201
signaling_thread()->Post(this, MSG_CONNECTING, NULL);
205
void Transport::OnConnecting_s() {
206
ASSERT(signaling_thread()->IsCurrent());
207
SignalConnecting(this);
210
void Transport::DestroyAllChannels() {
211
ASSERT(signaling_thread()->IsCurrent());
212
worker_thread()->Send(this, MSG_DESTROYALLCHANNELS, NULL);
213
worker_thread()->Clear(this);
214
signaling_thread()->Clear(this);
218
void Transport::DestroyAllChannels_w() {
219
ASSERT(worker_thread()->IsCurrent());
220
std::vector<TransportChannelImpl*> impls;
222
talk_base::CritScope cs(&crit_);
223
for (ChannelMap::iterator iter = channels_.begin();
224
iter != channels_.end();
226
iter->second.DecRef();
227
if (!iter->second.ref())
228
impls.push_back(iter->second.get());
234
for (size_t i = 0; i < impls.size(); ++i)
235
DestroyTransportChannel(impls[i]);
238
void Transport::ResetChannels() {
239
ASSERT(signaling_thread()->IsCurrent());
240
worker_thread()->Send(this, MSG_RESETCHANNELS, NULL);
243
void Transport::ResetChannels_w() {
244
ASSERT(worker_thread()->IsCurrent());
246
// We are no longer attempting to connect
247
connect_requested_ = false;
249
// Clear out the old messages, they aren't relevant
250
talk_base::CritScope cs(&crit_);
251
ready_candidates_.clear();
253
// Reset all of the channels
254
CallChannels_w(&TransportChannelImpl::Reset);
257
void Transport::OnSignalingReady() {
258
ASSERT(signaling_thread()->IsCurrent());
259
if (destroyed_) return;
261
worker_thread()->Post(this, MSG_ONSIGNALINGREADY, NULL);
263
// Notify the subclass.
264
OnTransportSignalingReady();
267
void Transport::CallChannels_w(TransportChannelFunc func) {
268
ASSERT(worker_thread()->IsCurrent());
269
talk_base::CritScope cs(&crit_);
270
for (ChannelMap::iterator iter = channels_.begin();
271
iter != channels_.end();
273
((iter->second.get())->*func)();
277
bool Transport::VerifyCandidate(const Candidate& cand, ParseError* error) {
278
if (cand.address().IsLocalIP() && !allow_local_ips_)
279
return BadParse("candidate has local IP address", error);
282
if (cand.address().IsAny()) {
283
return BadParse("candidate has address of zero", error);
286
// Disallow all ports below 1024, except for 80 and 443 on public addresses.
287
int port = cand.address().port();
289
if ((port != 80) && (port != 443))
291
"candidate has port below 1024, but not 80 or 443", error);
292
if (cand.address().IsPrivateIP()) {
294
"candidate has port of 80 or 443 with private IP address", error);
301
void Transport::OnRemoteCandidates(const std::vector<Candidate>& candidates) {
302
for (std::vector<Candidate>::const_iterator iter = candidates.begin();
303
iter != candidates.end();
305
OnRemoteCandidate(*iter);
309
void Transport::OnRemoteCandidate(const Candidate& candidate) {
310
ASSERT(signaling_thread()->IsCurrent());
311
if (destroyed_) return;
312
if (!HasChannel(candidate.name())) {
313
LOG(LS_WARNING) << "Ignoring candidate for unknown channel "
318
ChannelMessage* msg = new ChannelMessage(
319
new ChannelParams(new Candidate(candidate)));
320
worker_thread()->Post(this, MSG_ONREMOTECANDIDATE, msg);
323
void Transport::OnRemoteCandidate_w(const Candidate& candidate) {
324
ASSERT(worker_thread()->IsCurrent());
325
ChannelMap::iterator iter = channels_.find(candidate.name());
326
// It's ok for a channel to go away while this message is in transit.
327
if (iter != channels_.end()) {
328
iter->second.get()->OnCandidate(candidate);
332
void Transport::OnChannelReadableState(TransportChannel* channel) {
333
ASSERT(worker_thread()->IsCurrent());
334
signaling_thread()->Post(this, MSG_READSTATE, NULL);
337
void Transport::OnChannelReadableState_s() {
338
ASSERT(signaling_thread()->IsCurrent());
339
bool readable = GetTransportState_s(true);
340
if (readable_ != readable) {
341
readable_ = readable;
342
SignalReadableState(this);
346
void Transport::OnChannelWritableState(TransportChannel* channel) {
347
ASSERT(worker_thread()->IsCurrent());
348
signaling_thread()->Post(this, MSG_WRITESTATE, NULL);
351
void Transport::OnChannelWritableState_s() {
352
ASSERT(signaling_thread()->IsCurrent());
353
bool writable = GetTransportState_s(false);
354
if (writable_ != writable) {
355
writable_ = writable;
356
SignalWritableState(this);
360
bool Transport::GetTransportState_s(bool read) {
361
ASSERT(signaling_thread()->IsCurrent());
363
talk_base::CritScope cs(&crit_);
364
for (ChannelMap::iterator iter = channels_.begin();
365
iter != channels_.end();
367
bool b = (read ? iter->second.get()->readable() :
368
iter->second.get()->writable());
369
result = result || b;
374
void Transport::OnChannelRequestSignaling(TransportChannelImpl* channel) {
375
ASSERT(worker_thread()->IsCurrent());
376
ChannelMessage* msg = new ChannelMessage(
377
new ChannelParams(channel->name()));
378
signaling_thread()->Post(this, MSG_REQUESTSIGNALING, msg);
381
void Transport::OnChannelRequestSignaling_s(const std::string& name) {
382
ASSERT(signaling_thread()->IsCurrent());
383
// Resetting ICE state for the channel.
385
talk_base::CritScope cs(&crit_);
386
ChannelMap::iterator iter = channels_.find(name);
387
if (iter != channels_.end())
388
iter->second.set_candidates_allocated(false);
390
SignalRequestSignaling(this);
393
void Transport::OnChannelCandidateReady(TransportChannelImpl* channel,
394
const Candidate& candidate) {
395
ASSERT(worker_thread()->IsCurrent());
396
talk_base::CritScope cs(&crit_);
397
ready_candidates_.push_back(candidate);
399
// We hold any messages until the client lets us connect.
400
if (connect_requested_) {
401
signaling_thread()->Post(
402
this, MSG_CANDIDATEREADY, NULL);
406
void Transport::OnChannelCandidateReady_s() {
407
ASSERT(signaling_thread()->IsCurrent());
408
ASSERT(connect_requested_);
410
std::vector<Candidate> candidates;
412
talk_base::CritScope cs(&crit_);
413
candidates.swap(ready_candidates_);
416
// we do the deleting of Candidate* here to keep the new above and
417
// delete below close to each other
418
if (!candidates.empty()) {
419
SignalCandidatesReady(this, candidates);
423
void Transport::OnChannelRouteChange(TransportChannel* channel,
424
const Candidate& remote_candidate) {
425
ASSERT(worker_thread()->IsCurrent());
426
ChannelParams* params = new ChannelParams(new Candidate(remote_candidate));
427
signaling_thread()->Post(this, MSG_ROUTECHANGE, new ChannelMessage(params));
430
void Transport::OnChannelRouteChange_s(const std::string& name,
431
const Candidate& remote_candidate) {
432
ASSERT(signaling_thread()->IsCurrent());
433
SignalRouteChange(this, name, remote_candidate);
436
void Transport::OnChannelCandidatesAllocationDone(
437
TransportChannelImpl* channel) {
438
ASSERT(worker_thread()->IsCurrent());
439
ChannelMap::iterator iter = channels_.find(channel->name());
440
ASSERT(iter != channels_.end());
441
iter->second.set_candidates_allocated(true);
443
// If all channels belonging to this Transport got signal, then
444
// forward this signal to upper layer.
445
// Can this signal arrive before all transport channels are created?
446
for (iter = channels_.begin(); iter != channels_.end(); ++iter) {
447
if (!iter->second.candidates_allocated())
450
signaling_thread_->Post(this, MSG_CANDIDATEALLOCATIONCOMPLETE);
453
void Transport::OnMessage(talk_base::Message* msg) {
454
switch (msg->message_id) {
455
case MSG_CREATECHANNEL:
457
ChannelParams* params =
458
static_cast<ChannelMessage*>(msg->pdata)->data().get();
459
params->channel = CreateChannel_w(params->name, params->content_type);
462
case MSG_DESTROYCHANNEL:
464
ChannelParams* params =
465
static_cast<ChannelMessage*>(msg->pdata)->data().get();
466
DestroyChannel_w(params->name);
469
case MSG_CONNECTCHANNELS:
472
case MSG_RESETCHANNELS:
475
case MSG_DESTROYALLCHANNELS:
476
DestroyAllChannels_w();
478
case MSG_ONSIGNALINGREADY:
479
CallChannels_w(&TransportChannelImpl::OnSignalingReady);
481
case MSG_ONREMOTECANDIDATE:
483
ChannelMessage* channel_msg = static_cast<ChannelMessage*>(msg->pdata);
484
OnRemoteCandidate_w(*(channel_msg->data()->candidate));
492
OnChannelReadableState_s();
495
OnChannelWritableState_s();
497
case MSG_REQUESTSIGNALING:
499
ChannelParams* params =
500
static_cast<ChannelMessage*>(msg->pdata)->data().get();
501
OnChannelRequestSignaling_s(params->name);
505
case MSG_CANDIDATEREADY:
506
OnChannelCandidateReady_s();
508
case MSG_ROUTECHANGE:
510
ChannelMessage* channel_msg = static_cast<ChannelMessage*>(msg->pdata);
511
ChannelParams* params = channel_msg->data().get();
512
OnChannelRouteChange_s(params->name, *params->candidate);
516
case MSG_CANDIDATEALLOCATIONCOMPLETE:
517
SignalCandidatesAllocationDone(this);
522
bool TransportParser::ParseAddress(const buzz::XmlElement* elem,
523
const buzz::QName& address_name,
524
const buzz::QName& port_name,
525
talk_base::SocketAddress* address,
527
if (!elem->HasAttr(address_name))
528
return BadParse("address does not have " + address_name.LocalPart(), error);
529
if (!elem->HasAttr(port_name))
530
return BadParse("address does not have " + port_name.LocalPart(), error);
532
address->SetIP(elem->Attr(address_name));
533
std::istringstream ist(elem->Attr(port_name));
536
address->SetPort(port);
541
} // namespace cricket