~ubuntu-branches/debian/experimental/kopete/experimental

« back to all changes in this revision

Viewing changes to protocols/jabber/libjingle/talk/base/stream.cc

  • Committer: Package Import Robot
  • Author(s): Maximiliano Curia
  • Date: 2015-02-24 11:32:57 UTC
  • mfrom: (1.1.41 vivid)
  • Revision ID: package-import@ubuntu.com-20150224113257-gnupg4v7lzz18ij0
Tags: 4:14.12.2-1
* New upstream release (14.12.2).
* Bump Standards-Version to 3.9.6, no changes needed.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 * libjingle
 
3
 * Copyright 2004 Google Inc.
 
4
 *
 
5
 * Redistribution and use in source and binary forms, with or without
 
6
 * modification, are permitted provided that the following conditions are met:
 
7
 *
 
8
 *  1. Redistributions of source code must retain the above copyright notice,
 
9
 *     this list of conditions and the following disclaimer.
 
10
 *  2. Redistributions in binary form must reproduce the above copyright notice,
 
11
 *     this list of conditions and the following disclaimer in the documentation
 
12
 *     and/or other materials provided with the distribution.
 
13
 *  3. The name of the author may not be used to endorse or promote products
 
14
 *     derived from this software without specific prior written permission.
 
15
 *
 
16
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
 
17
 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 
18
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
 
19
 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 
20
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 
21
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
 
22
 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
 
23
 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
 
24
 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
 
25
 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
26
 */
 
27
 
 
28
#if defined(POSIX)
 
29
#include <sys/file.h>
 
30
#endif  // POSIX
 
31
#include <sys/types.h>
 
32
#include <sys/stat.h>
 
33
#include <errno.h>
 
34
#include <string>
 
35
#include "talk/base/basictypes.h"
 
36
#include "talk/base/common.h"
 
37
#include "talk/base/messagequeue.h"
 
38
#include "talk/base/stream.h"
 
39
#include "talk/base/stringencode.h"
 
40
#include "talk/base/stringutils.h"
 
41
#include "talk/base/thread.h"
 
42
 
 
43
#ifdef WIN32
 
44
#include "talk/base/win32.h"
 
45
#define fileno _fileno
 
46
#endif
 
47
 
 
48
namespace talk_base {
 
49
 
 
50
///////////////////////////////////////////////////////////////////////////////
 
51
// StreamInterface
 
52
///////////////////////////////////////////////////////////////////////////////
 
53
StreamInterface::~StreamInterface() {
 
54
}
 
55
 
 
56
struct PostEventData : public MessageData {
 
57
  int events, error;
 
58
  PostEventData(int ev, int er) : events(ev), error(er) { }
 
59
};
 
60
 
 
61
StreamResult StreamInterface::WriteAll(const void* data, size_t data_len,
 
62
                                       size_t* written, int* error) {
 
63
  StreamResult result = SR_SUCCESS;
 
64
  size_t total_written = 0, current_written;
 
65
  while (total_written < data_len) {
 
66
    result = Write(static_cast<const char*>(data) + total_written,
 
67
                   data_len - total_written, &current_written, error);
 
68
    if (result != SR_SUCCESS)
 
69
      break;
 
70
    total_written += current_written;
 
71
  }
 
72
  if (written)
 
73
    *written = total_written;
 
74
  return result;
 
75
}
 
76
 
 
77
StreamResult StreamInterface::ReadAll(void* buffer, size_t buffer_len,
 
78
                                      size_t* read, int* error) {
 
79
  StreamResult result = SR_SUCCESS;
 
80
  size_t total_read = 0, current_read;
 
81
  while (total_read < buffer_len) {
 
82
    result = Read(static_cast<char*>(buffer) + total_read,
 
83
                  buffer_len - total_read, &current_read, error);
 
84
    if (result != SR_SUCCESS)
 
85
      break;
 
86
    total_read += current_read;
 
87
  }
 
88
  if (read)
 
89
    *read = total_read;
 
90
  return result;
 
91
}
 
92
 
 
93
StreamResult StreamInterface::ReadLine(std::string* line) {
 
94
  line->clear();
 
95
  StreamResult result = SR_SUCCESS;
 
96
  while (true) {
 
97
    char ch;
 
98
    result = Read(&ch, sizeof(ch), NULL, NULL);
 
99
    if (result != SR_SUCCESS) {
 
100
      break;
 
101
    }
 
102
    if (ch == '\n') {
 
103
      break;
 
104
    }
 
105
    line->push_back(ch);
 
106
  }
 
107
  if (!line->empty()) {   // give back the line we've collected so far with
 
108
    result = SR_SUCCESS;  // a success code.  Otherwise return the last code
 
109
  }
 
110
  return result;
 
111
}
 
112
 
 
113
void StreamInterface::PostEvent(Thread* t, int events, int err) {
 
114
  t->Post(this, MSG_POST_EVENT, new PostEventData(events, err));
 
115
}
 
116
 
 
117
void StreamInterface::PostEvent(int events, int err) {
 
118
  PostEvent(Thread::Current(), events, err);
 
119
}
 
120
 
 
121
StreamInterface::StreamInterface() {
 
122
}
 
123
 
 
124
void StreamInterface::OnMessage(Message* msg) {
 
125
  if (MSG_POST_EVENT == msg->message_id) {
 
126
    PostEventData* pe = static_cast<PostEventData*>(msg->pdata);
 
127
    SignalEvent(this, pe->events, pe->error);
 
128
    delete msg->pdata;
 
129
  }
 
130
}
 
131
 
 
132
///////////////////////////////////////////////////////////////////////////////
 
133
// StreamAdapterInterface
 
134
///////////////////////////////////////////////////////////////////////////////
 
135
 
 
136
StreamAdapterInterface::StreamAdapterInterface(StreamInterface* stream,
 
137
                                               bool owned)
 
138
    : stream_(stream), owned_(owned) {
 
139
  if (NULL != stream_)
 
140
    stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
 
141
}
 
142
 
 
143
void StreamAdapterInterface::Attach(StreamInterface* stream, bool owned) {
 
144
  if (NULL != stream_)
 
145
    stream_->SignalEvent.disconnect(this);
 
146
  if (owned_)
 
147
    delete stream_;
 
148
  stream_ = stream;
 
149
  owned_ = owned;
 
150
  if (NULL != stream_)
 
151
    stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
 
152
}
 
153
 
 
154
StreamInterface* StreamAdapterInterface::Detach() {
 
155
  if (NULL != stream_)
 
156
    stream_->SignalEvent.disconnect(this);
 
157
  StreamInterface* stream = stream_;
 
158
  stream_ = NULL;
 
159
  return stream;
 
160
}
 
161
 
 
162
StreamAdapterInterface::~StreamAdapterInterface() {
 
163
  if (owned_)
 
164
    delete stream_;
 
165
}
 
166
 
 
167
///////////////////////////////////////////////////////////////////////////////
 
168
// StreamTap
 
169
///////////////////////////////////////////////////////////////////////////////
 
170
 
 
171
StreamTap::StreamTap(StreamInterface* stream, StreamInterface* tap)
 
172
    : StreamAdapterInterface(stream), tap_(NULL), tap_result_(SR_SUCCESS),
 
173
        tap_error_(0) {
 
174
  AttachTap(tap);
 
175
}
 
176
 
 
177
void StreamTap::AttachTap(StreamInterface* tap) {
 
178
  tap_.reset(tap);
 
179
}
 
180
 
 
181
StreamInterface* StreamTap::DetachTap() {
 
182
  return tap_.release();
 
183
}
 
184
 
 
185
StreamResult StreamTap::GetTapResult(int* error) {
 
186
  if (error) {
 
187
    *error = tap_error_;
 
188
  }
 
189
  return tap_result_;
 
190
}
 
191
 
 
192
StreamResult StreamTap::Read(void* buffer, size_t buffer_len,
 
193
                             size_t* read, int* error) {
 
194
  size_t backup_read;
 
195
  if (!read) {
 
196
    read = &backup_read;
 
197
  }
 
198
  StreamResult res = StreamAdapterInterface::Read(buffer, buffer_len,
 
199
                                                  read, error);
 
200
  if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
 
201
    tap_result_ = tap_->WriteAll(buffer, *read, NULL, &tap_error_);
 
202
  }
 
203
  return res;
 
204
}
 
205
 
 
206
StreamResult StreamTap::Write(const void* data, size_t data_len,
 
207
                              size_t* written, int* error) {
 
208
  size_t backup_written;
 
209
  if (!written) {
 
210
    written = &backup_written;
 
211
  }
 
212
  StreamResult res = StreamAdapterInterface::Write(data, data_len,
 
213
                                                   written, error);
 
214
  if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
 
215
    tap_result_ = tap_->WriteAll(data, *written, NULL, &tap_error_);
 
216
  }
 
217
  return res;
 
218
}
 
219
 
 
220
///////////////////////////////////////////////////////////////////////////////
 
221
// StreamSegment
 
222
///////////////////////////////////////////////////////////////////////////////
 
223
 
 
224
StreamSegment::StreamSegment(StreamInterface* stream)
 
225
    : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
 
226
    length_(SIZE_UNKNOWN) {
 
227
  // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
 
228
  stream->GetPosition(&start_);
 
229
}
 
230
 
 
231
StreamSegment::StreamSegment(StreamInterface* stream, size_t length)
 
232
    : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
 
233
    length_(length) {
 
234
  // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
 
235
  stream->GetPosition(&start_);
 
236
}
 
237
 
 
238
StreamResult StreamSegment::Read(void* buffer, size_t buffer_len,
 
239
                                 size_t* read, int* error) {
 
240
  if (SIZE_UNKNOWN != length_) {
 
241
    if (pos_ >= length_)
 
242
      return SR_EOS;
 
243
    buffer_len = _min(buffer_len, length_ - pos_);
 
244
  }
 
245
  size_t backup_read;
 
246
  if (!read) {
 
247
    read = &backup_read;
 
248
  }
 
249
  StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len,
 
250
                                                     read, error);
 
251
  if (SR_SUCCESS == result) {
 
252
    pos_ += *read;
 
253
  }
 
254
  return result;
 
255
}
 
256
 
 
257
bool StreamSegment::SetPosition(size_t position) {
 
258
  if (SIZE_UNKNOWN == start_)
 
259
    return false;  // Not seekable
 
260
  if ((SIZE_UNKNOWN != length_) && (position > length_))
 
261
    return false;  // Seek past end of segment
 
262
  if (!StreamAdapterInterface::SetPosition(start_ + position))
 
263
    return false;
 
264
  pos_ = position;
 
265
  return true;
 
266
}
 
267
 
 
268
bool StreamSegment::GetPosition(size_t* position) const {
 
269
  if (SIZE_UNKNOWN == start_)
 
270
    return false;  // Not seekable
 
271
  if (!StreamAdapterInterface::GetPosition(position))
 
272
    return false;
 
273
  if (position) {
 
274
    ASSERT(*position >= start_);
 
275
    *position -= start_;
 
276
  }
 
277
  return true;
 
278
}
 
279
 
 
280
bool StreamSegment::GetSize(size_t* size) const {
 
281
  if (!StreamAdapterInterface::GetSize(size))
 
282
    return false;
 
283
  if (size) {
 
284
    if (SIZE_UNKNOWN != start_) {
 
285
      ASSERT(*size >= start_);
 
286
      *size -= start_;
 
287
    }
 
288
    if (SIZE_UNKNOWN != length_) {
 
289
      *size = _min(*size, length_);
 
290
    }
 
291
  }
 
292
  return true;
 
293
}
 
294
 
 
295
bool StreamSegment::GetAvailable(size_t* size) const {
 
296
  if (!StreamAdapterInterface::GetAvailable(size))
 
297
    return false;
 
298
  if (size && (SIZE_UNKNOWN != length_))
 
299
    *size = _min(*size, length_ - pos_);
 
300
  return true;
 
301
}
 
302
 
 
303
///////////////////////////////////////////////////////////////////////////////
 
304
// NullStream
 
305
///////////////////////////////////////////////////////////////////////////////
 
306
 
 
307
NullStream::NullStream() {
 
308
}
 
309
 
 
310
NullStream::~NullStream() {
 
311
}
 
312
 
 
313
StreamState NullStream::GetState() const {
 
314
  return SS_OPEN;
 
315
}
 
316
 
 
317
StreamResult NullStream::Read(void* buffer, size_t buffer_len,
 
318
                              size_t* read, int* error) {
 
319
  if (error) *error = -1;
 
320
  return SR_ERROR;
 
321
}
 
322
 
 
323
StreamResult NullStream::Write(const void* data, size_t data_len,
 
324
                               size_t* written, int* error) {
 
325
  if (written) *written = data_len;
 
326
  return SR_SUCCESS;
 
327
}
 
328
 
 
329
void NullStream::Close() {
 
330
}
 
331
 
 
332
///////////////////////////////////////////////////////////////////////////////
 
333
// FileStream
 
334
///////////////////////////////////////////////////////////////////////////////
 
335
 
 
336
FileStream::FileStream() : file_(NULL) {
 
337
}
 
338
 
 
339
FileStream::~FileStream() {
 
340
  FileStream::Close();
 
341
}
 
342
 
 
343
bool FileStream::Open(const std::string& filename, const char* mode,
 
344
                      int* error) {
 
345
  Close();
 
346
#ifdef WIN32
 
347
  std::wstring wfilename;
 
348
  if (Utf8ToWindowsFilename(filename, &wfilename)) {
 
349
    file_ = _wfopen(wfilename.c_str(), ToUtf16(mode).c_str());
 
350
  } else {
 
351
    if (error) {
 
352
      *error = -1;
 
353
      return false;
 
354
    }
 
355
  }
 
356
#else
 
357
  file_ = fopen(filename.c_str(), mode);
 
358
#endif
 
359
  if (!file_ && error) {
 
360
    *error = errno;
 
361
  }
 
362
  return (file_ != NULL);
 
363
}
 
364
 
 
365
bool FileStream::OpenShare(const std::string& filename, const char* mode,
 
366
                           int shflag, int* error) {
 
367
  Close();
 
368
#ifdef WIN32
 
369
  std::wstring wfilename;
 
370
  if (Utf8ToWindowsFilename(filename, &wfilename)) {
 
371
    file_ = _wfsopen(wfilename.c_str(), ToUtf16(mode).c_str(), shflag);
 
372
    if (!file_ && error) {
 
373
      *error = errno;
 
374
      return false;
 
375
    }
 
376
    return file_ != NULL;
 
377
  } else {
 
378
    if (error) {
 
379
      *error = -1;
 
380
    }
 
381
    return false;
 
382
  }
 
383
#else
 
384
  return Open(filename, mode, error);
 
385
#endif
 
386
}
 
387
 
 
388
bool FileStream::DisableBuffering() {
 
389
  if (!file_)
 
390
    return false;
 
391
  return (setvbuf(file_, NULL, _IONBF, 0) == 0);
 
392
}
 
393
 
 
394
StreamState FileStream::GetState() const {
 
395
  return (file_ == NULL) ? SS_CLOSED : SS_OPEN;
 
396
}
 
397
 
 
398
StreamResult FileStream::Read(void* buffer, size_t buffer_len,
 
399
                              size_t* read, int* error) {
 
400
  if (!file_)
 
401
    return SR_EOS;
 
402
  size_t result = fread(buffer, 1, buffer_len, file_);
 
403
  if ((result == 0) && (buffer_len > 0)) {
 
404
    if (feof(file_))
 
405
      return SR_EOS;
 
406
    if (error)
 
407
      *error = errno;
 
408
    return SR_ERROR;
 
409
  }
 
410
  if (read)
 
411
    *read = result;
 
412
  return SR_SUCCESS;
 
413
}
 
414
 
 
415
StreamResult FileStream::Write(const void* data, size_t data_len,
 
416
                               size_t* written, int* error) {
 
417
  if (!file_)
 
418
    return SR_EOS;
 
419
  size_t result = fwrite(data, 1, data_len, file_);
 
420
  if ((result == 0) && (data_len > 0)) {
 
421
    if (error)
 
422
      *error = errno;
 
423
    return SR_ERROR;
 
424
  }
 
425
  if (written)
 
426
    *written = result;
 
427
  return SR_SUCCESS;
 
428
}
 
429
 
 
430
void FileStream::Close() {
 
431
  if (file_) {
 
432
    DoClose();
 
433
    file_ = NULL;
 
434
  }
 
435
}
 
436
 
 
437
bool FileStream::SetPosition(size_t position) {
 
438
  if (!file_)
 
439
    return false;
 
440
  return (fseek(file_, static_cast<int>(position), SEEK_SET) == 0);
 
441
}
 
442
 
 
443
bool FileStream::GetPosition(size_t* position) const {
 
444
  ASSERT(NULL != position);
 
445
  if (!file_)
 
446
    return false;
 
447
  long result = ftell(file_);
 
448
  if (result < 0)
 
449
    return false;
 
450
  if (position)
 
451
    *position = result;
 
452
  return true;
 
453
}
 
454
 
 
455
bool FileStream::GetSize(size_t* size) const {
 
456
  ASSERT(NULL != size);
 
457
  if (!file_)
 
458
    return false;
 
459
  struct stat file_stats;
 
460
  if (fstat(fileno(file_), &file_stats) != 0)
 
461
    return false;
 
462
  if (size)
 
463
    *size = file_stats.st_size;
 
464
  return true;
 
465
}
 
466
 
 
467
bool FileStream::GetAvailable(size_t* size) const {
 
468
  ASSERT(NULL != size);
 
469
  if (!GetSize(size))
 
470
    return false;
 
471
  long result = ftell(file_);
 
472
  if (result < 0)
 
473
    return false;
 
474
  if (size)
 
475
    *size -= result;
 
476
  return true;
 
477
}
 
478
 
 
479
bool FileStream::ReserveSize(size_t size) {
 
480
  // TODO: extend the file to the proper length
 
481
  return true;
 
482
}
 
483
 
 
484
bool FileStream::GetSize(const std::string& filename, size_t* size) {
 
485
  struct stat file_stats;
 
486
  if (stat(filename.c_str(), &file_stats) != 0)
 
487
    return false;
 
488
  *size = file_stats.st_size;
 
489
  return true;
 
490
}
 
491
 
 
492
bool FileStream::Flush() {
 
493
  if (file_) {
 
494
    return (0 == fflush(file_));
 
495
  }
 
496
  // try to flush empty file?
 
497
  ASSERT(false);
 
498
  return false;
 
499
}
 
500
 
 
501
#if defined(POSIX)
 
502
 
 
503
bool FileStream::TryLock() {
 
504
  if (file_ == NULL) {
 
505
    // Stream not open.
 
506
    ASSERT(false);
 
507
    return false;
 
508
  }
 
509
 
 
510
  return flock(fileno(file_), LOCK_EX|LOCK_NB) == 0;
 
511
}
 
512
 
 
513
bool FileStream::Unlock() {
 
514
  if (file_ == NULL) {
 
515
    // Stream not open.
 
516
    ASSERT(false);
 
517
    return false;
 
518
  }
 
519
 
 
520
  return flock(fileno(file_), LOCK_UN) == 0;
 
521
}
 
522
 
 
523
#endif
 
524
 
 
525
void FileStream::DoClose() {
 
526
  fclose(file_);
 
527
}
 
528
 
 
529
#ifdef POSIX
 
530
 
 
531
// Have to identically rewrite the FileStream destructor or else it would call
 
532
// the base class's Close() instead of the sub-class's.
 
533
POpenStream::~POpenStream() {
 
534
  POpenStream::Close();
 
535
}
 
536
 
 
537
bool POpenStream::Open(const std::string& subcommand,
 
538
                       const char* mode,
 
539
                       int *error) {
 
540
  Close();
 
541
  file_ = popen(subcommand.c_str(), mode);
 
542
  if (file_ == NULL) {
 
543
    if (error)
 
544
      *error = errno;
 
545
    return false;
 
546
  }
 
547
  return true;
 
548
}
 
549
 
 
550
bool POpenStream::OpenShare(const std::string& subcommand, const char* mode,
 
551
                            int shflag, int* error) {
 
552
  return Open(subcommand, mode, error);
 
553
}
 
554
 
 
555
void POpenStream::DoClose() {
 
556
  wait_status_ = pclose(file_);
 
557
}
 
558
 
 
559
#endif
 
560
 
 
561
///////////////////////////////////////////////////////////////////////////////
 
562
// MemoryStream
 
563
///////////////////////////////////////////////////////////////////////////////
 
564
 
 
565
MemoryStreamBase::MemoryStreamBase()
 
566
  : buffer_(NULL), buffer_length_(0), data_length_(0),
 
567
    seek_position_(0) {
 
568
}
 
569
 
 
570
StreamState MemoryStreamBase::GetState() const {
 
571
  return SS_OPEN;
 
572
}
 
573
 
 
574
StreamResult MemoryStreamBase::Read(void* buffer, size_t bytes,
 
575
                                    size_t* bytes_read, int* error) {
 
576
  if (seek_position_ >= data_length_) {
 
577
    return SR_EOS;
 
578
  }
 
579
  size_t available = data_length_ - seek_position_;
 
580
  if (bytes > available) {
 
581
    // Read partial buffer
 
582
    bytes = available;
 
583
  }
 
584
  memcpy(buffer, &buffer_[seek_position_], bytes);
 
585
  seek_position_ += bytes;
 
586
  if (bytes_read) {
 
587
    *bytes_read = bytes;
 
588
  }
 
589
  return SR_SUCCESS;
 
590
}
 
591
 
 
592
StreamResult MemoryStreamBase::Write(const void* buffer, size_t bytes,
 
593
                                     size_t* bytes_written, int* error) {
 
594
  size_t available = buffer_length_ - seek_position_;
 
595
  if (0 == available) {
 
596
    // Increase buffer size to the larger of:
 
597
    // a) new position rounded up to next 256 bytes
 
598
    // b) double the previous length
 
599
    size_t new_buffer_length = _max(((seek_position_ + bytes) | 0xFF) + 1,
 
600
                                    buffer_length_ * 2);
 
601
    StreamResult result = DoReserve(new_buffer_length, error);
 
602
    if (SR_SUCCESS != result) {
 
603
      return result;
 
604
    }
 
605
    ASSERT(buffer_length_ >= new_buffer_length);
 
606
    available = buffer_length_ - seek_position_;
 
607
  }
 
608
 
 
609
  if (bytes > available) {
 
610
    bytes = available;
 
611
  }
 
612
  memcpy(&buffer_[seek_position_], buffer, bytes);
 
613
  seek_position_ += bytes;
 
614
  if (data_length_ < seek_position_) {
 
615
    data_length_ = seek_position_;
 
616
  }
 
617
  if (bytes_written) {
 
618
    *bytes_written = bytes;
 
619
  }
 
620
  return SR_SUCCESS;
 
621
}
 
622
 
 
623
void MemoryStreamBase::Close() {
 
624
  // nothing to do
 
625
}
 
626
 
 
627
bool MemoryStreamBase::SetPosition(size_t position) {
 
628
  if (position > data_length_)
 
629
    return false;
 
630
  seek_position_ = position;
 
631
  return true;
 
632
}
 
633
 
 
634
bool MemoryStreamBase::GetPosition(size_t *position) const {
 
635
  if (position)
 
636
    *position = seek_position_;
 
637
  return true;
 
638
}
 
639
 
 
640
bool MemoryStreamBase::GetSize(size_t *size) const {
 
641
  if (size)
 
642
    *size = data_length_;
 
643
  return true;
 
644
}
 
645
 
 
646
bool MemoryStreamBase::GetAvailable(size_t *size) const {
 
647
  if (size)
 
648
    *size = data_length_ - seek_position_;
 
649
  return true;
 
650
}
 
651
 
 
652
bool MemoryStreamBase::ReserveSize(size_t size) {
 
653
  return (SR_SUCCESS == DoReserve(size, NULL));
 
654
}
 
655
 
 
656
StreamResult MemoryStreamBase::DoReserve(size_t size, int* error) {
 
657
  return (buffer_length_ >= size) ? SR_SUCCESS : SR_EOS;
 
658
}
 
659
 
 
660
///////////////////////////////////////////////////////////////////////////////
 
661
 
 
662
MemoryStream::MemoryStream()
 
663
  : buffer_alloc_(NULL) {
 
664
}
 
665
 
 
666
MemoryStream::MemoryStream(const char* data)
 
667
  : buffer_alloc_(NULL) {
 
668
  SetData(data, strlen(data));
 
669
}
 
670
 
 
671
MemoryStream::MemoryStream(const void* data, size_t length)
 
672
  : buffer_alloc_(NULL) {
 
673
  SetData(data, length);
 
674
}
 
675
 
 
676
MemoryStream::~MemoryStream() {
 
677
  delete [] buffer_alloc_;
 
678
}
 
679
 
 
680
void MemoryStream::SetData(const void* data, size_t length) {
 
681
  data_length_ = buffer_length_ = length;
 
682
  delete [] buffer_alloc_;
 
683
  buffer_alloc_ = new char[buffer_length_ + kAlignment];
 
684
  buffer_ = reinterpret_cast<char*>(ALIGNP(buffer_alloc_, kAlignment));
 
685
  memcpy(buffer_, data, data_length_);
 
686
  seek_position_ = 0;
 
687
}
 
688
 
 
689
StreamResult MemoryStream::DoReserve(size_t size, int* error) {
 
690
  if (buffer_length_ >= size)
 
691
    return SR_SUCCESS;
 
692
 
 
693
  if (char* new_buffer_alloc = new char[size + kAlignment]) {
 
694
    char* new_buffer = reinterpret_cast<char*>(
 
695
        ALIGNP(new_buffer_alloc, kAlignment));
 
696
    memcpy(new_buffer, buffer_, data_length_);
 
697
    delete [] buffer_alloc_;
 
698
    buffer_alloc_ = new_buffer_alloc;
 
699
    buffer_ = new_buffer;
 
700
    buffer_length_ = size;
 
701
    return SR_SUCCESS;
 
702
  }
 
703
 
 
704
  if (error) {
 
705
    *error = ENOMEM;
 
706
  }
 
707
  return SR_ERROR;
 
708
}
 
709
 
 
710
///////////////////////////////////////////////////////////////////////////////
 
711
 
 
712
ExternalMemoryStream::ExternalMemoryStream() {
 
713
}
 
714
 
 
715
ExternalMemoryStream::ExternalMemoryStream(void* data, size_t length) {
 
716
  SetData(data, length);
 
717
}
 
718
 
 
719
ExternalMemoryStream::~ExternalMemoryStream() {
 
720
}
 
721
 
 
722
void ExternalMemoryStream::SetData(void* data, size_t length) {
 
723
  data_length_ = buffer_length_ = length;
 
724
  buffer_ = static_cast<char*>(data);
 
725
  seek_position_ = 0;
 
726
}
 
727
 
 
728
///////////////////////////////////////////////////////////////////////////////
 
729
// FifoBuffer
 
730
///////////////////////////////////////////////////////////////////////////////
 
731
 
 
732
FifoBuffer::FifoBuffer(size_t size)
 
733
    : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
 
734
      data_length_(0), read_position_(0), owner_(Thread::Current()) {
 
735
  // all events are done on the owner_ thread
 
736
}
 
737
 
 
738
FifoBuffer::FifoBuffer(size_t size, Thread *owner)
 
739
    : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
 
740
      data_length_(0), read_position_(0), owner_(owner) {
 
741
  // all events are done on the owner_ thread
 
742
}
 
743
 
 
744
FifoBuffer::~FifoBuffer() {
 
745
}
 
746
 
 
747
bool FifoBuffer::GetBuffered(size_t* size) const {
 
748
  CritScope cs(&crit_);
 
749
  *size = data_length_;
 
750
  return true;
 
751
}
 
752
 
 
753
bool FifoBuffer::SetCapacity(size_t size) {
 
754
  CritScope cs(&crit_);
 
755
  if (data_length_ > size) {
 
756
    return false;
 
757
  }
 
758
 
 
759
  if (size != buffer_length_) {
 
760
    char* buffer = new char[size];
 
761
    const size_t copy = data_length_;
 
762
    const size_t tail_copy = _min(copy, buffer_length_ - read_position_);
 
763
    memcpy(buffer, &buffer_[read_position_], tail_copy);
 
764
    memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy);
 
765
    buffer_.reset(buffer);
 
766
    read_position_ = 0;
 
767
    buffer_length_ = size;
 
768
  }
 
769
  return true;
 
770
}
 
771
 
 
772
StreamResult FifoBuffer::ReadOffset(void* buffer, size_t bytes,
 
773
                                    size_t offset, size_t* bytes_read) {
 
774
  CritScope cs(&crit_);
 
775
  return ReadOffsetLocked(buffer, bytes, offset, bytes_read);
 
776
}
 
777
 
 
778
StreamResult FifoBuffer::WriteOffset(const void* buffer, size_t bytes,
 
779
                                     size_t offset, size_t* bytes_written) {
 
780
  CritScope cs(&crit_);
 
781
  return WriteOffsetLocked(buffer, bytes, offset, bytes_written);
 
782
}
 
783
 
 
784
StreamState FifoBuffer::GetState() const {
 
785
  return state_;
 
786
}
 
787
 
 
788
StreamResult FifoBuffer::Read(void* buffer, size_t bytes,
 
789
                              size_t* bytes_read, int* error) {
 
790
  CritScope cs(&crit_);
 
791
  const bool was_writable = data_length_ < buffer_length_;
 
792
  size_t copy = 0;
 
793
  StreamResult result = ReadOffsetLocked(buffer, bytes, 0, &copy);
 
794
 
 
795
  if (result == SR_SUCCESS) {
 
796
    // If read was successful then adjust the read position and number of
 
797
    // bytes buffered.
 
798
    read_position_ = (read_position_ + copy) % buffer_length_;
 
799
    data_length_ -= copy;
 
800
    if (bytes_read) {
 
801
      *bytes_read = copy;
 
802
    }
 
803
 
 
804
    // if we were full before, and now we're not, post an event
 
805
    if (!was_writable && copy > 0) {
 
806
      PostEvent(owner_, SE_WRITE, 0);
 
807
    }
 
808
  }
 
809
  return result;
 
810
}
 
811
 
 
812
StreamResult FifoBuffer::Write(const void* buffer, size_t bytes,
 
813
                               size_t* bytes_written, int* error) {
 
814
  CritScope cs(&crit_);
 
815
 
 
816
  const bool was_readable = (data_length_ > 0);
 
817
  size_t copy = 0;
 
818
  StreamResult result = WriteOffsetLocked(buffer, bytes, 0, &copy);
 
819
 
 
820
  if (result == SR_SUCCESS) {
 
821
    // If write was successful then adjust the number of readable bytes.
 
822
    data_length_ += copy;
 
823
    if (bytes_written) {
 
824
      *bytes_written = copy;
 
825
    }
 
826
 
 
827
    // if we didn't have any data to read before, and now we do, post an event
 
828
    if (!was_readable && copy > 0) {
 
829
      PostEvent(owner_, SE_READ, 0);
 
830
    }
 
831
  }
 
832
  return result;
 
833
}
 
834
 
 
835
void FifoBuffer::Close() {
 
836
  CritScope cs(&crit_);
 
837
  state_ = SS_CLOSED;
 
838
}
 
839
 
 
840
const void* FifoBuffer::GetReadData(size_t* size) {
 
841
  CritScope cs(&crit_);
 
842
  *size = (read_position_ + data_length_ <= buffer_length_) ?
 
843
      data_length_ : buffer_length_ - read_position_;
 
844
  return &buffer_[read_position_];
 
845
}
 
846
 
 
847
void FifoBuffer::ConsumeReadData(size_t size) {
 
848
  CritScope cs(&crit_);
 
849
  ASSERT(size <= data_length_);
 
850
  const bool was_writable = data_length_ < buffer_length_;
 
851
  read_position_ = (read_position_ + size) % buffer_length_;
 
852
  data_length_ -= size;
 
853
  if (!was_writable && size > 0) {
 
854
    PostEvent(owner_, SE_WRITE, 0);
 
855
  }
 
856
}
 
857
 
 
858
void* FifoBuffer::GetWriteBuffer(size_t* size) {
 
859
  CritScope cs(&crit_);
 
860
  if (state_ == SS_CLOSED) {
 
861
    return NULL;
 
862
  }
 
863
 
 
864
  // if empty, reset the write position to the beginning, so we can get
 
865
  // the biggest possible block
 
866
  if (data_length_ == 0) {
 
867
    read_position_ = 0;
 
868
  }
 
869
 
 
870
  const size_t write_position = (read_position_ + data_length_)
 
871
      % buffer_length_;
 
872
  *size = (write_position > read_position_ || data_length_ == 0) ?
 
873
      buffer_length_ - write_position : read_position_ - write_position;
 
874
  return &buffer_[write_position];
 
875
}
 
876
 
 
877
void FifoBuffer::ConsumeWriteBuffer(size_t size) {
 
878
  CritScope cs(&crit_);
 
879
  ASSERT(size <= buffer_length_ - data_length_);
 
880
  const bool was_readable = (data_length_ > 0);
 
881
  data_length_ += size;
 
882
  if (!was_readable && size > 0) {
 
883
    PostEvent(owner_, SE_READ, 0);
 
884
  }
 
885
}
 
886
 
 
887
bool FifoBuffer::GetWriteRemaining(size_t* size) const {
 
888
  CritScope cs(&crit_);
 
889
  *size = buffer_length_ - data_length_;
 
890
  return true;
 
891
}
 
892
 
 
893
StreamResult FifoBuffer::ReadOffsetLocked(void* buffer,
 
894
                                          size_t bytes,
 
895
                                          size_t offset,
 
896
                                          size_t* bytes_read) {
 
897
  if (offset >= data_length_) {
 
898
    return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS;
 
899
  }
 
900
 
 
901
  const size_t available = data_length_ - offset;
 
902
  const size_t read_position = (read_position_ + offset) % buffer_length_;
 
903
  const size_t copy = _min(bytes, available);
 
904
  const size_t tail_copy = _min(copy, buffer_length_ - read_position);
 
905
  char* const p = static_cast<char*>(buffer);
 
906
  memcpy(p, &buffer_[read_position], tail_copy);
 
907
  memcpy(p + tail_copy, &buffer_[0], copy - tail_copy);
 
908
 
 
909
  if (bytes_read) {
 
910
    *bytes_read = copy;
 
911
  }
 
912
  return SR_SUCCESS;
 
913
}
 
914
 
 
915
StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer,
 
916
                                           size_t bytes,
 
917
                                           size_t offset,
 
918
                                           size_t* bytes_written) {
 
919
  if (state_ == SS_CLOSED) {
 
920
    return SR_EOS;
 
921
  }
 
922
 
 
923
  if (data_length_ + offset >= buffer_length_) {
 
924
    return SR_BLOCK;
 
925
  }
 
926
 
 
927
  const size_t available = buffer_length_ - data_length_ - offset;
 
928
  const size_t write_position = (read_position_ + data_length_ + offset)
 
929
      % buffer_length_;
 
930
  const size_t copy = _min(bytes, available);
 
931
  const size_t tail_copy = _min(copy, buffer_length_ - write_position);
 
932
  const char* const p = static_cast<const char*>(buffer);
 
933
  memcpy(&buffer_[write_position], p, tail_copy);
 
934
  memcpy(&buffer_[0], p + tail_copy, copy - tail_copy);
 
935
 
 
936
  if (bytes_written) {
 
937
    *bytes_written = copy;
 
938
  }
 
939
  return SR_SUCCESS;
 
940
}
 
941
 
 
942
 
 
943
 
 
944
///////////////////////////////////////////////////////////////////////////////
 
945
// LoggingAdapter
 
946
///////////////////////////////////////////////////////////////////////////////
 
947
 
 
948
LoggingAdapter::LoggingAdapter(StreamInterface* stream, LoggingSeverity level,
 
949
                               const std::string& label, bool hex_mode)
 
950
    : StreamAdapterInterface(stream), level_(level), hex_mode_(hex_mode) {
 
951
  set_label(label);
 
952
}
 
953
 
 
954
void LoggingAdapter::set_label(const std::string& label) {
 
955
  label_.assign("[");
 
956
  label_.append(label);
 
957
  label_.append("]");
 
958
}
 
959
 
 
960
StreamResult LoggingAdapter::Read(void* buffer, size_t buffer_len,
 
961
                                  size_t* read, int* error) {
 
962
  size_t local_read; if (!read) read = &local_read;
 
963
  StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, read,
 
964
                                                     error);
 
965
  if (result == SR_SUCCESS) {
 
966
    LogMultiline(level_, label_.c_str(), true, buffer, *read, hex_mode_, &lms_);
 
967
  }
 
968
  return result;
 
969
}
 
970
 
 
971
StreamResult LoggingAdapter::Write(const void* data, size_t data_len,
 
972
                                   size_t* written, int* error) {
 
973
  size_t local_written;
 
974
  if (!written) written = &local_written;
 
975
  StreamResult result = StreamAdapterInterface::Write(data, data_len, written,
 
976
                                                      error);
 
977
  if (result == SR_SUCCESS) {
 
978
    LogMultiline(level_, label_.c_str(), false, data, *written, hex_mode_,
 
979
                 &lms_);
 
980
  }
 
981
  return result;
 
982
}
 
983
 
 
984
void LoggingAdapter::Close() {
 
985
  LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
 
986
  LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
 
987
  LOG_V(level_) << label_ << " Closed locally";
 
988
  StreamAdapterInterface::Close();
 
989
}
 
990
 
 
991
void LoggingAdapter::OnEvent(StreamInterface* stream, int events, int err) {
 
992
  if (events & SE_OPEN) {
 
993
    LOG_V(level_) << label_ << " Open";
 
994
  } else if (events & SE_CLOSE) {
 
995
    LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
 
996
    LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
 
997
    LOG_V(level_) << label_ << " Closed with error: " << err;
 
998
  }
 
999
  StreamAdapterInterface::OnEvent(stream, events, err);
 
1000
}
 
1001
 
 
1002
///////////////////////////////////////////////////////////////////////////////
 
1003
// StringStream - Reads/Writes to an external std::string
 
1004
///////////////////////////////////////////////////////////////////////////////
 
1005
 
 
1006
StringStream::StringStream(std::string& str)
 
1007
    : str_(str), read_pos_(0), read_only_(false) {
 
1008
}
 
1009
 
 
1010
StringStream::StringStream(const std::string& str)
 
1011
    : str_(const_cast<std::string&>(str)), read_pos_(0), read_only_(true) {
 
1012
}
 
1013
 
 
1014
StreamState StringStream::GetState() const {
 
1015
  return SS_OPEN;
 
1016
}
 
1017
 
 
1018
StreamResult StringStream::Read(void* buffer, size_t buffer_len,
 
1019
                                      size_t* read, int* error) {
 
1020
  size_t available = _min(buffer_len, str_.size() - read_pos_);
 
1021
  if (!available)
 
1022
    return SR_EOS;
 
1023
  memcpy(buffer, str_.data() + read_pos_, available);
 
1024
  read_pos_ += available;
 
1025
  if (read)
 
1026
    *read = available;
 
1027
  return SR_SUCCESS;
 
1028
}
 
1029
 
 
1030
StreamResult StringStream::Write(const void* data, size_t data_len,
 
1031
                                      size_t* written, int* error) {
 
1032
  if (read_only_) {
 
1033
    if (error) {
 
1034
      *error = -1;
 
1035
    }
 
1036
    return SR_ERROR;
 
1037
  }
 
1038
  str_.append(static_cast<const char*>(data),
 
1039
              static_cast<const char*>(data) + data_len);
 
1040
  if (written)
 
1041
    *written = data_len;
 
1042
  return SR_SUCCESS;
 
1043
}
 
1044
 
 
1045
void StringStream::Close() {
 
1046
}
 
1047
 
 
1048
bool StringStream::SetPosition(size_t position) {
 
1049
  if (position > str_.size())
 
1050
    return false;
 
1051
  read_pos_ = position;
 
1052
  return true;
 
1053
}
 
1054
 
 
1055
bool StringStream::GetPosition(size_t* position) const {
 
1056
  if (position)
 
1057
    *position = read_pos_;
 
1058
  return true;
 
1059
}
 
1060
 
 
1061
bool StringStream::GetSize(size_t* size) const {
 
1062
  if (size)
 
1063
    *size = str_.size();
 
1064
  return true;
 
1065
}
 
1066
 
 
1067
bool StringStream::GetAvailable(size_t* size) const {
 
1068
  if (size)
 
1069
    *size = str_.size() - read_pos_;
 
1070
  return true;
 
1071
}
 
1072
 
 
1073
bool StringStream::ReserveSize(size_t size) {
 
1074
  if (read_only_)
 
1075
    return false;
 
1076
  str_.reserve(size);
 
1077
  return true;
 
1078
}
 
1079
 
 
1080
///////////////////////////////////////////////////////////////////////////////
 
1081
// StreamReference
 
1082
///////////////////////////////////////////////////////////////////////////////
 
1083
 
 
1084
StreamReference::StreamReference(StreamInterface* stream)
 
1085
    : StreamAdapterInterface(stream, false) {
 
1086
  // owner set to false so the destructor does not free the stream.
 
1087
  stream_ref_count_ = new StreamRefCount(stream);
 
1088
}
 
1089
 
 
1090
StreamInterface* StreamReference::NewReference() {
 
1091
  stream_ref_count_->AddReference();
 
1092
  return new StreamReference(stream_ref_count_, stream());
 
1093
}
 
1094
 
 
1095
StreamReference::~StreamReference() {
 
1096
  stream_ref_count_->Release();
 
1097
}
 
1098
 
 
1099
StreamReference::StreamReference(StreamRefCount* stream_ref_count,
 
1100
                                 StreamInterface* stream)
 
1101
    : StreamAdapterInterface(stream, false),
 
1102
      stream_ref_count_(stream_ref_count) {
 
1103
}
 
1104
 
 
1105
///////////////////////////////////////////////////////////////////////////////
 
1106
 
 
1107
StreamResult Flow(StreamInterface* source,
 
1108
                  char* buffer, size_t buffer_len,
 
1109
                  StreamInterface* sink,
 
1110
                  size_t* data_len /* = NULL */) {
 
1111
  ASSERT(buffer_len > 0);
 
1112
 
 
1113
  StreamResult result;
 
1114
  size_t count, read_pos, write_pos;
 
1115
  if (data_len) {
 
1116
    read_pos = *data_len;
 
1117
  } else {
 
1118
    read_pos = 0;
 
1119
  }
 
1120
 
 
1121
  bool end_of_stream = false;
 
1122
  do {
 
1123
    // Read until buffer is full, end of stream, or error
 
1124
    while (!end_of_stream && (read_pos < buffer_len)) {
 
1125
      result = source->Read(buffer + read_pos, buffer_len - read_pos,
 
1126
                            &count, NULL);
 
1127
      if (result == SR_EOS) {
 
1128
        end_of_stream = true;
 
1129
      } else if (result != SR_SUCCESS) {
 
1130
        if (data_len) {
 
1131
          *data_len = read_pos;
 
1132
        }
 
1133
        return result;
 
1134
      } else {
 
1135
        read_pos += count;
 
1136
      }
 
1137
    }
 
1138
 
 
1139
    // Write until buffer is empty, or error (including end of stream)
 
1140
    write_pos = 0;
 
1141
    while (write_pos < read_pos) {
 
1142
      result = sink->Write(buffer + write_pos, read_pos - write_pos,
 
1143
                           &count, NULL);
 
1144
      if (result != SR_SUCCESS) {
 
1145
        if (data_len) {
 
1146
          *data_len = read_pos - write_pos;
 
1147
          if (write_pos > 0) {
 
1148
            memmove(buffer, buffer + write_pos, *data_len);
 
1149
          }
 
1150
        }
 
1151
        return result;
 
1152
      }
 
1153
      write_pos += count;
 
1154
    }
 
1155
 
 
1156
    read_pos = 0;
 
1157
  } while (!end_of_stream);
 
1158
 
 
1159
  if (data_len) {
 
1160
    *data_len = 0;
 
1161
  }
 
1162
  return SR_SUCCESS;
 
1163
}
 
1164
 
 
1165
///////////////////////////////////////////////////////////////////////////////
 
1166
 
 
1167
}  // namespace talk_base