2
* vim: tabstop=4 shiftwidth=4 softtabstop=4
4
* Copyright (c) 2011 Openstack, LLC.
7
* Licensed under the Apache License, Version 2.0 (the "License"); you may
8
* not use this file except in compliance with the License. You may obtain
9
* a copy of the License at
11
* http://www.apache.org/licenses/LICENSE-2.0
13
* Unless required by applicable law or agreed to in writing, software
14
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16
* License for the specific language governing permissions and limitations
23
#include <sys/types.h>
29
#include "libagent_int.h"
32
/* Per-plugin record: holds the Python callables looked up on the registered exchange and parser objects. */
typedef struct agent_plugin_info agent_plugin_info_t;
34
struct agent_plugin_info
39
/* result of PyObject_GetAttrString(exchange, "get_request"); called to fetch the next request */
PyObject *get_request;
40
/* result of PyObject_GetAttrString(exchange, "put_response"); called with (request, response) */
PyObject *put_response;
41
/* result of PyObject_GetAttrString(parser, "parse_request"); called with the request */
PyObject *parse_request;
44
/* Mutex held while the plugin table is resized and while the thread run state is inspected or changed. */
pthread_mutex_t _plugins_lock;
45
/* Table of registered plugins; grown one entry at a time with realloc() in agent_plugin_register(). */
static agent_plugin_info_t *_plugins = NULL;
46
/* Number of entries of _plugins that the loops in this file iterate over. */
static int _num_plugins = 0;
48
/* Set to 1 by agent_plugin_run_threads(); each exchange thread tests it for 0 right after start-up. */
static int _plugins_running = 0;
49
/* NOTE(review): presumably a shutdown-request flag for the exchange threads; its use is not visible here -- confirm. */
static int _plugins_die = 0;
52
/*
 * Reset *pi to all-zero bytes and record the exchange object in it.
 * NOTE(review): what happens to 'parser' and whether references are taken is not visible in this excerpt -- confirm.
 */
static void _plugin_info_init(agent_plugin_info_t *pi,
53
PyObject *exchange, PyObject *parser)
55
/* start from a zeroed record so every callable pointer is NULL until it has been looked up */
memset(pi, 0, sizeof(agent_plugin_info_t));
60
pi->exchange = exchange;
64
/*
 * Release every Python reference stored in *pi.
 * Py_XDECREF is used throughout, so members that are still NULL are simply skipped.
 */
static void _plugin_info_free(agent_plugin_info_t *pi)
66
Py_XDECREF(pi->exchange);
67
Py_XDECREF(pi->parser);
68
Py_XDECREF(pi->get_request);
69
Py_XDECREF(pi->put_response);
70
Py_XDECREF(pi->parse_request);
74
/*
 * Thread entry point for one plugin; 'arg' is that plugin's agent_plugin_info_t.
 * Visible flow: take _plugins_lock and look at _plugins_running (still 0 means start-up was
 * abandoned), then, holding the GIL, call get_request, hand the request to parse_request and
 * pass request and response to put_response.
 * NOTE(review): the loop structure, return statements and req/resp reference handling are not
 * visible in this excerpt -- confirm before relying on this summary.
 */
static void *_plugin_exchange_thread(void *arg)
76
agent_plugin_info_t *pi = arg;
77
PyGILState_STATE gstate;
81
/* agent_plugin_run_threads() holds this lock while creating threads, so this waits for start-up to finish */
pthread_mutex_lock(&_plugins_lock);
83
if (_plugins_running == 0)
85
pthread_mutex_unlock(&_plugins_lock);
89
pthread_mutex_unlock(&_plugins_lock);
92
gstate = PyGILState_Ensure();
94
PyGILState_Release(gstate);
98
/* check for shut down */
100
/* the GIL must be held for every Python C-API call below */
gstate = PyGILState_Ensure();
107
req = PyObject_CallFunctionObjArgs(pi->get_request, NULL);
109
/* NULL signals a Python exception; None is also handled on this path */
if ((req == NULL) || (req == Py_None))
111
if (PyErr_Occurred())
113
agent_log_python_error("Error receiving request");
117
PyGILState_Release(gstate);
122
resp = PyObject_CallFunctionObjArgs(pi->parse_request, req, NULL);
125
agent_log_python_error("Error parsing request");
128
PyGILState_Release(gstate);
132
{
    /*
     * The call returns a new reference (or NULL on error). The value itself is not
     * needed, but it must be released or one object leaks per handled request.
     */
    PyObject *put_result = PyObject_CallFunctionObjArgs(pi->put_response, req, resp, NULL);
    Py_XDECREF(put_result);
}
133
if (PyErr_Occurred())
135
agent_log_python_error("Error putting response");
141
PyGILState_Release(gstate);
144
PyGILState_Release(gstate);
149
/*
 * Look up the "get_request" and "put_response" attributes on the exchange object and store
 * them in *pi. A missing attribute raises AttributeError with an explanatory message.
 * The caller, agent_plugin_register(), treats a negative return value as failure.
 */
static int _exchange_plugin_check(agent_plugin_info_t *pi)
151
PyObject *cls = pi->exchange;
153
pi->get_request = PyObject_GetAttrString(cls, "get_request");
154
if (pi->get_request == NULL)
156
PyErr_Format(PyExc_AttributeError, "%s", "An 'exchange' plugin needs to define a 'get_request' method");
160
pi->put_response = PyObject_GetAttrString(cls, "put_response");
161
if (pi->put_response == NULL)
163
PyErr_Format(PyExc_AttributeError, "%s", "An 'exchange' plugin needs to define a 'put_response' method");
165
/* undo the first lookup and clear the member so a later _plugin_info_free() does not release it twice */
Py_DECREF(pi->get_request);
166
pi->get_request= NULL;
174
/*
 * Look up the "parse_request" attribute on the parser object and store it in pi->parse_request.
 * A missing attribute raises AttributeError with an explanatory message.
 * The caller, agent_plugin_register(), treats a negative return value as failure.
 */
static int _parser_plugin_check(agent_plugin_info_t *pi)
176
PyObject *cls = pi->parser;
178
pi->parse_request = PyObject_GetAttrString(cls, "parse_request");
179
if (pi->parse_request == NULL)
181
/* pass the text through "%s", as _exchange_plugin_check() does, so the message is never parsed as a format */
PyErr_Format(PyExc_AttributeError, "%s", "A 'parser' plugin needs to define a 'parse_request' method");
188
/*
 * Register an exchange/parser pair: validate both objects, then append the resulting record
 * to the _plugins table under _plugins_lock.
 * On a failed check or a failed allocation the partially built record is released again;
 * allocation failure raises SystemError("Out of memory").
 */
int LIBAGENT_PUBLIC_API agent_plugin_register(PyObject *exchange, PyObject *parser)
190
/* built on the stack first; only copied into the table once everything has succeeded */
agent_plugin_info_t pi;
192
_plugin_info_init(&pi, exchange, parser);
194
if (_exchange_plugin_check(&pi) < 0)
196
_plugin_info_free(&pi);
200
if (_parser_plugin_check(&pi) < 0)
202
_plugin_info_free(&pi);
206
pthread_mutex_lock(&_plugins_lock);
208
/* realloc into a temporary so the existing table is not lost if the allocation fails */
void *vptr = realloc(_plugins,
209
sizeof(agent_plugin_info_t) * (_num_plugins + 1));
213
pthread_mutex_unlock(&_plugins_lock);
214
_plugin_info_free(&pi);
216
PyErr_Format(PyExc_SystemError, "Out of memory");
223
/* bitwise copy: the references held by 'pi' now belong to the table entry */
memcpy(&(_plugins[_num_plugins]), &pi, sizeof(pi));
226
pthread_mutex_unlock(&_plugins_lock);
231
/* Initialise the plugin subsystem by creating _plugins_lock with default mutex attributes. */
int LIBAGENT_PUBLIC_API agent_plugin_init(void)
233
pthread_mutex_init(&_plugins_lock, NULL);
238
/*
 * Tear down the plugin subsystem: release the Python references of every registered plugin,
 * then destroy _plugins_lock.
 */
void LIBAGENT_PUBLIC_API agent_plugin_deinit(void)
242
for(i = 0;i < _num_plugins;i++)
244
_plugin_info_free(&(_plugins[i]));
251
pthread_mutex_destroy(&_plugins_lock);
254
/*
 * Start one exchange thread per registered plugin while holding _plugins_lock.
 * If a pthread_create() call fails, the threads already started are joined and the error is
 * returned as a negative number. Otherwise _plugins_running is set to 1 before the lock is
 * released, which lets the new threads proceed.
 */
int LIBAGENT_PUBLIC_API agent_plugin_run_threads(void)
260
/* new threads block on this lock first, so none of them runs ahead of the start-up decision */
pthread_mutex_lock(&_plugins_lock);
262
for(i = 0;i < _num_plugins;i++)
264
err = pthread_create(&(_plugins[i].thr_id), NULL,
265
_plugin_exchange_thread, &(_plugins[i]));
270
agent_error("Error creating thread: %d", err);
273
* Leaving 'running' at 0 will cause threads to die
277
pthread_mutex_unlock(&_plugins_lock);
279
/*
 * Join only the threads that were really created: entry i is the one whose creation
 * failed, so start at i - 1 and index by the loop variable (joining an invalid or an
 * already-joined thread id is undefined behaviour).
 */
for(ii = i - 1;ii >= 0;ii--)
281
pthread_join(_plugins[ii].thr_id, NULL);
284
/* normalise to a negative error code */
return err > 0 ? -err : err;
290
if (num_started == 0)
292
pthread_mutex_unlock(&_plugins_lock);
293
agent_debug("no exchange plugins found to run");
297
_plugins_running = 1;
299
pthread_mutex_unlock(&_plugins_lock);
304
/*
 * Wait for every plugin thread to finish by joining each one in turn.
 * NOTE(review): the joins happen while _plugins_lock is held, and the exchange threads take the
 * same lock at start-up -- confirm a thread still waiting there cannot deadlock this call.
 */
int LIBAGENT_PUBLIC_API agent_plugin_stop_threads(void)
308
pthread_mutex_lock(&_plugins_lock);
312
for(i = 0;i < _num_plugins;i++)
314
pthread_join(_plugins[i].thr_id, NULL);
317
pthread_mutex_unlock(&_plugins_lock);