~clint-fewbar/ubuntu/precise/gearmand/drop-unneeded-patches

« back to all changes in this revision

Viewing changes to libgearman-server/plugins/queue/libmemcached/queue.cc

  • Committer: Package Import Robot
  • Author(s): Stig Sandbeck Mathisen, Michael Fladischer, Stig Sandbeck Mathisen
  • Date: 2012-01-23 11:31:08 UTC
  • mfrom: (6.1.2 sid)
  • Revision ID: package-import@ubuntu.com-20120123113108-wl1yhiba13q9jusb
Tags: 0.27-1
[Michael Fladischer]
* Patch: fix spelling
* Patch: remove dependency on googleanalytics
* Patch: fix tests
* Use non-authenticating URL for Vcs-Git.
* Add "status" action to init script.

[Stig Sandbeck Mathisen]
* New upstream release (Closes: #621486) (LP: #682680)
* Remove build dependency on drizzle
  (until it reaches testing again)
* Build with support for tokyocabinet
* Remove backported ipv6 patch
* Patch: disable hostile build tests, they take hours...
* Patch: workaround duplicate address issue for tests
* Do not build API documentation, the sources are not shipped in
  upstream tarball
* Update debian/copyright

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Gearman server and library
 
2
 * Copyright (C) 2008 Brian Aker, Eric Day
 
3
 * All rights reserved.
 
4
 *
 
5
 * Use and distribution licensed under the BSD license.  See
 
6
 * the COPYING file in the parent directory for full text.
 
7
 */
 
8
 
 
9
/**
 
10
 * @file
 
11
 * @brief libmemcached Queue Storage Definitions
 
12
 */
 
13
 
 
14
#include <libgearman-server/common.h>
 
15
 
 
16
#include <libgearman-server/plugins/queue/base.h>
 
17
#include <libgearman-server/plugins/queue/libmemcached/queue.h>
 
18
#include <libmemcached/memcached.h>
 
19
 
 
20
#pragma GCC diagnostic ignored "-Wold-style-cast"
 
21
 
 
22
using namespace gearmand;
 
23
 
 
24
/**
 
25
 * @addtogroup gearmand::plugins::queue::Libmemcachedatic Static libmemcached Queue Storage Functions
 
26
 * @ingroup gearman_queue_libmemcached
 
27
 * @{
 
28
 */
 
29
 
 
30
/**
 
31
 * Default values.
 
32
 */
 
33
#define GEARMAN_QUEUE_LIBMEMCACHED_DEFAULT_PREFIX "gear_"
 
34
 
 
35
namespace gearmand { namespace plugins { namespace queue { class Libmemcached;  }}}
 
36
 
 
37
static gearmand_error_t
 
38
_initialize(plugins::queue::Libmemcached *queue_obj,
 
39
            gearman_server_st *server);
 
40
 
 
41
namespace gearmand {
 
42
namespace plugins {
 
43
namespace queue {
 
44
 
 
45
class Libmemcached : public gearmand::plugins::Queue {
 
46
public:
 
47
  Libmemcached ();
 
48
  ~Libmemcached ();
 
49
 
 
50
  gearmand_error_t initialize();
 
51
 
 
52
  memcached_st memc;
 
53
  std::string server_list;
 
54
private:
 
55
 
 
56
};
 
57
 
 
58
Libmemcached::Libmemcached() :
 
59
  Queue("libmemcached")
 
60
{
 
61
  memcached_create(&memc);
 
62
 
 
63
  command_line_options().add_options()
 
64
    ("libmemcached-servers", boost::program_options::value(&server_list), "List of Memcached servers to use.");
 
65
}
 
66
 
 
67
Libmemcached::~Libmemcached()
 
68
{
 
69
  memcached_free(&memc);
 
70
}
 
71
 
 
72
gearmand_error_t Libmemcached::initialize()
 
73
{
 
74
  return _initialize(this, &Gearmand()->server);
 
75
}
 
76
 
 
77
void initialize_libmemcached()
 
78
{
 
79
  static Libmemcached local_instance;
 
80
}
 
81
 
 
82
} // namespace queue
 
83
} // namespace plugins
 
84
} // namespace gearmand
 
85
 
 
86
/* Queue callback functions. */
 
87
static gearmand_error_t _libmemcached_add(gearman_server_st *server,
 
88
                                          void *context,
 
89
                                          const char *unique, size_t unique_size,
 
90
                                          const char *function_name, size_t function_name_size,
 
91
                                          const void *data, size_t data_size,
 
92
                                          gearmand_job_priority_t priority,
 
93
                                          int64_t when);
 
94
 
 
95
static gearmand_error_t _libmemcached_flush(gearman_server_st *server,
 
96
                                            void *context);
 
97
 
 
98
static gearmand_error_t _libmemcached_done(gearman_server_st *server,
 
99
                                           void *context,
 
100
                                           const char *unique, size_t unique_size,
 
101
                                           const char *function_name, size_t function_name_size);
 
102
 
 
103
static gearmand_error_t _libmemcached_replay(gearman_server_st *server,
 
104
                                             void *context,
 
105
                                             gearman_queue_add_fn *add_fn,
 
106
                                             void *add_context);
 
107
 
 
108
/** @} */
 
109
 
 
110
/*
 
111
 * Public definitions
 
112
 */
 
113
 
 
114
gearmand_error_t _initialize(plugins::queue::Libmemcached *queue,
 
115
                             gearman_server_st *server)
 
116
{
 
117
 
 
118
  gearmand_info("Initializing libmemcached module");
 
119
 
 
120
  memcached_server_st *servers= memcached_servers_parse(queue->server_list.c_str());
 
121
  if (servers == NULL)
 
122
  {
 
123
    gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "memcached_servers_parse");
 
124
 
 
125
    return GEARMAN_QUEUE_ERROR;
 
126
  }
 
127
 
 
128
  memcached_server_push(&queue->memc, servers);
 
129
  memcached_server_list_free(servers);
 
130
 
 
131
  gearman_server_set_queue(server, queue, _libmemcached_add, _libmemcached_flush, _libmemcached_done, _libmemcached_replay);
 
132
 
 
133
  return GEARMAN_SUCCESS;
 
134
}
 
135
 
 
136
/*
 
137
 * Static definitions
 
138
 */
 
139
 
 
140
static gearmand_error_t _libmemcached_add(gearman_server_st *server,
 
141
                                          void *context,
 
142
                                          const char *unique,
 
143
                                          size_t unique_size,
 
144
                                          const char *function_name,
 
145
                                          size_t function_name_size,
 
146
                                          const void *data, size_t data_size,
 
147
                                          gearmand_job_priority_t priority,
 
148
                                          int64_t when)
 
149
{
 
150
  gearmand::plugins::queue::Libmemcached *queue= (gearmand::plugins::queue::Libmemcached *)context;
 
151
  memcached_return rc;
 
152
  char key[MEMCACHED_MAX_KEY];
 
153
  size_t key_length;
 
154
 
 
155
  if (when) // No support for EPOCH jobs
 
156
    return GEARMAN_QUEUE_ERROR;
 
157
 
 
158
  (void)server;
 
159
 
 
160
  gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "libmemcached add: %.*s", (uint32_t)unique_size, (char *)unique);
 
161
 
 
162
  key_length= (size_t)snprintf(key, MEMCACHED_MAX_KEY, "%s%.*s-%.*s",
 
163
                               GEARMAN_QUEUE_LIBMEMCACHED_DEFAULT_PREFIX,
 
164
                               (int)function_name_size,
 
165
                               (const char *)function_name, (int)unique_size,
 
166
                               (const char *)unique);
 
167
 
 
168
  rc= memcached_set(&queue->memc, (const char *)key, key_length,
 
169
                    (const char *)data, data_size, 0, (uint32_t)priority);
 
170
 
 
171
  if (rc != MEMCACHED_SUCCESS)
 
172
    return GEARMAN_QUEUE_ERROR;
 
173
 
 
174
  return GEARMAN_SUCCESS;
 
175
}
 
176
 
 
177
static gearmand_error_t _libmemcached_flush(gearman_server_st *server,
 
178
                                            void *context)
 
179
{
 
180
  gearmand_debug("libmemcached flush");
 
181
  (void)server;
 
182
  (void)context;
 
183
 
 
184
  return GEARMAN_SUCCESS;
 
185
}
 
186
 
 
187
static gearmand_error_t _libmemcached_done(gearman_server_st *server,
 
188
                                           void *context,
 
189
                                           const char *unique, size_t unique_size,
 
190
                                           const char *function_name, size_t function_name_size)
 
191
{
 
192
  size_t key_length;
 
193
  char key[MEMCACHED_MAX_KEY];
 
194
  gearmand::plugins::queue::Libmemcached *queue= (gearmand::plugins::queue::Libmemcached *)context;
 
195
  (void)server;
 
196
 
 
197
  gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "libmemcached done: %.*s", (uint32_t)unique_size, (char *)unique);
 
198
 
 
199
  key_length= (size_t)snprintf(key, MEMCACHED_MAX_KEY, "%s%.*s-%.*s",
 
200
                               GEARMAN_QUEUE_LIBMEMCACHED_DEFAULT_PREFIX,
 
201
                               (int)function_name_size,
 
202
                               (const char *)function_name, (int)unique_size,
 
203
                               (const char *)unique);
 
204
 
 
205
  /* For the moment we will assume it happened */
 
206
  memcached_return rc= memcached_delete(&queue->memc, (const char *)key, key_length, 0);
 
207
  if (rc != MEMCACHED_SUCCESS)
 
208
  {
 
209
    return GEARMAN_QUEUE_ERROR;
 
210
  }
 
211
 
 
212
  return GEARMAN_SUCCESS;
 
213
}
 
214
 
 
215
struct replay_context
 
216
{
 
217
  memcached_st clone;
 
218
  gearman_server_st *server;
 
219
  gearman_queue_add_fn *add_fn;
 
220
  void *add_context;
 
221
};
 
222
 
 
223
static memcached_return callback_loader(const memcached_st *ptr __attribute__((unused)),
 
224
                                        memcached_result_st *result __attribute__((unused)),
 
225
                                        void *context)
 
226
{
 
227
  struct replay_context *container= (struct replay_context *)context;
 
228
  const char *key;
 
229
  const char *unique;
 
230
  const char *function;
 
231
  size_t function_len;
 
232
  char *data;
 
233
  size_t data_size;
 
234
 
 
235
  key= memcached_result_key_value(result);
 
236
  if (strncmp(key, GEARMAN_QUEUE_LIBMEMCACHED_DEFAULT_PREFIX, strlen(GEARMAN_QUEUE_LIBMEMCACHED_DEFAULT_PREFIX)))
 
237
    return MEMCACHED_SUCCESS;
 
238
 
 
239
  function= key + strlen(GEARMAN_QUEUE_LIBMEMCACHED_DEFAULT_PREFIX);
 
240
 
 
241
  unique= index(function, '-');
 
242
 
 
243
  if (! unique)
 
244
    return MEMCACHED_SUCCESS;
 
245
 
 
246
  function_len = (size_t) (unique-function);
 
247
  unique++;
 
248
 
 
249
  assert(unique);
 
250
  assert(strlen(unique));
 
251
  assert(function);
 
252
  assert(function_len);
 
253
 
 
254
  data_size= (size_t) memcached_result_length(result);
 
255
  /* need to make a copy here ... gearman_server_job_free will free it later */
 
256
  data= (char *)malloc(data_size);
 
257
  if (data == NULL)
 
258
  {
 
259
    gearmand_perror("malloc");
 
260
    return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
 
261
  } 
 
262
  memcpy(data, memcached_result_value(result), data_size);
 
263
 
 
264
  /* Currently not looking at failure cases */
 
265
  (void)(*container->add_fn)(container->server, container->add_context,
 
266
                             unique, strlen(unique),
 
267
                             function, function_len,
 
268
                             data, data_size,
 
269
                             static_cast<gearmand_job_priority_t>(memcached_result_flags(result)), 0);
 
270
 
 
271
 
 
272
  return MEMCACHED_SUCCESS;
 
273
}
 
274
 
 
275
/* Grab the object and load it into the loader */
 
276
static memcached_return callback_for_key(const memcached_st *ptr __attribute__((unused)),
 
277
                                         const char *key, size_t key_length,
 
278
                                         void *context)
 
279
{
 
280
  struct replay_context *container= (struct replay_context *)context;
 
281
  memcached_execute_function callbacks[1];
 
282
  char *passable[1];
 
283
 
 
284
  callbacks[0]= (memcached_execute_fn)&callback_loader;
 
285
 
 
286
  passable[0]= (char *)key;
 
287
  memcached_return_t rc= memcached_mget(&container->clone, passable, &key_length, 1);
 
288
  (void)rc;
 
289
 
 
290
  /* Just void errors for the moment, since other treads might have picked up the object. */
 
291
  (void)memcached_fetch_execute(&container->clone, callbacks, context, 1);
 
292
 
 
293
  return MEMCACHED_SUCCESS;
 
294
}
 
295
 
 
296
/*
 
297
  If we have any failures for loading values back into replay we just ignore them.
 
298
*/
 
299
static gearmand_error_t _libmemcached_replay(gearman_server_st *server, void *context,
 
300
                                             gearman_queue_add_fn *add_fn,
 
301
                                             void *add_context)
 
302
{
 
303
  gearmand::plugins::queue::Libmemcached *queue= (gearmand::plugins::queue::Libmemcached *)context;
 
304
  struct replay_context container;
 
305
  memcached_st *check_for_failure;
 
306
  memcached_dump_func callbacks[1];
 
307
 
 
308
  callbacks[0]= (memcached_dump_fn)&callback_for_key;
 
309
 
 
310
  gearmand_info("libmemcached replay start");
 
311
 
 
312
  memset(&container, 0, sizeof(struct replay_context));
 
313
  check_for_failure= memcached_clone(&container.clone, &queue->memc);
 
314
  container.server= server;
 
315
  container.add_fn= add_fn;
 
316
  container.add_context= add_context;
 
317
 
 
318
  assert(check_for_failure);
 
319
 
 
320
 
 
321
  (void)memcached_dump(&queue->memc, callbacks, (void *)&container, 1);
 
322
 
 
323
  memcached_free(&container.clone);
 
324
 
 
325
  return GEARMAN_SUCCESS;
 
326
}