1
/* Gearman server and library
2
* Copyright (C) 2008 Brian Aker, Eric Day
5
* Use and distribution licensed under the BSD license. See
6
* the COPYING file in the parent directory for full text.
11
* @brief libmemcached Queue Storage Definitions
14
#include <libgearman-server/common.h>
16
#include <libgearman-server/plugins/queue/base.h>
17
#include <libgearman-server/plugins/queue/libmemcached/queue.h>
18
#include <libmemcached/memcached.h>
20
#pragma GCC diagnostic ignored "-Wold-style-cast"
22
using namespace gearmand;
25
* @addtogroup gearman_queue_libmemcached_static Static libmemcached Queue Storage Functions
26
* @ingroup gearman_queue_libmemcached
33
// Prefix placed in front of every memcached key this queue builds (see the
// snprintf calls in the add/done callbacks) and used by the replay loader to
// recognise keys that belong to this queue.
#define GEARMAN_QUEUE_LIBMEMCACHED_DEFAULT_PREFIX "gear_"
35
// Forward declaration so the prototype that follows can name the plugin class
// before its definition.
namespace gearmand { namespace plugins { namespace queue { class Libmemcached; }}}
37
// Prototype of the file-local setup routine. It receives the plugin object and
// the server; the definition later in this file registers the queue callbacks
// on that server.
static gearmand_error_t
38
_initialize(plugins::queue::Libmemcached *queue_obj,
39
gearman_server_st *server);
45
// Queue plugin backed by libmemcached; derives from gearmand::plugins::Queue.
// NOTE(review): only part of the class is visible here. The `memc` handle used
// by the constructor, destructor and callbacks is presumably declared on a line
// that is not shown -- confirm against the full file.
class Libmemcached : public gearmand::plugins::Queue {
50
// Hooks this plugin into the server; the out-of-class definition forwards to
// _initialize().
gearmand_error_t initialize();
53
// Value of the "libmemcached-servers" option; handed to
// memcached_servers_parse() during _initialize().
std::string server_list;
58
// Constructor: sets up the embedded memcached handle and registers the
// plugin's command line option.
// NOTE(review): the member-initializer list that follows the ':' is not
// visible in this view.
Libmemcached::Libmemcached() :
61
// Initialise the memcached_st that lives inside this object.
memcached_create(&memc);
63
// Bind the "libmemcached-servers" option to server_list.
command_line_options().add_options()
64
("libmemcached-servers", boost::program_options::value(&server_list), "List of Memcached servers to use.");
67
// Destructor: releases the memcached handle set up by the constructor.
Libmemcached::~Libmemcached()
69
memcached_free(&memc);
72
// Forwards to the file-local _initialize() with this plugin and the server
// owned by the object Gearmand() returns; the result is passed back unchanged.
gearmand_error_t Libmemcached::initialize()
74
return _initialize(this, &Gearmand()->server);
77
// Entry point that instantiates the plugin. The function-local static means a
// single Libmemcached object is constructed on the first call and kept for the
// rest of the program.
void initialize_libmemcached()
79
static Libmemcached local_instance;
83
} // namespace plugins
84
} // namespace gearmand
86
/* Queue callback functions. */
87
/* Prototypes for the four callbacks that _initialize() passes to
   gearman_server_set_queue().
   NOTE(review): several parameter lines are missing from this view; the
   definitions below use `context`, `when` and `add_context`, which are
   presumably declared on those lines -- confirm against the full file. */
/* Stores one job in memcached. */
static gearmand_error_t _libmemcached_add(gearman_server_st *server,
89
const char *unique, size_t unique_size,
90
const char *function_name, size_t function_name_size,
91
const void *data, size_t data_size,
92
gearmand_job_priority_t priority,
95
/* Flush hook; the definition only logs and reports success. */
static gearmand_error_t _libmemcached_flush(gearman_server_st *server,
98
/* Removes a finished job's key from memcached. */
static gearmand_error_t _libmemcached_done(gearman_server_st *server,
100
const char *unique, size_t unique_size,
101
const char *function_name, size_t function_name_size);
103
/* Walks the keys held in memcached and feeds the stored jobs back through add_fn. */
static gearmand_error_t _libmemcached_replay(gearman_server_st *server,
105
gearman_queue_add_fn *add_fn,
114
/* Connects the plugin to its memcached servers and installs the queue
   callbacks on the server.
   Returns GEARMAN_QUEUE_ERROR on the failure path below, GEARMAN_SUCCESS
   otherwise. */
gearmand_error_t _initialize(plugins::queue::Libmemcached *queue,
115
gearman_server_st *server)
118
gearmand_info("Initializing libmemcached module");
120
/* Turn the option string into a libmemcached server list. */
memcached_server_st *servers= memcached_servers_parse(queue->server_list.c_str());
123
/* NOTE(review): the condition guarding this error path is not visible here;
   presumably it is a NULL check on `servers` -- confirm. */
gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "memcached_servers_parse");
125
return GEARMAN_QUEUE_ERROR;
128
/* Hand the parsed servers to the handle, then release the temporary list.
   NOTE(review): the result of memcached_server_push() is not examined. */
memcached_server_push(&queue->memc, servers);
129
memcached_server_list_free(servers);
131
/* Register the plugin object as the callback context together with the four queue callbacks. */
gearman_server_set_queue(server, queue, _libmemcached_add, _libmemcached_flush, _libmemcached_done, _libmemcached_replay);
133
return GEARMAN_SUCCESS;
140
/* Queue "add" callback: stores a job's payload in memcached under the key
   "<prefix><function_name>-<unique>", with the priority carried in the item's
   flags field.
   Returns GEARMAN_QUEUE_ERROR when `when` is non-zero or when memcached_set()
   does not report MEMCACHED_SUCCESS; GEARMAN_SUCCESS otherwise.
   NOTE(review): part of the parameter list and some local declarations
   (`context`, `unique`, `unique_size`, `when`, `key_length`, `rc`) are not
   visible in this view. */
static gearmand_error_t _libmemcached_add(gearman_server_st *server,
144
const char *function_name,
145
size_t function_name_size,
146
const void *data, size_t data_size,
147
gearmand_job_priority_t priority,
150
/* The callback context is the plugin object registered in _initialize(). */
gearmand::plugins::queue::Libmemcached *queue= (gearmand::plugins::queue::Libmemcached *)context;
152
char key[MEMCACHED_MAX_KEY];
155
if (when) // No support for EPOCH jobs
156
return GEARMAN_QUEUE_ERROR;
160
gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "libmemcached add: %.*s", (uint32_t)unique_size, (char *)unique);
162
/* Build the key from prefix, function name and unique id.
   NOTE(review): snprintf() returns the length the full string would have had,
   so when the output is truncated key_length exceeds what was actually written
   into `key`; no truncation check is visible here. */
key_length= (size_t)snprintf(key, MEMCACHED_MAX_KEY, "%s%.*s-%.*s",
163
GEARMAN_QUEUE_LIBMEMCACHED_DEFAULT_PREFIX,
164
(int)function_name_size,
165
(const char *)function_name, (int)unique_size,
166
(const char *)unique);
168
/* Expiration is 0; the priority is stored as the item's flags so the replay
   loader can read it back with memcached_result_flags(). */
rc= memcached_set(&queue->memc, (const char *)key, key_length,
169
(const char *)data, data_size, 0, (uint32_t)priority);
171
if (rc != MEMCACHED_SUCCESS)
172
return GEARMAN_QUEUE_ERROR;
174
return GEARMAN_SUCCESS;
177
/* Queue "flush" callback: in the visible lines it only writes a debug message
   and reports success.
   NOTE(review): the rest of the parameter list is not visible in this view. */
static gearmand_error_t _libmemcached_flush(gearman_server_st *server,
180
gearmand_debug("libmemcached flush");
184
return GEARMAN_SUCCESS;
187
/* Queue "done" callback: deletes the memcached item for a finished job. The
   key is rebuilt with the same "<prefix><function_name>-<unique>" format the
   add callback uses.
   Returns GEARMAN_QUEUE_ERROR when memcached_delete() does not report
   MEMCACHED_SUCCESS; GEARMAN_SUCCESS otherwise.
   NOTE(review): the `context` parameter and the `key_length` declaration are
   not visible in this view. */
static gearmand_error_t _libmemcached_done(gearman_server_st *server,
189
const char *unique, size_t unique_size,
190
const char *function_name, size_t function_name_size)
193
char key[MEMCACHED_MAX_KEY];
194
/* The callback context is the plugin object registered in _initialize(). */
gearmand::plugins::queue::Libmemcached *queue= (gearmand::plugins::queue::Libmemcached *)context;
197
gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "libmemcached done: %.*s", (uint32_t)unique_size, (char *)unique);
199
/* NOTE(review): as in the add callback, snprintf()'s return value is used as
   the key length without a truncation check. */
key_length= (size_t)snprintf(key, MEMCACHED_MAX_KEY, "%s%.*s-%.*s",
200
GEARMAN_QUEUE_LIBMEMCACHED_DEFAULT_PREFIX,
201
(int)function_name_size,
202
(const char *)function_name, (int)unique_size,
203
(const char *)unique);
205
/* For the moment we will assume it happened */
206
memcached_return rc= memcached_delete(&queue->memc, (const char *)key, key_length, 0);
207
if (rc != MEMCACHED_SUCCESS)
209
return GEARMAN_QUEUE_ERROR;
212
return GEARMAN_SUCCESS;
215
/* State handed through the libmemcached dump/fetch callbacks during replay.
   NOTE(review): the callbacks also use `clone` and `add_context` members,
   which are presumably declared on lines not visible in this view. */
struct replay_context
218
/* Server that replayed jobs are added to. */
gearman_server_st *server;
219
/* Function called once per recovered job. */
gearman_queue_add_fn *add_fn;
223
/* Fetch callback: turns one memcached result back into a job and passes it to
   the replay add_fn. Keys that do not start with the queue prefix are skipped
   with MEMCACHED_SUCCESS.
   NOTE(review): the `context` parameter, several local declarations and the
   conditions guarding some early returns are not visible in this view. */
static memcached_return callback_loader(const memcached_st *ptr __attribute__((unused)),
224
/* NOTE(review): `result` is marked unused but is read below. */
memcached_result_st *result __attribute__((unused)),
227
struct replay_context *container= (struct replay_context *)context;
230
const char *function;
235
key= memcached_result_key_value(result);
236
/* Ignore items that were not written by this queue. */
if (strncmp(key, GEARMAN_QUEUE_LIBMEMCACHED_DEFAULT_PREFIX, strlen(GEARMAN_QUEUE_LIBMEMCACHED_DEFAULT_PREFIX)))
237
return MEMCACHED_SUCCESS;
239
/* The function name starts right after the prefix. */
function= key + strlen(GEARMAN_QUEUE_LIBMEMCACHED_DEFAULT_PREFIX);
241
/* Split at the first '-'.
   NOTE(review): a function name that itself contains '-' would be split in
   the wrong place. */
unique= index(function, '-');
244
/* NOTE(review): presumably guarded by a check that no '-' was found -- the
   condition is not visible here. */
return MEMCACHED_SUCCESS;
246
function_len = (size_t) (unique-function);
250
/* NOTE(review): strlen() on `unique` assumes the key buffer is NUL-terminated
   -- confirm for memcached_result_key_value(). */
assert(strlen(unique));
252
assert(function_len);
254
data_size= (size_t) memcached_result_length(result);
255
/* need to make a copy here ... gearman_server_job_free will free it later */
256
data= (char *)malloc(data_size);
259
/* NOTE(review): presumably guarded by a NULL check on `data` -- the condition
   is not visible here. */
gearmand_perror("malloc");
260
return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
262
memcpy(data, memcached_result_value(result), data_size);
264
/* Currently not looking at failure cases */
/* The item's flags carry the job priority (set by the add callback); the last
   argument is passed as 0.
   NOTE(review): the add_fn result is discarded, so whether `data` is released
   when add_fn fails cannot be told from here. */
265
(void)(*container->add_fn)(container->server, container->add_context,
266
unique, strlen(unique),
267
function, function_len,
269
static_cast<gearmand_job_priority_t>(memcached_result_flags(result)), 0);
272
return MEMCACHED_SUCCESS;
275
/* Grab the object and load it into the loader */
/* Dump callback, called with one key at a time: fetches that key through the
   cloned handle in the replay context and runs callback_loader on the result.
   Always returns MEMCACHED_SUCCESS.
   NOTE(review): the `context` parameter and the `passable` declaration are not
   visible in this view. */
275
static memcached_return callback_for_key(const memcached_st *ptr __attribute__((unused)),
277
const char *key, size_t key_length,
280
struct replay_context *container= (struct replay_context *)context;
281
memcached_execute_function callbacks[1];
284
callbacks[0]= (memcached_execute_fn)&callback_loader;
286
passable[0]= (char *)key;
287
/* Request the single key.
   NOTE(review): `rc` is not examined in the visible lines. */
memcached_return_t rc= memcached_mget(&container->clone, passable, &key_length, 1);
290
/* Just void errors for the moment, since other threads might have picked up the object. */
291
(void)memcached_fetch_execute(&container->clone, callbacks, context, 1);
293
return MEMCACHED_SUCCESS;
297
Any failures while loading values back during replay are simply ignored.
299
/* Queue "replay" callback: dumps the keys held in memcached and, through
   callback_for_key / callback_loader, feeds each stored job to add_fn.
   Returns GEARMAN_SUCCESS in the visible code path; the memcached_dump()
   result is discarded.
   NOTE(review): the `add_context` parameter is not visible in this view. */
static gearmand_error_t _libmemcached_replay(gearman_server_st *server, void *context,
300
gearman_queue_add_fn *add_fn,
303
/* The callback context is the plugin object registered in _initialize(). */
gearmand::plugins::queue::Libmemcached *queue= (gearmand::plugins::queue::Libmemcached *)context;
304
struct replay_context container;
305
memcached_st *check_for_failure;
306
memcached_dump_func callbacks[1];
308
callbacks[0]= (memcached_dump_fn)&callback_for_key;
310
gearmand_info("libmemcached replay start");
312
/* Zero the context, then clone the queue's handle into it; the per-key
   fetches in callback_for_key go through this clone. */
memset(&container, 0, sizeof(struct replay_context));
313
check_for_failure= memcached_clone(&container.clone, &queue->memc);
314
container.server= server;
315
container.add_fn= add_fn;
316
container.add_context= add_context;
318
/* NOTE(review): clone failure is only caught by this assert, which is
   compiled out when NDEBUG is defined. */
assert(check_for_failure);
321
(void)memcached_dump(&queue->memc, callbacks, (void *)&container, 1);
323
memcached_free(&container.clone);
325
return GEARMAN_SUCCESS;