3
* Copyright 2004--2006, Google Inc.
5
* Redistribution and use in source and binary forms, with or without
6
* modification, are permitted provided that the following conditions are met:
8
* 1. Redistributions of source code must retain the above copyright notice,
9
* this list of conditions and the following disclaimer.
10
* 2. Redistributions in binary form must reproduce the above copyright notice,
11
* this list of conditions and the following disclaimer in the documentation
12
* and/or other materials provided with the distribution.
13
* 3. The name of the author may not be used to endorse or promote products
14
* derived from this software without specific prior written permission.
16
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18
* MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19
* EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22
* OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23
* WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24
* OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
#include "talk/base/basictypes.h"
30
#include "talk/base/common.h"
31
#include "talk/base/logging.h"
32
#include "talk/base/scoped_ptr.h"
33
#include "talk/base/stringutils.h"
34
#include "talk/p2p/base/candidate.h"
35
#include "talk/p2p/base/transportchannel.h"
36
#include "pseudotcpchannel.h"
38
// Names from talk_base are used unqualified throughout the rest of this file.
using namespace talk_base;
42
// Declared here only; the definition lives elsewhere.
// NOTE(review): presumably a label table for session states - confirm.
extern const talk_base::ConstantLabel SESSION_STATES[];
44
// Message ids are prefixed according to the thread that handles them:
// MSG_WK_* - worker thread messages
45
// MSG_ST_* - stream thread messages
46
// MSG_SI_* - signal thread messages
52
// Posted to the signal thread; OnMessage() answers it by calling
// session_->DestroyChannel().
MSG_SI_DESTROYCHANNEL,
56
// Payload carried by MSG_ST_EVENT messages: a bitmask of stream events
// (tested against SE_READ in OnMessage) plus an error code.
struct EventData : public MessageData {
58
// ev is stored in `event`; err is stored in `error` and defaults to 0.
EventData(int ev, int err = 0) : event(ev), error(err) { }
61
///////////////////////////////////////////////////////////////////////////////
62
// PseudoTcpChannel::InternalStream
63
///////////////////////////////////////////////////////////////////////////////
65
// StreamInterface implementation created by PseudoTcpChannel::GetStream().
// GetState(), Read() and Write() forward to the owning channel through
// parent_.
class PseudoTcpChannel::InternalStream : public StreamInterface {
67
// parent is the channel this stream forwards to.
InternalStream(PseudoTcpChannel* parent);
68
virtual ~InternalStream();
70
// StreamInterface overrides; see the definitions further down in this file.
virtual StreamState GetState() const;
71
virtual StreamResult Read(void* buffer, size_t buffer_len,
72
size_t* read, int* error);
73
virtual StreamResult Write(const void* data, size_t data_len,
74
size_t* written, int* error);
78
// parent_ is accessed and modified exclusively on the event thread, to
79
// avoid thread contention. This means that the PseudoTcpChannel cannot go
80
// away until after it receives a Close() from TunnelStream.
81
PseudoTcpChannel* parent_;
84
///////////////////////////////////////////////////////////////////////////////
86
// Member object lifetime summaries:
87
// session_ - passed in constructor, cleared when channel_ goes away.
88
// channel_ - created in Connect, destroyed when session_ or tcp_ goes away.
89
// tcp_ - created in Connect, destroyed when channel_ goes away, or connection
91
// worker_thread_ - created when channel_ is created, purged when channel_ is destroyed.
93
// stream_ - created in GetStream, destroyed by owner at arbitrary time.
94
// this - created in constructor, destroyed when worker_thread_ and stream_ are both gone.
96
///////////////////////////////////////////////////////////////////////////////
99
// Signal thread methods
102
// Builds an unconnected channel bound to `session`.
//   stream_thread - thread on which MSG_ST_EVENT stream notifications are
//                   posted and handled.
//   session       - owning session; must be non-NULL. Its session manager
//                   supplies the signaling thread.
// Must run on the signaling thread (asserted). channel_, tcp_, stream_ and
// worker_thread_ start out NULL and all flags start out false.
// NOTE(review): `session` is dereferenced in the initializer list before the
// ASSERT(NULL != session_) below runs, so the assert cannot catch a NULL
// argument.
PseudoTcpChannel::PseudoTcpChannel(Thread* stream_thread, Session* session)
103
: signal_thread_(session->session_manager()->signaling_thread()),
104
worker_thread_(NULL),
105
stream_thread_(stream_thread),
106
session_(session), channel_(NULL), tcp_(NULL), stream_(NULL),
107
stream_readable_(false), pending_read_event_(false),
108
ready_to_connect_(false) {
109
ASSERT(signal_thread_->IsCurrent());
110
ASSERT(NULL != session_);
113
// Destructor. Must run on the signaling thread. The asserts state the
// teardown precondition: worker_thread_, session_, channel_, stream_ and tcp_
// must all have been cleared before the object is destroyed.
PseudoTcpChannel::~PseudoTcpChannel() {
114
ASSERT(signal_thread_->IsCurrent());
115
ASSERT(worker_thread_ == NULL);
116
ASSERT(session_ == NULL);
117
ASSERT(channel_ == NULL);
118
ASSERT(stream_ == NULL);
119
ASSERT(tcp_ == NULL);
122
// Creates the transport channel and the PseudoTcp object for this tunnel.
//   content_name - content name passed to Session::CreateChannel and later to
//                  Session::DestroyChannel.
//   channel_name - name of the transport channel to create.
// Must run on the signaling thread; takes cs_ for its duration.
// NOTE(review): the return statements are not visible in this listing, so the
// meaning of the bool result cannot be documented from here.
bool PseudoTcpChannel::Connect(const std::string& content_name,
123
const std::string& channel_name) {
124
ASSERT(signal_thread_->IsCurrent());
125
CritScope lock(&cs_);
130
ASSERT(session_ != NULL);
131
// The worker thread is borrowed from the session manager.
worker_thread_ = session_->session_manager()->worker_thread();
132
content_name_ = content_name;
133
// NOTE(review): no NULL check on the CreateChannel result is visible before
// it is dereferenced below - confirm it cannot fail.
channel_ = session_->CreateChannel(content_name, channel_name);
134
channel_name_ = channel_name;
135
channel_->SetOption(Socket::OPT_DONTFRAGMENT, 1);
137
// Route the channel's notifications into this object's handlers.
channel_->SignalDestroyed.connect(this,
138
&PseudoTcpChannel::OnChannelDestroyed);
139
channel_->SignalWritableState.connect(this,
140
&PseudoTcpChannel::OnChannelWritableState);
141
channel_->SignalReadPacket.connect(this,
142
&PseudoTcpChannel::OnChannelRead);
143
channel_->SignalRouteChange.connect(this,
144
&PseudoTcpChannel::OnChannelConnectionChanged);
146
ASSERT(tcp_ == NULL);
147
// This object is passed as the first PseudoTcp constructor argument; the
// OnTcp* and TcpWritePacket methods below take a PseudoTcp* parameter.
tcp_ = new PseudoTcp(this, 0);
148
if (session_->initiator()) {
149
// Since we may try several protocols and network adapters that won't work,
150
// wait until we get our first writable notification before initiating
// the connection (see OnChannelWritableState, which consumes
// ready_to_connect_).
152
ready_to_connect_ = true;
158
// Creates the InternalStream that exposes this channel as a StreamInterface
// and stores it in stream_.
// Must run on the signaling thread, while session_ is still set.
// NOTE(review): any guard against an already-existing stream_, and the return
// statement, are not visible in this listing.
StreamInterface* PseudoTcpChannel::GetStream() {
159
ASSERT(signal_thread_->IsCurrent());
160
CritScope lock(&cs_);
161
ASSERT(NULL != session_);
163
stream_ = new PseudoTcpChannel::InternalStream(this);
164
//TODO("should we disallow creation of new stream at some point?");
168
// Handler for channel_->SignalDestroyed. Runs on the signaling thread.
// Cancels pending channel-destroy and clock messages, asks the worker thread
// to purge itself, tells the stream side that the tunnel closed, and finally
// emits SignalChannelClosed.
void PseudoTcpChannel::OnChannelDestroyed(TransportChannel* channel) {
169
LOG_F(LS_INFO) << "(" << channel->name() << ")";
170
ASSERT(signal_thread_->IsCurrent());
171
CritScope lock(&cs_);
172
ASSERT(channel == channel_);
173
// The channel is already going away, so a queued destroy request is moot.
signal_thread_->Clear(this, MSG_SI_DESTROYCHANNEL);
174
// When MSG_WK_PURGE is received, we know there will be no more messages from
175
// the worker thread.
176
worker_thread_->Clear(this, MSG_WK_CLOCK);
177
worker_thread_->Post(this, MSG_WK_PURGE);
180
// Only post SE_CLOSE when a stream exists and tcp_ is either absent or not
// already in TCP_CLOSED.
if ((stream_ != NULL)
181
&& ((tcp_ == NULL) || (tcp_->State() != PseudoTcp::TCP_CLOSED)))
182
stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, 0));
187
SignalChannelClosed(this);
190
// Called when `session` terminates. Only acts when the session is still set
// but Connect() never created a channel; in that case the stream side is told
// the tunnel closed with error -1.
// NOTE(review): some statements of the unconnected branch are not visible in
// this listing.
void PseudoTcpChannel::OnSessionTerminate(Session* session) {
191
// When the session terminates before we even connected
192
CritScope lock(&cs_);
193
if (session_ != NULL && channel_ == NULL) {
194
ASSERT(session == session_);
195
ASSERT(worker_thread_ == NULL);
196
ASSERT(tcp_ == NULL);
197
LOG(LS_INFO) << "Destroying unconnected PseudoTcpChannel";
200
stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, -1));
203
// Even though session_ is being destroyed, we mustn't clear the pointer,
204
// since we'll need it to tear down channel_.
206
// TODO: Is it always the case that if channel_ != NULL then we'll get
207
// a channel-destroyed notification?
210
// Reads PseudoTcp option `opt` into *value by forwarding to tcp_->GetOption().
// Must run on the signaling thread, and only while tcp_ exists (asserted).
void PseudoTcpChannel::GetOption(PseudoTcp::Option opt, int* value) {
211
ASSERT(signal_thread_->IsCurrent());
212
CritScope lock(&cs_);
213
ASSERT(tcp_ != NULL);
214
tcp_->GetOption(opt, value);
217
// Sets PseudoTcp option `opt` to `value` by forwarding to tcp_->SetOption().
// Must run on the signaling thread, and only while tcp_ exists (asserted).
void PseudoTcpChannel::SetOption(PseudoTcp::Option opt, int value) {
218
ASSERT(signal_thread_->IsCurrent());
219
CritScope lock(&cs_);
220
ASSERT(tcp_ != NULL);
221
tcp_->SetOption(opt, value);
225
// Stream thread methods
228
// Reports the stream state derived from tcp_->State(). Called on the stream
// thread while stream_ exists.
// NOTE(review): the value returned for each case is not visible in this
// listing; the grouping of the case labels suggests that LISTEN, SYN_SENT and
// SYN_RECEIVED share one result - confirm.
StreamState PseudoTcpChannel::GetState() const {
229
ASSERT(stream_ != NULL && stream_thread_->IsCurrent());
230
CritScope lock(&cs_);
235
switch (tcp_->State()) {
236
case PseudoTcp::TCP_LISTEN:
237
case PseudoTcp::TCP_SYN_SENT:
238
case PseudoTcp::TCP_SYN_RECEIVED:
240
case PseudoTcp::TCP_ESTABLISHED:
242
case PseudoTcp::TCP_CLOSED:
248
// Reads up to buffer_len bytes from tcp_ into buffer. Called on the stream
// thread while stream_ exists.
//   read  - out-parameter for the byte count (its assignment is not visible
//           in this listing).
//   error - receives tcp_->GetError() on the failure path.
// NOTE(review): the return statements are not visible here, so the exact
// StreamResult for each branch cannot be documented from this listing.
StreamResult PseudoTcpChannel::Read(void* buffer, size_t buffer_len,
249
size_t* read, int* error) {
250
ASSERT(stream_ != NULL && stream_thread_->IsCurrent());
251
CritScope lock(&cs_);
255
// Cleared before each Recv; set again below on the success path.
stream_readable_ = false;
256
int result = tcp_->Recv(static_cast<char*>(buffer), buffer_len);
257
//LOG_F(LS_VERBOSE) << "Recv returned: " << result;
261
// PseudoTcp doesn't currently support repeated Readable signals. Simulate
// them by posting another SE_READ event ourselves, at most one at a time.
263
stream_readable_ = true;
264
if (!pending_read_event_) {
265
pending_read_event_ = true;
266
stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_READ), true);
269
} else if (IsBlockingError(tcp_->GetError())) {
273
*error = tcp_->GetError();
276
// This spot is never reached.
279
// Hands data_len bytes from data to tcp_->Send(). Called on the stream thread
// while stream_ exists.
//   written - out-parameter for the byte count (its assignment is not visible
//             in this listing).
//   error   - receives tcp_->GetError() on the failure path.
// NOTE(review): the return statements are not visible here, so the exact
// StreamResult for each branch cannot be documented from this listing.
StreamResult PseudoTcpChannel::Write(const void* data, size_t data_len,
280
size_t* written, int* error) {
281
ASSERT(stream_ != NULL && stream_thread_->IsCurrent());
282
CritScope lock(&cs_);
285
int result = tcp_->Send(static_cast<const char*>(data), data_len);
286
//LOG_F(LS_VERBOSE) << "Send returned: " << result;
291
} else if (IsBlockingError(tcp_->GetError())) {
295
*error = tcp_->GetError();
298
// This spot is never reached.
301
// Stream-side close. Called on the stream thread while stream_ exists.
// Drops any MSG_ST_EVENT messages still queued on the stream thread.
// NOTE(review): the remaining teardown steps are not visible in this listing.
void PseudoTcpChannel::Close() {
302
ASSERT(stream_ != NULL && stream_thread_->IsCurrent());
303
CritScope lock(&cs_);
305
// Clear out any pending event notifications
306
stream_thread_->Clear(this, MSG_ST_EVENT);
316
// Worker thread methods
319
// Handler for channel_->SignalWritableState. Runs on the worker thread.
// Warns when channel_ or tcp_ is missing. Otherwise it consumes
// ready_to_connect_ (armed by Connect() for the initiator) once the channel
// reports writable.
// NOTE(review): the guard conditions, early returns and the action taken
// after clearing the flag are not visible in this listing.
void PseudoTcpChannel::OnChannelWritableState(TransportChannel* channel) {
320
LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";
321
ASSERT(worker_thread_->IsCurrent());
322
CritScope lock(&cs_);
324
LOG_F(LS_WARNING) << "NULL channel";
327
ASSERT(channel == channel_);
329
LOG_F(LS_WARNING) << "NULL tcp";
332
// Nothing to do unless a connect is pending and the channel can be written.
if (!ready_to_connect_ || !channel->writable())
335
ready_to_connect_ = false;
340
// Handler for channel_->SignalReadPacket. Runs on the worker thread.
// Passes the received packet (data, size) to tcp_->NotifyPacket(); warns when
// channel_ or tcp_ is missing.
// NOTE(review): the guard conditions and early returns around the warnings
// are not visible in this listing.
void PseudoTcpChannel::OnChannelRead(TransportChannel* channel,
341
const char* data, size_t size) {
342
//LOG_F(LS_VERBOSE) << "(" << size << ")";
343
ASSERT(worker_thread_->IsCurrent());
344
CritScope lock(&cs_);
346
LOG_F(LS_WARNING) << "NULL channel";
349
ASSERT(channel == channel_);
351
LOG_F(LS_WARNING) << "NULL tcp";
354
tcp_->NotifyPacket(data, size);
358
// Handler for channel_->SignalRouteChange. Runs on the worker thread.
// Estimates the path MTU towards candidate.address() using a temporary
// datagram socket and reports the result to tcp_->NotifyMTU().
// NOTE(review): the guard conditions around the warnings, and whatever
// follows a failed estimate, are not visible in this listing.
void PseudoTcpChannel::OnChannelConnectionChanged(TransportChannel* channel,
359
const Candidate& candidate) {
360
LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";
361
ASSERT(worker_thread_->IsCurrent());
362
CritScope lock(&cs_);
364
LOG_F(LS_WARNING) << "NULL channel";
367
ASSERT(channel == channel_);
369
LOG_F(LS_WARNING) << "NULL tcp";
373
uint16 mtu = 1280; // safe default
374
// The scoped_ptr releases the probe socket when this function returns.
// NOTE(review): the CreateSocket result is dereferenced without a visible
// NULL check - confirm it cannot fail.
talk_base::scoped_ptr<Socket> mtu_socket(
375
worker_thread_->socketserver()->CreateSocket(SOCK_DGRAM));
376
if (mtu_socket->Connect(candidate.address()) < 0 ||
377
mtu_socket->EstimateMTU(&mtu) < 0) {
378
LOG_F(LS_WARNING) << "Failed to estimate MTU, error="
379
<< mtu_socket->GetError();
382
LOG_F(LS_VERBOSE) << "Using MTU of " << mtu << " bytes";
383
tcp_->NotifyMTU(mtu);
387
// PseudoTcp callback for the connection opening. Runs on the worker thread
// with cs_ already held by the caller (asserted).
// Marks the stream readable and posts a combined open/read/write event to the
// stream thread. pending_read_event_ is set because that event includes
// SE_READ.
void PseudoTcpChannel::OnTcpOpen(PseudoTcp* tcp) {
388
LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";
389
ASSERT(cs_.CurrentThreadIsOwner());
390
ASSERT(worker_thread_->IsCurrent());
393
stream_readable_ = true;
394
pending_read_event_ = true;
395
stream_thread_->Post(this, MSG_ST_EVENT,
396
new EventData(SE_OPEN | SE_READ | SE_WRITE));
400
// PseudoTcp callback for data becoming readable. Runs on the worker thread
// with cs_ already held by the caller (asserted).
// Marks the stream readable and posts SE_READ to the stream thread unless one
// is already pending.
void PseudoTcpChannel::OnTcpReadable(PseudoTcp* tcp) {
402
ASSERT(cs_.CurrentThreadIsOwner());
403
ASSERT(worker_thread_->IsCurrent());
406
stream_readable_ = true;
407
// pending_read_event_ keeps at most one SE_READ in flight; OnMessage clears
// it when the event is delivered.
if (!pending_read_event_) {
408
pending_read_event_ = true;
409
stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_READ));
414
// PseudoTcp callback for the connection becoming writable. Runs on the worker
// thread with cs_ already held by the caller (asserted).
// Posts SE_WRITE to the stream thread.
void PseudoTcpChannel::OnTcpWriteable(PseudoTcp* tcp) {
416
ASSERT(cs_.CurrentThreadIsOwner());
417
ASSERT(worker_thread_->IsCurrent());
420
stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_WRITE));
423
// PseudoTcp callback for the connection closing. Runs on the worker thread
// with cs_ already held by the caller (asserted).
// Posts SE_CLOSE to the stream thread, carrying nError as the event's error
// code.
void PseudoTcpChannel::OnTcpClosed(PseudoTcp* tcp, uint32 nError) {
424
LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";
425
ASSERT(cs_.CurrentThreadIsOwner());
426
ASSERT(worker_thread_->IsCurrent());
429
stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, nError));
433
// Multi-thread methods
436
// Dispatches messages posted to this object. Each message id is handled on
// the thread named by its prefix (WK = worker, ST = stream, SI = signal), and
// every branch asserts that.
// NOTE(review): several statements in each branch are not visible in this
// listing; the comments below describe only what is shown.
void PseudoTcpChannel::OnMessage(Message* pmsg) {
437
// MSG_WK_CLOCK: the PseudoTcp timer fired; advance tcp_ to the current time.
if (pmsg->message_id == MSG_WK_CLOCK) {
439
ASSERT(worker_thread_->IsCurrent());
440
//LOG(LS_INFO) << "PseudoTcpChannel::OnMessage(MSG_WK_CLOCK)";
441
CritScope lock(&cs_);
443
tcp_->NotifyClock(PseudoTcp::Now());
447
// MSG_WK_PURGE: posted by OnChannelDestroyed; drops the worker thread
// reference.
} else if (pmsg->message_id == MSG_WK_PURGE) {
449
ASSERT(worker_thread_->IsCurrent());
450
LOG_F(LS_INFO) << "(MSG_WK_PURGE)";
451
// At this point, we know there are no additional worker thread messages.
452
CritScope lock(&cs_);
453
ASSERT(NULL == session_);
454
ASSERT(NULL == channel_);
455
worker_thread_ = NULL;
458
// MSG_ST_EVENT: deliver a stream event to the stream's SignalEvent listeners.
} else if (pmsg->message_id == MSG_ST_EVENT) {
460
ASSERT(stream_thread_->IsCurrent());
461
//LOG(LS_INFO) << "PseudoTcpChannel::OnMessage(MSG_ST_EVENT, "
462
// << data->event << ", " << data->error << ")";
463
ASSERT(stream_ != NULL);
464
EventData* data = static_cast<EventData*>(pmsg->pdata);
465
// A delivered SE_READ allows the next read notification to be posted.
if (data->event & SE_READ) {
466
CritScope lock(&cs_);
467
pending_read_event_ = false;
469
stream_->SignalEvent(stream_, data->event, data->error);
472
// MSG_SI_DESTROYCHANNEL: posted by AdjustClock; ask the session to destroy
// channel_.
} else if (pmsg->message_id == MSG_SI_DESTROYCHANNEL) {
474
ASSERT(signal_thread_->IsCurrent());
475
LOG_F(LS_INFO) << "(MSG_SI_DESTROYCHANNEL)";
476
ASSERT(session_ != NULL);
477
ASSERT(channel_ != NULL);
478
session_->DestroyChannel(content_name_, channel_->name());
480
// MSG_SI_DESTROY: posted by CheckDestroy.
} else if (pmsg->message_id == MSG_SI_DESTROY) {
482
ASSERT(signal_thread_->IsCurrent());
483
LOG_F(LS_INFO) << "(MSG_SI_DESTROY)";
484
// The message queue is empty, so it is safe to destroy ourselves.
492
// PseudoTcp callback that sends one outgoing packet over channel_. The caller
// must already hold cs_ (asserted).
//   buffer, len - packet bytes passed to channel_->SendPacket().
// Returns, as far as this listing shows:
//   WR_SUCCESS   - on the first branch (its condition is not visible here),
//                  and also when the channel error is a blocking error.
//   WR_TOO_LARGE - the channel error is EMSGSIZE.
//   WR_FAIL      - any other error, which is also logged.
IPseudoTcpNotify::WriteResult PseudoTcpChannel::TcpWritePacket(
493
PseudoTcp* tcp, const char* buffer, size_t len) {
494
ASSERT(cs_.CurrentThreadIsOwner());
496
ASSERT(NULL != channel_);
497
int sent = channel_->SendPacket(buffer, len);
499
//LOG_F(LS_VERBOSE) << "(" << sent << ") Sent";
500
return IPseudoTcpNotify::WR_SUCCESS;
501
// A blocking send is reported as success.
// NOTE(review): presumably PseudoTcp's own retransmission covers the unsent
// packet - confirm.
} else if (IsBlockingError(channel_->GetError())) {
502
LOG_F(LS_VERBOSE) << "Blocking";
503
return IPseudoTcpNotify::WR_SUCCESS;
504
} else if (channel_->GetError() == EMSGSIZE) {
505
LOG_F(LS_ERROR) << "EMSGSIZE";
506
return IPseudoTcpNotify::WR_TOO_LARGE;
508
PLOG(LS_ERROR, channel_->GetError()) << "PseudoTcpChannel::TcpWritePacket";
510
return IPseudoTcpNotify::WR_FAIL;
514
// Re-arms the MSG_WK_CLOCK timer according to tcp_->GetNextClock(). The
// caller must hold cs_, and tcp_ must exist (both asserted).
// When GetNextClock() returns true, any queued MSG_WK_CLOCK is replaced by a
// new delayed one. The later statements request destruction of channel_ on
// the signal thread.
// NOTE(review): the declaration of `timeout`, the use of `clear`, and the
// branch structure after the GetNextClock() check are not visible in this
// listing.
void PseudoTcpChannel::AdjustClock(bool clear) {
515
ASSERT(cs_.CurrentThreadIsOwner());
516
ASSERT(NULL != tcp_);
519
if (tcp_->GetNextClock(PseudoTcp::Now(), timeout)) {
520
ASSERT(NULL != channel_);
521
// Reset the next clock, by clearing the old and setting a new one.
523
worker_thread_->Clear(this, MSG_WK_CLOCK);
524
// A negative timeout is clamped to 0.
worker_thread_->PostDelayed(_max(timeout, 0L), this, MSG_WK_CLOCK);
530
ready_to_connect_ = false;
533
// If TCP has failed, no need for channel_ anymore
534
signal_thread_->Post(this, MSG_SI_DESTROYCHANNEL);
538
// Posts MSG_SI_DESTROY to the signal thread. The caller must hold cs_
// (asserted).
// NOTE(review): the statement controlled by the `if` below is not visible in
// this listing; presumably it returns early while worker_thread_ or stream_
// is still set - confirm.
void PseudoTcpChannel::CheckDestroy() {
539
ASSERT(cs_.CurrentThreadIsOwner());
540
if ((worker_thread_ != NULL) || (stream_ != NULL))
542
signal_thread_->Post(this, MSG_SI_DESTROY);
545
///////////////////////////////////////////////////////////////////////////////
546
// PseudoTcpChannel::InternalStream
547
///////////////////////////////////////////////////////////////////////////////
549
PseudoTcpChannel::InternalStream::InternalStream(PseudoTcpChannel* parent)
553
PseudoTcpChannel::InternalStream::~InternalStream() {
557
// StreamInterface::GetState override: forwards to parent_->GetState().
// NOTE(review): any handling of a NULL parent_ is not visible in this listing.
StreamState PseudoTcpChannel::InternalStream::GetState() const {
560
return parent_->GetState();
563
// StreamInterface::Read override: forwards all arguments unchanged to
// parent_->Read().
// NOTE(review): any handling of a NULL parent_ is not visible in this listing.
StreamResult PseudoTcpChannel::InternalStream::Read(
564
void* buffer, size_t buffer_len, size_t* read, int* error) {
570
return parent_->Read(buffer, buffer_len, read, error);
573
// StreamInterface::Write override: forwards all arguments unchanged to
// parent_->Write().
// NOTE(review): any handling of a NULL parent_ is not visible in this listing.
StreamResult PseudoTcpChannel::InternalStream::Write(
574
const void* data, size_t data_len, size_t* written, int* error) {
580
return parent_->Write(data, data_len, written, error);
583
void PseudoTcpChannel::InternalStream::Close() {
590
///////////////////////////////////////////////////////////////////////////////
592
} // namespace cricket