~ubuntu-branches/ubuntu/saucy/rabbitmq-server/saucy

« back to all changes in this revision

Viewing changes to plugins-src/rabbitmq-mqtt/test/src/com/rabbitmq/mqtt/test/MqttTest.java

  • Committer: Package Import Robot
  • Author(s): Emile Joubert
  • Date: 2012-11-19 11:42:31 UTC
  • mfrom: (0.2.18) (0.1.32 sid)
  • Revision ID: package-import@ubuntu.com-20121119114231-hvapkn4akng09etr
Tags: 3.0.0-1
New upstream release

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
//  The contents of this file are subject to the Mozilla Public License
 
2
//  Version 1.1 (the "License"); you may not use this file except in
 
3
//  compliance with the License. You may obtain a copy of the License
 
4
//  at http://www.mozilla.org/MPL/
 
5
//
 
6
//  Software distributed under the License is distributed on an "AS IS"
 
7
//  basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
 
8
//  the License for the specific language governing rights and
 
9
//  limitations under the License.
 
10
//
 
11
//  The Original Code is RabbitMQ.
 
12
//
 
13
//  The Initial Developer of the Original Code is VMware, Inc.
 
14
//  Copyright (c) 2007-2012 VMware, Inc.  All rights reserved.
 
15
//
 
16
 
 
17
package com.rabbitmq.mqtt.test;
 
18
 
 
19
import com.rabbitmq.client.*;
 
20
import junit.framework.Assert;
 
21
import junit.framework.TestCase;
 
22
import org.eclipse.paho.client.mqttv3.MqttCallback;
 
23
import org.eclipse.paho.client.mqttv3.MqttClient;
 
24
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 
25
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
 
26
import org.eclipse.paho.client.mqttv3.MqttException;
 
27
import org.eclipse.paho.client.mqttv3.MqttMessage;
 
28
import org.eclipse.paho.client.mqttv3.MqttTopic;
 
29
import org.eclipse.paho.client.mqttv3.internal.NetworkModule;
 
30
import org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule;
 
31
import org.eclipse.paho.client.mqttv3.internal.trace.Trace;
 
32
import org.eclipse.paho.client.mqttv3.internal.wire.MqttOutputStream;
 
33
import org.eclipse.paho.client.mqttv3.internal.wire.MqttPublish;
 
34
 
 
35
import javax.net.SocketFactory;
 
36
import java.io.IOException;
 
37
import java.net.InetAddress;
 
38
import java.net.Socket;
 
39
import java.util.ArrayList;
 
40
import java.util.Arrays;
 
41
import java.util.List;
 
42
 
 
43
/**
 *  MQTT v3.1 tests.
 *  TODO: synchronise access to variables
 */
 
47
 
 
48
public class MqttTest extends TestCase implements MqttCallback {
 
49
 
 
50
    private final String host = "localhost";
 
51
    private final int port = 1883;
 
52
        private final String brokerUrl = "tcp://" + host + ":" + port;
 
53
    private String clientId;
 
54
    private String clientId2;
 
55
    private MqttClient client;
 
56
    private MqttClient client2;
 
57
        private MqttConnectOptions conOpt;
 
58
    private ArrayList<MqttMessage> receivedMessages;
 
59
 
 
60
    private final byte[] payload = "payload".getBytes();
 
61
    private final String topic = "test-topic";
 
62
    private int testDelay = 2000;
 
63
    private long lastReceipt;
 
64
    private boolean expectConnectionFailure;
 
65
 
 
66
    private ConnectionFactory connectionFactory;
 
67
    private Connection conn;
 
68
    private Channel ch;
 
69
 
 
70
    // override 10s limit
 
71
    private class MyConnOpts extends MqttConnectOptions {
 
72
        private int keepAliveInterval = 60;
 
73
        @Override
 
74
        public void setKeepAliveInterval(int keepAliveInterval) {
 
75
            this.keepAliveInterval = keepAliveInterval;
 
76
        }
 
77
        @Override
 
78
        public int getKeepAliveInterval() {
 
79
            return keepAliveInterval;
 
80
        }
 
81
    }
 
82
 
 
83
    @Override
 
84
    public void setUp() throws MqttException {
 
85
        clientId = getClass().getSimpleName() + ((int) (10000*Math.random()));
 
86
        clientId2 = clientId + "-2";
 
87
        client = new MqttClient(brokerUrl, clientId, null);
 
88
        client2 = new MqttClient(brokerUrl, clientId2, null);
 
89
        conOpt = new MyConnOpts();
 
90
        setConOpts(conOpt);
 
91
        receivedMessages = new ArrayList();
 
92
        expectConnectionFailure = false;
 
93
    }
 
94
 
 
95
    @Override
 
96
    public  void tearDown() throws MqttException {
 
97
        // clean any sticky sessions
 
98
        setConOpts(conOpt);
 
99
        client = new MqttClient(brokerUrl, clientId, null);
 
100
        try {
 
101
            client.connect(conOpt);
 
102
            client.disconnect();
 
103
        } catch (Exception _) {}
 
104
 
 
105
        client2 = new MqttClient(brokerUrl, clientId2, null);
 
106
        try {
 
107
            client2.connect(conOpt);
 
108
            client2.disconnect();
 
109
        } catch (Exception _) {}
 
110
    }
 
111
 
 
112
    private void setUpAmqp() throws IOException {
 
113
        connectionFactory = new ConnectionFactory();
 
114
        connectionFactory.setHost(host);
 
115
        conn = connectionFactory.newConnection();
 
116
        ch = conn.createChannel();
 
117
    }
 
118
 
 
119
    private void tearDownAmqp() throws IOException {
 
120
        conn.close();
 
121
    }
 
122
 
 
123
    private void setConOpts(MqttConnectOptions conOpts) {
 
124
        // provide authentication if the broker needs it
 
125
        // conOpts.setUserName("guest");
 
126
        // conOpts.setPassword("guest".toCharArray());
 
127
        conOpts.setCleanSession(true);
 
128
        conOpts.setKeepAliveInterval(60);
 
129
    }
 
130
 
 
131
    public void testConnectFirst() throws MqttException, IOException, InterruptedException {
 
132
        MqttPublish publish = new MqttPublish(this.getName(), new MqttMessage(payload));
 
133
        NetworkModule networkModule = new TCPNetworkModule(Trace.getTrace(""), SocketFactory.getDefault(), host, port);
 
134
        networkModule.start();
 
135
        MqttOutputStream mqttOut = new MqttOutputStream(networkModule.getOutputStream());
 
136
        try {
 
137
            for (int i=1; i<1000; i++) {
 
138
                mqttOut.write(publish);
 
139
                mqttOut.flush();
 
140
            }
 
141
            Thread.sleep(testDelay);
 
142
            fail("Error expected if CONNECT is not first packet");
 
143
        } catch (IOException _) {}
 
144
    }
 
145
 
 
146
    public void testInvalidUser() throws MqttException {
 
147
        conOpt.setUserName("invalid-user");
 
148
        try {
 
149
            client.connect(conOpt);
 
150
            fail("Authentication failure expected");
 
151
        } catch (MqttException ex) {
 
152
            Assert.assertEquals(MqttException.REASON_CODE_FAILED_AUTHENTICATION, ex.getReasonCode());
 
153
        }
 
154
    }
 
155
 
 
156
    public void testInvalidPassword() throws MqttException {
 
157
        conOpt.setUserName("invalid-user");
 
158
        conOpt.setPassword("invalid-password".toCharArray());
 
159
        try {
 
160
            client.connect(conOpt);
 
161
            fail("Authentication failure expected");
 
162
        } catch (MqttException ex) {
 
163
            Assert.assertEquals(MqttException.REASON_CODE_FAILED_AUTHENTICATION, ex.getReasonCode());
 
164
        }
 
165
    }
 
166
 
 
167
 
 
168
    public void testSubscribeQos0() throws MqttException, InterruptedException {
 
169
        client.connect(conOpt);
 
170
        client.setCallback(this);
 
171
        client.subscribe(topic, 0);
 
172
 
 
173
        publish(client, topic, 0, payload);
 
174
        Thread.sleep(testDelay);
 
175
        Assert.assertEquals(1, receivedMessages.size());
 
176
        Assert.assertEquals(true, Arrays.equals(receivedMessages.get(0).getPayload(), payload));
 
177
        Assert.assertEquals(0, receivedMessages.get(0).getQos());
 
178
        client.disconnect();
 
179
    }
 
180
 
 
181
    public void testSubscribeUnsubscribe() throws MqttException, InterruptedException {
 
182
        client.connect(conOpt);
 
183
        client.setCallback(this);
 
184
        client.subscribe(topic, 0);
 
185
 
 
186
        publish(client, topic, 1, payload);
 
187
        Thread.sleep(testDelay);
 
188
        Assert.assertEquals(1, receivedMessages.size());
 
189
        Assert.assertEquals(true, Arrays.equals(receivedMessages.get(0).getPayload(), payload));
 
190
        Assert.assertEquals(0, receivedMessages.get(0).getQos());
 
191
 
 
192
        client.unsubscribe(topic);
 
193
        publish(client, topic, 0, payload);
 
194
        Thread.sleep(testDelay);
 
195
        Assert.assertEquals(1, receivedMessages.size());
 
196
        client.disconnect();
 
197
    }
 
198
 
 
199
    public void testSubscribeQos1() throws MqttException, InterruptedException {
 
200
        client.connect(conOpt);
 
201
        client.setCallback(this);
 
202
        client.subscribe(topic, 1);
 
203
 
 
204
        publish(client, topic, 0, payload);
 
205
        publish(client, topic, 1, payload);
 
206
        Thread.sleep(testDelay);
 
207
 
 
208
        Assert.assertEquals(2, receivedMessages.size());
 
209
        MqttMessage msg1 = receivedMessages.get(0);
 
210
        MqttMessage msg2 = receivedMessages.get(1);
 
211
 
 
212
        Assert.assertEquals(true, Arrays.equals(msg1.getPayload(), payload));
 
213
        Assert.assertEquals(0, msg1.getQos());
 
214
 
 
215
        Assert.assertEquals(true, Arrays.equals(msg2.getPayload(), payload));
 
216
        Assert.assertEquals(1, msg2.getQos());
 
217
 
 
218
        client.disconnect();
 
219
    }
 
220
 
 
221
    public void testTopics() throws MqttException, InterruptedException {
 
222
        client.connect(conOpt);
 
223
        client.setCallback(this);
 
224
        client.subscribe("/+/mid/#");
 
225
        String cases[] = {"/pre/mid2", "/mid", "/a/mid/b/c/d", "/frob/mid"};
 
226
        List<String> expected = Arrays.asList("/a/mid/b/c/d", "/frob/mid");
 
227
        for(String example : cases){
 
228
            publish(client, example, 0, example.getBytes());
 
229
        }
 
230
        Thread.sleep(testDelay);
 
231
        Assert.assertEquals(expected.size(), receivedMessages.size());
 
232
        for (MqttMessage m : receivedMessages){
 
233
            expected.contains(new String(m.getPayload()));
 
234
        }
 
235
        client.disconnect();
 
236
    }
 
237
 
 
238
    public void testNonCleanSession() throws MqttException, InterruptedException {
 
239
        conOpt.setCleanSession(false);
 
240
        client.connect(conOpt);
 
241
        client.subscribe(topic, 1);
 
242
        client.disconnect();
 
243
 
 
244
        client2.connect(conOpt);
 
245
        publish(client2, topic, 1, payload);
 
246
        client2.disconnect();
 
247
 
 
248
        client.connect(conOpt);
 
249
        client.setCallback(this);
 
250
        client.subscribe(topic, 1);
 
251
 
 
252
        Thread.sleep(testDelay);
 
253
        Assert.assertEquals(1, receivedMessages.size());
 
254
        Assert.assertEquals(true, Arrays.equals(receivedMessages.get(0).getPayload(), payload));
 
255
        client.unsubscribe(topic);
 
256
        client.disconnect();
 
257
    }
 
258
 
 
259
    public void testCleanSession() throws MqttException, InterruptedException {
 
260
        conOpt.setCleanSession(false);
 
261
        client.connect(conOpt);
 
262
        client.subscribe(topic, 1);
 
263
        client.disconnect();
 
264
 
 
265
        client2.connect(conOpt);
 
266
        publish(client2, topic, 1, payload);
 
267
        client2.disconnect();
 
268
 
 
269
        conOpt.setCleanSession(true);
 
270
        client.connect(conOpt);
 
271
        client.setCallback(this);
 
272
        client.subscribe(topic, 1);
 
273
 
 
274
        Thread.sleep(testDelay);
 
275
        Assert.assertEquals(0, receivedMessages.size());
 
276
        client.unsubscribe(topic);
 
277
        client.disconnect();
 
278
    }
 
279
 
 
280
    public void testMultipleClientIds() throws MqttException, InterruptedException {
 
281
        client.connect(conOpt);
 
282
        client2 = new MqttClient(brokerUrl, clientId, null);
 
283
        client2.connect(conOpt);
 
284
        Thread.sleep(testDelay);
 
285
        Assert.assertFalse(client.isConnected());
 
286
        client2.disconnect();
 
287
    }
 
288
 
 
289
    public void testPing() throws MqttException, InterruptedException {
 
290
        conOpt.setKeepAliveInterval(1);
 
291
        client.connect(conOpt);
 
292
        Thread.sleep(3000);
 
293
        Assert.assertEquals(true, client.isConnected());
 
294
        client.disconnect();
 
295
    }
 
296
 
 
297
    public void testWill() throws MqttException, InterruptedException, IOException {
 
298
        client2.connect(conOpt);
 
299
        client2.subscribe(topic);
 
300
        client2.setCallback(this);
 
301
 
 
302
        final SocketFactory factory = SocketFactory.getDefault();
 
303
        final ArrayList<Socket> sockets = new ArrayList<Socket>();
 
304
        SocketFactory testFactory = new SocketFactory() {
 
305
            public Socket createSocket(String s, int i) throws IOException {
 
306
                Socket sock = factory.createSocket(s, i);
 
307
                sockets.add(sock);
 
308
                return sock;
 
309
            }
 
310
            public Socket createSocket(String s, int i, InetAddress a, int i1) throws IOException {
 
311
                return null;
 
312
            }
 
313
            public Socket createSocket(InetAddress a, int i) throws IOException {
 
314
                return null;
 
315
            }
 
316
            public Socket createSocket(InetAddress a, int i, InetAddress a1, int i1) throws IOException {
 
317
                return null;
 
318
            };
 
319
        };
 
320
        conOpt.setSocketFactory(testFactory);
 
321
        MqttTopic willTopic = client.getTopic(topic);
 
322
        conOpt.setWill(willTopic, payload, 0, false);
 
323
        conOpt.setCleanSession(false);
 
324
        client.connect(conOpt);
 
325
 
 
326
        Assert.assertEquals(1, sockets.size());
 
327
        expectConnectionFailure = true;
 
328
        sockets.get(0).close();
 
329
        Thread.sleep(testDelay);
 
330
 
 
331
        Assert.assertEquals(1, receivedMessages.size());
 
332
        Assert.assertEquals(true, Arrays.equals(receivedMessages.get(0).getPayload(), payload));
 
333
        client2.disconnect();
 
334
    }
 
335
 
 
336
    public void testSubscribeMultiple() throws MqttException {
 
337
        client.connect(conOpt);
 
338
        publish(client, "/topic/1", 1, "msq1-qos1".getBytes());
 
339
 
 
340
        client2.connect(conOpt);
 
341
        client2.setCallback(this);
 
342
        client2.subscribe("/topic/#");
 
343
        client2.subscribe("/topic/#");
 
344
 
 
345
        publish(client, "/topic/2", 0, "msq2-qos0".getBytes());
 
346
        publish(client, "/topic/3", 1, "msq3-qos1".getBytes());
 
347
        publish(client, topic, 0, "msq4-qos0".getBytes());
 
348
        publish(client, topic, 1, "msq4-qos1".getBytes());
 
349
 
 
350
        Assert.assertEquals(2, receivedMessages.size());
 
351
        client.disconnect();
 
352
        client2.disconnect();
 
353
    }
 
354
 
 
355
    public void testPublishMultiple() throws MqttException, InterruptedException {
 
356
        int pubCount = 50;
 
357
        for (int subQos=0; subQos < 2; subQos++){
 
358
            for (int pubQos=0; pubQos < 2; pubQos++){
 
359
                client.connect(conOpt);
 
360
                client.subscribe(topic, subQos);
 
361
                client.setCallback(this);
 
362
                long start = System.currentTimeMillis();
 
363
                for (int i=0; i<pubCount; i++){
 
364
                    publish(client, topic, pubQos, payload);
 
365
                }
 
366
                Thread.sleep(testDelay);
 
367
                Assert.assertEquals(pubCount, receivedMessages.size());
 
368
                System.out.println("publish QOS" + pubQos + " subscribe QOS" + subQos +
 
369
                                   ", " + pubCount + " msgs took " +
 
370
                                   (lastReceipt - start)/1000.0 + "sec");
 
371
                client.disconnect();
 
372
                receivedMessages.clear();
 
373
            }
 
374
        }
 
375
    }
 
376
 
 
377
    public void testInteropM2A() throws MqttException, IOException, InterruptedException {
 
378
        setUpAmqp();
 
379
        String queue = ch.queueDeclare().getQueue();
 
380
        ch.queueBind(queue, "amq.topic", topic);
 
381
 
 
382
        client.connect(conOpt);
 
383
        publish(client, topic, 1, payload);
 
384
        client.disconnect();
 
385
        Thread.sleep(testDelay);
 
386
 
 
387
        GetResponse response = ch.basicGet(queue, true);
 
388
        assertTrue(Arrays.equals(payload, response.getBody()));
 
389
        assertNull(ch.basicGet(queue, true));
 
390
        tearDownAmqp();
 
391
    }
 
392
 
 
393
    public void testInteropA2M() throws MqttException, IOException, InterruptedException {
 
394
        client.connect(conOpt);
 
395
        client.setCallback(this);
 
396
        client.subscribe(topic, 1);
 
397
 
 
398
        setUpAmqp();
 
399
        ch.basicPublish("amq.topic", topic, MessageProperties.MINIMAL_BASIC, payload);
 
400
        tearDownAmqp();
 
401
        Thread.sleep(testDelay);
 
402
 
 
403
        Assert.assertEquals(1, receivedMessages.size());
 
404
        client.disconnect();
 
405
    }
 
406
 
 
407
    private void publish(MqttClient client, String topicName, int qos, byte[] payload) throws MqttException {
 
408
        MqttTopic topic = client.getTopic(topicName);
 
409
                MqttMessage message = new MqttMessage(payload);
 
410
        message.setQos(qos);
 
411
        MqttDeliveryToken token = topic.publish(message);
 
412
        token.waitForCompletion();
 
413
    }
 
414
 
 
415
    public void connectionLost(Throwable cause) {
 
416
        if (!expectConnectionFailure)
 
417
            fail("Connection unexpectedly lost");
 
418
    }
 
419
 
 
420
    public void messageArrived(MqttTopic topic, MqttMessage message) throws Exception {
 
421
        lastReceipt = System.currentTimeMillis();
 
422
        receivedMessages.add(message);
 
423
    }
 
424
 
 
425
    public void deliveryComplete(MqttDeliveryToken token) {
 
426
    }
 
427
}