1
/* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
* Gearmand client and server library.
5
* Copyright (C) 2011 Data Differential, http://datadifferential.com/
6
* Copyright (C) 2008 Brian Aker, Eric Day
9
* Redistribution and use in source and binary forms, with or without
10
* modification, are permitted provided that the following conditions are
13
* * Redistributions of source code must retain the above copyright
14
* notice, this list of conditions and the following disclaimer.
16
* * Redistributions in binary form must reproduce the above
17
* copyright notice, this list of conditions and the following disclaimer
18
* in the documentation and/or other materials provided with the
21
* * The names of its contributors may not be used to endorse or
22
* promote products derived from this software without specific prior
25
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
50
#include <libgearman/gearman.h>
51
#include <boost/program_options.hpp>
53
struct reverse_worker_options_t
59
reverse_worker_options_t():
66
#ifndef __INTEL_COMPILER
67
#pragma GCC diagnostic ignored "-Wold-style-cast"
70
static void *reverse(gearman_job_st *job, void *context,
71
size_t *result_size, gearman_return_t *ret_ptr);
73
int main(int args, char *argv[])
76
reverse_worker_options_t options;
81
boost::program_options::options_description desc("Options");
83
("help", "Options related to the program.")
84
("host,h", boost::program_options::value<std::string>(&host)->default_value("localhost"),"Connect to the host")
85
("port,p", boost::program_options::value<in_port_t>(&port)->default_value(GEARMAN_DEFAULT_TCP_PORT), "Port number use for connection")
86
("count,c", boost::program_options::value<uint32_t>(&count)->default_value(0), "Number of jobs to run before exiting")
87
("timeout,u", boost::program_options::value<int>(&timeout)->default_value(-1), "Timeout in milliseconds")
88
("chunk,d", boost::program_options::bool_switch(&options.chunk)->default_value(false), "Send result back in data chunks")
89
("status,s", boost::program_options::bool_switch(&options.status)->default_value(false), "Send status updates and sleep while running job")
90
("unique,u", boost::program_options::bool_switch(&options.unique)->default_value(false), "When grabbing jobs, grab the uniqie id")
93
boost::program_options::variables_map vm;
96
boost::program_options::store(boost::program_options::parse_command_line(args, argv, desc), vm);
97
boost::program_options::notify(vm);
99
catch(std::exception &e)
101
std::cout << e.what() << std::endl;
105
if (vm.count("help"))
107
std::cout << desc << std::endl;
111
if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
113
std::cerr << "signal:" << strerror(errno) << std::endl;
117
gearman_worker_st worker;
118
if (gearman_worker_create(&worker) == NULL)
120
std::cerr << "Memory allocation failure on worker creation." << std::endl;
125
gearman_worker_add_options(&worker, GEARMAN_WORKER_GRAB_UNIQ);
128
gearman_worker_set_timeout(&worker, timeout);
130
gearman_return_t ret;
131
ret= gearman_worker_add_server(&worker, host.c_str(), port);
132
if (ret != GEARMAN_SUCCESS)
134
std::cerr << gearman_worker_error(&worker) << std::endl;
138
ret= gearman_worker_add_function(&worker, "reverse", 0, reverse, &options);
139
if (ret != GEARMAN_SUCCESS)
141
std::cerr << gearman_worker_error(&worker) << std::endl;
147
ret= gearman_worker_work(&worker);
148
if (ret != GEARMAN_SUCCESS)
150
std::cerr << gearman_worker_error(&worker) << std::endl;
162
gearman_worker_free(&worker);
167
static void *reverse(gearman_job_st *job, void *context,
168
size_t *result_size, gearman_return_t *ret_ptr)
170
reverse_worker_options_t options= *((reverse_worker_options_t *)context);
173
const char *workload;
174
workload= (const char *)gearman_job_workload(job);
175
*result_size= gearman_job_workload_size(job);
178
result= (char *)malloc(*result_size);
182
*ret_ptr= GEARMAN_WORK_FAIL;
188
for (y= 0, x= *result_size; x; x--, y++)
190
result[y]= ((uint8_t *)workload)[x - 1];
194
*ret_ptr= gearman_job_send_data(job, &(result[y]), 1);
195
if (*ret_ptr != GEARMAN_SUCCESS)
204
*ret_ptr= gearman_job_send_status(job, (uint32_t)y,
205
(uint32_t)*result_size);
206
if (*ret_ptr != GEARMAN_SUCCESS)
216
std::cout << "Job=" << gearman_job_handle(job);
220
std::cout << "Unique=" << gearman_job_unique(job);
224
std::cout << " Workload=";
225
std::cout.write(workload, *result_size);
227
std::cout << " Result=";
228
std::cout.write(result, *result_size);
230
std::cout << std::endl;
232
*ret_ptr= GEARMAN_SUCCESS;