~ubuntu-branches/ubuntu/wily/pyzmq/wily

« back to all changes in this revision

Viewing changes to zmq/devices/monitoredqueue.pxd

  • Committer: Package Import Robot
  • Author(s): Julian Taylor
  • Date: 2013-02-24 19:23:15 UTC
  • mfrom: (1.2.1) (9 sid)
  • mto: This revision was merged to the branch mainline in revision 10.
  • Revision ID: package-import@ubuntu.com-20130224192315-qhmwp3m3ymk8r60d
Tags: 2.2.0.1-1
* New upstream release
* relicense debian packaging to LGPL-3
* update watch file to use github directly
  thanks to Bart Martens for the file
* add autopkgtests
* drop obsolete DM-Upload-Allowed
* bump standard to 3.9.4, no changes required

Show diffs side-by-side

added added

removed removed

Lines of Context:
9
9
#
10
10
#    Copyright (c) 2010 Min Ragan-Kelley, Brian Granger
11
11
#
12
 
#    This file is part of pyzmq.
 
12
#    This file is part of pyzmq, but is derived and adapted from zmq_queue.cpp
 
13
#    originally from libzmq-2.1.6, used under LGPLv3
13
14
#
14
15
#    pyzmq is free software; you can redistribute it and/or modify it under
15
16
#    the terms of the Lesser GNU General Public License as published by
29
30
# Imports
30
31
#-----------------------------------------------------------------------------
31
32
 
32
 
from czmq cimport *
 
33
from libzmq cimport *
33
34
 
34
35
#-----------------------------------------------------------------------------
35
36
# MonitoredQueue C functions
36
37
#-----------------------------------------------------------------------------
37
38
 
 
39
cdef inline int _relay(void *insocket_, void *outsocket_, void *sidesocket_,
                zmq_msg_t msg, zmq_msg_t side_msg, zmq_msg_t id_msg,
                bint swap_ids) nogil:
    """Relay one multipart message from insocket_ to outsocket_.

    Every frame forwarded to outsocket_ is also sent to sidesocket_.
    The frame sent to outsocket_ is always a copy (side_msg) and is sent
    *before* the original is sent to sidesocket_.

    Parameters
    ----------
    insocket_ : socket to receive the message from
    outsocket_ : socket the message is forwarded to
    sidesocket_ : socket that receives a duplicate of every frame; all
        frames except the last are sent with ZMQ_SNDMORE
    msg, side_msg, id_msg : scratch zmq_msg_t objects used for the received
        frame, its copy, and the second identity frame respectively
    swap_ids : if true, the first two frames are received up front and
        forwarded in reversed order (second frame first)

    Returns
    -------
    The negative return code of the first libzmq call that fails, otherwise
    the return code of the final zmq_sendmsg to sidesocket_.
    """
    cdef int rc
    # Storage for the ZMQ_RCVMORE option; zero-initialised so that a failed
    # zmq_getsockopt can never leave us reading an indeterminate value.
    cdef int64_t flag_2 = 0
    cdef int flag_3 = 0
    cdef int flags
    cdef size_t flagsz
    cdef void * flag_ptr

    # ZMQ_RCVMORE is read into an int64_t when ZMQ_VERSION_MAJOR < 3 and
    # into an int otherwise, so select the matching storage once up front.
    if ZMQ_VERSION_MAJOR < 3:
        flagsz = sizeof (int64_t)
        flag_ptr = &flag_2
    else:
        flagsz = sizeof (int)
        flag_ptr = &flag_3

    if swap_ids:# both router, must send second identity first
        # recv two ids into msg, id_msg
        rc = zmq_recvmsg (insocket_, &msg, 0)
        if rc < 0:
            return rc
        rc = zmq_recvmsg (insocket_, &id_msg, 0)
        if rc < 0:
            return rc

        # send second id (id_msg) first
        #!!!! always send a copy before the original !!!!
        rc = zmq_msg_copy(&side_msg, &id_msg)
        if rc < 0:
            return rc
        rc = zmq_sendmsg (outsocket_, &side_msg, ZMQ_SNDMORE)
        if rc < 0:
            return rc
        rc = zmq_sendmsg (sidesocket_, &id_msg, ZMQ_SNDMORE)
        if rc < 0:
            return rc

        # send first id (msg) second
        rc = zmq_msg_copy(&side_msg, &msg)
        if rc < 0:
            return rc
        rc = zmq_sendmsg (outsocket_, &side_msg, ZMQ_SNDMORE)
        if rc < 0:
            return rc
        rc = zmq_sendmsg (sidesocket_, &msg, ZMQ_SNDMORE)
        if rc < 0:
            return rc

    while (True):
        rc = zmq_recvmsg (insocket_, &msg, 0)
        if rc < 0:
            # Without this check a failed receive would be masked by the
            # getsockopt call below and a stale frame would be forwarded.
            return rc

        rc = zmq_getsockopt (insocket_, ZMQ_RCVMORE, flag_ptr, &flagsz)
        if rc < 0:
            return rc

        flags = 0
        if ZMQ_VERSION_MAJOR < 3:
            if flag_2:
                flags |= ZMQ_SNDMORE
        else:
            if flag_3:
                flags |= ZMQ_SNDMORE
            # LABEL has been removed from libzmq, so there is no
            # ZMQ_RCVLABEL / ZMQ_SNDLABEL handling here.

        rc = zmq_msg_copy(&side_msg, &msg)
        if rc < 0:
            return rc

        if flags:
            rc = zmq_sendmsg (outsocket_, &side_msg, flags)
            if rc < 0:
                return rc
            # only SNDMORE for side-socket
            rc = zmq_sendmsg (sidesocket_, &msg, ZMQ_SNDMORE)
            if rc < 0:
                return rc
        else:
            # last frame of the message: send without SNDMORE and stop
            rc = zmq_sendmsg (outsocket_, &side_msg, 0)
            if rc < 0:
                return rc
            rc = zmq_sendmsg (sidesocket_, &msg, 0)
            break
    return rc
38
100
 
39
101
# the MonitoredQueue C function, adapted from zmq::queue.cpp :
40
102
cdef inline int c_monitored_queue (void *insocket_, void *outsocket_,
41
 
                        void *sidesocket_, zmq_msg_t in_msg, 
42
 
                        zmq_msg_t out_msg, int swap_ids) nogil:
 
103
                        void *sidesocket_, zmq_msg_t *in_msg_ptr, 
 
104
                        zmq_msg_t *out_msg_ptr, int swap_ids) nogil:
43
105
    """The actual C function for a monitored queue device. 
44
106
 
45
107
    See ``monitored_queue()`` for details.
46
108
    """
47
 
 
48
 
    cdef int ids_done
 
109
    
49
110
    cdef zmq_msg_t msg
50
111
    cdef int rc = zmq_msg_init (&msg)
51
112
    cdef zmq_msg_t id_msg
53
114
    cdef zmq_msg_t side_msg
54
115
    rc = zmq_msg_init (&side_msg)
55
116
    # assert (rc == 0)
56
 
 
57
 
    cdef int64_t more
58
 
    cdef size_t moresz
59
 
 
 
117
    
 
118
    
60
119
    cdef zmq_pollitem_t items [2]
61
120
    items [0].socket = insocket_
62
121
    items [0].fd = 0
71
130
    # items [2].fd = 0
72
131
    # items [2].events = ZMQ_POLLIN
73
132
    # items [2].revents = 0
74
 
 
 
133
    
75
134
    while (True):
76
 
 
 
135
    
77
136
        # //  Wait while there are either requests or replies to process.
78
137
        rc = zmq_poll (&items [0], 2, -1)
79
 
        
 
138
        if rc < 0:
 
139
            return rc
80
140
        # //  The algorithm below assumes ratio of request and replies processed
81
141
        # //  under full load to be 1:1. Although processing requests replies
82
142
        # //  first is tempting it is susceptible to DoS attacks (overloading
85
145
        # //  Process a request.
86
146
        if (items [0].revents & ZMQ_POLLIN):
87
147
            # send in_prefix to side socket
88
 
            rc = zmq_msg_copy(&side_msg, &in_msg)
89
 
            rc = zmq_send (sidesocket_, &side_msg, ZMQ_SNDMORE)
90
 
            if swap_ids:# both xrep, must send second identity first
91
 
                # recv two ids into msg, id_msg
92
 
                rc = zmq_recv (insocket_, &msg, 0)
93
 
                rc = zmq_recv (insocket_, &id_msg, 0)
94
 
                
95
 
                # send second id (id_msg) first
96
 
                #!!!! always send a copy before the original !!!!
97
 
                rc = zmq_msg_copy(&side_msg, &id_msg)
98
 
                rc = zmq_send (outsocket_, &side_msg, ZMQ_SNDMORE)
99
 
                rc = zmq_send (sidesocket_, &id_msg, ZMQ_SNDMORE)
100
 
                # send first id (msg) second
101
 
                rc = zmq_msg_copy(&side_msg, &msg)
102
 
                rc = zmq_send (outsocket_, &side_msg, ZMQ_SNDMORE)
103
 
                rc = zmq_send (sidesocket_, &msg, ZMQ_SNDMORE)
104
 
            while (True):
105
 
                rc = zmq_recv (insocket_, &msg, 0)
106
 
                # assert (rc == 0)
107
 
 
108
 
                moresz = sizeof (more)
109
 
                rc = zmq_getsockopt (insocket_, ZMQ_RCVMORE, &more, &moresz)
110
 
                # assert (rc == 0)
111
 
 
112
 
                rc = zmq_msg_copy(&side_msg, &msg)
113
 
                if more:
114
 
                    rc = zmq_send (outsocket_, &side_msg, ZMQ_SNDMORE)
115
 
                    rc = zmq_send (sidesocket_, &msg,ZMQ_SNDMORE)
116
 
                else:
117
 
                    rc = zmq_send (outsocket_, &side_msg, 0)
118
 
                    rc = zmq_send (sidesocket_, &msg,0)
119
 
                # assert (rc == 0)
120
 
 
121
 
                if (not more):
122
 
                    break
 
148
            rc = zmq_msg_copy(&side_msg, in_msg_ptr)
 
149
            rc = zmq_sendmsg (sidesocket_, &side_msg, ZMQ_SNDMORE)
 
150
            if rc < 0:
 
151
                return rc
 
152
            # relay the rest of the message
 
153
            rc = _relay(insocket_, outsocket_, sidesocket_, msg, side_msg, id_msg, swap_ids)
 
154
            if rc < 0:
 
155
                return rc
123
156
        if (items [1].revents & ZMQ_POLLIN):
124
 
            rc = zmq_msg_copy(&side_msg, &out_msg)
125
 
            rc = zmq_send (sidesocket_, &side_msg, ZMQ_SNDMORE)
126
 
            if swap_ids:
127
 
                # recv two ids into msg, id_msg
128
 
                rc = zmq_recv (outsocket_, &msg, 0)
129
 
                rc = zmq_recv (outsocket_, &id_msg, 0)
130
 
                
131
 
                # send second id (id_msg) first
132
 
                rc = zmq_msg_copy(&side_msg, &id_msg)
133
 
                rc = zmq_send (insocket_, &side_msg, ZMQ_SNDMORE)
134
 
                rc = zmq_send (sidesocket_, &id_msg,ZMQ_SNDMORE)
135
 
                
136
 
                # send first id (msg) second
137
 
                rc = zmq_msg_copy(&side_msg, &msg)
138
 
                rc = zmq_send (insocket_, &side_msg, ZMQ_SNDMORE)
139
 
                rc = zmq_send (sidesocket_, &msg,ZMQ_SNDMORE)
140
 
            while (True):
141
 
                rc = zmq_recv (outsocket_, &msg, 0)
142
 
                # assert (rc == 0)
143
 
 
144
 
                moresz = sizeof (more)
145
 
                rc = zmq_getsockopt (outsocket_, ZMQ_RCVMORE, &more, &moresz)
146
 
                # assert (rc == 0)
147
 
                rc = zmq_msg_copy(&side_msg, &msg)
148
 
                if more:
149
 
                    rc = zmq_send (insocket_, &side_msg,ZMQ_SNDMORE)
150
 
                    rc = zmq_send (sidesocket_, &msg,ZMQ_SNDMORE)
151
 
                else:
152
 
                    rc = zmq_send (insocket_, &side_msg,0)
153
 
                    rc = zmq_send (sidesocket_, &msg,0)
154
 
                # errno_assert (rc == 0)
155
 
 
156
 
                if (not more):
157
 
                    break
 
157
            # send out_prefix to side socket
 
158
            rc = zmq_msg_copy(&side_msg, out_msg_ptr)
 
159
            rc = zmq_sendmsg (sidesocket_, &side_msg, ZMQ_SNDMORE)
 
160
            if rc < 0:
 
161
                return rc
 
162
            # relay the rest of the message
 
163
            rc = _relay(outsocket_, insocket_, sidesocket_, msg, side_msg, id_msg, swap_ids)
 
164
            if rc < 0:
 
165
                return rc
158
166
    return 0