~ubuntu-branches/debian/sid/glib2.0/sid

« back to all changes in this revision

Viewing changes to gio/ginputstream.c

  • Committer: Package Import Robot
  • Author(s): Martin Pitt
  • Date: 2013-05-08 06:25:57 UTC
  • mfrom: (1.27.14) (3.1.181 experimental)
  • Revision ID: package-import@ubuntu.com-20130508062557-i7gbku66mls70gi2
Tags: 2.36.1-2
Merge experimental branch, upload to unstable.

Show diffs side-by-side

added added

removed removed

Lines of Context:
28
28
#include "gseekable.h"
29
29
#include "gcancellable.h"
30
30
#include "gasyncresult.h"
31
 
#include "gsimpleasyncresult.h"
32
31
#include "gioerror.h"
33
 
 
 
32
#include "gpollableinputstream.h"
34
33
 
35
34
/**
36
35
 * SECTION:ginputstream
137
136
/**
138
137
 * g_input_stream_read:
139
138
 * @stream: a #GInputStream.
140
 
 * @buffer: a buffer to read data into (which should be at least count bytes long).
 
139
 * @buffer: (array length=count) (element-type guint8): a buffer to
 
140
 *     read data into (which should be at least count bytes long).
141
141
 * @count: the number of bytes that will be read from the stream
142
142
 * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
143
143
 * @error: location to store the error occurring, or %NULL to ignore
214
214
/**
215
215
 * g_input_stream_read_all:
216
216
 * @stream: a #GInputStream.
217
 
 * @buffer: a buffer to read data into (which should be at least count bytes long).
 
217
 * @buffer: (array length=count) (element-type guint8): a buffer to
 
218
 *     read data into (which should be at least count bytes long).
218
219
 * @count: the number of bytes that will be read from the stream
219
220
 * @bytes_read: (out): location to store the number of bytes that was read from the stream
220
221
 * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
274
275
}
275
276
 
276
277
/**
 
278
 * g_input_stream_read_bytes:
 
279
 * @stream: a #GInputStream.
 
280
 * @count: maximum number of bytes that will be read from the stream. Common
 
281
 * values include 4096 and 8192.
 
282
 * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
 
283
 * @error: location to store the error occurring, or %NULL to ignore
 
284
 *
 
285
 * Like g_input_stream_read(), this tries to read @count bytes from
 
286
 * the stream in a blocking fashion. However, rather than reading into
 
287
 * a user-supplied buffer, this will create a new #GBytes containing
 
288
 * the data that was read. This may be easier to use from language
 
289
 * bindings.
 
290
 *
 
291
 * If count is zero, returns a zero-length #GBytes and does nothing. A
 
292
 * value of @count larger than %G_MAXSSIZE will cause a
 
293
 * %G_IO_ERROR_INVALID_ARGUMENT error.
 
294
 *
 
295
 * On success, a new #GBytes is returned. It is not an error if the
 
296
 * size of this object is not the same as the requested size, as it
 
297
 * can happen e.g. near the end of a file. A zero-length #GBytes is
 
298
 * returned on end of file (or if @count is zero), but never
 
299
 * otherwise.
 
300
 *
 
301
 * If @cancellable is not %NULL, then the operation can be cancelled by
 
302
 * triggering the cancellable object from another thread. If the operation
 
303
 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
 
304
 * operation was partially finished when the operation was cancelled the
 
305
 * partial result will be returned, without an error.
 
306
 *
 
307
 * On error %NULL is returned and @error is set accordingly.
 
308
 *
 
309
 * Return value: a new #GBytes, or %NULL on error
 
310
 **/
 
311
GBytes *
 
312
g_input_stream_read_bytes (GInputStream  *stream,
 
313
                           gsize          count,
 
314
                           GCancellable  *cancellable,
 
315
                           GError       **error)
 
316
{
 
317
  guchar *buf;
 
318
  gssize nread;
 
319
 
 
320
  buf = g_malloc (count);
 
321
  nread = g_input_stream_read (stream, buf, count, cancellable, error);
 
322
  if (nread == -1)
 
323
    {
 
324
      g_free (buf);
 
325
      return NULL;
 
326
    }
 
327
  else if (nread == 0)
 
328
    {
 
329
      g_free (buf);
 
330
      return g_bytes_new_static ("", 0);
 
331
    }
 
332
  else
 
333
    return g_bytes_new_take (buf, nread);
 
334
}
 
335
 
 
336
/**
277
337
 * g_input_stream_skip:
278
338
 * @stream: a #GInputStream.
279
339
 * @count: the number of bytes that will be skipped from the stream
488
548
/**
489
549
 * g_input_stream_read_async:
490
550
 * @stream: A #GInputStream.
491
 
 * @buffer: a buffer to read data into (which should be at least count bytes long).
 
551
 * @buffer: (array length=count) (element-type guint8): a buffer to
 
552
 *     read data into (which should be at least count bytes long).
492
553
 * @count: the number of bytes that will be read from the stream
493
554
 * @io_priority: the <link linkend="io-priority">I/O priority</link> 
494
555
 * of the request. 
530
591
                           gpointer             user_data)
531
592
{
532
593
  GInputStreamClass *class;
533
 
  GSimpleAsyncResult *simple;
534
594
  GError *error = NULL;
535
595
 
536
596
  g_return_if_fail (G_IS_INPUT_STREAM (stream));
538
598
 
539
599
  if (count == 0)
540
600
    {
541
 
      simple = g_simple_async_result_new (G_OBJECT (stream),
542
 
                                          callback,
543
 
                                          user_data,
544
 
                                          g_input_stream_read_async);
545
 
      g_simple_async_result_complete_in_idle (simple);
546
 
      g_object_unref (simple);
 
601
      GTask *task;
 
602
 
 
603
      task = g_task_new (stream, cancellable, callback, user_data);
 
604
      g_task_set_source_tag (task, g_input_stream_read_async);
 
605
      g_task_return_int (task, 0);
 
606
      g_object_unref (task);
547
607
      return;
548
608
    }
549
609
  
550
610
  if (((gssize) count) < 0)
551
611
    {
552
 
      g_simple_async_report_error_in_idle (G_OBJECT (stream),
553
 
                                           callback,
554
 
                                           user_data,
555
 
                                           G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
556
 
                                           _("Too large count value passed to %s"),
557
 
                                           G_STRFUNC);
 
612
      g_task_report_new_error (stream, callback, user_data,
 
613
                               g_input_stream_read_async,
 
614
                               G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
 
615
                               _("Too large count value passed to %s"),
 
616
                               G_STRFUNC);
558
617
      return;
559
618
    }
560
619
 
561
620
  if (!g_input_stream_set_pending (stream, &error))
562
621
    {
563
 
      g_simple_async_report_take_gerror_in_idle (G_OBJECT (stream),
564
 
                                            callback,
565
 
                                            user_data,
566
 
                                            error);
 
622
      g_task_report_error (stream, callback, user_data,
 
623
                           g_input_stream_read_async,
 
624
                           error);
567
625
      return;
568
626
    }
569
627
 
590
648
                            GAsyncResult  *result,
591
649
                            GError       **error)
592
650
{
593
 
  GSimpleAsyncResult *simple;
594
651
  GInputStreamClass *class;
595
652
  
596
653
  g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
597
654
  g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
598
655
 
599
 
  if (G_IS_SIMPLE_ASYNC_RESULT (result))
600
 
    {
601
 
      simple = G_SIMPLE_ASYNC_RESULT (result);
602
 
      if (g_simple_async_result_propagate_error (simple, error))
603
 
        return -1;
604
 
 
605
 
      /* Special case read of 0 bytes */
606
 
      if (g_simple_async_result_get_source_tag (simple) == g_input_stream_read_async)
607
 
        return 0;
608
 
    }
 
656
  if (g_async_result_legacy_propagate_error (result, error))
 
657
    return -1;
 
658
  else if (g_async_result_is_tagged (result, g_input_stream_read_async))
 
659
    return g_task_propagate_int (G_TASK (result), error);
609
660
 
610
661
  class = G_INPUT_STREAM_GET_CLASS (stream);
611
662
  return class->read_finish (stream, result, error);
612
663
}
613
664
 
 
665
static void
 
666
read_bytes_callback (GObject      *stream,
 
667
                     GAsyncResult *result,
 
668
                     gpointer      user_data)
 
669
{
 
670
  GTask *task = user_data;
 
671
  guchar *buf = g_task_get_task_data (task);
 
672
  GError *error = NULL;
 
673
  gssize nread;
 
674
  GBytes *bytes = NULL;
 
675
 
 
676
  nread = g_input_stream_read_finish (G_INPUT_STREAM (stream),
 
677
                                      result, &error);
 
678
  if (nread == -1)
 
679
    {
 
680
      g_free (buf);
 
681
      g_task_return_error (task, error);
 
682
    }
 
683
  else if (nread == 0)
 
684
    {
 
685
      g_free (buf);
 
686
      bytes = g_bytes_new_static ("", 0);
 
687
    }
 
688
  else
 
689
    bytes = g_bytes_new_take (buf, nread);
 
690
 
 
691
  if (bytes)
 
692
    g_task_return_pointer (task, bytes, (GDestroyNotify)g_bytes_unref);
 
693
 
 
694
  g_object_unref (task);
 
695
}
 
696
 
 
697
/**
 
698
 * g_input_stream_read_bytes_async:
 
699
 * @stream: A #GInputStream.
 
700
 * @count: the number of bytes that will be read from the stream
 
701
 * @io_priority: the <link linkend="io-priority">I/O priority</link>
 
702
 *   of the request.
 
703
 * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
 
704
 * @callback: (scope async): callback to call when the request is satisfied
 
705
 * @user_data: (closure): the data to pass to callback function
 
706
 *
 
707
 * Request an asynchronous read of @count bytes from the stream into a
 
708
 * new #GBytes. When the operation is finished @callback will be
 
709
 * called. You can then call g_input_stream_read_bytes_finish() to get the
 
710
 * result of the operation.
 
711
 *
 
712
 * During an async request no other sync and async calls are allowed
 
713
 * on @stream, and will result in %G_IO_ERROR_PENDING errors.
 
714
 *
 
715
 * A value of @count larger than %G_MAXSSIZE will cause a
 
716
 * %G_IO_ERROR_INVALID_ARGUMENT error.
 
717
 *
 
718
 * On success, the new #GBytes will be passed to the callback. It is
 
719
 * not an error if this is smaller than the requested size, as it can
 
720
 * happen e.g. near the end of a file, but generally we try to read as
 
721
 * many bytes as requested. Zero is returned on end of file (or if
 
722
 * @count is zero), but never otherwise.
 
723
 *
 
724
 * Any outstanding I/O request with higher priority (lower numerical
 
725
 * value) will be executed before an outstanding request with lower
 
726
 * priority. Default priority is %G_PRIORITY_DEFAULT.
 
727
 **/
 
728
void
 
729
g_input_stream_read_bytes_async (GInputStream          *stream,
 
730
                                 gsize                  count,
 
731
                                 int                    io_priority,
 
732
                                 GCancellable          *cancellable,
 
733
                                 GAsyncReadyCallback    callback,
 
734
                                 gpointer               user_data)
 
735
{
 
736
  GTask *task;
 
737
  guchar *buf;
 
738
 
 
739
  task = g_task_new (stream, cancellable, callback, user_data);
 
740
  buf = g_malloc (count);
 
741
  g_task_set_task_data (task, buf, NULL);
 
742
 
 
743
  g_input_stream_read_async (stream, buf, count,
 
744
                             io_priority, cancellable,
 
745
                             read_bytes_callback, task);
 
746
}
 
747
 
 
748
/**
 
749
 * g_input_stream_read_bytes_finish:
 
750
 * @stream: a #GInputStream.
 
751
 * @result: a #GAsyncResult.
 
752
 * @error: a #GError location to store the error occurring, or %NULL to
 
753
 *   ignore.
 
754
 *
 
755
 * Finishes an asynchronous stream read-into-#GBytes operation.
 
756
 *
 
757
 * Returns: the newly-allocated #GBytes, or %NULL on error
 
758
 **/
 
759
GBytes *
 
760
g_input_stream_read_bytes_finish (GInputStream  *stream,
 
761
                                  GAsyncResult  *result,
 
762
                                  GError       **error)
 
763
{
 
764
  g_return_val_if_fail (G_IS_INPUT_STREAM (stream), NULL);
 
765
  g_return_val_if_fail (g_task_is_valid (result, stream), NULL);
 
766
 
 
767
  return g_task_propagate_pointer (G_TASK (result), error);
 
768
}
 
769
 
614
770
/**
615
771
 * g_input_stream_skip_async:
616
772
 * @stream: A #GInputStream.
654
810
                           gpointer             user_data)
655
811
{
656
812
  GInputStreamClass *class;
657
 
  GSimpleAsyncResult *simple;
658
813
  GError *error = NULL;
659
814
 
660
815
  g_return_if_fail (G_IS_INPUT_STREAM (stream));
661
816
 
662
817
  if (count == 0)
663
818
    {
664
 
      simple = g_simple_async_result_new (G_OBJECT (stream),
665
 
                                          callback,
666
 
                                          user_data,
667
 
                                          g_input_stream_skip_async);
 
819
      GTask *task;
668
820
 
669
 
      g_simple_async_result_complete_in_idle (simple);
670
 
      g_object_unref (simple);
 
821
      task = g_task_new (stream, cancellable, callback, user_data);
 
822
      g_task_set_source_tag (task, g_input_stream_skip_async);
 
823
      g_task_return_int (task, 0);
 
824
      g_object_unref (task);
671
825
      return;
672
826
    }
673
827
  
674
828
  if (((gssize) count) < 0)
675
829
    {
676
 
      g_simple_async_report_error_in_idle (G_OBJECT (stream),
677
 
                                           callback,
678
 
                                           user_data,
679
 
                                           G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
680
 
                                           _("Too large count value passed to %s"),
681
 
                                           G_STRFUNC);
 
830
      g_task_report_new_error (stream, callback, user_data,
 
831
                               g_input_stream_skip_async,
 
832
                               G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
 
833
                               _("Too large count value passed to %s"),
 
834
                               G_STRFUNC);
682
835
      return;
683
836
    }
684
837
 
685
838
  if (!g_input_stream_set_pending (stream, &error))
686
839
    {
687
 
      g_simple_async_report_take_gerror_in_idle (G_OBJECT (stream),
688
 
                                            callback,
689
 
                                            user_data,
690
 
                                            error);
 
840
      g_task_report_error (stream, callback, user_data,
 
841
                           g_input_stream_skip_async,
 
842
                           error);
691
843
      return;
692
844
    }
693
845
 
714
866
                            GAsyncResult  *result,
715
867
                            GError       **error)
716
868
{
717
 
  GSimpleAsyncResult *simple;
718
869
  GInputStreamClass *class;
719
870
 
720
871
  g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
721
872
  g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
722
873
 
723
 
  if (G_IS_SIMPLE_ASYNC_RESULT (result))
724
 
    {
725
 
      simple = G_SIMPLE_ASYNC_RESULT (result);
726
 
      if (g_simple_async_result_propagate_error (simple, error))
727
 
        return -1;
728
 
 
729
 
      /* Special case skip of 0 bytes */
730
 
      if (g_simple_async_result_get_source_tag (simple) == g_input_stream_skip_async)
731
 
        return 0;
732
 
    }
 
874
  if (g_async_result_legacy_propagate_error (result, error))
 
875
    return -1;
 
876
  else if (g_async_result_is_tagged (result, g_input_stream_skip_async))
 
877
    return g_task_propagate_int (G_TASK (result), error);
733
878
 
734
879
  class = G_INPUT_STREAM_GET_CLASS (stream);
735
880
  return class->skip_finish (stream, result, error);
763
908
                            gpointer             user_data)
764
909
{
765
910
  GInputStreamClass *class;
766
 
  GSimpleAsyncResult *simple;
767
911
  GError *error = NULL;
768
912
 
769
913
  g_return_if_fail (G_IS_INPUT_STREAM (stream));
770
914
 
771
915
  if (stream->priv->closed)
772
916
    {
773
 
      simple = g_simple_async_result_new (G_OBJECT (stream),
774
 
                                          callback,
775
 
                                          user_data,
776
 
                                          g_input_stream_close_async);
 
917
      GTask *task;
777
918
 
778
 
      g_simple_async_result_complete_in_idle (simple);
779
 
      g_object_unref (simple);
 
919
      task = g_task_new (stream, cancellable, callback, user_data);
 
920
      g_task_set_source_tag (task, g_input_stream_close_async);
 
921
      g_task_return_boolean (task, TRUE);
 
922
      g_object_unref (task);
780
923
      return;
781
924
    }
782
925
 
783
926
  if (!g_input_stream_set_pending (stream, &error))
784
927
    {
785
 
      g_simple_async_report_take_gerror_in_idle (G_OBJECT (stream),
786
 
                                            callback,
787
 
                                            user_data,
788
 
                                            error);
 
928
      g_task_report_error (stream, callback, user_data,
 
929
                           g_input_stream_close_async,
 
930
                           error);
789
931
      return;
790
932
    }
791
933
  
812
954
                             GAsyncResult  *result,
813
955
                             GError       **error)
814
956
{
815
 
  GSimpleAsyncResult *simple;
816
957
  GInputStreamClass *class;
817
958
 
818
959
  g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
819
960
  g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
820
961
 
821
 
  if (G_IS_SIMPLE_ASYNC_RESULT (result))
822
 
    {
823
 
      simple = G_SIMPLE_ASYNC_RESULT (result);
824
 
      if (g_simple_async_result_propagate_error (simple, error))
825
 
        return FALSE;
826
 
 
827
 
      /* Special case already closed */
828
 
      if (g_simple_async_result_get_source_tag (simple) == g_input_stream_close_async)
829
 
        return TRUE;
830
 
    }
 
962
  if (g_async_result_legacy_propagate_error (result, error))
 
963
    return FALSE;
 
964
  else if (g_async_result_is_tagged (result, g_input_stream_close_async))
 
965
    return g_task_propagate_boolean (G_TASK (result), error);
831
966
 
832
967
  class = G_INPUT_STREAM_GET_CLASS (stream);
833
968
  return class->close_finish (stream, result, error);
922
1057
 ********************************************/
923
1058
 
924
1059
typedef struct {
925
 
  void              *buffer;
926
 
  gsize              count_requested;
927
 
  gssize             count_read;
 
1060
  void   *buffer;
 
1061
  gsize   count;
928
1062
} ReadData;
929
1063
 
930
1064
static void
931
 
read_async_thread (GSimpleAsyncResult *res,
932
 
                   GObject            *object,
933
 
                   GCancellable       *cancellable)
934
 
{
935
 
  ReadData *op;
 
1065
free_read_data (ReadData *op)
 
1066
{
 
1067
  g_slice_free (ReadData, op);
 
1068
}
 
1069
 
 
1070
static void
 
1071
read_async_thread (GTask        *task,
 
1072
                   gpointer      source_object,
 
1073
                   gpointer      task_data,
 
1074
                   GCancellable *cancellable)
 
1075
{
 
1076
  GInputStream *stream = source_object;
 
1077
  ReadData *op = task_data;
936
1078
  GInputStreamClass *class;
937
1079
  GError *error = NULL;
 
1080
  gssize nread;
938
1081
 
939
 
  op = g_simple_async_result_get_op_res_gpointer (res);
940
 
 
941
 
  class = G_INPUT_STREAM_GET_CLASS (object);
942
 
 
943
 
  op->count_read = class->read_fn (G_INPUT_STREAM (object),
944
 
                                   op->buffer, op->count_requested,
945
 
                                   cancellable, &error);
946
 
  if (op->count_read == -1)
947
 
    g_simple_async_result_take_error (res, error);
948
 
}
 
1082
  class = G_INPUT_STREAM_GET_CLASS (stream);
 
1083
 
 
1084
  nread = class->read_fn (stream,
 
1085
                          op->buffer, op->count,
 
1086
                          g_task_get_cancellable (task),
 
1087
                          &error);
 
1088
  if (nread == -1)
 
1089
    g_task_return_error (task, error);
 
1090
  else
 
1091
    g_task_return_int (task, nread);
 
1092
}
 
1093
 
 
1094
static void read_async_pollable (GPollableInputStream *stream,
 
1095
                                 GTask                *task);
 
1096
 
 
1097
static gboolean
 
1098
read_async_pollable_ready (GPollableInputStream *stream,
 
1099
                           gpointer              user_data)
 
1100
{
 
1101
  GTask *task = user_data;
 
1102
 
 
1103
  read_async_pollable (stream, task);
 
1104
  return FALSE;
 
1105
}
 
1106
 
 
1107
static void
 
1108
read_async_pollable (GPollableInputStream *stream,
 
1109
                     GTask                *task)
 
1110
{
 
1111
  ReadData *op = g_task_get_task_data (task);
 
1112
  GError *error = NULL;
 
1113
  gssize nread;
 
1114
 
 
1115
  if (g_task_return_error_if_cancelled (task))
 
1116
    return;
 
1117
 
 
1118
  nread = G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->
 
1119
    read_nonblocking (stream, op->buffer, op->count, &error);
 
1120
 
 
1121
  if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
 
1122
    {
 
1123
      GSource *source;
 
1124
 
 
1125
      g_error_free (error);
 
1126
 
 
1127
      source = g_pollable_input_stream_create_source (stream,
 
1128
                                                      g_task_get_cancellable (task));
 
1129
      g_task_attach_source (task, source,
 
1130
                            (GSourceFunc) read_async_pollable_ready);
 
1131
      g_source_unref (source);
 
1132
      return;
 
1133
    }
 
1134
 
 
1135
  if (nread == -1)
 
1136
    g_task_return_error (task, error);
 
1137
  else
 
1138
    g_task_return_int (task, nread);
 
1139
  /* g_input_stream_real_read_async() unrefs task */
 
1140
}
 
1141
 
 
1142
#define CAN_DO_NONBLOCKING_READS(stream) \
 
1143
  (G_IS_POLLABLE_INPUT_STREAM (stream) && \
 
1144
   g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (stream)))
 
1145
 
949
1146
 
950
1147
static void
951
1148
g_input_stream_real_read_async (GInputStream        *stream,
956
1153
                                GAsyncReadyCallback  callback,
957
1154
                                gpointer             user_data)
958
1155
{
959
 
  GSimpleAsyncResult *res;
 
1156
  GTask *task;
960
1157
  ReadData *op;
961
1158
  
962
 
  op = g_new (ReadData, 1);
963
 
  res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_input_stream_real_read_async);
964
 
  g_simple_async_result_set_op_res_gpointer (res, op, g_free);
 
1159
  op = g_slice_new0 (ReadData);
 
1160
  task = g_task_new (stream, cancellable, callback, user_data);
 
1161
  g_task_set_task_data (task, op, (GDestroyNotify) free_read_data);
 
1162
  g_task_set_priority (task, io_priority);
965
1163
  op->buffer = buffer;
966
 
  op->count_requested = count;
967
 
  
968
 
  g_simple_async_result_run_in_thread (res, read_async_thread, io_priority, cancellable);
969
 
  g_object_unref (res);
 
1164
  op->count = count;
 
1165
 
 
1166
  if (CAN_DO_NONBLOCKING_READS (stream))
 
1167
    read_async_pollable (G_POLLABLE_INPUT_STREAM (stream), task);
 
1168
  else
 
1169
    g_task_run_in_thread (task, read_async_thread);
 
1170
  g_object_unref (task);
970
1171
}
971
1172
 
972
1173
static gssize
974
1175
                                 GAsyncResult  *result,
975
1176
                                 GError       **error)
976
1177
{
977
 
  GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
978
 
  ReadData *op;
979
 
 
980
 
  g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == 
981
 
            g_input_stream_real_read_async);
982
 
 
983
 
  op = g_simple_async_result_get_op_res_gpointer (simple);
984
 
 
985
 
  return op->count_read;
 
1178
  g_return_val_if_fail (g_task_is_valid (result, stream), -1);
 
1179
 
 
1180
  return g_task_propagate_int (G_TASK (result), error);
986
1181
}
987
1182
 
988
 
typedef struct {
989
 
  gsize count_requested;
990
 
  gssize count_skipped;
991
 
} SkipData;
992
 
 
993
1183
 
994
1184
static void
995
 
skip_async_thread (GSimpleAsyncResult *res,
996
 
                   GObject            *object,
997
 
                   GCancellable       *cancellable)
 
1185
skip_async_thread (GTask        *task,
 
1186
                   gpointer      source_object,
 
1187
                   gpointer      task_data,
 
1188
                   GCancellable *cancellable)
998
1189
{
999
 
  SkipData *op;
 
1190
  GInputStream *stream = source_object;
 
1191
  gsize count = GPOINTER_TO_SIZE (task_data);
1000
1192
  GInputStreamClass *class;
1001
1193
  GError *error = NULL;
1002
 
  
1003
 
  class = G_INPUT_STREAM_GET_CLASS (object);
1004
 
  op = g_simple_async_result_get_op_res_gpointer (res);
1005
 
  op->count_skipped = class->skip (G_INPUT_STREAM (object),
1006
 
                                   op->count_requested,
1007
 
                                   cancellable, &error);
1008
 
  if (op->count_skipped == -1)
1009
 
    g_simple_async_result_take_error (res, error);
 
1194
  gssize ret;
 
1195
 
 
1196
  class = G_INPUT_STREAM_GET_CLASS (stream);
 
1197
  ret = class->skip (stream, count,
 
1198
                     g_task_get_cancellable (task),
 
1199
                     &error);
 
1200
  if (ret == -1)
 
1201
    g_task_return_error (task, error);
 
1202
  else
 
1203
    g_task_return_int (task, ret);
1010
1204
}
1011
1205
 
1012
1206
typedef struct {
1013
1207
  char buffer[8192];
1014
1208
  gsize count;
1015
1209
  gsize count_skipped;
1016
 
  int io_prio;
1017
 
  GCancellable *cancellable;
1018
 
  gpointer user_data;
1019
 
  GAsyncReadyCallback callback;
1020
1210
} SkipFallbackAsyncData;
1021
1211
 
1022
1212
static void
1025
1215
                       gpointer      user_data)
1026
1216
{
1027
1217
  GInputStreamClass *class;
1028
 
  SkipFallbackAsyncData *data = user_data;
1029
 
  SkipData *op;
1030
 
  GSimpleAsyncResult *simple;
 
1218
  GTask *task = user_data;
 
1219
  SkipFallbackAsyncData *data = g_task_get_task_data (task);
1031
1220
  GError *error = NULL;
1032
1221
  gssize ret;
1033
1222
 
1041
1230
      if (data->count > 0)
1042
1231
        {
1043
1232
          class = G_INPUT_STREAM_GET_CLASS (source_object);
1044
 
          class->read_async (G_INPUT_STREAM (source_object), data->buffer, MIN (8192, data->count), data->io_prio, data->cancellable,
1045
 
                             skip_callback_wrapper, data);
 
1233
          class->read_async (G_INPUT_STREAM (source_object),
 
1234
                             data->buffer, MIN (8192, data->count),
 
1235
                             g_task_get_priority (task),
 
1236
                             g_task_get_cancellable (task),
 
1237
                             skip_callback_wrapper, task);
1046
1238
          return;
1047
1239
        }
1048
1240
    }
1049
1241
 
1050
 
  op = g_new0 (SkipData, 1);
1051
 
  op->count_skipped = data->count_skipped;
1052
 
  simple = g_simple_async_result_new (source_object,
1053
 
                                      data->callback, data->user_data,
1054
 
                                      g_input_stream_real_skip_async);
1055
 
 
1056
 
  g_simple_async_result_set_op_res_gpointer (simple, op, g_free);
1057
 
 
1058
 
  if (ret == -1)
 
1242
  if (ret == -1 &&
 
1243
      g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED) &&
 
1244
      data->count_skipped)
1059
1245
    {
1060
 
      if (data->count_skipped &&
1061
 
          g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
1062
 
        /* No error, return partial read */
1063
 
        g_error_free (error);
1064
 
      else
1065
 
        g_simple_async_result_take_error (simple, error);
 
1246
      /* No error, return partial read */
 
1247
      g_clear_error (&error);
1066
1248
    }
1067
1249
 
1068
 
  /* Complete immediately, not in idle, since we're already in a mainloop callout */
1069
 
  g_simple_async_result_complete (simple);
1070
 
  g_object_unref (simple);
1071
 
  
1072
 
  g_free (data);
 
1250
  if (error)
 
1251
    g_task_return_error (task, error);
 
1252
  else
 
1253
    g_task_return_int (task, data->count_skipped);
 
1254
  g_object_unref (task);
1073
1255
 }
1074
1256
 
1075
1257
static void
1081
1263
                                gpointer             user_data)
1082
1264
{
1083
1265
  GInputStreamClass *class;
1084
 
  SkipData *op;
1085
1266
  SkipFallbackAsyncData *data;
1086
 
  GSimpleAsyncResult *res;
 
1267
  GTask *task;
1087
1268
 
1088
1269
  class = G_INPUT_STREAM_GET_CLASS (stream);
1089
1270
 
1090
 
  if (class->read_async == g_input_stream_real_read_async)
 
1271
  task = g_task_new (stream, cancellable, callback, user_data);
 
1272
  g_task_set_priority (task, io_priority);
 
1273
 
 
1274
  if (class->read_async == g_input_stream_real_read_async &&
 
1275
      !CAN_DO_NONBLOCKING_READS (stream))
1091
1276
    {
1092
1277
      /* Read is thread-using async fallback.
1093
1278
       * Make skip use threads too, so that we can use a possible sync skip
1094
1279
       * implementation. */
1095
 
      op = g_new0 (SkipData, 1);
1096
 
      
1097
 
      res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data,
1098
 
                                       g_input_stream_real_skip_async);
1099
 
 
1100
 
      g_simple_async_result_set_op_res_gpointer (res, op, g_free);
1101
 
 
1102
 
      op->count_requested = count;
1103
 
 
1104
 
      g_simple_async_result_run_in_thread (res, skip_async_thread, io_priority, cancellable);
1105
 
      g_object_unref (res);
 
1280
      g_task_set_task_data (task, GSIZE_TO_POINTER (count), NULL);
 
1281
 
 
1282
      g_task_run_in_thread (task, skip_async_thread);
 
1283
      g_object_unref (task);
1106
1284
    }
1107
1285
  else
1108
1286
    {
1112
1290
      data = g_new (SkipFallbackAsyncData, 1);
1113
1291
      data->count = count;
1114
1292
      data->count_skipped = 0;
1115
 
      data->io_prio = io_priority;
1116
 
      data->cancellable = cancellable;
1117
 
      data->callback = callback;
1118
 
      data->user_data = user_data;
 
1293
      g_task_set_task_data (task, data, g_free);
 
1294
      g_task_set_check_cancellable (task, FALSE);
1119
1295
      class->read_async (stream, data->buffer, MIN (8192, count), io_priority, cancellable,
1120
 
                         skip_callback_wrapper, data);
 
1296
                         skip_callback_wrapper, task);
1121
1297
    }
1122
1298
 
1123
1299
}
1127
1303
                                 GAsyncResult  *result,
1128
1304
                                 GError       **error)
1129
1305
{
1130
 
  GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
1131
 
  SkipData *op;
 
1306
  g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1132
1307
 
1133
 
  g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_input_stream_real_skip_async);
1134
 
  op = g_simple_async_result_get_op_res_gpointer (simple);
1135
 
  return op->count_skipped;
 
1308
  return g_task_propagate_int (G_TASK (result), error);
1136
1309
}
1137
1310
 
1138
1311
static void
1139
 
close_async_thread (GSimpleAsyncResult *res,
1140
 
                    GObject            *object,
1141
 
                    GCancellable       *cancellable)
 
1312
close_async_thread (GTask        *task,
 
1313
                    gpointer      source_object,
 
1314
                    gpointer      task_data,
 
1315
                    GCancellable *cancellable)
1142
1316
{
 
1317
  GInputStream *stream = source_object;
1143
1318
  GInputStreamClass *class;
1144
1319
  GError *error = NULL;
1145
1320
  gboolean result;
1146
1321
 
1147
 
  /* Auto handling of cancelation disabled, and ignore
1148
 
     cancellation, since we want to close things anyway, although
1149
 
     possibly in a quick-n-dirty way. At least we never want to leak
1150
 
     open handles */
1151
 
 
1152
 
  class = G_INPUT_STREAM_GET_CLASS (object);
 
1322
  class = G_INPUT_STREAM_GET_CLASS (stream);
1153
1323
  if (class->close_fn)
1154
1324
    {
1155
 
      result = class->close_fn (G_INPUT_STREAM (object), cancellable, &error);
 
1325
      result = class->close_fn (stream,
 
1326
                                g_task_get_cancellable (task),
 
1327
                                &error);
1156
1328
      if (!result)
1157
 
        g_simple_async_result_take_error (res, error);
 
1329
        {
 
1330
          g_task_return_error (task, error);
 
1331
          return;
 
1332
        }
1158
1333
    }
 
1334
 
 
1335
  g_task_return_boolean (task, TRUE);
1159
1336
}
1160
1337
 
1161
1338
static void
1165
1342
                                 GAsyncReadyCallback  callback,
1166
1343
                                 gpointer             user_data)
1167
1344
{
1168
 
  GSimpleAsyncResult *res;
1169
 
  
1170
 
  res = g_simple_async_result_new (G_OBJECT (stream),
1171
 
                                   callback,
1172
 
                                   user_data,
1173
 
                                   g_input_stream_real_close_async);
 
1345
  GTask *task;
1174
1346
 
1175
 
  g_simple_async_result_set_handle_cancellation (res, FALSE);
 
1347
  task = g_task_new (stream, cancellable, callback, user_data);
 
1348
  g_task_set_check_cancellable (task, FALSE);
 
1349
  g_task_set_priority (task, io_priority);
1176
1350
  
1177
 
  g_simple_async_result_run_in_thread (res,
1178
 
                                       close_async_thread,
1179
 
                                       io_priority,
1180
 
                                       cancellable);
1181
 
  g_object_unref (res);
 
1351
  g_task_run_in_thread (task, close_async_thread);
 
1352
  g_object_unref (task);
1182
1353
}
1183
1354
 
1184
1355
static gboolean
1186
1357
                                  GAsyncResult  *result,
1187
1358
                                  GError       **error)
1188
1359
{
1189
 
  GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
1190
 
  g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_input_stream_real_close_async);
1191
 
  return TRUE;
 
1360
  g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
 
1361
 
 
1362
  return g_task_propagate_boolean (G_TASK (result), error);
1192
1363
}