1
/* Gearman server and library
2
* Copyright (C) 2008 Brian Aker, Eric Day
5
* Use and distribution licensed under the BSD license. See
6
* the COPYING file in the parent directory for full text.
11
* @brief Server connection definitions
14
#include <libgearman-server/common.h>
17
#include <libgearman/command.h>
19
#include <libgearman-server/fifo.h>
23
#ifndef __INTEL_COMPILER
24
#pragma GCC diagnostic ignored "-Wold-style-cast"
31
gearman_server_packet_st *
32
gearman_server_packet_create(gearman_server_thread_st *thread,
35
gearman_server_packet_st *server_packet= NULL;
37
if (from_thread && Server->flags.threaded)
39
if (thread->free_packet_count > 0)
41
server_packet= thread->free_packet_list;
42
thread->free_packet_list= server_packet->next;
43
thread->free_packet_count--;
48
if (Server->free_packet_count > 0)
50
server_packet= Server->free_packet_list;
51
Server->free_packet_list= server_packet->next;
52
Server->free_packet_count--;
56
if (server_packet == NULL)
58
server_packet= (gearman_server_packet_st *)malloc(sizeof(gearman_server_packet_st));
59
if (server_packet == NULL)
61
gearmand_perror("malloc");
66
server_packet->next= NULL;
71
void gearman_server_packet_free(gearman_server_packet_st *packet,
72
gearman_server_thread_st *thread,
75
if (from_thread && Server->flags.threaded)
77
if (thread->free_packet_count < GEARMAN_MAX_FREE_SERVER_PACKET)
79
packet->next= thread->free_packet_list;
80
thread->free_packet_list= packet;
81
thread->free_packet_count++;
85
gearmand_crazy("free");
91
if (Server->free_packet_count < GEARMAN_MAX_FREE_SERVER_PACKET)
93
packet->next= Server->free_packet_list;
94
Server->free_packet_list= packet;
95
Server->free_packet_count++;
99
gearmand_crazy("free");
105
gearmand_error_t gearman_server_io_packet_add(gearman_server_con_st *con,
107
enum gearman_magic_t magic,
108
gearman_command_t command,
109
const void *arg, ...)
111
gearman_server_packet_st *server_packet;
114
server_packet= gearman_server_packet_create(con->thread, false);
115
if (not server_packet)
116
return GEARMAN_MEMORY_ALLOCATION_FAILURE;
118
gearmand_packet_init(&(server_packet->packet), magic, command);
124
size_t arg_size= va_arg(ap, size_t);
126
gearmand_error_t ret= gearmand_packet_create(&(server_packet->packet), arg, arg_size);
127
if (gearmand_failed(ret))
130
gearmand_packet_free(&(server_packet->packet));
131
gearman_server_packet_free(server_packet, con->thread, false);
135
arg= va_arg(ap, void *);
140
gearmand_error_t ret= gearmand_packet_pack_header(&(server_packet->packet));
141
if (gearmand_failed(ret))
143
gearmand_packet_free(&(server_packet->packet));
144
gearman_server_packet_free(server_packet, con->thread, false);
150
server_packet->packet.options.free_data= true;
153
(void) pthread_mutex_lock(&con->thread->lock);
154
gearmand_server_con_fifo_add(con, server_packet);
155
(void) pthread_mutex_unlock(&con->thread->lock);
157
gearman_server_con_io_add(con);
159
return GEARMAN_SUCCESS;
162
void gearman_server_io_packet_remove(gearman_server_con_st *con)
164
gearman_server_packet_st *server_packet= con->io_packet_list;
166
gearmand_packet_free(&(server_packet->packet));
168
(void) pthread_mutex_lock(&con->thread->lock);
169
gearmand_server_con_fifo_free(con, server_packet);
170
(void) pthread_mutex_unlock(&con->thread->lock);
172
gearman_server_packet_free(server_packet, con->thread, true);
175
void gearman_server_proc_packet_add(gearman_server_con_st *con,
176
gearman_server_packet_st *packet)
178
(void) pthread_mutex_lock(&con->thread->lock);
179
gearmand_server_con_fifo_proc_add(con, packet);
180
(void) pthread_mutex_unlock(&con->thread->lock);
182
gearman_server_con_proc_add(con);
185
gearman_server_packet_st *
186
gearman_server_proc_packet_remove(gearman_server_con_st *con)
188
gearman_server_packet_st *server_packet= con->proc_packet_list;
190
if (server_packet == NULL)
193
(void) pthread_mutex_lock(&con->thread->lock);
194
gearmand_server_con_fifo_proc_free(con, server_packet);
195
(void) pthread_mutex_unlock(&con->thread->lock);
197
return server_packet;
200
const char *gearmand_strcommand(gearmand_packet_st *packet)
203
return gearman_command_info(packet->command)->name;
206
inline static gearmand_error_t packet_create_arg(gearmand_packet_st *packet,
207
const void *arg, size_t arg_size)
211
if (packet->argc == gearman_command_info(packet->command)->argc &&
212
(not (gearman_command_info(packet->command)->data) ||
215
gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "too many arguments for command(%s)", gearman_command_info(packet->command)->name);
216
return GEARMAN_TOO_MANY_ARGS;
219
if (packet->argc == gearman_command_info(packet->command)->argc)
221
packet->data= static_cast<const char *>(arg);
222
packet->data_size= arg_size;
223
return GEARMAN_SUCCESS;
226
if (packet->args_size == 0 and packet->magic != GEARMAN_MAGIC_TEXT)
227
packet->args_size= GEARMAN_PACKET_HEADER_SIZE;
229
if ((packet->args_size + arg_size) < GEARMAN_ARGS_BUFFER_SIZE)
231
packet->args= packet->args_buffer;
235
gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "resizing packet buffer");
236
if (packet->args == packet->args_buffer)
238
packet->args= (char *)malloc(packet->args_size + arg_size);
239
memcpy(packet->args, packet->args_buffer, packet->args_size);
243
char *new_args= (char *)realloc(packet->args, packet->args_size + arg_size);
246
gearmand_perror("realloc");
247
return GEARMAN_MEMORY_ALLOCATION_FAILURE;
249
packet->args= new_args;
253
memcpy(packet->args + packet->args_size, arg, arg_size);
254
packet->args_size+= arg_size;
255
packet->arg_size[packet->argc]= arg_size;
258
if (packet->magic == GEARMAN_MAGIC_TEXT)
264
offset= GEARMAN_PACKET_HEADER_SIZE;
267
for (uint8_t x= 0; x < packet->argc; x++)
269
packet->arg[x]= packet->args + offset;
270
offset+= packet->arg_size[x];
273
return GEARMAN_SUCCESS;
283
void gearmand_packet_init(gearmand_packet_st *packet, enum gearman_magic_t magic, gearman_command_t command)
287
packet->options.complete= false;
288
packet->options.free_data= false;
290
packet->magic= magic;
291
packet->command= command;
293
packet->args_size= 0;
294
packet->data_size= 0;
300
gearmand_error_t gearmand_packet_create(gearmand_packet_st *packet,
301
const void *arg, size_t arg_size)
303
return packet_create_arg(packet, arg, arg_size);
306
void gearmand_packet_free(gearmand_packet_st *packet)
308
if (packet->args != packet->args_buffer && packet->args != NULL)
310
gearmand_crazy("free");
315
if (packet->options.free_data && packet->data != NULL)
317
gearmand_crazy("free");
318
free((void *)packet->data); //@todo fix the need for the casting.
323
gearmand_error_t gearmand_packet_pack_header(gearmand_packet_st *packet)
328
if (packet->magic == GEARMAN_MAGIC_TEXT)
330
packet->options.complete= true;
331
return GEARMAN_SUCCESS;
334
if (packet->args_size == 0)
336
packet->args= packet->args_buffer;
337
packet->args_size= GEARMAN_PACKET_HEADER_SIZE;
340
switch (packet->magic)
342
case GEARMAN_MAGIC_TEXT:
345
case GEARMAN_MAGIC_REQUEST:
346
memcpy(packet->args, "\0REQ", 4);
349
case GEARMAN_MAGIC_RESPONSE:
350
memcpy(packet->args, "\0RES", 4);
354
gearmand_error("invalid magic value");
355
return GEARMAN_INVALID_MAGIC;
358
if (packet->command == GEARMAN_COMMAND_TEXT ||
359
packet->command >= GEARMAN_COMMAND_MAX)
361
gearmand_error("invalid command value");
362
return GEARMAN_INVALID_COMMAND;
365
tmp= packet->command;
367
memcpy(packet->args + 4, &tmp, 4);
369
length_64= packet->args_size + packet->data_size - GEARMAN_PACKET_HEADER_SIZE;
371
// Check for overflow on 32bit(portable?).
372
if (length_64 >= UINT32_MAX || length_64 < packet->data_size)
374
gearmand_error("data size too too long");
375
return GEARMAN_ARGUMENT_TOO_LARGE;
378
tmp= (uint32_t)length_64;
380
memcpy(packet->args + 8, &tmp, 4);
382
packet->options.complete= true;
384
return GEARMAN_SUCCESS;
387
static gearmand_error_t gearmand_packet_unpack_header(gearmand_packet_st *packet)
391
if (not memcmp(packet->args, "\0REQ", 4))
393
packet->magic= GEARMAN_MAGIC_REQUEST;
395
else if (not memcmp(packet->args, "\0RES", 4))
397
packet->magic= GEARMAN_MAGIC_RESPONSE;
401
gearmand_error("invalid magic value");
402
return GEARMAN_INVALID_MAGIC;
405
memcpy(&tmp, packet->args + 4, 4);
406
packet->command= static_cast<gearman_command_t>(ntohl(tmp));
408
if (packet->command == GEARMAN_COMMAND_TEXT ||
409
packet->command >= GEARMAN_COMMAND_MAX)
411
gearmand_error("invalid command value");
412
return GEARMAN_INVALID_COMMAND;
415
memcpy(&tmp, packet->args + 8, 4);
416
packet->data_size= ntohl(tmp);
418
return GEARMAN_SUCCESS;
421
size_t gearmand_packet_pack(const gearmand_packet_st *packet,
422
gearman_server_con_st *con __attribute__ ((unused)),
423
void *data, size_t data_size,
424
gearmand_error_t *ret_ptr)
426
if (packet->args_size == 0)
428
*ret_ptr= GEARMAN_SUCCESS;
432
if (packet->args_size > data_size)
434
*ret_ptr= GEARMAN_FLUSH_DATA;
438
memcpy(data, packet->args, packet->args_size);
439
*ret_ptr= GEARMAN_SUCCESS;
440
return packet->args_size;
443
size_t gearmand_packet_unpack(gearmand_packet_st *packet,
444
gearman_server_con_st *con __attribute__ ((unused)),
445
const void *data, size_t data_size,
446
gearmand_error_t *ret_ptr)
451
if (packet->args_size == 0)
453
if (data_size > 0 && ((uint8_t *)data)[0] != 0)
455
/* Try to parse a text-based command. */
456
ptr= (uint8_t *)memchr(data, '\n', data_size);
459
*ret_ptr= GEARMAN_IO_WAIT;
463
packet->magic= GEARMAN_MAGIC_TEXT;
464
packet->command= GEARMAN_COMMAND_TEXT;
466
used_size= (size_t)(ptr - ((uint8_t *)data)) + 1;
468
if (used_size > 1 && *(ptr - 1) == '\r')
472
for (arg_size= used_size, ptr= (uint8_t *)data; ptr != NULL; data= ptr)
474
ptr= (uint8_t *)memchr(data, ' ', arg_size);
482
arg_size-= (size_t)(ptr - ((uint8_t *)data));
485
*ret_ptr= packet_create_arg(packet, data, ptr == NULL ? arg_size :
486
(size_t)(ptr - ((uint8_t *)data)));
487
if (*ret_ptr != GEARMAN_SUCCESS)
495
else if (data_size < GEARMAN_PACKET_HEADER_SIZE)
497
*ret_ptr= GEARMAN_IO_WAIT;
501
packet->args= packet->args_buffer;
502
packet->args_size= GEARMAN_PACKET_HEADER_SIZE;
503
memcpy(packet->args, data, GEARMAN_PACKET_HEADER_SIZE);
505
*ret_ptr= gearmand_packet_unpack_header(packet);
506
if (gearmand_failed(*ret_ptr))
511
used_size= GEARMAN_PACKET_HEADER_SIZE;
518
while (packet->argc != gearman_command_info(packet->command)->argc)
520
if (packet->argc != (gearman_command_info(packet->command)->argc - 1) ||
521
gearman_command_info(packet->command)->data)
523
ptr= (uint8_t *)memchr(((uint8_t *)data) + used_size, 0, data_size - used_size);
526
gearmand_log_crazy(GEARMAN_DEFAULT_LOG_PARAM, "Possible protocol error for %s, recieved only %u args", gearman_command_info(packet->command)->name, packet->argc);
527
*ret_ptr= GEARMAN_IO_WAIT;
531
size_t arg_size= (size_t)(ptr - (((uint8_t *)data) + used_size)) + 1;
532
*ret_ptr= packet_create_arg(packet, ((uint8_t *)data) + used_size, arg_size);
534
if (gearmand_failed(*ret_ptr))
537
packet->data_size-= arg_size;
538
used_size+= arg_size;
542
if ((data_size - used_size) < packet->data_size)
544
*ret_ptr= GEARMAN_IO_WAIT;
548
*ret_ptr= packet_create_arg(packet, ((uint8_t *)data) + used_size, packet->data_size);
549
if (gearmand_failed(*ret_ptr))
554
used_size+= packet->data_size;
555
packet->data_size= 0;
559
*ret_ptr= GEARMAN_SUCCESS;