2
* $HeadURL: https://svn.apache.org/repos/asf/httpcomponents/httpcore/tags/4.0.1/httpcore-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultConnectingIOReactor.java $
4
* $Date: 2009-02-14 18:23:26 +0100 (Sat, 14 Feb 2009) $
6
* ====================================================================
7
* Licensed to the Apache Software Foundation (ASF) under one
8
* or more contributor license agreements. See the NOTICE file
9
* distributed with this work for additional information
10
* regarding copyright ownership. The ASF licenses this file
11
* to you under the Apache License, Version 2.0 (the
12
* "License"); you may not use this file except in compliance
13
* with the License. You may obtain a copy of the License at
15
* http://www.apache.org/licenses/LICENSE-2.0
17
* Unless required by applicable law or agreed to in writing,
18
* software distributed under the License is distributed on an
19
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
20
* KIND, either express or implied. See the License for the
21
* specific language governing permissions and limitations
23
* ====================================================================
25
* This software consists of voluntary contributions made by many
26
* individuals on behalf of the Apache Software Foundation. For more
27
* information on the Apache Software Foundation, please see
28
* <http://www.apache.org/>.
32
package org.apache.http.impl.nio.reactor;
34
import java.io.IOException;
35
import java.net.InetSocketAddress;
36
import java.net.SocketAddress;
37
import java.net.UnknownHostException;
38
import java.nio.channels.CancelledKeyException;
39
import java.nio.channels.SelectionKey;
40
import java.nio.channels.SocketChannel;
41
import java.util.Iterator;
42
import java.util.Queue;
44
import java.util.concurrent.ConcurrentLinkedQueue;
45
import java.util.concurrent.ThreadFactory;
47
import org.apache.http.nio.reactor.ConnectingIOReactor;
48
import org.apache.http.nio.reactor.IOReactorException;
49
import org.apache.http.nio.reactor.IOReactorStatus;
50
import org.apache.http.nio.reactor.SessionRequest;
51
import org.apache.http.nio.reactor.SessionRequestCallback;
52
import org.apache.http.params.HttpConnectionParams;
53
import org.apache.http.params.HttpParams;
56
* Default implementation of {@link ConnectingIOReactor}. This class extends
57
* {@link AbstractMultiworkerIOReactor} with capability to connect to remote hosts.
61
* @version $Revision: 744539 $
65
public class DefaultConnectingIOReactor extends AbstractMultiworkerIOReactor
66
implements ConnectingIOReactor {
68
// Session requests added by connect() and drained by processSessionRequests()
// and cancelRequests(); the constructor backs it with a ConcurrentLinkedQueue.
private final Queue<SessionRequestImpl> requestQueue;
70
// System.currentTimeMillis() value of the most recent timeout sweep; set at
// construction and refreshed by processEvents().
private long lastTimeoutCheck;
72
/**
 * Creates a connecting I/O reactor, forwarding {@code workerCount},
 * {@code threadFactory} and {@code params} to the superclass constructor.
 * The pending-request queue starts empty and the last timeout check is
 * initialised to the current time.
 *
 * NOTE(review): the declaration of {@code workerCount} is not visible in
 * this listing; presumably it is a constructor parameter - confirm against
 * the complete source.
 *
 * @param threadFactory passed through to the superclass constructor
 * @param params passed through to the superclass constructor
 * @throws IOReactorException declared; presumably raised by the superclass
 *         constructor - confirm
 */
public DefaultConnectingIOReactor(
74
final ThreadFactory threadFactory,
75
final HttpParams params) throws IOReactorException {
76
super(workerCount, threadFactory, params);
77
this.requestQueue = new ConcurrentLinkedQueue<SessionRequestImpl>();
78
// Baseline for the elapsed-time check performed in processEvents().
this.lastTimeoutCheck = System.currentTimeMillis();
81
/**
 * Convenience constructor that delegates to the three-argument constructor
 * with a {@code null} thread factory.
 *
 * NOTE(review): the declaration of {@code workerCount} is not visible in
 * this listing; presumably it is a constructor parameter - confirm against
 * the complete source.
 *
 * @param params passed through to the three-argument constructor
 * @throws IOReactorException declared; propagated from the delegated
 *         constructor
 */
public DefaultConnectingIOReactor(
83
final HttpParams params) throws IOReactorException {
84
this(workerCount, null, params);
88
/**
 * Drains the pending session request queue, removing requests one at a
 * time until the queue is empty.
 *
 * NOTE(review): the loop body is missing from this listing; presumably each
 * drained request is cancelled - confirm against the complete source.
 *
 * @throws IOReactorException declared; no visible line throws it
 */
protected void cancelRequests() throws IOReactorException {
89
SessionRequestImpl request;
90
// poll() returns null once the queue is empty, which ends the loop.
while ((request = this.requestQueue.poll()) != null) {
96
/**
 * Handles one round of reactor work: processes queued session requests,
 * walks the selector's selected keys, and, when enough time has elapsed,
 * checks registered keys for connect timeouts.
 *
 * NOTE(review): several lines of this method are missing from this listing,
 * including whatever uses {@code readyCount} and the per-key handling inside
 * the loop; presumably each key is handed to {@code processEvent} - confirm
 * against the complete source.
 *
 * @param readyCount not referenced by any line visible here
 * @throws IOReactorException declared; {@code processSessionRequests()}
 *         declares it as well
 */
protected void processEvents(int readyCount) throws IOReactorException {
97
// Start connects for requests queued by connect().
processSessionRequests();
100
Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
101
for (Iterator<SelectionKey> it = selectedKeys.iterator(); it.hasNext(); ) {
103
SelectionKey key = it.next();
107
// Empty the selected-key set.
selectedKeys.clear();
110
long currentTime = System.currentTimeMillis();
111
// Run the timeout sweep only when at least selectTimeout milliseconds
// have passed since the previous sweep.
if ((currentTime - this.lastTimeoutCheck) >= this.selectTimeout) {
112
this.lastTimeoutCheck = currentTime;
113
// keys() covers every key registered with the selector.
Set<SelectionKey> keys = this.selector.keys();
114
processTimeouts(keys);
118
/**
 * Handles a single selection key. For a connectable key it finishes the
 * pending connect of the key's channel, reports an I/O failure to the
 * session request taken from the key's attachment, and, if the channel
 * ends up connected, prepares its socket and builds a ChannelEntry for it.
 *
 * NOTE(review): this listing omits several lines of the method (try openers,
 * closing braces and some statements), so the exact nesting of the catch
 * clauses, what happens to the ChannelEntry, and the handling of
 * CancelledKeyException cannot be confirmed from here.
 *
 * @param key the selection key to handle; its attachment is cast to
 *        SessionRequestHandle when the key is connectable
 */
private void processEvent(final SelectionKey key) {
121
if (key.isConnectable()) {
123
SocketChannel channel = (SocketChannel) key.channel();
124
// Get request handle
125
SessionRequestHandle requestHandle = (SessionRequestHandle) key.attachment();
126
SessionRequestImpl sessionRequest = requestHandle.getSessionRequest();
128
// Finish connection process
130
channel.finishConnect();
131
} catch (IOException ex) {
132
// A failed connect is reported to the session request.
sessionRequest.failed(ex);
135
if (channel.isConnected()) {
138
prepareSocket(channel.socket());
139
} catch (IOException ex) {
140
// Escalate to IOReactorException unless an exception handler is
// installed and its handle(ex) returns true.
if (this.exceptionHandler == null
141
|| !this.exceptionHandler.handle(ex)) {
142
// NOTE(review): "initalizing" in the message below is misspelled; left
// unchanged here because it is a runtime string.
throw new IOReactorException(
143
"Failure initalizing socket", ex);
146
ChannelEntry entry = new ChannelEntry(channel, sessionRequest);
148
} catch (IOException ex) {
149
sessionRequest.failed(ex);
154
} catch (CancelledKeyException ex) {
159
/**
 * Scans the given keys and calls {@code timeout()} on session requests
 * whose connect deadline has passed. Only keys whose attachment is a
 * SessionRequestHandle are considered; the deadline is the handle's request
 * time plus the request's connect timeout.
 *
 * NOTE(review): a line appears to be missing from this listing just before
 * the deadline comparison; there may be an additional guard on the timeout
 * value - confirm against the complete source.
 *
 * @param keys the selection keys to inspect
 */
private void processTimeouts(final Set<SelectionKey> keys) {
160
long now = System.currentTimeMillis();
161
for (Iterator<SelectionKey> it = keys.iterator(); it.hasNext();) {
162
SelectionKey key = it.next();
163
Object attachment = key.attachment();
165
// Keys carrying any other attachment type are skipped.
if (attachment instanceof SessionRequestHandle) {
166
SessionRequestHandle handle = (SessionRequestHandle) key.attachment();
167
SessionRequestImpl sessionRequest = handle.getSessionRequest();
168
int timeout = sessionRequest.getConnectTimeout();
170
// Timed out when request time + timeout lies strictly before now;
// both are compared against System.currentTimeMillis().
if (handle.getRequestTime() + timeout < now) {
171
sessionRequest.timeout();
179
/**
 * Queues a request to connect to a remote address and wakes up the
 * selector. The connect itself is not performed here; the request is added
 * to the pending queue that {@code processSessionRequests()} drains.
 *
 * @param remoteAddress passed to the new session request as the remote address
 * @param localAddress passed to the new session request as the local address
 * @param attachment passed to the new session request
 * @param callback passed to the new session request
 * @return the newly created session request
 * @throws IllegalStateException if the reactor status compares greater than
 *         {@code IOReactorStatus.ACTIVE}
 */
public SessionRequest connect(
180
final SocketAddress remoteAddress,
181
final SocketAddress localAddress,
182
final Object attachment,
183
final SessionRequestCallback callback) {
185
// Any status ordered after ACTIVE is treated as shut down.
if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0) {
186
throw new IllegalStateException("I/O reactor has been shut down");
188
SessionRequestImpl sessionRequest = new SessionRequestImpl(
189
remoteAddress, localAddress, attachment, callback);
190
// The connect timeout is taken from the reactor's HTTP parameters.
sessionRequest.setConnectTimeout(HttpConnectionParams.getConnectionTimeout(this.params));
192
this.requestQueue.add(sessionRequest);
193
// Wake the selector after the request has been queued.
this.selector.wakeup();
195
return sessionRequest;
198
/**
 * Rejects unresolved Internet socket addresses. An InetSocketAddress whose
 * {@code isUnresolved()} returns true causes an UnknownHostException
 * carrying the endpoint's host name.
 *
 * NOTE(review): the body of the null check is missing from this listing;
 * presumably a null address is accepted and the method returns early -
 * confirm against the complete source.
 *
 * @param address the address to check
 * @throws UnknownHostException if the address is an unresolved
 *         InetSocketAddress
 */
private void validateAddress(final SocketAddress address) throws UnknownHostException {
199
if (address == null) {
202
// Only InetSocketAddress instances are inspected by the visible code.
if (address instanceof InetSocketAddress) {
203
InetSocketAddress endpoint = (InetSocketAddress) address;
204
if (endpoint.isUnresolved()) {
205
throw new UnknownHostException(endpoint.getHostName());
210
/**
 * Drains the pending session request queue and starts a non-blocking
 * connect for each request. It opens a socket channel, validates the local
 * and remote addresses, binds to the local address when one is given, and
 * initiates the connect. The channel is also registered with the selector,
 * a SessionRequestHandle is attached to the key, and OP_CONNECT interest is
 * enabled.
 *
 * NOTE(review): many lines of this method are missing from this listing
 * (try openers, closing braces, the body of the isCompleted() branch, the
 * handling after connect() and one catch body), and the end of the method
 * is not visible. The control flow between the visible statements must be
 * confirmed against the complete source.
 *
 * @throws IOReactorException if opening the socket channel or registering
 *         it with the selector fails with an IOException
 */
private void processSessionRequests() throws IOReactorException {
211
SessionRequestImpl request;
212
// poll() returns null once the queue is empty, which ends the loop.
while ((request = this.requestQueue.poll()) != null) {
213
// NOTE(review): branch body not visible; presumably completed requests
// are skipped - confirm.
if (request.isCompleted()) {
216
SocketChannel socketChannel;
218
socketChannel = SocketChannel.open();
219
socketChannel.configureBlocking(false);
220
} catch (IOException ex) {
221
throw new IOReactorException("Failure opening socket", ex);
224
validateAddress(request.getLocalAddress());
225
validateAddress(request.getRemoteAddress());
227
// Bind only when a local address was supplied.
if (request.getLocalAddress() != null) {
228
socketChannel.socket().bind(request.getLocalAddress());
230
boolean connected = socketChannel.connect(request.getRemoteAddress());
232
// NOTE(review): presumably the next two statements run only when the
// connect completed immediately (connected == true) - confirm.
prepareSocket(socketChannel.socket());
233
ChannelEntry entry = new ChannelEntry(socketChannel, request);
237
} catch (IOException ex) {
244
// Register with an empty interest set; OP_CONNECT is enabled below,
// after the request handle has been attached.
key = socketChannel.register(this.selector, 0);
246
} catch (IOException ex) {
247
throw new IOReactorException("Failure registering channel " +
248
"with the selector", ex);
251
SessionRequestHandle requestHandle = new SessionRequestHandle(request);
253
key.attach(requestHandle);
254
key.interestOps(SelectionKey.OP_CONNECT);
255
} catch (CancelledKeyException ex) {
256
// Ignore cancelled keys