~cbehrens/openstack-guest-agents/lp764221

« back to all changes in this revision

Viewing changes to src/unix/lib/plugin.c

  • Committer: Chris Behrens
  • Date: 2011-04-13 20:03:06 UTC
  • mfrom: (2.1.61 unix-agent-dev)
  • Revision ID: cbehrens@codestud.com-20110413200306-tbd1fdrjdy0yjy2c
MergedĀ lp:~rackspace-ozone/openstack-guest-agents/unix-agent-dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 * vim: tabstop=4 shiftwidth=4 softtabstop=4
 
3
 *
 
4
 * Copyright (c) 2011 Openstack, LLC.
 
5
 * All Rights Reserved.
 
6
 *
 
7
 *    Licensed under the Apache License, Version 2.0 (the "License"); you may
 
8
 *    not use this file except in compliance with the License. You may obtain
 
9
 *    a copy of the License at
 
10
 *
 
11
 *         http://www.apache.org/licenses/LICENSE-2.0
 
12
 *
 
13
 *    Unless required by applicable law or agreed to in writing, software
 
14
 *    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 
15
 *    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 
16
 *    License for the specific language governing permissions and limitations
 
17
 *    under the License.
 
18
 */
 
19
 
 
20
#include <stdio.h>
 
21
#include <stdlib.h>
 
22
#include <unistd.h>
 
23
#include <sys/types.h>
 
24
#include <signal.h>
 
25
#include <string.h>
 
26
#include <pthread.h>
 
27
#include <assert.h>
 
28
#include <errno.h>
 
29
#include "libagent_int.h"
 
30
 
 
31
 
 
32
typedef struct agent_plugin_info agent_plugin_info_t;
 
33
 
 
34
struct agent_plugin_info
 
35
{
 
36
    pthread_t thr_id;
 
37
    PyObject *exchange;
 
38
    PyObject *parser;
 
39
    PyObject *get_request;
 
40
    PyObject *put_response;
 
41
    PyObject *parse_request;
 
42
};
 
43
 
 
44
pthread_mutex_t _plugins_lock;
 
45
static agent_plugin_info_t *_plugins = NULL;
 
46
static int _num_plugins = 0;
 
47
 
 
48
static int _plugins_running = 0;
 
49
static int _plugins_die = 0;
 
50
 
 
51
 
 
52
static void _plugin_info_init(agent_plugin_info_t *pi,
 
53
        PyObject *exchange, PyObject *parser)
 
54
{
 
55
    memset(pi, 0, sizeof(agent_plugin_info_t));
 
56
 
 
57
    Py_INCREF(exchange);
 
58
    Py_INCREF(parser);
 
59
 
 
60
    pi->exchange = exchange;
 
61
    pi->parser = parser;
 
62
}
 
63
 
 
64
static void _plugin_info_free(agent_plugin_info_t *pi)
 
65
{
 
66
    Py_XDECREF(pi->exchange);
 
67
    Py_XDECREF(pi->parser);
 
68
    Py_XDECREF(pi->get_request);
 
69
    Py_XDECREF(pi->put_response);
 
70
    Py_XDECREF(pi->parse_request);
 
71
}
 
72
 
 
73
 
 
74
static void *_plugin_exchange_thread(void *arg)
 
75
{
 
76
    agent_plugin_info_t *pi = arg;
 
77
    PyGILState_STATE gstate;
 
78
    PyObject *req;
 
79
    PyObject *resp;
 
80
 
 
81
    pthread_mutex_lock(&_plugins_lock);
 
82
 
 
83
    if (_plugins_running == 0)
 
84
    {
 
85
        pthread_mutex_unlock(&_plugins_lock);
 
86
        return NULL;
 
87
    }
 
88
 
 
89
    pthread_mutex_unlock(&_plugins_lock);
 
90
 
 
91
    /* Acquire GIL */
 
92
    gstate = PyGILState_Ensure();
 
93
 
 
94
    PyGILState_Release(gstate);
 
95
 
 
96
    for(;;)
 
97
    {
 
98
        /* check for shut down */
 
99
 
 
100
        gstate = PyGILState_Ensure();
 
101
 
 
102
        if (_plugins_die)
 
103
        {
 
104
            break;
 
105
        }
 
106
 
 
107
        req = PyObject_CallFunctionObjArgs(pi->get_request, NULL);
 
108
 
 
109
        if ((req == NULL) || (req == Py_None))
 
110
        {
 
111
            if (PyErr_Occurred())
 
112
            {
 
113
                agent_log_python_error("Error receiving request");
 
114
            }
 
115
 
 
116
            Py_XDECREF(req);
 
117
            PyGILState_Release(gstate);
 
118
            sleep(1);
 
119
            continue;
 
120
        }
 
121
 
 
122
        resp = PyObject_CallFunctionObjArgs(pi->parse_request, req, NULL);
 
123
        if (resp == NULL)
 
124
        {
 
125
            agent_log_python_error("Error parsing request");
 
126
 
 
127
            Py_DECREF(req);
 
128
            PyGILState_Release(gstate);
 
129
            continue;
 
130
        }
 
131
 
 
132
        PyObject_CallFunctionObjArgs(pi->put_response, req, resp, NULL);
 
133
        if (PyErr_Occurred())
 
134
        {
 
135
            agent_log_python_error("Error putting response");
 
136
        }
 
137
 
 
138
        Py_DECREF(req);
 
139
        Py_DECREF(resp);
 
140
 
 
141
        PyGILState_Release(gstate);
 
142
    }
 
143
 
 
144
    PyGILState_Release(gstate);
 
145
 
 
146
    return NULL;
 
147
}
 
148
 
 
149
static int _exchange_plugin_check(agent_plugin_info_t *pi)
 
150
{
 
151
    PyObject *cls = pi->exchange;
 
152
 
 
153
    pi->get_request = PyObject_GetAttrString(cls, "get_request");
 
154
    if (pi->get_request == NULL)
 
155
    {
 
156
        PyErr_Format(PyExc_AttributeError, "%s", "An 'exchange' plugin needs to define a 'get_request' method");
 
157
        return -1;
 
158
    }
 
159
 
 
160
    pi->put_response = PyObject_GetAttrString(cls, "put_response");
 
161
    if (pi->put_response == NULL)
 
162
    {
 
163
        PyErr_Format(PyExc_AttributeError, "%s", "An 'exchange' plugin needs to define a 'put_response' method");
 
164
 
 
165
        Py_DECREF(pi->get_request);
 
166
        pi->get_request= NULL;
 
167
 
 
168
        return -1;
 
169
    }
 
170
 
 
171
    return 0;
 
172
}
 
173
 
 
174
static int _parser_plugin_check(agent_plugin_info_t *pi)
 
175
{
 
176
    PyObject *cls = pi->parser;
 
177
 
 
178
    pi->parse_request = PyObject_GetAttrString(cls, "parse_request");
 
179
    if (pi->parse_request == NULL)
 
180
    {
 
181
        PyErr_Format(PyExc_AttributeError, "A 'parser' plugin needs to define a 'parse_request' method");
 
182
        return -1;
 
183
    }
 
184
 
 
185
    return 0;
 
186
}
 
187
 
 
188
int LIBAGENT_PUBLIC_API agent_plugin_register(PyObject *exchange, PyObject *parser)
 
189
{
 
190
    agent_plugin_info_t pi;
 
191
 
 
192
    _plugin_info_init(&pi, exchange, parser);
 
193
 
 
194
    if (_exchange_plugin_check(&pi) < 0)
 
195
    {
 
196
        _plugin_info_free(&pi);
 
197
        return -1;
 
198
    }
 
199
 
 
200
    if (_parser_plugin_check(&pi) < 0)
 
201
    {
 
202
        _plugin_info_free(&pi);
 
203
        return -1;
 
204
    }
 
205
 
 
206
    pthread_mutex_lock(&_plugins_lock);
 
207
 
 
208
    void *vptr = realloc(_plugins,
 
209
            sizeof(agent_plugin_info_t) * (_num_plugins + 1));
 
210
 
 
211
    if (vptr == NULL)
 
212
    {
 
213
        pthread_mutex_unlock(&_plugins_lock);
 
214
        _plugin_info_free(&pi);
 
215
 
 
216
        PyErr_Format(PyExc_SystemError, "Out of memory");
 
217
 
 
218
        return -1;
 
219
    }
 
220
 
 
221
    _plugins = vptr;
 
222
 
 
223
    memcpy(&(_plugins[_num_plugins]), &pi, sizeof(pi));
 
224
    _num_plugins++;
 
225
 
 
226
    pthread_mutex_unlock(&_plugins_lock);
 
227
 
 
228
    return 0;
 
229
}
 
230
 
 
231
int LIBAGENT_PUBLIC_API agent_plugin_init(void)
 
232
{
 
233
    pthread_mutex_init(&_plugins_lock, NULL);
 
234
 
 
235
    return 0;
 
236
}
 
237
 
 
238
void LIBAGENT_PUBLIC_API agent_plugin_deinit(void)
 
239
{
 
240
    int i;
 
241
 
 
242
    for(i = 0;i < _num_plugins;i++)
 
243
    {
 
244
        _plugin_info_free(&(_plugins[i]));
 
245
    }
 
246
 
 
247
    free(_plugins);
 
248
    _plugins = NULL;
 
249
    _num_plugins = 0;
 
250
 
 
251
    pthread_mutex_destroy(&_plugins_lock);
 
252
}
 
253
 
 
254
int LIBAGENT_PUBLIC_API agent_plugin_run_threads(void)
 
255
{
 
256
    int i;
 
257
    int err;
 
258
    int num_started = 0;
 
259
 
 
260
    pthread_mutex_lock(&_plugins_lock);
 
261
 
 
262
    for(i = 0;i < _num_plugins;i++)
 
263
    {
 
264
        err = pthread_create(&(_plugins[i].thr_id), NULL,
 
265
                _plugin_exchange_thread, &(_plugins[i]));
 
266
        if (err != 0)
 
267
        {   
 
268
            int ii;
 
269
 
 
270
            agent_error("Error creating thread: %d", err);
 
271
 
 
272
            /* 
 
273
             * Leaving 'running' at 0 will cause threads to die
 
274
             * when we unlock
 
275
             */
 
276
 
 
277
            pthread_mutex_unlock(&_plugins_lock);
 
278
 
 
279
            for(ii = i;ii >= 0;ii--)
 
280
            {
 
281
                pthread_join(_plugins[i].thr_id, NULL);
 
282
            }
 
283
 
 
284
            return err > 0 ? -err : err;
 
285
        }
 
286
 
 
287
        num_started++;
 
288
    }
 
289
 
 
290
    if (num_started == 0)
 
291
    {
 
292
        pthread_mutex_unlock(&_plugins_lock);
 
293
        agent_debug("no exchange plugins found to run");
 
294
        return -1;
 
295
    }
 
296
 
 
297
    _plugins_running = 1;
 
298
 
 
299
    pthread_mutex_unlock(&_plugins_lock);
 
300
 
 
301
    return 0;
 
302
}
 
303
 
 
304
int LIBAGENT_PUBLIC_API agent_plugin_stop_threads(void)
 
305
{
 
306
    int i;
 
307
 
 
308
    pthread_mutex_lock(&_plugins_lock);
 
309
 
 
310
    _plugins_die = 1;
 
311
 
 
312
    for(i = 0;i < _num_plugins;i++)
 
313
    {
 
314
        pthread_join(_plugins[i].thr_id, NULL);
 
315
    }
 
316
 
 
317
    pthread_mutex_unlock(&_plugins_lock);
 
318
 
 
319
    return 0;
 
320
}
 
321