~ubuntu-branches/ubuntu/saucy/kopete/saucy-proposed

« back to all changes in this revision

Viewing changes to protocols/jabber/googletalk/libjingle/talk/session/tunnel/pseudotcpchannel.cc

  • Committer: Package Import Robot
  • Author(s): Jonathan Riddell
  • Date: 2013-06-21 02:22:39 UTC
  • Revision ID: package-import@ubuntu.com-20130621022239-63l3zc8p0nf26pt6
Tags: upstream-4.10.80
ImportĀ upstreamĀ versionĀ 4.10.80

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 * libjingle
 
3
 * Copyright 2004--2006, Google Inc.
 
4
 *
 
5
 * Redistribution and use in source and binary forms, with or without
 
6
 * modification, are permitted provided that the following conditions are met:
 
7
 *
 
8
 *  1. Redistributions of source code must retain the above copyright notice,
 
9
 *     this list of conditions and the following disclaimer.
 
10
 *  2. Redistributions in binary form must reproduce the above copyright notice,
 
11
 *     this list of conditions and the following disclaimer in the documentation
 
12
 *     and/or other materials provided with the distribution.
 
13
 *  3. The name of the author may not be used to endorse or promote products
 
14
 *     derived from this software without specific prior written permission.
 
15
 *
 
16
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
 
17
 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 
18
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
 
19
 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 
20
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 
21
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
 
22
 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
 
23
 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
 
24
 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
 
25
 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
26
 */
 
27
 
 
28
#include <string>
 
29
#include "talk/base/basictypes.h"
 
30
#include "talk/base/common.h"
 
31
#include "talk/base/logging.h"
 
32
#include "talk/base/scoped_ptr.h"
 
33
#include "talk/base/stringutils.h"
 
34
#include "talk/p2p/base/candidate.h"
 
35
#include "talk/p2p/base/transportchannel.h"
 
36
#include "pseudotcpchannel.h"
 
37
 
 
38
using namespace talk_base;
 
39
 
 
40
namespace cricket {
 
41
 
 
42
extern const talk_base::ConstantLabel SESSION_STATES[];
 
43
 
 
44
// MSG_WK_* - worker thread messages
 
45
// MSG_ST_* - stream thread messages
 
46
// MSG_SI_* - signal thread messages
 
47
 
 
48
enum {
 
49
  MSG_WK_CLOCK = 1,
 
50
  MSG_WK_PURGE,
 
51
  MSG_ST_EVENT,
 
52
  MSG_SI_DESTROYCHANNEL,
 
53
  MSG_SI_DESTROY,
 
54
};
 
55
 
 
56
struct EventData : public MessageData {
 
57
  int event, error;
 
58
  EventData(int ev, int err = 0) : event(ev), error(err) { }
 
59
};
 
60
 
 
61
///////////////////////////////////////////////////////////////////////////////
 
62
// PseudoTcpChannel::InternalStream
 
63
///////////////////////////////////////////////////////////////////////////////
 
64
 
 
65
class PseudoTcpChannel::InternalStream : public StreamInterface {
 
66
public:
 
67
  InternalStream(PseudoTcpChannel* parent);
 
68
  virtual ~InternalStream();
 
69
 
 
70
  virtual StreamState GetState() const;
 
71
  virtual StreamResult Read(void* buffer, size_t buffer_len,
 
72
                                       size_t* read, int* error);
 
73
  virtual StreamResult Write(const void* data, size_t data_len,
 
74
                                        size_t* written, int* error);
 
75
  virtual void Close();
 
76
 
 
77
private:
 
78
  // parent_ is accessed and modified exclusively on the event thread, to
 
79
  // avoid thread contention.  This means that the PseudoTcpChannel cannot go
 
80
  // away until after it receives a Close() from TunnelStream.
 
81
  PseudoTcpChannel* parent_;
 
82
};
 
83
 
 
84
///////////////////////////////////////////////////////////////////////////////
 
85
// PseudoTcpChannel
 
86
// Member object lifetime summaries:
 
87
//   session_ - passed in constructor, cleared when channel_ goes away.
 
88
//   channel_ - created in Connect, destroyed when session_ or tcp_ goes away.
 
89
//   tcp_ - created in Connect, destroyed when channel_ goes away, or connection
 
90
//     closes.
 
91
//   worker_thread_ - created when channel_ is created, purged when channel_ is
 
92
//     destroyed.
 
93
//   stream_ - created in GetStream, destroyed by owner at arbitrary time.
 
94
//   this - created in constructor, destroyed when worker_thread_ and stream_
 
95
//     are both gone.
 
96
///////////////////////////////////////////////////////////////////////////////
 
97
 
 
98
//
 
99
// Signal thread methods
 
100
//
 
101
 
 
102
PseudoTcpChannel::PseudoTcpChannel(Thread* stream_thread, Session* session)
 
103
  : signal_thread_(session->session_manager()->signaling_thread()),
 
104
    worker_thread_(NULL),
 
105
    stream_thread_(stream_thread),
 
106
    session_(session), channel_(NULL), tcp_(NULL), stream_(NULL),
 
107
    stream_readable_(false), pending_read_event_(false),
 
108
    ready_to_connect_(false) {
 
109
  ASSERT(signal_thread_->IsCurrent());
 
110
  ASSERT(NULL != session_);
 
111
}
 
112
 
 
113
PseudoTcpChannel::~PseudoTcpChannel() {
 
114
  ASSERT(signal_thread_->IsCurrent());
 
115
  ASSERT(worker_thread_ == NULL);
 
116
  ASSERT(session_ == NULL);
 
117
  ASSERT(channel_ == NULL);
 
118
  ASSERT(stream_ == NULL);
 
119
  ASSERT(tcp_ == NULL);
 
120
}
 
121
 
 
122
bool PseudoTcpChannel::Connect(const std::string& content_name,
 
123
                               const std::string& channel_name) {
 
124
  ASSERT(signal_thread_->IsCurrent());
 
125
  CritScope lock(&cs_);
 
126
 
 
127
  if (channel_)
 
128
    return false;
 
129
 
 
130
  ASSERT(session_ != NULL);
 
131
  worker_thread_ = session_->session_manager()->worker_thread();
 
132
  content_name_ = content_name;
 
133
  channel_ = session_->CreateChannel(content_name, channel_name);
 
134
  channel_name_ = channel_name;
 
135
  channel_->SetOption(Socket::OPT_DONTFRAGMENT, 1);
 
136
 
 
137
  channel_->SignalDestroyed.connect(this,
 
138
    &PseudoTcpChannel::OnChannelDestroyed);
 
139
  channel_->SignalWritableState.connect(this,
 
140
    &PseudoTcpChannel::OnChannelWritableState);
 
141
  channel_->SignalReadPacket.connect(this,
 
142
    &PseudoTcpChannel::OnChannelRead);
 
143
  channel_->SignalRouteChange.connect(this,
 
144
    &PseudoTcpChannel::OnChannelConnectionChanged);
 
145
 
 
146
  ASSERT(tcp_ == NULL);
 
147
  tcp_ = new PseudoTcp(this, 0);
 
148
  if (session_->initiator()) {
 
149
    // Since we may try several protocols and network adapters that won't work,
 
150
    // waiting until we get our first writable notification before initiating
 
151
    // TCP negotiation.
 
152
    ready_to_connect_ = true;
 
153
  }
 
154
 
 
155
  return true;
 
156
}
 
157
 
 
158
StreamInterface* PseudoTcpChannel::GetStream() {
 
159
  ASSERT(signal_thread_->IsCurrent());
 
160
  CritScope lock(&cs_);
 
161
  ASSERT(NULL != session_);
 
162
  if (!stream_)
 
163
    stream_ = new PseudoTcpChannel::InternalStream(this);
 
164
  //TODO("should we disallow creation of new stream at some point?");
 
165
  return stream_;
 
166
}
 
167
 
 
168
void PseudoTcpChannel::OnChannelDestroyed(TransportChannel* channel) {
 
169
  LOG_F(LS_INFO) << "(" << channel->name() << ")";
 
170
  ASSERT(signal_thread_->IsCurrent());
 
171
  CritScope lock(&cs_);
 
172
  ASSERT(channel == channel_);
 
173
  signal_thread_->Clear(this, MSG_SI_DESTROYCHANNEL);
 
174
  // When MSG_WK_PURGE is received, we know there will be no more messages from
 
175
  // the worker thread.
 
176
  worker_thread_->Clear(this, MSG_WK_CLOCK);
 
177
  worker_thread_->Post(this, MSG_WK_PURGE);
 
178
  session_ = NULL;
 
179
  channel_ = NULL;
 
180
  if ((stream_ != NULL)
 
181
      && ((tcp_ == NULL) || (tcp_->State() != PseudoTcp::TCP_CLOSED)))
 
182
    stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, 0));
 
183
  if (tcp_) {
 
184
    tcp_->Close(true);
 
185
    AdjustClock();
 
186
  }
 
187
  SignalChannelClosed(this);
 
188
}
 
189
 
 
190
void PseudoTcpChannel::OnSessionTerminate(Session* session) {
 
191
  // When the session terminates before we even connected
 
192
  CritScope lock(&cs_);
 
193
  if (session_ != NULL && channel_ == NULL) {
 
194
    ASSERT(session == session_);
 
195
    ASSERT(worker_thread_ == NULL);
 
196
    ASSERT(tcp_ == NULL);
 
197
    LOG(LS_INFO) << "Destroying unconnected PseudoTcpChannel";
 
198
    session_ = NULL;
 
199
    if (stream_ != NULL)
 
200
      stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, -1));
 
201
  }
 
202
 
 
203
  // Even though session_ is being destroyed, we mustn't clear the pointer,
 
204
  // since we'll need it to tear down channel_.
 
205
  //
 
206
  // TODO: Is it always the case that if channel_ != NULL then we'll get
 
207
  // a channel-destroyed notification?
 
208
}
 
209
 
 
210
void PseudoTcpChannel::GetOption(PseudoTcp::Option opt, int* value) {
 
211
  ASSERT(signal_thread_->IsCurrent());
 
212
  CritScope lock(&cs_);
 
213
  ASSERT(tcp_ != NULL);
 
214
  tcp_->GetOption(opt, value);
 
215
}
 
216
 
 
217
void PseudoTcpChannel::SetOption(PseudoTcp::Option opt, int value) {
 
218
  ASSERT(signal_thread_->IsCurrent());
 
219
  CritScope lock(&cs_);
 
220
  ASSERT(tcp_ != NULL);
 
221
  tcp_->SetOption(opt, value);
 
222
}
 
223
 
 
224
//
 
225
// Stream thread methods
 
226
//
 
227
 
 
228
StreamState PseudoTcpChannel::GetState() const {
 
229
  ASSERT(stream_ != NULL && stream_thread_->IsCurrent());
 
230
  CritScope lock(&cs_);
 
231
  if (!session_)
 
232
    return SS_CLOSED;
 
233
  if (!tcp_)
 
234
    return SS_OPENING;
 
235
  switch (tcp_->State()) {
 
236
    case PseudoTcp::TCP_LISTEN:
 
237
    case PseudoTcp::TCP_SYN_SENT:
 
238
    case PseudoTcp::TCP_SYN_RECEIVED:
 
239
      return SS_OPENING;
 
240
    case PseudoTcp::TCP_ESTABLISHED:
 
241
      return SS_OPEN;
 
242
    case PseudoTcp::TCP_CLOSED:
 
243
    default:
 
244
      return SS_CLOSED;
 
245
  }
 
246
}
 
247
 
 
248
StreamResult PseudoTcpChannel::Read(void* buffer, size_t buffer_len,
 
249
                                    size_t* read, int* error) {
 
250
  ASSERT(stream_ != NULL && stream_thread_->IsCurrent());
 
251
  CritScope lock(&cs_);
 
252
  if (!tcp_)
 
253
    return SR_BLOCK;
 
254
 
 
255
  stream_readable_ = false;
 
256
  int result = tcp_->Recv(static_cast<char*>(buffer), buffer_len);
 
257
  //LOG_F(LS_VERBOSE) << "Recv returned: " << result;
 
258
  if (result > 0) {
 
259
    if (read)
 
260
      *read = result;
 
261
    // PseudoTcp doesn't currently support repeated Readable signals.  Simulate
 
262
    // them here.
 
263
    stream_readable_ = true;
 
264
    if (!pending_read_event_) {
 
265
      pending_read_event_ = true;
 
266
      stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_READ), true);
 
267
    }
 
268
    return SR_SUCCESS;
 
269
  } else if (IsBlockingError(tcp_->GetError())) {
 
270
    return SR_BLOCK;
 
271
  } else {
 
272
    if (error)
 
273
      *error = tcp_->GetError();
 
274
    return SR_ERROR;
 
275
  }
 
276
  // This spot is never reached.
 
277
}
 
278
 
 
279
StreamResult PseudoTcpChannel::Write(const void* data, size_t data_len,
 
280
                                     size_t* written, int* error) {
 
281
  ASSERT(stream_ != NULL && stream_thread_->IsCurrent());
 
282
  CritScope lock(&cs_);
 
283
  if (!tcp_)
 
284
    return SR_BLOCK;
 
285
  int result = tcp_->Send(static_cast<const char*>(data), data_len);
 
286
  //LOG_F(LS_VERBOSE) << "Send returned: " << result;
 
287
  if (result > 0) {
 
288
    if (written)
 
289
      *written = result;
 
290
    return SR_SUCCESS;
 
291
  } else if (IsBlockingError(tcp_->GetError())) {
 
292
    return SR_BLOCK;
 
293
  } else {
 
294
    if (error)
 
295
      *error = tcp_->GetError();
 
296
    return SR_ERROR;
 
297
  }
 
298
  // This spot is never reached.
 
299
}
 
300
 
 
301
void PseudoTcpChannel::Close() {
 
302
  ASSERT(stream_ != NULL && stream_thread_->IsCurrent());
 
303
  CritScope lock(&cs_);
 
304
  stream_ = NULL;
 
305
  // Clear out any pending event notifications
 
306
  stream_thread_->Clear(this, MSG_ST_EVENT);
 
307
  if (tcp_) {
 
308
    tcp_->Close(false);
 
309
    AdjustClock();
 
310
  } else {
 
311
    CheckDestroy();
 
312
  }
 
313
}
 
314
 
 
315
//
 
316
// Worker thread methods
 
317
//
 
318
 
 
319
void PseudoTcpChannel::OnChannelWritableState(TransportChannel* channel) {
 
320
  LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";
 
321
  ASSERT(worker_thread_->IsCurrent());
 
322
  CritScope lock(&cs_);
 
323
  if (!channel_) {
 
324
    LOG_F(LS_WARNING) << "NULL channel";
 
325
    return;
 
326
  }
 
327
  ASSERT(channel == channel_);
 
328
  if (!tcp_) {
 
329
    LOG_F(LS_WARNING) << "NULL tcp";
 
330
    return;
 
331
  }
 
332
  if (!ready_to_connect_ || !channel->writable())
 
333
    return;
 
334
 
 
335
  ready_to_connect_ = false;
 
336
  tcp_->Connect();
 
337
  AdjustClock();
 
338
}
 
339
 
 
340
void PseudoTcpChannel::OnChannelRead(TransportChannel* channel,
 
341
                                     const char* data, size_t size) {
 
342
  //LOG_F(LS_VERBOSE) << "(" << size << ")";
 
343
  ASSERT(worker_thread_->IsCurrent());
 
344
  CritScope lock(&cs_);
 
345
  if (!channel_) {
 
346
    LOG_F(LS_WARNING) << "NULL channel";
 
347
    return;
 
348
  }
 
349
  ASSERT(channel == channel_);
 
350
  if (!tcp_) {
 
351
    LOG_F(LS_WARNING) << "NULL tcp";
 
352
    return;
 
353
  }
 
354
  tcp_->NotifyPacket(data, size);
 
355
  AdjustClock();
 
356
}
 
357
 
 
358
void PseudoTcpChannel::OnChannelConnectionChanged(TransportChannel* channel,
 
359
                                                  const Candidate& candidate) {
 
360
  LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";
 
361
  ASSERT(worker_thread_->IsCurrent());
 
362
  CritScope lock(&cs_);
 
363
  if (!channel_) {
 
364
    LOG_F(LS_WARNING) << "NULL channel";
 
365
    return;
 
366
  }
 
367
  ASSERT(channel == channel_);
 
368
  if (!tcp_) {
 
369
    LOG_F(LS_WARNING) << "NULL tcp";
 
370
    return;
 
371
  }
 
372
 
 
373
  uint16 mtu = 1280;  // safe default
 
374
  talk_base::scoped_ptr<Socket> mtu_socket(
 
375
      worker_thread_->socketserver()->CreateSocket(SOCK_DGRAM));
 
376
  if (mtu_socket->Connect(candidate.address()) < 0 ||
 
377
      mtu_socket->EstimateMTU(&mtu) < 0) {
 
378
    LOG_F(LS_WARNING) << "Failed to estimate MTU, error="
 
379
                      << mtu_socket->GetError();
 
380
  }
 
381
 
 
382
  LOG_F(LS_VERBOSE) << "Using MTU of " << mtu << " bytes";
 
383
  tcp_->NotifyMTU(mtu);
 
384
  AdjustClock();
 
385
}
 
386
 
 
387
void PseudoTcpChannel::OnTcpOpen(PseudoTcp* tcp) {
 
388
  LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";
 
389
  ASSERT(cs_.CurrentThreadIsOwner());
 
390
  ASSERT(worker_thread_->IsCurrent());
 
391
  ASSERT(tcp == tcp_);
 
392
  if (stream_) {
 
393
    stream_readable_ = true;
 
394
    pending_read_event_ = true;
 
395
    stream_thread_->Post(this, MSG_ST_EVENT,
 
396
                         new EventData(SE_OPEN | SE_READ | SE_WRITE));
 
397
  }
 
398
}
 
399
 
 
400
void PseudoTcpChannel::OnTcpReadable(PseudoTcp* tcp) {
 
401
  //LOG_F(LS_VERBOSE);
 
402
  ASSERT(cs_.CurrentThreadIsOwner());
 
403
  ASSERT(worker_thread_->IsCurrent());
 
404
  ASSERT(tcp == tcp_);
 
405
  if (stream_) {
 
406
    stream_readable_ = true;
 
407
    if (!pending_read_event_) {
 
408
      pending_read_event_ = true;
 
409
      stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_READ));
 
410
    }
 
411
  }
 
412
}
 
413
 
 
414
void PseudoTcpChannel::OnTcpWriteable(PseudoTcp* tcp) {
 
415
  //LOG_F(LS_VERBOSE);
 
416
  ASSERT(cs_.CurrentThreadIsOwner());
 
417
  ASSERT(worker_thread_->IsCurrent());
 
418
  ASSERT(tcp == tcp_);
 
419
  if (stream_)
 
420
    stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_WRITE));
 
421
}
 
422
 
 
423
void PseudoTcpChannel::OnTcpClosed(PseudoTcp* tcp, uint32 nError) {
 
424
  LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";
 
425
  ASSERT(cs_.CurrentThreadIsOwner());
 
426
  ASSERT(worker_thread_->IsCurrent());
 
427
  ASSERT(tcp == tcp_);
 
428
  if (stream_)
 
429
    stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, nError));
 
430
}
 
431
 
 
432
//
 
433
// Multi-thread methods
 
434
//
 
435
 
 
436
void PseudoTcpChannel::OnMessage(Message* pmsg) {
 
437
  if (pmsg->message_id == MSG_WK_CLOCK) {
 
438
 
 
439
    ASSERT(worker_thread_->IsCurrent());
 
440
    //LOG(LS_INFO) << "PseudoTcpChannel::OnMessage(MSG_WK_CLOCK)";
 
441
    CritScope lock(&cs_);
 
442
    if (tcp_) {
 
443
      tcp_->NotifyClock(PseudoTcp::Now());
 
444
      AdjustClock(false);
 
445
    }
 
446
 
 
447
  } else if (pmsg->message_id == MSG_WK_PURGE) {
 
448
 
 
449
    ASSERT(worker_thread_->IsCurrent());
 
450
    LOG_F(LS_INFO) << "(MSG_WK_PURGE)";
 
451
    // At this point, we know there are no additional worker thread messages.
 
452
    CritScope lock(&cs_);
 
453
    ASSERT(NULL == session_);
 
454
    ASSERT(NULL == channel_);
 
455
    worker_thread_ = NULL;
 
456
    CheckDestroy();
 
457
 
 
458
  } else if (pmsg->message_id == MSG_ST_EVENT) {
 
459
 
 
460
    ASSERT(stream_thread_->IsCurrent());
 
461
    //LOG(LS_INFO) << "PseudoTcpChannel::OnMessage(MSG_ST_EVENT, "
 
462
    //             << data->event << ", " << data->error << ")";
 
463
    ASSERT(stream_ != NULL);
 
464
    EventData* data = static_cast<EventData*>(pmsg->pdata);
 
465
    if (data->event & SE_READ) {
 
466
      CritScope lock(&cs_);
 
467
      pending_read_event_ = false;
 
468
    }
 
469
    stream_->SignalEvent(stream_, data->event, data->error);
 
470
    delete data;
 
471
 
 
472
  } else if (pmsg->message_id == MSG_SI_DESTROYCHANNEL) {
 
473
 
 
474
    ASSERT(signal_thread_->IsCurrent());
 
475
    LOG_F(LS_INFO) << "(MSG_SI_DESTROYCHANNEL)";
 
476
    ASSERT(session_ != NULL);
 
477
    ASSERT(channel_ != NULL);
 
478
    session_->DestroyChannel(content_name_, channel_->name());
 
479
 
 
480
  } else if (pmsg->message_id == MSG_SI_DESTROY) {
 
481
 
 
482
    ASSERT(signal_thread_->IsCurrent());
 
483
    LOG_F(LS_INFO) << "(MSG_SI_DESTROY)";
 
484
    // The message queue is empty, so it is safe to destroy ourselves.
 
485
    delete this;
 
486
 
 
487
  } else {
 
488
    ASSERT(false);
 
489
  }
 
490
}
 
491
 
 
492
IPseudoTcpNotify::WriteResult PseudoTcpChannel::TcpWritePacket(
 
493
    PseudoTcp* tcp, const char* buffer, size_t len) {
 
494
  ASSERT(cs_.CurrentThreadIsOwner());
 
495
  ASSERT(tcp == tcp_);
 
496
  ASSERT(NULL != channel_);
 
497
  int sent = channel_->SendPacket(buffer, len);
 
498
  if (sent > 0) {
 
499
    //LOG_F(LS_VERBOSE) << "(" << sent << ") Sent";
 
500
    return IPseudoTcpNotify::WR_SUCCESS;
 
501
  } else if (IsBlockingError(channel_->GetError())) {
 
502
    LOG_F(LS_VERBOSE) << "Blocking";
 
503
    return IPseudoTcpNotify::WR_SUCCESS;
 
504
  } else if (channel_->GetError() == EMSGSIZE) {
 
505
    LOG_F(LS_ERROR) << "EMSGSIZE";
 
506
    return IPseudoTcpNotify::WR_TOO_LARGE;
 
507
  } else {
 
508
    PLOG(LS_ERROR, channel_->GetError()) << "PseudoTcpChannel::TcpWritePacket";
 
509
    ASSERT(false);
 
510
    return IPseudoTcpNotify::WR_FAIL;
 
511
  }
 
512
}
 
513
 
 
514
void PseudoTcpChannel::AdjustClock(bool clear) {
 
515
  ASSERT(cs_.CurrentThreadIsOwner());
 
516
  ASSERT(NULL != tcp_);
 
517
 
 
518
  long timeout = 0;
 
519
  if (tcp_->GetNextClock(PseudoTcp::Now(), timeout)) {
 
520
    ASSERT(NULL != channel_);
 
521
    // Reset the next clock, by clearing the old and setting a new one.
 
522
    if (clear)
 
523
      worker_thread_->Clear(this, MSG_WK_CLOCK);
 
524
    worker_thread_->PostDelayed(_max(timeout, 0L), this, MSG_WK_CLOCK);
 
525
    return;
 
526
  }
 
527
 
 
528
  delete tcp_;
 
529
  tcp_ = NULL;
 
530
  ready_to_connect_ = false;
 
531
 
 
532
  if (channel_) {
 
533
    // If TCP has failed, no need for channel_ anymore
 
534
    signal_thread_->Post(this, MSG_SI_DESTROYCHANNEL);
 
535
  }
 
536
}
 
537
 
 
538
void PseudoTcpChannel::CheckDestroy() {
 
539
  ASSERT(cs_.CurrentThreadIsOwner());
 
540
  if ((worker_thread_ != NULL) || (stream_ != NULL))
 
541
    return;
 
542
  signal_thread_->Post(this, MSG_SI_DESTROY);
 
543
}
 
544
 
 
545
///////////////////////////////////////////////////////////////////////////////
 
546
// PseudoTcpChannel::InternalStream
 
547
///////////////////////////////////////////////////////////////////////////////
 
548
 
 
549
PseudoTcpChannel::InternalStream::InternalStream(PseudoTcpChannel* parent)
 
550
  : parent_(parent) {
 
551
}
 
552
 
 
553
PseudoTcpChannel::InternalStream::~InternalStream() {
 
554
  Close();
 
555
}
 
556
 
 
557
StreamState PseudoTcpChannel::InternalStream::GetState() const {
 
558
  if (!parent_)
 
559
    return SS_CLOSED;
 
560
  return parent_->GetState();
 
561
}
 
562
 
 
563
StreamResult PseudoTcpChannel::InternalStream::Read(
 
564
    void* buffer, size_t buffer_len, size_t* read, int* error) {
 
565
  if (!parent_) {
 
566
    if (error)
 
567
      *error = ENOTCONN;
 
568
    return SR_ERROR;
 
569
  }
 
570
  return parent_->Read(buffer, buffer_len, read, error);
 
571
}
 
572
 
 
573
StreamResult PseudoTcpChannel::InternalStream::Write(
 
574
    const void* data, size_t data_len,  size_t* written, int* error) {
 
575
  if (!parent_) {
 
576
    if (error)
 
577
      *error = ENOTCONN;
 
578
    return SR_ERROR;
 
579
  }
 
580
  return parent_->Write(data, data_len, written, error);
 
581
}
 
582
 
 
583
void PseudoTcpChannel::InternalStream::Close() {
 
584
  if (!parent_)
 
585
    return;
 
586
  parent_->Close();
 
587
  parent_ = NULL;
 
588
}
 
589
 
 
590
///////////////////////////////////////////////////////////////////////////////
 
591
 
 
592
} // namespace cricket