/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18
package org.apache.solr.client.solrj.impl;
20
import java.io.IOException;
21
import java.io.OutputStream;
22
import java.io.OutputStreamWriter;
23
import java.net.MalformedURLException;
24
import java.util.LinkedList;
25
import java.util.Queue;
26
import java.util.concurrent.*;
27
import java.util.concurrent.locks.Lock;
28
import java.util.concurrent.locks.ReentrantLock;
30
import org.apache.commons.httpclient.HttpClient;
31
import org.apache.commons.httpclient.HttpStatus;
32
import org.apache.commons.httpclient.methods.PostMethod;
33
import org.apache.commons.httpclient.methods.RequestEntity;
34
import org.apache.solr.client.solrj.SolrRequest;
35
import org.apache.solr.client.solrj.SolrServerException;
36
import org.apache.solr.client.solrj.request.UpdateRequest;
37
import org.apache.solr.client.solrj.util.ClientUtils;
38
import org.apache.solr.common.params.SolrParams;
39
import org.apache.solr.common.params.UpdateParams;
40
import org.apache.solr.common.util.NamedList;
41
import org.slf4j.Logger;
42
import org.slf4j.LoggerFactory;
/**
 * {@link StreamingUpdateSolrServer} buffers all added documents and writes them
 * into open HTTP connections. This class is thread safe.
 *
 * Although any SolrServer request can be made with this implementation,
 * it is only recommended to use the {@link StreamingUpdateSolrServer} with
 * /update requests. Queries are better served by a plain
 * {@link CommonsHttpSolrServer}; non-update requests made here are passed
 * straight through to it without streaming.
 *
 * @version $Id: CommonsHttpSolrServer.java 724175 2008-12-07 19:07:11Z ryan $
 */
55
public class StreamingUpdateSolrServer extends CommonsHttpSolrServer
57
static final Logger log = LoggerFactory.getLogger( StreamingUpdateSolrServer.class );
59
final BlockingQueue<UpdateRequest> queue;
60
final ExecutorService scheduler = Executors.newCachedThreadPool();
61
final String updateUrl = "/update";
62
final Queue<Runner> runners;
63
volatile CountDownLatch lock = null; // used to block everything
64
final int threadCount;
/**
 * Creates a streaming server whose HTTP connections are managed by an internal
 * MultiThreadedHttpConnectionManager.
 *
 * @param solrServerUrl the Solr server URL
 * @param queueSize     capacity of the buffer that holds documents until a
 *                      background runner sends them to the server
 * @param threadCount   upper bound on the number of background threads used
 *                      to empty that buffer
 * @throws MalformedURLException presumably when {@code solrServerUrl} cannot be
 *                               parsed (raised by the superclass constructor)
 */
public StreamingUpdateSolrServer(String solrServerUrl, int queueSize, int threadCount) throws MalformedURLException {
  // Delegate with a null client; per this constructor's contract an internally
  // managed connection pool is used instead of a caller-supplied one.
  this(solrServerUrl, null, queueSize, threadCount);
}
/**
 * Creates a streaming server that sends documents through the supplied
 * HttpClient. The client should be instantiated with a
 * MultiThreadedHttpConnectionManager, because several background runners may
 * use it at the same time.
 *
 * @param solrServerUrl the Solr server URL
 * @param client        the HttpClient to use, handed unchanged to the superclass
 *                      (the three-argument constructor passes {@code null})
 * @param queueSize     capacity of the internal document buffer
 * @param threadCount   upper bound on the number of background runner threads
 * @throws MalformedURLException presumably when {@code solrServerUrl} cannot be
 *                               parsed (raised by the superclass constructor)
 */
public StreamingUpdateSolrServer(String solrServerUrl, HttpClient client, int queueSize, int threadCount) throws MalformedURLException {
  super(solrServerUrl, client);
  // Bounded buffer: LinkedBlockingQueue rejects a capacity below 1.
  this.queue = new LinkedBlockingQueue<UpdateRequest>(queueSize);
  // Starts empty; request() adds runners on demand.
  this.runners = new LinkedList<Runner>();
  this.threadCount = threadCount;
}
90
* Opens a connection and sends everything...
92
class Runner implements Runnable {
93
final Lock runnerLock = new ReentrantLock();
98
// info is ok since this should only happen once for each thread
99
log.info( "starting runner: {}" , this );
100
PostMethod method = null;
104
RequestEntity request = new RequestEntity() {
105
// we don't know the length
106
public long getContentLength() { return -1; }
107
public String getContentType() { return ClientUtils.TEXT_XML; }
108
public boolean isRepeatable() { return false; }
110
public void writeRequest(OutputStream out) throws IOException {
112
OutputStreamWriter writer = new OutputStreamWriter(out, "UTF-8");
113
writer.append( "<stream>" ); // can be anything...
114
UpdateRequest req = queue.poll( 250, TimeUnit.MILLISECONDS );
115
while( req != null ) {
116
log.debug( "sending: {}" , req );
117
req.writeXML( writer );
119
// check for commit or optimize
120
SolrParams params = req.getParams();
121
if( params != null ) {
123
if( params.getBool( UpdateParams.OPTIMIZE, false ) ) {
124
fmt = "<optimize waitSearcher=\"%s\" waitFlush=\"%s\" />";
126
else if( params.getBool( UpdateParams.COMMIT, false ) ) {
127
fmt = "<commit waitSearcher=\"%s\" waitFlush=\"%s\" />";
131
writer.write( String.format( fmt,
132
params.getBool( UpdateParams.WAIT_SEARCHER, false )+"",
133
params.getBool( UpdateParams.WAIT_FLUSH, false )+"") );
138
req = queue.poll( 250, TimeUnit.MILLISECONDS );
140
writer.append( "</stream>" );
143
catch (InterruptedException e) {
149
method = new PostMethod(_baseURL+updateUrl );
150
method.setRequestEntity( request );
151
method.setFollowRedirects( false );
152
method.addRequestHeader( "User-Agent", AGENT );
154
int statusCode = getHttpClient().executeMethod(method);
155
if (statusCode != HttpStatus.SC_OK) {
156
StringBuilder msg = new StringBuilder();
157
msg.append( method.getStatusLine().getReasonPhrase() );
158
msg.append( "\n\n" );
159
msg.append( method.getStatusText() );
160
msg.append( "\n\n" );
161
msg.append( "request: "+method.getURI() );
162
handleError( new Exception( msg.toString() ) );
166
// make sure to release the connection
168
method.releaseConnection();
170
catch( Exception ex ){}
172
} while( ! queue.isEmpty());
174
catch (Throwable e) {
179
// remove it from the list of running things unless we are the last runner and the queue is full...
180
// in which case, the next queue.put() would block and there would be no runners to handle it.
181
// This case has been further handled by using offer instead of put, and using a retry loop
182
// to avoid blocking forever (see request()).
183
synchronized (runners) {
184
if (runners.size() == 1 && queue.remainingCapacity() == 0) {
185
// keep this runner alive
186
scheduler.execute(this);
188
runners.remove( this );
192
log.info( "finished: {}" , this );
199
public NamedList<Object> request( final SolrRequest request ) throws SolrServerException, IOException
201
if( !(request instanceof UpdateRequest) ) {
202
return super.request( request );
204
UpdateRequest req = (UpdateRequest)request;
206
// this happens for commit...
207
if( req.getDocuments()==null || req.getDocuments().isEmpty() ) {
208
blockUntilFinished();
209
return super.request( request );
212
SolrParams params = req.getParams();
213
if( params != null ) {
214
// check if it is waiting for the searcher
215
if( params.getBool( UpdateParams.WAIT_SEARCHER, false ) ) {
216
log.info( "blocking for commit/optimize" );
217
blockUntilFinished(); // empty the queue
218
return super.request( request );
223
CountDownLatch tmpLock = lock;
224
if( tmpLock != null ) {
228
boolean success = queue.offer(req);
231
synchronized( runners ) {
232
if( runners.isEmpty()
233
|| (queue.remainingCapacity() < queue.size() // queue is half full and we can add more runners
234
&& runners.size() < threadCount) )
236
// We need more runners, so start a new one.
237
Runner r = new Runner();
239
scheduler.execute( r );
241
// break out of the retry loop if we added the element to the queue successfully, *and*
242
// while we are still holding the runners lock to prevent race conditions.
248
// Retry to add to the queue w/o the runners lock held (else we risk temporary deadlock)
249
// This retry could also fail because
250
// 1) existing runners were not able to take off any new elements in the queue
251
// 2) the queue was filled back up since our last try
252
// If we succeed, the queue may have been completely emptied, and all runners stopped.
253
// In all cases, we should loop back to the top to see if we need to start more runners.
256
success = queue.offer(req, 100, TimeUnit.MILLISECONDS);
263
catch (InterruptedException e) {
264
log.error( "interrupted", e );
265
throw new IOException( e.getLocalizedMessage() );
268
// RETURN A DUMMY result
269
NamedList<Object> dummy = new NamedList<Object>();
270
dummy.add( "NOTE", "the request is processed in a background stream" );
274
public synchronized void blockUntilFinished()
276
lock = new CountDownLatch(1);
278
// Wait until no runners are running
281
synchronized(runners) {
282
runner = runners.peek();
284
if (runner == null) break;
285
runner.runnerLock.lock();
286
runner.runnerLock.unlock();
294
public void handleError( Throwable ex )
296
log.error( "error", ex );