~ubuntu-branches/ubuntu/wily/hawtdispatch/wily-proposed

« back to all changes in this revision

Viewing changes to hawtdispatch-transport/src/main/java/org/fusesource/hawtdispatch/transport/PipeTransportServer.java

  • Committer: Package Import Robot
  • Author(s): Emmanuel Bourg
  • Date: 2015-07-22 20:14:35 UTC
  • Revision ID: package-import@ubuntu.com-20150722201435-s694odej0ch4qung
Tags: upstream-1.20
ImportĀ upstreamĀ versionĀ 1.20

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/**
 
2
 * Copyright (C) 2012 FuseSource, Inc.
 
3
 * http://fusesource.com
 
4
 *
 
5
 * Licensed under the Apache License, Version 2.0 (the "License");
 
6
 * you may not use this file except in compliance with the License.
 
7
 * You may obtain a copy of the License at
 
8
 *
 
9
 *    http://www.apache.org/licenses/LICENSE-2.0
 
10
 *
 
11
 * Unless required by applicable law or agreed to in writing, software
 
12
 * distributed under the License is distributed on an "AS IS" BASIS,
 
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
14
 * See the License for the specific language governing permissions and
 
15
 * limitations under the License.
 
16
 */
 
17
 
 
18
package org.fusesource.hawtdispatch.transport;
 
19
 
 
20
import org.fusesource.hawtdispatch.*;
 
21
 
 
22
import java.net.InetSocketAddress;
 
23
import java.net.URI;
 
24
import java.util.LinkedList;
 
25
import java.util.concurrent.Executor;
 
26
import java.util.concurrent.atomic.AtomicInteger;
 
27
 
 
28
/**
 
29
 *
 
30
 * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
 
31
 */
 
32
public class PipeTransportServer implements TransportServer {
 
33
 
 
34
    protected String connectURI;
 
35
    protected TransportServerListener listener;
 
36
    protected String name;
 
37
    protected boolean marshal;
 
38
    protected final AtomicInteger connectionCounter = new AtomicInteger();
 
39
    DispatchQueue dispatchQueue;
 
40
 
 
41
    private CustomDispatchSource<PipeTransport,LinkedList<PipeTransport>> acceptSource;
 
42
 
 
43
 
 
44
    public String getBoundAddress() {
 
45
        return connectURI;
 
46
    }
 
47
 
 
48
    public InetSocketAddress getSocketAddress() {
 
49
        return null;
 
50
    }
 
51
 
 
52
    public DispatchQueue getDispatchQueue() {
 
53
        return dispatchQueue;
 
54
    }
 
55
 
 
56
    public void setDispatchQueue(DispatchQueue queue) {
 
57
        dispatchQueue = queue;
 
58
    }
 
59
 
 
60
    public void suspend() {
 
61
        acceptSource.suspend();
 
62
    }
 
63
 
 
64
    public void resume() {
 
65
        acceptSource.resume();
 
66
    }
 
67
 
 
68
    public void setTransportServerListener(TransportServerListener listener) {
 
69
        this.listener = listener;
 
70
    }
 
71
 
 
72
    @Deprecated
 
73
    public void start(Runnable onCompleted) throws Exception {
 
74
        start(new TaskWrapper(onCompleted));
 
75
    }
 
76
    @Deprecated
 
77
    public void stop(Runnable onCompleted) throws Exception {
 
78
        stop(new TaskWrapper(onCompleted));
 
79
    }
 
80
 
 
81
    public void start(Task onCompleted) throws Exception {
 
82
        acceptSource = Dispatch.createSource(EventAggregators.<PipeTransport>linkedList(), dispatchQueue);
 
83
        acceptSource.setEventHandler(new Task() {
 
84
            public void run() {
 
85
                LinkedList<PipeTransport> transports = acceptSource.getData();
 
86
                for (PipeTransport transport : transports) {
 
87
                    try {
 
88
                        listener.onAccept(transport);
 
89
                    } catch (Exception e) {
 
90
                        listener.onAcceptError(e);
 
91
                    }
 
92
                }
 
93
            }
 
94
        });
 
95
        acceptSource.resume();
 
96
        if( onCompleted!=null ) {
 
97
            dispatchQueue.execute(onCompleted);
 
98
        }
 
99
    }
 
100
 
 
101
    public void stop(Task onCompleted) throws Exception {
 
102
        PipeTransportRegistry.unbind(this);
 
103
        acceptSource.setCancelHandler(onCompleted);
 
104
        acceptSource.cancel();
 
105
    }
 
106
 
 
107
    public void setConnectURI(String connectURI) {
 
108
        this.connectURI = connectURI;
 
109
    }
 
110
 
 
111
    public void setName(String name) {
 
112
        this.name = name;
 
113
    }
 
114
 
 
115
    public String getName() {
 
116
        return name;
 
117
    }
 
118
 
 
119
    public PipeTransport connect() {
 
120
        int connectionId = connectionCounter.incrementAndGet();
 
121
        String remoteAddress = connectURI.toString() + "#" + connectionId;
 
122
        assert this.listener != null : "Server does not have an accept listener";
 
123
 
 
124
        PipeTransport clientTransport = createClientTransport();
 
125
        PipeTransport serverTransport = createServerTransport();
 
126
        clientTransport.peer = serverTransport;
 
127
        serverTransport.peer = clientTransport;
 
128
 
 
129
        clientTransport.setRemoteAddress(remoteAddress);
 
130
        serverTransport.setRemoteAddress(remoteAddress);
 
131
 
 
132
        serverTransport.setMarshal(marshal);
 
133
        this.acceptSource.merge(serverTransport);
 
134
        return clientTransport;
 
135
    }
 
136
 
 
137
    protected PipeTransport createClientTransport() {
 
138
        return new PipeTransport(this);
 
139
    }
 
140
    
 
141
    protected PipeTransport createServerTransport() {
 
142
        return new PipeTransport(this);
 
143
    }
 
144
 
 
145
    public boolean isMarshal() {
 
146
        return marshal;
 
147
    }
 
148
 
 
149
    public void setMarshal(boolean marshal) {
 
150
        this.marshal = marshal;
 
151
    }
 
152
 
 
153
    public Executor getBlockingExecutor() {
 
154
        return null;
 
155
    }
 
156
 
 
157
    public void setBlockingExecutor(Executor blockingExecutor) {
 
158
    }
 
159
}