/*
* Copyright 2012 Canonical Ltd.
*
* This file is part of u1db.
*
* u1db is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License version 3
* as published by the Free Software Foundation.
*
* u1db is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with u1db. If not, see .
*/
#include "u1db/u1db_http_internal.h"
#include
#include
#include
#include
#include
#include
#include
#include
#include
#ifndef max
#define max(a, b) ((a) > (b) ? (a) : (b))
#endif // max
#define TRIES 4
static int retry_delays[] = {1, 1, 2, 4};
void u1db__set_zero_delays() {
retry_delays[0] = 0;
retry_delays[1] = 0;
retry_delays[2] = 0;
retry_delays[3] = 0;
}
struct _http_state;
struct _http_request;
static int st_http_get_sync_info(u1db_sync_target *st,
const char *source_replica_uid,
const char **st_replica_uid, int *st_gen,
char **st_trans_id, int *source_gen,
char **trans_id);
static int st_http_record_sync_info(u1db_sync_target *st,
const char *source_replica_uid, int source_gen, const char *trans_id);
static int st_http_get_sync_exchange(u1db_sync_target *st,
const char *source_replica_uid,
int source_gen,
u1db_sync_exchange **exchange);
static int st_http_sync_exchange(u1db_sync_target *st,
const char *source_replica_uid, int n_docs,
u1db_document **docs, int *generations,
const char **trans_ids, int *target_gen,
char **target_trans_id, void *context,
u1db_doc_gen_callback cb);
static int st_http_sync_exchange_doc_ids(u1db_sync_target *st,
u1database *source_db, int n_doc_ids,
const char **doc_ids,
int *generations,
const char **trans_ids,
int *target_gen,
char **target_trans_id, void *context,
u1db_doc_gen_callback cb);
static void st_http_finalize_sync_exchange(u1db_sync_target *st,
u1db_sync_exchange **exchange);
static int st_http_set_trace_hook(u1db_sync_target *st,
void *context, u1db__trace_callback cb);
static void st_http_finalize(u1db_sync_target *st);
static int initialize_curl(struct _http_state *state);
static int simple_set_curl_data(CURL *curl, struct _http_request *header,
struct _http_request *body, struct _http_request *put);
struct _http_state {
char is_http[4];
char *base_url;
char *replica_uid;
CURL *curl;
char *consumer_key;
char *consumer_secret;
char *token_key;
char *token_secret;
};
static const char is_http[4] = "HTTP";
static const char auth_header_prefix[] =
"Authorization: OAuth realm=\"\", ";
// Do a safe cast from implementation into the http state
static int
impl_as_http_state(void *impl, struct _http_state **state)
{
struct _http_state *maybe_state;
if (impl == NULL) {
return U1DB_INVALID_PARAMETER;
}
maybe_state = (struct _http_state *)impl;
if (memcmp(maybe_state->is_http, is_http, sizeof(is_http)) != 0) {
return U1DB_INVALID_PARAMETER;
}
*state = maybe_state;
return U1DB_OK;
}
struct _http_request {
struct _http_state *state;
int num_header_bytes;
int max_header_bytes;
char *header_buffer;
int num_body_bytes;
int max_body_bytes;
char *body_buffer;
int num_put_bytes;
const char *put_buffer;
struct _http_sync_response_state *response_state;
};
int
u1db__create_oauth_http_sync_target(const char *url,
const char *consumer_key, const char *consumer_secret,
const char *token_key, const char *token_secret,
u1db_sync_target **target)
{
int status = U1DB_OK;
int url_len;
struct _http_state *state = NULL;
u1db_sync_target *new_target;
if (url == NULL || target == NULL) {
return U1DB_INVALID_PARAMETER;
}
new_target = (u1db_sync_target *)calloc(1, sizeof(u1db_sync_target));
if (new_target == NULL) { goto oom; }
state = (struct _http_state *)calloc(1, sizeof(struct _http_state));
if (state == NULL) { goto oom; }
memcpy(state->is_http, is_http, sizeof(is_http));
status = initialize_curl(state);
if (status != U1DB_OK) { goto fail; }
// Copy the url, but ensure that it ends in a '/'
url_len = strlen(url);
if (url[url_len-1] == '/') {
state->base_url = strdup(url);
if (state->base_url == NULL) { goto oom; }
} else {
state->base_url = (char*)calloc(url_len+2, sizeof(char));
if (state->base_url == NULL) { goto oom; }
memcpy(state->base_url, url, url_len);
state->base_url[url_len] = '/';
state->base_url[url_len+1] = '\0';
}
if (consumer_key != NULL) {
state->consumer_key = strdup(consumer_key);
}
if (consumer_secret != NULL) {
state->consumer_secret = strdup(consumer_secret);
}
if (token_key != NULL) {
state->token_key = strdup(token_key);
}
if (token_secret != NULL) {
state->token_secret = strdup(token_secret);
}
new_target->implementation = state;
new_target->get_sync_info = st_http_get_sync_info;
new_target->record_sync_info = st_http_record_sync_info;
new_target->sync_exchange = st_http_sync_exchange;
new_target->sync_exchange_doc_ids = st_http_sync_exchange_doc_ids;
new_target->get_sync_exchange = st_http_get_sync_exchange;
new_target->finalize_sync_exchange = st_http_finalize_sync_exchange;
new_target->_set_trace_hook = st_http_set_trace_hook;
new_target->finalize = st_http_finalize;
*target = new_target;
return status;
oom:
status = U1DB_NOMEM;
fail:
if (state != NULL) {
if (state->base_url != NULL) {
free(state->base_url);
state->base_url = NULL;
}
if (state->curl != NULL) {
curl_easy_cleanup(state->curl);
state->curl = NULL;
}
free(state);
state = NULL;
}
if (new_target != NULL) {
free(new_target);
new_target = NULL;
}
return status;
}
int
u1db__create_http_sync_target(const char *url, u1db_sync_target **target)
{
return u1db__create_oauth_http_sync_target(url, NULL, NULL, NULL, NULL, target);
}
static size_t
recv_header_bytes(const char *ptr, size_t size, size_t nmemb, void *userdata)
{
size_t total_bytes;
int needed_bytes;
struct _http_request *req;
if (userdata == NULL) {
// No bytes processed, because we have nowhere to put them
return 0;
}
// Note: curl says that CURLOPT_HEADERFUNCTION is called 1 time for each
// header, with exactly the header contents. So we should be able to
// change this into something that parses the header content itself,
// without separately buffering the raw bytes.
req = (struct _http_request *)userdata;
total_bytes = size * nmemb;
if (req->state != NULL && total_bytes > 9 && strncmp(ptr, "HTTP/", 5) == 0)
{
if (strncmp(ptr, "HTTP/1.0 ", 9) == 0) {
// The server is an HTTP 1.0 server (like in the test suite). Tell
// curl to treat it as such from now on. I don't understand why
// curl isn't doing this already, because it has seen that the
// server is v1.0
curl_easy_setopt(req->state->curl, CURLOPT_HTTP_VERSION,
CURL_HTTP_VERSION_1_0);
} else if (strncmp(ptr, "HTTP/1.1 ", 9) == 0) {
curl_easy_setopt(req->state->curl, CURLOPT_HTTP_VERSION,
CURL_HTTP_VERSION_1_0);
}
}
needed_bytes = req->num_header_bytes + total_bytes + 1;
if (needed_bytes >= req->max_header_bytes) {
req->max_header_bytes = max((req->max_header_bytes * 2), needed_bytes);
req->max_header_bytes += 100;
req->header_buffer = realloc(req->header_buffer, req->max_header_bytes);
if (req->header_buffer == NULL) {
return 0;
}
}
memcpy(req->header_buffer + req->num_header_bytes, ptr, total_bytes);
req->num_header_bytes += total_bytes;
req->header_buffer[req->num_header_bytes] = '\0';
return total_bytes;
}
static size_t
recv_body_bytes(const char *ptr, size_t size, size_t nmemb, void *userdata)
{
size_t total_bytes;
int needed_bytes;
struct _http_request *req;
if (userdata == NULL) {
// No bytes processed, because we have nowhere to put them
return 0;
}
req = (struct _http_request *)userdata;
total_bytes = size * nmemb;
needed_bytes = req->num_body_bytes + total_bytes + 1;
if (needed_bytes >= req->max_body_bytes) {
req->max_body_bytes = max((req->max_body_bytes * 2), needed_bytes);
req->max_body_bytes += 100;
req->body_buffer = realloc(req->body_buffer, req->max_body_bytes);
if (req->body_buffer == NULL) {
return 0;
}
}
memcpy(req->body_buffer + req->num_body_bytes, ptr, total_bytes);
req->num_body_bytes += total_bytes;
req->body_buffer[req->num_body_bytes] = '\0';
return total_bytes;
}
static size_t
send_put_bytes(void *ptr, size_t size, size_t nmemb, void *userdata)
{
size_t total_bytes;
struct _http_request *req;
if (userdata == NULL) {
// No bytes processed, because we have nowhere to put them
return 0;
}
req = (struct _http_request *)userdata;
total_bytes = size * nmemb;
if (total_bytes > (size_t) req->num_put_bytes) {
total_bytes = req->num_put_bytes;
}
memcpy(ptr, req->put_buffer, total_bytes);
req->num_put_bytes -= total_bytes;
req->put_buffer += total_bytes;
return total_bytes;
}
static int
initialize_curl(struct _http_state *state)
{
int status;
state->curl = curl_easy_init();
if (state->curl == NULL) { goto oom; }
// All conversations are done without CURL generating progress bars.
status = curl_easy_setopt(state->curl, CURLOPT_NOPROGRESS, 1L);
if (status != CURLE_OK) { goto fail; }
/// status = curl_easy_setopt(state->curl, CURLOPT_VERBOSE, 1L);
/// if (status != CURLE_OK) { goto fail; }
status = curl_easy_setopt(state->curl, CURLOPT_HEADERFUNCTION,
recv_header_bytes);
if (status != CURLE_OK) { goto fail; }
status = curl_easy_setopt(state->curl, CURLOPT_WRITEFUNCTION,
recv_body_bytes);
if (status != CURLE_OK) { goto fail; }
status = curl_easy_setopt(state->curl, CURLOPT_READFUNCTION,
send_put_bytes);
if (status != CURLE_OK) { goto fail; }
return status;
oom:
status = U1DB_NOMEM;
fail:
if (state->curl != NULL) {
curl_easy_cleanup(state->curl);
state->curl = NULL;
}
return status;
}
// If we have oauth credentials, sign the URL and set the Authorization:
// header
static int
maybe_sign_url(u1db_sync_target *st, const char *http_method,
const char *url, struct curl_slist **headers)
{
int status;
struct _http_state *state;
char *authorization = NULL;
status = impl_as_http_state(st->implementation, &state);
if (status != U1DB_OK) {
return status;
}
if (state->consumer_key == NULL || state->consumer_secret == NULL) {
return U1DB_OK; // Shortcut, do nothing, no OAuth creds to use
}
status = u1db__get_oauth_authorization(st, http_method, url,
&authorization);
if (status != U1DB_OK) {
return status;
}
*headers = curl_slist_append(*headers, authorization);
if (authorization != NULL) {
// curl_slist_append already copies the data, so we don't need it now
free(authorization);
}
return U1DB_OK;
}
static int
st_http_get_sync_info(u1db_sync_target *st, const char *source_replica_uid,
const char **st_replica_uid, int *st_gen,
char **st_trans_id, int *source_gen, char **trans_id)
{
struct _http_state *state;
struct _http_request req = {0};
char *url = NULL;
const char *tmp = NULL;
const char *tmp2 = NULL;
int status = U1DB_OK;
long http_code;
struct curl_slist *headers = NULL;
int attempt = 0;
struct timeval timeout;
json_object *json = NULL, *obj = NULL;
if (st == NULL || source_replica_uid == NULL || st_replica_uid == NULL
|| st_gen == NULL || source_gen == NULL
|| st->implementation == NULL)
{
return U1DB_INVALID_PARAMETER;
}
status = impl_as_http_state(st->implementation, &state);
if (status != U1DB_OK) {
return status;
}
headers = curl_slist_append(NULL, "Content-Type: application/json");
if (headers == NULL) {
status = U1DB_NOMEM;
goto finish;
}
req.state = state;
status = u1db__format_sync_url(st, source_replica_uid, &url);
if (status != U1DB_OK) { goto finish; }
for (;;) {
status = curl_easy_setopt(state->curl, CURLOPT_HTTPGET, 1L);
if (status != CURLE_OK) { goto finish; }
// status = curl_easy_setopt(state->curl, CURLOPT_USERAGENT, "...");
status = curl_easy_setopt(state->curl, CURLOPT_URL, url);
if (status != CURLE_OK) { goto finish; }
req.body_buffer = req.header_buffer = NULL;
status = simple_set_curl_data(state->curl, &req, &req, NULL);
if (status != CURLE_OK) { goto finish; }
status = maybe_sign_url(st, "GET", url, &headers);
if (status != U1DB_OK) { goto finish; }
status = curl_easy_setopt(state->curl, CURLOPT_HTTPHEADER, headers);
if (status != CURLE_OK) { goto finish; }
// Now do the GET
status = curl_easy_perform(state->curl);
if (status != CURLE_OK) {
goto finish; }
status = curl_easy_getinfo(
state->curl, CURLINFO_RESPONSE_CODE, &http_code);
if (status != CURLE_OK) {
goto finish; }
status = U1DB_OK;
if (http_code == 503) {
status = U1DB_TARGET_UNAVAILABLE;
if (attempt < TRIES) {
timeout.tv_sec = retry_delays[attempt];
timeout.tv_usec = 0;
select(0, NULL, NULL, NULL, &timeout);
attempt++;
req.num_body_bytes = 0;
continue;
}
}
break;
}
if (status != U1DB_OK) { goto finish; }
if (http_code != 200) { // 201 for created? shouldn't happen on GET
status = http_code;
goto finish;
}
if (req.body_buffer == NULL) {
status = U1DB_INVALID_HTTP_RESPONSE;
goto finish;
}
json = json_tokener_parse(req.body_buffer);
if (json == NULL) {
status = U1DB_NOMEM;
goto finish;
}
obj = json_object_object_get(json, "target_replica_uid");
if (obj == NULL) {
status = U1DB_INVALID_HTTP_RESPONSE;
goto finish;
}
if (state->replica_uid == NULL) {
// we cache this on the state object, because the api for get_sync_info
// asserts that callers do not have to free the returned string.
// This isn't a functional problem, because if the sync target ever
// changed its replica uid we'd be seriously broken anyway.
state->replica_uid = strdup(json_object_get_string(obj));
} else {
if (strcmp(state->replica_uid, json_object_get_string(obj)) != 0) {
// Our http target changed replica_uid, this would be a really
// strange bug
status = U1DB_INVALID_HTTP_RESPONSE;
goto finish;
}
}
*st_replica_uid = state->replica_uid;
if (*st_replica_uid == NULL) {
status = U1DB_NOMEM;
goto finish;
}
obj = json_object_object_get(json, "target_replica_generation");
if (obj == NULL) {
status = U1DB_INVALID_HTTP_RESPONSE;
goto finish;
}
*st_gen = json_object_get_int(obj);
obj = json_object_object_get(json, "target_replica_transaction_id");
if (obj == NULL) {
status = U1DB_INVALID_HTTP_RESPONSE;
goto finish;
}
tmp2 = json_object_get_string(obj);
if (tmp2 == NULL) {
*st_trans_id = NULL;
} else {
*st_trans_id = strdup(tmp2);
if (*st_trans_id == NULL) {
status = U1DB_NOMEM;
goto finish;
}
}
obj = json_object_object_get(json, "source_replica_generation");
if (obj == NULL) {
status = U1DB_INVALID_HTTP_RESPONSE;
goto finish;
}
*source_gen = json_object_get_int(obj);
obj = json_object_object_get(json, "source_transaction_id");
if (obj == NULL) {
*trans_id = NULL;
} else {
tmp = json_object_get_string(obj);
if (tmp == NULL) {
*trans_id = NULL;
} else {
*trans_id = strdup(tmp);
if (*trans_id == NULL) {
status = U1DB_NOMEM;
}
}
}
finish:
if (req.header_buffer != NULL) {
free(req.header_buffer);
}
if (req.body_buffer != NULL) {
free(req.body_buffer);
}
if (json != NULL) {
json_object_put(json);
}
if (url != NULL) {
free(url);
}
curl_slist_free_all(headers);
return status;
}
// Use the default send_put_bytes, recv_body_bytes, and recv_header_bytes. Only
// set the functions if the associated data is not NULL
static int
simple_set_curl_data(CURL *curl, struct _http_request *header,
struct _http_request *body, struct _http_request *put)
{
int status;
status = curl_easy_setopt(curl, CURLOPT_HEADERDATA, header);
if (status != CURLE_OK) { goto finish; }
if (header == NULL) {
status = curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, NULL);
} else {
status = curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION,
recv_header_bytes);
}
status = curl_easy_setopt(curl, CURLOPT_WRITEDATA, body);
if (status != CURLE_OK) { goto finish; }
if (body == NULL) {
status = curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, NULL);
} else {
status = curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION,
recv_body_bytes);
}
if (status != CURLE_OK) { goto finish; }
status = curl_easy_setopt(curl, CURLOPT_READDATA, put);
if (status != CURLE_OK) { goto finish; }
if (put == NULL) {
status = curl_easy_setopt(curl, CURLOPT_READFUNCTION, NULL);
} else {
status = curl_easy_setopt(curl, CURLOPT_READFUNCTION,
send_put_bytes);
}
finish:
return status;
}
static int
st_http_record_sync_info(u1db_sync_target *st,
const char *source_replica_uid, int source_gen, const char *trans_id)
{
struct _http_state *state;
struct _http_request req = {0};
char *url = NULL;
int status;
long http_code;
json_object *json = NULL;
const char *raw_body = NULL;
int raw_len;
struct curl_slist *headers = NULL;
int attempt = 0;
struct timeval timeout;
if (st == NULL || source_replica_uid == NULL || st->implementation == NULL)
{
return U1DB_INVALID_PARAMETER;
}
status = impl_as_http_state(st->implementation, &state);
if (status != U1DB_OK) {
return status;
}
status = u1db__format_sync_url(st, source_replica_uid, &url);
if (status != U1DB_OK) { goto finish; }
json = json_object_new_object();
if (json == NULL) {
status = U1DB_NOMEM;
goto finish;
}
json_object_object_add(json, "generation", json_object_new_int(source_gen));
json_object_object_add(json, "transaction_id",
json_object_new_string(trans_id));
raw_body = json_object_to_json_string(json);
raw_len = strlen(raw_body);
req.state = state;
req.put_buffer = raw_body;
req.num_put_bytes = raw_len;
headers = curl_slist_append(headers, "Content-Type: application/json");
// We know the message is going to be short, no reason to wait for server
// confirmation of the post.
headers = curl_slist_append(headers, "Expect:");
for (;;) {
status = curl_easy_setopt(state->curl, CURLOPT_URL, url);
if (status != CURLE_OK) { goto finish; }
status = curl_easy_setopt(state->curl, CURLOPT_HTTPHEADER, headers);
if (status != CURLE_OK) { goto finish; }
status = curl_easy_setopt(state->curl, CURLOPT_UPLOAD, 1L);
if (status != CURLE_OK) { goto finish; }
status = curl_easy_setopt(state->curl, CURLOPT_PUT, 1L);
if (status != CURLE_OK) { goto finish; }
status = simple_set_curl_data(state->curl, &req, &req, &req);
if (status != CURLE_OK) { goto finish; }
status = curl_easy_setopt(state->curl, CURLOPT_INFILESIZE_LARGE,
(curl_off_t)req.num_put_bytes);
if (status != CURLE_OK) { goto finish; }
status = maybe_sign_url(st, "PUT", url, &headers);
if (status != U1DB_OK) { goto finish; }
// Now actually send the data
status = curl_easy_perform(state->curl);
if (status != CURLE_OK) {
goto finish; }
status = curl_easy_getinfo(
state->curl, CURLINFO_RESPONSE_CODE, &http_code);
if (status != CURLE_OK) {
goto finish; }
status = U1DB_OK;
if (http_code == 503) {
status = U1DB_TARGET_UNAVAILABLE;
if (attempt < TRIES) {
timeout.tv_sec = retry_delays[attempt];
timeout.tv_usec = 0;
select(0, NULL, NULL, NULL, &timeout);
attempt++;
req.num_body_bytes = 0;
continue;
}
}
break;
}
if (status != U1DB_OK) { goto finish; }
if (http_code != 200 && http_code != 201) {
status = http_code;
goto finish;
}
finish:
if (req.header_buffer != NULL) {
free(req.header_buffer);
}
if (req.body_buffer != NULL) {
free(req.body_buffer);
}
if (json != NULL) {
json_object_put(json);
}
if (url != NULL) {
free(url);
}
if (headers != NULL) {
curl_slist_free_all(headers);
}
return status;
}
// Setup the CURL handle for doing the POST for sync exchange
// @param headers (OUT) Pass in a handle for curl_slist, callers must call
// curl_slist_free_all themselves
// @param req The request state will be attached to this object
// @param fd This handle should have all data written to it. We will use
// ftell to determine content length, then seek to the
// beginning to do the upload
static int
setup_curl_for_sync(CURL *curl, struct curl_slist **headers,
struct _http_request *req, FILE *fd)
{
int status;
curl_off_t size;
*headers = curl_slist_append(*headers,
"Content-Type: application/x-u1db-sync-stream");
if (*headers == NULL) {
status = U1DB_NOMEM;
goto finish;
}
status = curl_easy_setopt(curl, CURLOPT_HTTPHEADER, *headers);
if (status != CURLE_OK) { goto finish; }
status = curl_easy_setopt(curl, CURLOPT_POST, 1L);
if (status != CURLE_OK) { goto finish; }
status = curl_easy_setopt(curl, CURLOPT_POSTFIELDS, NULL);
if (status != CURLE_OK) { goto finish; }
status = curl_easy_setopt(curl, CURLOPT_HEADERDATA, req);
if (status != CURLE_OK) { goto finish; }
status = curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION,
recv_header_bytes);
status = curl_easy_setopt(curl, CURLOPT_WRITEDATA, req);
if (status != CURLE_OK) { goto finish; }
status = curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION,
recv_body_bytes);
if (status != CURLE_OK) { goto finish; }
status = curl_easy_setopt(curl, CURLOPT_READDATA, fd);
if (status != CURLE_OK) { goto finish; }
status = curl_easy_setopt(curl, CURLOPT_READFUNCTION, fread);
if (status != CURLE_OK) { goto finish; }
size = ftell(fd);
fseek(fd, 0, 0);
status = curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE_LARGE, size);
if (status != CURLE_OK) { goto finish; }
status = curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, size);
if (status != CURLE_OK) { goto finish; }
finish:
return status;
}
static int
doc_to_tempfile(u1db_document *doc, int gen, const char *trans_id, FILE *fd)
{
int status = U1DB_OK;
json_object *json = NULL;
fputs(",\r\n", fd);
json = json_object_new_object();
if (json == NULL) {
status = U1DB_NOMEM;
goto finish;
}
json_object_object_add(json, "id", json_object_new_string(doc->doc_id));
json_object_object_add(json, "rev", json_object_new_string(doc->doc_rev));
json_object_object_add(
json, "content", doc->json?json_object_new_string(doc->json):NULL);
json_object_object_add(json, "gen", json_object_new_int(gen));
json_object_object_add(json, "trans_id", json_object_new_string(trans_id));
fputs(json_object_to_json_string(json), fd);
finish:
if (json != NULL) {
json_object_put(json);
}
return status;
}
static FILE *
make_tempfile(char tmpname[1024])
{
const char tmp_template[] = "tmp-u1db-sync-XXXXXX";
const char *env_temp[] = {"TMP", "TEMP", "TMPDIR"};
int i, fd;
FILE *ret;
const char *tmpdir = NULL;
for (i = 0; i < sizeof(env_temp); ++i) {
tmpdir = getenv(env_temp[0]);
if (tmpdir != NULL && tmpdir[0] != '\0') break;
}
if (tmpdir == NULL || tmpdir[0] == '\0') {
tmpdir = ".";
}
snprintf(tmpname, 1024, "%s/%s", tmpdir, tmp_template);
fd = mkstemp(tmpname);
if (fd == -1) {
return NULL;
}
ret = fdopen(fd, "wb+");
if (ret == NULL) {
close(fd);
unlink(tmpname);
tmpname[0] = '\0';
}
return ret;
}
static int
init_temp_file(char tmpname[], FILE **temp_fd, int target_gen,
char *target_trans_id)
{
int status = U1DB_OK;
*temp_fd = make_tempfile(tmpname);
if (*temp_fd == NULL) {
status = errno;
if (status == 0) {
status = U1DB_INTERNAL_ERROR;
}
goto finish;
}
// Spool all of the documents to a temporary file, so that it we can
// determine Content-Length before we start uploading the data.
fprintf(
*temp_fd,
"[\r\n{\"last_known_generation\": %d, \"last_known_trans_id\": \"%s\"}",
target_gen, target_trans_id);
finish:
return status;
}
static int
finalize_and_send_temp_file(u1db_sync_target *st, FILE *temp_fd,
const char *source_replica_uid,
struct _http_request *req)
{
int status;
long http_code;
char *url = NULL;
struct _http_state *state;
struct curl_slist *headers = NULL;
int attempt = 0;
struct timeval timeout;
fputs("\r\n]", temp_fd);
status = impl_as_http_state(st->implementation, &state);
if (status != U1DB_OK) {
return status;
}
status = u1db__format_sync_url(st, source_replica_uid, &url);
if (status != U1DB_OK) { goto finish; }
for (;;) {
status = curl_easy_setopt(state->curl, CURLOPT_URL, url);
if (status != CURLE_OK) { goto finish; }
status = setup_curl_for_sync(state->curl, &headers, req, temp_fd);
if (status != CURLE_OK) { goto finish; }
status = maybe_sign_url(st, "POST", url, &headers);
if (status != U1DB_OK) { goto finish; }
// Now send off the messages, and handle the returned content.
status = curl_easy_perform(state->curl);
if (status != CURLE_OK) {
goto finish; }
status = curl_easy_getinfo(
state->curl, CURLINFO_RESPONSE_CODE, &http_code);
if (status != CURLE_OK) {
goto finish; }
status = U1DB_OK;
if (http_code == 503) {
status = U1DB_TARGET_UNAVAILABLE;
if (attempt < TRIES) {
timeout.tv_sec = retry_delays[attempt];
timeout.tv_usec = 0;
select(0, NULL, NULL, NULL, &timeout);
attempt++;
req->num_body_bytes = 0;
continue;
}
}
break;
}
if (status != U1DB_OK) { goto finish; }
if (http_code != 200 && http_code != 201) {
printf("broken 0\n");
status = U1DB_BROKEN_SYNC_STREAM;
goto finish;
}
finish:
if (url != NULL) {
free(url);
}
if (headers != NULL) {
curl_slist_free_all(headers);
}
return status;
}
static int
process_response(u1db_sync_target *st, void *context, u1db_doc_gen_callback cb,
char *response, int *target_gen, char **target_trans_id)
{
int status = U1DB_OK;
int i, doc_count;
json_object *json = NULL, *obj = NULL, *attr = NULL;
const char *doc_id, *content, *rev;
const char *tmp = NULL;
int gen;
const char *trans_id = NULL;
u1db_document *doc;
json = json_tokener_parse(response);
if (json == NULL || !json_object_is_type(json, json_type_array)) {
printf("broken 1, response: %s\n", response);
status = U1DB_BROKEN_SYNC_STREAM;
goto finish;
}
doc_count = json_object_array_length(json);
if (doc_count < 1) {
// the first response is the new_generation info, so it must exist
printf("broken 2\n");
status = U1DB_BROKEN_SYNC_STREAM;
goto finish;
}
obj = json_object_array_get_idx(json, 0);
attr = json_object_object_get(obj, "new_generation");
if (attr == NULL) {
printf("broken 3\n");
status = U1DB_BROKEN_SYNC_STREAM;
goto finish;
}
*target_gen = json_object_get_int(attr);
attr = json_object_object_get(obj, "new_transaction_id");
if (attr == NULL) {
printf("broken 4\n");
status = U1DB_BROKEN_SYNC_STREAM;
goto finish;
}
tmp = json_object_get_string(attr);
if (tmp == NULL) {
printf("broken 5\n");
status = U1DB_BROKEN_SYNC_STREAM;
goto finish;
}
*target_trans_id = strdup(tmp);
if (*target_trans_id == NULL) {
status = U1DB_NOMEM;
goto finish;
}
for (i = 1; i < doc_count; ++i) {
obj = json_object_array_get_idx(json, i);
attr = json_object_object_get(obj, "id");
doc_id = json_object_get_string(attr);
attr = json_object_object_get(obj, "rev");
rev = json_object_get_string(attr);
attr = json_object_object_get(obj, "content");
content = json_object_get_string(attr);
attr = json_object_object_get(obj, "gen");
gen = json_object_get_int(attr);
attr = json_object_object_get(obj, "trans_id");
trans_id = json_object_get_string(attr);
status = u1db__allocate_document(doc_id, rev, content, 0, &doc);
if (status != U1DB_OK)
goto finish;
if (doc == NULL) {
status = U1DB_NOMEM;
goto finish;
}
status = cb(context, doc, gen, trans_id);
if (status != U1DB_OK) { goto finish; }
}
finish:
if (json != NULL) {
json_object_put(json);
}
return status;
}
static void
cleanup_temp_files(char tmpname[], FILE *temp_fd, struct _http_request *req)
{
if (temp_fd != NULL) {
fclose(temp_fd);
}
if (req != NULL) {
if (req->body_buffer != NULL) {
free(req->body_buffer);
req->body_buffer = NULL;
}
if (req->header_buffer != NULL) {
free(req->header_buffer);
req->header_buffer = NULL;
}
}
if (tmpname[0] != '\0') {
unlink(tmpname);
}
}
static int
st_http_sync_exchange(u1db_sync_target *st, const char *source_replica_uid,
int n_docs, u1db_document **docs, int *generations,
const char **trans_ids, int *target_gen,
char **target_trans_id, void *context,
u1db_doc_gen_callback cb)
{
int status, i;
FILE *temp_fd = NULL;
struct _http_request req = {0};
char tmpname[1024] = {0};
if (st == NULL || generations == NULL || target_gen == NULL
|| target_trans_id == NULL || cb == NULL)
{
return U1DB_INVALID_PARAMETER;
}
if (n_docs > 0 && (docs == NULL || generations == NULL)) {
return U1DB_INVALID_PARAMETER;
}
status = init_temp_file(tmpname, &temp_fd, *target_gen, *target_trans_id);
if (status != U1DB_OK) { goto finish; }
for (i = 0; i < n_docs; ++i) {
status = doc_to_tempfile(
docs[i], generations[i], trans_ids[i], temp_fd);
if (status != U1DB_OK) { goto finish; }
}
status = finalize_and_send_temp_file(st, temp_fd, source_replica_uid, &req);
if (status != U1DB_OK) { goto finish; }
status = process_response(st, context, cb, req.body_buffer, target_gen,
target_trans_id);
finish:
cleanup_temp_files(tmpname, temp_fd, &req);
return status;
}
struct _get_doc_to_tempfile_context {
int offset;
int num;
int *generations;
const char **trans_ids;
FILE *temp_fd;
};
static int
get_docs_to_tempfile(void *context, u1db_document *doc)
{
int status = U1DB_OK;
struct _get_doc_to_tempfile_context *state;
state = (struct _get_doc_to_tempfile_context *)context;
if (state->offset >= state->num) {
status = U1DB_INTERNAL_ERROR;
} else {
status = doc_to_tempfile(doc, state->generations[state->offset],
state->trans_ids[state->offset],
state->temp_fd);
}
u1db_free_doc(&doc);
return status;
}
int u1db_get_docs(u1database *db, int n_doc_ids, const char **doc_ids,
int check_for_conflicts, int include_deleted, void *context,
u1db_doc_callback cb);
static int
st_http_sync_exchange_doc_ids(u1db_sync_target *st, u1database *source_db,
int n_doc_ids, const char **doc_ids,
int *generations, const char **trans_ids,
int *target_gen, char **target_trans_id,
void *context, u1db_doc_gen_callback cb)
{
int status;
FILE *temp_fd = NULL;
struct _http_request req = {0};
char tmpname[1024] = {0};
const char *source_replica_uid = NULL;
struct _get_doc_to_tempfile_context state = {0};
if (st == NULL || generations == NULL || target_gen == NULL
|| target_trans_id == NULL || cb == NULL)
{
return U1DB_INVALID_PARAMETER;
}
if (n_doc_ids > 0 && (doc_ids == NULL || generations == NULL)) {
return U1DB_INVALID_PARAMETER;
}
status = u1db_get_replica_uid(source_db, &source_replica_uid);
if (status != U1DB_OK) { goto finish; }
status = init_temp_file(tmpname, &temp_fd, *target_gen, *target_trans_id);
if (status != U1DB_OK) { goto finish; }
state.num = n_doc_ids;
state.generations = generations;
state.trans_ids = trans_ids;
state.temp_fd = temp_fd;
status = u1db_get_docs(source_db, n_doc_ids, doc_ids, 0, 1,
&state, get_docs_to_tempfile);
if (status != U1DB_OK) { goto finish; }
status = finalize_and_send_temp_file(st, temp_fd, source_replica_uid, &req);
if (status != U1DB_OK) { goto finish; }
status = process_response(st, context, cb, req.body_buffer, target_gen,
target_trans_id);
finish:
cleanup_temp_files(tmpname, temp_fd, &req);
return status;
}
static int
st_http_get_sync_exchange(u1db_sync_target *st,
const char *source_replica_uid,
int source_gen,
u1db_sync_exchange **exchange)
{
// Intentionally not implemented
return U1DB_NOT_IMPLEMENTED;
}
static void
st_http_finalize_sync_exchange(u1db_sync_target *st,
u1db_sync_exchange **exchange)
{
// Intentionally a no-op
}
static int
st_http_set_trace_hook(u1db_sync_target *st, void *context,
u1db__trace_callback cb)
{
// We can't trace a remote database
return U1DB_NOT_IMPLEMENTED;
}
static void
st_http_finalize(u1db_sync_target *st)
{
if (st->implementation != NULL) {
struct _http_state *state;
state = (struct _http_state *)st->implementation;
if (state->base_url != NULL) {
free(state->base_url);
state->base_url = NULL;
}
if (state->replica_uid != NULL) {
free(state->replica_uid);
state->replica_uid = NULL;
}
if (state->curl != NULL) {
curl_easy_cleanup(state->curl);
state->curl = NULL;
}
if (state->consumer_key != NULL) {
free(state->consumer_key);
state->consumer_key = NULL;
}
if (state->consumer_secret != NULL) {
free(state->consumer_secret);
state->consumer_secret = NULL;
}
if (state->token_key != NULL) {
free(state->token_key);
state->token_key = NULL;
}
if (state->token_secret != NULL) {
free(state->token_secret);
state->token_secret = NULL;
}
free(st->implementation);
st->implementation = NULL;
}
}
int
u1db__format_sync_url(u1db_sync_target *st,
const char *source_replica_uid, char **sync_url)
{
int status, url_len;
struct _http_state *state;
char *tmp;
status = impl_as_http_state(st->implementation, &state);
if (status != U1DB_OK) {
return status;
}
url_len = strlen(state->base_url) + 1;
url_len += strlen("sync-from/");
tmp = curl_easy_escape(state->curl, source_replica_uid, 0);
url_len += strlen(tmp);
*sync_url = (char *)calloc(url_len+1, sizeof(char));
snprintf(*sync_url, url_len, "%ssync-from/%s", state->base_url, tmp);
curl_free(tmp);
return U1DB_OK;
}
int
u1db__get_oauth_authorization(u1db_sync_target *st,
const char *http_method, const char *url,
char **oauth_authorization)
{
int status = U1DB_OK;
struct _http_state *state;
char *oauth_data = NULL;
char *http_hdr = NULL;
int argc = 0;
int hdr_size = 0, oauth_size = 0;
char **argv = NULL;
status = impl_as_http_state(st->implementation, &state);
if (status != U1DB_OK) {
return status;
}
if (state->consumer_key == NULL || state->consumer_secret == NULL
|| state->token_key == NULL || state->token_secret == NULL)
{
return U1DB_INVALID_PARAMETER;
}
argc = oauth_split_url_parameters(url, &argv);
oauth_sign_array2_process(&argc, &argv, NULL, OA_HMAC, http_method,
state->consumer_key, state->consumer_secret,
state->token_key, state->token_secret);
oauth_data = oauth_serialize_url_sep(argc, 1, argv, ", ", 6);
if (oauth_data == NULL) {
status = U1DB_INTERNAL_ERROR;
goto finish;
}
oauth_size = strlen(oauth_data);
// sizeof(auth_header_prefix) includes the trailing null, so we don't
// need to add 1
hdr_size = sizeof(auth_header_prefix) + oauth_size;
http_hdr = (char *)calloc(hdr_size, 1);
if (http_hdr == NULL) {
status = U1DB_NOMEM;
goto finish;
}
memcpy(http_hdr, auth_header_prefix, sizeof(auth_header_prefix));
memcpy(http_hdr + sizeof(auth_header_prefix)-1, oauth_data, oauth_size);
finish:
if (oauth_data != NULL) {
free(oauth_data);
}
oauth_free_array(&argc, &argv);
if (status == U1DB_OK) {
*oauth_authorization = http_hdr;
} else if (http_hdr != NULL) {
free(http_hdr);
}
return status;
}