~glyph/+junk/amphacks

1 by Glyph Lefkowitz
add Java stuff
1
package com.twistedmatrix.internet;
2
3
import java.util.ArrayList;
4
import java.util.TreeMap;
5
import java.util.Iterator;
6
import java.util.NoSuchElementException;
7
import java.io.IOException;
8
9
import java.net.InetSocketAddress;
10
import java.net.ServerSocket;
11
import java.net.Socket;
12
13
import java.nio.ByteBuffer;
14
15
import java.nio.channels.Selector;
16
import java.nio.channels.ClosedChannelException;
17
import java.nio.channels.ServerSocketChannel;
18
import java.nio.channels.SocketChannel;
19
import java.nio.channels.SelectionKey;
20
21
public class Reactor {
22
23
    /** Capacity, in bytes, of the read buffer allocated for each TCP connection. */
    static int BUFFER_SIZE = 8 * 1024;
24
25
    public static Reactor get() {
26
        if (theReactor == null) {
27
            try {
28
                theReactor = new Reactor();
29
            } catch (Throwable t) {
30
                t.printStackTrace();
31
            }
32
        }
33
        return theReactor;
34
    }
35
36
    /**
18 by Glyph Lefkowitz
sprinkle interestOpsChanged() calls everywhere that they might have changed
37
     * Selectors were added or removed.
38
     *
39
     * In the normal case, this is simply a no-op.  However, in the case where
40
     * the "run()" thread is different from the application code thread, this
41
     * is a hook for the reactor to "wakeup" its selector.
42
     */
43
    void interestOpsChanged() {
44
    }
45
46
    /** Shared instance handed out by get(); created lazily on the first call. */
    static Reactor theReactor;
1 by Glyph Lefkowitz
add Java stuff
47
48
    /** Handle for a listening port, as returned by listenTCP. */
    public interface IListeningPort {
        /** Stop listening for new connections on this port. */
        void stopListening();
    }
51
52
    /* It appears that this interface is actually unnamed in
53
     * Twisted... abstract.FileDescriptor serves this purpose.
54
     */
55
    class Selectable {
56
        SelectionKey sk;
57
58
        void doRead() throws Throwable {
59
            msg("UNHANDLED READ "+this);
60
        }
61
        void doWrite() throws Throwable {
62
            msg("UNHANDLED WRITE "+this);
63
        }
64
65
        // and we don't need this, because there's no such thing as
66
        // "acceptable" outside the magical fantasyland where Java lives.
67
        void doAccept() throws Throwable {
68
            msg("UNHANDLED ACCEPT "+this);
69
        }
70
        void doConnect() throws Throwable {
71
            msg("UNHANDLED CONNECT "+this);
72
        }
73
    }
74
75
    public class TCPPort extends Selectable implements IListeningPort {
76
        IProtocol.IFactory protocolFactory;
77
        ServerSocketChannel ssc;
78
        ServerSocket ss;
79
        InetSocketAddress addr;
80
81
        TCPPort(int portno, IProtocol.IFactory pf) throws IOException {
82
            this.protocolFactory = pf;
83
            this.ssc = ServerSocketChannel.open();
84
            this.ssc.configureBlocking(false);
85
            this.ss = ssc.socket();
86
            this.addr = new InetSocketAddress(portno);
87
            this.ss.bind(this.addr);
88
            this.startListening();
89
        }
90
91
        public void startListening() throws ClosedChannelException {
92
            this.sk = ssc.register(selector, SelectionKey.OP_ACCEPT, this);
93
            interestOpsChanged();
18 by Glyph Lefkowitz
sprinkle interestOpsChanged() calls everywhere that they might have changed
94
        }
1 by Glyph Lefkowitz
add Java stuff
95
96
        public void stopListening() {
97
            /// ???
98
            this.sk.cancel();
99
            interestOpsChanged();
18 by Glyph Lefkowitz
sprinkle interestOpsChanged() calls everywhere that they might have changed
100
        }
1 by Glyph Lefkowitz
add Java stuff
101
102
        public void doAccept() throws Throwable {
103
            SocketChannel newsc = ssc.accept();
104
            if (null == newsc) {
105
                return;
106
            }
107
            newsc.configureBlocking(false);
108
            Socket socket = newsc.socket();
109
110
            IProtocol p = this.protocolFactory.buildProtocol(this.addr);
111
            new TCPServer(p, newsc, socket);
112
        }
113
    }
114
115
    abstract class TCPConnection extends Selectable implements ITransport {
116
        ByteBuffer inbuf;
117
        ArrayList<byte[]> outbufs;
118
119
        IProtocol protocol;
120
        SocketChannel channel;
121
        Socket socket;
122
        SelectionKey sk;
123
124
        boolean disconnecting;
125
126
        TCPConnection(IProtocol protocol, SocketChannel channel, Socket socket) throws Throwable {
127
            inbuf = ByteBuffer.allocate(BUFFER_SIZE);
128
            inbuf.clear();
129
            outbufs = new ArrayList<byte[]>();
130
            this.protocol = protocol;
131
            this.channel = channel;
132
            this.socket = socket;
133
            this.disconnecting = false;
134
            this.sk = channel.register(selector, SelectionKey.OP_READ, this);
135
            interestOpsChanged();
18 by Glyph Lefkowitz
sprinkle interestOpsChanged() calls everywhere that they might have changed
136
            this.protocol.makeConnection(this);
1 by Glyph Lefkowitz
add Java stuff
137
        }
138
139
        // HAHAHAHA the fab four strike again
140
        void startReading() {
141
            sk.interestOps(sk.interestOps() | SelectionKey.OP_READ);
142
            interestOpsChanged();
18 by Glyph Lefkowitz
sprinkle interestOpsChanged() calls everywhere that they might have changed
143
        }
1 by Glyph Lefkowitz
add Java stuff
144
145
        void startWriting () {
146
            sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);
147
            interestOpsChanged();
18 by Glyph Lefkowitz
sprinkle interestOpsChanged() calls everywhere that they might have changed
148
        }
1 by Glyph Lefkowitz
add Java stuff
149
150
        void stopReading () {
151
            sk.interestOps(sk.interestOps() & ~SelectionKey.OP_READ);
152
            interestOpsChanged();
18 by Glyph Lefkowitz
sprinkle interestOpsChanged() calls everywhere that they might have changed
153
        }
1 by Glyph Lefkowitz
add Java stuff
154
155
        void stopWriting () {
156
            sk.interestOps(sk.interestOps() & ~SelectionKey.OP_WRITE);
157
            interestOpsChanged();
18 by Glyph Lefkowitz
sprinkle interestOpsChanged() calls everywhere that they might have changed
158
        }
1 by Glyph Lefkowitz
add Java stuff
159
160
        void doRead() throws Throwable {
161
            boolean failed = false;
162
            Throwable reason = null;
163
            try {
164
                int bytesread = channel.read(inbuf);
165
                failed = (-1 == bytesread);
166
            } catch (IOException ioe) {
167
                failed = true;
168
                reason = ioe;
169
            }
170
171
            if (failed) {
172
                // this means the connection is closed, what???
173
                channel.close();
174
                sk.cancel();
175
                interestOpsChanged();
18 by Glyph Lefkowitz
sprinkle interestOpsChanged() calls everywhere that they might have changed
176
                this.protocol.connectionLost(reason);
1 by Glyph Lefkowitz
add Java stuff
177
                return;
178
            }
179
180
            byte[] data = new byte[inbuf.position()];
181
            inbuf.flip();
182
            inbuf.get(data);
183
            inbuf.clear();
184
            try {
185
                this.protocol.dataReceived(data);
186
            } catch (Throwable t) {
187
                t.printStackTrace();
188
                this.loseConnection();
189
            }
190
        }
191
192
        public void write(byte[] data) {
193
            this.outbufs.add(data);
194
            this.startWriting();
195
        }
196
197
        void doWrite() throws Throwable {
198
            /* XXX TODO: this cannot possibly be correct, but every example
199
             * and every tutorial does this!  Insane.
200
             */
201
            if (0 == this.outbufs.size()) {
202
                if (this.disconnecting) {
203
                    this.channel.close();
204
                }
205
                // else wtf?
206
            } else {
207
                this.channel.write(ByteBuffer.wrap(this.outbufs.remove(0)));
208
                if (0 == this.outbufs.size()) {
209
                    this.stopWriting();
210
                }
211
            }
212
        }
213
214
        public void loseConnection() {
215
            this.disconnecting = true;
216
        }
217
    }
218
    class TCPServer extends TCPConnection {
219
        // is there really any need for this to be a separate class?
220
        public TCPServer(IProtocol a, SocketChannel b, Socket c) throws Throwable {
221
            super(a, b, c);
222
        }
223
    }
224
225
    /** Selector that every port and connection registers its channel with. */
    Selector selector;
226
    /** True while run() should keep looping; cleared by stop(). */
    boolean running;
227
    /** Scheduled calls keyed by due time in System.currentTimeMillis() terms. */
    TreeMap<Long,Runnable> pendingCalls;
228
229
    /**
     * Create a stopped reactor with a freshly opened selector and no
     * scheduled calls.
     */
    Reactor () throws Throwable {
        pendingCalls = new TreeMap<Long,Runnable>();
        running = false;
        selector = Selector.open();
    }
234
235
    /**
     * Handle through which a scheduled call can be cancelled.
     * NOTE(review): callLater currently returns void, so no implementation
     * of this interface is visible here.
     */
    public interface IDelayedCall {
        /** Cancel the scheduled call. */
        void cancel ();
    }
238
239
    public void callLater(double secondsLater, Runnable runme) {
240
        long millisLater = (long) (secondsLater * 1000.0);
241
        synchronized(pendingCalls) {
18 by Glyph Lefkowitz
sprinkle interestOpsChanged() calls everywhere that they might have changed
242
            pendingCalls.put(System.currentTimeMillis() + millisLater,
243
                             runme);
244
            // This isn't actually an interestOps
245
            interestOpsChanged();
246
        }
247
    }
1 by Glyph Lefkowitz
add Java stuff
248
249
    /**
250
     * Run all runnables scheduled to run before right now, and return the
251
     * timeout.  Negative timeout means "no timeout".
252
     */
253
    long runUntilCurrent(long now) {
254
        while (0 != pendingCalls.size()) {
255
            try {
256
                long then = pendingCalls.firstKey();
257
                if (then < now) {
258
                    Runnable r = pendingCalls.remove((Object) new Long(then));
259
                    r.run();
260
                } else {
261
                    return then - now;
262
                }
263
            } catch (NoSuchElementException nsee) {
264
                nsee.printStackTrace();
265
                throw new Error("This is impossible; pendingCalls.size was not zero");
266
            }
267
        }
268
        return -1;
269
    }
270
271
    void iterate() throws Throwable {
15 by Glyph Lefkowitz
A very, very basic reactor for use with Swing.
272
        Iterator<SelectionKey> selectedKeys =
273
            this.selector.selectedKeys().iterator();
274
        while (selectedKeys.hasNext()) {
275
            SelectionKey sk = selectedKeys.next();
276
            selectedKeys.remove();
277
            Selectable selectable = ((Selectable) sk.attachment());
278
            if (sk.isValid() && sk.isWritable()) {
279
                selectable.doWrite();
280
            }
281
            if (sk.isValid() && sk.isReadable()) {
282
                selectable.doRead();
283
            }
284
            if (sk.isValid() && sk.isAcceptable()) {
285
                selectable.doAccept();
286
            }
287
            if (sk.isValid() && sk.isConnectable()) {
288
                selectable.doConnect();
289
            }
290
        }
291
    }
292
293
    /**
294
     * This may need to run the runUntilCurrent() in a different thread.
18 by Glyph Lefkowitz
sprinkle interestOpsChanged() calls everywhere that they might have changed
295
     */
296
    long processTimedEvents() {
297
        long now = System.currentTimeMillis();
298
        return runUntilCurrent(now);
299
    }
300
301
    /**
302
     * Override this method in subclasses to run "iterate" in a different
15 by Glyph Lefkowitz
A very, very basic reactor for use with Swing.
303
     * context, i.e. in a thread.
304
     */
305
    void doIteration() throws Throwable {
306
        iterate();
307
    }
308
309
    public void run() throws Throwable {
1 by Glyph Lefkowitz
add Java stuff
310
        running = true;
311
        while (running) {
312
            int selected;
313
            long timeout = processTimedEvents();
18 by Glyph Lefkowitz
sprinkle interestOpsChanged() calls everywhere that they might have changed
314
15 by Glyph Lefkowitz
A very, very basic reactor for use with Swing.
315
            if (timeout >= 0) {
1 by Glyph Lefkowitz
add Java stuff
316
                this.selector.select(timeout);
15 by Glyph Lefkowitz
A very, very basic reactor for use with Swing.
317
            } else {
1 by Glyph Lefkowitz
add Java stuff
318
                this.selector.select();
15 by Glyph Lefkowitz
A very, very basic reactor for use with Swing.
319
            }
320
            this.doIteration();
321
        }
1 by Glyph Lefkowitz
add Java stuff
322
    }
323
324
    public void stop() {
325
        this.running = false;
326
    }
327
328
    public IListeningPort listenTCP(int portno,
329
                                    IProtocol.IFactory factory)
330
        throws IOException {
331
        return new TCPPort(portno, factory);
332
    }
333
334
    public static void msg (String m) {
335
        System.out.println(m);
336
    }
337
338
    static class ShowMessage implements Runnable {
339
        String x;
340
        ShowMessage(String s) {
341
            this.x = s;
342
        }
343
        public void run () {
344
            msg(System.currentTimeMillis() + " " + this.x);
345
        }
346
    }
347
348
    public static void main (String[] args) throws Throwable {
349
        // The most basic server possible.
350
        Reactor r = Reactor.get();
351
352
        r.callLater(1, new ShowMessage("one!"));
353
        r.callLater(3, new ShowMessage("three!"));
354
        r.callLater(2, new ShowMessage("two!"));
355
        r.callLater(4, new ShowMessage("four!"));
356
357
        r.listenTCP(1234, new IProtocol.IFactory() {
358
                public IProtocol buildProtocol(Object addr) {
359
                    return new Protocol() {
360
                        public void dataReceived(byte[] data) {
361
                            this.transport().write(data);
362
                            Reactor.get().callLater(1, new ShowMessage("some data, delayed: " + new String(data)));
363
                        }
364
                    };
365
                }
366
            });
367
        r.run();
368
    }
369
}
370