278
* g_input_stream_read_bytes:
279
* @stream: a #GInputStream.
280
* @count: maximum number of bytes that will be read from the stream. Common
281
* values include 4096 and 8192.
282
* @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
283
* @error: location to store the error occurring, or %NULL to ignore
285
* Like g_input_stream_read(), this tries to read @count bytes from
286
* the stream in a blocking fashion. However, rather than reading into
287
* a user-supplied buffer, this will create a new #GBytes containing
288
* the data that was read. This may be easier to use from language
291
* If count is zero, returns a zero-length #GBytes and does nothing. A
292
* value of @count larger than %G_MAXSSIZE will cause a
293
* %G_IO_ERROR_INVALID_ARGUMENT error.
295
* On success, a new #GBytes is returned. It is not an error if the
296
* size of this object is not the same as the requested size, as it
297
* can happen e.g. near the end of a file. A zero-length #GBytes is
298
* returned on end of file (or if @count is zero), but never
301
* If @cancellable is not %NULL, then the operation can be cancelled by
302
* triggering the cancellable object from another thread. If the operation
303
* was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
304
* operation was partially finished when the operation was cancelled the
305
* partial result will be returned, without an error.
307
* On error %NULL is returned and @error is set accordingly.
309
* Return value: a new #GBytes, or %NULL on error
312
g_input_stream_read_bytes (GInputStream *stream,
314
GCancellable *cancellable,
320
buf = g_malloc (count);
321
nread = g_input_stream_read (stream, buf, count, cancellable, error);
330
return g_bytes_new_static ("", 0);
333
return g_bytes_new_take (buf, nread);
277
337
* g_input_stream_skip:
278
338
* @stream: a #GInputStream.
279
339
* @count: the number of bytes that will be skipped from the stream
541
simple = g_simple_async_result_new (G_OBJECT (stream),
544
g_input_stream_read_async);
545
g_simple_async_result_complete_in_idle (simple);
546
g_object_unref (simple);
603
task = g_task_new (stream, cancellable, callback, user_data);
604
g_task_set_source_tag (task, g_input_stream_read_async);
605
g_task_return_int (task, 0);
606
g_object_unref (task);
550
610
if (((gssize) count) < 0)
552
g_simple_async_report_error_in_idle (G_OBJECT (stream),
555
G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
556
_("Too large count value passed to %s"),
612
g_task_report_new_error (stream, callback, user_data,
613
g_input_stream_read_async,
614
G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
615
_("Too large count value passed to %s"),
561
620
if (!g_input_stream_set_pending (stream, &error))
563
g_simple_async_report_take_gerror_in_idle (G_OBJECT (stream),
622
g_task_report_error (stream, callback, user_data,
623
g_input_stream_read_async,
590
648
GAsyncResult *result,
593
GSimpleAsyncResult *simple;
594
651
GInputStreamClass *class;
596
653
g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
597
654
g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
599
if (G_IS_SIMPLE_ASYNC_RESULT (result))
601
simple = G_SIMPLE_ASYNC_RESULT (result);
602
if (g_simple_async_result_propagate_error (simple, error))
605
/* Special case read of 0 bytes */
606
if (g_simple_async_result_get_source_tag (simple) == g_input_stream_read_async)
656
if (g_async_result_legacy_propagate_error (result, error))
658
else if (g_async_result_is_tagged (result, g_input_stream_read_async))
659
return g_task_propagate_int (G_TASK (result), error);
610
661
class = G_INPUT_STREAM_GET_CLASS (stream);
611
662
return class->read_finish (stream, result, error);
666
read_bytes_callback (GObject *stream,
667
GAsyncResult *result,
670
GTask *task = user_data;
671
guchar *buf = g_task_get_task_data (task);
672
GError *error = NULL;
674
GBytes *bytes = NULL;
676
nread = g_input_stream_read_finish (G_INPUT_STREAM (stream),
681
g_task_return_error (task, error);
686
bytes = g_bytes_new_static ("", 0);
689
bytes = g_bytes_new_take (buf, nread);
692
g_task_return_pointer (task, bytes, (GDestroyNotify)g_bytes_unref);
694
g_object_unref (task);
698
* g_input_stream_read_bytes_async:
699
* @stream: A #GInputStream.
700
* @count: the number of bytes that will be read from the stream
701
* @io_priority: the <link linkend="io-priority">I/O priority</link>
703
* @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
704
* @callback: (scope async): callback to call when the request is satisfied
705
* @user_data: (closure): the data to pass to callback function
707
* Request an asynchronous read of @count bytes from the stream into a
708
* new #GBytes. When the operation is finished @callback will be
709
* called. You can then call g_input_stream_read_bytes_finish() to get the
710
* result of the operation.
712
* During an async request no other sync and async calls are allowed
713
* on @stream, and will result in %G_IO_ERROR_PENDING errors.
715
* A value of @count larger than %G_MAXSSIZE will cause a
716
* %G_IO_ERROR_INVALID_ARGUMENT error.
718
* On success, the new #GBytes will be passed to the callback. It is
719
* not an error if this is smaller than the requested size, as it can
720
* happen e.g. near the end of a file, but generally we try to read as
721
* many bytes as requested. Zero is returned on end of file (or if
722
* @count is zero), but never otherwise.
724
* Any outstanding I/O request with higher priority (lower numerical
725
* value) will be executed before an outstanding request with lower
726
* priority. Default priority is %G_PRIORITY_DEFAULT.
729
g_input_stream_read_bytes_async (GInputStream *stream,
732
GCancellable *cancellable,
733
GAsyncReadyCallback callback,
739
task = g_task_new (stream, cancellable, callback, user_data);
740
buf = g_malloc (count);
741
g_task_set_task_data (task, buf, NULL);
743
g_input_stream_read_async (stream, buf, count,
744
io_priority, cancellable,
745
read_bytes_callback, task);
749
* g_input_stream_read_bytes_finish:
750
* @stream: a #GInputStream.
751
* @result: a #GAsyncResult.
752
* @error: a #GError location to store the error occurring, or %NULL to
755
* Finishes an asynchronous stream read-into-#GBytes operation.
757
* Returns: the newly-allocated #GBytes, or %NULL on error
760
g_input_stream_read_bytes_finish (GInputStream *stream,
761
GAsyncResult *result,
764
g_return_val_if_fail (G_IS_INPUT_STREAM (stream), NULL);
765
g_return_val_if_fail (g_task_is_valid (result, stream), NULL);
767
return g_task_propagate_pointer (G_TASK (result), error);
615
771
* g_input_stream_skip_async:
616
772
* @stream: A #GInputStream.
654
810
gpointer user_data)
656
812
GInputStreamClass *class;
657
GSimpleAsyncResult *simple;
658
813
GError *error = NULL;
660
815
g_return_if_fail (G_IS_INPUT_STREAM (stream));
664
simple = g_simple_async_result_new (G_OBJECT (stream),
667
g_input_stream_skip_async);
669
g_simple_async_result_complete_in_idle (simple);
670
g_object_unref (simple);
821
task = g_task_new (stream, cancellable, callback, user_data);
822
g_task_set_source_tag (task, g_input_stream_skip_async);
823
g_task_return_int (task, 0);
824
g_object_unref (task);
674
828
if (((gssize) count) < 0)
676
g_simple_async_report_error_in_idle (G_OBJECT (stream),
679
G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
680
_("Too large count value passed to %s"),
830
g_task_report_new_error (stream, callback, user_data,
831
g_input_stream_skip_async,
832
G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
833
_("Too large count value passed to %s"),
685
838
if (!g_input_stream_set_pending (stream, &error))
687
g_simple_async_report_take_gerror_in_idle (G_OBJECT (stream),
840
g_task_report_error (stream, callback, user_data,
841
g_input_stream_skip_async,
714
866
GAsyncResult *result,
717
GSimpleAsyncResult *simple;
718
869
GInputStreamClass *class;
720
871
g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
721
872
g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
723
if (G_IS_SIMPLE_ASYNC_RESULT (result))
725
simple = G_SIMPLE_ASYNC_RESULT (result);
726
if (g_simple_async_result_propagate_error (simple, error))
729
/* Special case skip of 0 bytes */
730
if (g_simple_async_result_get_source_tag (simple) == g_input_stream_skip_async)
874
if (g_async_result_legacy_propagate_error (result, error))
876
else if (g_async_result_is_tagged (result, g_input_stream_skip_async))
877
return g_task_propagate_int (G_TASK (result), error);
734
879
class = G_INPUT_STREAM_GET_CLASS (stream);
735
880
return class->skip_finish (stream, result, error);
763
908
gpointer user_data)
765
910
GInputStreamClass *class;
766
GSimpleAsyncResult *simple;
767
911
GError *error = NULL;
769
913
g_return_if_fail (G_IS_INPUT_STREAM (stream));
771
915
if (stream->priv->closed)
773
simple = g_simple_async_result_new (G_OBJECT (stream),
776
g_input_stream_close_async);
778
g_simple_async_result_complete_in_idle (simple);
779
g_object_unref (simple);
919
task = g_task_new (stream, cancellable, callback, user_data);
920
g_task_set_source_tag (task, g_input_stream_close_async);
921
g_task_return_boolean (task, TRUE);
922
g_object_unref (task);
783
926
if (!g_input_stream_set_pending (stream, &error))
785
g_simple_async_report_take_gerror_in_idle (G_OBJECT (stream),
928
g_task_report_error (stream, callback, user_data,
929
g_input_stream_close_async,
812
954
GAsyncResult *result,
815
GSimpleAsyncResult *simple;
816
957
GInputStreamClass *class;
818
959
g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
819
960
g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
821
if (G_IS_SIMPLE_ASYNC_RESULT (result))
823
simple = G_SIMPLE_ASYNC_RESULT (result);
824
if (g_simple_async_result_propagate_error (simple, error))
827
/* Special case already closed */
828
if (g_simple_async_result_get_source_tag (simple) == g_input_stream_close_async)
962
if (g_async_result_legacy_propagate_error (result, error))
964
else if (g_async_result_is_tagged (result, g_input_stream_close_async))
965
return g_task_propagate_boolean (G_TASK (result), error);
832
967
class = G_INPUT_STREAM_GET_CLASS (stream);
833
968
return class->close_finish (stream, result, error);
922
1057
********************************************/
924
1059
typedef struct {
926
gsize count_requested;
931
read_async_thread (GSimpleAsyncResult *res,
933
GCancellable *cancellable)
1065
free_read_data (ReadData *op)
1067
g_slice_free (ReadData, op);
1071
read_async_thread (GTask *task,
1072
gpointer source_object,
1074
GCancellable *cancellable)
1076
GInputStream *stream = source_object;
1077
ReadData *op = task_data;
936
1078
GInputStreamClass *class;
937
1079
GError *error = NULL;
939
op = g_simple_async_result_get_op_res_gpointer (res);
941
class = G_INPUT_STREAM_GET_CLASS (object);
943
op->count_read = class->read_fn (G_INPUT_STREAM (object),
944
op->buffer, op->count_requested,
945
cancellable, &error);
946
if (op->count_read == -1)
947
g_simple_async_result_take_error (res, error);
1082
class = G_INPUT_STREAM_GET_CLASS (stream);
1084
nread = class->read_fn (stream,
1085
op->buffer, op->count,
1086
g_task_get_cancellable (task),
1089
g_task_return_error (task, error);
1091
g_task_return_int (task, nread);
1094
static void read_async_pollable (GPollableInputStream *stream,
1098
read_async_pollable_ready (GPollableInputStream *stream,
1101
GTask *task = user_data;
1103
read_async_pollable (stream, task);
1108
read_async_pollable (GPollableInputStream *stream,
1111
ReadData *op = g_task_get_task_data (task);
1112
GError *error = NULL;
1115
if (g_task_return_error_if_cancelled (task))
1118
nread = G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->
1119
read_nonblocking (stream, op->buffer, op->count, &error);
1121
if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1125
g_error_free (error);
1127
source = g_pollable_input_stream_create_source (stream,
1128
g_task_get_cancellable (task));
1129
g_task_attach_source (task, source,
1130
(GSourceFunc) read_async_pollable_ready);
1131
g_source_unref (source);
1136
g_task_return_error (task, error);
1138
g_task_return_int (task, nread);
1139
/* g_input_stream_real_read_async() unrefs task */
1142
#define CAN_DO_NONBLOCKING_READS(stream) \
1143
(G_IS_POLLABLE_INPUT_STREAM (stream) && \
1144
g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (stream)))
951
1148
g_input_stream_real_read_async (GInputStream *stream,
956
1153
GAsyncReadyCallback callback,
957
1154
gpointer user_data)
959
GSimpleAsyncResult *res;
962
op = g_new (ReadData, 1);
963
res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_input_stream_real_read_async);
964
g_simple_async_result_set_op_res_gpointer (res, op, g_free);
1159
op = g_slice_new0 (ReadData);
1160
task = g_task_new (stream, cancellable, callback, user_data);
1161
g_task_set_task_data (task, op, (GDestroyNotify) free_read_data);
1162
g_task_set_priority (task, io_priority);
965
1163
op->buffer = buffer;
966
op->count_requested = count;
968
g_simple_async_result_run_in_thread (res, read_async_thread, io_priority, cancellable);
969
g_object_unref (res);
1166
if (CAN_DO_NONBLOCKING_READS (stream))
1167
read_async_pollable (G_POLLABLE_INPUT_STREAM (stream), task);
1169
g_task_run_in_thread (task, read_async_thread);
1170
g_object_unref (task);
974
1175
GAsyncResult *result,
977
GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
980
g_warn_if_fail (g_simple_async_result_get_source_tag (simple) ==
981
g_input_stream_real_read_async);
983
op = g_simple_async_result_get_op_res_gpointer (simple);
985
return op->count_read;
1178
g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1180
return g_task_propagate_int (G_TASK (result), error);
989
gsize count_requested;
990
gssize count_skipped;
995
skip_async_thread (GSimpleAsyncResult *res,
997
GCancellable *cancellable)
1185
skip_async_thread (GTask *task,
1186
gpointer source_object,
1188
GCancellable *cancellable)
1190
GInputStream *stream = source_object;
1191
gsize count = GPOINTER_TO_SIZE (task_data);
1000
1192
GInputStreamClass *class;
1001
1193
GError *error = NULL;
1003
class = G_INPUT_STREAM_GET_CLASS (object);
1004
op = g_simple_async_result_get_op_res_gpointer (res);
1005
op->count_skipped = class->skip (G_INPUT_STREAM (object),
1006
op->count_requested,
1007
cancellable, &error);
1008
if (op->count_skipped == -1)
1009
g_simple_async_result_take_error (res, error);
1196
class = G_INPUT_STREAM_GET_CLASS (stream);
1197
ret = class->skip (stream, count,
1198
g_task_get_cancellable (task),
1201
g_task_return_error (task, error);
1203
g_task_return_int (task, ret);
1012
1206
typedef struct {
1013
1207
char buffer[8192];
1015
1209
gsize count_skipped;
1017
GCancellable *cancellable;
1019
GAsyncReadyCallback callback;
1020
1210
} SkipFallbackAsyncData;
1041
1230
if (data->count > 0)
1043
1232
class = G_INPUT_STREAM_GET_CLASS (source_object);
1044
class->read_async (G_INPUT_STREAM (source_object), data->buffer, MIN (8192, data->count), data->io_prio, data->cancellable,
1045
skip_callback_wrapper, data);
1233
class->read_async (G_INPUT_STREAM (source_object),
1234
data->buffer, MIN (8192, data->count),
1235
g_task_get_priority (task),
1236
g_task_get_cancellable (task),
1237
skip_callback_wrapper, task);
1050
op = g_new0 (SkipData, 1);
1051
op->count_skipped = data->count_skipped;
1052
simple = g_simple_async_result_new (source_object,
1053
data->callback, data->user_data,
1054
g_input_stream_real_skip_async);
1056
g_simple_async_result_set_op_res_gpointer (simple, op, g_free);
1243
g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED) &&
1244
data->count_skipped)
1060
if (data->count_skipped &&
1061
g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
1062
/* No error, return partial read */
1063
g_error_free (error);
1065
g_simple_async_result_take_error (simple, error);
1246
/* No error, return partial read */
1247
g_clear_error (&error);
1068
/* Complete immediately, not in idle, since we're already in a mainloop callout */
1069
g_simple_async_result_complete (simple);
1070
g_object_unref (simple);
1251
g_task_return_error (task, error);
1253
g_task_return_int (task, data->count_skipped);
1254
g_object_unref (task);
1081
1263
gpointer user_data)
1083
1265
GInputStreamClass *class;
1085
1266
SkipFallbackAsyncData *data;
1086
GSimpleAsyncResult *res;
1088
1269
class = G_INPUT_STREAM_GET_CLASS (stream);
1090
if (class->read_async == g_input_stream_real_read_async)
1271
task = g_task_new (stream, cancellable, callback, user_data);
1272
g_task_set_priority (task, io_priority);
1274
if (class->read_async == g_input_stream_real_read_async &&
1275
!CAN_DO_NONBLOCKING_READS (stream))
1092
1277
/* Read is thread-using async fallback.
1093
1278
* Make skip use threads too, so that we can use a possible sync skip
1094
1279
* implementation. */
1095
op = g_new0 (SkipData, 1);
1097
res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data,
1098
g_input_stream_real_skip_async);
1100
g_simple_async_result_set_op_res_gpointer (res, op, g_free);
1102
op->count_requested = count;
1104
g_simple_async_result_run_in_thread (res, skip_async_thread, io_priority, cancellable);
1105
g_object_unref (res);
1280
g_task_set_task_data (task, GSIZE_TO_POINTER (count), NULL);
1282
g_task_run_in_thread (task, skip_async_thread);
1283
g_object_unref (task);
1112
1290
data = g_new (SkipFallbackAsyncData, 1);
1113
1291
data->count = count;
1114
1292
data->count_skipped = 0;
1115
data->io_prio = io_priority;
1116
data->cancellable = cancellable;
1117
data->callback = callback;
1118
data->user_data = user_data;
1293
g_task_set_task_data (task, data, g_free);
1294
g_task_set_check_cancellable (task, FALSE);
1119
1295
class->read_async (stream, data->buffer, MIN (8192, count), io_priority, cancellable,
1120
skip_callback_wrapper, data);
1296
skip_callback_wrapper, task);
1127
1303
GAsyncResult *result,
1128
1304
GError **error)
1130
GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
1306
g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1133
g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_input_stream_real_skip_async);
1134
op = g_simple_async_result_get_op_res_gpointer (simple);
1135
return op->count_skipped;
1308
return g_task_propagate_int (G_TASK (result), error);
1139
close_async_thread (GSimpleAsyncResult *res,
1141
GCancellable *cancellable)
1312
close_async_thread (GTask *task,
1313
gpointer source_object,
1315
GCancellable *cancellable)
1317
GInputStream *stream = source_object;
1143
1318
GInputStreamClass *class;
1144
1319
GError *error = NULL;
1145
1320
gboolean result;
1147
/* Auto handling of cancelation disabled, and ignore
1148
cancellation, since we want to close things anyway, although
1149
possibly in a quick-n-dirty way. At least we never want to leak
1152
class = G_INPUT_STREAM_GET_CLASS (object);
1322
class = G_INPUT_STREAM_GET_CLASS (stream);
1153
1323
if (class->close_fn)
1155
result = class->close_fn (G_INPUT_STREAM (object), cancellable, &error);
1325
result = class->close_fn (stream,
1326
g_task_get_cancellable (task),
1157
g_simple_async_result_take_error (res, error);
1330
g_task_return_error (task, error);
1335
g_task_return_boolean (task, TRUE);