~ubuntu-branches/ubuntu/precise/pyzmq/precise

« back to all changes in this revision

Viewing changes to zmq/devices/monitoredqueue.pxd

  • Committer: Bazaar Package Importer
  • Author(s): Piotr Ożarowski
  • Date: 2011-02-15 09:08:36 UTC
  • mfrom: (2.1.2 experimental)
  • Revision ID: james.westby@ubuntu.com-20110215090836-phh4slym1g6muucn
Tags: 2.0.10.1-2
* Team upload.
* Upload to unstable
* Add Breaks: ${python:Breaks}

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""MonitoredQueue class declarations.
 
2
 
 
3
Authors
 
4
-------
 
5
* MinRK
 
6
* Brian Granger
 
7
"""
 
8
 
 
9
#
 
10
#    Copyright (c) 2010 Min Ragan-Kelley, Brian Granger
 
11
#
 
12
#    This file is part of pyzmq.
 
13
#
 
14
#    pyzmq is free software; you can redistribute it and/or modify it under
 
15
#    the terms of the Lesser GNU General Public License as published by
 
16
#    the Free Software Foundation; either version 3 of the License, or
 
17
#    (at your option) any later version.
 
18
#
 
19
#    pyzmq is distributed in the hope that it will be useful,
 
20
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
 
21
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
22
#    Lesser GNU General Public License for more details.
 
23
#
 
24
#    You should have received a copy of the Lesser GNU General Public License
 
25
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
26
#
 
27
 
 
28
#-----------------------------------------------------------------------------
 
29
# Imports
 
30
#-----------------------------------------------------------------------------
 
31
 
 
32
from czmq cimport *
 
33
 
 
34
#-----------------------------------------------------------------------------
 
35
# MonitoredQueue C functions
 
36
#-----------------------------------------------------------------------------
 
37
 
 
38
 
 
39
# the MonitoredQueue C function, adapted from zmq::queue.cpp :
 
40
cdef inline int _mq_send_both (void *dst_, void *sidesocket_, zmq_msg_t *msg,
                        zmq_msg_t *side_msg, int flags) nogil:
    """Send one message part to both ``dst_`` and ``sidesocket_``.

    A copy of ``msg`` is made into ``side_msg`` and sent to ``dst_`` first;
    the original ``msg`` is then sent to ``sidesocket_``.  Always send the
    copy before the original.

    Returns 0 on success, or the negative return code of the first libzmq
    call that failed (later calls are skipped).
    """
    cdef int rc = zmq_msg_copy (side_msg, msg)
    if rc < 0:
        return rc
    rc = zmq_send (dst_, side_msg, flags)
    if rc < 0:
        return rc
    return zmq_send (sidesocket_, msg, flags)


cdef inline int _mq_relay (void *src_, void *dst_, void *sidesocket_,
                        zmq_msg_t *prefix, zmq_msg_t *msg, zmq_msg_t *id_msg,
                        zmq_msg_t *side_msg, int swap_ids) nogil:
    """Forward one multipart message from ``src_`` to ``dst_``.

    Every part forwarded to ``dst_`` is also sent to ``sidesocket_``,
    preceded there by a copy of ``prefix``.  When ``swap_ids`` is true the
    first two parts received from ``src_`` are forwarded in reversed order.
    ``msg``, ``id_msg`` and ``side_msg`` are scratch messages owned by the
    caller.

    Returns 0 on success, or the negative return code of the first libzmq
    call that failed.
    """
    cdef int rc
    cdef int flags
    # zeroed so that a failed getsockopt can never leave garbage behind
    cdef int64_t more = 0
    cdef size_t moresz

    # send the direction prefix to the side socket
    rc = zmq_msg_copy (side_msg, prefix)
    if rc < 0:
        return rc
    rc = zmq_send (sidesocket_, side_msg, ZMQ_SNDMORE)
    if rc < 0:
        return rc

    if swap_ids:
        # both xrep, must send second identity first
        # recv two ids into msg, id_msg
        rc = zmq_recv (src_, msg, 0)
        if rc < 0:
            return rc
        rc = zmq_recv (src_, id_msg, 0)
        if rc < 0:
            return rc

        # send second id (id_msg) first
        rc = _mq_send_both (dst_, sidesocket_, id_msg, side_msg, ZMQ_SNDMORE)
        if rc < 0:
            return rc
        # send first id (msg) second
        rc = _mq_send_both (dst_, sidesocket_, msg, side_msg, ZMQ_SNDMORE)
        if rc < 0:
            return rc

    # forward the remaining parts until the source reports no more
    while True:
        rc = zmq_recv (src_, msg, 0)
        if rc < 0:
            return rc

        moresz = sizeof (more)
        rc = zmq_getsockopt (src_, ZMQ_RCVMORE, &more, &moresz)
        if rc < 0:
            return rc

        if more:
            flags = ZMQ_SNDMORE
        else:
            flags = 0
        rc = _mq_send_both (dst_, sidesocket_, msg, side_msg, flags)
        if rc < 0:
            return rc

        if not more:
            break
    return 0


cdef inline int c_monitored_queue (void *insocket_, void *outsocket_,
                        void *sidesocket_, zmq_msg_t in_msg, 
                        zmq_msg_t out_msg, int swap_ids) nogil:
    """The actual C function for a monitored queue device. 

    See ``monitored_queue()`` for details.

    Polls ``insocket_`` and ``outsocket_`` forever.  Each multipart message
    arriving on one is forwarded to the other and also sent to
    ``sidesocket_``, prefixed there with ``in_msg`` (traffic read from
    ``insocket_``) or ``out_msg`` (traffic read from ``outsocket_``).  If
    ``swap_ids`` is true, the first two parts of every message are swapped
    while forwarding.

    The loop only ends when a libzmq call fails; the negative return code
    of that call is returned after the scratch messages have been closed.
    """
    cdef int rc
    cdef zmq_msg_t msg
    cdef zmq_msg_t id_msg
    cdef zmq_msg_t side_msg
    cdef zmq_pollitem_t items [2]

    # scratch messages, reused for every forwarded part
    zmq_msg_init (&msg)
    zmq_msg_init (&id_msg)
    zmq_msg_init (&side_msg)

    items [0].socket = insocket_
    items [0].fd = 0
    items [0].events = ZMQ_POLLIN
    items [0].revents = 0
    items [1].socket = outsocket_
    items [1].fd = 0
    items [1].events = ZMQ_POLLIN
    items [1].revents = 0
    # the side socket is write-only here, so it is not polled

    while True:

        # //  Wait while there are either requests or replies to process.
        rc = zmq_poll (&items [0], 2, -1)
        if rc < 0:
            # revents is not trustworthy after a failed poll; stop here
            break

        # //  The algorithm below assumes ratio of requests and replies
        # //  processed under full load to be 1:1. Although processing
        # //  replies first is tempting it is susceptible to DoS attacks
        # //  (overloading the system with unsolicited replies).
        #
        # //  Process a request.
        if items [0].revents & ZMQ_POLLIN:
            rc = _mq_relay (insocket_, outsocket_, sidesocket_, &in_msg,
                            &msg, &id_msg, &side_msg, swap_ids)
            if rc < 0:
                break

        # //  Process a reply.
        if items [1].revents & ZMQ_POLLIN:
            rc = _mq_relay (outsocket_, insocket_, sidesocket_, &out_msg,
                            &msg, &id_msg, &side_msg, swap_ids)
            if rc < 0:
                break

    # only reached on failure: release whatever the scratch messages hold
    zmq_msg_close (&msg)
    zmq_msg_close (&id_msg)
    zmq_msg_close (&side_msg)
    return rc