1
// The contents of this file are subject to the Mozilla Public License
2
// Version 1.1 (the "License"); you may not use this file except in
3
// compliance with the License. You may obtain a copy of the License
4
// at http://www.mozilla.org/MPL/
6
// Software distributed under the License is distributed on an "AS IS"
7
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8
// the License for the specific language governing rights and
9
// limitations under the License.
11
// The Original Code is RabbitMQ.
13
// The Initial Developer of the Original Code is VMware, Inc.
14
// Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
17
package com.rabbitmq.mqtt.test;
19
import com.rabbitmq.client.*;
20
import junit.framework.Assert;
21
import junit.framework.TestCase;
22
import org.eclipse.paho.client.mqttv3.MqttCallback;
23
import org.eclipse.paho.client.mqttv3.MqttClient;
24
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
25
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
26
import org.eclipse.paho.client.mqttv3.MqttException;
27
import org.eclipse.paho.client.mqttv3.MqttMessage;
28
import org.eclipse.paho.client.mqttv3.MqttTopic;
29
import org.eclipse.paho.client.mqttv3.internal.NetworkModule;
30
import org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule;
31
import org.eclipse.paho.client.mqttv3.internal.trace.Trace;
32
import org.eclipse.paho.client.mqttv3.internal.wire.MqttOutputStream;
33
import org.eclipse.paho.client.mqttv3.internal.wire.MqttPublish;
35
import javax.net.SocketFactory;
36
import java.io.IOException;
37
import java.net.InetAddress;
38
import java.net.Socket;
39
import java.util.ArrayList;
40
import java.util.Arrays;
41
import java.util.List;
45
* TODO: synchronise access to variables
48
public class MqttTest extends TestCase implements MqttCallback {
50
// Broker endpoint shared by the MQTT clients (and, for the host, by the AMQP connection factory).
private final String host = "localhost";
51
private final int port = 1883;
52
private final String brokerUrl = "tcp://" + host + ":" + port;
53
// Client identifiers; assigned in setUp(), where clientId2 is clientId + "-2".
private String clientId;
54
private String clientId2;
55
// The two MQTT clients used by the tests; (re)created in setUp() and tearDown().
private MqttClient client;
56
private MqttClient client2;
57
// Connect options shared by both clients; individual tests mutate them before connecting.
private MqttConnectOptions conOpt;
58
// Messages handed to messageArrived(), in arrival order.
private ArrayList<MqttMessage> receivedMessages;
60
// Default message body and topic used by most tests.
private final byte[] payload = "payload".getBytes();
61
private final String topic = "test-topic";
62
// Passed to Thread.sleep() to give the broker time to deliver before asserting (milliseconds).
private int testDelay = 2000;
63
// System.currentTimeMillis() at the most recent messageArrived() call.
private long lastReceipt;
64
// When false, connectionLost() treats a dropped connection as a failure.
private boolean expectConnectionFailure;
66
// AMQP-side objects, assigned in setUpAmqp().
private ConnectionFactory connectionFactory;
67
private Connection conn;
71
private class MyConnOpts extends MqttConnectOptions {
72
private int keepAliveInterval = 60;
74
public void setKeepAliveInterval(int keepAliveInterval) {
75
this.keepAliveInterval = keepAliveInterval;
78
public int getKeepAliveInterval() {
79
return keepAliveInterval;
84
public void setUp() throws MqttException {
85
clientId = getClass().getSimpleName() + ((int) (10000*Math.random()));
86
clientId2 = clientId + "-2";
87
client = new MqttClient(brokerUrl, clientId, null);
88
client2 = new MqttClient(brokerUrl, clientId2, null);
89
conOpt = new MyConnOpts();
91
receivedMessages = new ArrayList();
92
expectConnectionFailure = false;
96
public void tearDown() throws MqttException {
97
// clean any sticky sessions
99
client = new MqttClient(brokerUrl, clientId, null);
101
client.connect(conOpt);
103
} catch (Exception _) {}
105
client2 = new MqttClient(brokerUrl, clientId2, null);
107
client2.connect(conOpt);
108
client2.disconnect();
109
} catch (Exception _) {}
112
private void setUpAmqp() throws IOException {
113
connectionFactory = new ConnectionFactory();
114
connectionFactory.setHost(host);
115
conn = connectionFactory.newConnection();
116
ch = conn.createChannel();
119
// Counterpart of setUpAmqp().
// NOTE(review): the body of this method is not visible in this excerpt -
// presumably it closes `conn`; confirm before relying on it.
private void tearDownAmqp() throws IOException {
123
private void setConOpts(MqttConnectOptions conOpts) {
124
// provide authentication if the broker needs it
125
// conOpts.setUserName("guest");
126
// conOpts.setPassword("guest".toCharArray());
127
conOpts.setCleanSession(true);
128
conOpts.setKeepAliveInterval(60);
131
// Checks that the broker rejects a client whose first packet is not CONNECT:
// PUBLISH packets are written straight to a raw Paho network module, and the
// test fails unless an IOException is raised.
// NOTE(review): the start of the try block is not visible in this excerpt -
// presumably it opens before the write loop; confirm.
public void testConnectFirst() throws MqttException, IOException, InterruptedException {
132
// The PUBLISH packet uses the test's own name as its topic.
MqttPublish publish = new MqttPublish(this.getName(), new MqttMessage(payload));
133
// Paho-internal transport, used so that no CONNECT handshake is performed.
NetworkModule networkModule = new TCPNetworkModule(Trace.getTrace(""), SocketFactory.getDefault(), host, port);
134
networkModule.start();
135
MqttOutputStream mqttOut = new MqttOutputStream(networkModule.getOutputStream());
137
// Write the packet repeatedly (999 times).
for (int i=1; i<1000; i++) {
138
mqttOut.write(publish);
141
Thread.sleep(testDelay);
142
fail("Error expected if CONNECT is not first packet");
143
// An IOException is the expected outcome, so it is deliberately ignored.
} catch (IOException _) {}
146
public void testInvalidUser() throws MqttException {
147
conOpt.setUserName("invalid-user");
149
client.connect(conOpt);
150
fail("Authentication failure expected");
151
} catch (MqttException ex) {
152
Assert.assertEquals(MqttException.REASON_CODE_FAILED_AUTHENTICATION, ex.getReasonCode());
156
public void testInvalidPassword() throws MqttException {
157
conOpt.setUserName("invalid-user");
158
conOpt.setPassword("invalid-password".toCharArray());
160
client.connect(conOpt);
161
fail("Authentication failure expected");
162
} catch (MqttException ex) {
163
Assert.assertEquals(MqttException.REASON_CODE_FAILED_AUTHENTICATION, ex.getReasonCode());
168
public void testSubscribeQos0() throws MqttException, InterruptedException {
169
client.connect(conOpt);
170
client.setCallback(this);
171
client.subscribe(topic, 0);
173
publish(client, topic, 0, payload);
174
Thread.sleep(testDelay);
175
Assert.assertEquals(1, receivedMessages.size());
176
Assert.assertEquals(true, Arrays.equals(receivedMessages.get(0).getPayload(), payload));
177
Assert.assertEquals(0, receivedMessages.get(0).getQos());
181
public void testSubscribeUnsubscribe() throws MqttException, InterruptedException {
182
client.connect(conOpt);
183
client.setCallback(this);
184
client.subscribe(topic, 0);
186
publish(client, topic, 1, payload);
187
Thread.sleep(testDelay);
188
Assert.assertEquals(1, receivedMessages.size());
189
Assert.assertEquals(true, Arrays.equals(receivedMessages.get(0).getPayload(), payload));
190
Assert.assertEquals(0, receivedMessages.get(0).getQos());
192
client.unsubscribe(topic);
193
publish(client, topic, 0, payload);
194
Thread.sleep(testDelay);
195
Assert.assertEquals(1, receivedMessages.size());
199
public void testSubscribeQos1() throws MqttException, InterruptedException {
200
client.connect(conOpt);
201
client.setCallback(this);
202
client.subscribe(topic, 1);
204
publish(client, topic, 0, payload);
205
publish(client, topic, 1, payload);
206
Thread.sleep(testDelay);
208
Assert.assertEquals(2, receivedMessages.size());
209
MqttMessage msg1 = receivedMessages.get(0);
210
MqttMessage msg2 = receivedMessages.get(1);
212
Assert.assertEquals(true, Arrays.equals(msg1.getPayload(), payload));
213
Assert.assertEquals(0, msg1.getQos());
215
Assert.assertEquals(true, Arrays.equals(msg2.getPayload(), payload));
216
Assert.assertEquals(1, msg2.getQos());
221
public void testTopics() throws MqttException, InterruptedException {
222
client.connect(conOpt);
223
client.setCallback(this);
224
client.subscribe("/+/mid/#");
225
String cases[] = {"/pre/mid2", "/mid", "/a/mid/b/c/d", "/frob/mid"};
226
List<String> expected = Arrays.asList("/a/mid/b/c/d", "/frob/mid");
227
for(String example : cases){
228
publish(client, example, 0, example.getBytes());
230
Thread.sleep(testDelay);
231
Assert.assertEquals(expected.size(), receivedMessages.size());
232
for (MqttMessage m : receivedMessages){
233
expected.contains(new String(m.getPayload()));
238
public void testNonCleanSession() throws MqttException, InterruptedException {
239
conOpt.setCleanSession(false);
240
client.connect(conOpt);
241
client.subscribe(topic, 1);
244
client2.connect(conOpt);
245
publish(client2, topic, 1, payload);
246
client2.disconnect();
248
client.connect(conOpt);
249
client.setCallback(this);
250
client.subscribe(topic, 1);
252
Thread.sleep(testDelay);
253
Assert.assertEquals(1, receivedMessages.size());
254
Assert.assertEquals(true, Arrays.equals(receivedMessages.get(0).getPayload(), payload));
255
client.unsubscribe(topic);
259
public void testCleanSession() throws MqttException, InterruptedException {
260
conOpt.setCleanSession(false);
261
client.connect(conOpt);
262
client.subscribe(topic, 1);
265
client2.connect(conOpt);
266
publish(client2, topic, 1, payload);
267
client2.disconnect();
269
conOpt.setCleanSession(true);
270
client.connect(conOpt);
271
client.setCallback(this);
272
client.subscribe(topic, 1);
274
Thread.sleep(testDelay);
275
Assert.assertEquals(0, receivedMessages.size());
276
client.unsubscribe(topic);
280
public void testMultipleClientIds() throws MqttException, InterruptedException {
281
client.connect(conOpt);
282
client2 = new MqttClient(brokerUrl, clientId, null);
283
client2.connect(conOpt);
284
Thread.sleep(testDelay);
285
Assert.assertFalse(client.isConnected());
286
client2.disconnect();
289
public void testPing() throws MqttException, InterruptedException {
290
conOpt.setKeepAliveInterval(1);
291
client.connect(conOpt);
293
Assert.assertEquals(true, client.isConnected());
297
public void testWill() throws MqttException, InterruptedException, IOException {
298
client2.connect(conOpt);
299
client2.subscribe(topic);
300
client2.setCallback(this);
302
final SocketFactory factory = SocketFactory.getDefault();
303
final ArrayList<Socket> sockets = new ArrayList<Socket>();
304
SocketFactory testFactory = new SocketFactory() {
305
public Socket createSocket(String s, int i) throws IOException {
306
Socket sock = factory.createSocket(s, i);
310
public Socket createSocket(String s, int i, InetAddress a, int i1) throws IOException {
313
public Socket createSocket(InetAddress a, int i) throws IOException {
316
public Socket createSocket(InetAddress a, int i, InetAddress a1, int i1) throws IOException {
320
conOpt.setSocketFactory(testFactory);
321
MqttTopic willTopic = client.getTopic(topic);
322
conOpt.setWill(willTopic, payload, 0, false);
323
conOpt.setCleanSession(false);
324
client.connect(conOpt);
326
Assert.assertEquals(1, sockets.size());
327
expectConnectionFailure = true;
328
sockets.get(0).close();
329
Thread.sleep(testDelay);
331
Assert.assertEquals(1, receivedMessages.size());
332
Assert.assertEquals(true, Arrays.equals(receivedMessages.get(0).getPayload(), payload));
333
client2.disconnect();
336
public void testSubscribeMultiple() throws MqttException {
337
client.connect(conOpt);
338
publish(client, "/topic/1", 1, "msq1-qos1".getBytes());
340
client2.connect(conOpt);
341
client2.setCallback(this);
342
client2.subscribe("/topic/#");
343
client2.subscribe("/topic/#");
345
publish(client, "/topic/2", 0, "msq2-qos0".getBytes());
346
publish(client, "/topic/3", 1, "msq3-qos1".getBytes());
347
publish(client, topic, 0, "msq4-qos0".getBytes());
348
publish(client, topic, 1, "msq4-qos1".getBytes());
350
Assert.assertEquals(2, receivedMessages.size());
352
client2.disconnect();
355
public void testPublishMultiple() throws MqttException, InterruptedException {
357
for (int subQos=0; subQos < 2; subQos++){
358
for (int pubQos=0; pubQos < 2; pubQos++){
359
client.connect(conOpt);
360
client.subscribe(topic, subQos);
361
client.setCallback(this);
362
long start = System.currentTimeMillis();
363
for (int i=0; i<pubCount; i++){
364
publish(client, topic, pubQos, payload);
366
Thread.sleep(testDelay);
367
Assert.assertEquals(pubCount, receivedMessages.size());
368
System.out.println("publish QOS" + pubQos + " subscribe QOS" + subQos +
369
", " + pubCount + " msgs took " +
370
(lastReceipt - start)/1000.0 + "sec");
372
receivedMessages.clear();
377
public void testInteropM2A() throws MqttException, IOException, InterruptedException {
379
String queue = ch.queueDeclare().getQueue();
380
ch.queueBind(queue, "amq.topic", topic);
382
client.connect(conOpt);
383
publish(client, topic, 1, payload);
385
Thread.sleep(testDelay);
387
GetResponse response = ch.basicGet(queue, true);
388
assertTrue(Arrays.equals(payload, response.getBody()));
389
assertNull(ch.basicGet(queue, true));
393
public void testInteropA2M() throws MqttException, IOException, InterruptedException {
394
client.connect(conOpt);
395
client.setCallback(this);
396
client.subscribe(topic, 1);
399
ch.basicPublish("amq.topic", topic, MessageProperties.MINIMAL_BASIC, payload);
401
Thread.sleep(testDelay);
403
Assert.assertEquals(1, receivedMessages.size());
407
private void publish(MqttClient client, String topicName, int qos, byte[] payload) throws MqttException {
408
MqttTopic topic = client.getTopic(topicName);
409
MqttMessage message = new MqttMessage(payload);
411
MqttDeliveryToken token = topic.publish(message);
412
token.waitForCompletion();
415
public void connectionLost(Throwable cause) {
416
if (!expectConnectionFailure)
417
fail("Connection unexpectedly lost");
420
public void messageArrived(MqttTopic topic, MqttMessage message) throws Exception {
421
lastReceipt = System.currentTimeMillis();
422
receivedMessages.add(message);
425
public void deliveryComplete(MqttDeliveryToken token) {