~ballot/wordpress/openstack-objectstorage-bis

« back to all changes in this revision

Viewing changes to vendor/guzzlehttp/guzzle/src/Pool.php

  • Committer: Thomas Cuthbert
  • Date: 2020-04-23 05:22:45 UTC
  • Revision ID: thomas.cuthbert@canonical.com-20200423052245-1jxao3mw31w435js
[,r=trivial] bionic composer vendor update

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
<?php
 
2
namespace GuzzleHttp;
 
3
 
 
4
use GuzzleHttp\Event\BeforeEvent;
 
5
use GuzzleHttp\Event\RequestEvents;
 
6
use GuzzleHttp\Message\RequestInterface;
 
7
use GuzzleHttp\Message\ResponseInterface;
 
8
use GuzzleHttp\Ring\Core;
 
9
use GuzzleHttp\Ring\Future\FutureInterface;
 
10
use GuzzleHttp\Event\ListenerAttacherTrait;
 
11
use GuzzleHttp\Event\EndEvent;
 
12
use React\Promise\Deferred;
 
13
use React\Promise\FulfilledPromise;
 
14
use React\Promise\PromiseInterface;
 
15
use React\Promise\RejectedPromise;
 
16
 
 
17
/**
 
18
 * Sends and iterator of requests concurrently using a capped pool size.
 
19
 *
 
20
 * The Pool object implements FutureInterface, meaning it can be used later
 
21
 * when necessary, the requests provided to the pool can be cancelled, and
 
22
 * you can check the state of the pool to know if it has been dereferenced
 
23
 * (sent) or has been cancelled.
 
24
 *
 
25
 * When sending the pool, keep in mind that no results are returned: callers
 
26
 * are expected to handle results asynchronously using Guzzle's event system.
 
27
 * When requests complete, more are added to the pool to ensure that the
 
28
 * requested pool size is always filled as much as possible.
 
29
 *
 
30
 * IMPORTANT: Do not provide a pool size greater that what the utilized
 
31
 * underlying RingPHP handler can support. This will result is extremely poor
 
32
 * performance.
 
33
 */
 
34
class Pool implements FutureInterface
 
35
{
 
36
    use ListenerAttacherTrait;
 
37
 
 
38
    /** @var \GuzzleHttp\ClientInterface */
 
39
    private $client;
 
40
 
 
41
    /** @var \Iterator Yields requests */
 
42
    private $iter;
 
43
 
 
44
    /** @var Deferred */
 
45
    private $deferred;
 
46
 
 
47
    /** @var PromiseInterface */
 
48
    private $promise;
 
49
 
 
50
    private $waitQueue = [];
 
51
    private $eventListeners = [];
 
52
    private $poolSize;
 
53
    private $isRealized = false;
 
54
 
 
55
    /**
 
56
     * The option values for 'before', 'complete', 'error' and 'end' can be a
 
57
     * callable, an associative array containing event data, or an array of
 
58
     * event data arrays. Event data arrays contain the following keys:
 
59
     *
 
60
     * - fn: callable to invoke that receives the event
 
61
     * - priority: Optional event priority (defaults to 0)
 
62
     * - once: Set to true so that the event is removed after it is triggered
 
63
     *
 
64
     * @param ClientInterface $client   Client used to send the requests.
 
65
     * @param array|\Iterator $requests Requests to send in parallel
 
66
     * @param array           $options  Associative array of options
 
67
     *     - pool_size: (callable|int)   Maximum number of requests to send
 
68
     *                                   concurrently, or a callback that receives
 
69
     *                                   the current queue size and returns the
 
70
     *                                   number of new requests to send
 
71
     *     - before:    (callable|array) Receives a BeforeEvent
 
72
     *     - complete:  (callable|array) Receives a CompleteEvent
 
73
     *     - error:     (callable|array) Receives a ErrorEvent
 
74
     *     - end:       (callable|array) Receives an EndEvent
 
75
     */
 
76
    public function __construct(
 
77
        ClientInterface $client,
 
78
        $requests,
 
79
        array $options = []
 
80
    ) {
 
81
        $this->client = $client;
 
82
        $this->iter = $this->coerceIterable($requests);
 
83
        $this->deferred = new Deferred();
 
84
        $this->promise = $this->deferred->promise();
 
85
        $this->poolSize = isset($options['pool_size'])
 
86
            ? $options['pool_size'] : 25;
 
87
        $this->eventListeners = $this->prepareListeners(
 
88
            $options,
 
89
            ['before', 'complete', 'error', 'end']
 
90
        );
 
91
    }
 
92
 
 
93
    /**
 
94
     * Sends multiple requests in parallel and returns an array of responses
 
95
     * and exceptions that uses the same ordering as the provided requests.
 
96
     *
 
97
     * IMPORTANT: This method keeps every request and response in memory, and
 
98
     * as such, is NOT recommended when sending a large number or an
 
99
     * indeterminate number of requests concurrently.
 
100
     *
 
101
     * @param ClientInterface $client   Client used to send the requests
 
102
     * @param array|\Iterator $requests Requests to send in parallel
 
103
     * @param array           $options  Passes through the options available in
 
104
     *                                  {@see GuzzleHttp\Pool::__construct}
 
105
     *
 
106
     * @return BatchResults Returns a container for the results.
 
107
     * @throws \InvalidArgumentException if the event format is incorrect.
 
108
     */
 
109
    public static function batch(
 
110
        ClientInterface $client,
 
111
        $requests,
 
112
        array $options = []
 
113
    ) {
 
114
        $hash = new \SplObjectStorage();
 
115
        foreach ($requests as $request) {
 
116
            $hash->attach($request);
 
117
        }
 
118
 
 
119
        // In addition to the normally run events when requests complete, add
 
120
        // and event to continuously track the results of transfers in the hash.
 
121
        (new self($client, $requests, RequestEvents::convertEventArray(
 
122
            $options,
 
123
            ['end'],
 
124
            [
 
125
                'priority' => RequestEvents::LATE,
 
126
                'fn'       => function (EndEvent $e) use ($hash) {
 
127
                    $hash[$e->getRequest()] = $e->getException()
 
128
                        ? $e->getException()
 
129
                        : $e->getResponse();
 
130
                }
 
131
            ]
 
132
        )))->wait();
 
133
 
 
134
        return new BatchResults($hash);
 
135
    }
 
136
 
 
137
    /**
 
138
     * Creates a Pool and immediately sends the requests.
 
139
     *
 
140
     * @param ClientInterface $client   Client used to send the requests
 
141
     * @param array|\Iterator $requests Requests to send in parallel
 
142
     * @param array           $options  Passes through the options available in
 
143
     *                                  {@see GuzzleHttp\Pool::__construct}
 
144
     */
 
145
    public static function send(
 
146
        ClientInterface $client,
 
147
        $requests,
 
148
        array $options = []
 
149
    ) {
 
150
        $pool = new self($client, $requests, $options);
 
151
        $pool->wait();
 
152
    }
 
153
 
 
154
    private function getPoolSize()
 
155
    {
 
156
        return is_callable($this->poolSize)
 
157
            ? call_user_func($this->poolSize, count($this->waitQueue))
 
158
            : $this->poolSize;
 
159
    }
 
160
 
 
161
    /**
 
162
     * Add as many requests as possible up to the current pool limit.
 
163
     */
 
164
    private function addNextRequests()
 
165
    {
 
166
        $limit = max($this->getPoolSize() - count($this->waitQueue), 0);
 
167
        while ($limit--) {
 
168
            if (!$this->addNextRequest()) {
 
169
                break;
 
170
            }
 
171
        }
 
172
    }
 
173
 
 
174
    public function wait()
 
175
    {
 
176
        if ($this->isRealized) {
 
177
            return false;
 
178
        }
 
179
 
 
180
        // Seed the pool with N number of requests.
 
181
        $this->addNextRequests();
 
182
 
 
183
        // Stop if the pool was cancelled while transferring requests.
 
184
        if ($this->isRealized) {
 
185
            return false;
 
186
        }
 
187
 
 
188
        // Wait on any outstanding FutureResponse objects.
 
189
        while ($response = array_pop($this->waitQueue)) {
 
190
            try {
 
191
                $response->wait();
 
192
            } catch (\Exception $e) {
 
193
                // Eat exceptions because they should be handled asynchronously
 
194
            }
 
195
            $this->addNextRequests();
 
196
        }
 
197
 
 
198
        // Clean up no longer needed state.
 
199
        $this->isRealized = true;
 
200
        $this->waitQueue = $this->eventListeners = [];
 
201
        $this->client = $this->iter = null;
 
202
        $this->deferred->resolve(true);
 
203
 
 
204
        return true;
 
205
    }
 
206
 
 
207
    /**
 
208
     * {@inheritdoc}
 
209
     *
 
210
     * Attempt to cancel all outstanding requests (requests that are queued for
 
211
     * dereferencing). Returns true if all outstanding requests can be
 
212
     * cancelled.
 
213
     *
 
214
     * @return bool
 
215
     */
 
216
    public function cancel()
 
217
    {
 
218
        if ($this->isRealized) {
 
219
            return false;
 
220
        }
 
221
 
 
222
        $success = $this->isRealized = true;
 
223
        foreach ($this->waitQueue as $response) {
 
224
            if (!$response->cancel()) {
 
225
                $success = false;
 
226
            }
 
227
        }
 
228
 
 
229
        return $success;
 
230
    }
 
231
 
 
232
    /**
 
233
     * Returns a promise that is invoked when the pool completed. There will be
 
234
     * no passed value.
 
235
     *
 
236
     * {@inheritdoc}
 
237
     */
 
238
    public function then(
 
239
        callable $onFulfilled = null,
 
240
        callable $onRejected = null,
 
241
        callable $onProgress = null
 
242
    ) {
 
243
        return $this->promise->then($onFulfilled, $onRejected, $onProgress);
 
244
    }
 
245
 
 
246
    public function promise()
 
247
    {
 
248
        return $this->promise;
 
249
    }
 
250
 
 
251
    private function coerceIterable($requests)
 
252
    {
 
253
        if ($requests instanceof \Iterator) {
 
254
            return $requests;
 
255
        } elseif (is_array($requests)) {
 
256
            return new \ArrayIterator($requests);
 
257
        }
 
258
 
 
259
        throw new \InvalidArgumentException('Expected Iterator or array. '
 
260
            . 'Found ' . Core::describeType($requests));
 
261
    }
 
262
 
 
263
    /**
 
264
     * Adds the next request to pool and tracks what requests need to be
 
265
     * dereferenced when completing the pool.
 
266
     */
 
267
    private function addNextRequest()
 
268
    {
 
269
        add_next:
 
270
 
 
271
        if ($this->isRealized || !$this->iter || !$this->iter->valid()) {
 
272
            return false;
 
273
        }
 
274
 
 
275
        $request = $this->iter->current();
 
276
        $this->iter->next();
 
277
 
 
278
        if (!($request instanceof RequestInterface)) {
 
279
            throw new \InvalidArgumentException(sprintf(
 
280
                'All requests in the provided iterator must implement '
 
281
                . 'RequestInterface. Found %s',
 
282
                Core::describeType($request)
 
283
            ));
 
284
        }
 
285
 
 
286
        // Be sure to use "lazy" futures, meaning they do not send right away.
 
287
        $request->getConfig()->set('future', 'lazy');
 
288
        $hash = spl_object_hash($request);
 
289
        $this->attachListeners($request, $this->eventListeners);
 
290
        $request->getEmitter()->on('before', [$this, '_trackRetries'], RequestEvents::EARLY);
 
291
        $response = $this->client->send($request);
 
292
        $this->waitQueue[$hash] = $response;
 
293
        $promise = $response->promise();
 
294
 
 
295
        // Don't recursively call itself for completed or rejected responses.
 
296
        if ($promise instanceof FulfilledPromise
 
297
            || $promise instanceof RejectedPromise
 
298
        ) {
 
299
            try {
 
300
                $this->finishResponse($request, $response->wait(), $hash);
 
301
            } catch (\Exception $e) {
 
302
                $this->finishResponse($request, $e, $hash);
 
303
            }
 
304
            goto add_next;
 
305
        }
 
306
 
 
307
        // Use this function for both resolution and rejection.
 
308
        $thenFn = function ($value) use ($request, $hash) {
 
309
            $this->finishResponse($request, $value, $hash);
 
310
            if (!$request->getConfig()->get('_pool_retries')) {
 
311
                $this->addNextRequests();
 
312
            }
 
313
        };
 
314
 
 
315
        $promise->then($thenFn, $thenFn);
 
316
 
 
317
        return true;
 
318
    }
 
319
 
 
320
    public function _trackRetries(BeforeEvent $e)
 
321
    {
 
322
        $e->getRequest()->getConfig()->set('_pool_retries', $e->getRetryCount());
 
323
    }
 
324
 
 
325
    private function finishResponse($request, $value, $hash)
 
326
    {
 
327
        unset($this->waitQueue[$hash]);
 
328
        $result = $value instanceof ResponseInterface
 
329
            ? ['request' => $request, 'response' => $value, 'error' => null]
 
330
            : ['request' => $request, 'response' => null, 'error' => $value];
 
331
        $this->deferred->notify($result);
 
332
    }
 
333
}