4
# Copyright (c) 2010 Justin Riley
6
# This file is part of pyzmq.
8
# pyzmq is free software; you can redistribute it and/or modify it under
9
# the terms of the Lesser GNU General Public License as published by
10
# the Free Software Foundation; either version 3 of the License, or
11
# (at your option) any later version.
13
# pyzmq is distributed in the hope that it will be useful,
14
# but WITHOUT ANY WARRANTY; without even the implied warranty of
15
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16
# Lesser GNU General Public License for more details.
18
# You should have received a copy of the Lesser GNU General Public License
19
# along with this program. If not, see <http://www.gnu.org/licenses/>.
24
import pymongo.json_util
27
class MongoZMQ(object):
    """
    ZMQ server that adds/fetches documents (ie dictionaries) to a MongoDB.

    Requests are multipart messages of exactly three frames:
    [client id, operation, JSON payload], where operation is 'add' or 'get'.
    Every reply starts with the client id frame so the XREP socket can route
    it back to the sender.

    NOTE: mongod must be started before using this class
    """

    def __init__(self, db_name, table_name, bind_addr="tcp://127.0.0.1:5000"):
        """
        bind_addr: address to bind zmq socket on
        db_name: name of database to write to (created if doesn't exist)
        table_name: name of mongodb 'table' in the db to write to (created if doesn't exist)
        """
        self._bind_addr = bind_addr
        self._db_name = db_name
        self._table_name = table_name
        # Connection is opened with pymongo's default arguments.
        self._conn = pymongo.Connection()
        self._db = self._conn[self._db_name]
        self._table = self._db[self._table_name]

    def _doc_to_json(self, doc):
        """
        Returns doc serialized as a JSON string; values json cannot encode
        natively are handed to pymongo.json_util.default.
        """
        return json.dumps(doc, default=pymongo.json_util.default)

    def add_document(self, doc):
        """
        Inserts a document (dictionary) into mongo database table

        Returns None on success, or an 'Error: ...' string describing the
        exception if the insert failed.
        """
        print('adding docment %s' % (doc,))
        try:
            self._table.insert(doc)
        except Exception as e:
            return 'Error: %s' % e

    def get_document_by_keys(self, keys):
        """
        Attempts to return a single document from database table that matches
        each key/value in keys dictionary.

        Returns whatever find_one returns, or an 'Error: ...' string
        describing the exception if the lookup failed.
        """
        print('attempting to retrieve document using keys: %s' % (keys,))
        try:
            return self._table.find_one(keys)
        except Exception as e:
            return 'Error: %s' % e

    def _handle_message(self, msg):
        """
        Builds the multipart reply (a list of frames) for one received
        multipart message. Never touches the socket, so it can be exercised
        without a running zmq context.
        """
        if len(msg) != 3:
            error_msg = 'invalid message received: %s' % (msg,)
            print(error_msg)
            return [msg[0], error_msg]

        client_id, operation, payload = msg
        # always send back the id with XREP
        reply = [client_id]

        try:
            contents = json.loads(payload)
        except ValueError as e:
            # A malformed payload must not take the whole server down;
            # report it to the sender instead.
            error_msg = 'invalid json received: %s' % e
            print(error_msg)
            reply.append(error_msg)
            return reply

        if operation == 'add':
            # add_document signals failure through its return value, so
            # only claim success when it reported no error.
            error = self.add_document(contents)
            reply.append("success" if error is None else error)
        elif operation == 'get':
            doc = self.get_document_by_keys(contents)
            reply.append(self._doc_to_json(doc))
        else:
            # Unknown operations are logged and answered with the id frame
            # only, as before.
            print('unknown request')
        return reply

    def start(self):
        """
        Binds an XREP socket on the configured address and answers requests
        in an endless loop. Does not return.
        """
        context = zmq.Context()
        socket = context.socket(zmq.XREP)
        socket.bind(self._bind_addr)
        while True:
            msg = socket.recv_multipart()
            print("Received msg:  %s" % (msg,))
            socket.send_multipart(self._handle_message(msg))
101
MongoZMQ('ipcontroller','jobs').start()
103
if __name__ == "__main__":