2
# Copyright (c) 2010 Brian E. Granger
4
# This file is part of pyzmq.
6
# pyzmq is free software; you can redistribute it and/or modify it under
7
# the terms of the Lesser GNU General Public License as published by
8
# the Free Software Foundation; either version 3 of the License, or
9
# (at your option) any later version.
11
# pyzmq is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
# Lesser GNU General Public License for more details.
16
# You should have received a copy of the Lesser GNU General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
20
#-----------------------------------------------------------------------------
22
#-----------------------------------------------------------------------------
25
import logging
import time
from unittest import TestCase

import zmq
from zmq.log import handlers
from zmq.tests import BaseZMQTestCase
33
#-----------------------------------------------------------------------------
35
#-----------------------------------------------------------------------------
37
class TestPubLog(BaseZMQTestCase):
    """Tests for ``handlers.PUBHandler`` construction and topic routing.

    Each test attaches a PUBHandler to the shared ``zmqtest`` logger, logs a
    single record and checks the two-frame message ``(topic, text)`` that a
    SUB socket receives.
    """

    iface = 'inproc://zmqlog'
    # Root topic assigned to handlers and used as the SUB subscription
    # prefix; the tests expect published topics of the form b'zmq.INFO'.
    topic = 'zmq'.encode()

    # The logger is shared by every test, so each test must remove the
    # handler it installed (see ``_uninstall``).
    logger = logging.getLogger('zmqtest')
    logger.setLevel(logging.DEBUG)

    # Seconds to wait after SUBSCRIBE before logging, so the subscription is
    # in place and the first record is not dropped.
    _settle = 0.1

    def _install(self, handler):
        """Configure *handler* with the class topic and add it to the logger."""
        handler.setLevel(logging.DEBUG)
        handler.root_topic = self.topic
        self.logger.addHandler(handler)

    def _uninstall(self, handler, *sockets):
        """Detach *handler* from the logger and close its socket and *sockets*."""
        # removeHandler is a no-op when the handler was never added.
        self.logger.removeHandler(handler)
        handler.socket.close()
        for sock in sockets:
            sock.close()

    def _assert_received(self, sub, expected_topic, text):
        """Receive one record on *sub* and check its topic and payload.

        Blocks until a two-frame message arrives.  The payload is expected to
        be *text* followed by a newline, encoded to bytes.
        """
        topic, payload = sub.recv_multipart()
        self.assertEqual(topic, expected_topic)
        self.assertEqual(payload, (text + '\n').encode())

    def connect_handler(self):
        """Create a bound PUB/SUB pair with a PUBHandler on the PUB side.

        Returns:
            ``(logger, handler, sub)`` where ``sub`` is subscribed to
            ``self.topic``.  The caller is responsible for removing the
            handler and closing the sockets.
        """
        pub, sub = self.create_bound_pair(zmq.PUB, zmq.SUB)
        handler = handlers.PUBHandler(pub)
        self._install(handler)
        sub.setsockopt(zmq.SUBSCRIBE, self.topic)
        time.sleep(self._settle)
        return self.logger, handler, sub

    def test_init_iface(self):
        """A handler built from an interface URL uses the context passed in."""
        ctx = self.context

        # This first handler is never attached to the logger; close its
        # socket before a second handler is created on the same interface.
        handler = handlers.PUBHandler(self.iface, ctx)
        try:
            self.assertTrue(handler.ctx is ctx)
        finally:
            handler.socket.close()

        handler = handlers.PUBHandler(self.iface, self.context)
        sub = ctx.socket(zmq.SUB)
        try:
            self.assertTrue(handler.ctx is ctx)
            self._install(handler)

            sub.connect(self.iface)
            sub.setsockopt(zmq.SUBSCRIBE, self.topic)
            time.sleep(self._settle)

            msg1 = 'message'
            self.logger.info(msg1)
            self._assert_received(sub, 'zmq.INFO'.encode(), msg1)
        finally:
            self._uninstall(handler, sub)

    def test_init_socket(self):
        """A handler built from an existing socket reuses it and its context."""
        pub, sub = self.create_bound_pair(zmq.PUB, zmq.SUB)
        handler = handlers.PUBHandler(pub)
        try:
            self._install(handler)

            self.assertTrue(handler.socket is pub)
            self.assertTrue(handler.ctx is pub.context)
            self.assertTrue(handler.ctx is self.context)

            sub.setsockopt(zmq.SUBSCRIBE, self.topic)
            time.sleep(self._settle)

            msg1 = 'message'
            self.logger.info(msg1)
            self._assert_received(sub, 'zmq.INFO'.encode(), msg1)
        finally:
            self._uninstall(handler, sub)

    def test_root_topic(self):
        """Changing ``root_topic`` changes the topic prefix of published records."""
        logger, handler, sub = self.connect_handler()
        sub2 = sub.context.socket(zmq.SUB)
        try:
            handler.socket.bind(self.iface)
            sub2.connect(self.iface)
            # An empty prefix subscribes sub2 to every topic.
            sub2.setsockopt(zmq.SUBSCRIBE, ''.encode())
            handler.root_topic = 'twoonly'.encode()
            time.sleep(self._settle)

            msg1 = 'ignored'
            logger.info(msg1)

            # ``sub`` is still subscribed to self.topic only, so it must not
            # see a record published under the new root topic.
            self.assertRaisesErrno(zmq.EAGAIN, sub.recv, zmq.NOBLOCK)
            self._assert_received(sub2, 'twoonly.INFO'.encode(), msg1)
        finally:
            self._uninstall(handler, sub, sub2)