~michaelforrest/use-case-mapper/trunk

« back to all changes in this revision

Viewing changes to vendor/rails/activesupport/lib/active_support/vendor/memcache-client-1.7.4/memcache.rb

  • Committer: Michael Forrest
  • Date: 2010-10-15 16:28:50 UTC
  • Revision ID: michael.forrest@canonical.com-20101015162850-tj2vchanv0kr0dun
refrozeĀ gems

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
$TESTING = defined?($TESTING) && $TESTING
 
2
 
 
3
require 'socket'
 
4
require 'thread'
 
5
require 'zlib'
 
6
require 'digest/sha1'
 
7
require 'net/protocol'
 
8
 
 
9
##
 
10
# A Ruby client library for memcached.
 
11
#
 
12
 
 
13
class MemCache
 
14
 
 
15
  ##
 
16
  # The version of MemCache you are using.
 
17
 
 
18
  VERSION = '1.7.4'
 
19
 
 
20
  ##
 
21
  # Default options for the cache object.
 
22
 
 
23
  DEFAULT_OPTIONS = {
 
24
    :namespace   => nil,
 
25
    :readonly    => false,
 
26
    :multithread => true,
 
27
    :failover    => true,
 
28
    :timeout     => 0.5,
 
29
    :logger      => nil,
 
30
    :no_reply    => false,
 
31
  }
 
32
 
 
33
  ##
 
34
  # Default memcached port.
 
35
 
 
36
  DEFAULT_PORT = 11211
 
37
 
 
38
  ##
 
39
  # Default memcached server weight.
 
40
 
 
41
  DEFAULT_WEIGHT = 1
 
42
 
 
43
  ##
 
44
  # The namespace for this instance
 
45
 
 
46
  attr_reader :namespace
 
47
 
 
48
  ##
 
49
  # The multithread setting for this instance
 
50
 
 
51
  attr_reader :multithread
 
52
 
 
53
  ##
 
54
  # The servers this client talks to.  Play at your own peril.
 
55
 
 
56
  attr_reader :servers
 
57
 
 
58
  ##
 
59
  # Socket timeout limit with this client, defaults to 0.5 sec.
 
60
  # Set to nil to disable timeouts.
 
61
 
 
62
  attr_reader :timeout
 
63
 
 
64
  ##
 
65
  # Should the client try to failover to another server if the
 
66
  # first server is down?  Defaults to true.
 
67
 
 
68
  attr_reader :failover
 
69
 
 
70
  ##
 
71
  # Log debug/info/warn/error to the given Logger, defaults to nil.
 
72
 
 
73
  attr_reader :logger
 
74
 
 
75
  ##
 
76
  # Don't send or look for a reply from the memcached server for write operations.
 
77
  # Please note this feature only works in memcached 1.2.5 and later.  Earlier
 
78
  # versions will reply with "ERROR".
 
79
  attr_reader :no_reply
 
80
 
 
81
  ##
 
82
  # Accepts a list of +servers+ and a list of +opts+.  +servers+ may be
 
83
  # omitted.  See +servers=+ for acceptable server list arguments.
 
84
  #
 
85
  # Valid options for +opts+ are:
 
86
  #
 
87
  #   [:namespace]   Prepends this value to all keys added or retrieved.
 
88
  #   [:readonly]    Raises an exception on cache writes when true.
 
89
  #   [:multithread] Wraps cache access in a Mutex for thread safety. Defaults to true.
 
90
  #   [:failover]    Should the client try to failover to another server if the
 
91
  #                  first server is down?  Defaults to true.
 
92
  #   [:timeout]     Time to use as the socket read timeout.  Defaults to 0.5 sec,
 
93
  #                  set to nil to disable timeouts (this is a major performance penalty in Ruby 1.8,
 
94
  #                  "gem install SystemTimer' to remove most of the penalty).
 
95
  #   [:logger]      Logger to use for info/debug output, defaults to nil
 
96
  #   [:no_reply]    Don't bother looking for a reply for write operations (i.e. they
 
97
  #                  become 'fire and forget'), memcached 1.2.5 and later only, speeds up
 
98
  #                  set/add/delete/incr/decr significantly.
 
99
  #
 
100
  # Other options are ignored.
 
101
 
 
102
  def initialize(*args)
 
103
    servers = []
 
104
    opts = {}
 
105
 
 
106
    case args.length
 
107
    when 0 then # NOP
 
108
    when 1 then
 
109
      arg = args.shift
 
110
      case arg
 
111
      when Hash   then opts = arg
 
112
      when Array  then servers = arg
 
113
      when String then servers = [arg]
 
114
      else raise ArgumentError, 'first argument must be Array, Hash or String'
 
115
      end
 
116
    when 2 then
 
117
      servers, opts = args
 
118
    else
 
119
      raise ArgumentError, "wrong number of arguments (#{args.length} for 2)"
 
120
    end
 
121
 
 
122
    opts = DEFAULT_OPTIONS.merge opts
 
123
    @namespace   = opts[:namespace]
 
124
    @readonly    = opts[:readonly]
 
125
    @multithread = opts[:multithread]
 
126
    @timeout     = opts[:timeout]
 
127
    @failover    = opts[:failover]
 
128
    @logger      = opts[:logger]
 
129
    @no_reply    = opts[:no_reply]
 
130
    @mutex       = Mutex.new if @multithread
 
131
 
 
132
    logger.info { "memcache-client #{VERSION} #{Array(servers).inspect}" } if logger
 
133
 
 
134
    Thread.current[:memcache_client] = self.object_id if !@multithread
 
135
 
 
136
    self.servers = servers
 
137
  end
 
138
 
 
139
  ##
 
140
  # Returns a string representation of the cache object.
 
141
 
 
142
  def inspect
 
143
    "<MemCache: %d servers, ns: %p, ro: %p>" %
 
144
      [@servers.length, @namespace, @readonly]
 
145
  end
 
146
 
 
147
  ##
 
148
  # Returns whether there is at least one active server for the object.
 
149
 
 
150
  def active?
 
151
    not @servers.empty?
 
152
  end
 
153
 
 
154
  ##
 
155
  # Returns whether or not the cache object was created read only.
 
156
 
 
157
  def readonly?
 
158
    @readonly
 
159
  end
 
160
 
 
161
  ##
 
162
  # Set the servers that the requests will be distributed between.  Entries
 
163
  # can be either strings of the form "hostname:port" or
 
164
  # "hostname:port:weight" or MemCache::Server objects.
 
165
  #
 
166
  def servers=(servers)
 
167
    # Create the server objects.
 
168
    @servers = Array(servers).collect do |server|
 
169
      case server
 
170
      when String
 
171
        host, port, weight = server.split ':', 3
 
172
        port ||= DEFAULT_PORT
 
173
        weight ||= DEFAULT_WEIGHT
 
174
        Server.new self, host, port, weight
 
175
      else
 
176
        server
 
177
      end
 
178
    end
 
179
 
 
180
    logger.debug { "Servers now: #{@servers.inspect}" } if logger
 
181
 
 
182
    # There's no point in doing this if there's only one server
 
183
    @continuum = create_continuum_for(@servers) if @servers.size > 1
 
184
 
 
185
    @servers
 
186
  end
 
187
 
 
188
  ##
 
189
  # Decrements the value for +key+ by +amount+ and returns the new value.
 
190
  # +key+ must already exist.  If +key+ is not an integer, it is assumed to be
 
191
  # 0.  +key+ can not be decremented below 0.
 
192
 
 
193
  def decr(key, amount = 1)
 
194
    raise MemCacheError, "Update of readonly cache" if @readonly
 
195
    with_server(key) do |server, cache_key|
 
196
      cache_decr server, cache_key, amount
 
197
    end
 
198
  rescue TypeError => err
 
199
    handle_error nil, err
 
200
  end
 
201
 
 
202
  ##
 
203
  # Retrieves +key+ from memcache.  If +raw+ is false, the value will be
 
204
  # unmarshalled.
 
205
 
 
206
  def get(key, raw = false)
 
207
    with_server(key) do |server, cache_key|
 
208
      logger.debug { "get #{key} from #{server.inspect}" } if logger
 
209
      value = cache_get server, cache_key
 
210
      return nil if value.nil?
 
211
      value = Marshal.load value unless raw
 
212
      return value
 
213
    end
 
214
  rescue TypeError => err
 
215
    handle_error nil, err
 
216
  end
 
217
 
 
218
  ##
 
219
  # Performs a +get+ with the given +key+.  If 
 
220
  # the value does not exist and a block was given,
 
221
  # the block will be called and the result saved via +add+.
 
222
  #
 
223
  # If you do not provide a block, using this
 
224
  # method is the same as using +get+.
 
225
  #
 
226
  def fetch(key, expiry = 0, raw = false)
 
227
    value = get(key, raw)
 
228
 
 
229
    if value.nil? && block_given?
 
230
      value = yield
 
231
      add(key, value, expiry, raw)
 
232
    end
 
233
 
 
234
    value
 
235
  end
 
236
 
 
237
  ##
 
238
  # Retrieves multiple values from memcached in parallel, if possible.
 
239
  #
 
240
  # The memcached protocol supports the ability to retrieve multiple
 
241
  # keys in a single request.  Pass in an array of keys to this method
 
242
  # and it will:
 
243
  #
 
244
  # 1. map the key to the appropriate memcached server
 
245
  # 2. send a single request to each server that has one or more key values
 
246
  #
 
247
  # Returns a hash of values.
 
248
  #
 
249
  #   cache["a"] = 1
 
250
  #   cache["b"] = 2
 
251
  #   cache.get_multi "a", "b" # => { "a" => 1, "b" => 2 }
 
252
  #
 
253
  # Note that get_multi assumes the values are marshalled.
 
254
 
 
255
  def get_multi(*keys)
 
256
    raise MemCacheError, 'No active servers' unless active?
 
257
 
 
258
    keys.flatten!
 
259
    key_count = keys.length
 
260
    cache_keys = {}
 
261
    server_keys = Hash.new { |h,k| h[k] = [] }
 
262
 
 
263
    # map keys to servers
 
264
    keys.each do |key|
 
265
      server, cache_key = request_setup key
 
266
      cache_keys[cache_key] = key
 
267
      server_keys[server] << cache_key
 
268
    end
 
269
 
 
270
    results = {}
 
271
 
 
272
    server_keys.each do |server, keys_for_server|
 
273
      keys_for_server_str = keys_for_server.join ' '
 
274
      begin
 
275
        values = cache_get_multi server, keys_for_server_str
 
276
        values.each do |key, value|
 
277
          results[cache_keys[key]] = Marshal.load value
 
278
        end
 
279
      rescue IndexError => e
 
280
        # Ignore this server and try the others
 
281
        logger.warn { "Unable to retrieve #{keys_for_server.size} elements from #{server.inspect}: #{e.message}"} if logger
 
282
      end
 
283
    end
 
284
 
 
285
    return results
 
286
  rescue TypeError => err
 
287
    handle_error nil, err
 
288
  end
 
289
 
 
290
  ##
 
291
  # Increments the value for +key+ by +amount+ and returns the new value.
 
292
  # +key+ must already exist.  If +key+ is not an integer, it is assumed to be
 
293
  # 0.
 
294
 
 
295
  def incr(key, amount = 1)
 
296
    raise MemCacheError, "Update of readonly cache" if @readonly
 
297
    with_server(key) do |server, cache_key|
 
298
      cache_incr server, cache_key, amount
 
299
    end
 
300
  rescue TypeError => err
 
301
    handle_error nil, err
 
302
  end
 
303
 
 
304
  ##
 
305
  # Add +key+ to the cache with value +value+ that expires in +expiry+
 
306
  # seconds.  If +raw+ is true, +value+ will not be Marshalled.
 
307
  #
 
308
  # Warning: Readers should not call this method in the event of a cache miss;
 
309
  # see MemCache#add.
 
310
 
 
311
  ONE_MB = 1024 * 1024
 
312
 
 
313
  def set(key, value, expiry = 0, raw = false)
 
314
    raise MemCacheError, "Update of readonly cache" if @readonly
 
315
    with_server(key) do |server, cache_key|
 
316
 
 
317
      value = Marshal.dump value unless raw
 
318
      logger.debug { "set #{key} to #{server.inspect}: #{value.to_s.size}" } if logger
 
319
 
 
320
      raise MemCacheError, "Value too large, memcached can only store 1MB of data per key" if value.to_s.size > ONE_MB
 
321
 
 
322
      command = "set #{cache_key} 0 #{expiry} #{value.to_s.size}#{noreply}\r\n#{value}\r\n"
 
323
 
 
324
      with_socket_management(server) do |socket|
 
325
        socket.write command
 
326
        break nil if @no_reply
 
327
        result = socket.gets
 
328
        raise_on_error_response! result
 
329
 
 
330
        if result.nil?
 
331
          server.close
 
332
          raise MemCacheError, "lost connection to #{server.host}:#{server.port}"
 
333
        end
 
334
 
 
335
        result
 
336
      end
 
337
    end
 
338
  end
 
339
 
 
340
  ##
 
341
  # "cas" is a check and set operation which means "store this data but
 
342
  # only if no one else has updated since I last fetched it."  This can
 
343
  # be used as a form of optimistic locking.
 
344
  #
 
345
  # Works in block form like so:
 
346
  #   cache.cas('some-key') do |value|
 
347
  #     value + 1
 
348
  #   end
 
349
  #
 
350
  # Returns:
 
351
  # +nil+ if the value was not found on the memcached server.
 
352
  # +STORED+ if the value was updated successfully
 
353
  # +EXISTS+ if the value was updated by someone else since last fetch
 
354
 
 
355
  def cas(key, expiry=0, raw=false)
 
356
    raise MemCacheError, "Update of readonly cache" if @readonly
 
357
    raise MemCacheError, "A block is required" unless block_given?
 
358
 
 
359
    (value, token) = gets(key, raw)
 
360
    return nil unless value
 
361
    updated = yield value
 
362
 
 
363
    with_server(key) do |server, cache_key|
 
364
 
 
365
      value = Marshal.dump updated unless raw
 
366
      logger.debug { "cas #{key} to #{server.inspect}: #{value.to_s.size}" } if logger
 
367
      command = "cas #{cache_key} 0 #{expiry} #{value.to_s.size} #{token}#{noreply}\r\n#{value}\r\n"
 
368
 
 
369
      with_socket_management(server) do |socket|
 
370
        socket.write command
 
371
        break nil if @no_reply
 
372
        result = socket.gets
 
373
        raise_on_error_response! result
 
374
 
 
375
        if result.nil?
 
376
          server.close
 
377
          raise MemCacheError, "lost connection to #{server.host}:#{server.port}"
 
378
        end
 
379
 
 
380
        result
 
381
      end
 
382
    end
 
383
  end
 
384
 
 
385
  ##
 
386
  # Add +key+ to the cache with value +value+ that expires in +expiry+
 
387
  # seconds, but only if +key+ does not already exist in the cache.
 
388
  # If +raw+ is true, +value+ will not be Marshalled.
 
389
  #
 
390
  # Readers should call this method in the event of a cache miss, not
 
391
  # MemCache#set.
 
392
 
 
393
  def add(key, value, expiry = 0, raw = false)
 
394
    raise MemCacheError, "Update of readonly cache" if @readonly
 
395
    with_server(key) do |server, cache_key|
 
396
      value = Marshal.dump value unless raw
 
397
      logger.debug { "add #{key} to #{server}: #{value ? value.to_s.size : 'nil'}" } if logger
 
398
      command = "add #{cache_key} 0 #{expiry} #{value.to_s.size}#{noreply}\r\n#{value}\r\n"
 
399
 
 
400
      with_socket_management(server) do |socket|
 
401
        socket.write command
 
402
        break nil if @no_reply
 
403
        result = socket.gets
 
404
        raise_on_error_response! result
 
405
        result
 
406
      end
 
407
    end
 
408
  end
 
409
  
 
410
  ##
 
411
  # Add +key+ to the cache with value +value+ that expires in +expiry+
 
412
  # seconds, but only if +key+ already exists in the cache.
 
413
  # If +raw+ is true, +value+ will not be Marshalled.
 
414
  def replace(key, value, expiry = 0, raw = false)
 
415
    raise MemCacheError, "Update of readonly cache" if @readonly
 
416
    with_server(key) do |server, cache_key|
 
417
      value = Marshal.dump value unless raw
 
418
      logger.debug { "replace #{key} to #{server}: #{value ? value.to_s.size : 'nil'}" } if logger
 
419
      command = "replace #{cache_key} 0 #{expiry} #{value.to_s.size}#{noreply}\r\n#{value}\r\n"
 
420
 
 
421
      with_socket_management(server) do |socket|
 
422
        socket.write command
 
423
        break nil if @no_reply
 
424
        result = socket.gets
 
425
        raise_on_error_response! result
 
426
        result
 
427
      end
 
428
    end
 
429
  end
 
430
 
 
431
  ##
 
432
  # Append - 'add this data to an existing key after existing data'
 
433
  # Please note the value is always passed to memcached as raw since it
 
434
  # doesn't make a lot of sense to concatenate marshalled data together.
 
435
  def append(key, value)
 
436
    raise MemCacheError, "Update of readonly cache" if @readonly
 
437
    with_server(key) do |server, cache_key|
 
438
      logger.debug { "append #{key} to #{server}: #{value ? value.to_s.size : 'nil'}" } if logger
 
439
      command = "append #{cache_key} 0 0 #{value.to_s.size}#{noreply}\r\n#{value}\r\n"
 
440
 
 
441
      with_socket_management(server) do |socket|
 
442
        socket.write command
 
443
        break nil if @no_reply
 
444
        result = socket.gets
 
445
        raise_on_error_response! result
 
446
        result
 
447
      end
 
448
    end
 
449
  end
 
450
 
 
451
  ##
 
452
  # Prepend - 'add this data to an existing key before existing data'
 
453
  # Please note the value is always passed to memcached as raw since it
 
454
  # doesn't make a lot of sense to concatenate marshalled data together.
 
455
  def prepend(key, value)
 
456
    raise MemCacheError, "Update of readonly cache" if @readonly
 
457
    with_server(key) do |server, cache_key|
 
458
      logger.debug { "prepend #{key} to #{server}: #{value ? value.to_s.size : 'nil'}" } if logger
 
459
      command = "prepend #{cache_key} 0 0 #{value.to_s.size}#{noreply}\r\n#{value}\r\n"
 
460
 
 
461
      with_socket_management(server) do |socket|
 
462
        socket.write command
 
463
        break nil if @no_reply
 
464
        result = socket.gets
 
465
        raise_on_error_response! result
 
466
        result
 
467
      end
 
468
    end
 
469
  end
 
470
 
 
471
  ##
 
472
  # Removes +key+ from the cache in +expiry+ seconds.
 
473
 
 
474
  def delete(key, expiry = 0)
 
475
    raise MemCacheError, "Update of readonly cache" if @readonly
 
476
    with_server(key) do |server, cache_key|
 
477
      with_socket_management(server) do |socket|
 
478
        logger.debug { "delete #{cache_key} on #{server}" } if logger
 
479
        socket.write "delete #{cache_key} #{expiry}#{noreply}\r\n"
 
480
        break nil if @no_reply
 
481
        result = socket.gets
 
482
        raise_on_error_response! result
 
483
        result
 
484
      end
 
485
    end
 
486
  end
 
487
 
 
488
  ##
 
489
  # Flush the cache from all memcache servers.
 
490
  # A non-zero value for +delay+ will ensure that the flush
 
491
  # is propogated slowly through your memcached server farm.
 
492
  # The Nth server will be flushed N*delay seconds from now,
 
493
  # asynchronously so this method returns quickly.
 
494
  # This prevents a huge database spike due to a total
 
495
  # flush all at once.
 
496
 
 
497
  def flush_all(delay=0)
 
498
    raise MemCacheError, 'No active servers' unless active?
 
499
    raise MemCacheError, "Update of readonly cache" if @readonly
 
500
 
 
501
    begin
 
502
      delay_time = 0
 
503
      @servers.each do |server|
 
504
        with_socket_management(server) do |socket|
 
505
          logger.debug { "flush_all #{delay_time} on #{server}" } if logger
 
506
          if delay == 0 # older versions of memcached will fail silently otherwise
 
507
            socket.write "flush_all#{noreply}\r\n"
 
508
          else
 
509
            socket.write "flush_all #{delay_time}#{noreply}\r\n"
 
510
          end
 
511
          break nil if @no_reply
 
512
          result = socket.gets
 
513
          raise_on_error_response! result
 
514
          result
 
515
        end
 
516
        delay_time += delay
 
517
      end
 
518
    rescue IndexError => err
 
519
      handle_error nil, err
 
520
    end
 
521
  end
 
522
 
 
523
  ##
 
524
  # Reset the connection to all memcache servers.  This should be called if
 
525
  # there is a problem with a cache lookup that might have left the connection
 
526
  # in a corrupted state.
 
527
 
 
528
  def reset
 
529
    @servers.each { |server| server.close }
 
530
  end
 
531
 
 
532
  ##
 
533
  # Returns statistics for each memcached server.  An explanation of the
 
534
  # statistics can be found in the memcached docs:
 
535
  #
 
536
  # http://code.sixapart.com/svn/memcached/trunk/server/doc/protocol.txt
 
537
  #
 
538
  # Example:
 
539
  #
 
540
  #   >> pp CACHE.stats
 
541
  #   {"localhost:11211"=>
 
542
  #     {"bytes"=>4718,
 
543
  #      "pid"=>20188,
 
544
  #      "connection_structures"=>4,
 
545
  #      "time"=>1162278121,
 
546
  #      "pointer_size"=>32,
 
547
  #      "limit_maxbytes"=>67108864,
 
548
  #      "cmd_get"=>14532,
 
549
  #      "version"=>"1.2.0",
 
550
  #      "bytes_written"=>432583,
 
551
  #      "cmd_set"=>32,
 
552
  #      "get_misses"=>0,
 
553
  #      "total_connections"=>19,
 
554
  #      "curr_connections"=>3,
 
555
  #      "curr_items"=>4,
 
556
  #      "uptime"=>1557,
 
557
  #      "get_hits"=>14532,
 
558
  #      "total_items"=>32,
 
559
  #      "rusage_system"=>0.313952,
 
560
  #      "rusage_user"=>0.119981,
 
561
  #      "bytes_read"=>190619}}
 
562
  #   => nil
 
563
 
 
564
  def stats
 
565
    raise MemCacheError, "No active servers" unless active?
 
566
    server_stats = {}
 
567
 
 
568
    @servers.each do |server|
 
569
      next unless server.alive?
 
570
 
 
571
      with_socket_management(server) do |socket|
 
572
        value = nil
 
573
        socket.write "stats\r\n"
 
574
        stats = {}
 
575
        while line = socket.gets do
 
576
          raise_on_error_response! line
 
577
          break if line == "END\r\n"
 
578
          if line =~ /\ASTAT ([\S]+) ([\w\.\:]+)/ then
 
579
            name, value = $1, $2
 
580
            stats[name] = case name
 
581
                          when 'version'
 
582
                            value
 
583
                          when 'rusage_user', 'rusage_system' then
 
584
                            seconds, microseconds = value.split(/:/, 2)
 
585
                            microseconds ||= 0
 
586
                            Float(seconds) + (Float(microseconds) / 1_000_000)
 
587
                          else
 
588
                            if value =~ /\A\d+\Z/ then
 
589
                              value.to_i
 
590
                            else
 
591
                              value
 
592
                            end
 
593
                          end
 
594
          end
 
595
        end
 
596
        server_stats["#{server.host}:#{server.port}"] = stats
 
597
      end
 
598
    end
 
599
 
 
600
    raise MemCacheError, "No active servers" if server_stats.empty?
 
601
    server_stats
 
602
  end
 
603
 
 
604
  ##
 
605
  # Shortcut to get a value from the cache.
 
606
 
 
607
  alias [] get
 
608
 
 
609
  ##
 
610
  # Shortcut to save a value in the cache.  This method does not set an
 
611
  # expiration on the entry.  Use set to specify an explicit expiry.
 
612
 
 
613
  def []=(key, value)
 
614
    set key, value
 
615
  end
 
616
 
 
617
  protected unless $TESTING
 
618
 
 
619
  ##
 
620
  # Create a key for the cache, incorporating the namespace qualifier if
 
621
  # requested.
 
622
 
 
623
  def make_cache_key(key)
 
624
    if namespace.nil? then
 
625
      key
 
626
    else
 
627
      "#{@namespace}:#{key}"
 
628
    end
 
629
  end
 
630
 
 
631
  ##
 
632
  # Returns an interoperable hash value for +key+.  (I think, docs are
 
633
  # sketchy for down servers).
 
634
 
 
635
  def hash_for(key)
 
636
    Zlib.crc32(key)
 
637
  end
 
638
 
 
639
  ##
 
640
  # Pick a server to handle the request based on a hash of the key.
 
641
 
 
642
  def get_server_for_key(key, options = {})
 
643
    raise ArgumentError, "illegal character in key #{key.inspect}" if
 
644
      key =~ /\s/
 
645
    raise ArgumentError, "key too long #{key.inspect}" if key.length > 250
 
646
    raise MemCacheError, "No servers available" if @servers.empty?
 
647
    return @servers.first if @servers.length == 1
 
648
 
 
649
    hkey = hash_for(key)
 
650
 
 
651
    20.times do |try|
 
652
      entryidx = Continuum.binary_search(@continuum, hkey)
 
653
      server = @continuum[entryidx].server
 
654
      return server if server.alive?
 
655
      break unless failover
 
656
      hkey = hash_for "#{try}#{key}"
 
657
    end
 
658
    
 
659
    raise MemCacheError, "No servers available"
 
660
  end
 
661
 
 
662
  ##
 
663
  # Performs a raw decr for +cache_key+ from +server+.  Returns nil if not
 
664
  # found.
 
665
 
 
666
  def cache_decr(server, cache_key, amount)
 
667
    with_socket_management(server) do |socket|
 
668
      socket.write "decr #{cache_key} #{amount}#{noreply}\r\n"
 
669
      break nil if @no_reply
 
670
      text = socket.gets
 
671
      raise_on_error_response! text
 
672
      return nil if text == "NOT_FOUND\r\n"
 
673
      return text.to_i
 
674
    end
 
675
  end
 
676
 
 
677
  ##
 
678
  # Fetches the raw data for +cache_key+ from +server+.  Returns nil on cache
 
679
  # miss.
 
680
 
 
681
  def cache_get(server, cache_key)
 
682
    with_socket_management(server) do |socket|
 
683
      socket.write "get #{cache_key}\r\n"
 
684
      keyline = socket.gets # "VALUE <key> <flags> <bytes>\r\n"
 
685
 
 
686
      if keyline.nil? then
 
687
        server.close
 
688
        raise MemCacheError, "lost connection to #{server.host}:#{server.port}"
 
689
      end
 
690
 
 
691
      raise_on_error_response! keyline
 
692
      return nil if keyline == "END\r\n"
 
693
 
 
694
      unless keyline =~ /(\d+)\r/ then
 
695
        server.close
 
696
        raise MemCacheError, "unexpected response #{keyline.inspect}"
 
697
      end
 
698
      value = socket.read $1.to_i
 
699
      socket.read 2 # "\r\n"
 
700
      socket.gets   # "END\r\n"
 
701
      return value
 
702
    end
 
703
  end
 
704
 
 
705
  def gets(key, raw = false)
 
706
    with_server(key) do |server, cache_key|
 
707
      logger.debug { "gets #{key} from #{server.inspect}" } if logger
 
708
      result = with_socket_management(server) do |socket|
 
709
        socket.write "gets #{cache_key}\r\n"
 
710
        keyline = socket.gets # "VALUE <key> <flags> <bytes> <cas token>\r\n"
 
711
 
 
712
        if keyline.nil? then
 
713
          server.close
 
714
          raise MemCacheError, "lost connection to #{server.host}:#{server.port}"
 
715
        end
 
716
 
 
717
        raise_on_error_response! keyline
 
718
        return nil if keyline == "END\r\n"
 
719
 
 
720
        unless keyline =~ /(\d+) (\w+)\r/ then
 
721
          server.close
 
722
          raise MemCacheError, "unexpected response #{keyline.inspect}"
 
723
        end
 
724
        value = socket.read $1.to_i
 
725
        socket.read 2 # "\r\n"
 
726
        socket.gets   # "END\r\n"
 
727
        [value, $2]
 
728
      end
 
729
      result[0] = Marshal.load result[0] unless raw
 
730
      result
 
731
    end
 
732
  rescue TypeError => err
 
733
    handle_error nil, err
 
734
  end
 
735
 
 
736
 
 
737
  ##
 
738
  # Fetches +cache_keys+ from +server+ using a multi-get.
 
739
 
 
740
  def cache_get_multi(server, cache_keys)
 
741
    with_socket_management(server) do |socket|
 
742
      values = {}
 
743
      socket.write "get #{cache_keys}\r\n"
 
744
 
 
745
      while keyline = socket.gets do
 
746
        return values if keyline == "END\r\n"
 
747
        raise_on_error_response! keyline
 
748
 
 
749
        unless keyline =~ /\AVALUE (.+) (.+) (.+)/ then
 
750
          server.close
 
751
          raise MemCacheError, "unexpected response #{keyline.inspect}"
 
752
        end
 
753
 
 
754
        key, data_length = $1, $3
 
755
        values[$1] = socket.read data_length.to_i
 
756
        socket.read(2) # "\r\n"
 
757
      end
 
758
 
 
759
      server.close
 
760
      raise MemCacheError, "lost connection to #{server.host}:#{server.port}" # TODO: retry here too
 
761
    end
 
762
  end
 
763
 
 
764
  ##
 
765
  # Performs a raw incr for +cache_key+ from +server+.  Returns nil if not
 
766
  # found.
 
767
 
 
768
  def cache_incr(server, cache_key, amount)
 
769
    with_socket_management(server) do |socket|
 
770
      socket.write "incr #{cache_key} #{amount}#{noreply}\r\n"
 
771
      break nil if @no_reply
 
772
      text = socket.gets
 
773
      raise_on_error_response! text
 
774
      return nil if text == "NOT_FOUND\r\n"
 
775
      return text.to_i
 
776
    end
 
777
  end
 
778
 
 
779
  ##
 
780
  # Gets or creates a socket connected to the given server, and yields it
 
781
  # to the block, wrapped in a mutex synchronization if @multithread is true.
 
782
  #
 
783
  # If a socket error (SocketError, SystemCallError, IOError) or protocol error
 
784
  # (MemCacheError) is raised by the block, closes the socket, attempts to
 
785
  # connect again, and retries the block (once).  If an error is again raised,
 
786
  # reraises it as MemCacheError.
 
787
  #
 
788
  # If unable to connect to the server (or if in the reconnect wait period),
 
789
  # raises MemCacheError.  Note that the socket connect code marks a server
 
790
  # dead for a timeout period, so retrying does not apply to connection attempt
 
791
  # failures (but does still apply to unexpectedly lost connections etc.).
 
792
 
 
793
  def with_socket_management(server, &block)
 
794
    check_multithread_status!
 
795
 
 
796
    @mutex.lock if @multithread
 
797
    retried = false
 
798
 
 
799
    begin
 
800
      socket = server.socket
 
801
 
 
802
      # Raise an IndexError to show this server is out of whack. If were inside
 
803
      # a with_server block, we'll catch it and attempt to restart the operation.
 
804
 
 
805
      raise IndexError, "No connection to server (#{server.status})" if socket.nil?
 
806
 
 
807
      block.call(socket)
 
808
 
 
809
    rescue SocketError, Errno::EAGAIN, Timeout::Error => err
 
810
      logger.warn { "Socket failure: #{err.message}" } if logger
 
811
      server.mark_dead(err)
 
812
      handle_error(server, err)
 
813
 
 
814
    rescue MemCacheError, SystemCallError, IOError => err
 
815
      logger.warn { "Generic failure: #{err.class.name}: #{err.message}" } if logger
 
816
      handle_error(server, err) if retried || socket.nil?
 
817
      retried = true
 
818
      retry
 
819
    end
 
820
  ensure
 
821
    @mutex.unlock if @multithread
 
822
  end
 
823
 
 
824
  def with_server(key)
 
825
    retried = false
 
826
    begin
 
827
      server, cache_key = request_setup(key)
 
828
      yield server, cache_key
 
829
    rescue IndexError => e
 
830
      logger.warn { "Server failed: #{e.class.name}: #{e.message}" } if logger
 
831
      if !retried && @servers.size > 1
 
832
        logger.info { "Connection to server #{server.inspect} DIED! Retrying operation..." } if logger
 
833
        retried = true
 
834
        retry
 
835
      end
 
836
      handle_error(nil, e)
 
837
    end
 
838
  end
 
839
 
 
840
  ##
 
841
  # Handles +error+ from +server+.
 
842
 
 
843
  def handle_error(server, error)
 
844
    raise error if error.is_a?(MemCacheError)
 
845
    server.close if server
 
846
    new_error = MemCacheError.new error.message
 
847
    new_error.set_backtrace error.backtrace
 
848
    raise new_error
 
849
  end
 
850
 
 
851
  def noreply
 
852
    @no_reply ? ' noreply' : ''
 
853
  end
 
854
 
 
855
  ##
 
856
  # Performs setup for making a request with +key+ from memcached.  Returns
 
857
  # the server to fetch the key from and the complete key to use.
 
858
 
 
859
  def request_setup(key)
 
860
    raise MemCacheError, 'No active servers' unless active?
 
861
    cache_key = make_cache_key key
 
862
    server = get_server_for_key cache_key
 
863
    return server, cache_key
 
864
  end
 
865
 
 
866
  def raise_on_error_response!(response)
 
867
    if response =~ /\A(?:CLIENT_|SERVER_)?ERROR(.*)/
 
868
      raise MemCacheError, $1.strip
 
869
    end
 
870
  end
 
871
 
 
872
  def create_continuum_for(servers)
 
873
    total_weight = servers.inject(0) { |memo, srv| memo + srv.weight }
 
874
    continuum = []
 
875
 
 
876
    servers.each do |server|
 
877
      entry_count_for(server, servers.size, total_weight).times do |idx|
 
878
        hash = Digest::SHA1.hexdigest("#{server.host}:#{server.port}:#{idx}")
 
879
        value = Integer("0x#{hash[0..7]}")
 
880
        continuum << Continuum::Entry.new(value, server)
 
881
      end
 
882
    end
 
883
 
 
884
    continuum.sort { |a, b| a.value <=> b.value }
 
885
  end
 
886
 
 
887
  def entry_count_for(server, total_servers, total_weight)
 
888
    ((total_servers * Continuum::POINTS_PER_SERVER * server.weight) / Float(total_weight)).floor
 
889
  end
 
890
 
 
891
  def check_multithread_status!
 
892
    return if @multithread
 
893
 
 
894
    if Thread.current[:memcache_client] != self.object_id
 
895
      raise MemCacheError, <<-EOM
 
896
        You are accessing this memcache-client instance from multiple threads but have not enabled multithread support.
 
897
        Normally:  MemCache.new(['localhost:11211'], :multithread => true)
 
898
        In Rails:  config.cache_store = [:mem_cache_store, 'localhost:11211', { :multithread => true }]
 
899
      EOM
 
900
    end
 
901
  end
 
902
 
 
903
  ##
 
904
  # This class represents a memcached server instance.
 
905
 
 
906
  class Server
 
907
 
 
908
    ##
 
909
    # The amount of time to wait before attempting to re-establish a
 
910
    # connection with a server that is marked dead.
 
911
 
 
912
    RETRY_DELAY = 30.0
 
913
 
 
914
    ##
 
915
    # The host the memcached server is running on.
 
916
 
 
917
    attr_reader :host
 
918
 
 
919
    ##
 
920
    # The port the memcached server is listening on.
 
921
 
 
922
    attr_reader :port
 
923
 
 
924
    ##
 
925
    # The weight given to the server.
 
926
 
 
927
    attr_reader :weight
 
928
 
 
929
    ##
 
930
    # The time of next retry if the connection is dead.
 
931
 
 
932
    attr_reader :retry
 
933
 
 
934
    ##
 
935
    # A text status string describing the state of the server.
 
936
 
 
937
    attr_reader :status
 
938
 
 
939
    attr_reader :logger
 
940
 
 
941
    ##
 
942
    # Create a new MemCache::Server object for the memcached instance
 
943
    # listening on the given host and port, weighted by the given weight.
 
944
 
 
945
    def initialize(memcache, host, port = DEFAULT_PORT, weight = DEFAULT_WEIGHT)
 
946
      raise ArgumentError, "No host specified" if host.nil? or host.empty?
 
947
      raise ArgumentError, "No port specified" if port.nil? or port.to_i.zero?
 
948
 
 
949
      @host   = host
 
950
      @port   = port.to_i
 
951
      @weight = weight.to_i
 
952
 
 
953
      @sock   = nil
 
954
      @retry  = nil
 
955
      @status = 'NOT CONNECTED'
 
956
      @timeout = memcache.timeout
 
957
      @logger = memcache.logger
 
958
    end
 
959
 
 
960
    ##
 
961
    # Return a string representation of the server object.
 
962
 
 
963
    def inspect
 
964
      "<MemCache::Server: %s:%d [%d] (%s)>" % [@host, @port, @weight, @status]
 
965
    end
 
966
 
 
967
    ##
 
968
    # Check whether the server connection is alive.  This will cause the
 
969
    # socket to attempt to connect if it isn't already connected and or if
 
970
    # the server was previously marked as down and the retry time has
 
971
    # been exceeded.
 
972
 
 
973
    def alive?
 
974
      !!socket
 
975
    end
 
976
 
 
977
    ##
 
978
    # Try to connect to the memcached server targeted by this object.
 
979
    # Returns the connected socket object on success or nil on failure.
 
980
 
 
981
    def socket
 
982
      return @sock if @sock and not @sock.closed?
 
983
 
 
984
      @sock = nil
 
985
 
 
986
      # If the host was dead, don't retry for a while.
 
987
      return if @retry and @retry > Time.now
 
988
 
 
989
      # Attempt to connect if not already connected.
 
990
      begin
 
991
        @sock = connect_to(@host, @port, @timeout)
 
992
        @sock.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1
 
993
        @retry  = nil
 
994
        @status = 'CONNECTED'
 
995
      rescue SocketError, SystemCallError, IOError => err
 
996
        logger.warn { "Unable to open socket: #{err.class.name}, #{err.message}" } if logger
 
997
        mark_dead err
 
998
      end
 
999
 
 
1000
      return @sock
 
1001
    end
 
1002
 
 
1003
    def connect_to(host, port, timeout=nil)
 
1004
      io = MemCache::BufferedIO.new(TCPSocket.new(host, port))
 
1005
      io.read_timeout = timeout
 
1006
      io
 
1007
    end
 
1008
 
 
1009
    ##
 
1010
    # Close the connection to the memcached server targeted by this
 
1011
    # object.  The server is not considered dead.
 
1012
 
 
1013
    def close
 
1014
      @sock.close if @sock && !@sock.closed?
 
1015
      @sock   = nil
 
1016
      @retry  = nil
 
1017
      @status = "NOT CONNECTED"
 
1018
    end
 
1019
 
 
1020
    ##
 
1021
    # Mark the server as dead and close its socket.
 
1022
 
 
1023
    def mark_dead(error)
 
1024
      @sock.close if @sock && !@sock.closed?
 
1025
      @sock   = nil
 
1026
      @retry  = Time.now + RETRY_DELAY
 
1027
 
 
1028
      reason = "#{error.class.name}: #{error.message}"
 
1029
      @status = sprintf "%s:%s DEAD (%s), will retry at %s", @host, @port, reason, @retry
 
1030
      @logger.info { @status } if @logger
 
1031
    end
 
1032
 
 
1033
  end
 
1034
 
 
1035
  ##
 
1036
  # Base MemCache exception class.
 
1037
 
 
1038
  class MemCacheError < RuntimeError; end
 
1039
 
 
1040
  class BufferedIO < Net::BufferedIO # :nodoc:
 
1041
    BUFSIZE = 1024 * 16
 
1042
 
 
1043
    # An implementation similar to this is in *trunk* for 1.9.  When it
 
1044
    # gets released, this method can be removed when using 1.9
 
1045
    def rbuf_fill
 
1046
      begin
 
1047
        @rbuf << @io.read_nonblock(BUFSIZE)
 
1048
      rescue Errno::EWOULDBLOCK
 
1049
        retry unless @read_timeout
 
1050
        if IO.select([@io], nil, nil, @read_timeout)
 
1051
          retry
 
1052
        else
 
1053
          raise Timeout::Error, 'IO timeout'
 
1054
        end
 
1055
      end
 
1056
    end
 
1057
 
 
1058
    def setsockopt *args
 
1059
      @io.setsockopt *args
 
1060
    end
 
1061
 
 
1062
    def gets
 
1063
      readuntil("\n")
 
1064
    end
 
1065
  end
 
1066
 
 
1067
end
 
1068
 
 
1069
module Continuum
 
1070
  POINTS_PER_SERVER = 160 # this is the default in libmemcached
 
1071
 
 
1072
  # Find the closest index in Continuum with value <= the given value
 
1073
  def self.binary_search(ary, value, &block)
 
1074
    upper = ary.size - 1
 
1075
    lower = 0
 
1076
    idx = 0
 
1077
 
 
1078
    while(lower <= upper) do
 
1079
      idx = (lower + upper) / 2
 
1080
      comp = ary[idx].value <=> value
 
1081
 
 
1082
      if comp == 0
 
1083
        return idx
 
1084
      elsif comp > 0
 
1085
        upper = idx - 1
 
1086
      else
 
1087
        lower = idx + 1
 
1088
      end
 
1089
    end
 
1090
    return upper
 
1091
  end
 
1092
 
 
1093
  class Entry
 
1094
    attr_reader :value
 
1095
    attr_reader :server
 
1096
 
 
1097
    def initialize(val, srv)
 
1098
      @value = val
 
1099
      @server = srv
 
1100
    end
 
1101
 
 
1102
    def inspect
 
1103
      "<#{value}, #{server.host}:#{server.port}>"
 
1104
    end
 
1105
  end
 
1106
 
 
1107
end