1
/* Copyright (c) 2003-2013 Dovecot authors, see the included COPYING file */
5
#include "istream-private.h"
6
#include "istream-chain.h"
10
struct istream_chain_link {
11
struct istream_chain_link *prev, *next;
13
struct istream *stream;
17
struct istream_chain {
18
struct istream_chain_link *head, *tail;
20
struct chain_istream *stream;
23
struct chain_istream {
24
struct istream_private istream;
26
size_t prev_stream_left, prev_skip;
28
struct istream_chain chain;
31
static void ATTR_NULL(2)
32
i_stream_chain_append_internal(struct istream_chain *chain,
33
struct istream *stream)
35
struct istream_chain_link *link;
37
if (stream == NULL && chain->tail != NULL && chain->tail->stream == NULL)
40
link = i_new(struct istream_chain_link, 1);
41
link->stream = stream;
42
link->eof = stream == NULL;
47
if (chain->head == NULL && stream != NULL) {
48
if (chain->stream->istream.max_buffer_size == 0) {
49
chain->stream->istream.max_buffer_size =
50
stream->real_stream->max_buffer_size;
52
i_stream_set_max_buffer_size(stream,
53
chain->stream->istream.max_buffer_size);
56
DLLIST2_APPEND(&chain->head, &chain->tail, link);
59
void i_stream_chain_append(struct istream_chain *chain, struct istream *stream)
61
i_stream_chain_append_internal(chain, stream);
64
void i_stream_chain_append_eof(struct istream_chain *chain)
66
i_stream_chain_append_internal(chain, NULL);
70
i_stream_chain_set_max_buffer_size(struct iostream_private *stream,
73
struct chain_istream *cstream = (struct chain_istream *)stream;
74
struct istream_chain_link *link = cstream->chain.head;
76
cstream->istream.max_buffer_size = max_size;
77
while (link != NULL) {
78
if (link->stream != NULL)
79
i_stream_set_max_buffer_size(link->stream, max_size);
84
static void i_stream_chain_destroy(struct iostream_private *stream)
86
struct chain_istream *cstream = (struct chain_istream *)stream;
87
struct istream_chain_link *link = cstream->chain.head;
89
while (link != NULL) {
90
struct istream_chain_link *next = link->next;
92
if (link->stream != NULL)
93
i_stream_unref(&link->stream);
97
i_free(cstream->istream.w_buffer);
100
static void i_stream_chain_read_next(struct chain_istream *cstream)
102
struct istream_chain_link *link = cstream->chain.head;
103
struct istream *prev_input;
104
const unsigned char *data;
105
size_t data_size, size, cur_data_pos;
107
i_assert(link != NULL && link->stream != NULL);
108
i_assert(link->stream->eof);
110
prev_input = link->stream;
111
data = i_stream_get_data(prev_input, &data_size);
113
DLLIST2_REMOVE(&cstream->chain.head, &cstream->chain.tail, link);
116
/* a) we have more streams, b) we have EOF, c) we need to wait
118
link = cstream->chain.head;
119
if (link != NULL && link->stream != NULL)
120
i_stream_seek(link->stream, 0);
122
if (cstream->prev_stream_left > 0) {
123
/* we've already buffered some of the prev_input. continue
124
appending the rest to it. */
125
cur_data_pos = cstream->istream.pos -
126
(cstream->istream.skip + cstream->prev_stream_left);
127
i_assert(cur_data_pos <= data_size);
128
data += cur_data_pos;
129
data_size -= cur_data_pos;
131
cstream->istream.pos = 0;
132
cstream->istream.skip = 0;
133
cstream->prev_stream_left = 0;
136
/* we already verified that the data size is less than the
137
maximum buffer size */
139
if (!i_stream_try_alloc(&cstream->istream, data_size, &size))
141
i_assert(size >= data_size);
143
memcpy(cstream->istream.w_buffer + cstream->istream.pos,
145
cstream->istream.pos += data_size;
146
cstream->prev_stream_left += data_size;
148
i_stream_skip(prev_input, i_stream_get_data_size(prev_input));
149
i_stream_unref(&prev_input);
152
static ssize_t i_stream_chain_read(struct istream_private *stream)
154
struct chain_istream *cstream = (struct chain_istream *)stream;
155
struct istream_chain_link *link = cstream->chain.head;
156
const unsigned char *data;
157
size_t size, data_size, cur_data_pos, new_pos, bytes_skipped;
158
size_t new_bytes_count;
161
if (link != NULL && link->eof) {
162
stream->istream.eof = TRUE;
166
i_assert(stream->skip >= cstream->prev_skip);
167
bytes_skipped = stream->skip - cstream->prev_skip;
169
if (cstream->prev_stream_left == 0) {
170
/* no need to worry about buffers, skip everything */
171
} else if (bytes_skipped < cstream->prev_stream_left) {
172
/* we're still skipping inside buffer */
173
cstream->prev_stream_left -= bytes_skipped;
176
/* done with the buffer */
177
bytes_skipped -= cstream->prev_stream_left;
178
cstream->prev_stream_left = 0;
180
stream->pos -= bytes_skipped;
181
stream->skip -= bytes_skipped;
182
stream->buffer += bytes_skipped;
183
cstream->prev_skip = stream->skip;
186
i_assert(bytes_skipped == 0);
189
i_stream_skip(link->stream, bytes_skipped);
191
i_assert(stream->pos >= stream->skip + cstream->prev_stream_left);
192
cur_data_pos = stream->pos - (stream->skip + cstream->prev_stream_left);
194
data = i_stream_get_data(link->stream, &data_size);
195
if (data_size > cur_data_pos)
198
/* need to read more */
199
i_assert(cur_data_pos == data_size);
200
ret = i_stream_read(link->stream);
201
if (ret == -2 || ret == 0)
205
if (link->stream->stream_errno != 0) {
206
io_stream_set_error(&stream->iostream,
207
"read(%s) failed: %s",
208
i_stream_get_name(link->stream),
209
i_stream_get_error(link->stream));
210
stream->istream.stream_errno =
211
link->stream->stream_errno;
214
/* EOF of this stream, go to next stream */
215
i_stream_chain_read_next(cstream);
216
cstream->prev_skip = stream->skip;
217
return i_stream_chain_read(stream);
219
/* we read something */
220
data = i_stream_get_data(link->stream, &data_size);
223
if (cstream->prev_stream_left == 0) {
224
/* we can point directly to the current stream's buffers */
225
stream->buffer = data;
226
stream->pos -= stream->skip;
229
} else if (data_size == cur_data_pos) {
230
/* nothing new read */
231
i_assert(ret == 0 || ret == -1);
232
stream->buffer = stream->w_buffer;
233
new_pos = stream->pos;
235
/* we still have some of the previous stream left. merge the
237
i_assert(data_size > cur_data_pos);
238
new_bytes_count = data_size - cur_data_pos;
239
if (!i_stream_try_alloc(stream, new_bytes_count, &size)) {
240
stream->buffer = stream->w_buffer;
243
stream->buffer = stream->w_buffer;
245
if (new_bytes_count > size)
246
new_bytes_count = size;
247
memcpy(stream->w_buffer + stream->pos,
248
data + cur_data_pos, new_bytes_count);
249
new_pos = stream->pos + new_bytes_count;
252
ret = new_pos > stream->pos ? (ssize_t)(new_pos - stream->pos) :
254
stream->pos = new_pos;
255
cstream->prev_skip = stream->skip;
259
struct istream *i_stream_create_chain(struct istream_chain **chain_r)
261
struct chain_istream *cstream;
263
cstream = i_new(struct chain_istream, 1);
264
cstream->chain.stream = cstream;
265
cstream->istream.max_buffer_size = 256;
267
cstream->istream.iostream.destroy = i_stream_chain_destroy;
268
cstream->istream.iostream.set_max_buffer_size =
269
i_stream_chain_set_max_buffer_size;
271
cstream->istream.read = i_stream_chain_read;
273
cstream->istream.istream.readable_fd = FALSE;
274
cstream->istream.istream.blocking = FALSE;
275
cstream->istream.istream.seekable = FALSE;
277
*chain_r = &cstream->chain;
278
return i_stream_create(&cstream->istream, NULL, -1);