|
1
by Glyph Lefkowitz
add Java stuff |
1 |
package com.twistedmatrix.internet; |
2 |
||
3 |
import java.util.ArrayList; |
|
4 |
import java.util.TreeMap; |
|
5 |
import java.util.Iterator; |
|
6 |
import java.util.NoSuchElementException; |
|
7 |
import java.io.IOException; |
|
8 |
||
9 |
import java.net.InetSocketAddress; |
|
10 |
import java.net.ServerSocket; |
|
11 |
import java.net.Socket; |
|
12 |
||
13 |
import java.nio.ByteBuffer; |
|
14 |
||
15 |
import java.nio.channels.Selector; |
|
16 |
import java.nio.channels.ClosedChannelException; |
|
17 |
import java.nio.channels.ServerSocketChannel; |
|
18 |
import java.nio.channels.SocketChannel; |
|
19 |
import java.nio.channels.SelectionKey; |
|
20 |
||
21 |
public class Reactor { |
|
22 |
||
23 |
static int BUFFER_SIZE = 8 * 1024; |
|
24 |
||
25 |
public static Reactor get() { |
|
26 |
if (theReactor == null) { |
|
27 |
try { |
|
28 |
theReactor = new Reactor(); |
|
29 |
} catch (Throwable t) { |
|
30 |
t.printStackTrace(); |
|
31 |
}
|
|
32 |
}
|
|
33 |
return theReactor; |
|
34 |
}
|
|
35 |
||
36 |
/**
|
|
|
18
by Glyph Lefkowitz
sprinkle interestOpsChanged() calls everywhere that they might have changed |
37 |
* Selectors were added or removed.
|
38 |
*
|
|
39 |
* In the normal case, this is simply a no-op. However, in the case where
|
|
40 |
* the "run()" thread is different from the application code thread, this
|
|
41 |
* is a hook for the reactor to "wakeup" its selector.
|
|
42 |
*/
|
|
43 |
void interestOpsChanged() { |
|
44 |
}
|
|
45 |
||
46 |
static Reactor theReactor; |
|
|
1
by Glyph Lefkowitz
add Java stuff |
47 |
|
48 |
public interface IListeningPort { |
|
49 |
void stopListening(); |
|
50 |
}
|
|
51 |
||
52 |
/* It appears that this interface is actually unnamed in
|
|
53 |
* Twisted... abstract.FileDescriptor serves this purpose.
|
|
54 |
*/
|
|
55 |
class Selectable { |
|
56 |
SelectionKey sk; |
|
57 |
||
58 |
void doRead() throws Throwable { |
|
59 |
msg("UNHANDLED READ "+this); |
|
60 |
}
|
|
61 |
void doWrite() throws Throwable { |
|
62 |
msg("UNHANDLED WRITE "+this); |
|
63 |
}
|
|
64 |
||
65 |
// and we don't need this, because there's no such thing as
|
|
66 |
// "acceptable" outside the magical fantasyland where Java lives.
|
|
67 |
void doAccept() throws Throwable { |
|
68 |
msg("UNHANDLED ACCEPT "+this); |
|
69 |
}
|
|
70 |
void doConnect() throws Throwable { |
|
71 |
msg("UNHANDLED CONNECT "+this); |
|
72 |
}
|
|
73 |
}
|
|
74 |
||
75 |
public class TCPPort extends Selectable implements IListeningPort { |
|
76 |
IProtocol.IFactory protocolFactory; |
|
77 |
ServerSocketChannel ssc; |
|
78 |
ServerSocket ss; |
|
79 |
InetSocketAddress addr; |
|
80 |
||
81 |
TCPPort(int portno, IProtocol.IFactory pf) throws IOException { |
|
82 |
this.protocolFactory = pf; |
|
83 |
this.ssc = ServerSocketChannel.open(); |
|
84 |
this.ssc.configureBlocking(false); |
|
85 |
this.ss = ssc.socket(); |
|
86 |
this.addr = new InetSocketAddress(portno); |
|
87 |
this.ss.bind(this.addr); |
|
88 |
this.startListening(); |
|
89 |
}
|
|
90 |
||
91 |
public void startListening() throws ClosedChannelException { |
|
92 |
this.sk = ssc.register(selector, SelectionKey.OP_ACCEPT, this); |
|
93 |
interestOpsChanged(); |
|
|
18
by Glyph Lefkowitz
sprinkle interestOpsChanged() calls everywhere that they might have changed |
94 |
}
|
|
1
by Glyph Lefkowitz
add Java stuff |
95 |
|
96 |
public void stopListening() { |
|
97 |
/// ???
|
|
98 |
this.sk.cancel(); |
|
99 |
interestOpsChanged(); |
|
|
18
by Glyph Lefkowitz
sprinkle interestOpsChanged() calls everywhere that they might have changed |
100 |
}
|
|
1
by Glyph Lefkowitz
add Java stuff |
101 |
|
102 |
public void doAccept() throws Throwable { |
|
103 |
SocketChannel newsc = ssc.accept(); |
|
104 |
if (null == newsc) { |
|
105 |
return; |
|
106 |
}
|
|
107 |
newsc.configureBlocking(false); |
|
108 |
Socket socket = newsc.socket(); |
|
109 |
||
110 |
IProtocol p = this.protocolFactory.buildProtocol(this.addr); |
|
111 |
new TCPServer(p, newsc, socket); |
|
112 |
}
|
|
113 |
}
|
|
114 |
||
115 |
abstract class TCPConnection extends Selectable implements ITransport { |
|
116 |
ByteBuffer inbuf; |
|
117 |
ArrayList<byte[]> outbufs; |
|
118 |
||
119 |
IProtocol protocol; |
|
120 |
SocketChannel channel; |
|
121 |
Socket socket; |
|
122 |
SelectionKey sk; |
|
123 |
||
124 |
boolean disconnecting; |
|
125 |
||
126 |
TCPConnection(IProtocol protocol, SocketChannel channel, Socket socket) throws Throwable { |
|
127 |
inbuf = ByteBuffer.allocate(BUFFER_SIZE); |
|
128 |
inbuf.clear(); |
|
129 |
outbufs = new ArrayList<byte[]>(); |
|
130 |
this.protocol = protocol; |
|
131 |
this.channel = channel; |
|
132 |
this.socket = socket; |
|
133 |
this.disconnecting = false; |
|
134 |
this.sk = channel.register(selector, SelectionKey.OP_READ, this); |
|
135 |
interestOpsChanged(); |
|
|
18
by Glyph Lefkowitz
sprinkle interestOpsChanged() calls everywhere that they might have changed |
136 |
this.protocol.makeConnection(this); |
|
1
by Glyph Lefkowitz
add Java stuff |
137 |
}
|
138 |
||
139 |
// HAHAHAHA the fab four strike again
|
|
140 |
void startReading() { |
|
141 |
sk.interestOps(sk.interestOps() | SelectionKey.OP_READ); |
|
142 |
interestOpsChanged(); |
|
|
18
by Glyph Lefkowitz
sprinkle interestOpsChanged() calls everywhere that they might have changed |
143 |
}
|
|
1
by Glyph Lefkowitz
add Java stuff |
144 |
|
145 |
void startWriting () { |
|
146 |
sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE); |
|
147 |
interestOpsChanged(); |
|
|
18
by Glyph Lefkowitz
sprinkle interestOpsChanged() calls everywhere that they might have changed |
148 |
}
|
|
1
by Glyph Lefkowitz
add Java stuff |
149 |
|
150 |
void stopReading () { |
|
151 |
sk.interestOps(sk.interestOps() & ~SelectionKey.OP_READ); |
|
152 |
interestOpsChanged(); |
|
|
18
by Glyph Lefkowitz
sprinkle interestOpsChanged() calls everywhere that they might have changed |
153 |
}
|
|
1
by Glyph Lefkowitz
add Java stuff |
154 |
|
155 |
void stopWriting () { |
|
156 |
sk.interestOps(sk.interestOps() & ~SelectionKey.OP_WRITE); |
|
157 |
interestOpsChanged(); |
|
|
18
by Glyph Lefkowitz
sprinkle interestOpsChanged() calls everywhere that they might have changed |
158 |
}
|
|
1
by Glyph Lefkowitz
add Java stuff |
159 |
|
160 |
void doRead() throws Throwable { |
|
161 |
boolean failed = false; |
|
162 |
Throwable reason = null; |
|
163 |
try { |
|
164 |
int bytesread = channel.read(inbuf); |
|
165 |
failed = (-1 == bytesread); |
|
166 |
} catch (IOException ioe) { |
|
167 |
failed = true; |
|
168 |
reason = ioe; |
|
169 |
}
|
|
170 |
||
171 |
if (failed) { |
|
172 |
// this means the connection is closed, what???
|
|
173 |
channel.close(); |
|
174 |
sk.cancel(); |
|
175 |
interestOpsChanged(); |
|
|
18
by Glyph Lefkowitz
sprinkle interestOpsChanged() calls everywhere that they might have changed |
176 |
this.protocol.connectionLost(reason); |
|
1
by Glyph Lefkowitz
add Java stuff |
177 |
return; |
178 |
}
|
|
179 |
||
180 |
byte[] data = new byte[inbuf.position()]; |
|
181 |
inbuf.flip(); |
|
182 |
inbuf.get(data); |
|
183 |
inbuf.clear(); |
|
184 |
try { |
|
185 |
this.protocol.dataReceived(data); |
|
186 |
} catch (Throwable t) { |
|
187 |
t.printStackTrace(); |
|
188 |
this.loseConnection(); |
|
189 |
}
|
|
190 |
}
|
|
191 |
||
192 |
public void write(byte[] data) { |
|
193 |
this.outbufs.add(data); |
|
194 |
this.startWriting(); |
|
195 |
}
|
|
196 |
||
197 |
void doWrite() throws Throwable { |
|
198 |
/* XXX TODO: this cannot possibly be correct, but every example
|
|
199 |
* and every tutorial does this! Insane.
|
|
200 |
*/
|
|
201 |
if (0 == this.outbufs.size()) { |
|
202 |
if (this.disconnecting) { |
|
203 |
this.channel.close(); |
|
204 |
}
|
|
205 |
// else wtf?
|
|
206 |
} else { |
|
207 |
this.channel.write(ByteBuffer.wrap(this.outbufs.remove(0))); |
|
208 |
if (0 == this.outbufs.size()) { |
|
209 |
this.stopWriting(); |
|
210 |
}
|
|
211 |
}
|
|
212 |
}
|
|
213 |
||
214 |
public void loseConnection() { |
|
215 |
this.disconnecting = true; |
|
216 |
}
|
|
217 |
}
|
|
218 |
class TCPServer extends TCPConnection { |
|
219 |
// is there really any need for this to be a separate class?
|
|
220 |
public TCPServer(IProtocol a, SocketChannel b, Socket c) throws Throwable { |
|
221 |
super(a, b, c); |
|
222 |
}
|
|
223 |
}
|
|
224 |
||
225 |
Selector selector; |
|
226 |
boolean running; |
|
227 |
TreeMap<Long,Runnable> pendingCalls; |
|
228 |
||
229 |
Reactor () throws Throwable { |
|
230 |
this.selector = Selector.open(); |
|
231 |
this.running = false; |
|
232 |
this.pendingCalls = new TreeMap<Long,Runnable>(); |
|
233 |
}
|
|
234 |
||
235 |
public interface IDelayedCall { |
|
236 |
void cancel (); |
|
237 |
}
|
|
238 |
||
239 |
public void callLater(double secondsLater, Runnable runme) { |
|
240 |
long millisLater = (long) (secondsLater * 1000.0); |
|
241 |
synchronized(pendingCalls) { |
|
|
18
by Glyph Lefkowitz
sprinkle interestOpsChanged() calls everywhere that they might have changed |
242 |
pendingCalls.put(System.currentTimeMillis() + millisLater, |
243 |
runme); |
|
244 |
// This isn't actually an interestOps
|
|
245 |
interestOpsChanged(); |
|
246 |
}
|
|
247 |
}
|
|
|
1
by Glyph Lefkowitz
add Java stuff |
248 |
|
249 |
/**
|
|
250 |
* Run all runnables scheduled to run before right now, and return the
|
|
251 |
* timeout. Negative timeout means "no timeout".
|
|
252 |
*/
|
|
253 |
long runUntilCurrent(long now) { |
|
254 |
while (0 != pendingCalls.size()) { |
|
255 |
try { |
|
256 |
long then = pendingCalls.firstKey(); |
|
257 |
if (then < now) { |
|
258 |
Runnable r = pendingCalls.remove((Object) new Long(then)); |
|
259 |
r.run(); |
|
260 |
} else { |
|
261 |
return then - now; |
|
262 |
}
|
|
263 |
} catch (NoSuchElementException nsee) { |
|
264 |
nsee.printStackTrace(); |
|
265 |
throw new Error("This is impossible; pendingCalls.size was not zero"); |
|
266 |
}
|
|
267 |
}
|
|
268 |
return -1; |
|
269 |
}
|
|
270 |
||
271 |
void iterate() throws Throwable { |
|
|
15
by Glyph Lefkowitz
A very, very basic reactor for use with Swing. |
272 |
Iterator<SelectionKey> selectedKeys = |
273 |
this.selector.selectedKeys().iterator(); |
|
274 |
while (selectedKeys.hasNext()) { |
|
275 |
SelectionKey sk = selectedKeys.next(); |
|
276 |
selectedKeys.remove(); |
|
277 |
Selectable selectable = ((Selectable) sk.attachment()); |
|
278 |
if (sk.isValid() && sk.isWritable()) { |
|
279 |
selectable.doWrite(); |
|
280 |
}
|
|
281 |
if (sk.isValid() && sk.isReadable()) { |
|
282 |
selectable.doRead(); |
|
283 |
}
|
|
284 |
if (sk.isValid() && sk.isAcceptable()) { |
|
285 |
selectable.doAccept(); |
|
286 |
}
|
|
287 |
if (sk.isValid() && sk.isConnectable()) { |
|
288 |
selectable.doConnect(); |
|
289 |
}
|
|
290 |
}
|
|
291 |
}
|
|
292 |
||
293 |
/**
|
|
294 |
* This may need to run the runUntilCurrent() in a different thread.
|
|
|
18
by Glyph Lefkowitz
sprinkle interestOpsChanged() calls everywhere that they might have changed |
295 |
*/
|
296 |
long processTimedEvents() { |
|
297 |
long now = System.currentTimeMillis(); |
|
298 |
return runUntilCurrent(now); |
|
299 |
}
|
|
300 |
||
301 |
/**
|
|
302 |
* Override this method in subclasses to run "iterate" in a different
|
|
|
15
by Glyph Lefkowitz
A very, very basic reactor for use with Swing. |
303 |
* context, i.e. in a thread.
|
304 |
*/
|
|
305 |
void doIteration() throws Throwable { |
|
306 |
iterate(); |
|
307 |
}
|
|
308 |
||
309 |
public void run() throws Throwable { |
|
|
1
by Glyph Lefkowitz
add Java stuff |
310 |
running = true; |
311 |
while (running) { |
|
312 |
int selected; |
|
313 |
long timeout = processTimedEvents(); |
|
|
18
by Glyph Lefkowitz
sprinkle interestOpsChanged() calls everywhere that they might have changed |
314 |
|
|
15
by Glyph Lefkowitz
A very, very basic reactor for use with Swing. |
315 |
if (timeout >= 0) { |
|
1
by Glyph Lefkowitz
add Java stuff |
316 |
this.selector.select(timeout); |
|
15
by Glyph Lefkowitz
A very, very basic reactor for use with Swing. |
317 |
} else { |
|
1
by Glyph Lefkowitz
add Java stuff |
318 |
this.selector.select(); |
|
15
by Glyph Lefkowitz
A very, very basic reactor for use with Swing. |
319 |
}
|
320 |
this.doIteration(); |
|
321 |
}
|
|
|
1
by Glyph Lefkowitz
add Java stuff |
322 |
}
|
323 |
||
324 |
public void stop() { |
|
325 |
this.running = false; |
|
326 |
}
|
|
327 |
||
328 |
public IListeningPort listenTCP(int portno, |
|
329 |
IProtocol.IFactory factory) |
|
330 |
throws IOException { |
|
331 |
return new TCPPort(portno, factory); |
|
332 |
}
|
|
333 |
||
334 |
public static void msg (String m) { |
|
335 |
System.out.println(m); |
|
336 |
}
|
|
337 |
||
338 |
static class ShowMessage implements Runnable { |
|
339 |
String x; |
|
340 |
ShowMessage(String s) { |
|
341 |
this.x = s; |
|
342 |
}
|
|
343 |
public void run () { |
|
344 |
msg(System.currentTimeMillis() + " " + this.x); |
|
345 |
}
|
|
346 |
}
|
|
347 |
||
348 |
public static void main (String[] args) throws Throwable { |
|
349 |
// The most basic server possible.
|
|
350 |
Reactor r = Reactor.get(); |
|
351 |
||
352 |
r.callLater(1, new ShowMessage("one!")); |
|
353 |
r.callLater(3, new ShowMessage("three!")); |
|
354 |
r.callLater(2, new ShowMessage("two!")); |
|
355 |
r.callLater(4, new ShowMessage("four!")); |
|
356 |
||
357 |
r.listenTCP(1234, new IProtocol.IFactory() { |
|
358 |
public IProtocol buildProtocol(Object addr) { |
|
359 |
return new Protocol() { |
|
360 |
public void dataReceived(byte[] data) { |
|
361 |
this.transport().write(data); |
|
362 |
Reactor.get().callLater(1, new ShowMessage("some data, delayed: " + new String(data))); |
|
363 |
}
|
|
364 |
};
|
|
365 |
}
|
|
366 |
});
|
|
367 |
r.run(); |
|
368 |
}
|
|
369 |
}
|
|
370 |