3
* Copyright 2004 Google Inc.
5
* Redistribution and use in source and binary forms, with or without
6
* modification, are permitted provided that the following conditions are met:
8
* 1. Redistributions of source code must retain the above copyright notice,
9
* this list of conditions and the following disclaimer.
10
* 2. Redistributions in binary form must reproduce the above copyright notice,
11
* this list of conditions and the following disclaimer in the documentation
12
* and/or other materials provided with the distribution.
13
* 3. The name of the author may not be used to endorse or promote products
14
* derived from this software without specific prior written permission.
16
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18
* MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19
* EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22
* OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23
* WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24
* OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31
#include <sys/types.h>
35
#include "talk/base/basictypes.h"
36
#include "talk/base/common.h"
37
#include "talk/base/messagequeue.h"
38
#include "talk/base/stream.h"
39
#include "talk/base/stringencode.h"
40
#include "talk/base/stringutils.h"
41
#include "talk/base/thread.h"
44
#include "talk/base/win32.h"
45
#define fileno _fileno
50
///////////////////////////////////////////////////////////////////////////////
52
///////////////////////////////////////////////////////////////////////////////
53
StreamInterface::~StreamInterface() {
56
struct PostEventData : public MessageData {
58
PostEventData(int ev, int er) : events(ev), error(er) { }
61
StreamResult StreamInterface::WriteAll(const void* data, size_t data_len,
62
size_t* written, int* error) {
63
StreamResult result = SR_SUCCESS;
64
size_t total_written = 0, current_written;
65
while (total_written < data_len) {
66
result = Write(static_cast<const char*>(data) + total_written,
67
data_len - total_written, ¤t_written, error);
68
if (result != SR_SUCCESS)
70
total_written += current_written;
73
*written = total_written;
77
StreamResult StreamInterface::ReadAll(void* buffer, size_t buffer_len,
78
size_t* read, int* error) {
79
StreamResult result = SR_SUCCESS;
80
size_t total_read = 0, current_read;
81
while (total_read < buffer_len) {
82
result = Read(static_cast<char*>(buffer) + total_read,
83
buffer_len - total_read, ¤t_read, error);
84
if (result != SR_SUCCESS)
86
total_read += current_read;
93
StreamResult StreamInterface::ReadLine(std::string* line) {
95
StreamResult result = SR_SUCCESS;
98
result = Read(&ch, sizeof(ch), NULL, NULL);
99
if (result != SR_SUCCESS) {
107
if (!line->empty()) { // give back the line we've collected so far with
108
result = SR_SUCCESS; // a success code. Otherwise return the last code
113
void StreamInterface::PostEvent(Thread* t, int events, int err) {
114
t->Post(this, MSG_POST_EVENT, new PostEventData(events, err));
117
void StreamInterface::PostEvent(int events, int err) {
118
PostEvent(Thread::Current(), events, err);
121
StreamInterface::StreamInterface() {
124
void StreamInterface::OnMessage(Message* msg) {
125
if (MSG_POST_EVENT == msg->message_id) {
126
PostEventData* pe = static_cast<PostEventData*>(msg->pdata);
127
SignalEvent(this, pe->events, pe->error);
132
///////////////////////////////////////////////////////////////////////////////
133
// StreamAdapterInterface
134
///////////////////////////////////////////////////////////////////////////////
136
StreamAdapterInterface::StreamAdapterInterface(StreamInterface* stream,
138
: stream_(stream), owned_(owned) {
140
stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
143
void StreamAdapterInterface::Attach(StreamInterface* stream, bool owned) {
145
stream_->SignalEvent.disconnect(this);
151
stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
154
StreamInterface* StreamAdapterInterface::Detach() {
156
stream_->SignalEvent.disconnect(this);
157
StreamInterface* stream = stream_;
162
StreamAdapterInterface::~StreamAdapterInterface() {
167
///////////////////////////////////////////////////////////////////////////////
169
///////////////////////////////////////////////////////////////////////////////
171
StreamTap::StreamTap(StreamInterface* stream, StreamInterface* tap)
172
: StreamAdapterInterface(stream), tap_(NULL), tap_result_(SR_SUCCESS),
177
void StreamTap::AttachTap(StreamInterface* tap) {
181
StreamInterface* StreamTap::DetachTap() {
182
return tap_.release();
185
StreamResult StreamTap::GetTapResult(int* error) {
192
StreamResult StreamTap::Read(void* buffer, size_t buffer_len,
193
size_t* read, int* error) {
198
StreamResult res = StreamAdapterInterface::Read(buffer, buffer_len,
200
if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
201
tap_result_ = tap_->WriteAll(buffer, *read, NULL, &tap_error_);
206
StreamResult StreamTap::Write(const void* data, size_t data_len,
207
size_t* written, int* error) {
208
size_t backup_written;
210
written = &backup_written;
212
StreamResult res = StreamAdapterInterface::Write(data, data_len,
214
if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
215
tap_result_ = tap_->WriteAll(data, *written, NULL, &tap_error_);
220
///////////////////////////////////////////////////////////////////////////////
222
///////////////////////////////////////////////////////////////////////////////
224
StreamSegment::StreamSegment(StreamInterface* stream)
225
: StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
226
length_(SIZE_UNKNOWN) {
227
// It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
228
stream->GetPosition(&start_);
231
StreamSegment::StreamSegment(StreamInterface* stream, size_t length)
232
: StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
234
// It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
235
stream->GetPosition(&start_);
238
StreamResult StreamSegment::Read(void* buffer, size_t buffer_len,
239
size_t* read, int* error) {
240
if (SIZE_UNKNOWN != length_) {
243
buffer_len = _min(buffer_len, length_ - pos_);
249
StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len,
251
if (SR_SUCCESS == result) {
257
bool StreamSegment::SetPosition(size_t position) {
258
if (SIZE_UNKNOWN == start_)
259
return false; // Not seekable
260
if ((SIZE_UNKNOWN != length_) && (position > length_))
261
return false; // Seek past end of segment
262
if (!StreamAdapterInterface::SetPosition(start_ + position))
268
bool StreamSegment::GetPosition(size_t* position) const {
269
if (SIZE_UNKNOWN == start_)
270
return false; // Not seekable
271
if (!StreamAdapterInterface::GetPosition(position))
274
ASSERT(*position >= start_);
280
bool StreamSegment::GetSize(size_t* size) const {
281
if (!StreamAdapterInterface::GetSize(size))
284
if (SIZE_UNKNOWN != start_) {
285
ASSERT(*size >= start_);
288
if (SIZE_UNKNOWN != length_) {
289
*size = _min(*size, length_);
295
bool StreamSegment::GetAvailable(size_t* size) const {
296
if (!StreamAdapterInterface::GetAvailable(size))
298
if (size && (SIZE_UNKNOWN != length_))
299
*size = _min(*size, length_ - pos_);
303
///////////////////////////////////////////////////////////////////////////////
305
///////////////////////////////////////////////////////////////////////////////
307
NullStream::NullStream() {
310
NullStream::~NullStream() {
313
StreamState NullStream::GetState() const {
317
StreamResult NullStream::Read(void* buffer, size_t buffer_len,
318
size_t* read, int* error) {
319
if (error) *error = -1;
323
StreamResult NullStream::Write(const void* data, size_t data_len,
324
size_t* written, int* error) {
325
if (written) *written = data_len;
329
void NullStream::Close() {
332
///////////////////////////////////////////////////////////////////////////////
334
///////////////////////////////////////////////////////////////////////////////
336
FileStream::FileStream() : file_(NULL) {
339
FileStream::~FileStream() {
343
bool FileStream::Open(const std::string& filename, const char* mode,
347
std::wstring wfilename;
348
if (Utf8ToWindowsFilename(filename, &wfilename)) {
349
file_ = _wfopen(wfilename.c_str(), ToUtf16(mode).c_str());
357
file_ = fopen(filename.c_str(), mode);
359
if (!file_ && error) {
362
return (file_ != NULL);
365
bool FileStream::OpenShare(const std::string& filename, const char* mode,
366
int shflag, int* error) {
369
std::wstring wfilename;
370
if (Utf8ToWindowsFilename(filename, &wfilename)) {
371
file_ = _wfsopen(wfilename.c_str(), ToUtf16(mode).c_str(), shflag);
372
if (!file_ && error) {
376
return file_ != NULL;
384
return Open(filename, mode, error);
388
bool FileStream::DisableBuffering() {
391
return (setvbuf(file_, NULL, _IONBF, 0) == 0);
394
StreamState FileStream::GetState() const {
395
return (file_ == NULL) ? SS_CLOSED : SS_OPEN;
398
StreamResult FileStream::Read(void* buffer, size_t buffer_len,
399
size_t* read, int* error) {
402
size_t result = fread(buffer, 1, buffer_len, file_);
403
if ((result == 0) && (buffer_len > 0)) {
415
StreamResult FileStream::Write(const void* data, size_t data_len,
416
size_t* written, int* error) {
419
size_t result = fwrite(data, 1, data_len, file_);
420
if ((result == 0) && (data_len > 0)) {
430
void FileStream::Close() {
437
bool FileStream::SetPosition(size_t position) {
440
return (fseek(file_, static_cast<int>(position), SEEK_SET) == 0);
443
bool FileStream::GetPosition(size_t* position) const {
444
ASSERT(NULL != position);
447
long result = ftell(file_);
455
bool FileStream::GetSize(size_t* size) const {
456
ASSERT(NULL != size);
459
struct stat file_stats;
460
if (fstat(fileno(file_), &file_stats) != 0)
463
*size = file_stats.st_size;
467
bool FileStream::GetAvailable(size_t* size) const {
468
ASSERT(NULL != size);
471
long result = ftell(file_);
479
bool FileStream::ReserveSize(size_t size) {
480
// TODO: extend the file to the proper length
484
bool FileStream::GetSize(const std::string& filename, size_t* size) {
485
struct stat file_stats;
486
if (stat(filename.c_str(), &file_stats) != 0)
488
*size = file_stats.st_size;
492
bool FileStream::Flush() {
494
return (0 == fflush(file_));
496
// try to flush empty file?
503
bool FileStream::TryLock() {
510
return flock(fileno(file_), LOCK_EX|LOCK_NB) == 0;
513
bool FileStream::Unlock() {
520
return flock(fileno(file_), LOCK_UN) == 0;
525
void FileStream::DoClose() {
531
// Have to identically rewrite the FileStream destructor or else it would call
532
// the base class's Close() instead of the sub-class's.
533
POpenStream::~POpenStream() {
534
POpenStream::Close();
537
bool POpenStream::Open(const std::string& subcommand,
541
file_ = popen(subcommand.c_str(), mode);
550
bool POpenStream::OpenShare(const std::string& subcommand, const char* mode,
551
int shflag, int* error) {
552
return Open(subcommand, mode, error);
555
void POpenStream::DoClose() {
556
wait_status_ = pclose(file_);
561
///////////////////////////////////////////////////////////////////////////////
563
///////////////////////////////////////////////////////////////////////////////
565
MemoryStreamBase::MemoryStreamBase()
566
: buffer_(NULL), buffer_length_(0), data_length_(0),
570
StreamState MemoryStreamBase::GetState() const {
574
StreamResult MemoryStreamBase::Read(void* buffer, size_t bytes,
575
size_t* bytes_read, int* error) {
576
if (seek_position_ >= data_length_) {
579
size_t available = data_length_ - seek_position_;
580
if (bytes > available) {
581
// Read partial buffer
584
memcpy(buffer, &buffer_[seek_position_], bytes);
585
seek_position_ += bytes;
592
StreamResult MemoryStreamBase::Write(const void* buffer, size_t bytes,
593
size_t* bytes_written, int* error) {
594
size_t available = buffer_length_ - seek_position_;
595
if (0 == available) {
596
// Increase buffer size to the larger of:
597
// a) new position rounded up to next 256 bytes
598
// b) double the previous length
599
size_t new_buffer_length = _max(((seek_position_ + bytes) | 0xFF) + 1,
601
StreamResult result = DoReserve(new_buffer_length, error);
602
if (SR_SUCCESS != result) {
605
ASSERT(buffer_length_ >= new_buffer_length);
606
available = buffer_length_ - seek_position_;
609
if (bytes > available) {
612
memcpy(&buffer_[seek_position_], buffer, bytes);
613
seek_position_ += bytes;
614
if (data_length_ < seek_position_) {
615
data_length_ = seek_position_;
618
*bytes_written = bytes;
623
void MemoryStreamBase::Close() {
627
bool MemoryStreamBase::SetPosition(size_t position) {
628
if (position > data_length_)
630
seek_position_ = position;
634
bool MemoryStreamBase::GetPosition(size_t *position) const {
636
*position = seek_position_;
640
bool MemoryStreamBase::GetSize(size_t *size) const {
642
*size = data_length_;
646
bool MemoryStreamBase::GetAvailable(size_t *size) const {
648
*size = data_length_ - seek_position_;
652
bool MemoryStreamBase::ReserveSize(size_t size) {
653
return (SR_SUCCESS == DoReserve(size, NULL));
656
StreamResult MemoryStreamBase::DoReserve(size_t size, int* error) {
657
return (buffer_length_ >= size) ? SR_SUCCESS : SR_EOS;
660
///////////////////////////////////////////////////////////////////////////////
662
MemoryStream::MemoryStream()
663
: buffer_alloc_(NULL) {
666
MemoryStream::MemoryStream(const char* data)
667
: buffer_alloc_(NULL) {
668
SetData(data, strlen(data));
671
MemoryStream::MemoryStream(const void* data, size_t length)
672
: buffer_alloc_(NULL) {
673
SetData(data, length);
676
MemoryStream::~MemoryStream() {
677
delete [] buffer_alloc_;
680
void MemoryStream::SetData(const void* data, size_t length) {
681
data_length_ = buffer_length_ = length;
682
delete [] buffer_alloc_;
683
buffer_alloc_ = new char[buffer_length_ + kAlignment];
684
buffer_ = reinterpret_cast<char*>(ALIGNP(buffer_alloc_, kAlignment));
685
memcpy(buffer_, data, data_length_);
689
StreamResult MemoryStream::DoReserve(size_t size, int* error) {
690
if (buffer_length_ >= size)
693
if (char* new_buffer_alloc = new char[size + kAlignment]) {
694
char* new_buffer = reinterpret_cast<char*>(
695
ALIGNP(new_buffer_alloc, kAlignment));
696
memcpy(new_buffer, buffer_, data_length_);
697
delete [] buffer_alloc_;
698
buffer_alloc_ = new_buffer_alloc;
699
buffer_ = new_buffer;
700
buffer_length_ = size;
710
///////////////////////////////////////////////////////////////////////////////
712
ExternalMemoryStream::ExternalMemoryStream() {
715
ExternalMemoryStream::ExternalMemoryStream(void* data, size_t length) {
716
SetData(data, length);
719
ExternalMemoryStream::~ExternalMemoryStream() {
722
void ExternalMemoryStream::SetData(void* data, size_t length) {
723
data_length_ = buffer_length_ = length;
724
buffer_ = static_cast<char*>(data);
728
///////////////////////////////////////////////////////////////////////////////
730
///////////////////////////////////////////////////////////////////////////////
732
FifoBuffer::FifoBuffer(size_t size)
733
: state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
734
data_length_(0), read_position_(0), owner_(Thread::Current()) {
735
// all events are done on the owner_ thread
738
FifoBuffer::FifoBuffer(size_t size, Thread *owner)
739
: state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
740
data_length_(0), read_position_(0), owner_(owner) {
741
// all events are done on the owner_ thread
744
FifoBuffer::~FifoBuffer() {
747
bool FifoBuffer::GetBuffered(size_t* size) const {
748
CritScope cs(&crit_);
749
*size = data_length_;
753
bool FifoBuffer::SetCapacity(size_t size) {
754
CritScope cs(&crit_);
755
if (data_length_ > size) {
759
if (size != buffer_length_) {
760
char* buffer = new char[size];
761
const size_t copy = data_length_;
762
const size_t tail_copy = _min(copy, buffer_length_ - read_position_);
763
memcpy(buffer, &buffer_[read_position_], tail_copy);
764
memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy);
765
buffer_.reset(buffer);
767
buffer_length_ = size;
772
StreamResult FifoBuffer::ReadOffset(void* buffer, size_t bytes,
773
size_t offset, size_t* bytes_read) {
774
CritScope cs(&crit_);
775
return ReadOffsetLocked(buffer, bytes, offset, bytes_read);
778
StreamResult FifoBuffer::WriteOffset(const void* buffer, size_t bytes,
779
size_t offset, size_t* bytes_written) {
780
CritScope cs(&crit_);
781
return WriteOffsetLocked(buffer, bytes, offset, bytes_written);
784
StreamState FifoBuffer::GetState() const {
788
StreamResult FifoBuffer::Read(void* buffer, size_t bytes,
789
size_t* bytes_read, int* error) {
790
CritScope cs(&crit_);
791
const bool was_writable = data_length_ < buffer_length_;
793
StreamResult result = ReadOffsetLocked(buffer, bytes, 0, ©);
795
if (result == SR_SUCCESS) {
796
// If read was successful then adjust the read position and number of
798
read_position_ = (read_position_ + copy) % buffer_length_;
799
data_length_ -= copy;
804
// if we were full before, and now we're not, post an event
805
if (!was_writable && copy > 0) {
806
PostEvent(owner_, SE_WRITE, 0);
812
StreamResult FifoBuffer::Write(const void* buffer, size_t bytes,
813
size_t* bytes_written, int* error) {
814
CritScope cs(&crit_);
816
const bool was_readable = (data_length_ > 0);
818
StreamResult result = WriteOffsetLocked(buffer, bytes, 0, ©);
820
if (result == SR_SUCCESS) {
821
// If write was successful then adjust the number of readable bytes.
822
data_length_ += copy;
824
*bytes_written = copy;
827
// if we didn't have any data to read before, and now we do, post an event
828
if (!was_readable && copy > 0) {
829
PostEvent(owner_, SE_READ, 0);
835
void FifoBuffer::Close() {
836
CritScope cs(&crit_);
840
const void* FifoBuffer::GetReadData(size_t* size) {
841
CritScope cs(&crit_);
842
*size = (read_position_ + data_length_ <= buffer_length_) ?
843
data_length_ : buffer_length_ - read_position_;
844
return &buffer_[read_position_];
847
void FifoBuffer::ConsumeReadData(size_t size) {
848
CritScope cs(&crit_);
849
ASSERT(size <= data_length_);
850
const bool was_writable = data_length_ < buffer_length_;
851
read_position_ = (read_position_ + size) % buffer_length_;
852
data_length_ -= size;
853
if (!was_writable && size > 0) {
854
PostEvent(owner_, SE_WRITE, 0);
858
void* FifoBuffer::GetWriteBuffer(size_t* size) {
859
CritScope cs(&crit_);
860
if (state_ == SS_CLOSED) {
864
// if empty, reset the write position to the beginning, so we can get
865
// the biggest possible block
866
if (data_length_ == 0) {
870
const size_t write_position = (read_position_ + data_length_)
872
*size = (write_position > read_position_ || data_length_ == 0) ?
873
buffer_length_ - write_position : read_position_ - write_position;
874
return &buffer_[write_position];
877
void FifoBuffer::ConsumeWriteBuffer(size_t size) {
878
CritScope cs(&crit_);
879
ASSERT(size <= buffer_length_ - data_length_);
880
const bool was_readable = (data_length_ > 0);
881
data_length_ += size;
882
if (!was_readable && size > 0) {
883
PostEvent(owner_, SE_READ, 0);
887
bool FifoBuffer::GetWriteRemaining(size_t* size) const {
888
CritScope cs(&crit_);
889
*size = buffer_length_ - data_length_;
893
StreamResult FifoBuffer::ReadOffsetLocked(void* buffer,
896
size_t* bytes_read) {
897
if (offset >= data_length_) {
898
return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS;
901
const size_t available = data_length_ - offset;
902
const size_t read_position = (read_position_ + offset) % buffer_length_;
903
const size_t copy = _min(bytes, available);
904
const size_t tail_copy = _min(copy, buffer_length_ - read_position);
905
char* const p = static_cast<char*>(buffer);
906
memcpy(p, &buffer_[read_position], tail_copy);
907
memcpy(p + tail_copy, &buffer_[0], copy - tail_copy);
915
StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer,
918
size_t* bytes_written) {
919
if (state_ == SS_CLOSED) {
923
if (data_length_ + offset >= buffer_length_) {
927
const size_t available = buffer_length_ - data_length_ - offset;
928
const size_t write_position = (read_position_ + data_length_ + offset)
930
const size_t copy = _min(bytes, available);
931
const size_t tail_copy = _min(copy, buffer_length_ - write_position);
932
const char* const p = static_cast<const char*>(buffer);
933
memcpy(&buffer_[write_position], p, tail_copy);
934
memcpy(&buffer_[0], p + tail_copy, copy - tail_copy);
937
*bytes_written = copy;
944
///////////////////////////////////////////////////////////////////////////////
946
///////////////////////////////////////////////////////////////////////////////
948
LoggingAdapter::LoggingAdapter(StreamInterface* stream, LoggingSeverity level,
949
const std::string& label, bool hex_mode)
950
: StreamAdapterInterface(stream), level_(level), hex_mode_(hex_mode) {
954
void LoggingAdapter::set_label(const std::string& label) {
956
label_.append(label);
960
StreamResult LoggingAdapter::Read(void* buffer, size_t buffer_len,
961
size_t* read, int* error) {
962
size_t local_read; if (!read) read = &local_read;
963
StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, read,
965
if (result == SR_SUCCESS) {
966
LogMultiline(level_, label_.c_str(), true, buffer, *read, hex_mode_, &lms_);
971
StreamResult LoggingAdapter::Write(const void* data, size_t data_len,
972
size_t* written, int* error) {
973
size_t local_written;
974
if (!written) written = &local_written;
975
StreamResult result = StreamAdapterInterface::Write(data, data_len, written,
977
if (result == SR_SUCCESS) {
978
LogMultiline(level_, label_.c_str(), false, data, *written, hex_mode_,
984
void LoggingAdapter::Close() {
985
LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
986
LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
987
LOG_V(level_) << label_ << " Closed locally";
988
StreamAdapterInterface::Close();
991
void LoggingAdapter::OnEvent(StreamInterface* stream, int events, int err) {
992
if (events & SE_OPEN) {
993
LOG_V(level_) << label_ << " Open";
994
} else if (events & SE_CLOSE) {
995
LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
996
LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
997
LOG_V(level_) << label_ << " Closed with error: " << err;
999
StreamAdapterInterface::OnEvent(stream, events, err);
1002
///////////////////////////////////////////////////////////////////////////////
1003
// StringStream - Reads/Writes to an external std::string
1004
///////////////////////////////////////////////////////////////////////////////
1006
StringStream::StringStream(std::string& str)
1007
: str_(str), read_pos_(0), read_only_(false) {
1010
StringStream::StringStream(const std::string& str)
1011
: str_(const_cast<std::string&>(str)), read_pos_(0), read_only_(true) {
1014
StreamState StringStream::GetState() const {
1018
StreamResult StringStream::Read(void* buffer, size_t buffer_len,
1019
size_t* read, int* error) {
1020
size_t available = _min(buffer_len, str_.size() - read_pos_);
1023
memcpy(buffer, str_.data() + read_pos_, available);
1024
read_pos_ += available;
1030
StreamResult StringStream::Write(const void* data, size_t data_len,
1031
size_t* written, int* error) {
1038
str_.append(static_cast<const char*>(data),
1039
static_cast<const char*>(data) + data_len);
1041
*written = data_len;
1045
void StringStream::Close() {
1048
bool StringStream::SetPosition(size_t position) {
1049
if (position > str_.size())
1051
read_pos_ = position;
1055
bool StringStream::GetPosition(size_t* position) const {
1057
*position = read_pos_;
1061
bool StringStream::GetSize(size_t* size) const {
1063
*size = str_.size();
1067
bool StringStream::GetAvailable(size_t* size) const {
1069
*size = str_.size() - read_pos_;
1073
bool StringStream::ReserveSize(size_t size) {
1080
///////////////////////////////////////////////////////////////////////////////
1082
///////////////////////////////////////////////////////////////////////////////
1084
StreamReference::StreamReference(StreamInterface* stream)
1085
: StreamAdapterInterface(stream, false) {
1086
// owner set to false so the destructor does not free the stream.
1087
stream_ref_count_ = new StreamRefCount(stream);
1090
StreamInterface* StreamReference::NewReference() {
1091
stream_ref_count_->AddReference();
1092
return new StreamReference(stream_ref_count_, stream());
1095
StreamReference::~StreamReference() {
1096
stream_ref_count_->Release();
1099
StreamReference::StreamReference(StreamRefCount* stream_ref_count,
1100
StreamInterface* stream)
1101
: StreamAdapterInterface(stream, false),
1102
stream_ref_count_(stream_ref_count) {
1105
///////////////////////////////////////////////////////////////////////////////
1107
StreamResult Flow(StreamInterface* source,
1108
char* buffer, size_t buffer_len,
1109
StreamInterface* sink,
1110
size_t* data_len /* = NULL */) {
1111
ASSERT(buffer_len > 0);
1113
StreamResult result;
1114
size_t count, read_pos, write_pos;
1116
read_pos = *data_len;
1121
bool end_of_stream = false;
1123
// Read until buffer is full, end of stream, or error
1124
while (!end_of_stream && (read_pos < buffer_len)) {
1125
result = source->Read(buffer + read_pos, buffer_len - read_pos,
1127
if (result == SR_EOS) {
1128
end_of_stream = true;
1129
} else if (result != SR_SUCCESS) {
1131
*data_len = read_pos;
1139
// Write until buffer is empty, or error (including end of stream)
1141
while (write_pos < read_pos) {
1142
result = sink->Write(buffer + write_pos, read_pos - write_pos,
1144
if (result != SR_SUCCESS) {
1146
*data_len = read_pos - write_pos;
1147
if (write_pos > 0) {
1148
memmove(buffer, buffer + write_pos, *data_len);
1157
} while (!end_of_stream);
1165
///////////////////////////////////////////////////////////////////////////////
1167
} // namespace talk_base