11
class BadConfigurationException < Exception
11
# TaskQueueSQS provides a simple interface (implementing TaskQueue) to Amazon's
12
# Simple Queue Service (SQS). SQS is externally managed and hosted by Amazon,
13
# so we don't have to worry about deploying it.
14
# TODO(cgb): SQS is eventually consistent, so it is possible that we could
15
# receive the same message twice, and thus execute the same task twice.
15
16
class TaskQueueSQS < TaskQueue
16
attr_accessor :EC2_ACCESS_KEY, :EC2_SECRET_KEY
19
# The access key, provided by AWS, that is required to access SQS.
20
attr_accessor :EC2_ACCESS_KEY
23
# The secret key, provided by AWS, that is required to access SQS.
24
attr_accessor :EC2_SECRET_KEY
27
# The name of this queue. Since we always use our Executor to run tasks,
28
# and store task info in SQS, we call it 'executor-sqs'.
19
29
NAME = "executor-sqs"
22
# creates a new SQS queue via the AWS RubyGem
32
# Creates a new SQS queue connection and queue to store / retrieve tasks
33
# with, via the AWS RubyGem. We pull in the AWS RubyGem instead of the
34
# RightScale AWS RubyGem since the RightScale gem doesn't work with
35
# the current version of SQS.
23
36
def initialize(credentials)
24
37
if credentials.class != Hash
25
38
raise BadConfigurationException.new
44
# stores a hash in the queue for later processing, by converting it to JSON
57
# Stores a Hash in the queue for later processing. Since SQS can't store items
58
# in Hash format, we use the JSON library to dump it to a string, which SQS
46
61
if item.class != Hash
47
62
raise BadConfigurationException.new
70
# Retrieves an item from the queue, returning the Hash version of the
71
# JSON-dumped data. We should be the only ones writing to the queue, but
72
# just in case we aren't, we need to validate that the data we're receiving
73
# from the queue is data that we can load via JSON.
74
# TODO(cgb): Receiving and deleting the message isn't an atomic operation,
75
# and since SQS is eventually consistent, could this result in two readers
76
# getting the same item? If so, would using a lock on the queue (making the
77
# operation atomic) solve the problem?
56
79
json_item = @queue.receive_message()
57
80
return nil if json_item.nil? # occurs when the queue is empty
58
json_item.delete # TODO(cgb) - maybe delete messages after the task is done
60
item = JSON.load(json_item.body)
84
return JSON.load(json_item.body())
85
rescue JSON::ParserError # if somebody else wrote a non-JSON item
65
# returns all the credentials needed to access this queue
66
# for SQS, it's just the access key and secret key
91
# Returns all the credentials needed to access this queue. For SQS, it's
92
# just the access key and secret key
68
94
return {'@EC2_ACCESS_KEY' => @EC2_ACCESS_KEY,
69
95
'@EC2_SECRET_KEY' => @EC2_SECRET_KEY}
73
# returns the number of messages in the queue
99
# Returns the number of messages in the queue, which in SQS is the number
100
# of visible messages (there should be no invisible messages since we
101
# always immediately delete messages upon receiving them).
75
103
return @queue.visible_messages