~michaelforrest/use-case-mapper/trunk

« back to all changes in this revision

Viewing changes to vendor/rails/activesupport/lib/active_support/vendor/memcache-client-1.7.4/memcache.rb

  • Committer: Richard Lee (Canonical)
  • Date: 2010-10-15 15:17:58 UTC
  • mfrom: (190.1.3 use-case-mapper)
  • Revision ID: richard.lee@canonical.com-20101015151758-wcvmfxrexsongf9d
Merge

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
$TESTING = defined?($TESTING) && $TESTING
2
 
 
3
 
require 'socket'
4
 
require 'thread'
5
 
require 'zlib'
6
 
require 'digest/sha1'
7
 
require 'net/protocol'
8
 
 
9
 
##
10
 
# A Ruby client library for memcached.
11
 
#
12
 
 
13
 
class MemCache
14
 
 
15
 
  ##
16
 
  # The version of MemCache you are using.
17
 
 
18
 
  VERSION = '1.7.4'
19
 
 
20
 
  ##
21
 
  # Default options for the cache object.
22
 
 
23
 
  DEFAULT_OPTIONS = {
24
 
    :namespace   => nil,
25
 
    :readonly    => false,
26
 
    :multithread => true,
27
 
    :failover    => true,
28
 
    :timeout     => 0.5,
29
 
    :logger      => nil,
30
 
    :no_reply    => false,
31
 
  }
32
 
 
33
 
  ##
34
 
  # Default memcached port.
35
 
 
36
 
  DEFAULT_PORT = 11211
37
 
 
38
 
  ##
39
 
  # Default memcached server weight.
40
 
 
41
 
  DEFAULT_WEIGHT = 1
42
 
 
43
 
  ##
44
 
  # The namespace for this instance
45
 
 
46
 
  attr_reader :namespace
47
 
 
48
 
  ##
49
 
  # The multithread setting for this instance
50
 
 
51
 
  attr_reader :multithread
52
 
 
53
 
  ##
54
 
  # The servers this client talks to.  Play at your own peril.
55
 
 
56
 
  attr_reader :servers
57
 
 
58
 
  ##
59
 
  # Socket timeout limit with this client, defaults to 0.5 sec.
60
 
  # Set to nil to disable timeouts.
61
 
 
62
 
  attr_reader :timeout
63
 
 
64
 
  ##
65
 
  # Should the client try to failover to another server if the
66
 
  # first server is down?  Defaults to true.
67
 
 
68
 
  attr_reader :failover
69
 
 
70
 
  ##
71
 
  # Log debug/info/warn/error to the given Logger, defaults to nil.
72
 
 
73
 
  attr_reader :logger
74
 
 
75
 
  ##
76
 
  # Don't send or look for a reply from the memcached server for write operations.
77
 
  # Please note this feature only works in memcached 1.2.5 and later.  Earlier
78
 
  # versions will reply with "ERROR".
79
 
  attr_reader :no_reply
80
 
 
81
 
  ##
82
 
  # Accepts a list of +servers+ and a list of +opts+.  +servers+ may be
83
 
  # omitted.  See +servers=+ for acceptable server list arguments.
84
 
  #
85
 
  # Valid options for +opts+ are:
86
 
  #
87
 
  #   [:namespace]   Prepends this value to all keys added or retrieved.
88
 
  #   [:readonly]    Raises an exception on cache writes when true.
89
 
  #   [:multithread] Wraps cache access in a Mutex for thread safety. Defaults to true.
90
 
  #   [:failover]    Should the client try to failover to another server if the
91
 
  #                  first server is down?  Defaults to true.
92
 
  #   [:timeout]     Time to use as the socket read timeout.  Defaults to 0.5 sec,
93
 
  #                  set to nil to disable timeouts (this is a major performance penalty in Ruby 1.8,
94
 
  #                  "gem install SystemTimer' to remove most of the penalty).
95
 
  #   [:logger]      Logger to use for info/debug output, defaults to nil
96
 
  #   [:no_reply]    Don't bother looking for a reply for write operations (i.e. they
97
 
  #                  become 'fire and forget'), memcached 1.2.5 and later only, speeds up
98
 
  #                  set/add/delete/incr/decr significantly.
99
 
  #
100
 
  # Other options are ignored.
101
 
 
102
 
  def initialize(*args)
103
 
    servers = []
104
 
    opts = {}
105
 
 
106
 
    case args.length
107
 
    when 0 then # NOP
108
 
    when 1 then
109
 
      arg = args.shift
110
 
      case arg
111
 
      when Hash   then opts = arg
112
 
      when Array  then servers = arg
113
 
      when String then servers = [arg]
114
 
      else raise ArgumentError, 'first argument must be Array, Hash or String'
115
 
      end
116
 
    when 2 then
117
 
      servers, opts = args
118
 
    else
119
 
      raise ArgumentError, "wrong number of arguments (#{args.length} for 2)"
120
 
    end
121
 
 
122
 
    opts = DEFAULT_OPTIONS.merge opts
123
 
    @namespace   = opts[:namespace]
124
 
    @readonly    = opts[:readonly]
125
 
    @multithread = opts[:multithread]
126
 
    @timeout     = opts[:timeout]
127
 
    @failover    = opts[:failover]
128
 
    @logger      = opts[:logger]
129
 
    @no_reply    = opts[:no_reply]
130
 
    @mutex       = Mutex.new if @multithread
131
 
 
132
 
    logger.info { "memcache-client #{VERSION} #{Array(servers).inspect}" } if logger
133
 
 
134
 
    Thread.current[:memcache_client] = self.object_id if !@multithread
135
 
 
136
 
    self.servers = servers
137
 
  end
138
 
 
139
 
  ##
140
 
  # Returns a string representation of the cache object.
141
 
 
142
 
  def inspect
143
 
    "<MemCache: %d servers, ns: %p, ro: %p>" %
144
 
      [@servers.length, @namespace, @readonly]
145
 
  end
146
 
 
147
 
  ##
148
 
  # Returns whether there is at least one active server for the object.
149
 
 
150
 
  def active?
151
 
    not @servers.empty?
152
 
  end
153
 
 
154
 
  ##
155
 
  # Returns whether or not the cache object was created read only.
156
 
 
157
 
  def readonly?
158
 
    @readonly
159
 
  end
160
 
 
161
 
  ##
162
 
  # Set the servers that the requests will be distributed between.  Entries
163
 
  # can be either strings of the form "hostname:port" or
164
 
  # "hostname:port:weight" or MemCache::Server objects.
165
 
  #
166
 
  def servers=(servers)
167
 
    # Create the server objects.
168
 
    @servers = Array(servers).collect do |server|
169
 
      case server
170
 
      when String
171
 
        host, port, weight = server.split ':', 3
172
 
        port ||= DEFAULT_PORT
173
 
        weight ||= DEFAULT_WEIGHT
174
 
        Server.new self, host, port, weight
175
 
      else
176
 
        server
177
 
      end
178
 
    end
179
 
 
180
 
    logger.debug { "Servers now: #{@servers.inspect}" } if logger
181
 
 
182
 
    # There's no point in doing this if there's only one server
183
 
    @continuum = create_continuum_for(@servers) if @servers.size > 1
184
 
 
185
 
    @servers
186
 
  end
187
 
 
188
 
  ##
189
 
  # Decrements the value for +key+ by +amount+ and returns the new value.
190
 
  # +key+ must already exist.  If +key+ is not an integer, it is assumed to be
191
 
  # 0.  +key+ can not be decremented below 0.
192
 
 
193
 
  def decr(key, amount = 1)
194
 
    raise MemCacheError, "Update of readonly cache" if @readonly
195
 
    with_server(key) do |server, cache_key|
196
 
      cache_decr server, cache_key, amount
197
 
    end
198
 
  rescue TypeError => err
199
 
    handle_error nil, err
200
 
  end
201
 
 
202
 
  ##
203
 
  # Retrieves +key+ from memcache.  If +raw+ is false, the value will be
204
 
  # unmarshalled.
205
 
 
206
 
  def get(key, raw = false)
207
 
    with_server(key) do |server, cache_key|
208
 
      logger.debug { "get #{key} from #{server.inspect}" } if logger
209
 
      value = cache_get server, cache_key
210
 
      return nil if value.nil?
211
 
      value = Marshal.load value unless raw
212
 
      return value
213
 
    end
214
 
  rescue TypeError => err
215
 
    handle_error nil, err
216
 
  end
217
 
 
218
 
  ##
219
 
  # Performs a +get+ with the given +key+.  If 
220
 
  # the value does not exist and a block was given,
221
 
  # the block will be called and the result saved via +add+.
222
 
  #
223
 
  # If you do not provide a block, using this
224
 
  # method is the same as using +get+.
225
 
  #
226
 
  def fetch(key, expiry = 0, raw = false)
227
 
    value = get(key, raw)
228
 
 
229
 
    if value.nil? && block_given?
230
 
      value = yield
231
 
      add(key, value, expiry, raw)
232
 
    end
233
 
 
234
 
    value
235
 
  end
236
 
 
237
 
  ##
238
 
  # Retrieves multiple values from memcached in parallel, if possible.
239
 
  #
240
 
  # The memcached protocol supports the ability to retrieve multiple
241
 
  # keys in a single request.  Pass in an array of keys to this method
242
 
  # and it will:
243
 
  #
244
 
  # 1. map the key to the appropriate memcached server
245
 
  # 2. send a single request to each server that has one or more key values
246
 
  #
247
 
  # Returns a hash of values.
248
 
  #
249
 
  #   cache["a"] = 1
250
 
  #   cache["b"] = 2
251
 
  #   cache.get_multi "a", "b" # => { "a" => 1, "b" => 2 }
252
 
  #
253
 
  # Note that get_multi assumes the values are marshalled.
254
 
 
255
 
  def get_multi(*keys)
256
 
    raise MemCacheError, 'No active servers' unless active?
257
 
 
258
 
    keys.flatten!
259
 
    key_count = keys.length
260
 
    cache_keys = {}
261
 
    server_keys = Hash.new { |h,k| h[k] = [] }
262
 
 
263
 
    # map keys to servers
264
 
    keys.each do |key|
265
 
      server, cache_key = request_setup key
266
 
      cache_keys[cache_key] = key
267
 
      server_keys[server] << cache_key
268
 
    end
269
 
 
270
 
    results = {}
271
 
 
272
 
    server_keys.each do |server, keys_for_server|
273
 
      keys_for_server_str = keys_for_server.join ' '
274
 
      begin
275
 
        values = cache_get_multi server, keys_for_server_str
276
 
        values.each do |key, value|
277
 
          results[cache_keys[key]] = Marshal.load value
278
 
        end
279
 
      rescue IndexError => e
280
 
        # Ignore this server and try the others
281
 
        logger.warn { "Unable to retrieve #{keys_for_server.size} elements from #{server.inspect}: #{e.message}"} if logger
282
 
      end
283
 
    end
284
 
 
285
 
    return results
286
 
  rescue TypeError => err
287
 
    handle_error nil, err
288
 
  end
289
 
 
290
 
  ##
291
 
  # Increments the value for +key+ by +amount+ and returns the new value.
292
 
  # +key+ must already exist.  If +key+ is not an integer, it is assumed to be
293
 
  # 0.
294
 
 
295
 
  def incr(key, amount = 1)
296
 
    raise MemCacheError, "Update of readonly cache" if @readonly
297
 
    with_server(key) do |server, cache_key|
298
 
      cache_incr server, cache_key, amount
299
 
    end
300
 
  rescue TypeError => err
301
 
    handle_error nil, err
302
 
  end
303
 
 
304
 
  ##
305
 
  # Add +key+ to the cache with value +value+ that expires in +expiry+
306
 
  # seconds.  If +raw+ is true, +value+ will not be Marshalled.
307
 
  #
308
 
  # Warning: Readers should not call this method in the event of a cache miss;
309
 
  # see MemCache#add.
310
 
 
311
 
  ONE_MB = 1024 * 1024
312
 
 
313
 
  def set(key, value, expiry = 0, raw = false)
314
 
    raise MemCacheError, "Update of readonly cache" if @readonly
315
 
    with_server(key) do |server, cache_key|
316
 
 
317
 
      value = Marshal.dump value unless raw
318
 
      logger.debug { "set #{key} to #{server.inspect}: #{value.to_s.size}" } if logger
319
 
 
320
 
      raise MemCacheError, "Value too large, memcached can only store 1MB of data per key" if value.to_s.size > ONE_MB
321
 
 
322
 
      command = "set #{cache_key} 0 #{expiry} #{value.to_s.size}#{noreply}\r\n#{value}\r\n"
323
 
 
324
 
      with_socket_management(server) do |socket|
325
 
        socket.write command
326
 
        break nil if @no_reply
327
 
        result = socket.gets
328
 
        raise_on_error_response! result
329
 
 
330
 
        if result.nil?
331
 
          server.close
332
 
          raise MemCacheError, "lost connection to #{server.host}:#{server.port}"
333
 
        end
334
 
 
335
 
        result
336
 
      end
337
 
    end
338
 
  end
339
 
 
340
 
  ##
341
 
  # "cas" is a check and set operation which means "store this data but
342
 
  # only if no one else has updated since I last fetched it."  This can
343
 
  # be used as a form of optimistic locking.
344
 
  #
345
 
  # Works in block form like so:
346
 
  #   cache.cas('some-key') do |value|
347
 
  #     value + 1
348
 
  #   end
349
 
  #
350
 
  # Returns:
351
 
  # +nil+ if the value was not found on the memcached server.
352
 
  # +STORED+ if the value was updated successfully
353
 
  # +EXISTS+ if the value was updated by someone else since last fetch
354
 
 
355
 
  def cas(key, expiry=0, raw=false)
356
 
    raise MemCacheError, "Update of readonly cache" if @readonly
357
 
    raise MemCacheError, "A block is required" unless block_given?
358
 
 
359
 
    (value, token) = gets(key, raw)
360
 
    return nil unless value
361
 
    updated = yield value
362
 
 
363
 
    with_server(key) do |server, cache_key|
364
 
 
365
 
      value = Marshal.dump updated unless raw
366
 
      logger.debug { "cas #{key} to #{server.inspect}: #{value.to_s.size}" } if logger
367
 
      command = "cas #{cache_key} 0 #{expiry} #{value.to_s.size} #{token}#{noreply}\r\n#{value}\r\n"
368
 
 
369
 
      with_socket_management(server) do |socket|
370
 
        socket.write command
371
 
        break nil if @no_reply
372
 
        result = socket.gets
373
 
        raise_on_error_response! result
374
 
 
375
 
        if result.nil?
376
 
          server.close
377
 
          raise MemCacheError, "lost connection to #{server.host}:#{server.port}"
378
 
        end
379
 
 
380
 
        result
381
 
      end
382
 
    end
383
 
  end
384
 
 
385
 
  ##
386
 
  # Add +key+ to the cache with value +value+ that expires in +expiry+
387
 
  # seconds, but only if +key+ does not already exist in the cache.
388
 
  # If +raw+ is true, +value+ will not be Marshalled.
389
 
  #
390
 
  # Readers should call this method in the event of a cache miss, not
391
 
  # MemCache#set.
392
 
 
393
 
  def add(key, value, expiry = 0, raw = false)
394
 
    raise MemCacheError, "Update of readonly cache" if @readonly
395
 
    with_server(key) do |server, cache_key|
396
 
      value = Marshal.dump value unless raw
397
 
      logger.debug { "add #{key} to #{server}: #{value ? value.to_s.size : 'nil'}" } if logger
398
 
      command = "add #{cache_key} 0 #{expiry} #{value.to_s.size}#{noreply}\r\n#{value}\r\n"
399
 
 
400
 
      with_socket_management(server) do |socket|
401
 
        socket.write command
402
 
        break nil if @no_reply
403
 
        result = socket.gets
404
 
        raise_on_error_response! result
405
 
        result
406
 
      end
407
 
    end
408
 
  end
409
 
  
410
 
  ##
411
 
  # Add +key+ to the cache with value +value+ that expires in +expiry+
412
 
  # seconds, but only if +key+ already exists in the cache.
413
 
  # If +raw+ is true, +value+ will not be Marshalled.
414
 
  def replace(key, value, expiry = 0, raw = false)
415
 
    raise MemCacheError, "Update of readonly cache" if @readonly
416
 
    with_server(key) do |server, cache_key|
417
 
      value = Marshal.dump value unless raw
418
 
      logger.debug { "replace #{key} to #{server}: #{value ? value.to_s.size : 'nil'}" } if logger
419
 
      command = "replace #{cache_key} 0 #{expiry} #{value.to_s.size}#{noreply}\r\n#{value}\r\n"
420
 
 
421
 
      with_socket_management(server) do |socket|
422
 
        socket.write command
423
 
        break nil if @no_reply
424
 
        result = socket.gets
425
 
        raise_on_error_response! result
426
 
        result
427
 
      end
428
 
    end
429
 
  end
430
 
 
431
 
  ##
432
 
  # Append - 'add this data to an existing key after existing data'
433
 
  # Please note the value is always passed to memcached as raw since it
434
 
  # doesn't make a lot of sense to concatenate marshalled data together.
435
 
  def append(key, value)
436
 
    raise MemCacheError, "Update of readonly cache" if @readonly
437
 
    with_server(key) do |server, cache_key|
438
 
      logger.debug { "append #{key} to #{server}: #{value ? value.to_s.size : 'nil'}" } if logger
439
 
      command = "append #{cache_key} 0 0 #{value.to_s.size}#{noreply}\r\n#{value}\r\n"
440
 
 
441
 
      with_socket_management(server) do |socket|
442
 
        socket.write command
443
 
        break nil if @no_reply
444
 
        result = socket.gets
445
 
        raise_on_error_response! result
446
 
        result
447
 
      end
448
 
    end
449
 
  end
450
 
 
451
 
  ##
452
 
  # Prepend - 'add this data to an existing key before existing data'
453
 
  # Please note the value is always passed to memcached as raw since it
454
 
  # doesn't make a lot of sense to concatenate marshalled data together.
455
 
  def prepend(key, value)
456
 
    raise MemCacheError, "Update of readonly cache" if @readonly
457
 
    with_server(key) do |server, cache_key|
458
 
      logger.debug { "prepend #{key} to #{server}: #{value ? value.to_s.size : 'nil'}" } if logger
459
 
      command = "prepend #{cache_key} 0 0 #{value.to_s.size}#{noreply}\r\n#{value}\r\n"
460
 
 
461
 
      with_socket_management(server) do |socket|
462
 
        socket.write command
463
 
        break nil if @no_reply
464
 
        result = socket.gets
465
 
        raise_on_error_response! result
466
 
        result
467
 
      end
468
 
    end
469
 
  end
470
 
 
471
 
  ##
472
 
  # Removes +key+ from the cache in +expiry+ seconds.
473
 
 
474
 
  def delete(key, expiry = 0)
475
 
    raise MemCacheError, "Update of readonly cache" if @readonly
476
 
    with_server(key) do |server, cache_key|
477
 
      with_socket_management(server) do |socket|
478
 
        logger.debug { "delete #{cache_key} on #{server}" } if logger
479
 
        socket.write "delete #{cache_key} #{expiry}#{noreply}\r\n"
480
 
        break nil if @no_reply
481
 
        result = socket.gets
482
 
        raise_on_error_response! result
483
 
        result
484
 
      end
485
 
    end
486
 
  end
487
 
 
488
 
  ##
489
 
  # Flush the cache from all memcache servers.
490
 
  # A non-zero value for +delay+ will ensure that the flush
491
 
  # is propogated slowly through your memcached server farm.
492
 
  # The Nth server will be flushed N*delay seconds from now,
493
 
  # asynchronously so this method returns quickly.
494
 
  # This prevents a huge database spike due to a total
495
 
  # flush all at once.
496
 
 
497
 
  def flush_all(delay=0)
498
 
    raise MemCacheError, 'No active servers' unless active?
499
 
    raise MemCacheError, "Update of readonly cache" if @readonly
500
 
 
501
 
    begin
502
 
      delay_time = 0
503
 
      @servers.each do |server|
504
 
        with_socket_management(server) do |socket|
505
 
          logger.debug { "flush_all #{delay_time} on #{server}" } if logger
506
 
          if delay == 0 # older versions of memcached will fail silently otherwise
507
 
            socket.write "flush_all#{noreply}\r\n"
508
 
          else
509
 
            socket.write "flush_all #{delay_time}#{noreply}\r\n"
510
 
          end
511
 
          break nil if @no_reply
512
 
          result = socket.gets
513
 
          raise_on_error_response! result
514
 
          result
515
 
        end
516
 
        delay_time += delay
517
 
      end
518
 
    rescue IndexError => err
519
 
      handle_error nil, err
520
 
    end
521
 
  end
522
 
 
523
 
  ##
524
 
  # Reset the connection to all memcache servers.  This should be called if
525
 
  # there is a problem with a cache lookup that might have left the connection
526
 
  # in a corrupted state.
527
 
 
528
 
  def reset
529
 
    @servers.each { |server| server.close }
530
 
  end
531
 
 
532
 
  ##
533
 
  # Returns statistics for each memcached server.  An explanation of the
534
 
  # statistics can be found in the memcached docs:
535
 
  #
536
 
  # http://code.sixapart.com/svn/memcached/trunk/server/doc/protocol.txt
537
 
  #
538
 
  # Example:
539
 
  #
540
 
  #   >> pp CACHE.stats
541
 
  #   {"localhost:11211"=>
542
 
  #     {"bytes"=>4718,
543
 
  #      "pid"=>20188,
544
 
  #      "connection_structures"=>4,
545
 
  #      "time"=>1162278121,
546
 
  #      "pointer_size"=>32,
547
 
  #      "limit_maxbytes"=>67108864,
548
 
  #      "cmd_get"=>14532,
549
 
  #      "version"=>"1.2.0",
550
 
  #      "bytes_written"=>432583,
551
 
  #      "cmd_set"=>32,
552
 
  #      "get_misses"=>0,
553
 
  #      "total_connections"=>19,
554
 
  #      "curr_connections"=>3,
555
 
  #      "curr_items"=>4,
556
 
  #      "uptime"=>1557,
557
 
  #      "get_hits"=>14532,
558
 
  #      "total_items"=>32,
559
 
  #      "rusage_system"=>0.313952,
560
 
  #      "rusage_user"=>0.119981,
561
 
  #      "bytes_read"=>190619}}
562
 
  #   => nil
563
 
 
564
 
  def stats
565
 
    raise MemCacheError, "No active servers" unless active?
566
 
    server_stats = {}
567
 
 
568
 
    @servers.each do |server|
569
 
      next unless server.alive?
570
 
 
571
 
      with_socket_management(server) do |socket|
572
 
        value = nil
573
 
        socket.write "stats\r\n"
574
 
        stats = {}
575
 
        while line = socket.gets do
576
 
          raise_on_error_response! line
577
 
          break if line == "END\r\n"
578
 
          if line =~ /\ASTAT ([\S]+) ([\w\.\:]+)/ then
579
 
            name, value = $1, $2
580
 
            stats[name] = case name
581
 
                          when 'version'
582
 
                            value
583
 
                          when 'rusage_user', 'rusage_system' then
584
 
                            seconds, microseconds = value.split(/:/, 2)
585
 
                            microseconds ||= 0
586
 
                            Float(seconds) + (Float(microseconds) / 1_000_000)
587
 
                          else
588
 
                            if value =~ /\A\d+\Z/ then
589
 
                              value.to_i
590
 
                            else
591
 
                              value
592
 
                            end
593
 
                          end
594
 
          end
595
 
        end
596
 
        server_stats["#{server.host}:#{server.port}"] = stats
597
 
      end
598
 
    end
599
 
 
600
 
    raise MemCacheError, "No active servers" if server_stats.empty?
601
 
    server_stats
602
 
  end
603
 
 
604
 
  ##
605
 
  # Shortcut to get a value from the cache.
606
 
 
607
 
  alias [] get
608
 
 
609
 
  ##
610
 
  # Shortcut to save a value in the cache.  This method does not set an
611
 
  # expiration on the entry.  Use set to specify an explicit expiry.
612
 
 
613
 
  def []=(key, value)
614
 
    set key, value
615
 
  end
616
 
 
617
 
  protected unless $TESTING
618
 
 
619
 
  ##
620
 
  # Create a key for the cache, incorporating the namespace qualifier if
621
 
  # requested.
622
 
 
623
 
  def make_cache_key(key)
624
 
    if namespace.nil? then
625
 
      key
626
 
    else
627
 
      "#{@namespace}:#{key}"
628
 
    end
629
 
  end
630
 
 
631
 
  ##
632
 
  # Returns an interoperable hash value for +key+.  (I think, docs are
633
 
  # sketchy for down servers).
634
 
 
635
 
  def hash_for(key)
636
 
    Zlib.crc32(key)
637
 
  end
638
 
 
639
 
  ##
640
 
  # Pick a server to handle the request based on a hash of the key.
641
 
 
642
 
  def get_server_for_key(key, options = {})
643
 
    raise ArgumentError, "illegal character in key #{key.inspect}" if
644
 
      key =~ /\s/
645
 
    raise ArgumentError, "key too long #{key.inspect}" if key.length > 250
646
 
    raise MemCacheError, "No servers available" if @servers.empty?
647
 
    return @servers.first if @servers.length == 1
648
 
 
649
 
    hkey = hash_for(key)
650
 
 
651
 
    20.times do |try|
652
 
      entryidx = Continuum.binary_search(@continuum, hkey)
653
 
      server = @continuum[entryidx].server
654
 
      return server if server.alive?
655
 
      break unless failover
656
 
      hkey = hash_for "#{try}#{key}"
657
 
    end
658
 
    
659
 
    raise MemCacheError, "No servers available"
660
 
  end
661
 
 
662
 
  ##
663
 
  # Performs a raw decr for +cache_key+ from +server+.  Returns nil if not
664
 
  # found.
665
 
 
666
 
  def cache_decr(server, cache_key, amount)
667
 
    with_socket_management(server) do |socket|
668
 
      socket.write "decr #{cache_key} #{amount}#{noreply}\r\n"
669
 
      break nil if @no_reply
670
 
      text = socket.gets
671
 
      raise_on_error_response! text
672
 
      return nil if text == "NOT_FOUND\r\n"
673
 
      return text.to_i
674
 
    end
675
 
  end
676
 
 
677
 
  ##
678
 
  # Fetches the raw data for +cache_key+ from +server+.  Returns nil on cache
679
 
  # miss.
680
 
 
681
 
  def cache_get(server, cache_key)
682
 
    with_socket_management(server) do |socket|
683
 
      socket.write "get #{cache_key}\r\n"
684
 
      keyline = socket.gets # "VALUE <key> <flags> <bytes>\r\n"
685
 
 
686
 
      if keyline.nil? then
687
 
        server.close
688
 
        raise MemCacheError, "lost connection to #{server.host}:#{server.port}"
689
 
      end
690
 
 
691
 
      raise_on_error_response! keyline
692
 
      return nil if keyline == "END\r\n"
693
 
 
694
 
      unless keyline =~ /(\d+)\r/ then
695
 
        server.close
696
 
        raise MemCacheError, "unexpected response #{keyline.inspect}"
697
 
      end
698
 
      value = socket.read $1.to_i
699
 
      socket.read 2 # "\r\n"
700
 
      socket.gets   # "END\r\n"
701
 
      return value
702
 
    end
703
 
  end
704
 
 
705
 
  def gets(key, raw = false)
706
 
    with_server(key) do |server, cache_key|
707
 
      logger.debug { "gets #{key} from #{server.inspect}" } if logger
708
 
      result = with_socket_management(server) do |socket|
709
 
        socket.write "gets #{cache_key}\r\n"
710
 
        keyline = socket.gets # "VALUE <key> <flags> <bytes> <cas token>\r\n"
711
 
 
712
 
        if keyline.nil? then
713
 
          server.close
714
 
          raise MemCacheError, "lost connection to #{server.host}:#{server.port}"
715
 
        end
716
 
 
717
 
        raise_on_error_response! keyline
718
 
        return nil if keyline == "END\r\n"
719
 
 
720
 
        unless keyline =~ /(\d+) (\w+)\r/ then
721
 
          server.close
722
 
          raise MemCacheError, "unexpected response #{keyline.inspect}"
723
 
        end
724
 
        value = socket.read $1.to_i
725
 
        socket.read 2 # "\r\n"
726
 
        socket.gets   # "END\r\n"
727
 
        [value, $2]
728
 
      end
729
 
      result[0] = Marshal.load result[0] unless raw
730
 
      result
731
 
    end
732
 
  rescue TypeError => err
733
 
    handle_error nil, err
734
 
  end
735
 
 
736
 
 
737
 
  ##
738
 
  # Fetches +cache_keys+ from +server+ using a multi-get.
739
 
 
740
 
  def cache_get_multi(server, cache_keys)
741
 
    with_socket_management(server) do |socket|
742
 
      values = {}
743
 
      socket.write "get #{cache_keys}\r\n"
744
 
 
745
 
      while keyline = socket.gets do
746
 
        return values if keyline == "END\r\n"
747
 
        raise_on_error_response! keyline
748
 
 
749
 
        unless keyline =~ /\AVALUE (.+) (.+) (.+)/ then
750
 
          server.close
751
 
          raise MemCacheError, "unexpected response #{keyline.inspect}"
752
 
        end
753
 
 
754
 
        key, data_length = $1, $3
755
 
        values[$1] = socket.read data_length.to_i
756
 
        socket.read(2) # "\r\n"
757
 
      end
758
 
 
759
 
      server.close
760
 
      raise MemCacheError, "lost connection to #{server.host}:#{server.port}" # TODO: retry here too
761
 
    end
762
 
  end
763
 
 
764
 
  ##
765
 
  # Performs a raw incr for +cache_key+ from +server+.  Returns nil if not
766
 
  # found.
767
 
 
768
 
  def cache_incr(server, cache_key, amount)
769
 
    with_socket_management(server) do |socket|
770
 
      socket.write "incr #{cache_key} #{amount}#{noreply}\r\n"
771
 
      break nil if @no_reply
772
 
      text = socket.gets
773
 
      raise_on_error_response! text
774
 
      return nil if text == "NOT_FOUND\r\n"
775
 
      return text.to_i
776
 
    end
777
 
  end
778
 
 
779
 
  ##
780
 
  # Gets or creates a socket connected to the given server, and yields it
781
 
  # to the block, wrapped in a mutex synchronization if @multithread is true.
782
 
  #
783
 
  # If a socket error (SocketError, SystemCallError, IOError) or protocol error
784
 
  # (MemCacheError) is raised by the block, closes the socket, attempts to
785
 
  # connect again, and retries the block (once).  If an error is again raised,
786
 
  # reraises it as MemCacheError.
787
 
  #
788
 
  # If unable to connect to the server (or if in the reconnect wait period),
789
 
  # raises MemCacheError.  Note that the socket connect code marks a server
790
 
  # dead for a timeout period, so retrying does not apply to connection attempt
791
 
  # failures (but does still apply to unexpectedly lost connections etc.).
792
 
 
793
 
  def with_socket_management(server, &block)
794
 
    check_multithread_status!
795
 
 
796
 
    @mutex.lock if @multithread
797
 
    retried = false
798
 
 
799
 
    begin
800
 
      socket = server.socket
801
 
 
802
 
      # Raise an IndexError to show this server is out of whack. If were inside
803
 
      # a with_server block, we'll catch it and attempt to restart the operation.
804
 
 
805
 
      raise IndexError, "No connection to server (#{server.status})" if socket.nil?
806
 
 
807
 
      block.call(socket)
808
 
 
809
 
    rescue SocketError, Errno::EAGAIN, Timeout::Error => err
810
 
      logger.warn { "Socket failure: #{err.message}" } if logger
811
 
      server.mark_dead(err)
812
 
      handle_error(server, err)
813
 
 
814
 
    rescue MemCacheError, SystemCallError, IOError => err
815
 
      logger.warn { "Generic failure: #{err.class.name}: #{err.message}" } if logger
816
 
      handle_error(server, err) if retried || socket.nil?
817
 
      retried = true
818
 
      retry
819
 
    end
820
 
  ensure
821
 
    @mutex.unlock if @multithread
822
 
  end
823
 
 
824
 
  def with_server(key)
825
 
    retried = false
826
 
    begin
827
 
      server, cache_key = request_setup(key)
828
 
      yield server, cache_key
829
 
    rescue IndexError => e
830
 
      logger.warn { "Server failed: #{e.class.name}: #{e.message}" } if logger
831
 
      if !retried && @servers.size > 1
832
 
        logger.info { "Connection to server #{server.inspect} DIED! Retrying operation..." } if logger
833
 
        retried = true
834
 
        retry
835
 
      end
836
 
      handle_error(nil, e)
837
 
    end
838
 
  end
839
 
 
840
 
  ##
841
 
  # Handles +error+ from +server+.
842
 
 
843
 
  def handle_error(server, error)
844
 
    raise error if error.is_a?(MemCacheError)
845
 
    server.close if server
846
 
    new_error = MemCacheError.new error.message
847
 
    new_error.set_backtrace error.backtrace
848
 
    raise new_error
849
 
  end
850
 
 
851
 
  def noreply
852
 
    @no_reply ? ' noreply' : ''
853
 
  end
854
 
 
855
 
  ##
856
 
  # Performs setup for making a request with +key+ from memcached.  Returns
857
 
  # the server to fetch the key from and the complete key to use.
858
 
 
859
 
  def request_setup(key)
860
 
    raise MemCacheError, 'No active servers' unless active?
861
 
    cache_key = make_cache_key key
862
 
    server = get_server_for_key cache_key
863
 
    return server, cache_key
864
 
  end
865
 
 
866
 
  def raise_on_error_response!(response)
867
 
    if response =~ /\A(?:CLIENT_|SERVER_)?ERROR(.*)/
868
 
      raise MemCacheError, $1.strip
869
 
    end
870
 
  end
871
 
 
872
 
  def create_continuum_for(servers)
873
 
    total_weight = servers.inject(0) { |memo, srv| memo + srv.weight }
874
 
    continuum = []
875
 
 
876
 
    servers.each do |server|
877
 
      entry_count_for(server, servers.size, total_weight).times do |idx|
878
 
        hash = Digest::SHA1.hexdigest("#{server.host}:#{server.port}:#{idx}")
879
 
        value = Integer("0x#{hash[0..7]}")
880
 
        continuum << Continuum::Entry.new(value, server)
881
 
      end
882
 
    end
883
 
 
884
 
    continuum.sort { |a, b| a.value <=> b.value }
885
 
  end
886
 
 
887
 
  def entry_count_for(server, total_servers, total_weight)
888
 
    ((total_servers * Continuum::POINTS_PER_SERVER * server.weight) / Float(total_weight)).floor
889
 
  end
890
 
 
891
 
  def check_multithread_status!
892
 
    return if @multithread
893
 
 
894
 
    if Thread.current[:memcache_client] != self.object_id
895
 
      raise MemCacheError, <<-EOM
896
 
        You are accessing this memcache-client instance from multiple threads but have not enabled multithread support.
897
 
        Normally:  MemCache.new(['localhost:11211'], :multithread => true)
898
 
        In Rails:  config.cache_store = [:mem_cache_store, 'localhost:11211', { :multithread => true }]
899
 
      EOM
900
 
    end
901
 
  end
902
 
 
903
 
  ##
904
 
  # This class represents a memcached server instance.
905
 
 
906
 
  class Server
907
 
 
908
 
    ##
909
 
    # The amount of time to wait before attempting to re-establish a
910
 
    # connection with a server that is marked dead.
911
 
 
912
 
    RETRY_DELAY = 30.0
913
 
 
914
 
    ##
915
 
    # The host the memcached server is running on.
916
 
 
917
 
    attr_reader :host
918
 
 
919
 
    ##
920
 
    # The port the memcached server is listening on.
921
 
 
922
 
    attr_reader :port
923
 
 
924
 
    ##
925
 
    # The weight given to the server.
926
 
 
927
 
    attr_reader :weight
928
 
 
929
 
    ##
930
 
    # The time of next retry if the connection is dead.
931
 
 
932
 
    attr_reader :retry
933
 
 
934
 
    ##
935
 
    # A text status string describing the state of the server.
936
 
 
937
 
    attr_reader :status
938
 
 
939
 
    attr_reader :logger
940
 
 
941
 
    ##
942
 
    # Create a new MemCache::Server object for the memcached instance
943
 
    # listening on the given host and port, weighted by the given weight.
944
 
 
945
 
    def initialize(memcache, host, port = DEFAULT_PORT, weight = DEFAULT_WEIGHT)
946
 
      raise ArgumentError, "No host specified" if host.nil? or host.empty?
947
 
      raise ArgumentError, "No port specified" if port.nil? or port.to_i.zero?
948
 
 
949
 
      @host   = host
950
 
      @port   = port.to_i
951
 
      @weight = weight.to_i
952
 
 
953
 
      @sock   = nil
954
 
      @retry  = nil
955
 
      @status = 'NOT CONNECTED'
956
 
      @timeout = memcache.timeout
957
 
      @logger = memcache.logger
958
 
    end
959
 
 
960
 
    ##
961
 
    # Return a string representation of the server object.
962
 
 
963
 
    def inspect
964
 
      "<MemCache::Server: %s:%d [%d] (%s)>" % [@host, @port, @weight, @status]
965
 
    end
966
 
 
967
 
    ##
968
 
    # Check whether the server connection is alive.  This will cause the
969
 
    # socket to attempt to connect if it isn't already connected and or if
970
 
    # the server was previously marked as down and the retry time has
971
 
    # been exceeded.
972
 
 
973
 
    def alive?
974
 
      !!socket
975
 
    end
976
 
 
977
 
    ##
978
 
    # Try to connect to the memcached server targeted by this object.
979
 
    # Returns the connected socket object on success or nil on failure.
980
 
 
981
 
    def socket
982
 
      return @sock if @sock and not @sock.closed?
983
 
 
984
 
      @sock = nil
985
 
 
986
 
      # If the host was dead, don't retry for a while.
987
 
      return if @retry and @retry > Time.now
988
 
 
989
 
      # Attempt to connect if not already connected.
990
 
      begin
991
 
        @sock = connect_to(@host, @port, @timeout)
992
 
        @sock.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1
993
 
        @retry  = nil
994
 
        @status = 'CONNECTED'
995
 
      rescue SocketError, SystemCallError, IOError => err
996
 
        logger.warn { "Unable to open socket: #{err.class.name}, #{err.message}" } if logger
997
 
        mark_dead err
998
 
      end
999
 
 
1000
 
      return @sock
1001
 
    end
1002
 
 
1003
 
    def connect_to(host, port, timeout=nil)
1004
 
      io = MemCache::BufferedIO.new(TCPSocket.new(host, port))
1005
 
      io.read_timeout = timeout
1006
 
      io
1007
 
    end
1008
 
 
1009
 
    ##
1010
 
    # Close the connection to the memcached server targeted by this
1011
 
    # object.  The server is not considered dead.
1012
 
 
1013
 
    def close
1014
 
      @sock.close if @sock && !@sock.closed?
1015
 
      @sock   = nil
1016
 
      @retry  = nil
1017
 
      @status = "NOT CONNECTED"
1018
 
    end
1019
 
 
1020
 
    ##
1021
 
    # Mark the server as dead and close its socket.
1022
 
 
1023
 
    def mark_dead(error)
1024
 
      @sock.close if @sock && !@sock.closed?
1025
 
      @sock   = nil
1026
 
      @retry  = Time.now + RETRY_DELAY
1027
 
 
1028
 
      reason = "#{error.class.name}: #{error.message}"
1029
 
      @status = sprintf "%s:%s DEAD (%s), will retry at %s", @host, @port, reason, @retry
1030
 
      @logger.info { @status } if @logger
1031
 
    end
1032
 
 
1033
 
  end
1034
 
 
1035
 
  ##
1036
 
  # Base MemCache exception class.
1037
 
 
1038
 
  class MemCacheError < RuntimeError; end
1039
 
 
1040
 
  class BufferedIO < Net::BufferedIO # :nodoc:
1041
 
    BUFSIZE = 1024 * 16
1042
 
 
1043
 
    # An implementation similar to this is in *trunk* for 1.9.  When it
1044
 
    # gets released, this method can be removed when using 1.9
1045
 
    def rbuf_fill
1046
 
      begin
1047
 
        @rbuf << @io.read_nonblock(BUFSIZE)
1048
 
      rescue Errno::EWOULDBLOCK
1049
 
        retry unless @read_timeout
1050
 
        if IO.select([@io], nil, nil, @read_timeout)
1051
 
          retry
1052
 
        else
1053
 
          raise Timeout::Error, 'IO timeout'
1054
 
        end
1055
 
      end
1056
 
    end
1057
 
 
1058
 
    def setsockopt *args
1059
 
      @io.setsockopt *args
1060
 
    end
1061
 
 
1062
 
    def gets
1063
 
      readuntil("\n")
1064
 
    end
1065
 
  end
1066
 
 
1067
 
end
1068
 
 
1069
 
module Continuum
1070
 
  POINTS_PER_SERVER = 160 # this is the default in libmemcached
1071
 
 
1072
 
  # Find the closest index in Continuum with value <= the given value
1073
 
  def self.binary_search(ary, value, &block)
1074
 
    upper = ary.size - 1
1075
 
    lower = 0
1076
 
    idx = 0
1077
 
 
1078
 
    while(lower <= upper) do
1079
 
      idx = (lower + upper) / 2
1080
 
      comp = ary[idx].value <=> value
1081
 
 
1082
 
      if comp == 0
1083
 
        return idx
1084
 
      elsif comp > 0
1085
 
        upper = idx - 1
1086
 
      else
1087
 
        lower = idx + 1
1088
 
      end
1089
 
    end
1090
 
    return upper
1091
 
  end
1092
 
 
1093
 
  class Entry
1094
 
    attr_reader :value
1095
 
    attr_reader :server
1096
 
 
1097
 
    def initialize(val, srv)
1098
 
      @value = val
1099
 
      @server = srv
1100
 
    end
1101
 
 
1102
 
    def inspect
1103
 
      "<#{value}, #{server.host}:#{server.port}>"
1104
 
    end
1105
 
  end
1106
 
 
1107
 
end