2
* Copyright (C) 2012 FuseSource, Inc.
3
* http://fusesource.com
5
* Licensed under the Apache License, Version 2.0 (the "License");
6
* you may not use this file except in compliance with the License.
7
* You may obtain a copy of the License at
9
* http://www.apache.org/licenses/LICENSE-2.0
11
* Unless required by applicable law or agreed to in writing, software
12
* distributed under the License is distributed on an "AS IS" BASIS,
13
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
* See the License for the specific language governing permissions and
15
* limitations under the License.
18
package org.fusesource.hawtdispatch.transport;
20
import org.fusesource.hawtdispatch.*;
22
import java.net.InetSocketAddress;
24
import java.util.LinkedList;
25
import java.util.concurrent.Executor;
26
import java.util.concurrent.atomic.AtomicInteger;
30
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
32
public class PipeTransportServer implements TransportServer {
34
protected String connectURI;
35
protected TransportServerListener listener;
36
protected String name;
37
protected boolean marshal;
38
protected final AtomicInteger connectionCounter = new AtomicInteger();
39
DispatchQueue dispatchQueue;
41
private CustomDispatchSource<PipeTransport,LinkedList<PipeTransport>> acceptSource;
44
public String getBoundAddress() {
48
public InetSocketAddress getSocketAddress() {
52
public DispatchQueue getDispatchQueue() {
56
public void setDispatchQueue(DispatchQueue queue) {
57
dispatchQueue = queue;
60
public void suspend() {
61
acceptSource.suspend();
64
public void resume() {
65
acceptSource.resume();
68
public void setTransportServerListener(TransportServerListener listener) {
69
this.listener = listener;
73
public void start(Runnable onCompleted) throws Exception {
74
start(new TaskWrapper(onCompleted));
77
public void stop(Runnable onCompleted) throws Exception {
78
stop(new TaskWrapper(onCompleted));
81
public void start(Task onCompleted) throws Exception {
82
acceptSource = Dispatch.createSource(EventAggregators.<PipeTransport>linkedList(), dispatchQueue);
83
acceptSource.setEventHandler(new Task() {
85
LinkedList<PipeTransport> transports = acceptSource.getData();
86
for (PipeTransport transport : transports) {
88
listener.onAccept(transport);
89
} catch (Exception e) {
90
listener.onAcceptError(e);
95
acceptSource.resume();
96
if( onCompleted!=null ) {
97
dispatchQueue.execute(onCompleted);
101
public void stop(Task onCompleted) throws Exception {
102
PipeTransportRegistry.unbind(this);
103
acceptSource.setCancelHandler(onCompleted);
104
acceptSource.cancel();
107
public void setConnectURI(String connectURI) {
108
this.connectURI = connectURI;
111
public void setName(String name) {
115
public String getName() {
119
public PipeTransport connect() {
120
int connectionId = connectionCounter.incrementAndGet();
121
String remoteAddress = connectURI.toString() + "#" + connectionId;
122
assert this.listener != null : "Server does not have an accept listener";
124
PipeTransport clientTransport = createClientTransport();
125
PipeTransport serverTransport = createServerTransport();
126
clientTransport.peer = serverTransport;
127
serverTransport.peer = clientTransport;
129
clientTransport.setRemoteAddress(remoteAddress);
130
serverTransport.setRemoteAddress(remoteAddress);
132
serverTransport.setMarshal(marshal);
133
this.acceptSource.merge(serverTransport);
134
return clientTransport;
137
protected PipeTransport createClientTransport() {
138
return new PipeTransport(this);
141
protected PipeTransport createServerTransport() {
142
return new PipeTransport(this);
145
public boolean isMarshal() {
149
public void setMarshal(boolean marshal) {
150
this.marshal = marshal;
153
public Executor getBlockingExecutor() {
157
public void setBlockingExecutor(Executor blockingExecutor) {