/* binlog.c - binary log implementation */
/* Copyright (C) 2008 Graham Barr
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program. If not, see .
*/
#include "config.h"
#if HAVE_STDINT_H
# include
#endif /* else we get int types from config.h */
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "tube.h"
#include "job.h"
#include "binlog.h"
#include "util.h"
#include "port.h"
typedef struct binlog *binlog;
struct binlog {
binlog next;
unsigned int refs;
int fd;
size_t free;
size_t reserved;
char path[];
};
/* max size we will create a log file */
size_t binlog_size_limit = BINLOG_SIZE_LIMIT_DEFAULT;
int enable_fsync = 0;
size_t fsync_throttle_ms = 0;
uint64_t last_fsync = 0;
char *binlog_dir = NULL;
static int binlog_index = 0;
static int binlog_version = 5;
static int lock_fd;
static binlog oldest_binlog = 0,
current_binlog = 0,
newest_binlog = 0;
static const size_t job_record_size = offsetof(struct job, pad);
static int
binlog_scan_dir()
{
DIR *dirp;
struct dirent *dp;
long min = 0;
long max = 0;
long val;
char *endptr;
size_t name_len;
dirp = opendir(binlog_dir);
if (!dirp) return 0;
while ((dp = readdir(dirp)) != NULL) {
name_len = strlen(dp->d_name);
if (name_len > 7 && !strncmp("binlog.", dp->d_name, 7)) {
val = strtol(dp->d_name + 7, &endptr, 10);
if (endptr && *endptr == 0) {
if (max == 0 || val > max) max = val;
if (min == 0 || val < min) min = val;
}
}
}
closedir(dirp);
binlog_index = (int) max;
return (int) min;
}
static void
binlog_remove_oldest()
{
binlog b = oldest_binlog;
if (!b) return;
oldest_binlog = b->next;
unlink(b->path);
free(b);
}
static binlog
binlog_iref(binlog b)
{
if (b) b->refs++;
return b;
}
static void
binlog_dref(binlog b)
{
if (!b) return;
if (b->refs < 1) return twarnx("refs is zero for binlog: %s", b->path);
--b->refs;
if (b->refs < 1) {
while (oldest_binlog && oldest_binlog->refs == 0) {
binlog_remove_oldest();
}
}
}
/*
static void
binlog_warn(int fd, const char* path, const char *msg)
{
warnx("WARNING, %s at %s:%u.\n%s", msg, path, lseek(fd, 0, SEEK_CUR),
" Continuing. You may be missing data.");
}
*/
#define binlog_warn(b, fmt, args...) \
warnx("WARNING, " fmt " at %s:%u. %s: ", \
##args, b->path, lseek(b->fd, 0, SEEK_CUR), \
"Continuing. You may be missing data.")
static void
binlog_read_log_file(binlog b, job binlog_jobs)
{
struct job js;
tube t;
job j;
char tubename[MAX_TUBE_NAME_LEN];
size_t namelen;
ssize_t r;
int version;
r = read(b->fd, &version, sizeof(version));
if (r == -1) return twarn("read()");
if (r < sizeof(version)) {
return binlog_warn(b, "EOF while reading version record");
}
if (version != binlog_version) {
return warnx("%s: binlog version mismatch %d %d", b->path, version,
binlog_version);
}
while (read(b->fd, &namelen, sizeof(size_t)) == sizeof(size_t)) {
if (namelen >= MAX_TUBE_NAME_LEN) {
return binlog_warn(b, "namelen %d exceeds maximum of %d", namelen, MAX_TUBE_NAME_LEN - 1);
}
if (namelen > 0) {
r = read(b->fd, tubename, namelen);
if (r == -1) return twarn("read()");
if (r < namelen) {
lseek(b->fd, SEEK_CUR, 0);
return binlog_warn(b, "EOF while reading tube name");
}
}
tubename[namelen] = '\0';
r = read(b->fd, &js, job_record_size);
if (r == -1) return twarn("read()");
if (r < job_record_size) {
return binlog_warn(b, "EOF while reading job record");
}
if (!js.id) break;
j = job_find(js.id);
switch (js.state) {
case JOB_STATE_INVALID:
if (j) {
job_remove(j);
binlog_dref(j->binlog);
job_free(j);
j = NULL;
}
break;
case JOB_STATE_READY:
case JOB_STATE_DELAYED:
if (!j && namelen > 0) {
t = tube_find_or_make(tubename);
j = make_job_with_id(js.pri, js.delay, js.ttr, js.body_size,
t, js.id);
j->next = j->prev = j;
j->created_at = js.created_at;
job_insert(binlog_jobs, j);
}
if (js.body_size && namelen > 0) { /* namelen > 0 only on new jobs */
if (js.body_size > j->body_size) {
warnx("job size increased from %zu to %zu", j->body_size,
js.body_size);
job_remove(j);
binlog_dref(j->binlog);
job_free(j);
return binlog_warn(b, "EOF while reading job body");
}
r = read(b->fd, j->body, js.body_size);
if (r == -1) return twarn("read()");
if (r < js.body_size) {
warnx("dropping incomplete job %llu", j->id);
job_remove(j);
binlog_dref(j->binlog);
job_free(j);
return binlog_warn(b, "EOF while reading job body");
}
}
break;
}
if (j) {
j->state = js.state;
j->deadline_at = js.deadline_at;
j->pri = js.pri;
j->delay = js.delay;
j->ttr = js.ttr;
j->timeout_ct = js.timeout_ct;
j->release_ct = js.release_ct;
j->bury_ct = js.bury_ct;
j->kick_ct = js.kick_ct;
/* this is a complete record, so we can move the binlog ref */
if (namelen && js.body_size) {
binlog_dref(j->binlog);
j->binlog = binlog_iref(b);
}
}
}
}
static void
binlog_close(binlog b)
{
int r;
if (!b) return;
if (b->fd < 0) return;
if (b->free) {
// Some compilers give a warning if the return value of ftruncate is
// ignored. So we pretend to use it.
r = ftruncate(b->fd, binlog_size_limit - b->free);
if (r == -1) {
// Nothing we can do. The user might see warnings next startup.
}
}
close(b->fd);
b->fd = -1;
binlog_dref(b);
}
static binlog
make_binlog(char *path)
{
binlog b;
b = (binlog) malloc(sizeof(struct binlog) + strlen(path) + 1);
if (!b) return twarnx("OOM"), (binlog) 0;
strcpy(b->path, path);
b->refs = 0;
b->next = NULL;
b->fd = -1;
b->free = 0;
b->reserved = 0;
return b;
}
static binlog
make_next_binlog()
{
int r;
char path[PATH_MAX];
if (!binlog_dir) return NULL;
r = snprintf(path, PATH_MAX, "%s/binlog.%d", binlog_dir, ++binlog_index);
if (r > PATH_MAX) return twarnx("path too long: %s", binlog_dir), (binlog)0;
return make_binlog(path);
}
static binlog
add_binlog(binlog b)
{
if (newest_binlog) newest_binlog->next = b;
newest_binlog = b;
if (!oldest_binlog) oldest_binlog = b;
return b;
}
static int
beanstalkd_fallocate(int fd, off_t offset, off_t len)
{
off_t i;
ssize_t w;
off_t p;
#define ZERO_BUF_SIZE 512
char buf[ZERO_BUF_SIZE] = {}; /* initialize to zero */
/* we only support a 0 offset */
if (offset != 0) return EINVAL;
if (len <= 0) return EINVAL;
for (i = 0; i < len; i += w) {
w = write(fd, &buf, ZERO_BUF_SIZE);
if (w == -1) return errno;
}
p = lseek(fd, 0, SEEK_SET);
if (p == -1) return errno;
return 0;
}
static void
binlog_open(binlog log, size_t *written)
{
int fd;
size_t bytes_written;
if (written) *written = 0;
if (!binlog_iref(log)) return;
fd = open(log->path, O_WRONLY | O_CREAT, 0400);
if (fd < 0) return twarn("Cannot open binlog %s", log->path);
#ifdef HAVE_POSIX_FALLOCATE
{
int r;
r = posix_fallocate(fd, 0, binlog_size_limit);
if (r == EINVAL) {
r = beanstalkd_fallocate(fd, 0, binlog_size_limit);
}
if (r) {
close(fd);
binlog_dref(log);
errno = r;
return twarn("Cannot allocate space for binlog %s", log->path);
}
}
#else
/* Allocate space in a slow but portable way. */
{
int r;
r = beanstalkd_fallocate(fd, 0, binlog_size_limit);
if (r) {
close(fd);
binlog_dref(log);
errno = r;
return twarn("Cannot allocate space for binlog %s", log->path);
}
}
#endif
bytes_written = write(fd, &binlog_version, sizeof(int));
if (written) *written = bytes_written;
if (bytes_written < sizeof(int)) {
twarn("Cannot write to binlog");
close(fd);
binlog_dref(log);
return;
}
log->fd = fd;
}
/* returns 1 on success, 0 on error. */
static int
binlog_use_next()
{
binlog next;
if (!current_binlog) return 0;
next = current_binlog->next;
if (!next) return 0;
/* assert(current_binlog->reserved == 0); */
binlog_close(current_binlog);
current_binlog = next;
return 1;
}
void
binlog_shutdown()
{
binlog_use_next();
binlog_close(current_binlog);
}
/* Returns the number of jobs successfully written (either 0 or 1).
If this fails, something is seriously wrong. It should never fail because of
a full disk. (The binlog_reserve_space_* functions, on the other hand, can
fail because of a full disk.)
If we are not using the binlog at all (!current_binlog), then we pretend to
have made a successful write and return 1. */
int
binlog_write_job(job j)
{
ssize_t written;
size_t tube_namelen, to_write = 0;
struct iovec vec[4], *vptr;
int vcnt = 3, r;
uint64_t now;
if (!current_binlog) return 1;
tube_namelen = 0;
vec[0].iov_base = (char *) &tube_namelen;
to_write += vec[0].iov_len = sizeof(size_t);
vec[1].iov_base = j->tube->name;
vec[1].iov_len = 0;
vec[2].iov_base = (char *) j;
to_write += vec[2].iov_len = job_record_size;
if (j->state == JOB_STATE_READY || j->state == JOB_STATE_DELAYED ||
j->state == JOB_STATE_BURIED) {
if (!j->binlog) {
tube_namelen = strlen(j->tube->name);
to_write += vec[1].iov_len = tube_namelen;
vcnt = 4;
vec[3].iov_base = j->body;
to_write += vec[3].iov_len = j->body_size;
}
} else if (j->state == JOB_STATE_INVALID) {
if (j->binlog) binlog_dref(j->binlog);
j->binlog = NULL;
} else {
return twarnx("unserializable job state: %d", j->state), 0;
}
if (to_write > current_binlog->reserved) {
r = binlog_use_next();
if (!r) return twarnx("failed to use next binlog"), 0;
}
if (j->state && !j->binlog) j->binlog = binlog_iref(current_binlog);
while (to_write > 0) {
written = writev(current_binlog->fd, vec, vcnt);
if (written < 0) {
if (errno == EAGAIN) continue;
if (errno == EINTR) continue;
twarn("writev");
binlog_close(current_binlog);
current_binlog = 0;
return 0;
}
to_write -= written;
if (to_write > 0 && written > 0) {
for (vptr = vec; written >= vptr->iov_len; vptr++) {
written -= vptr->iov_len;
vptr->iov_len = 0;
}
vptr->iov_base = (char *) vptr->iov_base + written;
vptr->iov_len -= written;
}
current_binlog->reserved -= written;
j->reserved_binlog_space -= written;
}
now = now_usec() / 1000; /* usec -> msec */
if (enable_fsync && now - last_fsync >= fsync_throttle_ms) {
r = fdatasync(current_binlog->fd);
if (r == -1) return twarn("fdatasync"), 0;
last_fsync = now;
}
return 1;
}
static binlog
make_future_binlog()
{
binlog b;
size_t header;
/* open a new binlog with more space to reserve */
b = make_next_binlog();
if (!b) return twarnx("error making next binlog"), (binlog) 0;
binlog_open(b, &header);
/* open failed, so we can't reserve any space */
if (b->fd < 0) {
free(b);
return 0;
}
b->free = binlog_size_limit - header;
b->reserved = 0;
return b;
}
static int
can_move_reserved(size_t n, binlog from, binlog to)
{
return from->reserved >= n && to->free >= n;
}
static void
move_reserved(size_t n, binlog from, binlog to)
{
from->reserved -= n;
from->free += n;
to->reserved += n;
to->free -= n;
}
static size_t
ensure_free_space(size_t n)
{
binlog fb;
if (newest_binlog && newest_binlog->free >= n) return n;
/* open a new binlog */
fb = make_future_binlog();
if (!fb) return twarnx("make_future_binlog"), 0;
add_binlog(fb);
return n;
}
/* Preserve some invariants immediately after any space reservation.
* Invariant 1: current_binlog->reserved >= n.
* Invariant 2: current_binlog->reserved is congruent to n (mod z), where z
* is the size of a delete record in the binlog. */
static size_t
maintain_invariant(size_t n)
{
size_t reserved_later, remainder, complement, z, r;
/* In this function, reserved bytes are conserved (they are neither created
* nor destroyed). We just move them around to preserve the invariant. We
* might have to create new free space (i.e. allocate a new binlog file),
* though. */
/* Invariant 1. */
/* This is a loop, but it's guaranteed to run at most once. The proof is
* left as an exercise for the reader. */
while (current_binlog->reserved < n) {
size_t to_move = current_binlog->reserved;
r = ensure_free_space(to_move);
if (r != to_move) {
twarnx("ensure_free_space");
if (newest_binlog->reserved >= n) {
newest_binlog->reserved -= n;
} else {
twarnx("failed to unreserve %zd bytes", n); /* can't happen */
}
return 0;
}
move_reserved(to_move, current_binlog, newest_binlog);
binlog_use_next();
}
/* Invariant 2. */
z = sizeof(size_t) + job_record_size;
reserved_later = current_binlog->reserved - n;
remainder = reserved_later % z;
if (remainder == 0) return n;
complement = z - remainder;
if (can_move_reserved(complement, newest_binlog, current_binlog)) {
move_reserved(complement, newest_binlog, current_binlog);
return n;
}
r = ensure_free_space(remainder);
if (r != remainder) {
twarnx("ensure_free_space");
if (newest_binlog->reserved >= n) {
newest_binlog->reserved -= n;
} else {
twarnx("failed to unreserve %zd bytes", n); /* can't happen */
}
return 0;
}
move_reserved(remainder, current_binlog, newest_binlog);
return n;
}
/* Returns the number of bytes successfully reserved: either 0 or n. */
static size_t
binlog_reserve_space(size_t n)
{
size_t r;
/* This return value must be nonzero but is otherwise ignored. */
if (!current_binlog) return 1;
if (current_binlog->free >= n) {
current_binlog->free -= n;
current_binlog->reserved += n;
return maintain_invariant(n);
}
r = ensure_free_space(n);
if (r != n) return twarnx("ensure_free_space"), 0;
newest_binlog->free -= n;
newest_binlog->reserved += n;
return maintain_invariant(n);
}
/* Returns the number of bytes reserved. */
size_t
binlog_reserve_space_put(job j)
{
size_t z = 0;
/* reserve space for the initial job record */
z += sizeof(size_t);
z += strlen(j->tube->name);
z += job_record_size;
z += j->body_size;
/* plus space for a delete to come later */
z += sizeof(size_t);
z += job_record_size;
return binlog_reserve_space(z);
}
size_t
binlog_reserve_space_update(job j)
{
size_t z = 0;
z += sizeof(size_t);
z += job_record_size;
return binlog_reserve_space(z);
}
int
binlog_lock()
{
int r;
struct flock lock;
char path[PATH_MAX];
r = snprintf(path, PATH_MAX, "%s/lock", binlog_dir);
if (r > PATH_MAX) return twarnx("path too long: %s", binlog_dir), 0;
lock_fd = open(path, O_WRONLY|O_CREAT, 0600);
if (lock_fd == -1) return twarn("open"), 0;
lock.l_type = F_WRLCK;
lock.l_whence = SEEK_SET;
lock.l_start = 0;
lock.l_len = 0;
r = fcntl(lock_fd, F_SETLK, &lock);
if (r) return twarn("fcntl"), 0;
return 1;
}
void
binlog_init(job binlog_jobs)
{
int binlog_index_min;
struct stat sbuf;
int fd, idx, r;
size_t n;
char path[PATH_MAX];
binlog b;
if (!binlog_dir) return;
/* Recover any jobs in old binlogs */
if (stat(binlog_dir, &sbuf) < 0) {
if (mkdir(binlog_dir, 0700) < 0) return twarn("%s", binlog_dir);
} else if (!(sbuf.st_mode & S_IFDIR)) {
twarnx("%s", binlog_dir);
return;
}
binlog_index_min = binlog_scan_dir();
if (binlog_index_min) {
for (idx = binlog_index_min; idx <= binlog_index; idx++) {
r = snprintf(path, PATH_MAX, "%s/binlog.%d", binlog_dir, idx);
if (r > PATH_MAX) return twarnx("path too long: %s", binlog_dir);
fd = open(path, O_RDONLY);
if (fd < 0) {
twarn("%s", path);
} else {
b = binlog_iref(add_binlog(make_binlog(path)));
b->fd = fd;
binlog_read_log_file(b, binlog_jobs);
close(fd);
b->fd = -1;
binlog_dref(b);
}
}
}
/* Set up for writing out new jobs */
n = ensure_free_space(1);
if (!n) return twarnx("error making first writable binlog");
current_binlog = newest_binlog;
}
const char *
binlog_oldest_index()
{
if (!oldest_binlog) return "0";
return strrchr(oldest_binlog->path, '.') + 1;
}
const char *
binlog_current_index()
{
if (!newest_binlog) return "0";
return strrchr(newest_binlog->path, '.') + 1;
}