~elambert/gearmanij/gearman_java_library

« back to all changes in this revision

Viewing changes to src/gearmanij/common/GearmanNIOJobServerConnection.java

  • Committer: Eric Lambert
  • Date: 2009-07-07 02:18:15 UTC
  • mfrom: (57.1.65 gearmanij-trunk)
  • Revision ID: eric.d.lambert@gmail.com-20090707021815-0xbupi72ubyoa62a
Merge from trunk. ReverseWorkerTest has been ignored: it was failing, claiming that certain ops were not supported. Because there is duplication in the worker code, this issue will be ignored for the time being and resolved once the worker code has been straightened out.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/*
2
 
 * To change this template, choose Tools | Templates
3
 
 * and open the template in the editor.
4
 
 */
5
 
 
6
 
package gearmanij.common;
7
 
 
8
 
import gearmanij.Constants;
9
 
import gearmanij.Packet;
10
 
import org.gearman.PacketHeader;
11
 
import java.io.BufferedInputStream;
12
 
import java.io.ByteArrayInputStream;
13
 
import java.io.IOException;
14
 
import java.net.InetSocketAddress;
15
 
import java.nio.ByteBuffer;
16
 
import java.nio.channels.SelectionKey;
17
 
import java.nio.channels.Selector;
18
 
import java.nio.channels.SocketChannel;
19
 
import java.util.logging.Level;
20
 
import java.util.logging.Logger;
21
 
import org.gearman.GearmanJobServerConnection;
22
 
import org.gearman.GearmanPacket;
23
 
 
24
 
public class GearmanNIOJobServerConnection implements GearmanJobServerConnection{
25
 
 
26
 
    InetSocketAddress remote;
27
 
    private SocketChannel serverConnection = null;
28
 
    private Selector selector = null;
29
 
    private SelectionKey selectorKey = null;
30
 
    private static final Logger LOG = Logger.getLogger(
31
 
            Constants.GEARMAN_CLIENT_LOGGER_NAME); //TODO change this
32
 
    private ByteBuffer bytesReceived;
33
 
    private ByteBuffer bytesToSend;
34
 
    
35
 
    
36
 
    
37
 
    public GearmanNIOJobServerConnection(InetSocketAddress remote) throws IllegalArgumentException {
38
 
        if (remote == null) {
39
 
            throw new IllegalArgumentException("Remote can not be null");
40
 
        }
41
 
        this.remote = remote;
42
 
        bytesReceived = ByteBuffer.allocate(Constants.GEARMAN_DEFAULT_SOCKET_RECV_SIZE);
43
 
        bytesToSend = ByteBuffer.allocate(Constants.GEARMAN_DEFAULT_SOCKET_RECV_SIZE);
44
 
    }
45
 
 
46
 
    public void open() throws IOException{
47
 
        if (isInitialized()) {
48
 
            throw new IllegalStateException("A session can not be " +
49
 
                    "initialized twice");
50
 
        }
51
 
        try {
52
 
            serverConnection = SocketChannel.open(remote);
53
 
            serverConnection.configureBlocking(false);
54
 
            serverConnection.finishConnect();
55
 
            selector = Selector.open();
56
 
            selectorKey = serverConnection.register(selector,
57
 
                    SelectionKey.OP_WRITE | SelectionKey.OP_READ);
58
 
        } catch (IOException ioe) {
59
 
            LOG.log(Level.WARNING, "Received IOException while attempting to" +
60
 
                    " initialize session " + this +
61
 
                    ". Shuting down session", ioe);
62
 
            if (serverConnection != null && serverConnection.isOpen()) {
63
 
                if (selector != null && selector.isOpen()) {
64
 
                    try {
65
 
                        selector.close();
66
 
                    } catch (IOException selioe) {
67
 
                        LOG.log(Level.WARNING, "Received IOException while" +
68
 
                                " attempting to close selector.", selioe);
69
 
                    }
70
 
                }
71
 
                try {
72
 
                    serverConnection.close();
73
 
                } catch (IOException closeioe) {
74
 
                    LOG.log(Level.WARNING, "Received IOException while" +
75
 
                            " attempting to close connection to server. " +
76
 
                            "Giving up!", closeioe);
77
 
                }
78
 
            }
79
 
            throw new IOException();
80
 
        }
81
 
    }
82
 
 
83
 
    public void close() {
84
 
        if (!isInitialized()) {
85
 
            throw new IllegalStateException("Can not close a session that " +
86
 
                    "has not been initialized");
87
 
        }
88
 
        LOG.log(Level.FINE, "Session " + this + " is being closed.");
89
 
        selectorKey.cancel();
90
 
        try {
91
 
            selector.close();
92
 
        } catch (IOException ioe) {
93
 
            LOG.log(Level.WARNING, "Received IOException while attempting to " +
94
 
                    "close selector attached to session " + this, ioe);
95
 
        } finally {
96
 
            try {
97
 
                serverConnection.close();
98
 
            } catch (IOException cioe) {
99
 
                LOG.log(Level.WARNING, "Received IOException while attempting" +
100
 
                        " to close connection for session " + this, cioe);
101
 
            }
102
 
            serverConnection = null;
103
 
        }
104
 
        LOG.log(Level.FINE, "Session " + this + " has successfully closed.");
105
 
    }
106
 
 
107
 
    public void write(GearmanPacket request) throws IOException{
108
 
        int ps = request.getData().length + Constants.GEARMAN_PACKET_HEADER_SIZE;
109
 
        if (bytesToSend.remaining() < ps) {
110
 
            //TODO allocate more
111
 
            ByteBuffer bb = ByteBuffer.allocate(bytesToSend.capacity() + (ps * 10));
112
 
            bb.put(bytesToSend);
113
 
            bytesToSend = bb;
114
 
        }
115
 
        byte[] bytes = request.toBytes();
116
 
        ByteBuffer bb = ByteBuffer.allocate(bytes.length);
117
 
        bb.put(bytes);
118
 
        bb.rewind();
119
 
        bytesToSend.put(bb);
120
 
        selector.selectNow();
121
 
        if (selectorKey.isWritable()) {
122
 
            bytesToSend.limit(bytesToSend.position());
123
 
            bytesToSend.rewind();
124
 
            serverConnection.write(bytesToSend);
125
 
            bytesToSend.compact();
126
 
        }
127
 
    }
128
 
 
129
 
    public GearmanPacket read() throws IOException {
130
 
        GearmanPacket returnPacket = null;
131
 
        selector.selectNow();
132
 
        if (selectorKey.isReadable()) {
133
 
            int bytesRead = serverConnection.read(bytesReceived);
134
 
            if (bytesRead >= 0) {
135
 
                LOG.log(Level.FINER, "Session " + this + " has read " + 
136
 
                        bytesRead + " bytes from its job server. Buffer has " +
137
 
                        bytesReceived.remaining());
138
 
            } else {
139
 
                //TODO do something smarter here
140
 
                throw new IOException("Connection to job server severed");
141
 
            }
142
 
        }
143
 
        if (bufferContainsCompletePacket(bytesReceived)) {
144
 
            byte[] pb = new byte[getSizeOfPacket(bytesReceived)];
145
 
            bytesReceived.limit(bytesReceived.position());
146
 
            bytesReceived.rewind();
147
 
            bytesReceived.get(pb);
148
 
            bytesReceived.compact();
149
 
            returnPacket = new Packet(new BufferedInputStream(new ByteArrayInputStream(pb)));
150
 
        }
151
 
        return returnPacket;
152
 
    }
153
 
 
154
 
    public SelectionKey registerSelector(Selector s, int mask)  throws IOException{
155
 
        return serverConnection.register(s, mask);
156
 
 
157
 
    }
158
 
    
159
 
    public boolean canRead () {
160
 
        try {
161
 
            selector.selectNow();
162
 
        } catch (IOException ioe) {
163
 
            LOG.log(Level.WARNING,"Failed to select on connection " + this,ioe);
164
 
        }
165
 
        return (selectorKey.isReadable() || bufferContainsCompletePacket(bytesReceived));
166
 
    }
167
 
    
168
 
    public boolean canWrite () {
169
 
        try {
170
 
            selector.selectNow();
171
 
        } catch (IOException ioe) {
172
 
            LOG.log(Level.WARNING,"Failed to select on connection " + this,ioe);
173
 
        }
174
 
        return (bytesToSend.hasRemaining() || selectorKey.isWritable());
175
 
    }
176
 
 
177
 
    public Selector getSelector() {
178
 
        return selector;
179
 
    }
180
 
 
181
 
    public boolean isInitialized() {
182
 
        return (serverConnection != null && serverConnection.isConnected());
183
 
    }
184
 
 
185
 
    boolean bufferContainsCompletePacket(ByteBuffer b) {
186
 
        if (b.position() < Constants.GEARMAN_PACKET_HEADER_SIZE ) {
187
 
            return false;
188
 
        }
189
 
        return b.position() >= getSizeOfPacket(b) ? true : false;
190
 
    }
191
 
 
192
 
    // DO NOT CALL UNLESS YOU ARE SURE THAT BYTEBUFFER HAS AT LEAST
193
 
    // GEARMAN_PACKET_HEADER_SIZE BYTES!
194
 
    int getSizeOfPacket(ByteBuffer buffer) {
195
 
        int originalPosition = buffer.position();
196
 
        byte[] header =new byte[Constants.GEARMAN_PACKET_HEADER_SIZE];
197
 
        buffer.rewind();
198
 
        buffer.get(header);
199
 
        buffer.position(originalPosition);
200
 
        PacketHeader ph = new PacketHeader(header);
201
 
        return ph.getDataLength() + Constants.GEARMAN_PACKET_HEADER_SIZE;
202
 
    }
203
 
 
204
 
}