~ubuntu-branches/ubuntu/intrepid/tomcat5.5/intrepid

« back to all changes in this revision

Viewing changes to container/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2006-09-27 11:19:17 UTC
  • mfrom: (1.1.1 upstream)
  • Revision ID: james.westby@ubuntu.com-20060927111917-wov6fmkz3x6rsl68
Tags: 5.5.17-1ubuntu1
(Build-) depend on libmx4j-java (>= 3.0).

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
 * Copyright 1999,2004 The Apache Software Foundation.
 
3
 * 
 
4
 * Licensed under the Apache License, Version 2.0 (the "License");
 
5
 * you may not use this file except in compliance with the License.
 
6
 * You may obtain a copy of the License at
 
7
 * 
 
8
 *      http://www.apache.org/licenses/LICENSE-2.0
 
9
 * 
 
10
 * Unless required by applicable law or agreed to in writing, software
 
11
 * distributed under the License is distributed on an "AS IS" BASIS,
 
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
13
 * See the License for the specific language governing permissions and
 
14
 */
 
15
 
 
16
package org.apache.catalina.tribes.group.interceptors;
 
17
 
 
18
import java.util.HashMap;
 
19
 
 
20
import org.apache.catalina.tribes.ChannelException;
 
21
import org.apache.catalina.tribes.ChannelMessage;
 
22
import org.apache.catalina.tribes.Member;
 
23
import org.apache.catalina.tribes.group.ChannelInterceptorBase;
 
24
import org.apache.catalina.tribes.group.InterceptorPayload;
 
25
import org.apache.catalina.tribes.io.XByteBuffer;
 
26
 
 
27
 
 
28
 
 
29
/**
 
30
 *
 
31
 * The order interceptor guarantees that messages are received in the same order they were 
 
32
 * sent.
 
33
 * This interceptor works best with the ack=true setting. <br>
 
34
 * There is no point in 
 
35
 * using this with the replicationMode="fastasynchqueue" as this mode guarantees ordering.<BR>
 
36
 * If you are using the mode ack=false replicationMode=pooled, and have a lot of concurrent threads,
 
37
 * this interceptor can really slow you down, as many messages will be completely out of order
 
38
 * and the queue might become rather large. If this is the case, then you might want to set 
 
39
 * the value OrderInterceptor.maxQueue = 25 (meaning that we will never keep more than 25 messages in our queue)
 
40
 * <br><b>Configuration Options</b><br>
 
41
 * OrderInteceptor.expire=<milliseconds> - if a message arrives out of order, how long before we act on it <b>default=3000ms</b><br>
 
42
 * OrderInteceptor.maxQueue=<max queue size> - how much can the queue grow to ensure ordering. 
 
43
 *   This setting is useful to avoid OutOfMemoryErrors<b>default=Integer.MAX_VALUE</b><br>
 
44
 * OrderInterceptor.forwardExpired=<boolean> - this flag tells the interceptor what to 
 
45
 * do when a message has expired or the queue has grown larger than the maxQueue value.
 
46
 * true means that the message is sent up the stack to the receiver that will receive and out of order message
 
47
 * false means, forget the message and reset the message counter. <b>default=true</b>
 
48
 * 
 
49
 * 
 
50
 * @author Filip Hanik
 
51
 * @version 1.0
 
52
 */
 
53
public class OrderInterceptor extends ChannelInterceptorBase {
 
54
    private HashMap outcounter = new HashMap();
 
55
    private HashMap incounter = new HashMap();
 
56
    private HashMap incoming = new HashMap();
 
57
    private long expire = 3000;
 
58
    private boolean forwardExpired = true;
 
59
    private int maxQueue = Integer.MAX_VALUE;
 
60
 
 
61
    public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
 
62
        for ( int i=0; i<destination.length; i++ ) {
 
63
            int nr = incCounter(destination[i]);
 
64
            //reduce byte copy
 
65
            msg.getMessage().append(nr);
 
66
            try {
 
67
                getNext().sendMessage(new Member[] {destination[i]}, msg, payload);
 
68
            }finally {
 
69
                msg.getMessage().trim(4);
 
70
            }
 
71
        }
 
72
    }
 
73
 
 
74
    public void messageReceived(ChannelMessage msg) {
 
75
        int msgnr = XByteBuffer.toInt(msg.getMessage().getBytesDirect(),msg.getMessage().getLength()-4);
 
76
        msg.getMessage().trim(4);
 
77
        MessageOrder order = new MessageOrder(msgnr,msg);
 
78
        if ( processIncoming(order) ) processLeftOvers(msg.getAddress(),false);
 
79
    }
 
80
    
 
81
    public synchronized void processLeftOvers(Member member, boolean force) {
 
82
        MessageOrder tmp = (MessageOrder)incoming.get(member);
 
83
        if ( force ) {
 
84
            Counter cnt = getInCounter(member);
 
85
            cnt.setCounter(Integer.MAX_VALUE);
 
86
        }
 
87
        if ( tmp!= null ) processIncoming(tmp);
 
88
    }
 
89
    /**
 
90
     * 
 
91
     * @param order MessageOrder
 
92
     * @return boolean - true if a message expired and was processed
 
93
     */
 
94
    public synchronized boolean processIncoming(MessageOrder order) {
 
95
        boolean result = false;
 
96
        Member member = order.getMessage().getAddress();
 
97
        Counter cnt = getInCounter(member);
 
98
        
 
99
        MessageOrder tmp = (MessageOrder)incoming.get(member);
 
100
        if ( tmp != null ) {
 
101
            order = MessageOrder.add(tmp,order);
 
102
        }
 
103
        
 
104
        
 
105
        while ( (order!=null) && (order.getMsgNr() <= cnt.getCounter())  ) {
 
106
            //we are right on target. process orders
 
107
            if ( order.getMsgNr() == cnt.getCounter() ) cnt.inc();
 
108
            else if ( order.getMsgNr() > cnt.getCounter() ) cnt.setCounter(order.getMsgNr());
 
109
            super.messageReceived(order.getMessage());
 
110
            order.setMessage(null);
 
111
            order = order.next;
 
112
        }
 
113
        MessageOrder head = order;
 
114
        MessageOrder prev = null;
 
115
        tmp = order;
 
116
        //flag to empty out the queue when it larger than maxQueue
 
117
        boolean empty = order!=null?order.getCount()>=maxQueue:false;
 
118
        while ( tmp != null ) {
 
119
            //process expired messages or empty out the queue
 
120
            if ( tmp.isExpired(expire) || empty ) {
 
121
                //reset the head
 
122
                if ( tmp == head ) head = tmp.next;
 
123
                cnt.setCounter(tmp.getMsgNr()+1);
 
124
                if ( getForwardExpired() ) super.messageReceived(tmp.getMessage());
 
125
                tmp.setMessage(null);
 
126
                tmp = tmp.next;
 
127
                if ( prev != null ) prev.next = tmp;  
 
128
                result = true;
 
129
            } else {
 
130
                prev = tmp;
 
131
                tmp = tmp.next;
 
132
            }
 
133
        }
 
134
        if ( head == null ) incoming.remove(member);
 
135
        else incoming.put(member, head);
 
136
        return result;
 
137
    }
 
138
    
 
139
    public void memberAdded(Member member) {
 
140
        //notify upwards
 
141
        getInCounter(member);
 
142
        getOutCounter(member);
 
143
        super.memberAdded(member);
 
144
    }
 
145
 
 
146
    public void memberDisappeared(Member member) {
 
147
        //notify upwards
 
148
        outcounter.remove(member);
 
149
        incounter.remove(member);
 
150
        //clear the remaining queue
 
151
        processLeftOvers(member,true);
 
152
        super.memberDisappeared(member);
 
153
    }
 
154
    
 
155
    public int incCounter(Member mbr) { 
 
156
        Counter cnt = getOutCounter(mbr);
 
157
        return cnt.inc();
 
158
    }
 
159
    
 
160
    public synchronized Counter getInCounter(Member mbr) {
 
161
        Counter cnt = (Counter)incounter.get(mbr);
 
162
        if ( cnt == null ) {
 
163
            cnt = new Counter();
 
164
            cnt.inc(); //always start at 1 for incoming
 
165
            incounter.put(mbr,cnt);
 
166
        }
 
167
        return cnt;
 
168
    }
 
169
 
 
170
    public synchronized Counter getOutCounter(Member mbr) {
 
171
        Counter cnt = (Counter)outcounter.get(mbr);
 
172
        if ( cnt == null ) {
 
173
            cnt = new Counter();
 
174
            outcounter.put(mbr,cnt);
 
175
        }
 
176
        return cnt;
 
177
    }
 
178
 
 
179
    public static class Counter {
 
180
        private int value = 0;
 
181
        
 
182
        public int getCounter() {
 
183
            return value;
 
184
        }
 
185
        
 
186
        public synchronized void setCounter(int counter) {
 
187
            this.value = counter;
 
188
        }
 
189
        
 
190
        public synchronized int inc() {
 
191
            return ++value;
 
192
        }
 
193
    }
 
194
    
 
195
    public static class MessageOrder {
 
196
        private long received = System.currentTimeMillis();
 
197
        private MessageOrder next;
 
198
        private int msgNr;
 
199
        private ChannelMessage msg = null;
 
200
        public MessageOrder(int msgNr,ChannelMessage msg) {
 
201
            this.msgNr = msgNr;
 
202
            this.msg = msg;
 
203
        }
 
204
        
 
205
        public boolean isExpired(long expireTime) {
 
206
            return (System.currentTimeMillis()-received) > expireTime;
 
207
        }
 
208
        
 
209
        public ChannelMessage getMessage() {
 
210
            return msg;
 
211
        }
 
212
        
 
213
        public void setMessage(ChannelMessage msg) {
 
214
            this.msg = msg;
 
215
        }
 
216
        
 
217
        public void setNext(MessageOrder order) {
 
218
            this.next = order;
 
219
        }
 
220
        public MessageOrder getNext() {
 
221
            return next;
 
222
        }
 
223
        
 
224
        public int getCount() {
 
225
            int counter = 1;
 
226
            MessageOrder tmp = next;
 
227
            while ( tmp != null ) {
 
228
                counter++;
 
229
                tmp = tmp.next;
 
230
            }
 
231
            return counter;
 
232
        }
 
233
        
 
234
        public static MessageOrder add(MessageOrder head, MessageOrder add) {
 
235
            if ( head == null ) return add;
 
236
            if ( add == null ) return head;
 
237
            if ( head == add ) return add;
 
238
 
 
239
            if ( head.getMsgNr() > add.getMsgNr() ) {
 
240
                add.next = head;
 
241
                return add;
 
242
            }
 
243
            
 
244
            MessageOrder iter = head;
 
245
            MessageOrder prev = null;
 
246
            while ( iter.getMsgNr() < add.getMsgNr() && (iter.next !=null ) ) {
 
247
                prev = iter;
 
248
                iter = iter.next;
 
249
            }
 
250
            if ( iter.getMsgNr() < add.getMsgNr() ) {
 
251
                //add after
 
252
                add.next = iter.next;
 
253
                iter.next = add;
 
254
            } else if (iter.getMsgNr() > add.getMsgNr()) {
 
255
                //add before
 
256
                prev.next = add;
 
257
                add.next = iter;
 
258
                
 
259
            } else {
 
260
                throw new ArithmeticException("Message added has the same counter, synchronization bug. Disable the order interceptor");
 
261
            }
 
262
            
 
263
            return head;
 
264
        }
 
265
        
 
266
        public int getMsgNr() {
 
267
            return msgNr;
 
268
        }
 
269
        
 
270
        
 
271
        
 
272
    }
 
273
 
 
274
    public void setExpire(long expire) {
 
275
        this.expire = expire;
 
276
    }
 
277
 
 
278
    public void setForwardExpired(boolean forwardExpired) {
 
279
        this.forwardExpired = forwardExpired;
 
280
    }
 
281
 
 
282
    public void setMaxQueue(int maxQueue) {
 
283
        this.maxQueue = maxQueue;
 
284
    }
 
285
 
 
286
    public long getExpire() {
 
287
        return expire;
 
288
    }
 
289
 
 
290
    public boolean getForwardExpired() {
 
291
        return forwardExpired;
 
292
    }
 
293
 
 
294
    public int getMaxQueue() {
 
295
        return maxQueue;
 
296
    }
 
297
 
 
298
}