~ubuntu-branches/ubuntu/precise/pyzmq/precise

« back to all changes in this revision

Viewing changes to zmq/devices/monitoredqueue.pyx

  • Committer: Bazaar Package Importer
  • Author(s): Piotr Ożarowski
  • Date: 2011-02-15 09:08:36 UTC
  • mfrom: (2.1.2 experimental)
  • Revision ID: james.westby@ubuntu.com-20110215090836-phh4slym1g6muucn
Tags: 2.0.10.1-2
* Team upload.
* Upload to unstable
* Add Breaks: ${python:Breaks}

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""MonitoredQueue classes and functions.
 
2
 
 
3
Authors
 
4
-------
 
5
* MinRK
 
6
* Brian Granger
 
7
"""
 
8
 
 
9
#
 
10
#    Copyright (c) 2010 Min Ragan-Kelley, Brian Granger
 
11
#
 
12
#    This file is part of pyzmq.
 
13
#
 
14
#    pyzmq is free software; you can redistribute it and/or modify it under
 
15
#    the terms of the Lesser GNU General Public License as published by
 
16
#    the Free Software Foundation; either version 3 of the License, or
 
17
#    (at your option) any later version.
 
18
#
 
19
#    pyzmq is distributed in the hope that it will be useful,
 
20
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
 
21
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
22
#    Lesser GNU General Public License for more details.
 
23
#
 
24
#    You should have received a copy of the Lesser GNU General Public License
 
25
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
26
#
 
27
 
 
28
#-----------------------------------------------------------------------------
 
29
# Imports
 
30
#-----------------------------------------------------------------------------
 
31
 
 
32
cdef extern from "Python.h":
 
33
    ctypedef int Py_ssize_t
 
34
 
 
35
from buffers cimport asbuffer_r
 
36
from czmq cimport *
 
37
 
 
38
from zmq.core.socket cimport Socket
 
39
 
 
40
from zmq.core import XREP, ZMQError
 
41
 
 
42
#-----------------------------------------------------------------------------
 
43
# MonitoredQueue functions
 
44
#-----------------------------------------------------------------------------
 
45
 
 
46
 
 
47
def monitored_queue(Socket in_socket, Socket out_socket, Socket mon_socket,
 
48
                    object in_prefix='in', object out_prefix='out'):
 
49
    """monitored_queue(in_socket, out_socket, mon_socket,
 
50
                       in_prefix='in', out_prefix='out')
 
51
 
 
52
    Start a monitored queue device.
 
53
 
 
54
    A monitored queue behaves just like a zmq QUEUE device as far as in_socket
 
55
    and out_socket are concerned, except that all messages *also* go out on
 
56
    mon_socket. mon_socket also prefixes the messages coming from each with a
 
57
    prefix, by defaout 'in' and 'out', so all messages sent by mon_socket are
 
58
    multipart.
 
59
    
 
60
    The only difference between this and a QUEUE as far as in/out are
 
61
    concerned is that it works with two XREP sockets by swapping the IDENT
 
62
    prefixes.
 
63
    
 
64
    Parameters
 
65
    ----------
 
66
    in_socket : Socket
 
67
        One of the sockets to the Queue. Its messages will be prefixed with
 
68
        'in'.
 
69
    out_socket : Socket
 
70
        One of the sockets to the Queue. Its messages will be prefixed with
 
71
        'out'. The only difference between in/out socket is this prefix.
 
72
    mon_socket : Socket
 
73
        This socket sends out every message received by each of the others
 
74
        with an in/out prefix specifying which one it was.
 
75
    in_prefix : str
 
76
        Prefix added to broadcast messages from in_socket.
 
77
    out_prefix : str
 
78
        Prefix added to broadcast messages from out_socket.
 
79
    """
 
80
    
 
81
    cdef void *ins=in_socket.handle
 
82
    cdef void *outs=out_socket.handle
 
83
    cdef void *mons=mon_socket.handle
 
84
    cdef zmq_msg_t in_msg
 
85
    cdef zmq_msg_t out_msg
 
86
    cdef bint swap_ids
 
87
    cdef char *msg_c = NULL
 
88
    cdef Py_ssize_t msg_c_len
 
89
    cdef int rc
 
90
 
 
91
    for prefix in (in_prefix, out_prefix):
 
92
        if not isinstance(prefix, bytes):
 
93
            raise TypeError("prefix must be bytes, not %s"%type(prefix))
 
94
 
 
95
    # force swap_ids if both XREP
 
96
    swap_ids = (in_socket.socket_type == XREP and 
 
97
                out_socket.socket_type == XREP)
 
98
    
 
99
    # build zmq_msg objects from str prefixes
 
100
    asbuffer_r(in_prefix, <void **>&msg_c, &msg_c_len)
 
101
    rc = zmq_msg_init_size(&in_msg, msg_c_len)
 
102
    if rc != 0:
 
103
        raise ZMQError()
 
104
    memcpy(zmq_msg_data(&in_msg), msg_c, zmq_msg_size(&in_msg))
 
105
    
 
106
    asbuffer_r(out_prefix, <void **>&msg_c, &msg_c_len)
 
107
    rc = zmq_msg_init_size(&out_msg, msg_c_len)
 
108
    if rc != 0:
 
109
        raise ZMQError()
 
110
    memcpy(zmq_msg_data(&out_msg), msg_c, zmq_msg_size(&out_msg))
 
111
    
 
112
    with nogil:
 
113
        rc = c_monitored_queue(ins, outs, mons, in_msg, out_msg, swap_ids)
 
114
    return rc
 
115
 
 
116
__all__ = ['monitored_queue']
 
 
b'\\ No newline at end of file'