~ballot/wordpress/openstack-objectstorage-breaking-insight

« back to all changes in this revision

Viewing changes to vendor/guzzlehttp/guzzle/src/Adapter/Curl/MultiAdapter.php

  • Committer: Jacek Nykis
  • Date: 2015-02-11 15:35:31 UTC
  • Revision ID: jacek.nykis@canonical.com-20150211153531-hmy6zi0ov2qfkl0b
Initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
<?php
 
2
 
 
3
namespace GuzzleHttp\Adapter\Curl;
 
4
 
 
5
use GuzzleHttp\Adapter\AdapterInterface;
 
6
use GuzzleHttp\Adapter\ParallelAdapterInterface;
 
7
use GuzzleHttp\Adapter\TransactionInterface;
 
8
use GuzzleHttp\Event\RequestEvents;
 
9
use GuzzleHttp\Exception\AdapterException;
 
10
use GuzzleHttp\Exception\RequestException;
 
11
use GuzzleHttp\Message\MessageFactoryInterface;
 
12
 
 
13
/**
 
14
 * HTTP adapter that uses cURL multi as a transport layer
 
15
 *
 
16
 * When using the CurlAdapter, custom curl options can be specified as an
 
17
 * associative array of curl option constants mapping to values in the
 
18
 * **curl** key of a request's configuration options.
 
19
 *
 
20
 * In addition to being able to supply configuration options via the curl
 
21
 * request config, you can also specify the select_timeout variable using the
 
22
 * `GUZZLE_CURL_SELECT_TIMEOUT` environment variable.
 
23
 */
 
24
class MultiAdapter implements AdapterInterface, ParallelAdapterInterface
 
25
{
 
26
    const ERROR_STR = 'See http://curl.haxx.se/libcurl/c/libcurl-errors.html for an explanation of cURL errors';
 
27
    const ENV_SELECT_TIMEOUT = 'GUZZLE_CURL_SELECT_TIMEOUT';
 
28
 
 
29
    /** @var CurlFactory */
 
30
    private $curlFactory;
 
31
 
 
32
    /** @var MessageFactoryInterface */
 
33
    private $messageFactory;
 
34
 
 
35
    /** @var array Array of curl multi handles */
 
36
    private $multiHandles = [];
 
37
 
 
38
    /** @var array Array of curl multi handles */
 
39
    private $multiOwned = [];
 
40
 
 
41
    /** @var int Total number of idle handles to keep in cache */
 
42
    private $maxHandles;
 
43
 
 
44
    /** @var double */
 
45
    private $selectTimeout;
 
46
 
 
47
    /**
 
48
     * Accepts an associative array of options:
 
49
     *
 
50
     * - handle_factory: Optional callable factory used to create cURL handles.
 
51
     *   The callable is invoked with the following arguments:
 
52
     *   TransactionInterface, MessageFactoryInterface, and an optional cURL
 
53
     *   handle to modify. The factory method must then return a cURL resource.
 
54
     * - select_timeout: Specify a float in seconds to use for a
 
55
     *   curl_multi_select timeout.
 
56
     * - max_handles: Maximum number of idle handles (defaults to 3).
 
57
     *
 
58
     * @param MessageFactoryInterface $messageFactory
 
59
     * @param array $options Array of options to use with the adapter:
 
60
     */
 
61
    public function __construct(
 
62
        MessageFactoryInterface $messageFactory,
 
63
        array $options = []
 
64
    ) {
 
65
        $this->messageFactory = $messageFactory;
 
66
        $this->curlFactory = isset($options['handle_factory'])
 
67
            ? $options['handle_factory']
 
68
            : new CurlFactory();
 
69
 
 
70
        if (isset($options['select_timeout'])) {
 
71
            $this->selectTimeout = $options['select_timeout'];
 
72
        } elseif (isset($_SERVER[self::ENV_SELECT_TIMEOUT])) {
 
73
            $this->selectTimeout = $_SERVER[self::ENV_SELECT_TIMEOUT];
 
74
        } else {
 
75
            $this->selectTimeout = 1;
 
76
        }
 
77
 
 
78
        $this->maxHandles = isset($options['max_handles'])
 
79
            ? $options['max_handles']
 
80
            : 3;
 
81
    }
 
82
 
 
83
    public function __destruct()
 
84
    {
 
85
        foreach ($this->multiHandles as $handle) {
 
86
            if (is_resource($handle)) {
 
87
                curl_multi_close($handle);
 
88
            }
 
89
        }
 
90
    }
 
91
 
 
92
    /**
 
93
     * Throw an exception for a cURL multi response
 
94
     *
 
95
     * @param int $code Curl response code
 
96
     * @throws AdapterException
 
97
     */
 
98
    public static function throwMultiError($code)
 
99
    {
 
100
        $buffer = function_exists('curl_multi_strerror')
 
101
            ? curl_multi_strerror($code)
 
102
            : self::ERROR_STR;
 
103
 
 
104
        throw new AdapterException(sprintf('cURL error %s: %s', $code, $buffer));
 
105
    }
 
106
 
 
107
    public function send(TransactionInterface $transaction)
 
108
    {
 
109
        $context = new BatchContext($this->checkoutMultiHandle(), true);
 
110
        $this->addHandle($transaction, $context);
 
111
        $this->perform($context);
 
112
 
 
113
        return $transaction->getResponse();
 
114
    }
 
115
 
 
116
    public function sendAll(\Iterator $transactions, $parallel)
 
117
    {
 
118
        $context = new BatchContext(
 
119
            $this->checkoutMultiHandle(),
 
120
            false,
 
121
            $transactions
 
122
        );
 
123
 
 
124
        foreach (new \LimitIterator($transactions, 0, $parallel) as $trans) {
 
125
            $this->addHandle($trans, $context);
 
126
        }
 
127
 
 
128
        $this->perform($context);
 
129
    }
 
130
 
 
131
    private function perform(BatchContext $context)
 
132
    {
 
133
        // The first curl_multi_select often times out no matter what, but is
 
134
        // usually required for fast transfers.
 
135
        $active = false;
 
136
        $multi = $context->getMultiHandle();
 
137
 
 
138
        do {
 
139
            do {
 
140
                $mrc = curl_multi_exec($multi, $active);
 
141
            } while ($mrc === CURLM_CALL_MULTI_PERFORM);
 
142
 
 
143
            if ($mrc != CURLM_OK) {
 
144
                self::throwMultiError($mrc);
 
145
            }
 
146
 
 
147
            $this->processMessages($context);
 
148
 
 
149
            if ($active &&
 
150
                curl_multi_select($multi, $this->selectTimeout) === -1
 
151
            ) {
 
152
                // Perform a usleep if a select returns -1.
 
153
                // See: https://bugs.php.net/bug.php?id=61141
 
154
                usleep(250);
 
155
            }
 
156
 
 
157
        } while ($context->isActive() || $active);
 
158
 
 
159
        $this->releaseMultiHandle($multi, $this->maxHandles);
 
160
    }
 
161
 
 
162
    private function processMessages(BatchContext $context)
 
163
    {
 
164
        $multi = $context->getMultiHandle();
 
165
 
 
166
        while ($done = curl_multi_info_read($multi)) {
 
167
            $transaction = $context->findTransaction($done['handle']);
 
168
            $this->processResponse($transaction, $done, $context);
 
169
            // Add the next transaction if there are more in the queue
 
170
            if ($next = $context->nextPending()) {
 
171
                $this->addHandle($next, $context);
 
172
            }
 
173
        }
 
174
    }
 
175
 
 
176
    private function processResponse(
 
177
        TransactionInterface $transaction,
 
178
        array $curl,
 
179
        BatchContext $context
 
180
    ) {
 
181
        $info = $context->removeTransaction($transaction);
 
182
 
 
183
        try {
 
184
            if (!$this->isCurlException($transaction, $curl, $context, $info) &&
 
185
                $this->validateResponseWasSet($transaction, $context)
 
186
            ) {
 
187
                if ($body = $transaction->getResponse()->getBody()) {
 
188
                    $body->seek(0);
 
189
                }
 
190
                RequestEvents::emitComplete($transaction, $info);
 
191
            }
 
192
        } catch (\Exception $e) {
 
193
            $this->throwException($e, $context);
 
194
        }
 
195
    }
 
196
 
 
197
    private function addHandle(
 
198
        TransactionInterface $transaction,
 
199
        BatchContext $context
 
200
    ) {
 
201
        try {
 
202
            RequestEvents::emitBefore($transaction);
 
203
            // Only transfer if the request was not intercepted
 
204
            if (!$transaction->getResponse()) {
 
205
                $factory = $this->curlFactory;
 
206
                $context->addTransaction(
 
207
                    $transaction,
 
208
                    $factory($transaction, $this->messageFactory)
 
209
                );
 
210
            }
 
211
        } catch (RequestException $e) {
 
212
            $this->throwException($e, $context);
 
213
        }
 
214
    }
 
215
 
 
216
    private function isCurlException(
 
217
        TransactionInterface $transaction,
 
218
        array $curl,
 
219
        BatchContext $context,
 
220
        array $info
 
221
    ) {
 
222
        if (CURLM_OK == $curl['result'] ||
 
223
            CURLM_CALL_MULTI_PERFORM == $curl['result']
 
224
        ) {
 
225
            return false;
 
226
        }
 
227
 
 
228
        $request = $transaction->getRequest();
 
229
        try {
 
230
            // Send curl stats along if they are available
 
231
            $stats = ['curl_result' => $curl['result']] + $info;
 
232
            RequestEvents::emitError(
 
233
                $transaction,
 
234
                new RequestException(
 
235
                    sprintf(
 
236
                        '[curl] (#%s) %s [url] %s',
 
237
                        $curl['result'],
 
238
                        function_exists('curl_strerror')
 
239
                            ? curl_strerror($curl['result'])
 
240
                            : self::ERROR_STR,
 
241
                        $request->getUrl()
 
242
                    ),
 
243
                    $request
 
244
                ),
 
245
                $stats
 
246
            );
 
247
        } catch (\Exception $e) {
 
248
            $this->throwException($e, $context);
 
249
        }
 
250
 
 
251
        return true;
 
252
    }
 
253
 
 
254
    private function throwException(\Exception $e, BatchContext $context)
 
255
    {
 
256
        if ($context->throwsExceptions()
 
257
            || ($e instanceof RequestException && $e->getThrowImmediately())
 
258
        ) {
 
259
            $context->removeAll();
 
260
            $this->releaseMultiHandle($context->getMultiHandle(), -1);
 
261
            throw $e;
 
262
        }
 
263
    }
 
264
 
 
265
    /**
 
266
     * Returns a curl_multi handle from the cache or creates a new one
 
267
     *
 
268
     * @return resource
 
269
     */
 
270
    private function checkoutMultiHandle()
 
271
    {
 
272
        // Find an unused handle in the cache
 
273
        $key = array_search(false, $this->multiOwned, true);
 
274
        if (false !== $key) {
 
275
            $this->multiOwned[$key] = true;
 
276
            return $this->multiHandles[$key];
 
277
        }
 
278
 
 
279
        // Add a new handle
 
280
        $handle = curl_multi_init();
 
281
        $id = (int) $handle;
 
282
        $this->multiHandles[$id] = $handle;
 
283
        $this->multiOwned[$id] = true;
 
284
 
 
285
        return $handle;
 
286
    }
 
287
 
 
288
    /**
 
289
     * Releases a curl_multi handle back into the cache and removes excess cache
 
290
     *
 
291
     * @param resource $handle Curl multi handle to remove
 
292
     * @param int $maxHandles (Optional) Maximum number of existing multiHandles to allow before pruning.
 
293
     */
 
294
    private function releaseMultiHandle($handle, $maxHandles)
 
295
    {
 
296
        $id = (int) $handle;
 
297
 
 
298
        if (count($this->multiHandles) <= $maxHandles) {
 
299
            $this->multiOwned[$id] = false;
 
300
        } elseif (isset($this->multiHandles[$id], $this->multiOwned[$id])) {
 
301
            // Prune excessive handles
 
302
            curl_multi_close($this->multiHandles[$id]);
 
303
            unset($this->multiHandles[$id], $this->multiOwned[$id]);
 
304
        }
 
305
    }
 
306
 
 
307
    /**
 
308
     * This function ensures that a response was set on a transaction. If one
 
309
     * was not set, then the request is retried if possible. This error
 
310
     * typically means you are sending a payload, curl encountered a
 
311
     * "Connection died, retrying a fresh connect" error, tried to rewind the
 
312
     * stream, and then encountered a "necessary data rewind wasn't possible"
 
313
     * error, causing the request to be sent through curl_multi_info_read()
 
314
     * without an error status.
 
315
     *
 
316
     * @param TransactionInterface $transaction
 
317
     * @param BatchContext         $context
 
318
     *
 
319
     * @return bool Returns true if it's OK, and false if it failed.
 
320
     * @throws \GuzzleHttp\Exception\RequestException If it failed and cannot
 
321
     *                                                recover.
 
322
     */
 
323
    private function validateResponseWasSet(
 
324
        TransactionInterface $transaction,
 
325
        BatchContext $context
 
326
    ) {
 
327
        if ($transaction->getResponse()) {
 
328
            return true;
 
329
        }
 
330
 
 
331
        $body = $transaction->getRequest()->getBody();
 
332
 
 
333
        if (!$body) {
 
334
            // This is weird and should probably never happen.
 
335
            RequestEvents::emitError(
 
336
                $transaction,
 
337
                new RequestException(
 
338
                    'No response was received for a request with no body. This'
 
339
                    . ' could mean that you are saturating your network.',
 
340
                    $transaction->getRequest()
 
341
                )
 
342
            );
 
343
        } elseif (!$body->isSeekable() || !$body->seek(0)) {
 
344
            // Nothing we can do with this. Sorry!
 
345
            RequestEvents::emitError(
 
346
                $transaction,
 
347
                new RequestException(
 
348
                    'The connection was unexpectedly closed. The request would'
 
349
                    . ' have been retried, but attempting to rewind the'
 
350
                    . ' request body failed. Consider wrapping your request'
 
351
                    . ' body in a CachingStream decorator to work around this'
 
352
                    . ' issue if necessary.',
 
353
                    $transaction->getRequest()
 
354
                )
 
355
            );
 
356
        } else {
 
357
            $this->retryFailedConnection($transaction, $context);
 
358
        }
 
359
 
 
360
        return false;
 
361
    }
 
362
 
 
363
    private function retryFailedConnection(
 
364
        TransactionInterface $transaction,
 
365
        BatchContext $context
 
366
    ) {
 
367
        // Add the request back to the batch to retry automatically.
 
368
        $context->addTransaction(
 
369
            $transaction,
 
370
            call_user_func(
 
371
                $this->curlFactory,
 
372
                $transaction,
 
373
                $this->messageFactory
 
374
            )
 
375
        );
 
376
    }
 
377
}