~ubuntu-branches/ubuntu/precise/pyzmq/precise

« back to all changes in this revision

Viewing changes to examples/mongodb/controller.py

  • Committer: Bazaar Package Importer
  • Author(s): Piotr Ożarowski
  • Date: 2011-02-15 09:08:36 UTC
  • mfrom: (2.1.2 experimental)
  • Revision ID: james.westby@ubuntu.com-20110215090836-phh4slym1g6muucn
Tags: 2.0.10.1-2
* Team upload.
* Upload to unstable
* Add Breaks: ${python:Breaks}

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/env python
 
2
 
 
3
#
 
4
#    Copyright (c) 2010 Justin Riley
 
5
#
 
6
#    This file is part of pyzmq.
 
7
#
 
8
#    pyzmq is free software; you can redistribute it and/or modify it under
 
9
#    the terms of the Lesser GNU General Public License as published by
 
10
#    the Free Software Foundation; either version 3 of the License, or
 
11
#    (at your option) any later version.
 
12
#
 
13
#    pyzmq is distributed in the hope that it will be useful,
 
14
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
 
15
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
16
#    Lesser GNU General Public License for more details.
 
17
#
 
18
#    You should have received a copy of the Lesser GNU General Public License
 
19
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
20
 
 
21
import sys
 
22
import zmq
 
23
import pymongo
 
24
import pymongo.json_util
 
25
import json
 
26
 
 
27
class MongoZMQ(object):
 
28
    """
 
29
    ZMQ server that adds/fetches documents (ie dictionaries) to a MongoDB.
 
30
 
 
31
    NOTE: mongod must be started before using this class
 
32
    """
 
33
 
 
34
    def __init__(self, db_name, table_name, bind_addr="tcp://127.0.0.1:5000"):
 
35
        """
 
36
        bind_addr: address to bind zmq socket on
 
37
        db_name: name of database to write to (created if doesnt exist)
 
38
        table_name: name of mongodb 'table' in the db to write to (created if doesnt exist)
 
39
        """
 
40
        self._bind_addr = bind_addr
 
41
        self._db_name = db_name
 
42
        self._table_name = table_name
 
43
        self._conn = pymongo.Connection()
 
44
        self._db = self._conn[self._db_name]
 
45
        self._table = self._db[self._table_name]
 
46
 
 
47
    def _doc_to_json(self, doc):
 
48
        return json.dumps(doc,default=pymongo.json_util.default)
 
49
 
 
50
    def add_document(self, doc):
 
51
        """
 
52
        Inserts a document (dictionary) into mongo database table
 
53
        """
 
54
        print 'adding docment %s' % (doc)
 
55
        try:
 
56
            self._table.insert(doc)
 
57
        except Exception,e:
 
58
            return 'Error: %s' % e
 
59
 
 
60
    def get_document_by_keys(self, keys):
 
61
        """
 
62
        Attempts to return a single document from database table that matches
 
63
        each key/value in keys dictionary.
 
64
        """
 
65
        print 'attempting to retrieve document using keys: %s' % keys
 
66
        try:
 
67
            return self._table.find_one(keys)
 
68
        except Exception,e:
 
69
            return 'Error: %s' % e
 
70
 
 
71
    def start(self):
 
72
        context = zmq.Context()
 
73
        socket = context.socket(zmq.XREP)
 
74
        socket.bind(self._bind_addr)
 
75
        while True:
 
76
            msg = socket.recv_multipart()
 
77
            print "Received msg: ", msg
 
78
            if  len(msg) != 3:
 
79
                error_msg = 'invalid message received: %s' % msg
 
80
                print error_msg
 
81
                reply = [msg[0], error_msg]
 
82
                socket.send_multipart(reply)
 
83
                continue
 
84
            id = msg[0]
 
85
            operation = msg[1]
 
86
            contents = json.loads(msg[2])
 
87
            # always send back the id with XREP
 
88
            reply = [id]
 
89
            if operation == 'add':
 
90
                self.add_document(contents)
 
91
                reply.append("success")
 
92
            elif operation == 'get':
 
93
                doc = self.get_document_by_keys(contents)
 
94
                json_doc = self._doc_to_json(doc)
 
95
                reply.append(json_doc)
 
96
            else:
 
97
                print 'unknown request'
 
98
            socket.send_multipart(reply)
 
99
 
 
100
def main():
 
101
    MongoZMQ('ipcontroller','jobs').start()
 
102
 
 
103
if __name__ == "__main__":
 
104
   main()