2
;(:use ring.adapter.jetty)
3
(:require [clojure.tools.cli :as cli]
4
[clojure.string :as string])
9
[java.util.concurrent Executors]
10
[java.net InetSocketAddress]
11
[org.jboss.netty.channel
12
Channels SimpleChannelHandler ChannelPipelineFactory]
13
[org.jboss.netty.buffer ChannelBuffers]
14
[org.jboss.netty.channel.socket.nio NioClientSocketChannelFactory]
15
[org.jboss.netty.bootstrap ClientBootstrap]
16
[org.jboss.netty.handler.codec.base64 Base64]
17
[org.jboss.netty.util CharsetUtil]
19
;; Jetty WebSocket Server
20
[org.eclipse.jetty.server Server]
21
[org.eclipse.jetty.server.nio BlockingChannelConnector]
22
[org.eclipse.jetty.servlet
23
ServletContextHandler ServletHolder DefaultServlet]
24
[org.eclipse.jetty.websocket
25
WebSocket WebSocket$OnTextMessage
26
WebSocketClientFactory WebSocketClient WebSocketServlet]))
31
;; (defn tcp-channel [host port]
33
;; (let [address (InetSocketAddress. host port)
34
;; channel (doto (SocketChannel/open)
35
;; (.connect address))]
38
;; (println (str "Failed to connect to'" host ":" port "':" e))
41
;; http://docs.jboss.org/netty/3.2/guide/html/start.html#d0e51
42
;; http://stackoverflow.com/questions/5453602/highly-concurrent-http-with-netty-and-nio
43
;; https://github.com/datskos/ring-netty-adapter/blob/master/src/ring/adapter/netty.clj
46
(defn netty-client [host port open close message]
47
(let [handler (proxy [SimpleChannelHandler] []
48
(channelConnected [ctx e] (open ctx e))
49
(channelDisconnected [ctx e] (close ctx e))
50
(messageReceived [ctx e] (message ctx e))
51
(exceptionCaught [ctx e]
52
(println "exceptionCaught:" e)))
53
pipeline (proxy [ChannelPipelineFactory] []
55
(doto (Channels/pipeline)
56
(.addLast "handler" handler))))
57
bootstrap (doto (ClientBootstrap.
58
(NioClientSocketChannelFactory.
59
(Executors/newCachedThreadPool)
60
(Executors/newCachedThreadPool)))
61
(.setPipelineFactory pipeline)
62
(.setOption "tcpNoDelay" true)
63
(.setOption "keepAlive" true))
64
channel-future (.connect bootstrap (InetSocketAddress. host port))
65
channel (.. channel-future (awaitUninterruptibly) (getChannel))]
72
;; http://wiki.eclipse.org/Jetty/Feature/WebSockets
73
(defn make-websocket-servlet
  "Builds a proxy of Jetty's WebSocketServlet whose WebSocket connections are
  driven by three caller-supplied callbacks:

    open    - called as (open socket connection) from onOpen
    close   - called as (close socket code message) from onClose
    message - called as (message socket data) from onMessage

  `socket` is the WebSocket$OnTextMessage instance created for the connection.
  GET requests reaching doGet are forwarded to the dispatcher registered under
  this servlet's own name."
  [open close message]
  (let [new-socket (fn []
                     ;; One handler object per accepted WebSocket connection;
                     ;; each method simply delegates to the matching callback.
                     (reify WebSocket$OnTextMessage
                       (onOpen [socket conn] (open socket conn))
                       (onClose [socket status reason] (close socket status reason))
                       (onMessage [socket text] (message socket text))))]
    (proxy [WebSocketServlet] []
      (doGet [request response]
        ;;(println "doGet" request)
        (let [servlet-ctx (proxy-super getServletContext)
              dispatcher  (.getNamedDispatcher servlet-ctx
                                               (proxy-super getServletName))]
          (.forward dispatcher request response)))
      ;; NOTE(review): the second argument is unused here; Jetty appears to
      ;; pass the requested sub-protocol string rather than a response object
      ;; in this position — confirm against the Jetty version in use.
      (doWebSocketConnect [request protocol]
        (println "doWebSocketConnect")
        (new-socket)))))
87
(defn websocket-server
88
[port & {:keys [open close message ws-path web]
89
:or {open (fn [_ conn]
90
(println "New websocket client:" conn))
91
close (fn [_ code reason]
92
(println "Websocket client closed:" code reason))
94
(println "Websocket message:" data))
96
ws-path "/websocket"}}]
97
(let [http-servlet (doto (ServletHolder. (DefaultServlet.))
98
(.setInitParameter "dirAllowed" "true")
99
(.setInitParameter "resourceBase" web))
100
ws-servlet (ServletHolder.
101
(make-websocket-servlet open close message))
102
context (doto (ServletContextHandler.)
103
(.setContextPath "/")
104
(.addServlet ws-servlet ws-path))
105
connector (doto (BlockingChannelConnector.)
107
(.setMaxIdleTime Integer/MAX_VALUE))
108
server (doto (Server.)
109
(.setHandler context)
110
(.addConnector connector))]
112
(when web (.addServlet context http-servlet "/"))
119
;; Proxy-wide configuration. start-websockify resets this to a map holding
;; :target-host and :target-port; client-open reads both keys when it opens
;; a new target connection.
(defonce settings (atom {}))
121
;; WebSocket client to TCP target mappings
123
;; Keyed by the WebSocket handler object (`this` in the client-* callbacks).
;; Each value is a map; the code in this file reads a :client entry (the
;; WebSocket connection) and a :target entry (the target-side channel) from it.
(defonce clients (atom {}))
124
;; Reverse lookup: keyed by the target value returned from netty-client, with
;; the WebSocket connection as the value. The target-* handlers look entries
;; up by the channel taken from their handler context.
(defonce targets (atom {}))
127
(defn target-open
  "Callback passed to netty-client as the `open` handler (invoked from the
  handler's channelConnected). Only logs that the target connection is up;
  neither argument is used."
  [ctx e]
  ;; Debug aid, currently disabled: (println "channelConnected:" e)
  (println "Connected to target"))
131
(defn target-close
  "Callback passed to netty-client as the `close` handler (invoked from the
  handler's channelDisconnected). Logs the closure and, if the channel behind
  `ctx` has a WebSocket connection registered in `targets`, disconnects that
  connection. `e` is unused."
  [ctx e]
  ;; Debug aid, currently disabled: (println "channelDisconnected:" e)
  (println "Target closed")
  (let [registry      @targets
        ;; Values in `targets` are the WebSocket connections, not channels.
        ws-connection (get registry (.getChannel ctx))]
    (when ws-connection
      (.disconnect ws-connection))))
137
(defn target-message [ctx e]
138
(let [channel (.getChannel ctx)
139
client (get @targets channel)
141
len (.readableBytes msg)
142
b64 (Base64/encode msg false)
143
blen (.readableBytes b64)]
144
#_(println "received" len "bytes from target")
145
#_(println "target receive:" (.toString msg 0 len CharsetUtil/UTF_8))
146
#_(println "sending to client:" (.toString b64 0 blen CharsetUtil/UTF_8))
147
(.sendMessage client (.toString b64 0 blen CharsetUtil/UTF_8))))
149
(defn client-open [this connection]
150
#_(println "Got WebSocket connection:" connection)
151
(println "New client")
152
(let [target (netty-client
153
(:target-host @settings)
154
(:target-port @settings)
155
target-open target-close target-message)]
156
(swap! clients assoc this {:client connection
158
(swap! targets assoc target connection)))
160
(defn client-close [this code message]
161
(println "WebSocket connection closed")
162
(when-let [target (:target (get @clients this))]
163
(println "Closing target")
165
(println "Target closed")
166
(swap! targets dissoc target))
167
(swap! clients dissoc this))
169
(defn client-message
  "WebSocket onMessage callback: relays one frame from a WebSocket client to
  its target.

    this - the WebSocket handler object the frame arrived on; used as the key
           into `clients` to find the matching :target.
    data - the frame payload, a base64-encoded string.

  The payload is copied into a Netty buffer as UTF-8, base64-decoded, and the
  decoded buffer is written to the target; the result of that write is
  returned. If `clients` holds no target for `this`, the frame is dropped with
  a log line and nil is returned, instead of failing with a
  NullPointerException on the nil target."
  [this data]
  #_(println "WebSocket onMessage:" data)
  (if-let [target (:target (get @clients this))]
    (let [encoded (ChannelBuffers/copiedBuffer data CharsetUtil/UTF_8)
          decoded (Base64/decode encoded)]
      ;; The byte count is only needed by the disabled debug output below, so
      ;; it is computed there rather than bound eagerly on every message.
      #_(println "Sending" (.readableBytes decoded) "bytes to target")
      #_(println "Sending to target:"
                 (.toString decoded 0 (.readableBytes decoded) CharsetUtil/UTF_8))
      (.write target decoded))
    (println "Dropping WebSocket message: no target registered for client")))
179
(defn start-websockify
180
[& {:keys [listen-port target-host target-port web]
181
:or {listen-port 6080
182
target-host "localhost"
189
(reset! settings {:target-host target-host
190
:target-port target-port})
191
(let [server (websocket-server listen-port
193
:ws-path "/websockify"
196
:message client-message)]
201
(println "Serving web requests from:" web)
202
(println "Not serving web requests"))
204
(defn stop-websockify []
205
(doseq [client (vals @clients)]
206
(.disconnect (:client client))
207
(.close (:target client)))
214
(let [[options args banner]
217
["-v" "--[no-]verbose" "Verbose output"]
218
["--web" "Run webserver with root at given location"]
219
["-h" "--help" "Show help" :default false :flag true]
221
(when (or (:help options)
222
(not= 2 (count args)))
227
(let [target (second args)
228
[target-host target-port] (string/split target #":")]
229
(start-websockify :listen-port (Integer/parseInt (first args))
230
:target-host target-host
231
:target-port (Integer/parseInt target-port)
232
:web (:web options))))
b'\\ No newline at end of file'