~tcuthbert/wordpress/openstack-objectstorage-k8s

« back to all changes in this revision

Viewing changes to vendor/guzzlehttp/streams/src/AsyncReadStream.php

  • Committer: Thomas Cuthbert
  • Date: 2020-04-23 05:22:45 UTC
  • Revision ID: thomas.cuthbert@canonical.com-20200423052245-1jxao3mw31w435js
[,r=trivial] bionic composer vendor update

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
<?php
 
2
namespace GuzzleHttp\Stream;
 
3
 
 
4
/**
 
5
 * Represents an asynchronous read-only stream that supports a drain event and
 
6
 * pumping data from a source stream.
 
7
 *
 
8
 * The AsyncReadStream can be used as a completely asynchronous stream, meaning
 
9
 * the data you can read from the stream will immediately return only
 
10
 * the data that is currently buffered.
 
11
 *
 
12
 * AsyncReadStream can also be used in a "blocking" manner if a "pump" function
 
13
 * is provided. When a caller requests more bytes than are available in the
 
14
 * buffer, then the pump function is used to block until the requested number
 
15
 * of bytes are available or the remote source stream has errored, closed, or
 
16
 * timed-out. This behavior isn't strictly "blocking" because the pump function
 
17
 * can send other transfers while waiting on the desired buffer size to be
 
18
 * ready for reading (e.g., continue to tick an event loop).
 
19
 *
 
20
 * @unstable This class is subject to change.
 
21
 */
 
22
class AsyncReadStream implements StreamInterface
 
23
{
 
24
    use StreamDecoratorTrait;
 
25
 
 
26
    /** @var callable|null Fn used to notify writers the buffer has drained */
 
27
    private $drain;
 
28
 
 
29
    /** @var callable|null Fn used to block for more data */
 
30
    private $pump;
 
31
 
 
32
    /** @var int|null Highwater mark of the underlying buffer */
 
33
    private $hwm;
 
34
 
 
35
    /** @var bool Whether or not drain needs to be called at some point */
 
36
    private $needsDrain;
 
37
 
 
38
    /** @var int The expected size of the remote source */
 
39
    private $size;
 
40
 
 
41
    /**
 
42
     * In order to utilize high water marks to tell writers to slow down, the
 
43
     * provided stream must answer to the "hwm" stream metadata variable,
 
44
     * providing the high water mark. If no "hwm" metadata value is available,
 
45
     * then the "drain" functionality is not utilized.
 
46
     *
 
47
     * This class accepts an associative array of configuration options.
 
48
     *
 
49
     * - drain: (callable) Function to invoke when the stream has drained,
 
50
     *   meaning the buffer is now writable again because the size of the
 
51
     *   buffer is at an acceptable level (e.g., below the high water mark).
 
52
     *   The function accepts a single argument, the buffer stream object that
 
53
     *   has drained.
 
54
     * - pump: (callable) A function that accepts the number of bytes to read
 
55
     *   from the source stream. This function will block until all of the data
 
56
     *   that was requested has been read, EOF of the source stream, or the
 
57
     *   source stream is closed.
 
58
     * - size: (int) The expected size in bytes of the data that will be read
 
59
     *   (if known up-front).
 
60
     *
 
61
     * @param StreamInterface $buffer   Buffer that contains the data that has
 
62
     *                                  been read by the event loop.
 
63
     * @param array           $config   Associative array of options.
 
64
     *
 
65
     * @throws \InvalidArgumentException if the buffer is not readable and
 
66
     *                                   writable.
 
67
     */
 
68
    public function __construct(
 
69
        StreamInterface $buffer,
 
70
        array $config = []
 
71
    ) {
 
72
        if (!$buffer->isReadable() || !$buffer->isWritable()) {
 
73
            throw new \InvalidArgumentException(
 
74
                'Buffer must be readable and writable'
 
75
            );
 
76
        }
 
77
 
 
78
        if (isset($config['size'])) {
 
79
            $this->size = $config['size'];
 
80
        }
 
81
 
 
82
        static $callables = ['pump', 'drain'];
 
83
        foreach ($callables as $check) {
 
84
            if (isset($config[$check])) {
 
85
                if (!is_callable($config[$check])) {
 
86
                    throw new \InvalidArgumentException(
 
87
                        $check . ' must be callable'
 
88
                    );
 
89
                }
 
90
                $this->{$check} = $config[$check];
 
91
            }
 
92
        }
 
93
 
 
94
        $this->hwm = $buffer->getMetadata('hwm');
 
95
 
 
96
        // Cannot drain when there's no high water mark.
 
97
        if ($this->hwm === null) {
 
98
            $this->drain = null;
 
99
        }
 
100
 
 
101
        $this->stream = $buffer;
 
102
    }
 
103
 
 
104
    /**
 
105
     * Factory method used to create new async stream and an underlying buffer
 
106
     * if no buffer is provided.
 
107
     *
 
108
     * This function accepts the same options as AsyncReadStream::__construct,
 
109
     * but added the following key value pairs:
 
110
     *
 
111
     * - buffer: (StreamInterface) Buffer used to buffer data. If none is
 
112
     *   provided, a default buffer is created.
 
113
     * - hwm: (int) High water mark to use if a buffer is created on your
 
114
     *   behalf.
 
115
     * - max_buffer: (int) If provided, wraps the utilized buffer in a
 
116
     *   DroppingStream decorator to ensure that buffer does not exceed a given
 
117
     *   length. When exceeded, the stream will begin dropping data. Set the
 
118
     *   max_buffer to 0, to use a NullStream which does not store data.
 
119
     * - write: (callable) A function that is invoked when data is written
 
120
     *   to the underlying buffer. The function accepts the buffer as the first
 
121
     *   argument, and the data being written as the second. The function MUST
 
122
     *   return the number of bytes that were written or false to let writers
 
123
     *   know to slow down.
 
124
     * - drain: (callable) See constructor documentation.
 
125
     * - pump: (callable) See constructor documentation.
 
126
     *
 
127
     * @param array $options Associative array of options.
 
128
     *
 
129
     * @return array Returns an array containing the buffer used to buffer
 
130
     *               data, followed by the ready to use AsyncReadStream object.
 
131
     */
 
132
    public static function create(array $options = [])
 
133
    {
 
134
        $maxBuffer = isset($options['max_buffer'])
 
135
            ? $options['max_buffer']
 
136
            : null;
 
137
 
 
138
        if ($maxBuffer === 0) {
 
139
            $buffer = new NullStream();
 
140
        } elseif (isset($options['buffer'])) {
 
141
            $buffer = $options['buffer'];
 
142
        } else {
 
143
            $hwm = isset($options['hwm']) ? $options['hwm'] : 16384;
 
144
            $buffer = new BufferStream($hwm);
 
145
        }
 
146
 
 
147
        if ($maxBuffer > 0) {
 
148
            $buffer = new DroppingStream($buffer, $options['max_buffer']);
 
149
        }
 
150
 
 
151
        // Call the on_write callback if an on_write function was provided.
 
152
        if (isset($options['write'])) {
 
153
            $onWrite = $options['write'];
 
154
            $buffer = FnStream::decorate($buffer, [
 
155
                'write' => function ($string) use ($buffer, $onWrite) {
 
156
                    $result = $buffer->write($string);
 
157
                    $onWrite($buffer, $string);
 
158
                    return $result;
 
159
                }
 
160
            ]);
 
161
        }
 
162
 
 
163
        return [$buffer, new self($buffer, $options)];
 
164
    }
 
165
 
 
166
    public function getSize()
 
167
    {
 
168
        return $this->size;
 
169
    }
 
170
 
 
171
    public function isWritable()
 
172
    {
 
173
        return false;
 
174
    }
 
175
 
 
176
    public function write($string)
 
177
    {
 
178
        return false;
 
179
    }
 
180
 
 
181
    public function read($length)
 
182
    {
 
183
        if (!$this->needsDrain && $this->drain) {
 
184
            $this->needsDrain = $this->stream->getSize() >= $this->hwm;
 
185
        }
 
186
 
 
187
        $result = $this->stream->read($length);
 
188
 
 
189
        // If we need to drain, then drain when the buffer is empty.
 
190
        if ($this->needsDrain && $this->stream->getSize() === 0) {
 
191
            $this->needsDrain = false;
 
192
            $drainFn = $this->drain;
 
193
            $drainFn($this->stream);
 
194
        }
 
195
 
 
196
        $resultLen = strlen($result);
 
197
 
 
198
        // If a pump was provided, the buffer is still open, and not enough
 
199
        // data was given, then block until the data is provided.
 
200
        if ($this->pump && $resultLen < $length) {
 
201
            $pumpFn = $this->pump;
 
202
            $result .= $pumpFn($length - $resultLen);
 
203
        }
 
204
 
 
205
        return $result;
 
206
    }
 
207
}