~ubuntu-branches/ubuntu/wily/zeromq3/wily-proposed

« back to all changes in this revision

Viewing changes to tests/test_inproc_connect.cpp

  • Committer: Package Import Robot
  • Author(s): Alessandro Ghedini
  • Date: 2014-03-16 14:02:28 UTC
  • mfrom: (1.1.6) (6.1.1 experimental)
  • Revision ID: package-import@ubuntu.com-20140316140228-ig1sgh7czk59m9ux
Tags: 4.0.4+dfsg-1
* QA upload; orphan the package
  - Upload to unstable
* New upstream release
* Update repack.stub script
* Drop 02_fix-exported-symbols.patch and 03_fix-s390-rdtsc.patch
  (merged upstream)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
    Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file
 
3
 
 
4
    This file is part of 0MQ.
 
5
 
 
6
    0MQ is free software; you can redistribute it and/or modify it under
 
7
    the terms of the GNU Lesser General Public License as published by
 
8
    the Free Software Foundation; either version 3 of the License, or
 
9
    (at your option) any later version.
 
10
 
 
11
    0MQ is distributed in the hope that it will be useful,
 
12
    but WITHOUT ANY WARRANTY; without even the implied warranty of
 
13
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
14
    GNU Lesser General Public License for more details.
 
15
 
 
16
    You should have received a copy of the GNU Lesser General Public License
 
17
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
18
*/
 
19
 
 
20
#include "testutil.hpp"
 
21
 
 
22
static void pusher (void *ctx)
 
23
{
 
24
    // Connect first
 
25
    void *connectSocket = zmq_socket (ctx, ZMQ_PAIR);
 
26
    assert (connectSocket);
 
27
    int rc = zmq_connect (connectSocket, "inproc://a");
 
28
    assert (rc == 0);
 
29
 
 
30
    // Queue up some data
 
31
    rc = zmq_send_const (connectSocket, "foobar", 6, 0);
 
32
    assert (rc == 6);
 
33
 
 
34
    // Cleanup
 
35
    rc = zmq_close (connectSocket);
 
36
    assert (rc == 0);
 
37
}
 
38
 
 
39
void test_bind_before_connect()
 
40
{
 
41
    void *ctx = zmq_ctx_new ();
 
42
    assert (ctx);
 
43
 
 
44
    // Bind first
 
45
    void *bindSocket = zmq_socket (ctx, ZMQ_PAIR);
 
46
    assert (bindSocket);
 
47
    int rc = zmq_bind (bindSocket, "inproc://a");
 
48
    assert (rc == 0);
 
49
 
 
50
    // Now connect
 
51
    void *connectSocket = zmq_socket (ctx, ZMQ_PAIR);
 
52
    assert (connectSocket);
 
53
    rc = zmq_connect (connectSocket, "inproc://a");
 
54
    assert (rc == 0);
 
55
    
 
56
    // Queue up some data
 
57
    rc = zmq_send_const (connectSocket, "foobar", 6, 0);
 
58
    assert (rc == 6);
 
59
 
 
60
    // Read pending message
 
61
    zmq_msg_t msg;
 
62
    rc = zmq_msg_init (&msg);
 
63
    assert (rc == 0);
 
64
    rc = zmq_msg_recv (&msg, bindSocket, 0);
 
65
    assert (rc == 6);
 
66
    void *data = zmq_msg_data (&msg);
 
67
    assert (memcmp ("foobar", data, 6) == 0);
 
68
 
 
69
    // Cleanup
 
70
    rc = zmq_close (connectSocket);
 
71
    assert (rc == 0);
 
72
 
 
73
    rc = zmq_close (bindSocket);
 
74
    assert (rc == 0);
 
75
 
 
76
    rc = zmq_ctx_term (ctx);
 
77
    assert (rc == 0);
 
78
}
 
79
 
 
80
void test_connect_before_bind()
 
81
{
 
82
    void *ctx = zmq_ctx_new ();
 
83
    assert (ctx);
 
84
 
 
85
    // Connect first
 
86
    void *connectSocket = zmq_socket (ctx, ZMQ_PAIR);
 
87
    assert (connectSocket);
 
88
    int rc = zmq_connect (connectSocket, "inproc://a");
 
89
    assert (rc == 0);
 
90
 
 
91
    // Queue up some data
 
92
    rc = zmq_send_const (connectSocket, "foobar", 6, 0);
 
93
    assert (rc == 6);
 
94
 
 
95
    // Now bind
 
96
    void *bindSocket = zmq_socket (ctx, ZMQ_PAIR);
 
97
    assert (bindSocket);
 
98
    rc = zmq_bind (bindSocket, "inproc://a");
 
99
    assert (rc == 0);
 
100
    
 
101
    // Read pending message
 
102
    zmq_msg_t msg;
 
103
    rc = zmq_msg_init (&msg);
 
104
    assert (rc == 0);
 
105
    rc = zmq_msg_recv (&msg, bindSocket, 0);
 
106
    assert (rc == 6);
 
107
    void *data = zmq_msg_data (&msg);
 
108
    assert (memcmp ("foobar", data, 6) == 0);
 
109
 
 
110
    // Cleanup
 
111
    rc = zmq_close (connectSocket);
 
112
    assert (rc == 0);
 
113
 
 
114
    rc = zmq_close (bindSocket);
 
115
    assert (rc == 0);
 
116
 
 
117
    rc = zmq_ctx_term (ctx);
 
118
    assert (rc == 0);
 
119
}
 
120
 
 
121
void test_connect_before_bind_pub_sub()
 
122
{
 
123
    void *ctx = zmq_ctx_new ();
 
124
    assert (ctx);
 
125
 
 
126
    // Connect first
 
127
    void *connectSocket = zmq_socket (ctx, ZMQ_PUB);
 
128
    assert (connectSocket);
 
129
    int rc = zmq_connect (connectSocket, "inproc://a");
 
130
    assert (rc == 0);
 
131
 
 
132
    // Queue up some data, this will be dropped
 
133
    rc = zmq_send_const (connectSocket, "before", 6, 0);
 
134
    assert (rc == 6);
 
135
 
 
136
    // Now bind
 
137
    void *bindSocket = zmq_socket (ctx, ZMQ_SUB);
 
138
    assert (bindSocket);
 
139
    rc = zmq_setsockopt (bindSocket, ZMQ_SUBSCRIBE, "", 0);
 
140
    assert (rc == 0);
 
141
    rc = zmq_bind (bindSocket, "inproc://a");
 
142
    assert (rc == 0);
 
143
   
 
144
    // Wait for pub-sub connection to happen
 
145
    msleep (SETTLE_TIME);
 
146
 
 
147
    // Queue up some data, this not will be dropped
 
148
    rc = zmq_send_const (connectSocket, "after", 6, 0);
 
149
    assert (rc == 6);
 
150
 
 
151
    // Read pending message
 
152
    zmq_msg_t msg;
 
153
    rc = zmq_msg_init (&msg);
 
154
    assert (rc == 0);
 
155
    rc = zmq_msg_recv (&msg, bindSocket, 0);
 
156
    assert (rc == 6);
 
157
    void *data = zmq_msg_data (&msg);
 
158
    assert (memcmp ("after", data, 5) == 0);
 
159
 
 
160
    // Cleanup
 
161
    rc = zmq_close (connectSocket);
 
162
    assert (rc == 0);
 
163
 
 
164
    rc = zmq_close (bindSocket);
 
165
    assert (rc == 0);
 
166
 
 
167
    rc = zmq_ctx_term (ctx);
 
168
    assert (rc == 0);
 
169
}
 
170
 
 
171
void test_multiple_connects()
 
172
{
 
173
    const unsigned int no_of_connects = 10;
 
174
    void *ctx = zmq_ctx_new ();
 
175
    assert (ctx);
 
176
 
 
177
    int rc;
 
178
    void *connectSocket[no_of_connects];
 
179
 
 
180
    // Connect first
 
181
    for (unsigned int i = 0; i < no_of_connects; ++i)
 
182
    {
 
183
        connectSocket [i] = zmq_socket (ctx, ZMQ_PUSH);
 
184
        assert (connectSocket [i]);
 
185
        rc = zmq_connect (connectSocket [i], "inproc://a");
 
186
        assert (rc == 0);
 
187
 
 
188
        // Queue up some data
 
189
        rc = zmq_send_const (connectSocket [i], "foobar", 6, 0);
 
190
        assert (rc == 6);
 
191
    }
 
192
 
 
193
    // Now bind
 
194
    void *bindSocket = zmq_socket (ctx, ZMQ_PULL);
 
195
    assert (bindSocket);
 
196
    rc = zmq_bind (bindSocket, "inproc://a");
 
197
    assert (rc == 0);
 
198
    
 
199
    for (unsigned int i = 0; i < no_of_connects; ++i)
 
200
    {
 
201
        // Read pending message
 
202
        zmq_msg_t msg;
 
203
        rc = zmq_msg_init (&msg);
 
204
        assert (rc == 0);
 
205
        rc = zmq_msg_recv (&msg, bindSocket, 0);
 
206
        assert (rc == 6);
 
207
        void *data = zmq_msg_data (&msg);
 
208
        assert (memcmp ("foobar", data, 6) == 0);
 
209
    }
 
210
 
 
211
    // Cleanup
 
212
    for (unsigned int i = 0; i < no_of_connects; ++i)
 
213
    {
 
214
        rc = zmq_close (connectSocket [i]);
 
215
        assert (rc == 0);
 
216
    }
 
217
 
 
218
    rc = zmq_close (bindSocket);
 
219
    assert (rc == 0);
 
220
 
 
221
    rc = zmq_ctx_term (ctx);
 
222
    assert (rc == 0);
 
223
}
 
224
 
 
225
void test_multiple_threads()
 
226
{
 
227
    const unsigned int no_of_threads = 30;
 
228
    void *ctx = zmq_ctx_new ();
 
229
    assert (ctx);
 
230
 
 
231
    int rc;
 
232
    void *threads [no_of_threads];
 
233
 
 
234
    // Connect first
 
235
    for (unsigned int i = 0; i < no_of_threads; ++i)
 
236
    {
 
237
        threads [i] = zmq_threadstart (&pusher, ctx);
 
238
    }
 
239
 
 
240
    // Now bind
 
241
    void *bindSocket = zmq_socket (ctx, ZMQ_PULL);
 
242
    assert (bindSocket);
 
243
    rc = zmq_bind (bindSocket, "inproc://a");
 
244
    assert (rc == 0);
 
245
 
 
246
    for (unsigned int i = 0; i < no_of_threads; ++i)
 
247
    {
 
248
        // Read pending message
 
249
        zmq_msg_t msg;
 
250
        rc = zmq_msg_init (&msg);
 
251
        assert (rc == 0);
 
252
        rc = zmq_msg_recv (&msg, bindSocket, 0);
 
253
        assert (rc == 6);
 
254
        void *data = zmq_msg_data (&msg);
 
255
        assert (memcmp ("foobar", data, 6) == 0);
 
256
    }
 
257
 
 
258
    // Cleanup
 
259
    for (unsigned int i = 0; i < no_of_threads; ++i)
 
260
    {
 
261
        zmq_threadclose (threads [i]);
 
262
    }
 
263
 
 
264
    rc = zmq_close (bindSocket);
 
265
    assert (rc == 0);
 
266
 
 
267
    rc = zmq_ctx_term (ctx);
 
268
    assert (rc == 0);
 
269
}
 
270
 
 
271
void test_identity()
 
272
{
 
273
    //  Create the infrastructure
 
274
    void *ctx = zmq_ctx_new ();
 
275
    assert (ctx);
 
276
    
 
277
    void *sc = zmq_socket (ctx, ZMQ_DEALER);
 
278
    assert (sc);
 
279
    
 
280
    int rc = zmq_connect (sc, "inproc://a");
 
281
    assert (rc == 0);
 
282
   
 
283
    void *sb = zmq_socket (ctx, ZMQ_ROUTER);
 
284
    assert (sb);
 
285
    
 
286
    rc = zmq_bind (sb, "inproc://a");
 
287
    assert (rc == 0);
 
288
 
 
289
    //  Send 2-part message.
 
290
    rc = zmq_send (sc, "A", 1, ZMQ_SNDMORE);
 
291
    assert (rc == 1);
 
292
    rc = zmq_send (sc, "B", 1, 0);
 
293
    assert (rc == 1);
 
294
 
 
295
    //  Identity comes first.
 
296
    zmq_msg_t msg;
 
297
    rc = zmq_msg_init (&msg);
 
298
    assert (rc == 0);
 
299
    rc = zmq_msg_recv (&msg, sb, 0);
 
300
    assert (rc >= 0);
 
301
    int more = zmq_msg_more (&msg);
 
302
    assert (more == 1);
 
303
 
 
304
    //  Then the first part of the message body.
 
305
    rc = zmq_msg_recv (&msg, sb, 0);
 
306
    assert (rc == 1);
 
307
    more = zmq_msg_more (&msg);
 
308
    assert (more == 1);
 
309
 
 
310
    //  And finally, the second part of the message body.
 
311
    rc = zmq_msg_recv (&msg, sb, 0);
 
312
    assert (rc == 1);
 
313
    more = zmq_msg_more (&msg);
 
314
    assert (more == 0);
 
315
 
 
316
    //  Deallocate the infrastructure.
 
317
    rc = zmq_close (sc);
 
318
    assert (rc == 0);
 
319
    
 
320
    rc = zmq_close (sb);
 
321
    assert (rc == 0);
 
322
    
 
323
    rc = zmq_ctx_term (ctx);
 
324
    assert (rc == 0);
 
325
}
 
326
 
 
327
void test_connect_only ()
 
328
{
 
329
    void *ctx = zmq_ctx_new ();
 
330
    assert (ctx);
 
331
 
 
332
    void *connectSocket = zmq_socket (ctx, ZMQ_PUSH);
 
333
    assert (connectSocket);
 
334
    int rc = zmq_connect (connectSocket, "inproc://a");
 
335
    assert (rc == 0);
 
336
 
 
337
    rc = zmq_close (connectSocket);
 
338
    assert (rc == 0);
 
339
 
 
340
    rc = zmq_ctx_term (ctx);
 
341
    assert (rc == 0);
 
342
}
 
343
 
 
344
int main (void)
 
345
{
 
346
    setup_test_environment();
 
347
 
 
348
    test_bind_before_connect ();
 
349
    test_connect_before_bind ();
 
350
    test_connect_before_bind_pub_sub ();
 
351
    test_multiple_connects ();
 
352
    test_multiple_threads ();
 
353
    test_identity ();
 
354
    test_connect_only ();
 
355
 
 
356
    return 0;
 
357
}