~ubuntu-branches/ubuntu/trusty/drizzle/trusty

« back to all changes in this revision

Viewing changes to plugin/memcached_query_cache/memcached_qc.cc

  • Committer: Bazaar Package Importer
  • Author(s): Monty Taylor
  • Date: 2010-10-02 14:17:48 UTC
  • mfrom: (1.1.1 upstream)
  • mto: (2.1.17 sid)
  • mto: This revision was merged to the branch mainline in revision 3.
  • Revision ID: james.westby@ubuntu.com-20101002141748-m6vbfbfjhrw1153e
Tags: 2010.09.1802-1
* New upstream release.
* Removed pid-file argument hack.
* Updated GPL-2 address to be new address.
* Directly copy in drizzledump.1 since debian doesn't have sphinx 1.0 yet.
* Link to jquery from libjs-jquery. Add it as a depend.
* Add drizzled.8 symlink to the install files.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* 
 
2
 * Copyright (c) 2010, Djellel Eddine Difallah
 
3
 * All rights reserved.
 
4
 *
 
5
 * Redistribution and use in source and binary forms, with or without
 
6
 * modification, are permitted provided that the following conditions are met:
 
7
 *
 
8
 *   * Redistributions of source code must retain the above copyright notice,
 
9
 *     this list of conditions and the following disclaimer.
 
10
 *   * Redistributions in binary form must reproduce the above copyright notice,
 
11
 *     this list of conditions and the following disclaimer in the documentation
 
12
 *     and/or other materials provided with the distribution.
 
13
 *   * Neither the name of Djellel Eddine Difallah nor the names of its contributors
 
14
 *     may be used to endorse or promote products derived from this software
 
15
 *     without specific prior written permission.
 
16
 *
 
17
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 
18
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 
19
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 
20
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
 
21
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 
22
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 
23
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 
24
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 
25
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 
26
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
 
27
 * THE POSSIBILITY OF SUCH DAMAGE.
 
28
 */
 
29
 
 
30
 
 
31
 
 
32
#include "config.h"
 
33
 
 
34
#include "drizzled/plugin.h"
 
35
#include "drizzled/session.h"
 
36
#include "drizzled/select_send.h"
 
37
#include "drizzled/item/null.h"
 
38
 
 
39
#include <gcrypt.h>
 
40
#include <string>
 
41
#include <iostream>
 
42
#include <vector>
 
43
 
 
44
#include "memcached_qc.h"
 
45
#include "query_cache_udf_tools.h"
 
46
#include "data_dictionary_schema.h"
 
47
#include "invalidator.h"
 
48
#include <boost/program_options.hpp>
 
49
#include <drizzled/module/option_map.h>
 
50
 
 
51
using namespace drizzled;
 
52
using namespace std;
 
53
namespace po= boost::program_options;
 
54
 
 
55
static char* sysvar_memcached_servers= NULL;
 
56
static ulong expiry_time;
 
57
 
 
58
memcache::Memcache* MemcachedQueryCache::client;
 
59
std::string MemcachedQueryCache::memcached_servers;
 
60
 
 
61
static DRIZZLE_SessionVAR_BOOL(enable, 
 
62
                               PLUGIN_VAR_SessionLOCAL,
 
63
                               "Enable Memcached Query Cache",
 
64
                               /* check_func */ NULL, 
 
65
                               /* update_func */ NULL,
 
66
                               /* default */ false);
 
67
 
 
68
static int check_memc_servers(Session *,
 
69
                              drizzle_sys_var *,
 
70
                              void *save,
 
71
                              drizzle_value *value)
 
72
{
 
73
  char buff[STRING_BUFFER_USUAL_SIZE];
 
74
  int len= sizeof(buff);
 
75
  const char *input= value->val_str(value, buff, &len);
 
76
 
 
77
  if (input)
 
78
  {
 
79
    MemcachedQueryCache::setServers(input);
 
80
    *(bool *) save= (bool) true;
 
81
    return 0;
 
82
  }
 
83
 
 
84
  *(bool *) save= (bool) false;
 
85
  return 1;
 
86
}
 
87
 
 
88
static void set_memc_servers(Session *,
 
89
                             drizzle_sys_var *,
 
90
                             void *var_ptr,
 
91
                             const void *save)
 
92
{
 
93
  if (*(bool *) save != false)
 
94
  {
 
95
    *(const char **) var_ptr= MemcachedQueryCache::getServers();
 
96
  }
 
97
}
 
98
 
 
99
static DRIZZLE_SYSVAR_STR(servers,
 
100
                          sysvar_memcached_servers,
 
101
                          PLUGIN_VAR_OPCMDARG,
 
102
                          N_("List of memcached servers."),
 
103
                          check_memc_servers, /* check func */
 
104
                          set_memc_servers, /* update func */
 
105
                          "127.0.0.1:11211"); /* default value */
 
106
 
 
107
static DRIZZLE_SYSVAR_ULONG(expiry, 
 
108
                            expiry_time,
 
109
                            PLUGIN_VAR_OPCMDARG,
 
110
                            "Expiry time of memcached entries",
 
111
                             NULL, NULL, 1000, 0, ~0L, 0);
 
112
 
 
113
bool MemcachedQueryCache::isSelect(string query)
 
114
{
 
115
  uint i= 0;
 
116
  /*
 
117
   Skip '(' characters in queries like following:
 
118
   (select a from t1) union (select a from t1);
 
119
  */
 
120
  const char* sql= query.c_str();
 
121
  while (sql[i] == '(')
 
122
    i++;
 
123
  /*
 
124
   Test if the query is a SELECT
 
125
   (pre-space is removed in dispatch_command).
 
126
    First '/' looks like comment before command it is not
 
127
    frequently appeared in real life, consequently we can
 
128
    check all such queries, too.
 
129
  */
 
130
  if ((my_toupper(system_charset_info, sql[i])     != 'S' ||
 
131
       my_toupper(system_charset_info, sql[i + 1]) != 'E' ||
 
132
       my_toupper(system_charset_info, sql[i + 2]) != 'L') &&
 
133
      sql[i] != '/')
 
134
  {
 
135
    return false;
 
136
  }
 
137
  return true;
 
138
}
 
139
 
 
140
bool MemcachedQueryCache::doIsCached(Session *session)
 
141
{
 
142
  if (SessionVAR(session, enable) && isSelect(session->query))
 
143
  {
 
144
    /* ToDo: Check against the cache content */
 
145
    string query= session->query+session->db;
 
146
    char* key= md5_key(query.c_str());
 
147
    if(queryCacheService.isCached(key))
 
148
    {
 
149
     session->query_cache_key.assign(key);
 
150
     free(key);
 
151
     return true;
 
152
    }
 
153
    free(key);
 
154
  }
 
155
  return false;
 
156
}
 
157
 
 
158
bool MemcachedQueryCache::doSendCachedResultset(Session *session)
 
159
{
 
160
  /** TODO: pay attention to the case where the cache value is empty
 
161
  * ie: there is a session in the process of caching the query
 
162
  * and didn't finish the work
 
163
  */ 
 
164
  
 
165
  LEX *lex= session->lex;
 
166
  register Select_Lex *select_lex= &lex->select_lex;
 
167
  select_result *result=lex->result;
 
168
  if (not result && not (result= new select_send()))
 
169
    return true;
 
170
  result->prepare(select_lex->item_list, select_lex->master_unit());
 
171
 
 
172
  /* fetching the resultset from memcached */  
 
173
  vector<char> raw_resultset; 
 
174
  getClient()->get(session->query_cache_key, raw_resultset);
 
175
  if(raw_resultset.empty())
 
176
    return false;
 
177
  message::Resultset resultset_message;
 
178
  if (not resultset_message.ParseFromString(string(raw_resultset.begin(),raw_resultset.end())))
 
179
    return false;
 
180
  List<Item> item_list;
 
181
 
 
182
  /* Send the fields */
 
183
  message::SelectHeader header= resultset_message.select_header();
 
184
  size_t num_fields= header.field_meta_size();
 
185
  for (size_t y= 0; y < num_fields; y++)
 
186
  {
 
187
    message::FieldMeta field= header.field_meta(y);
 
188
    string value=field.field_alias();
 
189
    item_list.push_back(new Item_string(value.c_str(), value.length(), system_charset_info));
 
190
  }
 
191
  result->send_fields(item_list);
 
192
  item_list.empty();
 
193
 
 
194
  /* Send the Data */
 
195
  message::SelectData data= resultset_message.select_data();
 
196
  session->limit_found_rows= 0; 
 
197
  for (int j= 0; j < data.record_size(); j++)
 
198
  {
 
199
    message::SelectRecord record= data.record(j);
 
200
    for (size_t y= 0; y < num_fields; y++)
 
201
    {
 
202
      if(record.is_null(y))
 
203
      {
 
204
        item_list.push_back(new Item_null());
 
205
      }
 
206
      else
 
207
      {
 
208
        string value=record.record_value(y);
 
209
        item_list.push_back(new Item_string(value.c_str(), value.length(), system_charset_info));
 
210
      }
 
211
    }
 
212
    result->send_data(item_list);
 
213
    item_list.empty();
 
214
  }
 
215
  /* Send End of file */
 
216
  result->send_eof();
 
217
  /* reset the cache key at the session level */
 
218
  session->query_cache_key= "";
 
219
  return false;
 
220
}
 
221
 
 
222
/* Check if the tables in the query do not contain
 
223
 * Data_dictionary
 
224
 */
 
225
void MemcachedQueryCache::checkTables(Session *session, TableList* in_table)
 
226
{
 
227
  for (TableList* tmp_table= in_table; tmp_table; tmp_table= tmp_table->next_global)
 
228
  {
 
229
    if (strcasecmp(tmp_table->db, "DATA_DICTIONARY") == 0)
 
230
    {
 
231
      session->lex->setCacheable(false);
 
232
      break;
 
233
    }
 
234
  } 
 
235
}
 
236
 
 
237
/* init the current resultset in the session
 
238
 * set the header message (hashkey= sql + schema)
 
239
 */
 
240
bool MemcachedQueryCache::doPrepareResultset(Session *session)
 
241
{               
 
242
  checkTables(session, session->lex->query_tables);
 
243
  if (SessionVAR(session, enable) && session->lex->isCacheable())
 
244
  {
 
245
    /* Prepare and set the key for the session */
 
246
    string query= session->query+session->db;
 
247
    char* key= md5_key(query.c_str());
 
248
 
 
249
    /* make sure only one thread will cache the query 
 
250
     * if executed concurently
 
251
     */
 
252
    pthread_mutex_lock(&mutex);
 
253
 
 
254
    if(not queryCacheService.isCached(key))
 
255
    {
 
256
      session->query_cache_key.assign(key);
 
257
      free(key);
 
258
    
 
259
      /* create the Resultset */
 
260
      message::Resultset *resultset= queryCacheService.setCurrentResultsetMessage(session);
 
261
  
 
262
      /* setting the resultset infos */
 
263
      resultset->set_key(session->query_cache_key);
 
264
      resultset->set_schema(session->db);
 
265
      resultset->set_sql(session->query);
 
266
      pthread_mutex_unlock(&mutex);
 
267
      
 
268
      return true;
 
269
    }
 
270
    pthread_mutex_unlock(&mutex);
 
271
    free(key);
 
272
  }
 
273
  return false;
 
274
}
 
275
 
 
276
/* Send the current resultset to memcached
 
277
 * Reset the current resultset of the session
 
278
 */
 
279
bool MemcachedQueryCache::doSetResultset(Session *session)
 
280
{               
 
281
  message::Resultset *resultset= session->getResultsetMessage();
 
282
  if (SessionVAR(session, enable) && (not session->is_error()) && resultset != NULL && session->lex->isCacheable())
 
283
  {
 
284
    /* Generate the final Header */
 
285
    queryCacheService.setResultsetHeader(*resultset, session, session->lex->query_tables);
 
286
    /* serialize the Resultset Message */
 
287
    std::string output;
 
288
    resultset->SerializeToString(&output);
 
289
 
 
290
    /* setting to memecahced */
 
291
    time_t expiry= expiry_time;  // ToDo: add a user defined expiry
 
292
    uint32_t flags= 0;
 
293
    std::vector<char> raw(output.size());
 
294
    memcpy(&raw[0], output.c_str(), output.size());
 
295
    if(not client->set(session->query_cache_key, raw, expiry, flags))
 
296
    {
 
297
      delete resultset;
 
298
      session->resetResultsetMessage();
 
299
      return false;
 
300
    }
 
301
    
 
302
    /* Clear the Selectdata from the Resultset to be localy cached
 
303
     * Comment if Keeping the data in the header is needed
 
304
     */
 
305
    resultset->clear_select_data();
 
306
 
 
307
    /* add the Resultset (including the header) to the hash 
 
308
     * This is done after the memcached set
 
309
     */
 
310
    queryCacheService.cache[session->query_cache_key]= *resultset;
 
311
 
 
312
    /* endup the current statement */
 
313
    delete resultset;
 
314
    session->resetResultsetMessage();
 
315
    return true;
 
316
  }
 
317
  return false;
 
318
}
 
319
 
 
320
/* Adds a record (List<Item>) to the current Resultset.SelectData
 
321
 */
 
322
bool MemcachedQueryCache::doInsertRecord(Session *session, List<Item> &list)
 
323
{               
 
324
  if(SessionVAR(session, enable))
 
325
  {
 
326
    queryCacheService.addRecord(session, list);
 
327
    return true;
 
328
  }
 
329
  return false;
 
330
}
 
331
 
 
332
char* MemcachedQueryCache::md5_key(const char *str)
 
333
{
 
334
  int msg_len= strlen(str);
 
335
  /* Length of resulting sha1 hash - gcry_md_get_algo_dlen
 
336
  * returns digest lenght for an algo */
 
337
  int hash_len= gcry_md_get_algo_dlen( GCRY_MD_MD5 );
 
338
  /* output sha1 hash - this will be binary data */
 
339
  unsigned char* hash= (unsigned char*) malloc(hash_len);
 
340
  /* output sha1 hash - converted to hex representation
 
341
  * 2 hex digits for every byte + 1 for trailing \0 */
 
342
  char *out= (char *) malloc( sizeof(char) * ((hash_len*2)+1) );
 
343
  char *p= out;
 
344
  /* calculate the SHA1 digest. This is a bit of a shortcut function
 
345
  * most gcrypt operations require the creation of a handle, etc. */
 
346
  gcry_md_hash_buffer( GCRY_MD_MD5, hash, str , msg_len );
 
347
  /* Convert each byte to its 2 digit ascii
 
348
  * hex representation and place in out */
 
349
  int i;
 
350
  for ( i = 0; i < hash_len; i++, p += 2 )
 
351
  {
 
352
    snprintf ( p, 3, "%02x", hash[i] );
 
353
  }
 
354
  free(hash);
 
355
  return out;
 
356
}
 
357
 
 
358
/** User Defined Function print_query_cache_meta **/
 
359
extern plugin::Create_function<PrintQueryCacheMetaFunction> *print_query_cache_meta_func_factory;
 
360
plugin::Create_function<QueryCacheFlushFunction> *query_cache_flush_func= NULL;
 
361
 
 
362
/** DATA_DICTIONARY views */
 
363
static QueryCacheTool *query_cache_tool;
 
364
static QueryCacheStatusTool *query_cache_status;
 
365
static CachedTables *query_cached_tables;
 
366
 
 
367
static int init(module::Context &context)
 
368
{
 
369
  const module::option_map &vm= context.getOptions();
 
370
 
 
371
  if (vm.count("expiry"))
 
372
  { 
 
373
    if (expiry_time > (ulong)~0L)
 
374
    {
 
375
      errmsg_printf(ERRMSG_LVL_ERROR, _("Invalid value of expiry\n"));
 
376
      exit(-1);
 
377
    }
 
378
  }
 
379
 
 
380
  if (vm.count("servers"))
 
381
  {
 
382
    sysvar_memcached_servers= const_cast<char *>(vm["servers"].as<string>().c_str());
 
383
  }
 
384
 
 
385
  if (vm.count("enable"))
 
386
  {
 
387
    (SessionVAR(NULL,enable))= vm["enable"].as<bool>();
 
388
  }
 
389
 
 
390
  MemcachedQueryCache* memc= new MemcachedQueryCache("Memcached_Query_Cache", sysvar_memcached_servers);
 
391
  context.add(memc);
 
392
 
 
393
  Invalidator* invalidator= new Invalidator("Memcached_Query_Cache_Invalidator");
 
394
  context.add(invalidator);
 
395
  ReplicationServices &replication_services= ReplicationServices::singleton();
 
396
  string replicator_name("default_replicator");
 
397
  replication_services.attachApplier(invalidator, replicator_name);
 
398
  
 
399
  /* Setup the module's UDFs */
 
400
  print_query_cache_meta_func_factory=
 
401
    new plugin::Create_function<PrintQueryCacheMetaFunction>("print_query_cache_meta");
 
402
  context.add(print_query_cache_meta_func_factory);
 
403
  
 
404
  query_cache_flush_func= new plugin::Create_function<QueryCacheFlushFunction>("query_cache_flush");
 
405
  context.add(query_cache_flush_func);
 
406
 
 
407
  /* Setup the module Data dict and status infos */
 
408
  query_cache_tool= new (nothrow) QueryCacheTool();
 
409
  context.add(query_cache_tool);
 
410
  query_cache_status= new (nothrow) QueryCacheStatusTool();
 
411
  context.add(query_cache_status);
 
412
  query_cached_tables= new (nothrow) CachedTables();
 
413
  context.add(query_cached_tables);
 
414
  
 
415
  return 0;
 
416
}
 
417
 
 
418
static drizzle_sys_var* vars[]= {
 
419
  DRIZZLE_SYSVAR(enable),
 
420
  DRIZZLE_SYSVAR(servers),
 
421
  DRIZZLE_SYSVAR(expiry),
 
422
  NULL
 
423
};
 
424
 
 
425
QueryCacheStatusTool::Generator::Generator(drizzled::Field **fields) :
 
426
  plugin::TableFunction::Generator(fields)
 
427
 
428
  status_var_ptr= vars;
 
429
}
 
430
 
 
431
bool QueryCacheStatusTool::Generator::populate()
 
432
{
 
433
  if (*status_var_ptr)
 
434
  {
 
435
    std::ostringstream oss;
 
436
    string return_value;
 
437
 
 
438
    /* VARIABLE_NAME */
 
439
    push((*status_var_ptr)->name);
 
440
    if (strcmp((**status_var_ptr).name, "enable") == 0)
 
441
      return_value= SessionVAR(&(getSession()), enable) ? "ON" : "OFF";
 
442
    if (strcmp((**status_var_ptr).name, "servers") == 0) 
 
443
      return_value= MemcachedQueryCache::getServers();
 
444
    if (strcmp((**status_var_ptr).name, "expiry") == 0)
 
445
    {
 
446
      oss << expiry_time;
 
447
      return_value= oss.str();
 
448
    }
 
449
    /* VARIABLE_VALUE */
 
450
    if (return_value.length())
 
451
      push(return_value);
 
452
    else 
 
453
      push(" ");
 
454
 
 
455
    status_var_ptr++;
 
456
 
 
457
    return true;
 
458
  }
 
459
  return false;
 
460
}
 
461
 
 
462
static void init_options(drizzled::module::option_context &context)
 
463
{
 
464
  context("servers",
 
465
          po::value<string>()->default_value("127.0.0.1:11211"),
 
466
          N_("List of memcached servers."));
 
467
  context("expiry",
 
468
          po::value<ulong>(&expiry_time)->default_value(1000),
 
469
          N_("Expiry time of memcached entries"));
 
470
  context("enable",
 
471
          po::value<bool>()->default_value(false)->zero_tokens(),
 
472
          N_("Enable Memcached Query Cache"));
 
473
}
 
474
 
 
475
DRIZZLE_DECLARE_PLUGIN
 
476
{
 
477
  DRIZZLE_VERSION_ID,
 
478
  "Query_Cache",
 
479
  "0.3",
 
480
  "Djellel Eddine Difallah",
 
481
  "Caches Select resultsets in Memcached",
 
482
  PLUGIN_LICENSE_BSD,
 
483
  init,   /* Plugin Init      */
 
484
  vars, /* system variables */
 
485
  init_options    /* config options   */
 
486
}
 
487
DRIZZLE_DECLARE_PLUGIN_END;