~0x44/nova/extdoc

« back to all changes in this revision

Viewing changes to vendor/boto/boto/mapreduce/queuetools.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/python
 
2
import socket
 
3
from lqs import LQSServer, LQSMessage
 
4
import boto
 
5
from boto.sqs.jsonmessage import JSONMessage
 
6
 
 
7
class LQSClient:
 
8
 
 
9
    def __init__(self, host):
 
10
        self.host = host
 
11
        self.port = LQSServer.PORT
 
12
        self.timeout = LQSServer.TIMEOUT
 
13
        self.max_len = LQSServer.MAXSIZE
 
14
        self.sock = None
 
15
 
 
16
    def connect(self):
 
17
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
 
18
        self.sock.settimeout(self.timeout)
 
19
        self.sock.connect((self.host, self.port))
 
20
 
 
21
    def decode(self, jsonstr):
 
22
        return LQSMessage(jsonvalue=jsonstr)
 
23
 
 
24
    def get(self):
 
25
        self.sock.send('next')
 
26
        try:
 
27
            jsonstr = self.sock.recv(self.max_len)
 
28
            msg = LQSMessage(jsonvalue=jsonstr)
 
29
            return msg
 
30
        except:
 
31
            print "recv from %s failed" % self.host
 
32
 
 
33
    def delete(self, msg):
 
34
        self.sock.send('delete %s' % msg['id'])
 
35
        try:
 
36
            jsonstr = self.sock.recv(self.max_len)
 
37
            msg = LQSMessage(jsonvalue=jsonstr)
 
38
            return msg
 
39
        except:
 
40
            print "recv from %s failed" % self.host
 
41
 
 
42
    def close(self):
 
43
        self.sock.close()
 
44
 
 
45
class SQSClient:
 
46
 
 
47
    def __init__(self, queue_name):
 
48
        self.queue_name = queue_name
 
49
 
 
50
    def connect(self):
 
51
        self.queue = boto.lookup('sqs', self.queue_name)
 
52
        self.queue.set_mesasge_class(JSONMessage)
 
53
 
 
54
    def get(self):
 
55
        m = self.queue.read()
 
56
        return m.get_body()
 
57
 
 
58
    def close(self):
 
59
        pass
 
60
 
 
61
def get_queue(name):
 
62
    if name == 'localhost':
 
63
        return LQSClient(name)
 
64
    else:
 
65
        return SQSClient(name)
 
66