~simonkberg/ngx-pagespeed/release-1.9.32.3-beta

« back to all changes in this revision

Viewing changes to src/ngx_fetch.cc

  • Committer: Jeffrey Crowell
  • Author(s): Otto van der Schaaf
  • Date: 2014-10-24 20:23:45 UTC
  • Revision ID: git-v1:ab9929e5a5c631353a7eac4c49543aa8756d5e7a
native fetcher: Support http keep-alive

Based on @dinic his work, add keep-alive support for the native fetcher.
Adds a new option, usable at the http{} level in configuration:

pagespeed NativeFetcherMaxKeepaliveRequests 50;

The default value is 100 (aligned to nginx). Setting the value to 1 turns off
keep-alive requests altogether).

Most notable changes:
- Request keep-alive by adding the appropriate request header
- Fixes connections getting reused while they are servicing other requests:
- Remove connection from the pool of available connections for keepalive when applicable
- Disable keepalive in more appropriate situations
- Response parsing fixes
- Remove connections that timeout from the k.a. pool
- Add a few sanity (D)CHECKS
- Emit debug messages for traceability
- Fix for ignoring ipv6 addresses returned from dns queries when ipv6 is enabled.
- Bump the fetch timeout in test configuration to deflake tests that require dns
  lookups (which will be done via 8.8.8.8 currently for the native fetcher)

Conflicts:
        src/ngx_fetch.cc

Show diffs side-by-side

added added

removed removed

Lines of Context:
24
24
//  - The read handler parses the response. Add the response to the buffer at
25
25
//    last.
26
26
 
 
27
// TODO(oschaaf): Currently the first applicable connection is picked from the
 
28
// pool when re-using connections. Perhaps it would be worth it to pick the one
 
29
// that was active the longest time ago to keep a larger pool available.
 
30
// TODO(oschaaf): style: reindent namespace according to google C++ style guide
 
31
// TODO(oschaaf): Retry mechanism for failures on a re-used k-a connection.
 
32
// Currently we don't think it's going to be an issue, see the comments at
 
33
// https://github.com/pagespeed/ngx_pagespeed/pull/781.
 
34
 
27
35
extern "C" {
28
36
#include <nginx.h>
29
37
}
56
64
#include "net/instaweb/util/public/writer.h"
57
65
 
58
66
namespace net_instaweb {
59
 
  NgxFetch::NgxFetch(const GoogleString& url,
60
 
                     AsyncFetch* async_fetch,
61
 
                     MessageHandler* message_handler,
62
 
                     ngx_msec_t timeout_ms,
63
 
                     ngx_log_t* log)
64
 
      : str_url_(url),
65
 
        fetcher_(NULL),
66
 
        async_fetch_(async_fetch),
67
 
        parser_(async_fetch->response_headers()),
68
 
        message_handler_(message_handler),
69
 
        bytes_received_(0),
70
 
        fetch_start_ms_(0),
71
 
        fetch_end_ms_(0),
72
 
        done_(false),
73
 
        content_length_(-1),
74
 
        content_length_known_(false),
75
 
        resolver_ctx_(NULL) {
76
 
    ngx_memzero(&url_, sizeof(url_));
77
 
    log_ = log;
 
67
 
 
68
class NgxConnection : public PoolElement<NgxConnection> {
 
69
 public:
 
70
  NgxConnection(MessageHandler* handler, int max_keepalive_requests);
 
71
  ~NgxConnection();
 
72
  void SetSock(u_char *sockaddr, socklen_t socklen) {
 
73
    socklen_ = socklen;
 
74
    ngx_memcpy(&sockaddr_, sockaddr, socklen);
 
75
  }
 
76
  // Close ensures that NgxConnection deletes itself at the appropriate time,
 
77
  // which can be after receiving a non-keepalive response, or when the remote
 
78
  // server closes the connection when the NgxConnection is pooled and idle.
 
79
  void Close();
 
80
 
 
81
  // Once keepalive is disabled, it can't be toggled back on.
 
82
  void set_keepalive(bool k) { keepalive_ = keepalive_ && k; }
 
83
  bool keepalive() { return keepalive_; }
 
84
 
 
85
  typedef Pool<NgxConnection> NgxConnectionPool;
 
86
 
 
87
  static NgxConnection* Connect(ngx_peer_connection_t* pc,
 
88
                                MessageHandler* handler,
 
89
                                int max_keepalive_requests);
 
90
  static void IdleWriteHandler(ngx_event_t* ev);
 
91
  static void IdleReadHandler(ngx_event_t* ev);
 
92
 
 
93
  static NgxConnectionPool connection_pool;
 
94
  static PthreadMutex connection_pool_mutex;
 
95
 
 
96
  // c_ is owned by NgxConnection and freed in ::Close()
 
97
  ngx_connection_t* c_;
 
98
  static const int64 keepalive_timeout_ms;
 
99
  static const GoogleString ka_header;
 
100
 
 
101
 private:
 
102
  int max_keepalive_requests_;
 
103
  bool keepalive_;
 
104
  socklen_t socklen_;
 
105
  u_char sockaddr_[NGX_SOCKADDRLEN];
 
106
  MessageHandler* handler_;
 
107
 
 
108
  DISALLOW_COPY_AND_ASSIGN(NgxConnection);
 
109
};
 
110
 
 
111
NgxConnection::NgxConnectionPool NgxConnection::connection_pool;
 
112
PthreadMutex NgxConnection::connection_pool_mutex;
 
113
// Default keepalive 60s.
 
114
const int64 NgxConnection::keepalive_timeout_ms = 60000;
 
115
const GoogleString NgxConnection::ka_header =
 
116
    StrCat("keep-alive ",
 
117
           Integer64ToString(NgxConnection::keepalive_timeout_ms));
 
118
 
 
119
NgxConnection::NgxConnection(MessageHandler* handler,
 
120
                             int max_keepalive_requests) {
 
121
  c_ = NULL;
 
122
  max_keepalive_requests_ = max_keepalive_requests;
 
123
  handler_ = handler;
 
124
  // max_keepalive_requests specifies the number of http requests that are
 
125
  // allowed to be performed over a single connection. So, a
 
126
  // max_keepalive_requests of 1 effectively disables keepalive.
 
127
  keepalive_ = max_keepalive_requests_ > 1;
 
128
}
 
129
 
 
130
NgxConnection::~NgxConnection() {
 
131
  CHECK(c_ == NULL) << "NgxFetch: Underlying connection should be NULL";
 
132
}
 
133
 
 
134
NgxConnection* NgxConnection::Connect(ngx_peer_connection_t* pc,
 
135
                                      MessageHandler* handler,
 
136
                                      int max_keepalive_requests) {
 
137
  NgxConnection* nc;
 
138
  {
 
139
    ScopedMutex lock(&NgxConnection::connection_pool_mutex);
 
140
 
 
141
    for (NgxConnectionPool::iterator p = connection_pool.begin();
 
142
         p != connection_pool.end(); ++p) {
 
143
      nc = *p;
 
144
 
 
145
      if (ngx_memn2cmp(static_cast<u_char*>(nc->sockaddr_),
 
146
                       reinterpret_cast<u_char*>(pc->sockaddr),
 
147
                       nc->socklen_, pc->socklen) == 0) {
 
148
        CHECK(nc->c_->idle) << "Pool should only contain idle connections!";
 
149
 
 
150
        nc->c_->idle = 0;
 
151
        nc->c_->log = pc->log;
 
152
        nc->c_->read->log = pc->log;
 
153
        nc->c_->write->log = pc->log;
 
154
        if (nc->c_->pool != NULL) {
 
155
          nc->c_->pool->log = pc->log;
 
156
        }
 
157
 
 
158
        if (nc->c_->read->timer_set) {
 
159
          ngx_del_timer(nc->c_->read);
 
160
        }
 
161
        connection_pool.Remove(nc);
 
162
 
 
163
        ngx_log_error(NGX_LOG_DEBUG, pc->log, 0,
 
164
                      "NgxFetch: re-using connection %p (pool size: %l)\n",
 
165
                      nc, connection_pool.size());
 
166
        return nc;
 
167
      }
 
168
    }
 
169
  }
 
170
 
 
171
  int rc = ngx_event_connect_peer(pc);
 
172
  if (rc == NGX_ERROR || rc == NGX_DECLINED || rc == NGX_BUSY) {
 
173
    return NULL;
 
174
  }
 
175
 
 
176
  // NgxConnection deletes itself if NgxConnection::Close()
 
177
  nc = new NgxConnection(handler, max_keepalive_requests);
 
178
  nc->SetSock(reinterpret_cast<u_char*>(pc->sockaddr), pc->socklen);
 
179
  nc->c_ = pc->connection;
 
180
  return nc;
 
181
}
 
182
 
 
183
void NgxConnection::Close() {
 
184
  bool removed_from_pool = false;
 
185
 
 
186
  {
 
187
    ScopedMutex lock(&NgxConnection::connection_pool_mutex);
 
188
    for (NgxConnectionPool::iterator p = connection_pool.begin();
 
189
         p != connection_pool.end(); ++p) {
 
190
      if (*p == this) {
 
191
        // When we get here, that means that the connection either has timed
 
192
        // out or has been closed remotely.
 
193
        connection_pool.Remove(this);
 
194
        ngx_log_error(NGX_LOG_DEBUG, c_->log, 0,
 
195
                      "NgxFetch: removed connection %p (pool size: %l)\n",
 
196
                      this, connection_pool.size());
 
197
        removed_from_pool = true;
 
198
        break;
 
199
      }
 
200
    }
 
201
  }
 
202
 
 
203
  max_keepalive_requests_--;
 
204
 
 
205
  if (c_->read->timer_set) {
 
206
    ngx_del_timer(c_->read);
 
207
  }
 
208
 
 
209
  if (c_->write->timer_set) {
 
210
    ngx_del_timer(c_->write);
 
211
  }
 
212
 
 
213
  if (!keepalive_ || max_keepalive_requests_ <= 0 || removed_from_pool) {
 
214
    ngx_close_connection(c_);
 
215
    c_ = NULL;
 
216
    delete this;
 
217
    return;
 
218
  }
 
219
 
 
220
  ngx_add_timer(c_->read, static_cast<ngx_msec_t>(
 
221
      NgxConnection::keepalive_timeout_ms));
 
222
 
 
223
  c_->data = this;
 
224
  c_->read->handler = NgxConnection::IdleReadHandler;
 
225
  c_->write->handler = NgxConnection::IdleWriteHandler;
 
226
  c_->idle = 1;
 
227
 
 
228
  // This connection should not be associated with current fetch.
 
229
  c_->log = ngx_cycle->log;
 
230
  c_->read->log = ngx_cycle->log;
 
231
  c_->write->log = ngx_cycle->log;
 
232
  if (c_->pool != NULL) {
 
233
    c_->pool->log = ngx_cycle->log;
 
234
  }
 
235
 
 
236
  // Allow this connection to be re-used, by adding it to the connection pool.
 
237
  {
 
238
    ScopedMutex lock(&NgxConnection::connection_pool_mutex);
 
239
    connection_pool.Add(this);
 
240
    ngx_log_error(NGX_LOG_DEBUG, c_->log, 0,
 
241
                  "NgxFetch: Added connection %p (pool size: %l - "
 
242
                  " max_keepalive_requests_ %d)\n",
 
243
                  this, connection_pool.size(), max_keepalive_requests_);
 
244
  }
 
245
}
 
246
 
 
247
void NgxConnection::IdleWriteHandler(ngx_event_t* ev) {
 
248
  ngx_connection_t* c = static_cast<ngx_connection_t*>(ev->data);
 
249
  u_char buf[1];
 
250
  int n = c->recv(c, buf, 1);
 
251
  if (c->write->timedout) {
 
252
    DCHECK(false) << "NgxFetch: write timeout not expected." << n;
 
253
  }
 
254
  if (n == NGX_AGAIN) {
 
255
    return;
 
256
  }
 
257
  DCHECK(false) << "NgxFetch: Unexpected write event" << n;
 
258
}
 
259
 
 
260
void NgxConnection::IdleReadHandler(ngx_event_t* ev) {
 
261
  ngx_connection_t* c = static_cast<ngx_connection_t*>(ev->data);
 
262
  NgxConnection* nc = static_cast<NgxConnection*>(c->data);
 
263
 
 
264
  if (c->read->timedout) {
 
265
    nc->set_keepalive(false);
 
266
    nc->Close();
 
267
    return;
 
268
  }
 
269
 
 
270
  char buf[1];
 
271
  int n;
 
272
 
 
273
  // not a timeout event, we should check connection
 
274
  n = recv(c->fd, buf, 1, MSG_PEEK);
 
275
  if (n == -1 && ngx_socket_errno == NGX_EAGAIN) {
 
276
    if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
 
277
      nc->set_keepalive(false);
 
278
      nc->Close();
 
279
      return;
 
280
    }
 
281
 
 
282
    return;
 
283
  }
 
284
 
 
285
  nc->set_keepalive(false);
 
286
  nc->Close();
 
287
}
 
288
 
 
289
NgxFetch::NgxFetch(const GoogleString& url,
 
290
                   AsyncFetch* async_fetch,
 
291
                   MessageHandler* message_handler,
 
292
                   ngx_log_t* log)
 
293
    : str_url_(url),
 
294
      fetcher_(NULL),
 
295
      async_fetch_(async_fetch),
 
296
      parser_(async_fetch->response_headers()),
 
297
      message_handler_(message_handler),
 
298
      bytes_received_(0),
 
299
      fetch_start_ms_(0),
 
300
      fetch_end_ms_(0),
 
301
      done_(false),
 
302
      content_length_(-1),
 
303
      content_length_known_(false),
 
304
      resolver_ctx_(NULL) {
 
305
  ngx_memzero(&url_, sizeof(url_));
 
306
  log_ = log;
 
307
  pool_ = NULL;
 
308
  timeout_event_ = NULL;
 
309
  connection_ = NULL;
 
310
}
 
311
 
 
312
NgxFetch::~NgxFetch() {
 
313
  if (timeout_event_ != NULL && timeout_event_->timer_set) {
 
314
    ngx_del_timer(timeout_event_);
 
315
  }
 
316
  if (connection_ != NULL) {
 
317
    connection_->Close();
 
318
    connection_ = NULL;
 
319
  }
 
320
  if (pool_ != NULL) {
 
321
    ngx_destroy_pool(pool_);
78
322
    pool_ = NULL;
 
323
  }
 
324
}
 
325
 
 
326
// This function is called by NgxUrlAsyncFetcher::StartFetch.
 
327
bool NgxFetch::Start(NgxUrlAsyncFetcher* fetcher) {
 
328
  fetcher_ = fetcher;
 
329
  bool ok = Init();
 
330
  if (ok) {
 
331
    ngx_log_error(NGX_LOG_DEBUG, log_, 0, "NgxFetch %p: initialized\n",
 
332
                  this);
 
333
  }  // else Init() will have emitted a reason
 
334
  return ok;
 
335
}
 
336
 
 
337
// Create the pool, parse the url, add the timeout event and
 
338
// hook the DNS resolver if needed. Else we connect directly.
 
339
// When this returns false, our caller (NgxUrlAsyncFetcher::StartFetch)
 
340
// will call fetch->CallbackDone()
 
341
bool NgxFetch::Init() {
 
342
  pool_ = ngx_create_pool(12288, log_);
 
343
  if (pool_ == NULL) {
 
344
    message_handler_->Message(kError, "NgxFetch: ngx_create_pool failed");
 
345
    return false;
 
346
  }
 
347
 
 
348
  if (!ParseUrl()) {
 
349
    message_handler_->Message(kError, "NgxFetch: ParseUrl() failed");
 
350
    return false;
 
351
  }
 
352
 
 
353
  timeout_event_ = static_cast<ngx_event_t*>(
 
354
      ngx_pcalloc(pool_, sizeof(ngx_event_t)));
 
355
  if (timeout_event_ == NULL) {
 
356
    message_handler_->Message(kError,
 
357
                              "NgxFetch: ngx_pcalloc failed for timeout");
 
358
    return false;
 
359
  }
 
360
 
 
361
  timeout_event_->data = this;
 
362
  timeout_event_->handler = NgxFetch::TimeoutHandler;
 
363
  timeout_event_->log = log_;
 
364
 
 
365
  ngx_add_timer(timeout_event_, fetcher_->fetch_timeout_);
 
366
  r_ = static_cast<ngx_http_request_t*>(
 
367
      ngx_pcalloc(pool_, sizeof(ngx_http_request_t)));
 
368
 
 
369
  if (r_ == NULL) {
 
370
    message_handler_->Message(kError,
 
371
                              "NgxFetch: ngx_pcalloc failed for timer");
 
372
    return false;
 
373
  }
 
374
  status_ = static_cast<ngx_http_status_t*>(
 
375
      ngx_pcalloc(pool_, sizeof(ngx_http_status_t)));
 
376
 
 
377
  if (status_ == NULL) {
 
378
    message_handler_->Message(kError,
 
379
                              "NgxFetch: ngx_pcalloc failed for status");
 
380
    return false;
 
381
  }
 
382
 
 
383
  // The host is either a domain name or an IP address.  First check
 
384
  // if it's a valid IP address and only if that fails fall back to
 
385
  // using the DNS resolver.
 
386
 
 
387
  // Maybe we have a Proxy.
 
388
  ngx_url_t* tmp_url = &url_;
 
389
  if (fetcher_->proxy_.url.len != 0) {
 
390
    tmp_url = &fetcher_->proxy_;
 
391
  }
 
392
 
 
393
  GoogleString s_ipaddress(reinterpret_cast<char*>(tmp_url->host.data),
 
394
                           tmp_url->host.len);
 
395
  ngx_memzero(&sin_, sizeof(sin_));
 
396
  sin_.sin_family = AF_INET;
 
397
  sin_.sin_port = htons(tmp_url->port);
 
398
  sin_.sin_addr.s_addr = inet_addr(s_ipaddress.c_str());
 
399
 
 
400
  if (sin_.sin_addr.s_addr == INADDR_NONE) {
 
401
    // inet_addr returned INADDR_NONE, which means the hostname
 
402
    // isn't a valid IP address.  Check DNS.
 
403
    ngx_resolver_ctx_t temp;
 
404
    temp.name.data = tmp_url->host.data;
 
405
    temp.name.len = tmp_url->host.len;
 
406
    resolver_ctx_ = ngx_resolve_start(fetcher_->resolver_, &temp);
 
407
    if (resolver_ctx_ == NULL || resolver_ctx_ == NGX_NO_RESOLVER) {
 
408
      // TODO(oschaaf): this spams the log, but is useful in the fetcher's
 
409
      // current state
 
410
      message_handler_->Message(
 
411
          kError, "NgxFetch: Couldn't start resolving, "
 
412
          "is there a proper resolver configured in nginx.conf?");
 
413
      return false;
 
414
    } else {
 
415
      ngx_log_error(NGX_LOG_DEBUG, log_, 0,
 
416
                    "NgxFetch %p: start resolve for: %s\n",
 
417
                    this, s_ipaddress.c_str());
 
418
    }
 
419
 
 
420
    resolver_ctx_->data = this;
 
421
    resolver_ctx_->name.data = tmp_url->host.data;
 
422
    resolver_ctx_->name.len = tmp_url->host.len;
 
423
 
 
424
#if (nginx_version < 1005008)
 
425
    resolver_ctx_->type = NGX_RESOLVE_A;
 
426
#endif
 
427
 
 
428
    resolver_ctx_->handler = NgxFetch::ResolveDoneHandler;
 
429
    resolver_ctx_->timeout = fetcher_->resolver_timeout_;
 
430
 
 
431
    if (ngx_resolve_name(resolver_ctx_) != NGX_OK) {
 
432
      message_handler_->Message(kWarning,
 
433
                                "NgxFetch: ngx_resolve_name failed");
 
434
      return false;
 
435
    }
 
436
  } else {
 
437
    if (InitRequest() != NGX_OK) {
 
438
      message_handler()->Message(kError, "NgxFetch: InitRequest failed");
 
439
      return false;
 
440
    }
 
441
  }
 
442
  return true;
 
443
}
 
444
 
 
445
const char* NgxFetch::str_url() {
 
446
  return str_url_.c_str();
 
447
}
 
448
 
 
449
// This function should be called only once. The only argument is sucess or
 
450
// not.
 
451
void NgxFetch::CallbackDone(bool success) {
 
452
  ngx_log_error(NGX_LOG_DEBUG, log_, 0, "NgxFetch %p: CallbackDone: %s\n",
 
453
                this, success ? "OK":"FAIL");
 
454
 
 
455
  if (async_fetch_ == NULL) {
 
456
    LOG(FATAL)
 
457
        << "BUG: NgxFetch callback called more than once on same fetch"
 
458
        << str_url_.c_str() << "(" << this << ").Please report this at"
 
459
        << "https://groups.google.com/forum/#!forum/ngx-pagespeed-discuss";
 
460
    return;
 
461
  }
 
462
 
 
463
  release_resolver();
 
464
 
 
465
  if (timeout_event_ && timeout_event_->timer_set) {
 
466
    ngx_del_timer(timeout_event_);
79
467
    timeout_event_ = NULL;
 
468
  }
 
469
 
 
470
  if (connection_ != NULL) {
 
471
    // Connection will be re-used only on responses that specify
 
472
    // 'Connection: keep-alive' in their headers.
 
473
    bool keepalive = false;
 
474
 
 
475
    if (success) {
 
476
      ConstStringStarVector v;
 
477
      if (async_fetch_->response_headers()->Lookup(
 
478
              StringPiece(HttpAttributes::kConnection), &v)) {
 
479
        for (size_t i = 0; i < v.size(); i++) {
 
480
          if (*v[i] == "keep-alive") {
 
481
            keepalive = true;
 
482
            break;
 
483
          } else if (*v[i] == "close") {
 
484
            break;
 
485
          }
 
486
        }
 
487
      }
 
488
      ngx_log_error(NGX_LOG_DEBUG, log_, 0,
 
489
                    "NgxFetch %p: connection %p attempt keep-alive: %s\n",
 
490
                    this, connection_, keepalive ? "Yes":"No");
 
491
    }
 
492
 
 
493
    connection_->set_keepalive(keepalive);
 
494
    connection_->Close();
80
495
    connection_ = NULL;
81
496
  }
82
497
 
83
 
  NgxFetch::~NgxFetch() {
84
 
    if (timeout_event_ != NULL && timeout_event_->timer_set) {
85
 
        ngx_del_timer(timeout_event_);
86
 
    }
87
 
    if (connection_ != NULL) {
88
 
      ngx_close_connection(connection_);
89
 
    }
90
 
    if (pool_ != NULL) {
91
 
      ngx_destroy_pool(pool_);
92
 
    }
93
 
  }
94
 
 
95
 
  // This function is called by NgxUrlAsyncFetcher::StartFetch.
96
 
  bool NgxFetch::Start(NgxUrlAsyncFetcher* fetcher) {
97
 
    fetcher_ = fetcher;
98
 
    return Init();
99
 
  }
100
 
 
101
 
  // Create the pool, parse the url, add the timeout event and
102
 
  // hook the DNS resolver if needed. Else we connect directly.
103
 
  // When this returns false, our caller (NgxUrlAsyncFetcher::StartFetch)
104
 
  // will call fetch->CallbackDone()
105
 
  bool NgxFetch::Init() {
106
 
    pool_ = ngx_create_pool(12288, log_);
107
 
    if (pool_ == NULL) {
108
 
      message_handler_->Message(kError, "NgxFetch: ngx_create_pool failed");
109
 
      return false;
110
 
    }
111
 
 
112
 
    if (!ParseUrl()) {
113
 
      message_handler_->Message(kError, "NgxFetch: ParseUrl() failed");
114
 
      return false;
115
 
    }
116
 
 
117
 
    timeout_event_ = static_cast<ngx_event_t*>(
118
 
        ngx_pcalloc(pool_, sizeof(ngx_event_t)));
119
 
    if (timeout_event_ == NULL) {
120
 
      message_handler_->Message(kError,
121
 
                                "NgxFetch: ngx_pcalloc failed for timeout");
122
 
      return false;
123
 
    }
124
 
    timeout_event_->data = this;
125
 
    timeout_event_->handler = NgxFetchTimeout;
126
 
    timeout_event_->log = log_;
127
 
 
128
 
    ngx_add_timer(timeout_event_, fetcher_->fetch_timeout_);
129
 
    r_ = static_cast<ngx_http_request_t*>(ngx_pcalloc(pool_,
130
 
                                           sizeof(ngx_http_request_t)));
131
 
    if (r_ == NULL) {
132
 
      message_handler_->Message(kError,
133
 
                                "NgxFetch: ngx_pcalloc failed for timer");
134
 
      return false;
135
 
    }
136
 
    status_ = static_cast<ngx_http_status_t*>(ngx_pcalloc(pool_,
137
 
                                              sizeof(ngx_http_status_t)));
138
 
    if (status_ == NULL) {
139
 
      message_handler_->Message(kError,
140
 
                                "NgxFetch: ngx_pcalloc failed for status");
141
 
      return false;
142
 
    }
143
 
 
144
 
    // The host is either a domain name or an IP address.  First check
145
 
    // if it's a valid IP address and only if that fails fall back to
146
 
    // using the DNS resolver.
147
 
 
148
 
    // Maybe we have a Proxy.
149
 
    ngx_url_t* tmp_url = &url_;
150
 
    if (0 != fetcher_->proxy_.url.len) {
151
 
      tmp_url = &fetcher_->proxy_;
152
 
    }
153
 
 
154
 
    GoogleString s_ipaddress(reinterpret_cast<char*>(tmp_url->host.data),
155
 
                             tmp_url->host.len);
156
 
    ngx_memzero(&sin_, sizeof(sin_));
157
 
    sin_.sin_family = AF_INET;
158
 
    sin_.sin_port = htons(tmp_url->port);
159
 
    sin_.sin_addr.s_addr = inet_addr(s_ipaddress.c_str());
160
 
 
161
 
    if (sin_.sin_addr.s_addr == INADDR_NONE) {
162
 
      // inet_addr returned INADDR_NONE, which means the hostname
163
 
      // isn't a valid IP address.  Check DNS.
164
 
      ngx_resolver_ctx_t temp;
165
 
      temp.name.data = tmp_url->host.data;
166
 
      temp.name.len = tmp_url->host.len;
167
 
      resolver_ctx_ = ngx_resolve_start(fetcher_->resolver_, &temp);
168
 
      if (resolver_ctx_ == NULL || resolver_ctx_ == NGX_NO_RESOLVER) {
169
 
        // TODO(oschaaf): this spams the log, but is useful in the fetcher's
170
 
        // current state
171
 
        message_handler_->Message(
172
 
            kError, "NgxFetch: Couldn't start resolving, "
173
 
            "is there a proper resolver configured in nginx.conf?");
174
 
        return false;
175
 
      }
176
 
 
177
 
      resolver_ctx_->data = this;
178
 
      resolver_ctx_->name.data = tmp_url->host.data;
179
 
      resolver_ctx_->name.len = tmp_url->host.len;
180
 
 
181
 
#if (nginx_version < 1005008)
182
 
      resolver_ctx_->type = NGX_RESOLVE_A;
183
 
#endif
184
 
 
185
 
      resolver_ctx_->handler = NgxFetchResolveDone;
186
 
      resolver_ctx_->timeout = fetcher_->resolver_timeout_;
187
 
 
188
 
      if (ngx_resolve_name(resolver_ctx_) != NGX_OK) {
189
 
        message_handler_->Message(kWarning,
190
 
                                  "NgxFetch: ngx_resolve_name failed");
191
 
        return false;
192
 
      }
193
 
    } else {
194
 
      if (InitRequest() != NGX_OK) {
195
 
        message_handler()->Message(kError, "NgxFetch: InitRequest failed");
196
 
        return false;
197
 
      }
198
 
    }
199
 
    return true;
200
 
  }
201
 
 
202
 
  const char* NgxFetch::str_url() {
203
 
      return str_url_.c_str();
204
 
  }
205
 
 
206
 
  // This function should be called only once. The only argument is sucess or
207
 
  // not.
208
 
  void NgxFetch::CallbackDone(bool success) {
209
 
    if (async_fetch_ == NULL) {
210
 
      LOG(FATAL)
211
 
          << "BUG: NgxFetch callback called more than once on same fetch"
212
 
          << str_url_.c_str() << "(" << this << ").Please report this at"
213
 
          << "https://groups.google.com/forum/#!forum/ngx-pagespeed-discuss";
214
 
      return;
215
 
    }
216
 
 
217
 
    release_resolver();
218
 
 
219
 
    if (timeout_event_ && timeout_event_->timer_set) {
220
 
      ngx_del_timer(timeout_event_);
221
 
      timeout_event_ = NULL;
222
 
    }
223
 
    if (connection_) {
224
 
      ngx_close_connection(connection_);
225
 
      connection_ = NULL;
226
 
    }
227
 
 
228
 
    async_fetch_->Done(success);
229
 
 
230
 
    if (fetcher_ != NULL) {
231
 
      if (fetcher_->track_original_content_length()
232
 
          && async_fetch_->response_headers()->Has(
 
498
  // TODO(oschaaf): see https://github.com/pagespeed/ngx_pagespeed/pull/755
 
499
  async_fetch_->Done(success);
 
500
 
 
501
  if (fetcher_ != NULL) {
 
502
    if (fetcher_->track_original_content_length()
 
503
        && async_fetch_->response_headers()->Has(
233
504
            HttpAttributes::kXOriginalContentLength)) {
234
 
        async_fetch_->extra_response_headers()->SetOriginalContentLength(
235
 
            bytes_received_);
236
 
      }
237
 
      fetcher_->FetchComplete(this);
238
 
    }
239
 
 
240
 
    async_fetch_ = NULL;
241
 
  }
242
 
 
243
 
  size_t NgxFetch::bytes_received() {
244
 
      return bytes_received_;
245
 
  }
246
 
  void NgxFetch::bytes_received_add(int64 x) {
247
 
      bytes_received_ += x;
248
 
  }
249
 
 
250
 
  int64 NgxFetch::fetch_start_ms() {
251
 
      return fetch_start_ms_;
252
 
  }
253
 
 
254
 
  void NgxFetch::set_fetch_start_ms(int64 start_ms) {
255
 
    fetch_start_ms_ = start_ms;
256
 
  }
257
 
 
258
 
  int64 NgxFetch::fetch_end_ms() {
259
 
      return fetch_end_ms_;
260
 
  }
261
 
 
262
 
  void NgxFetch::set_fetch_end_ms(int64 end_ms) {
263
 
      fetch_end_ms_ = end_ms;
264
 
  }
265
 
 
266
 
  MessageHandler* NgxFetch::message_handler() {
267
 
      return message_handler_;
268
 
  }
269
 
 
270
 
  bool NgxFetch::ParseUrl() {
271
 
    url_.url.len = str_url_.length();
272
 
    url_.url.data = static_cast<u_char*>(ngx_palloc(pool_, url_.url.len));
273
 
    if (url_.url.data == NULL) {
274
 
      return false;
275
 
    }
276
 
    str_url_.copy(reinterpret_cast<char*>(url_.url.data), str_url_.length(), 0);
277
 
 
278
 
    return NgxUrlAsyncFetcher::ParseUrl(&url_, pool_);
279
 
  }
280
 
 
281
 
  // Issue a request after the resolver is done
282
 
  void NgxFetch::NgxFetchResolveDone(ngx_resolver_ctx_t* resolver_ctx) {
283
 
    NgxFetch* fetch = static_cast<NgxFetch*>(resolver_ctx->data);
284
 
    NgxUrlAsyncFetcher* fetcher = fetch->fetcher_;
285
 
    if (resolver_ctx->state != NGX_OK) {
286
 
      if (fetch->timeout_event() != NULL && fetch->timeout_event()->timer_set) {
287
 
        ngx_del_timer(fetch->timeout_event());
288
 
        fetch->set_timeout_event(NULL);
289
 
      }
290
 
      fetch->message_handler()->Message(
291
 
          kWarning, "NgxFetch: failed to resolve host [%.*s]",
292
 
          static_cast<int>(resolver_ctx->name.len), resolver_ctx->name.data);
293
 
      fetch->CallbackDone(false);
294
 
      return;
295
 
    }
296
 
    ngx_memzero(&fetch->sin_, sizeof(fetch->sin_));
 
505
      async_fetch_->extra_response_headers()->SetOriginalContentLength(
 
506
          bytes_received_);
 
507
    }
 
508
    fetcher_->FetchComplete(this);
 
509
  }
 
510
 
 
511
  async_fetch_ = NULL;
 
512
}
 
513
 
 
514
size_t NgxFetch::bytes_received() {
 
515
  return bytes_received_;
 
516
}
 
517
void NgxFetch::bytes_received_add(int64 x) {
 
518
  bytes_received_ += x;
 
519
}
 
520
 
 
521
int64 NgxFetch::fetch_start_ms() {
 
522
  return fetch_start_ms_;
 
523
}
 
524
 
 
525
void NgxFetch::set_fetch_start_ms(int64 start_ms) {
 
526
  fetch_start_ms_ = start_ms;
 
527
}
 
528
 
 
529
int64 NgxFetch::fetch_end_ms() {
 
530
  return fetch_end_ms_;
 
531
}
 
532
 
 
533
void NgxFetch::set_fetch_end_ms(int64 end_ms) {
 
534
  fetch_end_ms_ = end_ms;
 
535
}
 
536
 
 
537
MessageHandler* NgxFetch::message_handler() {
 
538
  return message_handler_;
 
539
}
 
540
 
 
541
bool NgxFetch::ParseUrl() {
 
542
  url_.url.len = str_url_.length();
 
543
  url_.url.data = static_cast<u_char*>(ngx_palloc(pool_, url_.url.len));
 
544
  if (url_.url.data == NULL) {
 
545
    return false;
 
546
  }
 
547
  str_url_.copy(reinterpret_cast<char*>(url_.url.data), str_url_.length(), 0);
 
548
 
 
549
  return NgxUrlAsyncFetcher::ParseUrl(&url_, pool_);
 
550
}
 
551
 
 
552
// Issue a request after the resolver is done
 
553
void NgxFetch::ResolveDoneHandler(ngx_resolver_ctx_t* resolver_ctx) {
 
554
  NgxFetch* fetch = static_cast<NgxFetch*>(resolver_ctx->data);
 
555
  NgxUrlAsyncFetcher* fetcher = fetch->fetcher_;
 
556
 
 
557
  if (resolver_ctx->state != NGX_OK) {
 
558
    if (fetch->timeout_event() != NULL && fetch->timeout_event()->timer_set) {
 
559
      ngx_del_timer(fetch->timeout_event());
 
560
      fetch->set_timeout_event(NULL);
 
561
    }
 
562
    fetch->message_handler()->Message(
 
563
        kWarning, "NgxFetch %p: failed to resolve host [%.*s]", fetch,
 
564
        static_cast<int>(resolver_ctx->name.len), resolver_ctx->name.data);
 
565
    fetch->CallbackDone(false);
 
566
    return;
 
567
  }
 
568
 
 
569
  ngx_uint_t i;
 
570
  // Find the first ipv4 address. We don't support ipv6 yet.
 
571
  for (i = 0; i < resolver_ctx->naddrs; i++) {
 
572
    if (reinterpret_cast<struct sockaddr_in*>(
 
573
            resolver_ctx->addrs[i].sockaddr)->sin_family == AF_INET) {
 
574
      break;
 
575
    }
 
576
  }
 
577
 
 
578
  // If no suitable ipv4 address was found, we fail.
 
579
  if (i == resolver_ctx->naddrs) {
 
580
    if (fetch->timeout_event() != NULL && fetch->timeout_event()->timer_set) {
 
581
      ngx_del_timer(fetch->timeout_event());
 
582
      fetch->set_timeout_event(NULL);
 
583
    }
 
584
    fetch->message_handler()->Message(
 
585
        kWarning, "NgxFetch %p: no suitable address for host [%.*s]", fetch,
 
586
        static_cast<int>(resolver_ctx->name.len), resolver_ctx->name.data);
 
587
    fetch->CallbackDone(false);
 
588
  }
 
589
 
 
590
  ngx_memzero(&fetch->sin_, sizeof(fetch->sin_));
297
591
 
298
592
#if (nginx_version < 1005008)
299
 
    fetch->sin_.sin_addr.s_addr = resolver_ctx->addrs[0];
 
593
  fetch->sin_.sin_addr.s_addr = resolver_ctx->addrs[i];
300
594
#else
301
 
 
302
 
    struct sockaddr_in* sin;
303
 
    sin = reinterpret_cast<struct sockaddr_in*>(
304
 
        resolver_ctx->addrs[0].sockaddr);
305
 
    fetch->sin_.sin_family = sin->sin_family;
306
 
    fetch->sin_.sin_addr.s_addr = sin->sin_addr.s_addr;
 
595
  struct sockaddr_in* sin;
 
596
 
 
597
  sin = reinterpret_cast<struct sockaddr_in*>(
 
598
      resolver_ctx->addrs[i].sockaddr);
 
599
 
 
600
  fetch->sin_.sin_family = sin->sin_family;
 
601
  fetch->sin_.sin_addr.s_addr = sin->sin_addr.s_addr;
307
602
#endif
308
603
 
309
 
    fetch->sin_.sin_family = AF_INET;
310
 
    fetch->sin_.sin_port = htons(fetch->url_.port);
311
 
 
312
 
    // Maybe we have Proxy
313
 
    if (0 != fetcher->proxy_.url.len) {
314
 
      fetch->sin_.sin_port = htons(fetcher->proxy_.port);
315
 
    }
316
 
 
317
 
    char* ip_address = inet_ntoa(fetch->sin_.sin_addr);
318
 
 
319
 
    fetch->message_handler()->Message(
320
 
        kInfo, "NgxFetch: Resolved host [%.*s] to [%s]",
321
 
        static_cast<int>(resolver_ctx->name.len), resolver_ctx->name.data,
322
 
        ip_address);
323
 
 
324
 
    fetch->release_resolver();
325
 
 
326
 
    if (fetch->InitRequest() != NGX_OK) {
327
 
      fetch->message_handler()->Message(kError, "NgxFetch: InitRequest failed");
328
 
      fetch->CallbackDone(false);
329
 
    }
330
 
  }
331
 
 
332
 
  // prepare the send data for this fetch, and hook write event.
333
 
  int NgxFetch::InitRequest() {
334
 
    in_ = ngx_create_temp_buf(pool_, 4096);
335
 
    if (in_ == NULL) {
336
 
      return NGX_ERROR;
337
 
    }
338
 
 
339
 
    FixUserAgent();
340
 
 
341
 
    RequestHeaders* request_headers = async_fetch_->request_headers();
342
 
    ConstStringStarVector v;
343
 
    size_t size = 0;
344
 
    bool have_host = false;
345
 
    GoogleString port;
346
 
 
 
604
  fetch->sin_.sin_family = AF_INET;
 
605
  fetch->sin_.sin_port = htons(fetch->url_.port);
 
606
 
 
607
  // Maybe we have Proxy
 
608
  if (0 != fetcher->proxy_.url.len) {
 
609
    fetch->sin_.sin_port = htons(fetcher->proxy_.port);
 
610
  }
 
611
 
 
612
  char* ip_address = inet_ntoa(fetch->sin_.sin_addr);
 
613
 
 
614
  ngx_log_error(NGX_LOG_DEBUG, fetch->log_, 0,
 
615
                "NgxFetch %p: Resolved host [%V] to [%s]", fetch,
 
616
                &resolver_ctx->name, ip_address);
 
617
 
 
618
  fetch->release_resolver();
 
619
 
 
620
  if (fetch->InitRequest() != NGX_OK) {
 
621
    fetch->message_handler()->Message(kError, "NgxFetch: InitRequest failed");
 
622
    fetch->CallbackDone(false);
 
623
  }
 
624
}
 
625
 
 
626
// Prepare the request data for this fetch, and hook the write event.
 
627
int NgxFetch::InitRequest() {
 
628
  in_ = ngx_create_temp_buf(pool_, 4096);
 
629
  if (in_ == NULL) {
 
630
    return NGX_ERROR;
 
631
  }
 
632
 
 
633
  FixUserAgent();
 
634
 
 
635
  RequestHeaders* request_headers = async_fetch_->request_headers();
 
636
  ConstStringStarVector v;
 
637
  size_t size = 0;
 
638
  bool have_host = false;
 
639
  GoogleString port;
 
640
 
 
641
  response_handler = NgxFetch::HandleStatusLine;
 
642
  int rc = Connect();
 
643
  if (rc == NGX_AGAIN || rc == NGX_OK) {
 
644
    if (connection_->keepalive()) {
 
645
      request_headers->Add(HttpAttributes::kConnection,
 
646
                           NgxConnection::ka_header);
 
647
    }
347
648
    const char* method = request_headers->method_string();
348
649
    size_t method_len = strlen(method);
349
650
 
361
662
 
362
663
      // name: value\r\n
363
664
      size += request_headers->Name(i).length()
364
 
           + request_headers->Value(i).length()
365
 
           + 4;  // for ": \r\n"
 
665
          + request_headers->Value(i).length() + 4;  // 4 for ": \r\n"
366
666
    }
367
667
 
368
668
    if (!have_host) {
402
702
    }
403
703
    *(out_->last++) = CR;
404
704
    *(out_->last++) = LF;
405
 
 
406
 
    response_handler = NgxFetchHandleStatusLine;
407
 
    int rc = Connect();
408
705
    if (rc == NGX_AGAIN) {
409
706
      return NGX_OK;
410
 
    } else if (rc < NGX_OK) {
411
 
      return rc;
412
 
    }
413
 
 
414
 
    NgxFetchWrite(connection_->write);
415
 
    return NGX_OK;
416
 
  }
417
 
 
418
 
  int NgxFetch::Connect() {
419
 
    ngx_peer_connection_t pc;
420
 
    ngx_memzero(&pc, sizeof(pc));
421
 
    pc.sockaddr = (struct sockaddr*)&sin_;
422
 
    pc.socklen = sizeof(struct sockaddr_in);
423
 
    pc.name = &url_.host;
424
 
 
425
 
    // get callback is dummy function, it just returns NGX_OK
426
 
    pc.get = ngx_event_get_peer;
427
 
    pc.log_error = NGX_ERROR_ERR;
428
 
    pc.log = fetcher_->log_;
429
 
    pc.rcvbuf = -1;
430
 
 
431
 
    int rc = ngx_event_connect_peer(&pc);
432
 
    if (rc == NGX_ERROR || rc == NGX_DECLINED || rc == NGX_BUSY) {
433
 
      return rc;
434
 
    }
435
 
 
436
 
    connection_ = pc.connection;
437
 
    connection_->write->handler = NgxFetchWrite;
438
 
    connection_->read->handler = NgxFetchRead;
439
 
    connection_->data = this;
440
 
 
441
 
    // Timer set in Init() is still in effect.
 
707
    }
 
708
  } else if (rc < NGX_OK) {
442
709
    return rc;
443
710
  }
444
 
 
445
 
  // When the fetch sends the request completely, it will hook the read event,
446
 
  // and prepare to parse the response.
447
 
  void NgxFetch::NgxFetchWrite(ngx_event_t* wev) {
448
 
    ngx_connection_t* c = static_cast<ngx_connection_t*>(wev->data);
449
 
    NgxFetch* fetch = static_cast<NgxFetch*>(c->data);
450
 
    ngx_buf_t* out = fetch->out_;
451
 
 
452
 
    while (out->pos < out->last) {
453
 
      int n = c->send(c, out->pos, out->last - out->pos);
454
 
      if (n >= 0) {
455
 
        out->pos += n;
456
 
      } else if (n == NGX_AGAIN) {
457
 
        if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
458
 
          fetch->CallbackDone(false);
459
 
        }
460
 
        // Timer set in Init() is still in effect.
461
 
        return;
462
 
      } else {
463
 
        c->error = 1;
 
711
  CHECK(rc == NGX_OK);
 
712
  NgxFetch::ConnectionWriteHandler(connection_->c_->write);
 
713
  return NGX_OK;
 
714
}
 
715
 
 
716
int NgxFetch::Connect() {
 
717
  ngx_peer_connection_t pc;
 
718
  ngx_memzero(&pc, sizeof(pc));
 
719
  pc.sockaddr = (struct sockaddr*)&sin_;
 
720
  pc.socklen = sizeof(struct sockaddr_in);
 
721
  pc.name = &url_.host;
 
722
 
 
723
  // get callback is dummy function, it just returns NGX_OK
 
724
  pc.get = ngx_event_get_peer;
 
725
  pc.log_error = NGX_ERROR_ERR;
 
726
  pc.log = fetcher_->log_;
 
727
  pc.rcvbuf = -1;
 
728
 
 
729
 
 
730
  connection_ = NgxConnection::Connect(&pc, message_handler(),
 
731
                                       fetcher_->max_keepalive_requests_);
 
732
  ngx_log_error(NGX_LOG_DEBUG, fetcher_->log_, 0,
 
733
                "NgxFetch %p Connect() connection %p for [%s]\n",
 
734
                this, connection_, str_url());
 
735
 
 
736
  if (connection_ == NULL) {
 
737
    return NGX_ERROR;
 
738
  }
 
739
 
 
740
  connection_->c_->write->handler = NgxFetch::ConnectionWriteHandler;
 
741
  connection_->c_->read->handler = NgxFetch::ConnectionReadHandler;
 
742
  connection_->c_->data = this;
 
743
 
 
744
  // Timer set in Init() is still in effect.
 
745
  return NGX_OK;
 
746
}
 
747
 
 
748
// When the fetch sends the request completely, it will hook the read event,
 
749
// and prepare to parse the response.
 
750
void NgxFetch::ConnectionWriteHandler(ngx_event_t* wev) {
 
751
  ngx_connection_t* c = static_cast<ngx_connection_t*>(wev->data);
 
752
  NgxFetch* fetch = static_cast<NgxFetch*>(c->data);
 
753
  ngx_buf_t* out = fetch->out_;
 
754
 
 
755
  while (out->pos < out->last) {
 
756
    int n = c->send(c, out->pos, out->last - out->pos);
 
757
    ngx_log_error(NGX_LOG_DEBUG, fetch->log_, 0,
 
758
                  "NgxFetch %p: ConnectionWriteHandler "
 
759
                  "send result %d", fetch, n);
 
760
 
 
761
    if (n >= 0) {
 
762
      out->pos += n;
 
763
    } else if (n == NGX_AGAIN) {
 
764
      if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
464
765
        fetch->CallbackDone(false);
465
 
        return;
466
766
      }
467
 
    }
468
 
 
469
 
    if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
 
767
      // Timer set in Init() is still in effect.
 
768
      return;
 
769
    } else {
470
770
      c->error = 1;
471
771
      fetch->CallbackDone(false);
472
 
    }
473
 
 
 
772
      return;
 
773
    }
 
774
  }
 
775
 
 
776
  if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
 
777
    c->error = 1;
 
778
    fetch->CallbackDone(false);
 
779
  }
 
780
 
 
781
  return;
 
782
}
 
783
 
 
784
void NgxFetch::ConnectionReadHandler(ngx_event_t* rev) {
 
785
  ngx_connection_t* c = static_cast<ngx_connection_t*>(rev->data);
 
786
  NgxFetch* fetch = static_cast<NgxFetch*>(c->data);
 
787
 
 
788
  for (;;) {
 
789
    int n = c->recv(
 
790
        c, fetch->in_->start, fetch->in_->end - fetch->in_->start);
 
791
 
 
792
    ngx_log_error(NGX_LOG_DEBUG, fetch->log_, 0,
 
793
                  "NgxFetch %p: ConnectionReadHandler "
 
794
                  "recv result %d", fetch, n);
 
795
 
 
796
    if (n == NGX_AGAIN) {
 
797
      break;
 
798
    }
 
799
 
 
800
    if (n == 0) {
 
801
      // If the content length was not known, we assume that we have read
 
802
      // all if we at least parsed the headers.
 
803
      // If we do know the content length, having a mismatch on the bytes read
 
804
      // will be interpreted as an error.
 
805
      if (fetch->content_length_known_) {
 
806
        fetch->CallbackDone(fetch->content_length_ == fetch->bytes_received_);
 
807
      } else {
 
808
        fetch->CallbackDone(fetch->parser_.headers_complete());
 
809
      }
 
810
 
 
811
      return;
 
812
    } else if (n > 0) {
 
813
      fetch->in_->pos = fetch->in_->start;
 
814
      fetch->in_->last = fetch->in_->start + n;
 
815
      if (!fetch->response_handler(c)) {
 
816
        fetch->CallbackDone(false);
 
817
        return;
 
818
      }
 
819
 
 
820
      if (fetch->done_) {
 
821
        fetch->CallbackDone(true);
 
822
        return;
 
823
      }
 
824
    }
 
825
 
 
826
    if (!rev->ready) {
 
827
      break;
 
828
    }
 
829
  }
 
830
 
 
831
  if (fetch->done_) {
 
832
    fetch->CallbackDone(true);
474
833
    return;
475
834
  }
476
835
 
477
 
  void NgxFetch::NgxFetchRead(ngx_event_t* rev) {
478
 
    ngx_connection_t* c = static_cast<ngx_connection_t*>(rev->data);
479
 
    NgxFetch* fetch = static_cast<NgxFetch*>(c->data);
480
 
 
481
 
    for (;;) {
482
 
      int n = c->recv(
483
 
          c, fetch->in_->start, fetch->in_->end - fetch->in_->start);
484
 
 
485
 
      if (n == NGX_AGAIN) {
486
 
        break;
487
 
      }
488
 
 
489
 
      if (n == 0) {
490
 
        // If the content length was not known, we assume that we have read
491
 
        // all if we at least parsed the headers.
492
 
        // If we do know the content length, having a mismatch on the bytes read
493
 
        // will be interpreted as an error.
494
 
        if (fetch->content_length_known_) {
495
 
          fetch->CallbackDone(fetch->content_length_ == fetch->bytes_received_);
496
 
        } else {
497
 
          fetch->CallbackDone(fetch->parser_.headers_complete());
498
 
        }
499
 
 
500
 
        return;
501
 
      } else if (n > 0) {
502
 
        fetch->in_->pos = fetch->in_->start;
503
 
        fetch->in_->last = fetch->in_->start + n;
504
 
        if (!fetch->response_handler(c)) {
505
 
          fetch->CallbackDone(false);
506
 
          return;
507
 
        }
508
 
 
509
 
        if (fetch->done_) {
510
 
          fetch->CallbackDone(true);
511
 
          return;
512
 
        }
513
 
      }
514
 
 
515
 
      if (!rev->ready) {
516
 
        break;
517
 
      }
518
 
    }
519
 
 
520
 
    if (fetch->done_) {
521
 
      fetch->CallbackDone(true);
522
 
      return;
523
 
    }
524
 
 
525
 
    if (ngx_handle_read_event(rev, 0) != NGX_OK) {
526
 
      fetch->CallbackDone(false);
527
 
    }
528
 
 
529
 
    // Timer set in Init() is still in effect.
530
 
  }
531
 
 
532
 
  // Parse the status line: "HTTP/1.1 200 OK\r\n"
533
 
  bool NgxFetch::NgxFetchHandleStatusLine(ngx_connection_t* c) {
534
 
    NgxFetch* fetch = static_cast<NgxFetch*>(c->data);
535
 
    // This function only works after Nginx-1.1.4. Before nginx-1.1.4,
536
 
    // ngx_http_parse_status_line didn't save http_version.
537
 
    ngx_int_t n = ngx_http_parse_status_line(fetch->r_, fetch->in_,
538
 
                                             fetch->status_);
539
 
    if (n == NGX_ERROR) {  // parse status line error
540
 
      fetch->message_handler()->Message(
541
 
          kWarning, "NgxFetch: failed to parse status line");
542
 
      return false;
543
 
    } else if (n == NGX_AGAIN) {  // not completed
544
 
      return true;
545
 
    }
546
 
    ResponseHeaders* response_headers =
 
836
  if (ngx_handle_read_event(rev, 0) != NGX_OK) {
 
837
    fetch->CallbackDone(false);
 
838
  }
 
839
 
 
840
  // Timer set in Init() is still in effect.
 
841
}
 
842
 
 
843
// Parse the status line: "HTTP/1.1 200 OK\r\n"
 
844
bool NgxFetch::HandleStatusLine(ngx_connection_t* c) {
 
845
  NgxFetch* fetch = static_cast<NgxFetch*>(c->data);
 
846
  ngx_log_error(NGX_LOG_DEBUG, fetch->log_, 0,
 
847
                "NgxFetch %p: Handle status line\n", fetch);
 
848
 
 
849
  // This function only works after Nginx-1.1.4. Before nginx-1.1.4,
 
850
  // ngx_http_parse_status_line didn't save http_version.
 
851
  ngx_int_t n = ngx_http_parse_status_line(fetch->r_, fetch->in_,
 
852
                                           fetch->status_);
 
853
  if (n == NGX_ERROR) {  // parse status line error
 
854
    fetch->message_handler()->Message(
 
855
        kWarning, "NgxFetch: failed to parse status line");
 
856
    return false;
 
857
  } else if (n == NGX_AGAIN) {  // not completed
 
858
    return true;
 
859
  }
 
860
  ResponseHeaders* response_headers =
547
861
      fetch->async_fetch_->response_headers();
548
 
    response_headers->SetStatusAndReason(
549
 
        static_cast<HttpStatus::Code>(fetch->get_status_code()));
550
 
    response_headers->set_major_version(fetch->get_major_version());
551
 
    response_headers->set_minor_version(fetch->get_minor_version());
552
 
    fetch->set_response_handler(NgxFetchHandleHeader);
553
 
    return fetch->response_handler(c);
554
 
  }
555
 
 
556
 
  // Parse the HTTP headers
557
 
  bool NgxFetch::NgxFetchHandleHeader(ngx_connection_t* c) {
558
 
    NgxFetch* fetch = static_cast<NgxFetch*>(c->data);
559
 
    char* data = reinterpret_cast<char*>(fetch->in_->pos);
560
 
    size_t size = fetch->in_->last - fetch->in_->pos;
561
 
    size_t n = fetch->parser_.ParseChunk(StringPiece(data, size),
562
 
        fetch->message_handler_);
563
 
    if (n > size) {
564
 
      return false;
565
 
    } else if (fetch->parser_.headers_complete()) {
566
 
      if (fetch->async_fetch_->response_headers()->FindContentLength(
567
 
              &fetch->content_length_)) {
568
 
        if (fetch->content_length_ < 0) {
569
 
          fetch->message_handler_->Message(
570
 
              kError, "Negative content-length in response header");
571
 
          return false;
572
 
        } else {
573
 
          fetch->content_length_known_ = true;
 
862
  response_headers->SetStatusAndReason(
 
863
      static_cast<HttpStatus::Code>(fetch->get_status_code()));
 
864
  response_headers->set_major_version(fetch->get_major_version());
 
865
  response_headers->set_minor_version(fetch->get_minor_version());
 
866
  fetch->set_response_handler(NgxFetch::HandleHeader);
 
867
  return fetch->response_handler(c);
 
868
}
 
869
 
 
870
// Parse the HTTP headers
 
871
bool NgxFetch::HandleHeader(ngx_connection_t* c) {
 
872
  NgxFetch* fetch = static_cast<NgxFetch*>(c->data);
 
873
  char* data = reinterpret_cast<char*>(fetch->in_->pos);
 
874
  size_t size = fetch->in_->last - fetch->in_->pos;
 
875
  size_t n = fetch->parser_.ParseChunk(StringPiece(data, size),
 
876
                                       fetch->message_handler_);
 
877
 
 
878
  ngx_log_error(NGX_LOG_DEBUG, fetch->log_, 0,
 
879
                "NgxFetch %p: Handle headers\n", fetch);
 
880
 
 
881
  if (n > size) {
 
882
    return false;
 
883
  } else if (fetch->parser_.headers_complete()) {
 
884
    if (fetch->async_fetch_->response_headers()->FindContentLength(
 
885
            &fetch->content_length_)) {
 
886
      if (fetch->content_length_ < 0) {
 
887
        fetch->message_handler_->Message(
 
888
            kError, "Negative content-length in response header");
 
889
        return false;
 
890
      } else {
 
891
        fetch->content_length_known_ = true;
 
892
        if (fetch->content_length_ == 0) {
 
893
          fetch->done_ = true;
574
894
        }
575
895
      }
576
 
 
577
 
      if (fetch->fetcher_->track_original_content_length()
578
 
         && fetch->content_length_known_) {
579
 
        fetch->async_fetch_->response_headers()->SetOriginalContentLength(
580
 
            fetch->content_length_);
581
 
      }
582
 
 
583
 
      fetch->in_->pos += n;
584
 
      fetch->set_response_handler(NgxFetchHandleBody);
 
896
    }
 
897
 
 
898
    if (fetch->fetcher_->track_original_content_length()
 
899
        && fetch->content_length_known_) {
 
900
      fetch->async_fetch_->response_headers()->SetOriginalContentLength(
 
901
          fetch->content_length_);
 
902
    }
 
903
 
 
904
    fetch->in_->pos += n;
 
905
    fetch->set_response_handler(NgxFetch::HandleBody);
 
906
    if ((fetch->in_->last - fetch->in_->pos) > 0) {
585
907
      return fetch->response_handler(c);
586
908
    }
587
 
    return true;
 
909
  } else {
 
910
    fetch->in_->pos += n;
588
911
  }
589
 
 
590
 
  // Read the response body
591
 
  bool NgxFetch::NgxFetchHandleBody(ngx_connection_t* c) {
592
 
    NgxFetch* fetch = static_cast<NgxFetch*>(c->data);
593
 
    char* data = reinterpret_cast<char*>(fetch->in_->pos);
594
 
    size_t size = fetch->in_->last - fetch->in_->pos;
595
 
    if (size == 0) {
596
 
      return true;
597
 
    }
598
 
 
599
 
    fetch->bytes_received_add(size);
600
 
 
601
 
    if (fetch->async_fetch_->Write(StringPiece(data, size),
602
 
        fetch->message_handler())) {
603
 
      if (fetch->content_length_known_ &&
604
 
          fetch->bytes_received_ == fetch->content_length_) {
605
 
        fetch->done_ = true;
606
 
      }
607
 
      return true;
608
 
    }
 
912
  return true;
 
913
}
 
914
 
 
915
// Read the response body
 
916
bool NgxFetch::HandleBody(ngx_connection_t* c) {
 
917
  NgxFetch* fetch = static_cast<NgxFetch*>(c->data);
 
918
  char* data = reinterpret_cast<char*>(fetch->in_->pos);
 
919
  size_t size = fetch->in_->last - fetch->in_->pos;
 
920
 
 
921
  fetch->bytes_received_add(size);
 
922
 
 
923
  ngx_log_error(NGX_LOG_DEBUG, fetch->log_, 0,
 
924
                "NgxFetch %p: Handle body (%d bytes)\n", fetch, size);
 
925
 
 
926
  if ( fetch->async_fetch_->Write(StringPiece(data, size),
 
927
                                  fetch->message_handler()) ) {
 
928
    if (fetch->bytes_received_ == fetch->content_length_) {
 
929
      fetch->done_ = true;
 
930
    }
 
931
    fetch->in_->pos += size;
 
932
  } else {
 
933
    ngx_log_error(NGX_LOG_DEBUG, fetch->log_, 0,
 
934
                  "NgxFetch %p: async fetch write failure\n", fetch);
609
935
    return false;
610
936
  }
611
 
 
612
 
  void NgxFetch::NgxFetchTimeout(ngx_event_t* tev) {
613
 
    NgxFetch* fetch = static_cast<NgxFetch*>(tev->data);
614
 
    fetch->CallbackDone(false);
615
 
  }
616
 
 
617
 
  void NgxFetch::FixUserAgent() {
618
 
    GoogleString user_agent;
619
 
    ConstStringStarVector v;
620
 
    RequestHeaders* request_headers = async_fetch_->request_headers();
621
 
    if (request_headers->Lookup(HttpAttributes::kUserAgent, &v)) {
622
 
      for (int i = 0, n = v.size(); i < n; i++) {
623
 
        if (i != 0) {
624
 
          user_agent += " ";
625
 
        }
626
 
 
627
 
        if (v[i] != NULL) {
628
 
          user_agent += *(v[i]);
629
 
        }
630
 
      }
631
 
      request_headers->RemoveAll(HttpAttributes::kUserAgent);
632
 
    }
633
 
    if (user_agent.empty()) {
634
 
      user_agent += "NgxNativeFetcher";
635
 
    }
636
 
    GoogleString version = StrCat(
637
 
        " ", kModPagespeedSubrequestUserAgent,
638
 
        "/" MOD_PAGESPEED_VERSION_STRING "-" LASTCHANGE_STRING);
639
 
    if (!StringPiece(user_agent).ends_with(version)) {
640
 
      user_agent += version;
641
 
    }
642
 
    request_headers->Add(HttpAttributes::kUserAgent, user_agent);
643
 
  }
 
937
  return true;
 
938
}
 
939
 
 
940
void NgxFetch::TimeoutHandler(ngx_event_t* tev) {
 
941
  NgxFetch* fetch = static_cast<NgxFetch*>(tev->data);
 
942
  ngx_log_error(NGX_LOG_DEBUG, fetch->log_, 0,
 
943
                "NgxFetch %p: TimeoutHandler called\n", fetch);
 
944
  fetch->CallbackDone(false);
 
945
}
 
946
 
 
947
void NgxFetch::FixUserAgent() {
 
948
  GoogleString user_agent;
 
949
  ConstStringStarVector v;
 
950
  RequestHeaders* request_headers = async_fetch_->request_headers();
 
951
  if (request_headers->Lookup(HttpAttributes::kUserAgent, &v)) {
 
952
    for (size_t i = 0, n = v.size(); i < n; i++) {
 
953
      if (i != 0) {
 
954
        user_agent += " ";
 
955
      }
 
956
 
 
957
      if (v[i] != NULL) {
 
958
        user_agent += *(v[i]);
 
959
      }
 
960
    }
 
961
    request_headers->RemoveAll(HttpAttributes::kUserAgent);
 
962
  }
 
963
  if (user_agent.empty()) {
 
964
    user_agent += "NgxNativeFetcher";
 
965
  }
 
966
  GoogleString version = StrCat(
 
967
      " ", kModPagespeedSubrequestUserAgent,
 
968
      "/" MOD_PAGESPEED_VERSION_STRING "-" LASTCHANGE_STRING);
 
969
  if (!StringPiece(user_agent).ends_with(version)) {
 
970
    user_agent += version;
 
971
  }
 
972
  request_headers->Add(HttpAttributes::kUserAgent, user_agent);
 
973
}
 
974
 
644
975
}  // namespace net_instaweb