/*
 * To change this template, choose Tools | Templates
 * and open the template in the editor.
 */

package gearmanij.common;
8
import gearmanij.Constants;
9
import gearmanij.Packet;
10
import org.gearman.PacketHeader;
11
import java.io.BufferedInputStream;
12
import java.io.ByteArrayInputStream;
13
import java.io.IOException;
14
import java.net.InetSocketAddress;
15
import java.nio.ByteBuffer;
16
import java.nio.channels.SelectionKey;
17
import java.nio.channels.Selector;
18
import java.nio.channels.SocketChannel;
19
import java.util.logging.Level;
20
import java.util.logging.Logger;
21
import org.gearman.GearmanJobServerConnection;
22
import org.gearman.GearmanPacket;
24
public class GearmanNIOJobServerConnection implements GearmanJobServerConnection{
26
InetSocketAddress remote;
27
private SocketChannel serverConnection = null;
28
private Selector selector = null;
29
private SelectionKey selectorKey = null;
30
private static final Logger LOG = Logger.getLogger(
31
Constants.GEARMAN_CLIENT_LOGGER_NAME); //TODO change this
32
private ByteBuffer bytesReceived;
33
private ByteBuffer bytesToSend;
37
public GearmanNIOJobServerConnection(InetSocketAddress remote) throws IllegalArgumentException {
39
throw new IllegalArgumentException("Remote can not be null");
42
bytesReceived = ByteBuffer.allocate(Constants.GEARMAN_DEFAULT_SOCKET_RECV_SIZE);
43
bytesToSend = ByteBuffer.allocate(Constants.GEARMAN_DEFAULT_SOCKET_RECV_SIZE);
46
public void open() throws IOException{
47
if (isInitialized()) {
48
throw new IllegalStateException("A session can not be " +
52
serverConnection = SocketChannel.open(remote);
53
serverConnection.configureBlocking(false);
54
serverConnection.finishConnect();
55
selector = Selector.open();
56
selectorKey = serverConnection.register(selector,
57
SelectionKey.OP_WRITE | SelectionKey.OP_READ);
58
} catch (IOException ioe) {
59
LOG.log(Level.WARNING, "Received IOException while attempting to" +
60
" initialize session " + this +
61
". Shuting down session", ioe);
62
if (serverConnection != null && serverConnection.isOpen()) {
63
if (selector != null && selector.isOpen()) {
66
} catch (IOException selioe) {
67
LOG.log(Level.WARNING, "Received IOException while" +
68
" attempting to close selector.", selioe);
72
serverConnection.close();
73
} catch (IOException closeioe) {
74
LOG.log(Level.WARNING, "Received IOException while" +
75
" attempting to close connection to server. " +
76
"Giving up!", closeioe);
79
throw new IOException();
84
if (!isInitialized()) {
85
throw new IllegalStateException("Can not close a session that " +
86
"has not been initialized");
88
LOG.log(Level.FINE, "Session " + this + " is being closed.");
92
} catch (IOException ioe) {
93
LOG.log(Level.WARNING, "Received IOException while attempting to " +
94
"close selector attached to session " + this, ioe);
97
serverConnection.close();
98
} catch (IOException cioe) {
99
LOG.log(Level.WARNING, "Received IOException while attempting" +
100
" to close connection for session " + this, cioe);
102
serverConnection = null;
104
LOG.log(Level.FINE, "Session " + this + " has successfully closed.");
107
public void write(GearmanPacket request) throws IOException{
108
int ps = request.getData().length + Constants.GEARMAN_PACKET_HEADER_SIZE;
109
if (bytesToSend.remaining() < ps) {
111
ByteBuffer bb = ByteBuffer.allocate(bytesToSend.capacity() + (ps * 10));
115
byte[] bytes = request.toBytes();
116
ByteBuffer bb = ByteBuffer.allocate(bytes.length);
120
selector.selectNow();
121
if (selectorKey.isWritable()) {
122
bytesToSend.limit(bytesToSend.position());
123
bytesToSend.rewind();
124
serverConnection.write(bytesToSend);
125
bytesToSend.compact();
129
public GearmanPacket read() throws IOException {
130
GearmanPacket returnPacket = null;
131
selector.selectNow();
132
if (selectorKey.isReadable()) {
133
int bytesRead = serverConnection.read(bytesReceived);
134
if (bytesRead >= 0) {
135
LOG.log(Level.FINER, "Session " + this + " has read " +
136
bytesRead + " bytes from its job server. Buffer has " +
137
bytesReceived.remaining());
139
//TODO do something smarter here
140
throw new IOException("Connection to job server severed");
143
if (bufferContainsCompletePacket(bytesReceived)) {
144
byte[] pb = new byte[getSizeOfPacket(bytesReceived)];
145
bytesReceived.limit(bytesReceived.position());
146
bytesReceived.rewind();
147
bytesReceived.get(pb);
148
bytesReceived.compact();
149
returnPacket = new Packet(new BufferedInputStream(new ByteArrayInputStream(pb)));
154
public SelectionKey registerSelector(Selector s, int mask) throws IOException{
155
return serverConnection.register(s, mask);
159
public boolean canRead () {
161
selector.selectNow();
162
} catch (IOException ioe) {
163
LOG.log(Level.WARNING,"Failed to select on connection " + this,ioe);
165
return (selectorKey.isReadable() || bufferContainsCompletePacket(bytesReceived));
168
public boolean canWrite () {
170
selector.selectNow();
171
} catch (IOException ioe) {
172
LOG.log(Level.WARNING,"Failed to select on connection " + this,ioe);
174
return (bytesToSend.hasRemaining() || selectorKey.isWritable());
177
public Selector getSelector() {
181
public boolean isInitialized() {
182
return (serverConnection != null && serverConnection.isConnected());
185
boolean bufferContainsCompletePacket(ByteBuffer b) {
186
if (b.position() < Constants.GEARMAN_PACKET_HEADER_SIZE ) {
189
return b.position() >= getSizeOfPacket(b) ? true : false;
192
// DO NOT CALL UNLESS YOU ARE SURE THAT BYTEBUFFER HAS AT LEAST
193
// GEARMAN_PACKET_HEADER_SIZE BYTES!
194
int getSizeOfPacket(ByteBuffer buffer) {
195
int originalPosition = buffer.position();
196
byte[] header =new byte[Constants.GEARMAN_PACKET_HEADER_SIZE];
199
buffer.position(originalPosition);
200
PacketHeader ph = new PacketHeader(header);
201
return ph.getDataLength() + Constants.GEARMAN_PACKET_HEADER_SIZE;