2
* Copyright 1999,2004 The Apache Software Foundation.
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
* you may not use this file except in compliance with the License.
6
* You may obtain a copy of the License at
8
* http://www.apache.org/licenses/LICENSE-2.0
10
* Unless required by applicable law or agreed to in writing, software
11
* distributed under the License is distributed on an "AS IS" BASIS,
12
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
* See the License for the specific language governing permissions and
16
package org.apache.catalina.tribes.group.interceptors;
18
import java.util.HashMap;
20
import org.apache.catalina.tribes.ChannelException;
21
import org.apache.catalina.tribes.ChannelMessage;
22
import org.apache.catalina.tribes.Member;
23
import org.apache.catalina.tribes.group.ChannelInterceptorBase;
24
import org.apache.catalina.tribes.group.InterceptorPayload;
25
import org.apache.catalina.tribes.io.XByteBuffer;
31
* The order interceptor guarantees that messages are received in the same order they were
33
* This interceptor works best with the ack=true setting. <br>
34
* There is no point in
35
* using this with the replicationMode="fastasynchqueue" as this mode guarantees ordering.<BR>
36
* If you are using the mode ack=false replicationMode=pooled, and have a lot of concurrent threads,
37
* this interceptor can really slow you down, as many messages will be completely out of order
38
* and the queue might become rather large. If this is the case, then you might want to set
39
* the value OrderInterceptor.maxQueue = 25 (meaning that we will never keep more than 25 messages in our queue)
40
* <br><b>Configuration Options</b><br>
41
* OrderInteceptor.expire=<milliseconds> - if a message arrives out of order, how long before we act on it <b>default=3000ms</b><br>
42
* OrderInteceptor.maxQueue=<max queue size> - how much can the queue grow to ensure ordering.
43
* This setting is useful to avoid OutOfMemoryErrors<b>default=Integer.MAX_VALUE</b><br>
44
* OrderInterceptor.forwardExpired=<boolean> - this flag tells the interceptor what to
45
* do when a message has expired or the queue has grown larger than the maxQueue value.
46
* true means that the message is sent up the stack to the receiver that will receive and out of order message
47
* false means, forget the message and reset the message counter. <b>default=true</b>
53
public class OrderInterceptor extends ChannelInterceptorBase {
54
private HashMap outcounter = new HashMap();
55
private HashMap incounter = new HashMap();
56
private HashMap incoming = new HashMap();
57
private long expire = 3000;
58
private boolean forwardExpired = true;
59
private int maxQueue = Integer.MAX_VALUE;
61
public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
62
for ( int i=0; i<destination.length; i++ ) {
63
int nr = incCounter(destination[i]);
65
msg.getMessage().append(nr);
67
getNext().sendMessage(new Member[] {destination[i]}, msg, payload);
69
msg.getMessage().trim(4);
74
public void messageReceived(ChannelMessage msg) {
75
int msgnr = XByteBuffer.toInt(msg.getMessage().getBytesDirect(),msg.getMessage().getLength()-4);
76
msg.getMessage().trim(4);
77
MessageOrder order = new MessageOrder(msgnr,msg);
78
if ( processIncoming(order) ) processLeftOvers(msg.getAddress(),false);
81
public synchronized void processLeftOvers(Member member, boolean force) {
82
MessageOrder tmp = (MessageOrder)incoming.get(member);
84
Counter cnt = getInCounter(member);
85
cnt.setCounter(Integer.MAX_VALUE);
87
if ( tmp!= null ) processIncoming(tmp);
91
* @param order MessageOrder
92
* @return boolean - true if a message expired and was processed
94
public synchronized boolean processIncoming(MessageOrder order) {
95
boolean result = false;
96
Member member = order.getMessage().getAddress();
97
Counter cnt = getInCounter(member);
99
MessageOrder tmp = (MessageOrder)incoming.get(member);
101
order = MessageOrder.add(tmp,order);
105
while ( (order!=null) && (order.getMsgNr() <= cnt.getCounter()) ) {
106
//we are right on target. process orders
107
if ( order.getMsgNr() == cnt.getCounter() ) cnt.inc();
108
else if ( order.getMsgNr() > cnt.getCounter() ) cnt.setCounter(order.getMsgNr());
109
super.messageReceived(order.getMessage());
110
order.setMessage(null);
113
MessageOrder head = order;
114
MessageOrder prev = null;
116
//flag to empty out the queue when it larger than maxQueue
117
boolean empty = order!=null?order.getCount()>=maxQueue:false;
118
while ( tmp != null ) {
119
//process expired messages or empty out the queue
120
if ( tmp.isExpired(expire) || empty ) {
122
if ( tmp == head ) head = tmp.next;
123
cnt.setCounter(tmp.getMsgNr()+1);
124
if ( getForwardExpired() ) super.messageReceived(tmp.getMessage());
125
tmp.setMessage(null);
127
if ( prev != null ) prev.next = tmp;
134
if ( head == null ) incoming.remove(member);
135
else incoming.put(member, head);
139
public void memberAdded(Member member) {
141
getInCounter(member);
142
getOutCounter(member);
143
super.memberAdded(member);
146
public void memberDisappeared(Member member) {
148
outcounter.remove(member);
149
incounter.remove(member);
150
//clear the remaining queue
151
processLeftOvers(member,true);
152
super.memberDisappeared(member);
155
public int incCounter(Member mbr) {
156
Counter cnt = getOutCounter(mbr);
160
public synchronized Counter getInCounter(Member mbr) {
161
Counter cnt = (Counter)incounter.get(mbr);
164
cnt.inc(); //always start at 1 for incoming
165
incounter.put(mbr,cnt);
170
public synchronized Counter getOutCounter(Member mbr) {
171
Counter cnt = (Counter)outcounter.get(mbr);
174
outcounter.put(mbr,cnt);
179
public static class Counter {
180
private int value = 0;
182
public int getCounter() {
186
public synchronized void setCounter(int counter) {
187
this.value = counter;
190
public synchronized int inc() {
195
public static class MessageOrder {
196
private long received = System.currentTimeMillis();
197
private MessageOrder next;
199
private ChannelMessage msg = null;
200
public MessageOrder(int msgNr,ChannelMessage msg) {
205
public boolean isExpired(long expireTime) {
206
return (System.currentTimeMillis()-received) > expireTime;
209
public ChannelMessage getMessage() {
213
public void setMessage(ChannelMessage msg) {
217
public void setNext(MessageOrder order) {
220
public MessageOrder getNext() {
224
public int getCount() {
226
MessageOrder tmp = next;
227
while ( tmp != null ) {
234
public static MessageOrder add(MessageOrder head, MessageOrder add) {
235
if ( head == null ) return add;
236
if ( add == null ) return head;
237
if ( head == add ) return add;
239
if ( head.getMsgNr() > add.getMsgNr() ) {
244
MessageOrder iter = head;
245
MessageOrder prev = null;
246
while ( iter.getMsgNr() < add.getMsgNr() && (iter.next !=null ) ) {
250
if ( iter.getMsgNr() < add.getMsgNr() ) {
252
add.next = iter.next;
254
} else if (iter.getMsgNr() > add.getMsgNr()) {
260
throw new ArithmeticException("Message added has the same counter, synchronization bug. Disable the order interceptor");
266
public int getMsgNr() {
274
public void setExpire(long expire) {
275
this.expire = expire;
278
public void setForwardExpired(boolean forwardExpired) {
279
this.forwardExpired = forwardExpired;
282
public void setMaxQueue(int maxQueue) {
283
this.maxQueue = maxQueue;
286
public long getExpire() {
290
public boolean getForwardExpired() {
291
return forwardExpired;
294
public int getMaxQueue() {