~ubuntu-branches/ubuntu/wily/pyzmq/wily

« back to all changes in this revision

Viewing changes to examples/logger/zmqlogger.py

  • Committer: Package Import Robot
  • Author(s): Julian Taylor
  • Date: 2013-02-24 19:23:15 UTC
  • mfrom: (1.2.1) (9 sid)
  • mto: This revision was merged to the branch mainline in revision 10.
  • Revision ID: package-import@ubuntu.com-20130224192315-qhmwp3m3ymk8r60d
Tags: 2.2.0.1-1
* New upstream release
* relicense debian packaging to LGPL-3
* update watch file to use github directly
  thanks to Bart Martens for the file
* add autopkgtests
* drop obsolete DM-Upload-Allowed
* bump standard to 3.9.4, no changes required

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""
 
2
Simple example of using zmq log handlers
 
3
 
 
4
This starts a number of subprocesses with PUBHandlers that generate
 
5
log messages at a regular interval.  The main process has a SUB socket,
 
6
which aggregates and logs all of the messages to the root logger.
 
7
"""
 
8
 
1
9
import logging
2
 
import logging.config
 
10
from multiprocessing import Process
 
11
import os
 
12
import random
 
13
import sys
 
14
import time
 
15
 
3
16
import zmq
4
 
import multiprocessing
5
 
from zmq.eventloop import *
6
 
 
7
 
def sub_logger(port):
 
17
from zmq.log.handlers import PUBHandler
 
18
 
 
19
LOG_LEVELS = (logging.DEBUG, logging.INFO, logging.WARN, logging.ERROR, logging.CRITICAL)
 
20
 
 
21
def sub_logger(port, level=logging.DEBUG):
8
22
    ctx = zmq.Context()
9
23
    sub = ctx.socket(zmq.SUB)
10
 
    sub.connect('tcp://127.0.0.1:%i'%port)
11
 
    sub.setsockopt(zmq.SUBSCRIBE,"")
12
 
    while True:
13
 
        message = sub.recv_multipart()
14
 
        name = message[0]
15
 
        msg = message[1:]
16
 
        if name == 'log':
17
 
            msg[0] = int(msg[0])
18
 
        getattr(logging, name)(*msg)
19
 
        
20
 
    
21
 
 
22
 
class ZLogger(object):
23
 
    
24
 
    def __init__(self,fname=None):
25
 
        if fname is not None:
26
 
            logging.config.fileConfig(fname)
27
 
        self.ctx = zmq.Context()
28
 
        self.pub = self.ctx.socket(zmq.PUB)
29
 
        self.port = self.pub.bind_to_random_port('tcp://127.0.0.1')
30
 
        self.sub_proc = multiprocessing.Process(target=sub_logger, args=(self.port,))
31
 
        self.sub_proc.start()
 
24
    sub.bind('tcp://127.0.0.1:%i' % port)
 
25
    sub.setsockopt(zmq.SUBSCRIBE, "")
 
26
    logging.basicConfig(level=level)
 
27
    
 
28
    while True:
 
29
        level, message = sub.recv_multipart()
 
30
        if message.endswith('\n'):
 
31
            # trim trailing newline, which will get appended again
 
32
            message = message[:-1]
 
33
        log = getattr(logging, level.lower())
 
34
        log(message)
 
35
 
 
36
def log_worker(port, interval=1, level=logging.DEBUG):
 
37
    ctx = zmq.Context()
 
38
    pub = ctx.socket(zmq.PUB)
 
39
    pub.connect('tcp://127.0.0.1:%i' % port)
 
40
    
 
41
    logger = logging.getLogger(str(os.getpid()))
 
42
    logger.setLevel(level)
 
43
    handler = PUBHandler(pub)
 
44
    logger.addHandler(handler)
 
45
    print "starting logger at %i with level=%s" % (os.getpid(), level)
 
46
 
 
47
    while True:
 
48
        level = random.choice(LOG_LEVELS)
 
49
        logger.log(level, "Hello from %i!" % os.getpid())
 
50
        time.sleep(interval)
 
51
 
 
52
if __name__ == '__main__':
 
53
    if len(sys.argv) > 1:
 
54
        n = int(sys.argv[1])
 
55
    else:
 
56
        n = 2
 
57
    
 
58
    port = 5555
 
59
    
 
60
    # start the log generators
 
61
    workers = [ Process(target=log_worker, args=(port,), kwargs=dict(level=random.choice(LOG_LEVELS))) for i in range(n) ]
 
62
    [ w.start() for w in workers ]
 
63
    
 
64
    # start the log watcher
 
65
    try:
 
66
        sub_logger(port)
 
67
    except KeyboardInterrupt:
32
68
        pass
33
 
    
34
 
    def log(self, level, msg):
35
 
        self.pub.send_multipart(['log', str(level), msg])
36
 
    
37
 
    def warn(self, msg):
38
 
        self.pub.send_multipart(['warn', msg])
39
 
    
40
 
    def error(self, msg):
41
 
        self.pub.send_multipart(['error', msg])
42
 
    
43
 
 
44
 
 
45
 
 
 
69
    finally:
 
70
        [ w.terminate() for w in workers ]