1
/* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
5
* Copyright (C) 2011 Data Differential, http://datadifferential.com/
7
* Redistribution and use in source and binary forms, with or without
8
* modification, are permitted provided that the following conditions are
11
* * Redistributions of source code must retain the above copyright
12
* notice, this list of conditions and the following disclaimer.
14
* * Redistributions in binary form must reproduce the above
15
* copyright notice, this list of conditions and the following disclaimer
16
* in the documentation and/or other materials provided with the
19
* * The names of its contributors may not be used to endorse or
20
* promote products derived from this software without specific prior
23
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
26
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
27
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
28
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
29
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
30
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
31
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
32
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
33
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
37
/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
38
#include <libmemcachedprotocol/common.h>
41
#include <sys/types.h>
50
** **********************************************************************
52
** **********************************************************************
56
* The default function to receive data from the client. This function
57
* just wraps the recv function to receive from a socket.
58
* See man -s3socket recv for more information.
60
* @param cookie cookie indentifying a client, not used
61
* @param sock socket to read from
62
* @param buf the destination buffer
63
* @param nbytes the number of bytes to read
64
* @return the number of bytes transferred of -1 upon error
66
static ssize_t default_recv(const void *cookie,
67
memcached_socket_t sock,
72
return recv(sock, buf, nbytes, 0);
76
* The default function to send data to the server. This function
77
* just wraps the send function to send through a socket.
78
* See man -s3socket send for more information.
80
* @param cookie cookie indentifying a client, not used
81
* @param sock socket to send to
82
* @param buf the source buffer
83
* @param nbytes the number of bytes to send
84
* @return the number of bytes transferred of -1 upon error
86
static ssize_t default_send(const void *cookie,
87
memcached_socket_t fd,
92
return send(fd, buf, nbytes, MSG_NOSIGNAL);
96
* Try to drain the output buffers without blocking
98
* @param client the client to drain
99
* @return false if an error occured (connection should be shut down)
100
* true otherwise (please note that there may be more data to
101
* left in the buffer to send)
103
static bool drain_output(struct memcached_protocol_client_st *client)
105
if (client->is_verbose)
107
fprintf(stderr, "%s:%d %s mute:%d output:%s length:%d\n", __FILE__, __LINE__, __func__, (int)client->mute,
108
client->output ? "yes" : "no",
109
client->output ? (int)(client->output->nbytes - client->output->offset) : 0);
112
/* Do we have pending data to send? */
113
while (client->output != NULL)
115
ssize_t len= client->root->send(client,
117
client->output->data + client->output->offset,
118
client->output->nbytes - client->output->offset);
122
if (get_socket_errno() == EWOULDBLOCK)
126
else if (get_socket_errno() != EINTR)
128
client->error= get_socket_errno();
134
client->output->offset += (size_t)len;
135
if (client->output->offset == client->output->nbytes)
137
/* This was the complete buffer */
138
struct chunk_st *old= client->output;
139
client->output= client->output->next;
140
if (client->output == NULL)
142
client->output_tail= NULL;
144
cache_free(client->root->buffer_cache, old);
153
* Allocate an output buffer and chain it into the output list
155
* @param client the client that needs the buffer
156
* @return pointer to the new chunk if the allocation succeeds, NULL otherwise
158
static struct chunk_st *allocate_output_chunk(struct memcached_protocol_client_st *client)
160
struct chunk_st *ret= cache_alloc(client->root->buffer_cache);
167
ret->offset= ret->nbytes= 0;
169
ret->size= CHUNK_BUFFERSIZE;
170
ret->data= (void*)(ret + 1);
171
if (client->output == NULL)
173
client->output= client->output_tail= ret;
177
client->output_tail->next= ret;
178
client->output_tail= ret;
185
* Spool data into the send-buffer for a client.
187
* @param client the client to spool the data for
188
* @param data the data to spool
189
* @param length the number of bytes of data to spool
190
* @return PROTOCOL_BINARY_RESPONSE_SUCCESS if success,
191
* PROTOCOL_BINARY_RESPONSE_ENOMEM if we failed to allocate memory
193
static protocol_binary_response_status spool_output(struct memcached_protocol_client_st *client,
197
if (client->is_verbose)
199
fprintf(stderr, "%s:%d %s mute:%d length:%d\n", __FILE__, __LINE__, __func__, (int)client->mute, (int)length);
204
return PROTOCOL_BINARY_RESPONSE_SUCCESS;
209
struct chunk_st *chunk= client->output;
210
while (offset < length)
212
if (chunk == NULL || (chunk->size - chunk->nbytes) == 0)
214
if ((chunk= allocate_output_chunk(client)) == NULL)
216
return PROTOCOL_BINARY_RESPONSE_ENOMEM;
220
size_t bulk= length - offset;
221
if (bulk > chunk->size - chunk->nbytes)
223
bulk= chunk->size - chunk->nbytes;
226
memcpy(chunk->data + chunk->nbytes, data, bulk);
227
chunk->nbytes += bulk;
231
return PROTOCOL_BINARY_RESPONSE_SUCCESS;
235
* Try to determine the protocol used on this connection.
236
* If the first byte contains the magic byte PROTOCOL_BINARY_REQ we should
237
* be using the binary protocol on the connection. I implemented the support
238
* for the ASCII protocol by wrapping into the simple interface (aka v1),
239
* so the implementors needs to provide an implementation of that interface
242
static memcached_protocol_event_t determine_protocol(struct memcached_protocol_client_st *client, ssize_t *length, void **endptr)
244
if (*client->root->input_buffer == (uint8_t)PROTOCOL_BINARY_REQ)
246
if (client->is_verbose)
248
fprintf(stderr, "%s:%d PROTOCOL: memcached_binary_protocol_process_data\n", __FILE__, __LINE__);
250
client->work= memcached_binary_protocol_process_data;
252
else if (client->root->callback->interface_version == 1)
254
if (client->is_verbose)
256
fprintf(stderr, "%s:%d PROTOCOL: memcached_ascii_protocol_process_data\n", __FILE__, __LINE__);
260
* The ASCII protocol can only be used if the implementors provide
261
* an implementation for the version 1 of the interface..
263
* @todo I should allow the implementors to provide an implementation
264
* for version 0 and 1 at the same time and set the preferred
265
* interface to use...
267
client->work= memcached_ascii_protocol_process_data;
271
if (client->is_verbose)
273
fprintf(stderr, "%s:%d PROTOCOL: Unsupported protocol\n", __FILE__, __LINE__);
276
/* Let's just output a warning the way it is supposed to look like
277
* in the ASCII protocol...
279
const char *err= "CLIENT_ERROR: Unsupported protocol\r\n";
280
client->root->spool(client, err, strlen(err));
281
client->root->drain(client);
283
return MEMCACHED_PROTOCOL_ERROR_EVENT; /* Unsupported protocol */
286
return client->work(client, length, endptr);
290
** **********************************************************************
291
** * PUBLIC INTERFACE
292
** * See protocol_handler.h for function description
293
** **********************************************************************
295
struct memcached_protocol_st *memcached_protocol_create_instance(void)
297
struct memcached_protocol_st *ret= calloc(1, sizeof(*ret));
300
ret->recv= default_recv;
301
ret->send= default_send;
302
ret->drain= drain_output;
303
ret->spool= spool_output;
304
ret->input_buffer_size= 1 * 1024 * 1024;
305
ret->input_buffer= malloc(ret->input_buffer_size);
306
if (ret->input_buffer == NULL)
314
ret->buffer_cache= cache_create("protocol_handler",
315
CHUNK_BUFFERSIZE + sizeof(struct chunk_st),
317
if (ret->buffer_cache == NULL)
319
free(ret->input_buffer);
327
void memcached_protocol_destroy_instance(struct memcached_protocol_st *instance)
329
cache_destroy(instance->buffer_cache);
330
free(instance->input_buffer);
334
struct memcached_protocol_client_st *memcached_protocol_create_client(struct memcached_protocol_st *instance, memcached_socket_t sock)
336
struct memcached_protocol_client_st *ret= calloc(1, sizeof(memcached_protocol_client_st));
341
ret->work= determine_protocol;
347
void memcached_protocol_client_destroy(struct memcached_protocol_client_st *client)
352
void memcached_protocol_client_set_verbose(struct memcached_protocol_client_st *client, bool arg)
356
client->is_verbose= arg;
360
memcached_protocol_event_t memcached_protocol_client_work(struct memcached_protocol_client_st *client)
362
/* Try to send data and read from the socket */
363
bool more_data= true;
366
ssize_t len= client->root->recv(client,
368
client->root->input_buffer + client->input_buffer_offset,
369
client->root->input_buffer_size - client->input_buffer_offset);
373
/* Do we have the complete packet? */
374
if (client->input_buffer_offset > 0)
376
memcpy(client->root->input_buffer, client->input_buffer,
377
client->input_buffer_offset);
378
len += (ssize_t)client->input_buffer_offset;
380
/* @todo use buffer-cache! */
381
free(client->input_buffer);
382
client->input_buffer_offset= 0;
386
memcached_protocol_event_t events= client->work(client, &len, &endptr);
387
if (events == MEMCACHED_PROTOCOL_ERROR_EVENT)
389
return MEMCACHED_PROTOCOL_ERROR_EVENT;
394
/* save the data for later on */
395
/* @todo use buffer-cache */
396
client->input_buffer= malloc((size_t)len);
397
if (client->input_buffer == NULL)
399
client->error= ENOMEM;
400
return MEMCACHED_PROTOCOL_ERROR_EVENT;
402
memcpy(client->input_buffer, endptr, (size_t)len);
403
client->input_buffer_offset= (size_t)len;
409
/* Connection closed */
410
drain_output(client);
411
return MEMCACHED_PROTOCOL_ERROR_EVENT;
415
if (get_socket_errno() != EWOULDBLOCK)
417
client->error= get_socket_errno();
418
/* mark this client as terminated! */
419
return MEMCACHED_PROTOCOL_ERROR_EVENT;
425
if (!drain_output(client))
427
return MEMCACHED_PROTOCOL_ERROR_EVENT;
430
memcached_protocol_event_t ret= MEMCACHED_PROTOCOL_READ_EVENT;
433
ret|= MEMCACHED_PROTOCOL_READ_EVENT;