/*
* Copyright 2012 Canonical Ltd.
*
* This file is part of u1db.
*
* u1db is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License version 3
* as published by the Free Software Foundation.
*
* u1db is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with u1db. If not, see .
*/
#include "u1db/u1db_internal.h"
#include
#include
static int st_get_sync_info(u1db_sync_target *st,
const char *source_replica_uid,
const char **st_replica_uid, int *st_gen,
char **st_trans_id, int *source_gen,
char **source_trans_id);
static int st_record_sync_info(u1db_sync_target *st,
const char *source_replica_uid, int source_gen, const char *trans_id);
static int st_sync_exchange(u1db_sync_target *st,
const char *source_replica_uid, int n_docs,
u1db_document **docs, int *generations,
const char **trans_ids, int *target_gen,
char **target_trans_id, void *context,
u1db_doc_gen_callback cb);
static int st_sync_exchange_doc_ids(u1db_sync_target *st,
u1database *source_db, int n_doc_ids,
const char **doc_ids, int *generations,
const char **trans_ids, int *target_gen,
char **target_trans_id, void *context,
u1db_doc_gen_callback cb);
static int st_get_sync_exchange(u1db_sync_target *st,
const char *source_replica_uid,
int source_gen,
u1db_sync_exchange **exchange);
static void st_finalize_sync_exchange(u1db_sync_target *st,
u1db_sync_exchange **exchange);
static int st_set_trace_hook(u1db_sync_target *st,
void *context, u1db__trace_callback cb);
static void st_finalize(u1db_sync_target *st);
static void se_free_seen_id(struct lh_entry *e);
struct _get_docs_to_doc_gen_context {
int doc_offset;
void *user_context;
u1db_doc_gen_callback user_cb;
int *gen_for_doc_ids;
const char **trans_ids_for_doc_ids;
int free_when_done;
};
// A wrapper to change a 'u1db_doc_callback' into a 'u1db_doc_gen_callback'.
static int get_docs_to_gen_docs(void *context, u1db_document *doc);
int
u1db__get_sync_target(u1database *db, u1db_sync_target **sync_target)
{
int status = U1DB_OK;
if (db == NULL || sync_target == NULL) {
return U1DB_INVALID_PARAMETER;
}
*sync_target = (u1db_sync_target *)calloc(1, sizeof(u1db_sync_target));
if (*sync_target == NULL) {
return U1DB_NOMEM;
}
(*sync_target)->implementation = db;
(*sync_target)->get_sync_info = st_get_sync_info;
(*sync_target)->record_sync_info = st_record_sync_info;
(*sync_target)->sync_exchange = st_sync_exchange;
(*sync_target)->sync_exchange_doc_ids = st_sync_exchange_doc_ids;
(*sync_target)->get_sync_exchange = st_get_sync_exchange;
(*sync_target)->finalize_sync_exchange = st_finalize_sync_exchange;
(*sync_target)->_set_trace_hook = st_set_trace_hook;
(*sync_target)->finalize = st_finalize;
return status;
}
void
u1db__free_sync_target(u1db_sync_target **sync_target)
{
if (sync_target == NULL || *sync_target == NULL) {
return;
}
(*sync_target)->finalize(*sync_target);
free(*sync_target);
*sync_target = NULL;
}
static int
st_get_sync_info(u1db_sync_target *st, const char *source_replica_uid,
const char **st_replica_uid, int *st_gen, char **st_trans_id,
int *source_gen, char **source_trans_id)
{
int status = U1DB_OK;
u1database *db;
if (st == NULL || source_replica_uid == NULL || st_replica_uid == NULL
|| st_gen == NULL || source_gen == NULL)
{
return U1DB_INVALID_PARAMETER;
}
// TODO: This really feels like it should be done inside some sort of
// transaction, so that the sync information is consistent with the
// current db generation. (at local generation X we are synchronized
// with remote generation Y.)
// At the very least, though, we check the sync generation *first*,
// so that we should only be getting the same data again, if for some
// reason we are currently synchronizing with the remote object.
db = (u1database *)st->implementation;
status = u1db_get_replica_uid(db, st_replica_uid);
if (status != U1DB_OK) { goto finish; }
status = u1db__get_replica_gen_and_trans_id(
db, source_replica_uid, source_gen, source_trans_id);
if (status != U1DB_OK) { goto finish; }
status = u1db__get_generation_info(db, st_gen, st_trans_id);
finish:
return status;
}
static int
st_record_sync_info(u1db_sync_target *st, const char *source_replica_uid,
int source_gen, const char *trans_id)
{
int status;
u1database *db;
if (st == NULL || source_replica_uid == NULL) {
return U1DB_INVALID_PARAMETER;
}
if (st->trace_cb) {
status = st->trace_cb(st->trace_context, "record_sync_info");
if (status != U1DB_OK) { goto finish; }
}
db = (u1database *)st->implementation;
status = u1db__set_replica_gen_and_trans_id(
db, source_replica_uid, source_gen, trans_id);
finish:
return status;
}
static int
st_get_sync_exchange(u1db_sync_target *st, const char *source_replica_uid,
int target_gen_known_by_source,
u1db_sync_exchange **exchange)
{
u1db_sync_exchange *tmp;
if (st == NULL || source_replica_uid == NULL || exchange == NULL) {
return U1DB_INVALID_PARAMETER;
}
tmp = (u1db_sync_exchange *)calloc(1, sizeof(u1db_sync_exchange));
if (tmp == NULL) {
return U1DB_NOMEM;
}
tmp->db = (u1database *)st->implementation;
tmp->source_replica_uid = source_replica_uid;
tmp->target_gen = target_gen_known_by_source;
// Note: lh_table is overkill for what we need. We only need a set, not a
// mapping, and we don't need the prev/next pointers. But it is
// already available, and doesn't require us to implement and debug
// another set() implementation.
tmp->seen_ids = lh_kchar_table_new(100, "seen_ids",
se_free_seen_id);
tmp->trace_context = st->trace_context;
tmp->trace_cb = st->trace_cb;
*exchange = tmp;
return U1DB_OK;
}
static void
st_finalize_sync_exchange(u1db_sync_target *st, u1db_sync_exchange **exchange)
{
int i;
if (exchange == NULL || *exchange == NULL) {
return;
}
if ((*exchange)->seen_ids != NULL) {
lh_table_free((*exchange)->seen_ids);
(*exchange)->seen_ids = NULL;
}
if ((*exchange)->doc_ids_to_return != NULL) {
for (i = 0; i < (*exchange)->num_doc_ids; ++i) {
free((*exchange)->doc_ids_to_return[i]);
}
free((*exchange)->doc_ids_to_return);
(*exchange)->doc_ids_to_return = NULL;
(*exchange)->num_doc_ids = 0;
}
if ((*exchange)->gen_for_doc_ids != NULL) {
free((*exchange)->gen_for_doc_ids);
(*exchange)->gen_for_doc_ids = NULL;
}
if ((*exchange)->trans_ids_for_doc_ids != NULL) {
free((*exchange)->trans_ids_for_doc_ids);
(*exchange)->trans_ids_for_doc_ids = NULL;
}
if ((*exchange)->target_trans_id != NULL) {
free((*exchange)->target_trans_id);
(*exchange)->target_trans_id = NULL;
}
free(*exchange);
*exchange = NULL;
}
static int
st_set_trace_hook(u1db_sync_target *st, void *context, u1db__trace_callback cb)
{
st->trace_context = context;
st->trace_cb = cb;
return U1DB_OK;
}
static void
st_finalize(u1db_sync_target *st)
{
return;
}
static void
se_free_seen_id(struct lh_entry *e)
{
if (e == NULL) {
return;
}
if (e->k != NULL) {
free((void *)e->k);
e->k = NULL;
}
/* v is a (void*)int */
}
int
u1db__sync_exchange_seen_ids(u1db_sync_exchange *se, int *n_ids,
const char ***doc_ids)
{
int i;
struct lh_entry *entry;
if (se == NULL || n_ids == NULL || doc_ids == NULL) {
return U1DB_INVALID_PARAMETER;
}
if (se->seen_ids == NULL || se->seen_ids->count == 0) {
*n_ids = 0;
*doc_ids = NULL;
return U1DB_OK;
}
*n_ids = se->seen_ids->count;
(*doc_ids) = (const char **)calloc(*n_ids, sizeof(char *));
i = 0;
lh_foreach(se->seen_ids, entry) {
if (entry->k != NULL) {
if (i >= (*n_ids)) {
// TODO: Better error? For some reason we found more than
// 'count' valid entries
return U1DB_INVALID_PARAMETER;
}
(*doc_ids)[i] = entry->k;
i++;
}
}
return U1DB_OK;
}
int
u1db__sync_exchange_insert_doc_from_source(u1db_sync_exchange *se,
u1db_document *doc, int source_gen, const char *trans_id)
{
int status = U1DB_OK;
int insert_state;
int at_gen;
if (se == NULL || se->db == NULL || doc == NULL) {
return U1DB_INVALID_PARAMETER;
}
// fprintf(stderr, "Inserting %s from source\n", doc->doc_id);
status = u1db__put_doc_if_newer(se->db, doc, 0, se->source_replica_uid,
source_gen, trans_id, &insert_state,
&at_gen);
if (insert_state == U1DB_INSERTED || insert_state == U1DB_CONVERGED) {
lh_table_insert(se->seen_ids, strdup(doc->doc_id),
(void *)(intptr_t)at_gen);
} else {
// state should be either U1DB_SUPERSEDED or U1DB_CONFLICTED, in either
// case, we don't count this as a 'seen_id' because we will want to be
// returning a document with this identifier back to the user.
// fprintf(stderr, "Not inserting %s, %d\n", doc->doc_id, insert_state);
}
return status;
}
struct _whats_changed_doc_ids_state {
int num_doc_ids;
int max_doc_ids;
struct lh_table *exclude_ids;
char **doc_ids_to_return;
int *gen_for_doc_ids;
const char **trans_ids_for_doc_ids;
};
// Callback for whats_changed to map the callback into the sync_exchange
// doc_ids_to_return array.
static int
whats_changed_to_doc_ids(void *context, const char *doc_id, int gen,
const char *trans_id)
{
struct lh_entry *e;
struct _whats_changed_doc_ids_state *state;
state = (struct _whats_changed_doc_ids_state *)context;
if (state->exclude_ids != NULL
&& (e = lh_table_lookup_entry(state->exclude_ids, doc_id)) != NULL
&& (intptr_t)e->v >= gen)
{
// This document was already seen at this gen,
// so we don't need to return it
return 0;
}
if (state->num_doc_ids >= state->max_doc_ids) {
state->max_doc_ids = (state->max_doc_ids * 2) + 10;
if (state->doc_ids_to_return == NULL) {
state->doc_ids_to_return = (char **)calloc(
state->max_doc_ids, sizeof(char*));
state->gen_for_doc_ids = (int *)calloc(
state->max_doc_ids, sizeof(int));
state->trans_ids_for_doc_ids = (const char **)calloc(
state->max_doc_ids, sizeof(char*));
} else {
state->doc_ids_to_return = (char **)realloc(
state->doc_ids_to_return, state->max_doc_ids * sizeof(char*));
state->gen_for_doc_ids = (int *)realloc(
state->gen_for_doc_ids, state->max_doc_ids * sizeof(int));
state->trans_ids_for_doc_ids = (const char **)realloc(
state->gen_for_doc_ids, state->max_doc_ids * sizeof(char*));
}
if (state->doc_ids_to_return == NULL || state->gen_for_doc_ids == NULL
|| state->trans_ids_for_doc_ids == NULL)
{
return U1DB_NOMEM;
}
}
state->doc_ids_to_return[state->num_doc_ids] = strdup(doc_id);
state->gen_for_doc_ids[state->num_doc_ids] = gen;
state->trans_ids_for_doc_ids[state->num_doc_ids] = trans_id;
state->num_doc_ids++;
return 0;
}
int
u1db__sync_exchange_find_doc_ids_to_return(u1db_sync_exchange *se)
{
int status;
struct _whats_changed_doc_ids_state state = {0};
if (se == NULL) {
return U1DB_INVALID_PARAMETER;
}
if (se->trace_cb) {
status = se->trace_cb(se->trace_context, "before whats_changed");
if (status != U1DB_OK) { goto finish; }
}
state.exclude_ids = se->seen_ids;
status = u1db_whats_changed(se->db, &se->target_gen, &se->target_trans_id,
(void*)&state, whats_changed_to_doc_ids);
if (status != U1DB_OK) {
free(state.doc_ids_to_return);
free(state.gen_for_doc_ids);
free(state.trans_ids_for_doc_ids);
goto finish;
}
if (se->trace_cb) {
status = se->trace_cb(se->trace_context, "after whats_changed");
if (status != U1DB_OK) { goto finish; }
}
se->num_doc_ids = state.num_doc_ids;
se->doc_ids_to_return = state.doc_ids_to_return;
se->gen_for_doc_ids = state.gen_for_doc_ids;
se->trans_ids_for_doc_ids = state.trans_ids_for_doc_ids;
finish:
return status;
}
// A wrapper to change a 'u1db_doc_callback' into a 'u1db_doc_gen_callback'.
static int
get_docs_to_gen_docs(void *context, u1db_document *doc)
{
struct _get_docs_to_doc_gen_context *ctx;
int status;
ctx = (struct _get_docs_to_doc_gen_context *)context;
// Note: using doc_offset in this way assumes that u1db_get_docs will
// always return them in exactly the order we requested. This is
// probably true, though.
status = ctx->user_cb(
ctx->user_context, doc, ctx->gen_for_doc_ids[ctx->doc_offset],
ctx->trans_ids_for_doc_ids[ctx->doc_offset]);
ctx->doc_offset++;
if (ctx->free_when_done) {
u1db_free_doc(&doc);
}
return status;
}
int
u1db__sync_exchange_return_docs(u1db_sync_exchange *se, void *context,
int (*cb)(void *context, u1db_document *doc,
int gen, const char *trans_id))
{
int status = U1DB_OK;
struct _get_docs_to_doc_gen_context state = {0};
if (se == NULL || cb == NULL) {
return U1DB_INVALID_PARAMETER;
}
state.user_context = context;
state.user_cb = cb;
state.doc_offset = 0;
state.gen_for_doc_ids = se->gen_for_doc_ids;
state.trans_ids_for_doc_ids = se->trans_ids_for_doc_ids;
if (se->trace_cb) {
status = se->trace_cb(se->trace_context, "before get_docs");
if (status != U1DB_OK) { goto finish; }
}
if (se->num_doc_ids > 0) {
status = u1db_get_docs(se->db, se->num_doc_ids,
(const char **)se->doc_ids_to_return,
0, 1, &state, get_docs_to_gen_docs);
}
finish:
return status;
}
struct _return_doc_state {
u1database *db;
const char *target_uid;
int num_inserted;
};
static int
return_doc_to_insert_from_target(void *context, u1db_document *doc, int gen,
const char *trans_id)
{
int status, insert_state;
struct _return_doc_state *state;
state = (struct _return_doc_state *)context;
status = u1db__put_doc_if_newer(
state->db, doc, 1, state->target_uid, gen, trans_id, &insert_state,
NULL);
u1db_free_doc(&doc);
if (status == U1DB_OK) {
if (insert_state == U1DB_INSERTED || insert_state == U1DB_CONFLICTED) {
// Either it was directly inserted, or it was saved as a conflict
state->num_inserted++;
}
} else {
}
return status;
}
static int
get_and_insert_docs(u1database *source_db, u1db_sync_exchange *se,
int n_doc_ids, const char **doc_ids, int *generations,
const char **trans_ids)
{
struct _get_docs_to_doc_gen_context get_doc_state = {0};
get_doc_state.free_when_done = 1;
get_doc_state.user_context = se;
// Note: user_cb takes a 'void *' as the first parameter, so we cast the
// u1db__sync_exchange_insert_doc_from_source to avoid the warning
get_doc_state.user_cb =
(u1db_doc_gen_callback)u1db__sync_exchange_insert_doc_from_source;
get_doc_state.gen_for_doc_ids = generations;
get_doc_state.trans_ids_for_doc_ids = trans_ids;
return u1db_get_docs(source_db, n_doc_ids, doc_ids,
0, 1, &get_doc_state, get_docs_to_gen_docs);
}
static int
st_sync_exchange(u1db_sync_target *st, const char *source_replica_uid,
int n_docs, u1db_document **docs, int *generations,
const char **trans_ids, int *target_gen,
char **target_trans_id, void *context,
u1db_doc_gen_callback cb)
{
int status, i;
u1db_sync_exchange *exchange = NULL;
if (st == NULL || generations == NULL || target_gen == NULL
|| target_trans_id == NULL || cb == NULL)
{
return U1DB_INVALID_PARAMETER;
}
if (n_docs > 0 && (docs == NULL || generations == NULL)) {
return U1DB_INVALID_PARAMETER;
}
status = st->get_sync_exchange(st, source_replica_uid,
*target_gen, &exchange);
if (status != U1DB_OK) { goto finish; }
status = u1db_validate_gen_and_trans_id(
exchange->db, *target_gen, *target_trans_id);
if (status != U1DB_OK) { goto finish; }
for (i = 0; i < n_docs; ++i) {
status = u1db__sync_exchange_insert_doc_from_source(
exchange, docs[i], generations[i], trans_ids[i]);
if (status != U1DB_OK) { goto finish; }
}
status = u1db__sync_exchange_find_doc_ids_to_return(exchange);
if (status != U1DB_OK) { goto finish; }
status = u1db__sync_exchange_return_docs(exchange, context, cb);
finish:
*target_gen = exchange->target_gen;
*target_trans_id = exchange->target_trans_id;
// We set this to NULL, because the caller is now responsible for it
exchange->target_trans_id = NULL;
st->finalize_sync_exchange(st, &exchange);
return status;
}
static int
st_sync_exchange_doc_ids(u1db_sync_target *st, u1database *source_db,
int n_doc_ids, const char **doc_ids, int *generations,
const char **trans_ids, int *target_gen, char **target_trans_id,
void *context, u1db_doc_gen_callback cb)
{
int status;
const char *source_replica_uid = NULL;
u1db_sync_exchange *exchange = NULL;
if (st == NULL || source_db == NULL || target_gen == NULL
|| target_trans_id == NULL || cb == NULL)
{
return U1DB_INVALID_PARAMETER;
}
if (n_doc_ids > 0 && (doc_ids == NULL || generations == NULL)) {
return U1DB_INVALID_PARAMETER;
}
status = u1db_get_replica_uid(source_db, &source_replica_uid);
if (status != U1DB_OK) { goto finish; }
status = st->get_sync_exchange(st, source_replica_uid,
*target_gen, &exchange);
if (status != U1DB_OK) { goto finish; }
status = u1db_validate_gen_and_trans_id(
exchange->db, *target_gen, *target_trans_id);
if (status != U1DB_OK) { goto finish; }
if (n_doc_ids > 0) {
status = get_and_insert_docs(source_db, exchange,
n_doc_ids, doc_ids, generations, trans_ids);
if (status != U1DB_OK) { goto finish; }
}
status = u1db__sync_exchange_find_doc_ids_to_return(exchange);
if (status != U1DB_OK) { goto finish; }
status = u1db__sync_exchange_return_docs(exchange, context, cb);
finish:
*target_gen = exchange->target_gen;
*target_trans_id = exchange->target_trans_id;
// We set this to NULL, because the caller is now responsible for it
exchange->target_trans_id = NULL;
st->finalize_sync_exchange(st, &exchange);
return status;
}
int
u1db__sync_db_to_target(u1database *db, u1db_sync_target *target,
int *local_gen_before_sync)
{
int status;
struct _whats_changed_doc_ids_state to_send_state = {0};
struct _return_doc_state return_doc_state = {0};
const char *target_uid, *local_uid;
char *local_trans_id = NULL;
char *local_target_trans_id = NULL;
char *target_trans_id_known_by_local = NULL;
char *local_trans_id_known_by_target = NULL;
char *target_trans_id = NULL;
int target_gen, local_gen;
int local_gen_known_by_target, target_gen_known_by_local;
// fprintf(stderr, "Starting\n");
if (db == NULL || target == NULL || local_gen_before_sync == NULL) {
// fprintf(stderr, "DB, target, or local are NULL\n");
status = U1DB_INVALID_PARAMETER;
goto finish;
}
status = u1db_get_replica_uid(db, &local_uid);
if (status != U1DB_OK) { goto finish; }
// fprintf(stderr, "Local uid: %s\n", local_uid);
status = target->get_sync_info(
target, local_uid, &target_uid, &target_gen, &target_trans_id,
&local_gen_known_by_target, &local_trans_id_known_by_target);
if (status != U1DB_OK) { goto finish; }
status = u1db_validate_gen_and_trans_id(
db, local_gen_known_by_target, local_trans_id_known_by_target);
if (status != U1DB_OK) { goto finish; }
status = u1db__get_replica_gen_and_trans_id(
db, target_uid, &target_gen_known_by_local,
&target_trans_id_known_by_local);
if (status != U1DB_OK) { goto finish; }
local_target_trans_id = target_trans_id_known_by_local;
local_gen = local_gen_known_by_target;
// Before we start the sync exchange, get the list of doc_ids that we want
// to send. We have to do this first, so that local_gen_before_sync will
// match exactly the list of doc_ids we send
status = u1db_whats_changed(
db, &local_gen, &local_trans_id, (void*)&to_send_state,
whats_changed_to_doc_ids);
if (status != U1DB_OK) { goto finish; }
if (local_gen == local_gen_known_by_target
&& target_gen == target_gen_known_by_local)
{
if (strcmp(target_trans_id, target_trans_id_known_by_local) != 0) {
status = U1DB_INVALID_TRANSACTION_ID;
}
// We know status == U1DB_OK, and we can shortcut the rest of the
// logic, no need to look for more information.
goto finish;
}
*local_gen_before_sync = local_gen;
return_doc_state.db = db;
return_doc_state.target_uid = target_uid;
return_doc_state.num_inserted = 0;
status = target->sync_exchange_doc_ids(target, db,
to_send_state.num_doc_ids,
(const char**)to_send_state.doc_ids_to_return,
to_send_state.gen_for_doc_ids, to_send_state.trans_ids_for_doc_ids,
&target_gen_known_by_local, &target_trans_id_known_by_local,
&return_doc_state, return_doc_to_insert_from_target);
if (status != U1DB_OK) { goto finish; }
if (local_trans_id != NULL) {
free(local_trans_id);
}
status = u1db__get_generation_info(db, &local_gen, &local_trans_id);
if (status != U1DB_OK) { goto finish; }
// Now we successfully sent and received docs, make sure we record the
// current remote generation
status = u1db__set_replica_gen_and_trans_id(
db, target_uid, target_gen_known_by_local,
target_trans_id_known_by_local);
if (status != U1DB_OK) { goto finish; }
if (return_doc_state.num_inserted > 0 &&
((*local_gen_before_sync + return_doc_state.num_inserted)
== local_gen))
{
status = target->record_sync_info(
target, local_uid, local_gen, local_trans_id);
if (status != U1DB_OK) { goto finish; }
}
finish:
if (local_trans_id != NULL) {
free(local_trans_id);
}
if (local_trans_id_known_by_target != NULL) {
free(local_trans_id_known_by_target);
}
if (target_trans_id != NULL) {
free(target_trans_id);
}
if (local_target_trans_id != NULL) {
if (target_trans_id_known_by_local == local_target_trans_id) {
// Don't double free
target_trans_id_known_by_local = NULL;
}
free(local_target_trans_id);
local_target_trans_id = NULL;
}
if (target_trans_id_known_by_local != NULL) {
free(target_trans_id_known_by_local);
target_trans_id_known_by_local = NULL;
}
if (to_send_state.doc_ids_to_return != NULL) {
int i;
for (i = 0; i < to_send_state.num_doc_ids; ++i) {
free(to_send_state.doc_ids_to_return[i]);
}
free(to_send_state.doc_ids_to_return);
}
if (to_send_state.gen_for_doc_ids != NULL) {
free(to_send_state.gen_for_doc_ids);
}
if (to_send_state.trans_ids_for_doc_ids != NULL) {
free(to_send_state.trans_ids_for_doc_ids);
}
return status;
}