~ubuntu-branches/ubuntu/saucy/zeromq3/saucy

« back to all changes in this revision

Viewing changes to perf/inproc_thr.cpp

  • Committer: Package Import Robot
  • Author(s): Alessandro Ghedini
  • Date: 2012-06-04 21:21:09 UTC
  • Revision ID: package-import@ubuntu.com-20120604212109-b7b3m0rn21o8oo2q
Tags: upstream-3.1.0~beta+dfsg
ImportĀ upstreamĀ versionĀ 3.1.0~beta+dfsg

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
    Copyright (c) 2009-2011 250bpm s.r.o.
 
3
    Copyright (c) 2007-2009 iMatix Corporation
 
4
    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
 
5
 
 
6
    This file is part of 0MQ.
 
7
 
 
8
    0MQ is free software; you can redistribute it and/or modify it under
 
9
    the terms of the GNU Lesser General Public License as published by
 
10
    the Free Software Foundation; either version 3 of the License, or
 
11
    (at your option) any later version.
 
12
 
 
13
    0MQ is distributed in the hope that it will be useful,
 
14
    but WITHOUT ANY WARRANTY; without even the implied warranty of
 
15
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
16
    GNU Lesser General Public License for more details.
 
17
 
 
18
    You should have received a copy of the GNU Lesser General Public License
 
19
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
20
*/
 
21
 
 
22
#include "../include/zmq.h"
 
23
#include "../include/zmq_utils.h"
 
24
 
 
25
#include <stdio.h>
 
26
#include <stdlib.h>
 
27
#include <string.h>
 
28
 
 
29
#include "../src/platform.hpp"
 
30
 
 
31
#if defined ZMQ_HAVE_WINDOWS
 
32
#include <windows.h>
 
33
#include <process.h>
 
34
#else
 
35
#include <pthread.h>
 
36
#endif
 
37
 
 
38
static int message_count;
 
39
static size_t message_size;
 
40
 
 
41
#if defined ZMQ_HAVE_WINDOWS
 
42
static unsigned int __stdcall worker (void *ctx_)
 
43
#else
 
44
static void *worker (void *ctx_)
 
45
#endif
 
46
{
 
47
    void *s;
 
48
    int rc;
 
49
    int i;
 
50
    zmq_msg_t msg;
 
51
 
 
52
    s = zmq_socket (ctx_, ZMQ_PUSH);
 
53
    if (!s) {
 
54
        printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
 
55
        exit (1);
 
56
    }
 
57
 
 
58
    rc = zmq_connect (s, "inproc://thr_test");
 
59
    if (rc != 0) {
 
60
        printf ("error in zmq_connect: %s\n", zmq_strerror (errno));
 
61
        exit (1);
 
62
    }
 
63
 
 
64
    for (i = 0; i != message_count; i++) {
 
65
 
 
66
        rc = zmq_msg_init_size (&msg, message_size);
 
67
        if (rc != 0) {
 
68
            printf ("error in zmq_msg_init_size: %s\n", zmq_strerror (errno));
 
69
            exit (1);
 
70
        }
 
71
#if defined ZMQ_MAKE_VALGRIND_HAPPY
 
72
        memset (zmq_msg_data (&msg), 0, message_size);
 
73
#endif
 
74
 
 
75
        rc = zmq_sendmsg (s, &msg, 0);
 
76
        if (rc < 0) {
 
77
            printf ("error in zmq_sendmsg: %s\n", zmq_strerror (errno));
 
78
            exit (1);
 
79
        }
 
80
        rc = zmq_msg_close (&msg);
 
81
        if (rc != 0) {
 
82
            printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
 
83
            exit (1);
 
84
        }
 
85
    }
 
86
 
 
87
    rc = zmq_close (s);
 
88
    if (rc != 0) {
 
89
        printf ("error in zmq_close: %s\n", zmq_strerror (errno));
 
90
        exit (1);
 
91
    }
 
92
 
 
93
#if defined ZMQ_HAVE_WINDOWS
 
94
    return 0;
 
95
#else
 
96
    return NULL;
 
97
#endif
 
98
}
 
99
 
 
100
int main (int argc, char *argv [])
 
101
{
 
102
#if defined ZMQ_HAVE_WINDOWS
 
103
    HANDLE local_thread;
 
104
#else
 
105
    pthread_t local_thread;
 
106
#endif
 
107
    void *ctx;
 
108
    void *s;
 
109
    int rc;
 
110
    int i;
 
111
    zmq_msg_t msg;
 
112
    void *watch;
 
113
    unsigned long elapsed;
 
114
    unsigned long throughput;
 
115
    double megabits;
 
116
 
 
117
    if (argc != 3) {
 
118
        printf ("usage: thread_thr <message-size> <message-count>\n");
 
119
        return 1;
 
120
    }
 
121
 
 
122
    message_size = atoi (argv [1]);
 
123
    message_count = atoi (argv [2]);
 
124
 
 
125
    ctx = zmq_init (1);
 
126
    if (!ctx) {
 
127
        printf ("error in zmq_init: %s\n", zmq_strerror (errno));
 
128
        return -1;
 
129
    }
 
130
 
 
131
    s = zmq_socket (ctx, ZMQ_PULL);
 
132
    if (!s) {
 
133
        printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
 
134
        return -1;
 
135
    }
 
136
 
 
137
    rc = zmq_bind (s, "inproc://thr_test");
 
138
    if (rc != 0) {
 
139
        printf ("error in zmq_bind: %s\n", zmq_strerror (errno));
 
140
        return -1;
 
141
    }
 
142
 
 
143
#if defined ZMQ_HAVE_WINDOWS
 
144
    local_thread = (HANDLE) _beginthreadex (NULL, 0,
 
145
        worker, ctx, 0 , NULL);
 
146
    if (local_thread == 0) {
 
147
        printf ("error in _beginthreadex\n");
 
148
        return -1;
 
149
    }
 
150
#else
 
151
    rc = pthread_create (&local_thread, NULL, worker, ctx);
 
152
    if (rc != 0) {
 
153
        printf ("error in pthread_create: %s\n", zmq_strerror (rc));
 
154
        return -1;
 
155
    }
 
156
#endif
 
157
 
 
158
    rc = zmq_msg_init (&msg);
 
159
    if (rc != 0) {
 
160
        printf ("error in zmq_msg_init: %s\n", zmq_strerror (errno));
 
161
        return -1;
 
162
    }
 
163
 
 
164
    printf ("message size: %d [B]\n", (int) message_size);
 
165
    printf ("message count: %d\n", (int) message_count);
 
166
 
 
167
    rc = zmq_recvmsg (s, &msg, 0);
 
168
    if (rc < 0) {
 
169
        printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
 
170
        return -1;
 
171
    }
 
172
    if (zmq_msg_size (&msg) != message_size) {
 
173
        printf ("message of incorrect size received\n");
 
174
        return -1;
 
175
    }
 
176
 
 
177
    watch = zmq_stopwatch_start ();
 
178
 
 
179
    for (i = 0; i != message_count - 1; i++) {
 
180
        rc = zmq_recvmsg (s, &msg, 0);
 
181
        if (rc < 0) {
 
182
            printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
 
183
            return -1;
 
184
        }
 
185
        if (zmq_msg_size (&msg) != message_size) {
 
186
            printf ("message of incorrect size received\n");
 
187
            return -1;
 
188
        }
 
189
    }
 
190
 
 
191
    elapsed = zmq_stopwatch_stop (watch);
 
192
    if (elapsed == 0)
 
193
        elapsed = 1;
 
194
 
 
195
    rc = zmq_msg_close (&msg);
 
196
    if (rc != 0) {
 
197
        printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
 
198
        return -1;
 
199
    }
 
200
 
 
201
#if defined ZMQ_HAVE_WINDOWS
 
202
    DWORD rc2 = WaitForSingleObject (local_thread, INFINITE);
 
203
    if (rc2 == WAIT_FAILED) {
 
204
        printf ("error in WaitForSingleObject\n");
 
205
        return -1;
 
206
    }
 
207
    BOOL rc3 = CloseHandle (local_thread);
 
208
    if (rc3 == 0) {
 
209
        printf ("error in CloseHandle\n");
 
210
        return -1;
 
211
    }
 
212
#else
 
213
    rc = pthread_join (local_thread, NULL);
 
214
    if (rc != 0) {
 
215
        printf ("error in pthread_join: %s\n", zmq_strerror (rc));
 
216
        return -1;
 
217
    }
 
218
#endif
 
219
 
 
220
    rc = zmq_close (s);
 
221
    if (rc != 0) {
 
222
        printf ("error in zmq_close: %s\n", zmq_strerror (errno));
 
223
        return -1;
 
224
    }
 
225
 
 
226
    rc = zmq_term (ctx);
 
227
    if (rc != 0) {
 
228
        printf ("error in zmq_term: %s\n", zmq_strerror (errno));
 
229
        return -1;
 
230
    }
 
231
 
 
232
    throughput = (unsigned long)
 
233
        ((double) message_count / (double) elapsed * 1000000);
 
234
    megabits = (double) (throughput * message_size * 8) / 1000000;
 
235
 
 
236
    printf ("mean throughput: %d [msg/s]\n", (int) throughput);
 
237
    printf ("mean throughput: %.3f [Mb/s]\n", (double) megabits);
 
238
 
 
239
    return 0;
 
240
}
 
241