~brianaker/libmemcached/1164440

« back to all changes in this revision

Viewing changes to libmemcachedprotocol/handler.c

Merging bzr://gaz.tangent.org/libmemcached/build/ to Build branch

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
 
2
 * 
 
3
 *  Libmemcached library
 
4
 *
 
5
 *  Copyright (C) 2011 Data Differential, http://datadifferential.com/
 
6
 *
 
7
 *  Redistribution and use in source and binary forms, with or without
 
8
 *  modification, are permitted provided that the following conditions are
 
9
 *  met:
 
10
 *
 
11
 *      * Redistributions of source code must retain the above copyright
 
12
 *  notice, this list of conditions and the following disclaimer.
 
13
 *
 
14
 *      * Redistributions in binary form must reproduce the above
 
15
 *  copyright notice, this list of conditions and the following disclaimer
 
16
 *  in the documentation and/or other materials provided with the
 
17
 *  distribution.
 
18
 *
 
19
 *      * The names of its contributors may not be used to endorse or
 
20
 *  promote products derived from this software without specific prior
 
21
 *  written permission.
 
22
 *
 
23
 *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 
24
 *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 
25
 *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 
26
 *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 
27
 *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 
28
 *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 
29
 *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 
30
 *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 
31
 *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 
32
 *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 
33
 *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
34
 *
 
35
 */
 
36
 
 
37
/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
 
38
#include <libmemcachedprotocol/common.h>
 
39
 
 
40
#include <stdlib.h>
 
41
#include <sys/types.h>
 
42
#include <errno.h>
 
43
#include <stdbool.h>
 
44
#include <string.h>
 
45
#include <strings.h>
 
46
#include <ctype.h>
 
47
#include <stdio.h>
 
48
 
 
49
/*
 
50
** **********************************************************************
 
51
** INTERNAL INTERFACE
 
52
** **********************************************************************
 
53
*/
 
54
 
 
55
/**
 
56
 * The default function to receive data from the client. This function
 
57
 * just wraps the recv function to receive from a socket.
 
58
 * See man -s3socket recv for more information.
 
59
 *
 
60
 * @param cookie cookie indentifying a client, not used
 
61
 * @param sock socket to read from
 
62
 * @param buf the destination buffer
 
63
 * @param nbytes the number of bytes to read
 
64
 * @return the number of bytes transferred of -1 upon error
 
65
 */
 
66
static ssize_t default_recv(const void *cookie,
 
67
                            memcached_socket_t sock,
 
68
                            void *buf,
 
69
                            size_t nbytes)
 
70
{
 
71
  (void)cookie;
 
72
  return recv(sock, buf, nbytes, 0);
 
73
}
 
74
 
 
75
/**
 
76
 * The default function to send data to the server. This function
 
77
 * just wraps the send function to send through a socket.
 
78
 * See man -s3socket send for more information.
 
79
 *
 
80
 * @param cookie cookie indentifying a client, not used
 
81
 * @param sock socket to send to
 
82
 * @param buf the source buffer
 
83
 * @param nbytes the number of bytes to send
 
84
 * @return the number of bytes transferred of -1 upon error
 
85
 */
 
86
static ssize_t default_send(const void *cookie,
 
87
                            memcached_socket_t fd,
 
88
                            const void *buf,
 
89
                            size_t nbytes)
 
90
{
 
91
  (void)cookie;
 
92
  return send(fd, buf, nbytes, MSG_NOSIGNAL);
 
93
}
 
94
 
 
95
/**
 
96
 * Try to drain the output buffers without blocking
 
97
 *
 
98
 * @param client the client to drain
 
99
 * @return false if an error occured (connection should be shut down)
 
100
 *         true otherwise (please note that there may be more data to
 
101
 *              left in the buffer to send)
 
102
 */
 
103
static bool drain_output(struct memcached_protocol_client_st *client)
 
104
{
 
105
  if (client->is_verbose)
 
106
  {
 
107
    fprintf(stderr, "%s:%d %s mute:%d output:%s length:%d\n", __FILE__, __LINE__, __func__, (int)client->mute,
 
108
            client->output ? "yes" : "no",
 
109
            client->output ? (int)(client->output->nbytes - client->output->offset) : 0);
 
110
  }
 
111
 
 
112
  /* Do we have pending data to send? */
 
113
  while (client->output != NULL)
 
114
  {
 
115
    ssize_t len= client->root->send(client,
 
116
                            client->sock,
 
117
                            client->output->data + client->output->offset,
 
118
                            client->output->nbytes - client->output->offset);
 
119
 
 
120
    if (len == -1)
 
121
    {
 
122
      if (get_socket_errno() == EWOULDBLOCK)
 
123
      {
 
124
        return true;
 
125
      }
 
126
      else if (get_socket_errno() != EINTR)
 
127
      {
 
128
        client->error= get_socket_errno();
 
129
        return false;
 
130
      }
 
131
    }
 
132
    else
 
133
    {
 
134
      client->output->offset += (size_t)len;
 
135
      if (client->output->offset == client->output->nbytes)
 
136
      {
 
137
        /* This was the complete buffer */
 
138
        struct chunk_st *old= client->output;
 
139
        client->output= client->output->next;
 
140
        if (client->output == NULL)
 
141
        {
 
142
          client->output_tail= NULL;
 
143
        }
 
144
        cache_free(client->root->buffer_cache, old);
 
145
      }
 
146
    }
 
147
  }
 
148
 
 
149
  return true;
 
150
}
 
151
 
 
152
/**
 
153
 * Allocate an output buffer and chain it into the output list
 
154
 *
 
155
 * @param client the client that needs the buffer
 
156
 * @return pointer to the new chunk if the allocation succeeds, NULL otherwise
 
157
 */
 
158
static struct chunk_st *allocate_output_chunk(struct memcached_protocol_client_st *client)
 
159
{
 
160
  struct chunk_st *ret= cache_alloc(client->root->buffer_cache);
 
161
 
 
162
  if (ret == NULL)
 
163
  {
 
164
    return NULL;
 
165
  }
 
166
 
 
167
  ret->offset= ret->nbytes= 0;
 
168
  ret->next= NULL;
 
169
  ret->size= CHUNK_BUFFERSIZE;
 
170
  ret->data= (void*)(ret + 1);
 
171
  if (client->output == NULL)
 
172
  {
 
173
    client->output= client->output_tail= ret;
 
174
  }
 
175
  else
 
176
  {
 
177
    client->output_tail->next= ret;
 
178
    client->output_tail= ret;
 
179
  }
 
180
 
 
181
  return ret;
 
182
}
 
183
 
 
184
/**
 
185
 * Spool data into the send-buffer for a client.
 
186
 *
 
187
 * @param client the client to spool the data for
 
188
 * @param data the data to spool
 
189
 * @param length the number of bytes of data to spool
 
190
 * @return PROTOCOL_BINARY_RESPONSE_SUCCESS if success,
 
191
 *         PROTOCOL_BINARY_RESPONSE_ENOMEM if we failed to allocate memory
 
192
 */
 
193
static protocol_binary_response_status spool_output(struct memcached_protocol_client_st *client,
 
194
                                                    const void *data,
 
195
                                                    size_t length)
 
196
{
 
197
  if (client->is_verbose)
 
198
  {
 
199
    fprintf(stderr, "%s:%d %s mute:%d length:%d\n", __FILE__, __LINE__, __func__, (int)client->mute, (int)length);
 
200
  }
 
201
 
 
202
  if (client->mute)
 
203
  {
 
204
    return PROTOCOL_BINARY_RESPONSE_SUCCESS;
 
205
  }
 
206
 
 
207
  size_t offset= 0;
 
208
 
 
209
  struct chunk_st *chunk= client->output;
 
210
  while (offset < length)
 
211
  {
 
212
    if (chunk == NULL || (chunk->size - chunk->nbytes) == 0)
 
213
    {
 
214
      if ((chunk= allocate_output_chunk(client)) == NULL)
 
215
      {
 
216
        return PROTOCOL_BINARY_RESPONSE_ENOMEM;
 
217
      }
 
218
    }
 
219
 
 
220
    size_t bulk= length - offset;
 
221
    if (bulk > chunk->size - chunk->nbytes)
 
222
    {
 
223
      bulk= chunk->size - chunk->nbytes;
 
224
    }
 
225
 
 
226
    memcpy(chunk->data + chunk->nbytes, data, bulk);
 
227
    chunk->nbytes += bulk;
 
228
    offset += bulk;
 
229
  }
 
230
 
 
231
  return PROTOCOL_BINARY_RESPONSE_SUCCESS;
 
232
}
 
233
 
 
234
/**
 
235
 * Try to determine the protocol used on this connection.
 
236
 * If the first byte contains the magic byte PROTOCOL_BINARY_REQ we should
 
237
 * be using the binary protocol on the connection. I implemented the support
 
238
 * for the ASCII protocol by wrapping into the simple interface (aka v1),
 
239
 * so the implementors needs to provide an implementation of that interface
 
240
 *
 
241
 */
 
242
static memcached_protocol_event_t determine_protocol(struct memcached_protocol_client_st *client, ssize_t *length, void **endptr)
 
243
{
 
244
  if (*client->root->input_buffer == (uint8_t)PROTOCOL_BINARY_REQ)
 
245
  {
 
246
    if (client->is_verbose)
 
247
    {
 
248
      fprintf(stderr, "%s:%d PROTOCOL: memcached_binary_protocol_process_data\n", __FILE__, __LINE__);
 
249
    }
 
250
    client->work= memcached_binary_protocol_process_data;
 
251
  }
 
252
  else if (client->root->callback->interface_version == 1)
 
253
  {
 
254
    if (client->is_verbose)
 
255
    {
 
256
      fprintf(stderr, "%s:%d PROTOCOL: memcached_ascii_protocol_process_data\n", __FILE__, __LINE__);
 
257
    }
 
258
 
 
259
    /*
 
260
     * The ASCII protocol can only be used if the implementors provide
 
261
     * an implementation for the version 1 of the interface..
 
262
     *
 
263
     * @todo I should allow the implementors to provide an implementation
 
264
     *       for version 0 and 1 at the same time and set the preferred
 
265
     *       interface to use...
 
266
     */
 
267
    client->work= memcached_ascii_protocol_process_data;
 
268
  }
 
269
  else
 
270
  {
 
271
    if (client->is_verbose)
 
272
    {
 
273
      fprintf(stderr, "%s:%d PROTOCOL: Unsupported protocol\n", __FILE__, __LINE__);
 
274
    }
 
275
 
 
276
    /* Let's just output a warning the way it is supposed to look like
 
277
     * in the ASCII protocol...
 
278
     */
 
279
    const char *err= "CLIENT_ERROR: Unsupported protocol\r\n";
 
280
    client->root->spool(client, err, strlen(err));
 
281
    client->root->drain(client);
 
282
 
 
283
    return MEMCACHED_PROTOCOL_ERROR_EVENT; /* Unsupported protocol */
 
284
  }
 
285
 
 
286
  return client->work(client, length, endptr);
 
287
}
 
288
 
 
289
/*
 
290
** **********************************************************************
 
291
** * PUBLIC INTERFACE
 
292
** * See protocol_handler.h for function description
 
293
** **********************************************************************
 
294
*/
 
295
struct memcached_protocol_st *memcached_protocol_create_instance(void)
 
296
{
 
297
  struct memcached_protocol_st *ret= calloc(1, sizeof(*ret));
 
298
  if (ret != NULL)
 
299
  {
 
300
    ret->recv= default_recv;
 
301
    ret->send= default_send;
 
302
    ret->drain= drain_output;
 
303
    ret->spool= spool_output;
 
304
    ret->input_buffer_size= 1 * 1024 * 1024;
 
305
    ret->input_buffer= malloc(ret->input_buffer_size);
 
306
    if (ret->input_buffer == NULL)
 
307
    {
 
308
      free(ret);
 
309
      ret= NULL;
 
310
 
 
311
      return NULL;
 
312
    }
 
313
 
 
314
    ret->buffer_cache= cache_create("protocol_handler",
 
315
                                     CHUNK_BUFFERSIZE + sizeof(struct chunk_st),
 
316
                                     0, NULL, NULL);
 
317
    if (ret->buffer_cache == NULL)
 
318
    {
 
319
      free(ret->input_buffer);
 
320
      free(ret);
 
321
    }
 
322
  }
 
323
 
 
324
  return ret;
 
325
}
 
326
 
 
327
void memcached_protocol_destroy_instance(struct memcached_protocol_st *instance)
 
328
{
 
329
  cache_destroy(instance->buffer_cache);
 
330
  free(instance->input_buffer);
 
331
  free(instance);
 
332
}
 
333
 
 
334
struct memcached_protocol_client_st *memcached_protocol_create_client(struct memcached_protocol_st *instance, memcached_socket_t sock)
 
335
{
 
336
  struct memcached_protocol_client_st *ret= calloc(1, sizeof(memcached_protocol_client_st));
 
337
  if (ret != NULL)
 
338
  {
 
339
    ret->root= instance;
 
340
    ret->sock= sock;
 
341
    ret->work= determine_protocol;
 
342
  }
 
343
 
 
344
  return ret;
 
345
}
 
346
 
 
347
void memcached_protocol_client_destroy(struct memcached_protocol_client_st *client)
 
348
{
 
349
  free(client);
 
350
}
 
351
 
 
352
void memcached_protocol_client_set_verbose(struct memcached_protocol_client_st *client, bool arg)
 
353
{
 
354
  if (client)
 
355
  {
 
356
    client->is_verbose= arg;
 
357
  }
 
358
}
 
359
 
 
360
memcached_protocol_event_t memcached_protocol_client_work(struct memcached_protocol_client_st *client)
 
361
{
 
362
  /* Try to send data and read from the socket */
 
363
  bool more_data= true;
 
364
  do
 
365
  {
 
366
    ssize_t len= client->root->recv(client,
 
367
                                    client->sock,
 
368
                                    client->root->input_buffer + client->input_buffer_offset,
 
369
                                    client->root->input_buffer_size - client->input_buffer_offset);
 
370
 
 
371
    if (len > 0)
 
372
    {
 
373
      /* Do we have the complete packet? */
 
374
      if (client->input_buffer_offset > 0)
 
375
      {
 
376
        memcpy(client->root->input_buffer, client->input_buffer,
 
377
               client->input_buffer_offset);
 
378
        len += (ssize_t)client->input_buffer_offset;
 
379
 
 
380
        /* @todo use buffer-cache! */
 
381
        free(client->input_buffer);
 
382
        client->input_buffer_offset= 0;
 
383
      }
 
384
 
 
385
      void *endptr;
 
386
      memcached_protocol_event_t events= client->work(client, &len, &endptr);
 
387
      if (events == MEMCACHED_PROTOCOL_ERROR_EVENT)
 
388
      {
 
389
        return MEMCACHED_PROTOCOL_ERROR_EVENT;
 
390
      }
 
391
 
 
392
      if (len > 0)
 
393
      {
 
394
        /* save the data for later on */
 
395
        /* @todo use buffer-cache */
 
396
        client->input_buffer= malloc((size_t)len);
 
397
        if (client->input_buffer == NULL)
 
398
        {
 
399
          client->error= ENOMEM;
 
400
          return MEMCACHED_PROTOCOL_ERROR_EVENT;
 
401
        }
 
402
        memcpy(client->input_buffer, endptr, (size_t)len);
 
403
        client->input_buffer_offset= (size_t)len;
 
404
        more_data= false;
 
405
      }
 
406
    }
 
407
    else if (len == 0)
 
408
    {
 
409
      /* Connection closed */
 
410
      drain_output(client);
 
411
      return MEMCACHED_PROTOCOL_ERROR_EVENT;
 
412
    }
 
413
    else
 
414
    {
 
415
      if (get_socket_errno() != EWOULDBLOCK)
 
416
      {
 
417
        client->error= get_socket_errno();
 
418
        /* mark this client as terminated! */
 
419
        return MEMCACHED_PROTOCOL_ERROR_EVENT;
 
420
      }
 
421
      more_data= false;
 
422
    }
 
423
  } while (more_data);
 
424
 
 
425
  if (!drain_output(client))
 
426
  {
 
427
    return MEMCACHED_PROTOCOL_ERROR_EVENT;
 
428
  }
 
429
 
 
430
  memcached_protocol_event_t ret= MEMCACHED_PROTOCOL_READ_EVENT;
 
431
  if (client->output)
 
432
  {
 
433
    ret|= MEMCACHED_PROTOCOL_READ_EVENT;
 
434
  }
 
435
 
 
436
  return ret;
 
437
}