56
64
#include "net/instaweb/util/public/writer.h"
58
66
namespace net_instaweb {
59
NgxFetch::NgxFetch(const GoogleString& url,
60
AsyncFetch* async_fetch,
61
MessageHandler* message_handler,
62
ngx_msec_t timeout_ms,
66
async_fetch_(async_fetch),
67
parser_(async_fetch->response_headers()),
68
message_handler_(message_handler),
74
content_length_known_(false),
76
ngx_memzero(&url_, sizeof(url_));
68
class NgxConnection : public PoolElement<NgxConnection> {
70
NgxConnection(MessageHandler* handler, int max_keepalive_requests);
72
void SetSock(u_char *sockaddr, socklen_t socklen) {
74
ngx_memcpy(&sockaddr_, sockaddr, socklen);
76
// Close ensures that NgxConnection deletes itself at the appropriate time,
77
// which can be after receiving a non-keepalive response, or when the remote
78
// server closes the connection when the NgxConnection is pooled and idle.
81
// Once keepalive is disabled, it can't be toggled back on.
82
// Updates the keepalive flag. Passing false clears it; passing true leaves
// the current value untouched, so a cleared flag is never set again.
void set_keepalive(bool k) {
  if (!k) {
    keepalive_ = false;
  }
}
83
// Returns the current value of the keepalive flag.
bool keepalive() {
  return keepalive_;
}
85
typedef Pool<NgxConnection> NgxConnectionPool;
87
static NgxConnection* Connect(ngx_peer_connection_t* pc,
88
MessageHandler* handler,
89
int max_keepalive_requests);
90
static void IdleWriteHandler(ngx_event_t* ev);
91
static void IdleReadHandler(ngx_event_t* ev);
93
static NgxConnectionPool connection_pool;
94
static PthreadMutex connection_pool_mutex;
96
// c_ is owned by NgxConnection and freed in ::Close()
98
static const int64 keepalive_timeout_ms;
99
static const GoogleString ka_header;
102
int max_keepalive_requests_;
105
u_char sockaddr_[NGX_SOCKADDRLEN];
106
MessageHandler* handler_;
108
DISALLOW_COPY_AND_ASSIGN(NgxConnection);
111
NgxConnection::NgxConnectionPool NgxConnection::connection_pool;
112
PthreadMutex NgxConnection::connection_pool_mutex;
113
// Default keepalive 60s.
114
const int64 NgxConnection::keepalive_timeout_ms = 60000;
115
const GoogleString NgxConnection::ka_header =
116
StrCat("keep-alive ",
117
Integer64ToString(NgxConnection::keepalive_timeout_ms));
119
NgxConnection::NgxConnection(MessageHandler* handler,
120
int max_keepalive_requests) {
122
max_keepalive_requests_ = max_keepalive_requests;
124
// max_keepalive_requests specifies the number of http requests that are
125
// allowed to be performed over a single connection. So, a
126
// max_keepalive_requests of 1 effectively disables keepalive.
127
keepalive_ = max_keepalive_requests_ > 1;
130
NgxConnection::~NgxConnection() {
131
CHECK(c_ == NULL) << "NgxFetch: Underlying connection should be NULL";
134
NgxConnection* NgxConnection::Connect(ngx_peer_connection_t* pc,
135
MessageHandler* handler,
136
int max_keepalive_requests) {
139
ScopedMutex lock(&NgxConnection::connection_pool_mutex);
141
for (NgxConnectionPool::iterator p = connection_pool.begin();
142
p != connection_pool.end(); ++p) {
145
if (ngx_memn2cmp(static_cast<u_char*>(nc->sockaddr_),
146
reinterpret_cast<u_char*>(pc->sockaddr),
147
nc->socklen_, pc->socklen) == 0) {
148
CHECK(nc->c_->idle) << "Pool should only contain idle connections!";
151
nc->c_->log = pc->log;
152
nc->c_->read->log = pc->log;
153
nc->c_->write->log = pc->log;
154
if (nc->c_->pool != NULL) {
155
nc->c_->pool->log = pc->log;
158
if (nc->c_->read->timer_set) {
159
ngx_del_timer(nc->c_->read);
161
connection_pool.Remove(nc);
163
ngx_log_error(NGX_LOG_DEBUG, pc->log, 0,
164
"NgxFetch: re-using connection %p (pool size: %l)\n",
165
nc, connection_pool.size());
171
int rc = ngx_event_connect_peer(pc);
172
if (rc == NGX_ERROR || rc == NGX_DECLINED || rc == NGX_BUSY) {
176
// NgxConnection deletes itself if NgxConnection::Close()
177
nc = new NgxConnection(handler, max_keepalive_requests);
178
nc->SetSock(reinterpret_cast<u_char*>(pc->sockaddr), pc->socklen);
179
nc->c_ = pc->connection;
183
void NgxConnection::Close() {
184
bool removed_from_pool = false;
187
ScopedMutex lock(&NgxConnection::connection_pool_mutex);
188
for (NgxConnectionPool::iterator p = connection_pool.begin();
189
p != connection_pool.end(); ++p) {
191
// When we get here, that means that the connection either has timed
192
// out or has been closed remotely.
193
connection_pool.Remove(this);
194
ngx_log_error(NGX_LOG_DEBUG, c_->log, 0,
195
"NgxFetch: removed connection %p (pool size: %l)\n",
196
this, connection_pool.size());
197
removed_from_pool = true;
203
max_keepalive_requests_--;
205
if (c_->read->timer_set) {
206
ngx_del_timer(c_->read);
209
if (c_->write->timer_set) {
210
ngx_del_timer(c_->write);
213
if (!keepalive_ || max_keepalive_requests_ <= 0 || removed_from_pool) {
214
ngx_close_connection(c_);
220
ngx_add_timer(c_->read, static_cast<ngx_msec_t>(
221
NgxConnection::keepalive_timeout_ms));
224
c_->read->handler = NgxConnection::IdleReadHandler;
225
c_->write->handler = NgxConnection::IdleWriteHandler;
228
// This connection should not be associated with current fetch.
229
c_->log = ngx_cycle->log;
230
c_->read->log = ngx_cycle->log;
231
c_->write->log = ngx_cycle->log;
232
if (c_->pool != NULL) {
233
c_->pool->log = ngx_cycle->log;
236
// Allow this connection to be re-used, by adding it to the connection pool.
238
ScopedMutex lock(&NgxConnection::connection_pool_mutex);
239
connection_pool.Add(this);
240
ngx_log_error(NGX_LOG_DEBUG, c_->log, 0,
241
"NgxFetch: Added connection %p (pool size: %l - "
242
" max_keepalive_requests_ %d)\n",
243
this, connection_pool.size(), max_keepalive_requests_);
247
void NgxConnection::IdleWriteHandler(ngx_event_t* ev) {
248
ngx_connection_t* c = static_cast<ngx_connection_t*>(ev->data);
250
int n = c->recv(c, buf, 1);
251
if (c->write->timedout) {
252
DCHECK(false) << "NgxFetch: write timeout not expected." << n;
254
if (n == NGX_AGAIN) {
257
DCHECK(false) << "NgxFetch: Unexpected write event" << n;
260
void NgxConnection::IdleReadHandler(ngx_event_t* ev) {
261
ngx_connection_t* c = static_cast<ngx_connection_t*>(ev->data);
262
NgxConnection* nc = static_cast<NgxConnection*>(c->data);
264
if (c->read->timedout) {
265
nc->set_keepalive(false);
273
// not a timeout event, we should check connection
274
n = recv(c->fd, buf, 1, MSG_PEEK);
275
if (n == -1 && ngx_socket_errno == NGX_EAGAIN) {
276
if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
277
nc->set_keepalive(false);
285
nc->set_keepalive(false);
289
NgxFetch::NgxFetch(const GoogleString& url,
290
AsyncFetch* async_fetch,
291
MessageHandler* message_handler,
295
async_fetch_(async_fetch),
296
parser_(async_fetch->response_headers()),
297
message_handler_(message_handler),
303
content_length_known_(false),
304
resolver_ctx_(NULL) {
305
ngx_memzero(&url_, sizeof(url_));
308
timeout_event_ = NULL;
312
NgxFetch::~NgxFetch() {
313
if (timeout_event_ != NULL && timeout_event_->timer_set) {
314
ngx_del_timer(timeout_event_);
316
if (connection_ != NULL) {
317
connection_->Close();
321
ngx_destroy_pool(pool_);
326
// This function is called by NgxUrlAsyncFetcher::StartFetch.
327
bool NgxFetch::Start(NgxUrlAsyncFetcher* fetcher) {
331
ngx_log_error(NGX_LOG_DEBUG, log_, 0, "NgxFetch %p: initialized\n",
333
} // else Init() will have emitted a reason
337
// Create the pool, parse the url, add the timeout event and
338
// hook the DNS resolver if needed. Else we connect directly.
339
// When this returns false, our caller (NgxUrlAsyncFetcher::StartFetch)
340
// will call fetch->CallbackDone()
341
bool NgxFetch::Init() {
342
pool_ = ngx_create_pool(12288, log_);
344
message_handler_->Message(kError, "NgxFetch: ngx_create_pool failed");
349
message_handler_->Message(kError, "NgxFetch: ParseUrl() failed");
353
timeout_event_ = static_cast<ngx_event_t*>(
354
ngx_pcalloc(pool_, sizeof(ngx_event_t)));
355
if (timeout_event_ == NULL) {
356
message_handler_->Message(kError,
357
"NgxFetch: ngx_pcalloc failed for timeout");
361
timeout_event_->data = this;
362
timeout_event_->handler = NgxFetch::TimeoutHandler;
363
timeout_event_->log = log_;
365
ngx_add_timer(timeout_event_, fetcher_->fetch_timeout_);
366
r_ = static_cast<ngx_http_request_t*>(
367
ngx_pcalloc(pool_, sizeof(ngx_http_request_t)));
370
message_handler_->Message(kError,
371
"NgxFetch: ngx_pcalloc failed for timer");
374
status_ = static_cast<ngx_http_status_t*>(
375
ngx_pcalloc(pool_, sizeof(ngx_http_status_t)));
377
if (status_ == NULL) {
378
message_handler_->Message(kError,
379
"NgxFetch: ngx_pcalloc failed for status");
383
// The host is either a domain name or an IP address. First check
384
// if it's a valid IP address and only if that fails fall back to
385
// using the DNS resolver.
387
// Maybe we have a Proxy.
388
ngx_url_t* tmp_url = &url_;
389
if (fetcher_->proxy_.url.len != 0) {
390
tmp_url = &fetcher_->proxy_;
393
GoogleString s_ipaddress(reinterpret_cast<char*>(tmp_url->host.data),
395
ngx_memzero(&sin_, sizeof(sin_));
396
sin_.sin_family = AF_INET;
397
sin_.sin_port = htons(tmp_url->port);
398
sin_.sin_addr.s_addr = inet_addr(s_ipaddress.c_str());
400
if (sin_.sin_addr.s_addr == INADDR_NONE) {
401
// inet_addr returned INADDR_NONE, which means the hostname
402
// isn't a valid IP address. Check DNS.
403
ngx_resolver_ctx_t temp;
404
temp.name.data = tmp_url->host.data;
405
temp.name.len = tmp_url->host.len;
406
resolver_ctx_ = ngx_resolve_start(fetcher_->resolver_, &temp);
407
if (resolver_ctx_ == NULL || resolver_ctx_ == NGX_NO_RESOLVER) {
408
// TODO(oschaaf): this spams the log, but is useful in the fetcher's
410
message_handler_->Message(
411
kError, "NgxFetch: Couldn't start resolving, "
412
"is there a proper resolver configured in nginx.conf?");
415
ngx_log_error(NGX_LOG_DEBUG, log_, 0,
416
"NgxFetch %p: start resolve for: %s\n",
417
this, s_ipaddress.c_str());
420
resolver_ctx_->data = this;
421
resolver_ctx_->name.data = tmp_url->host.data;
422
resolver_ctx_->name.len = tmp_url->host.len;
424
#if (nginx_version < 1005008)
425
resolver_ctx_->type = NGX_RESOLVE_A;
428
resolver_ctx_->handler = NgxFetch::ResolveDoneHandler;
429
resolver_ctx_->timeout = fetcher_->resolver_timeout_;
431
if (ngx_resolve_name(resolver_ctx_) != NGX_OK) {
432
message_handler_->Message(kWarning,
433
"NgxFetch: ngx_resolve_name failed");
437
if (InitRequest() != NGX_OK) {
438
message_handler()->Message(kError, "NgxFetch: InitRequest failed");
445
const char* NgxFetch::str_url() {
446
return str_url_.c_str();
449
// This function should be called only once. The only argument is success or
451
void NgxFetch::CallbackDone(bool success) {
452
ngx_log_error(NGX_LOG_DEBUG, log_, 0, "NgxFetch %p: CallbackDone: %s\n",
453
this, success ? "OK":"FAIL");
455
if (async_fetch_ == NULL) {
457
<< "BUG: NgxFetch callback called more than once on same fetch"
458
<< str_url_.c_str() << "(" << this << ").Please report this at"
459
<< "https://groups.google.com/forum/#!forum/ngx-pagespeed-discuss";
465
if (timeout_event_ && timeout_event_->timer_set) {
466
ngx_del_timer(timeout_event_);
79
467
timeout_event_ = NULL;
470
if (connection_ != NULL) {
471
// Connection will be re-used only on responses that specify
472
// 'Connection: keep-alive' in their headers.
473
bool keepalive = false;
476
ConstStringStarVector v;
477
if (async_fetch_->response_headers()->Lookup(
478
StringPiece(HttpAttributes::kConnection), &v)) {
479
for (size_t i = 0; i < v.size(); i++) {
480
if (*v[i] == "keep-alive") {
483
} else if (*v[i] == "close") {
488
ngx_log_error(NGX_LOG_DEBUG, log_, 0,
489
"NgxFetch %p: connection %p attempt keep-alive: %s\n",
490
this, connection_, keepalive ? "Yes":"No");
493
connection_->set_keepalive(keepalive);
494
connection_->Close();
80
495
connection_ = NULL;
83
NgxFetch::~NgxFetch() {
84
if (timeout_event_ != NULL && timeout_event_->timer_set) {
85
ngx_del_timer(timeout_event_);
87
if (connection_ != NULL) {
88
ngx_close_connection(connection_);
91
ngx_destroy_pool(pool_);
95
// This function is called by NgxUrlAsyncFetcher::StartFetch.
96
bool NgxFetch::Start(NgxUrlAsyncFetcher* fetcher) {
101
// Create the pool, parse the url, add the timeout event and
102
// hook the DNS resolver if needed. Else we connect directly.
103
// When this returns false, our caller (NgxUrlAsyncFetcher::StartFetch)
104
// will call fetch->CallbackDone()
105
bool NgxFetch::Init() {
106
pool_ = ngx_create_pool(12288, log_);
108
message_handler_->Message(kError, "NgxFetch: ngx_create_pool failed");
113
message_handler_->Message(kError, "NgxFetch: ParseUrl() failed");
117
timeout_event_ = static_cast<ngx_event_t*>(
118
ngx_pcalloc(pool_, sizeof(ngx_event_t)));
119
if (timeout_event_ == NULL) {
120
message_handler_->Message(kError,
121
"NgxFetch: ngx_pcalloc failed for timeout");
124
timeout_event_->data = this;
125
timeout_event_->handler = NgxFetchTimeout;
126
timeout_event_->log = log_;
128
ngx_add_timer(timeout_event_, fetcher_->fetch_timeout_);
129
r_ = static_cast<ngx_http_request_t*>(ngx_pcalloc(pool_,
130
sizeof(ngx_http_request_t)));
132
message_handler_->Message(kError,
133
"NgxFetch: ngx_pcalloc failed for timer");
136
status_ = static_cast<ngx_http_status_t*>(ngx_pcalloc(pool_,
137
sizeof(ngx_http_status_t)));
138
if (status_ == NULL) {
139
message_handler_->Message(kError,
140
"NgxFetch: ngx_pcalloc failed for status");
144
// The host is either a domain name or an IP address. First check
145
// if it's a valid IP address and only if that fails fall back to
146
// using the DNS resolver.
148
// Maybe we have a Proxy.
149
ngx_url_t* tmp_url = &url_;
150
if (0 != fetcher_->proxy_.url.len) {
151
tmp_url = &fetcher_->proxy_;
154
GoogleString s_ipaddress(reinterpret_cast<char*>(tmp_url->host.data),
156
ngx_memzero(&sin_, sizeof(sin_));
157
sin_.sin_family = AF_INET;
158
sin_.sin_port = htons(tmp_url->port);
159
sin_.sin_addr.s_addr = inet_addr(s_ipaddress.c_str());
161
if (sin_.sin_addr.s_addr == INADDR_NONE) {
162
// inet_addr returned INADDR_NONE, which means the hostname
163
// isn't a valid IP address. Check DNS.
164
ngx_resolver_ctx_t temp;
165
temp.name.data = tmp_url->host.data;
166
temp.name.len = tmp_url->host.len;
167
resolver_ctx_ = ngx_resolve_start(fetcher_->resolver_, &temp);
168
if (resolver_ctx_ == NULL || resolver_ctx_ == NGX_NO_RESOLVER) {
169
// TODO(oschaaf): this spams the log, but is useful in the fetcher's
171
message_handler_->Message(
172
kError, "NgxFetch: Couldn't start resolving, "
173
"is there a proper resolver configured in nginx.conf?");
177
resolver_ctx_->data = this;
178
resolver_ctx_->name.data = tmp_url->host.data;
179
resolver_ctx_->name.len = tmp_url->host.len;
181
#if (nginx_version < 1005008)
182
resolver_ctx_->type = NGX_RESOLVE_A;
185
resolver_ctx_->handler = NgxFetchResolveDone;
186
resolver_ctx_->timeout = fetcher_->resolver_timeout_;
188
if (ngx_resolve_name(resolver_ctx_) != NGX_OK) {
189
message_handler_->Message(kWarning,
190
"NgxFetch: ngx_resolve_name failed");
194
if (InitRequest() != NGX_OK) {
195
message_handler()->Message(kError, "NgxFetch: InitRequest failed");
202
const char* NgxFetch::str_url() {
203
return str_url_.c_str();
206
// This function should be called only once. The only argument is success or
208
void NgxFetch::CallbackDone(bool success) {
209
if (async_fetch_ == NULL) {
211
<< "BUG: NgxFetch callback called more than once on same fetch"
212
<< str_url_.c_str() << "(" << this << ").Please report this at"
213
<< "https://groups.google.com/forum/#!forum/ngx-pagespeed-discuss";
219
if (timeout_event_ && timeout_event_->timer_set) {
220
ngx_del_timer(timeout_event_);
221
timeout_event_ = NULL;
224
ngx_close_connection(connection_);
228
async_fetch_->Done(success);
230
if (fetcher_ != NULL) {
231
if (fetcher_->track_original_content_length()
232
&& async_fetch_->response_headers()->Has(
498
// TODO(oschaaf): see https://github.com/pagespeed/ngx_pagespeed/pull/755
499
async_fetch_->Done(success);
501
if (fetcher_ != NULL) {
502
if (fetcher_->track_original_content_length()
503
&& async_fetch_->response_headers()->Has(
233
504
HttpAttributes::kXOriginalContentLength)) {
234
async_fetch_->extra_response_headers()->SetOriginalContentLength(
237
fetcher_->FetchComplete(this);
243
size_t NgxFetch::bytes_received() {
244
return bytes_received_;
246
void NgxFetch::bytes_received_add(int64 x) {
247
bytes_received_ += x;
250
int64 NgxFetch::fetch_start_ms() {
251
return fetch_start_ms_;
254
void NgxFetch::set_fetch_start_ms(int64 start_ms) {
255
fetch_start_ms_ = start_ms;
258
int64 NgxFetch::fetch_end_ms() {
259
return fetch_end_ms_;
262
void NgxFetch::set_fetch_end_ms(int64 end_ms) {
263
fetch_end_ms_ = end_ms;
266
MessageHandler* NgxFetch::message_handler() {
267
return message_handler_;
270
bool NgxFetch::ParseUrl() {
271
url_.url.len = str_url_.length();
272
url_.url.data = static_cast<u_char*>(ngx_palloc(pool_, url_.url.len));
273
if (url_.url.data == NULL) {
276
str_url_.copy(reinterpret_cast<char*>(url_.url.data), str_url_.length(), 0);
278
return NgxUrlAsyncFetcher::ParseUrl(&url_, pool_);
281
// Issue a request after the resolver is done
282
void NgxFetch::NgxFetchResolveDone(ngx_resolver_ctx_t* resolver_ctx) {
283
NgxFetch* fetch = static_cast<NgxFetch*>(resolver_ctx->data);
284
NgxUrlAsyncFetcher* fetcher = fetch->fetcher_;
285
if (resolver_ctx->state != NGX_OK) {
286
if (fetch->timeout_event() != NULL && fetch->timeout_event()->timer_set) {
287
ngx_del_timer(fetch->timeout_event());
288
fetch->set_timeout_event(NULL);
290
fetch->message_handler()->Message(
291
kWarning, "NgxFetch: failed to resolve host [%.*s]",
292
static_cast<int>(resolver_ctx->name.len), resolver_ctx->name.data);
293
fetch->CallbackDone(false);
296
ngx_memzero(&fetch->sin_, sizeof(fetch->sin_));
505
async_fetch_->extra_response_headers()->SetOriginalContentLength(
508
fetcher_->FetchComplete(this);
514
size_t NgxFetch::bytes_received() {
515
return bytes_received_;
517
void NgxFetch::bytes_received_add(int64 x) {
518
bytes_received_ += x;
521
int64 NgxFetch::fetch_start_ms() {
522
return fetch_start_ms_;
525
void NgxFetch::set_fetch_start_ms(int64 start_ms) {
526
fetch_start_ms_ = start_ms;
529
int64 NgxFetch::fetch_end_ms() {
530
return fetch_end_ms_;
533
void NgxFetch::set_fetch_end_ms(int64 end_ms) {
534
fetch_end_ms_ = end_ms;
537
MessageHandler* NgxFetch::message_handler() {
538
return message_handler_;
541
bool NgxFetch::ParseUrl() {
542
url_.url.len = str_url_.length();
543
url_.url.data = static_cast<u_char*>(ngx_palloc(pool_, url_.url.len));
544
if (url_.url.data == NULL) {
547
str_url_.copy(reinterpret_cast<char*>(url_.url.data), str_url_.length(), 0);
549
return NgxUrlAsyncFetcher::ParseUrl(&url_, pool_);
552
// Issue a request after the resolver is done
553
void NgxFetch::ResolveDoneHandler(ngx_resolver_ctx_t* resolver_ctx) {
554
NgxFetch* fetch = static_cast<NgxFetch*>(resolver_ctx->data);
555
NgxUrlAsyncFetcher* fetcher = fetch->fetcher_;
557
if (resolver_ctx->state != NGX_OK) {
558
if (fetch->timeout_event() != NULL && fetch->timeout_event()->timer_set) {
559
ngx_del_timer(fetch->timeout_event());
560
fetch->set_timeout_event(NULL);
562
fetch->message_handler()->Message(
563
kWarning, "NgxFetch %p: failed to resolve host [%.*s]", fetch,
564
static_cast<int>(resolver_ctx->name.len), resolver_ctx->name.data);
565
fetch->CallbackDone(false);
570
// Find the first ipv4 address. We don't support ipv6 yet.
571
for (i = 0; i < resolver_ctx->naddrs; i++) {
572
if (reinterpret_cast<struct sockaddr_in*>(
573
resolver_ctx->addrs[i].sockaddr)->sin_family == AF_INET) {
578
// If no suitable ipv4 address was found, we fail.
579
if (i == resolver_ctx->naddrs) {
580
if (fetch->timeout_event() != NULL && fetch->timeout_event()->timer_set) {
581
ngx_del_timer(fetch->timeout_event());
582
fetch->set_timeout_event(NULL);
584
fetch->message_handler()->Message(
585
kWarning, "NgxFetch %p: no suitable address for host [%.*s]", fetch,
586
static_cast<int>(resolver_ctx->name.len), resolver_ctx->name.data);
587
fetch->CallbackDone(false);
590
ngx_memzero(&fetch->sin_, sizeof(fetch->sin_));
298
592
#if (nginx_version < 1005008)
299
fetch->sin_.sin_addr.s_addr = resolver_ctx->addrs[0];
593
fetch->sin_.sin_addr.s_addr = resolver_ctx->addrs[i];
302
struct sockaddr_in* sin;
303
sin = reinterpret_cast<struct sockaddr_in*>(
304
resolver_ctx->addrs[0].sockaddr);
305
fetch->sin_.sin_family = sin->sin_family;
306
fetch->sin_.sin_addr.s_addr = sin->sin_addr.s_addr;
595
struct sockaddr_in* sin;
597
sin = reinterpret_cast<struct sockaddr_in*>(
598
resolver_ctx->addrs[i].sockaddr);
600
fetch->sin_.sin_family = sin->sin_family;
601
fetch->sin_.sin_addr.s_addr = sin->sin_addr.s_addr;
309
fetch->sin_.sin_family = AF_INET;
310
fetch->sin_.sin_port = htons(fetch->url_.port);
312
// Maybe we have Proxy
313
if (0 != fetcher->proxy_.url.len) {
314
fetch->sin_.sin_port = htons(fetcher->proxy_.port);
317
char* ip_address = inet_ntoa(fetch->sin_.sin_addr);
319
fetch->message_handler()->Message(
320
kInfo, "NgxFetch: Resolved host [%.*s] to [%s]",
321
static_cast<int>(resolver_ctx->name.len), resolver_ctx->name.data,
324
fetch->release_resolver();
326
if (fetch->InitRequest() != NGX_OK) {
327
fetch->message_handler()->Message(kError, "NgxFetch: InitRequest failed");
328
fetch->CallbackDone(false);
332
// Prepare the request data for this fetch, and hook the write event.
333
int NgxFetch::InitRequest() {
334
in_ = ngx_create_temp_buf(pool_, 4096);
341
RequestHeaders* request_headers = async_fetch_->request_headers();
342
ConstStringStarVector v;
344
bool have_host = false;
604
fetch->sin_.sin_family = AF_INET;
605
fetch->sin_.sin_port = htons(fetch->url_.port);
607
// Maybe we have Proxy
608
if (0 != fetcher->proxy_.url.len) {
609
fetch->sin_.sin_port = htons(fetcher->proxy_.port);
612
char* ip_address = inet_ntoa(fetch->sin_.sin_addr);
614
ngx_log_error(NGX_LOG_DEBUG, fetch->log_, 0,
615
"NgxFetch %p: Resolved host [%V] to [%s]", fetch,
616
&resolver_ctx->name, ip_address);
618
fetch->release_resolver();
620
if (fetch->InitRequest() != NGX_OK) {
621
fetch->message_handler()->Message(kError, "NgxFetch: InitRequest failed");
622
fetch->CallbackDone(false);
626
// Prepare the request data for this fetch, and hook the write event.
627
int NgxFetch::InitRequest() {
628
in_ = ngx_create_temp_buf(pool_, 4096);
635
RequestHeaders* request_headers = async_fetch_->request_headers();
636
ConstStringStarVector v;
638
bool have_host = false;
641
response_handler = NgxFetch::HandleStatusLine;
643
if (rc == NGX_AGAIN || rc == NGX_OK) {
644
if (connection_->keepalive()) {
645
request_headers->Add(HttpAttributes::kConnection,
646
NgxConnection::ka_header);
347
648
const char* method = request_headers->method_string();
348
649
size_t method_len = strlen(method);
403
703
*(out_->last++) = CR;
404
704
*(out_->last++) = LF;
406
response_handler = NgxFetchHandleStatusLine;
408
705
if (rc == NGX_AGAIN) {
410
} else if (rc < NGX_OK) {
414
NgxFetchWrite(connection_->write);
418
int NgxFetch::Connect() {
419
ngx_peer_connection_t pc;
420
ngx_memzero(&pc, sizeof(pc));
421
pc.sockaddr = (struct sockaddr*)&sin_;
422
pc.socklen = sizeof(struct sockaddr_in);
423
pc.name = &url_.host;
425
// get callback is dummy function, it just returns NGX_OK
426
pc.get = ngx_event_get_peer;
427
pc.log_error = NGX_ERROR_ERR;
428
pc.log = fetcher_->log_;
431
int rc = ngx_event_connect_peer(&pc);
432
if (rc == NGX_ERROR || rc == NGX_DECLINED || rc == NGX_BUSY) {
436
connection_ = pc.connection;
437
connection_->write->handler = NgxFetchWrite;
438
connection_->read->handler = NgxFetchRead;
439
connection_->data = this;
441
// Timer set in Init() is still in effect.
708
} else if (rc < NGX_OK) {
445
// When the fetch sends the request completely, it will hook the read event,
446
// and prepare to parse the response.
447
void NgxFetch::NgxFetchWrite(ngx_event_t* wev) {
448
ngx_connection_t* c = static_cast<ngx_connection_t*>(wev->data);
449
NgxFetch* fetch = static_cast<NgxFetch*>(c->data);
450
ngx_buf_t* out = fetch->out_;
452
while (out->pos < out->last) {
453
int n = c->send(c, out->pos, out->last - out->pos);
456
} else if (n == NGX_AGAIN) {
457
if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
458
fetch->CallbackDone(false);
460
// Timer set in Init() is still in effect.
712
NgxFetch::ConnectionWriteHandler(connection_->c_->write);
716
int NgxFetch::Connect() {
717
ngx_peer_connection_t pc;
718
ngx_memzero(&pc, sizeof(pc));
719
pc.sockaddr = (struct sockaddr*)&sin_;
720
pc.socklen = sizeof(struct sockaddr_in);
721
pc.name = &url_.host;
723
// get callback is dummy function, it just returns NGX_OK
724
pc.get = ngx_event_get_peer;
725
pc.log_error = NGX_ERROR_ERR;
726
pc.log = fetcher_->log_;
730
connection_ = NgxConnection::Connect(&pc, message_handler(),
731
fetcher_->max_keepalive_requests_);
732
ngx_log_error(NGX_LOG_DEBUG, fetcher_->log_, 0,
733
"NgxFetch %p Connect() connection %p for [%s]\n",
734
this, connection_, str_url());
736
if (connection_ == NULL) {
740
connection_->c_->write->handler = NgxFetch::ConnectionWriteHandler;
741
connection_->c_->read->handler = NgxFetch::ConnectionReadHandler;
742
connection_->c_->data = this;
744
// Timer set in Init() is still in effect.
748
// When the fetch sends the request completely, it will hook the read event,
749
// and prepare to parse the response.
750
void NgxFetch::ConnectionWriteHandler(ngx_event_t* wev) {
751
ngx_connection_t* c = static_cast<ngx_connection_t*>(wev->data);
752
NgxFetch* fetch = static_cast<NgxFetch*>(c->data);
753
ngx_buf_t* out = fetch->out_;
755
while (out->pos < out->last) {
756
int n = c->send(c, out->pos, out->last - out->pos);
757
ngx_log_error(NGX_LOG_DEBUG, fetch->log_, 0,
758
"NgxFetch %p: ConnectionWriteHandler "
759
"send result %d", fetch, n);
763
} else if (n == NGX_AGAIN) {
764
if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
464
765
fetch->CallbackDone(false);
469
if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
767
// Timer set in Init() is still in effect.
471
771
fetch->CallbackDone(false);
776
if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
778
fetch->CallbackDone(false);
784
void NgxFetch::ConnectionReadHandler(ngx_event_t* rev) {
785
ngx_connection_t* c = static_cast<ngx_connection_t*>(rev->data);
786
NgxFetch* fetch = static_cast<NgxFetch*>(c->data);
790
c, fetch->in_->start, fetch->in_->end - fetch->in_->start);
792
ngx_log_error(NGX_LOG_DEBUG, fetch->log_, 0,
793
"NgxFetch %p: ConnectionReadHandler "
794
"recv result %d", fetch, n);
796
if (n == NGX_AGAIN) {
801
// If the content length was not known, we assume that we have read
802
// all if we at least parsed the headers.
803
// If we do know the content length, having a mismatch on the bytes read
804
// will be interpreted as an error.
805
if (fetch->content_length_known_) {
806
fetch->CallbackDone(fetch->content_length_ == fetch->bytes_received_);
808
fetch->CallbackDone(fetch->parser_.headers_complete());
813
fetch->in_->pos = fetch->in_->start;
814
fetch->in_->last = fetch->in_->start + n;
815
if (!fetch->response_handler(c)) {
816
fetch->CallbackDone(false);
821
fetch->CallbackDone(true);
832
fetch->CallbackDone(true);
477
void NgxFetch::NgxFetchRead(ngx_event_t* rev) {
478
ngx_connection_t* c = static_cast<ngx_connection_t*>(rev->data);
479
NgxFetch* fetch = static_cast<NgxFetch*>(c->data);
483
c, fetch->in_->start, fetch->in_->end - fetch->in_->start);
485
if (n == NGX_AGAIN) {
490
// If the content length was not known, we assume that we have read
491
// all if we at least parsed the headers.
492
// If we do know the content length, having a mismatch on the bytes read
493
// will be interpreted as an error.
494
if (fetch->content_length_known_) {
495
fetch->CallbackDone(fetch->content_length_ == fetch->bytes_received_);
497
fetch->CallbackDone(fetch->parser_.headers_complete());
502
fetch->in_->pos = fetch->in_->start;
503
fetch->in_->last = fetch->in_->start + n;
504
if (!fetch->response_handler(c)) {
505
fetch->CallbackDone(false);
510
fetch->CallbackDone(true);
521
fetch->CallbackDone(true);
525
if (ngx_handle_read_event(rev, 0) != NGX_OK) {
526
fetch->CallbackDone(false);
529
// Timer set in Init() is still in effect.
532
// Parse the status line: "HTTP/1.1 200 OK\r\n"
533
bool NgxFetch::NgxFetchHandleStatusLine(ngx_connection_t* c) {
534
NgxFetch* fetch = static_cast<NgxFetch*>(c->data);
535
// This function only works after Nginx-1.1.4. Before nginx-1.1.4,
536
// ngx_http_parse_status_line didn't save http_version.
537
ngx_int_t n = ngx_http_parse_status_line(fetch->r_, fetch->in_,
539
if (n == NGX_ERROR) { // parse status line error
540
fetch->message_handler()->Message(
541
kWarning, "NgxFetch: failed to parse status line");
543
} else if (n == NGX_AGAIN) { // not completed
546
ResponseHeaders* response_headers =
836
if (ngx_handle_read_event(rev, 0) != NGX_OK) {
837
fetch->CallbackDone(false);
840
// Timer set in Init() is still in effect.
843
// Parse the status line: "HTTP/1.1 200 OK\r\n"
844
bool NgxFetch::HandleStatusLine(ngx_connection_t* c) {
845
NgxFetch* fetch = static_cast<NgxFetch*>(c->data);
846
ngx_log_error(NGX_LOG_DEBUG, fetch->log_, 0,
847
"NgxFetch %p: Handle status line\n", fetch);
849
// This function only works after Nginx-1.1.4. Before nginx-1.1.4,
850
// ngx_http_parse_status_line didn't save http_version.
851
ngx_int_t n = ngx_http_parse_status_line(fetch->r_, fetch->in_,
853
if (n == NGX_ERROR) { // parse status line error
854
fetch->message_handler()->Message(
855
kWarning, "NgxFetch: failed to parse status line");
857
} else if (n == NGX_AGAIN) { // not completed
860
ResponseHeaders* response_headers =
547
861
fetch->async_fetch_->response_headers();
548
response_headers->SetStatusAndReason(
549
static_cast<HttpStatus::Code>(fetch->get_status_code()));
550
response_headers->set_major_version(fetch->get_major_version());
551
response_headers->set_minor_version(fetch->get_minor_version());
552
fetch->set_response_handler(NgxFetchHandleHeader);
553
return fetch->response_handler(c);
556
// Parse the HTTP headers
557
bool NgxFetch::NgxFetchHandleHeader(ngx_connection_t* c) {
558
NgxFetch* fetch = static_cast<NgxFetch*>(c->data);
559
char* data = reinterpret_cast<char*>(fetch->in_->pos);
560
size_t size = fetch->in_->last - fetch->in_->pos;
561
size_t n = fetch->parser_.ParseChunk(StringPiece(data, size),
562
fetch->message_handler_);
565
} else if (fetch->parser_.headers_complete()) {
566
if (fetch->async_fetch_->response_headers()->FindContentLength(
567
&fetch->content_length_)) {
568
if (fetch->content_length_ < 0) {
569
fetch->message_handler_->Message(
570
kError, "Negative content-length in response header");
573
fetch->content_length_known_ = true;
862
response_headers->SetStatusAndReason(
863
static_cast<HttpStatus::Code>(fetch->get_status_code()));
864
response_headers->set_major_version(fetch->get_major_version());
865
response_headers->set_minor_version(fetch->get_minor_version());
866
fetch->set_response_handler(NgxFetch::HandleHeader);
867
return fetch->response_handler(c);
870
// Parse the HTTP headers
871
bool NgxFetch::HandleHeader(ngx_connection_t* c) {
872
NgxFetch* fetch = static_cast<NgxFetch*>(c->data);
873
char* data = reinterpret_cast<char*>(fetch->in_->pos);
874
size_t size = fetch->in_->last - fetch->in_->pos;
875
size_t n = fetch->parser_.ParseChunk(StringPiece(data, size),
876
fetch->message_handler_);
878
ngx_log_error(NGX_LOG_DEBUG, fetch->log_, 0,
879
"NgxFetch %p: Handle headers\n", fetch);
883
} else if (fetch->parser_.headers_complete()) {
884
if (fetch->async_fetch_->response_headers()->FindContentLength(
885
&fetch->content_length_)) {
886
if (fetch->content_length_ < 0) {
887
fetch->message_handler_->Message(
888
kError, "Negative content-length in response header");
891
fetch->content_length_known_ = true;
892
if (fetch->content_length_ == 0) {
577
if (fetch->fetcher_->track_original_content_length()
578
&& fetch->content_length_known_) {
579
fetch->async_fetch_->response_headers()->SetOriginalContentLength(
580
fetch->content_length_);
583
fetch->in_->pos += n;
584
fetch->set_response_handler(NgxFetchHandleBody);
898
if (fetch->fetcher_->track_original_content_length()
899
&& fetch->content_length_known_) {
900
fetch->async_fetch_->response_headers()->SetOriginalContentLength(
901
fetch->content_length_);
904
fetch->in_->pos += n;
905
fetch->set_response_handler(NgxFetch::HandleBody);
906
if ((fetch->in_->last - fetch->in_->pos) > 0) {
585
907
return fetch->response_handler(c);
910
fetch->in_->pos += n;
590
// Read the response body
591
bool NgxFetch::NgxFetchHandleBody(ngx_connection_t* c) {
592
NgxFetch* fetch = static_cast<NgxFetch*>(c->data);
593
char* data = reinterpret_cast<char*>(fetch->in_->pos);
594
size_t size = fetch->in_->last - fetch->in_->pos;
599
fetch->bytes_received_add(size);
601
if (fetch->async_fetch_->Write(StringPiece(data, size),
602
fetch->message_handler())) {
603
if (fetch->content_length_known_ &&
604
fetch->bytes_received_ == fetch->content_length_) {
915
// Read the response body
916
bool NgxFetch::HandleBody(ngx_connection_t* c) {
917
NgxFetch* fetch = static_cast<NgxFetch*>(c->data);
918
char* data = reinterpret_cast<char*>(fetch->in_->pos);
919
size_t size = fetch->in_->last - fetch->in_->pos;
921
fetch->bytes_received_add(size);
923
ngx_log_error(NGX_LOG_DEBUG, fetch->log_, 0,
924
"NgxFetch %p: Handle body (%d bytes)\n", fetch, size);
926
if ( fetch->async_fetch_->Write(StringPiece(data, size),
927
fetch->message_handler()) ) {
928
if (fetch->bytes_received_ == fetch->content_length_) {
931
fetch->in_->pos += size;
933
ngx_log_error(NGX_LOG_DEBUG, fetch->log_, 0,
934
"NgxFetch %p: async fetch write failure\n", fetch);
612
void NgxFetch::NgxFetchTimeout(ngx_event_t* tev) {
613
NgxFetch* fetch = static_cast<NgxFetch*>(tev->data);
614
fetch->CallbackDone(false);
617
void NgxFetch::FixUserAgent() {
618
GoogleString user_agent;
619
ConstStringStarVector v;
620
RequestHeaders* request_headers = async_fetch_->request_headers();
621
if (request_headers->Lookup(HttpAttributes::kUserAgent, &v)) {
622
for (int i = 0, n = v.size(); i < n; i++) {
628
user_agent += *(v[i]);
631
request_headers->RemoveAll(HttpAttributes::kUserAgent);
633
if (user_agent.empty()) {
634
user_agent += "NgxNativeFetcher";
636
GoogleString version = StrCat(
637
" ", kModPagespeedSubrequestUserAgent,
638
"/" MOD_PAGESPEED_VERSION_STRING "-" LASTCHANGE_STRING);
639
if (!StringPiece(user_agent).ends_with(version)) {
640
user_agent += version;
642
request_headers->Add(HttpAttributes::kUserAgent, user_agent);
940
void NgxFetch::TimeoutHandler(ngx_event_t* tev) {
941
NgxFetch* fetch = static_cast<NgxFetch*>(tev->data);
942
ngx_log_error(NGX_LOG_DEBUG, fetch->log_, 0,
943
"NgxFetch %p: TimeoutHandler called\n", fetch);
944
fetch->CallbackDone(false);
947
void NgxFetch::FixUserAgent() {
948
GoogleString user_agent;
949
ConstStringStarVector v;
950
RequestHeaders* request_headers = async_fetch_->request_headers();
951
if (request_headers->Lookup(HttpAttributes::kUserAgent, &v)) {
952
for (size_t i = 0, n = v.size(); i < n; i++) {
958
user_agent += *(v[i]);
961
request_headers->RemoveAll(HttpAttributes::kUserAgent);
963
if (user_agent.empty()) {
964
user_agent += "NgxNativeFetcher";
966
GoogleString version = StrCat(
967
" ", kModPagespeedSubrequestUserAgent,
968
"/" MOD_PAGESPEED_VERSION_STRING "-" LASTCHANGE_STRING);
969
if (!StringPiece(user_agent).ends_with(version)) {
970
user_agent += version;
972
request_headers->Add(HttpAttributes::kUserAgent, user_agent);
644
975
} // namespace net_instaweb