4
* Copyright Ericsson AB 2001-2009. All Rights Reserved.
6
* The contents of this file are subject to the Erlang Public License,
7
* Version 1.1, (the "License"); you may not use this file except in
8
* compliance with the License. You should have received a copy of the
9
* Erlang Public License along with this software. If not, it can be
10
* retrieved online at http://www.erlang.org/.
12
* Software distributed under the License is distributed on an "AS IS"
13
* basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
14
* the License for the specific language governing rights and limitations
20
/* to test multiple threads in ei */
33
#include <sys/types.h>
34
#include <sys/socket.h>
35
#include <netinet/in.h>
49
static int my_listen(int port);
53
To be called from the test case ei_accept_SUITE:multi_thread
54
usage: eiaccnode <cookie> <n> <desthost> <port> [nothreads]
56
- start threads 0..n-1
58
- listen on "eiacc0" .. "eiacc<n-1>"
61
- send {i, <pid>} back
65
static const char* cookie, * desthost;
66
static int port; /* actually base port */
70
#define SD_SEND SHUT_WR
77
#define closesocket(fd) close(fd)
85
einode_thread(void* num)
89
char myname[100], destname[100];
95
sprintf(myname, "eiacc%d", n);
96
/* Announce the node name this thread serves. The format has exactly two
 * conversions, so only n and myname are passed. */
printf("thread %d (%s) listening\n", n, myname);
97
r = ei_connect_init(&ec, myname, cookie, 0);
98
if ((listen = my_listen(port+n)) <= 0) {
99
printf("listen err\n");
102
if (ei_publish(&ec, port + n) == -1) {
103
printf("ei_publish port %d\n", port+n);
106
fd = ei_accept(&ec, listen, &conn);
107
printf("ei_accept %d\n", fd);
115
int got = ei_xreceive_msg(fd, &msg, &x);
118
if (got == ERL_ERROR) {
119
printf("receive error %d\n", n);
122
printf("received %d\n", got);
126
if (ei_decode_version(x.buff, &index, &version) != 0) {
127
printf("ei_decode_version %d\n", n);
130
if (ei_decode_pid(x.buff, &index, &pid) != 0) {
131
printf("ei_decode_pid %d\n", n);
134
/* fprintf(f, "got pid from %s \n", pid.node);*/
135
ei_x_new_with_version(&xs);
136
ei_x_encode_tuple_header(&xs, 2);
137
ei_x_encode_long(&xs, n);
138
ei_x_encode_pid(&xs, &pid);
139
r = ei_send(fd, &pid, xs.buff, xs.index);
140
/* fprintf(f, "sent %d bytes %d\n", xs.index, r);*/
141
shutdown(fd, SD_SEND);
146
printf("coudn't connect fd %d r %d\n", fd, r);
148
printf("done thread %d\n", n);
153
MAIN(int argc, char *argv[])
155
int i, n, no_threads;
160
pthread_t threads[100];
172
port = atoi(argv[4]);
174
no_threads = argv[5] != NULL && strcmp(argv[5], "nothreads") == 0;
178
for (i = 0; i < n; ++i) {
183
threads[i] = (HANDLE)_beginthreadex(NULL, 0, einode_thread,
186
pthread_create(&threads[i], NULL, einode_thread, (void*)i);
192
einode_thread((void*)i);
197
for (i = 0; i < n; ++i) {
199
if (WaitForSingleObject(threads[i], INFINITE) != WAIT_OBJECT_0)
201
if (pthread_join(threads[i], NULL) != 0)
203
printf("bad wait thread %d\n", i);
212
static int my_listen(int port)
215
struct sockaddr_in addr;
216
/* SO_REUSEADDR is an int-valued boolean option: hand setsockopt() the
 * address of an int together with that int's size. */
int on = 1;
218
if ((listen_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
221
/* The cast keeps platforms whose setsockopt() takes a char pointer happy. */
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, (const char *) &on, sizeof(on));
223
memset((void*) &addr, 0, (size_t) sizeof(addr));
224
addr.sin_family = AF_INET;
225
addr.sin_port = htons(port);
226
addr.sin_addr.s_addr = htonl(INADDR_ANY);
228
if (bind(listen_fd, (struct sockaddr*) &addr, sizeof(addr)) < 0)
231
listen(listen_fd, 5);