~cgb-cs/appscale/appscale-main

« back to all changes in this revision

Viewing changes to Neptune/task_queue_sqs.rb

  • Committer: Chris Bunch
  • Date: 2012-02-27 07:19:48 UTC
  • Revision ID: cgb@cs.ucsb.edu-20120227071948-3uic3g1s3ph6cklo
added json validation for sqs interface, and regenerated coverage/rdoc

Show diffs side-by-side

added added

removed removed

Lines of Context:
8
8
require 'json'
9
9
 
10
10
 
11
 
class BadConfigurationException < Exception
12
 
end
13
 
 
14
 
 
 
11
# TaskQueueSQS provides a simple interface (implementing TaskQueue) to Amazon's
 
12
# Simple Queue Service (SQS). SQS is externally managed and hosted by Amazon,
 
13
# so we don't have to worry about deploying it.
 
14
# TODO(cgb): SQS is eventually consistent, so it is possible that we could
 
15
# receive the same message twice, and thus execute the same task twice.
15
16
class TaskQueueSQS < TaskQueue
16
 
  attr_accessor :EC2_ACCESS_KEY, :EC2_SECRET_KEY
17
 
 
18
 
 
 
17
 
 
18
 
 
19
  # The access key, provided by AWS, that is required to access SQS.
 
20
  attr_accessor :EC2_ACCESS_KEY
 
21
  
 
22
  
 
23
  # The secret key, provided by AWS, that is required to access SQS.
 
24
  attr_accessor :EC2_SECRET_KEY
 
25
 
 
26
 
 
27
  # The name of this queue. Since we always use our Executor to run tasks,
 
28
  # and store task info in SQS, we call it 'executor-sqs'.
19
29
  NAME = "executor-sqs"
20
30
 
21
31
 
22
 
  # creates a new SQS queue via the AWS RubyGem
 
32
  # Creates a new SQS queue connection and queue to store / retrieve tasks
 
33
  # with, via the AWS RubyGem. We pull in the AWS RubyGem instead of the
 
34
  # RightScale AWS RubyGem since the RightScale gem doesn't work with
 
35
  # the current version of SQS.
23
36
  def initialize(credentials)
24
37
    if credentials.class != Hash
25
38
      raise BadConfigurationException.new
41
54
  end
42
55
 
43
56
 
44
 
  # stores a hash in the queue for later processing, by converting it to JSON
 
57
  # Stores a Hash in the queue for later processing. Since SQS can't store items
 
58
  # in Hash format, we use the JSON library to dump it to a string, which SQS
 
59
  # can store.
45
60
  def push(item)
46
61
    if item.class != Hash
47
62
      raise BadConfigurationException.new
52
67
  end
53
68
 
54
69
 
 
70
  # Retrieves an item from the queue, returning the Hash version of the
 
71
  # JSON-dumped data. We should be the only ones writing to the queue, but
 
72
  # just in case we aren't, we need to validate that the data we're receiving
 
73
  # from the queue is data that we can load via JSON.
 
74
  # TODO(cgb): Receiving and deleting the message isn't an atomic operation,
 
75
  # and since SQS is eventually consistent, could this result in two readers
 
76
  # getting the same item? If so, would using a lock on the queue (making the
 
77
  # operation atomic) solve the problem?
55
78
  def pop()
56
79
    json_item = @queue.receive_message()
57
80
    return nil if json_item.nil?  # occurs when the queue is empty
58
 
    json_item.delete  # TODO(cgb) - maybe delete messages after the task is done
 
81
    json_item.delete()
59
82
 
60
 
    item = JSON.load(json_item.body)
61
 
    return item
 
83
    begin
 
84
      return JSON.load(json_item.body())
 
85
    rescue JSON::ParserError  # if somebody else wrote a non-JSON item
 
86
      return pop()
 
87
    end
62
88
  end
63
89
 
64
90
 
65
 
  # returns all the credentials needed to access this queue
66
 
  # for SQS, it's just the access key and secret key
 
91
  # Returns all the credentials needed to access this queue. For SQS, it's 
 
92
  # just the access key and secret key
67
93
  def get_creds()
68
94
    return {'@EC2_ACCESS_KEY' => @EC2_ACCESS_KEY,
69
95
      '@EC2_SECRET_KEY' => @EC2_SECRET_KEY}
70
96
  end
71
97
 
72
98
 
73
 
  # returns the number of messages in the queue
 
99
  # Returns the number of messages in the queue, which in SQS is the number
 
100
  # of visible messages (there should be no invisible messages since we
 
101
  # always immediately delete messages upon receiving them).
74
102
  def size()
75
103
    return @queue.visible_messages
76
104
  end