124
115
GAsyncResult *result,
118
static void g_buffered_input_stream_seekable_iface_init (GSeekableIface *iface);
119
static goffset g_buffered_input_stream_tell (GSeekable *seekable);
120
static gboolean g_buffered_input_stream_can_seek (GSeekable *seekable);
121
static gboolean g_buffered_input_stream_seek (GSeekable *seekable,
124
GCancellable *cancellable,
126
static gboolean g_buffered_input_stream_can_truncate (GSeekable *seekable);
127
static gboolean g_buffered_input_stream_truncate (GSeekable *seekable,
129
GCancellable *cancellable,
132
static void g_buffered_input_stream_finalize (GObject *object);
127
134
static void compact_buffer (GBufferedInputStream *stream);
129
G_DEFINE_TYPE (GBufferedInputStream,
130
g_buffered_input_stream,
131
G_TYPE_FILTER_INPUT_STREAM)
136
G_DEFINE_TYPE_WITH_CODE (GBufferedInputStream,
137
g_buffered_input_stream,
138
G_TYPE_FILTER_INPUT_STREAM,
139
G_IMPLEMENT_INTERFACE (G_TYPE_SEEKABLE,
140
g_buffered_input_stream_seekable_iface_init))
135
143
g_buffered_input_stream_class_init (GBufferedInputStreamClass *klass)
467
483
gpointer user_data)
469
485
GBufferedInputStreamClass *class;
470
GSimpleAsyncResult *simple;
471
486
GError *error = NULL;
473
488
g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
477
simple = g_simple_async_result_new (G_OBJECT (stream),
480
g_buffered_input_stream_fill_async);
481
g_simple_async_result_complete_in_idle (simple);
482
g_object_unref (simple);
494
task = g_task_new (stream, cancellable, callback, user_data);
495
g_task_set_source_tag (task, g_buffered_input_stream_fill_async);
496
g_task_return_int (task, 0);
497
g_object_unref (task);
488
g_simple_async_report_error_in_idle (G_OBJECT (stream),
491
G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
492
_("Too large count value passed to %s"),
503
g_task_report_new_error (stream, callback, user_data,
504
g_buffered_input_stream_fill_async,
505
G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
506
_("Too large count value passed to %s"),
497
511
if (!g_input_stream_set_pending (G_INPUT_STREAM (stream), &error))
499
g_simple_async_report_take_gerror_in_idle (G_OBJECT (stream),
513
g_task_report_error (stream, callback, user_data,
514
g_buffered_input_stream_fill_async,
526
539
GAsyncResult *result,
529
GSimpleAsyncResult *simple;
530
542
GBufferedInputStreamClass *class;
532
544
g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
533
545
g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
535
if (G_IS_SIMPLE_ASYNC_RESULT (result))
537
simple = G_SIMPLE_ASYNC_RESULT (result);
538
if (g_simple_async_result_propagate_error (simple, error))
541
/* Special case read of 0 bytes */
542
if (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_fill_async)
547
if (g_async_result_legacy_propagate_error (result, error))
549
else if (g_async_result_is_tagged (result, g_buffered_input_stream_fill_async))
550
return g_task_propagate_int (G_TASK (result), error);
546
552
class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
547
553
return class->fill_finish (stream, result, error);
838
844
return bytes_read;
848
g_buffered_input_stream_tell (GSeekable *seekable)
850
GBufferedInputStream *bstream;
851
GBufferedInputStreamPrivate *priv;
852
GInputStream *base_stream;
853
GSeekable *base_stream_seekable;
857
bstream = G_BUFFERED_INPUT_STREAM (seekable);
858
priv = bstream->priv;
860
base_stream = G_FILTER_INPUT_STREAM (seekable)->base_stream;
861
if (!G_IS_SEEKABLE (base_stream))
863
base_stream_seekable = G_SEEKABLE (base_stream);
865
available = priv->end - priv->pos;
866
base_offset = g_seekable_tell (base_stream_seekable);
868
return base_offset - available;
872
g_buffered_input_stream_can_seek (GSeekable *seekable)
874
GInputStream *base_stream;
876
base_stream = G_FILTER_INPUT_STREAM (seekable)->base_stream;
877
return G_IS_SEEKABLE (base_stream) && g_seekable_can_seek (G_SEEKABLE (base_stream));
881
g_buffered_input_stream_seek (GSeekable *seekable,
884
GCancellable *cancellable,
887
GBufferedInputStream *bstream;
888
GBufferedInputStreamPrivate *priv;
889
GInputStream *base_stream;
890
GSeekable *base_stream_seekable;
892
bstream = G_BUFFERED_INPUT_STREAM (seekable);
893
priv = bstream->priv;
895
base_stream = G_FILTER_INPUT_STREAM (seekable)->base_stream;
896
if (!G_IS_SEEKABLE (base_stream))
898
g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
899
_("Seek not supported on base stream"));
903
base_stream_seekable = G_SEEKABLE (base_stream);
905
if (type == G_SEEK_CUR)
907
if (offset <= priv->end - priv->pos && offset >= -priv->pos)
914
offset -= priv->end - priv->pos;
918
if (g_seekable_seek (base_stream_seekable, offset, type, cancellable, error))
931
g_buffered_input_stream_can_truncate (GSeekable *seekable)
937
g_buffered_input_stream_truncate (GSeekable *seekable,
939
GCancellable *cancellable,
942
g_set_error_literal (error,
944
G_IO_ERROR_NOT_SUPPORTED,
945
_("Cannot truncate GBufferedInputStream"));
842
950
* g_buffered_input_stream_read_byte:
843
951
* @stream: a #GBufferedInputStream
930
GSimpleAsyncResult *simple;
1038
GTask *task = user_data;
935
1041
res = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
936
1042
result, &error);
938
g_simple_async_result_set_op_res_gssize (simple, res);
941
g_simple_async_result_take_error (simple, error);
1044
g_task_return_error (task, error);
1047
GBufferedInputStream *stream;
945
1048
GBufferedInputStreamPrivate *priv;
948
object = g_async_result_get_source_object (G_ASYNC_RESULT (simple));
949
priv = G_BUFFERED_INPUT_STREAM (object)->priv;
1050
stream = g_task_get_source_object (task);
1051
priv = G_BUFFERED_INPUT_STREAM (stream)->priv;
951
1053
g_assert_cmpint (priv->end + res, <=, priv->len);
952
1054
priv->end += res;
954
g_object_unref (object);
1056
g_task_return_int (task, res);
957
/* Complete immediately, not in idle, since we're already
958
* in a mainloop callout
960
g_simple_async_result_complete (simple);
961
g_object_unref (simple);
1059
g_object_unref (task);
1007
1103
GAsyncResult *result,
1008
1104
GError **error)
1010
GSimpleAsyncResult *simple;
1013
simple = G_SIMPLE_ASYNC_RESULT (result);
1014
g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_real_fill_async);
1016
nread = g_simple_async_result_get_op_res_gssize (simple);
1028
free_read_async_data (gpointer _data)
1030
ReadAsyncData *data = _data;
1031
g_slice_free (ReadAsyncData, data);
1035
large_read_callback (GObject *source_object,
1036
GAsyncResult *result,
1039
GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1040
ReadAsyncData *data;
1044
data = g_simple_async_result_get_op_res_gpointer (simple);
1047
nread = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
1050
/* Only report the error if we've not already read some data */
1051
if (nread < 0 && data->bytes_read == 0)
1052
g_simple_async_result_take_error (simple, error);
1054
g_error_free (error);
1057
data->bytes_read += nread;
1059
/* Complete immediately, not in idle, since we're already
1060
* in a mainloop callout
1062
g_simple_async_result_complete (simple);
1063
g_object_unref (simple);
1067
read_fill_buffer_callback (GObject *source_object,
1068
GAsyncResult *result,
1071
GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
1072
GBufferedInputStream *bstream;
1073
GBufferedInputStreamPrivate *priv;
1074
ReadAsyncData *data;
1079
bstream = G_BUFFERED_INPUT_STREAM (source_object);
1080
priv = bstream->priv;
1082
data = g_simple_async_result_get_op_res_gpointer (simple);
1085
nread = g_buffered_input_stream_fill_finish (bstream,
1088
if (nread < 0 && data->bytes_read == 0)
1089
g_simple_async_result_take_error (simple, error);
1091
g_error_free (error);
1095
available = priv->end - priv->pos;
1096
data->count = MIN (data->count, available);
1098
memcpy ((char *)data->buffer + data->bytes_read, (char *)priv->buffer + priv->pos, data->count);
1099
data->bytes_read += data->count;
1100
priv->pos += data->count;
1103
/* Complete immediately, not in idle, since we're already
1104
* in a mainloop callout
1106
g_simple_async_result_complete (simple);
1107
g_object_unref (simple);
1111
g_buffered_input_stream_read_async (GInputStream *stream,
1115
GCancellable *cancellable,
1116
GAsyncReadyCallback callback,
1119
GBufferedInputStream *bstream;
1120
GBufferedInputStreamPrivate *priv;
1121
GBufferedInputStreamClass *class;
1122
GInputStream *base_stream;
1124
GSimpleAsyncResult *simple;
1125
ReadAsyncData *data;
1127
bstream = G_BUFFERED_INPUT_STREAM (stream);
1128
priv = bstream->priv;
1130
data = g_slice_new (ReadAsyncData);
1131
data->buffer = buffer;
1132
data->bytes_read = 0;
1133
simple = g_simple_async_result_new (G_OBJECT (stream),
1134
callback, user_data,
1135
g_buffered_input_stream_read_async);
1136
g_simple_async_result_set_op_res_gpointer (simple, data, free_read_async_data);
1138
available = priv->end - priv->pos;
1140
if (count <= available)
1142
memcpy (buffer, priv->buffer + priv->pos, count);
1144
data->bytes_read = count;
1146
g_simple_async_result_complete_in_idle (simple);
1147
g_object_unref (simple);
1152
/* Full request not available, read all currently available
1153
* and request refill for more
1156
memcpy (buffer, priv->buffer + priv->pos, available);
1162
data->bytes_read = available;
1163
data->count = count;
1165
if (count > priv->len)
1167
/* Large request, shortcut buffer */
1169
base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1171
g_input_stream_read_async (base_stream,
1172
(char *)buffer + data->bytes_read,
1174
io_priority, cancellable,
1175
large_read_callback,
1180
class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
1181
class->fill_async (bstream, priv->len, io_priority, cancellable,
1182
read_fill_buffer_callback, simple);
1187
g_buffered_input_stream_read_finish (GInputStream *stream,
1188
GAsyncResult *result,
1191
GSimpleAsyncResult *simple;
1192
ReadAsyncData *data;
1194
simple = G_SIMPLE_ASYNC_RESULT (result);
1196
g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_read_async);
1198
data = g_simple_async_result_get_op_res_gpointer (simple);
1200
return data->bytes_read;
1106
g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1108
return g_task_propagate_int (G_TASK (result), error);
1261
1170
bstream = G_BUFFERED_INPUT_STREAM (source_object);
1262
1171
priv = bstream->priv;
1264
data = g_simple_async_result_get_op_res_gpointer (simple);
1173
data = g_task_get_task_data (task);
1267
1176
nread = g_buffered_input_stream_fill_finish (bstream,
1268
1177
result, &error);
1270
1179
if (nread < 0 && data->bytes_skipped == 0)
1271
g_simple_async_result_take_error (simple, error);
1273
g_error_free (error);
1180
g_task_return_error (task, error);
1277
available = priv->end - priv->pos;
1278
data->count = MIN (data->count, available);
1280
data->bytes_skipped += data->count;
1281
priv->pos += data->count;
1184
g_error_free (error);
1188
available = priv->end - priv->pos;
1189
data->count = MIN (data->count, available);
1191
data->bytes_skipped += data->count;
1192
priv->pos += data->count;
1195
g_task_return_int (task, data->bytes_skipped);
1284
/* Complete immediately, not in idle, since we're already
1285
* in a mainloop callout
1287
g_simple_async_result_complete (simple);
1288
g_object_unref (simple);
1198
g_object_unref (task);