~fractalcat/gearmand/docfixes

« back to all changes in this revision

Viewing changes to libhashkit/ketama.cc

  • Committer: Brian Aker
  • Date: 2012-12-13 11:44:26 UTC
  • mto: (621.4.66 workspace)
  • mto: This revision was merged to the branch mainline in revision 676.
  • Revision ID: brian@tangent.org-20121213114426-lnrt6aysy7lqc01h
Adding support for deriving the unique value based on the data that is supplied by the client.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
 
2
 * 
 
3
 *  HashKit library
 
4
 *
 
5
 *  Copyright (C) 2011-2012 Data Differential, http://datadifferential.com/
 
6
 *  Copyright (C) 2006-2009 Brian Aker All rights reserved.
 
7
 *
 
8
 *  Redistribution and use in source and binary forms, with or without
 
9
 *  modification, are permitted provided that the following conditions are
 
10
 *  met:
 
11
 *
 
12
 *      * Redistributions of source code must retain the above copyright
 
13
 *  notice, this list of conditions and the following disclaimer.
 
14
 *
 
15
 *      * Redistributions in binary form must reproduce the above
 
16
 *  copyright notice, this list of conditions and the following disclaimer
 
17
 *  in the documentation and/or other materials provided with the
 
18
 *  distribution.
 
19
 *
 
20
 *      * The names of its contributors may not be used to endorse or
 
21
 *  promote products derived from this software without specific prior
 
22
 *  written permission.
 
23
 *
 
24
 *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 
25
 *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 
26
 *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 
27
 *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 
28
 *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 
29
 *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 
30
 *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 
31
 *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 
32
 *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 
33
 *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 
34
 *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
35
 *
 
36
 */
 
37
 
 
38
 
 
39
#include <libhashkit/common.h>
 
40
#include <math.h>
 
41
 
 
42
#if 0
 
43
static uint32_t ketama_server_hash(const char *key, unsigned int key_length, int alignment)
 
44
{
 
45
  unsigned char results[16];
 
46
 
 
47
  md5_signature((unsigned char*)key, key_length, results);
 
48
  return ((uint32_t) (results[3 + alignment * 4] & 0xFF) << 24)
 
49
    | ((uint32_t) (results[2 + alignment * 4] & 0xFF) << 16)
 
50
    | ((uint32_t) (results[1 + alignment * 4] & 0xFF) << 8)
 
51
    | (results[0 + alignment * 4] & 0xFF);
 
52
}
 
53
 
 
54
static int continuum_points_cmp(const void *t1, const void *t2)
 
55
{
 
56
  hashkit_continuum_point_st *ct1= (hashkit_continuum_point_st *)t1;
 
57
  hashkit_continuum_point_st *ct2= (hashkit_continuum_point_st *)t2;
 
58
 
 
59
  if (ct1->value == ct2->value)
 
60
    return 0;
 
61
  else if (ct1->value > ct2->value)
 
62
    return 1;
 
63
  else
 
64
    return -1;
 
65
}
 
66
 
 
67
int update_continuum(hashkit_st *hashkit)
 
68
{
 
69
  uint32_t count;
 
70
  uint32_t continuum_index= 0;
 
71
  uint32_t value;
 
72
  uint32_t points_index;
 
73
  uint32_t points_count= 0;
 
74
  uint32_t points_per_server;
 
75
  uint32_t points_per_hash;
 
76
  uint64_t total_weight= 0;
 
77
  uint32_t live_servers;
 
78
  uint8_t *context;
 
79
 
 
80
  if (hashkit->active_fn != NULL || hashkit->weight_fn != NULL)
 
81
  {
 
82
    live_servers= 0;
 
83
 
 
84
    for (count= 0, context= hashkit->list; count < hashkit->list_size;
 
85
         count++, context+= hashkit->context_size)
 
86
    {
 
87
      if (hashkit->active_fn != NULL)
 
88
      {
 
89
        if (hashkit->active_fn(context))
 
90
          live_servers++;
 
91
        else
 
92
          continue;
 
93
      }
 
94
 
 
95
      if (hashkit->weight_fn != NULL)
 
96
        total_weight+= hashkit->weight_fn(context);
 
97
    }
 
98
  }
 
99
 
 
100
  if (hashkit->active_fn == NULL)
 
101
    live_servers= (uint32_t)hashkit->list_size;
 
102
 
 
103
  if (live_servers == 0)
 
104
    return 0;
 
105
 
 
106
  if (hashkit->weight_fn == NULL)
 
107
  {
 
108
    points_per_server= HASHKIT_POINTS_PER_NODE;
 
109
    points_per_hash= 1;
 
110
  }
 
111
  else
 
112
  {
 
113
    points_per_server= HASHKIT_POINTS_PER_NODE_WEIGHTED;
 
114
    points_per_hash= 4;
 
115
  }
 
116
 
 
117
  if (live_servers > hashkit->continuum_count)
 
118
  {
 
119
    hashkit_continuum_point_st *new_continuum;
 
120
 
 
121
    new_continuum= realloc(hashkit->continuum,
 
122
                           sizeof(hashkit_continuum_point_st) *
 
123
                           (live_servers + HASHKIT_CONTINUUM_ADDITION) *
 
124
                           points_per_server);
 
125
 
 
126
    if (new_continuum == NULL)
 
127
      return ENOMEM;
 
128
 
 
129
    hashkit->continuum= new_continuum;
 
130
    hashkit->continuum_count= live_servers + HASHKIT_CONTINUUM_ADDITION;
 
131
  }
 
132
 
 
133
  for (count= 0, context= hashkit->list; count < hashkit->list_size;
 
134
       count++, context+= hashkit->context_size)
 
135
  {
 
136
    if (hashkit->active_fn != NULL && hashkit->active_fn(context) == false)
 
137
      continue;
 
138
 
 
139
    if (hashkit->weight_fn != NULL)
 
140
    {
 
141
        float pct = (float)hashkit->weight_fn(context) / (float)total_weight;
 
142
        points_per_server= (uint32_t) ((floorf((float) (pct * HASHKIT_POINTS_PER_NODE_WEIGHTED / 4 * (float)live_servers + 0.0000000001))) * 4);
 
143
    }
 
144
 
 
145
    for (points_index= 0;
 
146
         points_index < points_per_server / points_per_hash;
 
147
         points_index++)
 
148
    {
 
149
      char sort_host[HASHKIT_CONTINUUM_KEY_SIZE]= "";
 
150
      size_t sort_host_length;
 
151
 
 
152
      if (hashkit->continuum_key_fn == NULL)
 
153
      {
 
154
        sort_host_length= (size_t) snprintf(sort_host, HASHKIT_CONTINUUM_KEY_SIZE, "%u",
 
155
                                            points_index);
 
156
      }
 
157
      else
 
158
      {
 
159
        sort_host_length= hashkit->continuum_key_fn(sort_host, HASHKIT_CONTINUUM_KEY_SIZE,
 
160
                                                 points_index, context);
 
161
      }
 
162
 
 
163
      if (hashkit->weight_fn == NULL)
 
164
      {
 
165
        if (hashkit->continuum_hash_fn == NULL)
 
166
          value= hashkit_default(sort_host, sort_host_length);
 
167
        else
 
168
          value= hashkit->continuum_hash_fn(sort_host, sort_host_length);
 
169
 
 
170
        hashkit->continuum[continuum_index].index= count;
 
171
        hashkit->continuum[continuum_index++].value= value;
 
172
      }
 
173
      else
 
174
      {
 
175
        unsigned int i;
 
176
        for (i = 0; i < points_per_hash; i++)
 
177
        {
 
178
           value= ketama_server_hash(sort_host, (uint32_t) sort_host_length, (int) i);
 
179
           hashkit->continuum[continuum_index].index= count;
 
180
           hashkit->continuum[continuum_index++].value= value;
 
181
        }
 
182
      }
 
183
    }
 
184
 
 
185
    points_count+= points_per_server;
 
186
  }
 
187
 
 
188
  hashkit->continuum_points_count= points_count;
 
189
  qsort(hashkit->continuum, hashkit->continuum_points_count, sizeof(hashkit_continuum_point_st),
 
190
        continuum_points_cmp);
 
191
 
 
192
  return 0;
 
193
}
 
194
#endif