~slub.team/goobi-indexserver/3.x

« back to all changes in this revision

Viewing changes to solr/solrj/src/java/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java

  • Committer: Sebastian Meyer
  • Date: 2012-08-03 09:12:40 UTC
  • Revision ID: sebastian.meyer@slub-dresden.de-20120803091240-x6861b0vabq1xror
Remove Lucene and Solr source code and add patches instead
Fix Bug #985487: Auto-suggestion for the search interface

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/**
2
 
 * Licensed to the Apache Software Foundation (ASF) under one or more
3
 
 * contributor license agreements.  See the NOTICE file distributed with
4
 
 * this work for additional information regarding copyright ownership.
5
 
 * The ASF licenses this file to You under the Apache License, Version 2.0
6
 
 * (the "License"); you may not use this file except in compliance with
7
 
 * the License.  You may obtain a copy of the License at
8
 
 *
9
 
 *     http://www.apache.org/licenses/LICENSE-2.0
10
 
 *
11
 
 * Unless required by applicable law or agreed to in writing, software
12
 
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 
 * See the License for the specific language governing permissions and
15
 
 * limitations under the License.
16
 
 */
17
 
 
18
 
package org.apache.solr.client.solrj.impl;
19
 
 
20
 
import java.io.IOException;
21
 
import java.io.OutputStream;
22
 
import java.io.OutputStreamWriter;
23
 
import java.net.MalformedURLException;
24
 
import java.util.LinkedList;
25
 
import java.util.Queue;
26
 
import java.util.concurrent.*;
27
 
import java.util.concurrent.locks.Lock;
28
 
import java.util.concurrent.locks.ReentrantLock;
29
 
 
30
 
import org.apache.commons.httpclient.HttpClient;
31
 
import org.apache.commons.httpclient.HttpStatus;
32
 
import org.apache.commons.httpclient.methods.PostMethod;
33
 
import org.apache.commons.httpclient.methods.RequestEntity;
34
 
import org.apache.solr.client.solrj.SolrRequest;
35
 
import org.apache.solr.client.solrj.SolrServerException;
36
 
import org.apache.solr.client.solrj.request.UpdateRequest;
37
 
import org.apache.solr.client.solrj.util.ClientUtils;
38
 
import org.apache.solr.common.params.SolrParams;
39
 
import org.apache.solr.common.params.UpdateParams;
40
 
import org.apache.solr.common.util.NamedList;
41
 
import org.slf4j.Logger;
42
 
import org.slf4j.LoggerFactory;
43
 
 
44
 
/**
45
 
 * {@link StreamingUpdateSolrServer} buffers all added documents and writes them
46
 
 * into open HTTP connections. This class is thread safe.
47
 
 * 
48
 
 * Although any SolrServer request can be made with this implementation, 
49
 
 * it is only recommended to use the {@link StreamingUpdateSolrServer} with
50
 
 * /update requests.  The query interface is better suited for 
51
 
 * 
52
 
 * @version $Id: CommonsHttpSolrServer.java 724175 2008-12-07 19:07:11Z ryan $
53
 
 * @since solr 1.4
54
 
 */
55
 
public class StreamingUpdateSolrServer extends CommonsHttpSolrServer
56
 
{
57
 
  static final Logger log = LoggerFactory.getLogger( StreamingUpdateSolrServer.class );
58
 
  
59
 
  final BlockingQueue<UpdateRequest> queue;
60
 
  final ExecutorService scheduler = Executors.newCachedThreadPool();
61
 
  final String updateUrl = "/update";
62
 
  final Queue<Runner> runners;
63
 
  volatile CountDownLatch lock = null;  // used to block everything
64
 
  final int threadCount;
65
 
 
66
 
  /**
67
 
   * Uses an internal MultiThreadedHttpConnectionManager to manage http connections
68
 
   *
69
 
   * @param solrServerUrl The Solr server URL
70
 
   * @param queueSize     The buffer size before the documents are sent to the server
71
 
   * @param threadCount   The number of background threads used to empty the queue
72
 
   * @throws MalformedURLException
73
 
   */
74
 
  public StreamingUpdateSolrServer(String solrServerUrl, int queueSize, int threadCount) throws MalformedURLException {
75
 
    this(solrServerUrl, null, queueSize, threadCount);
76
 
  }
77
 
 
78
 
  /**
79
 
   * Uses the supplied HttpClient to send documents to the Solr server, the HttpClient should be instantiated using a
80
 
   * MultiThreadedHttpConnectionManager.
81
 
   */
82
 
  public StreamingUpdateSolrServer(String solrServerUrl, HttpClient client, int queueSize, int threadCount) throws MalformedURLException {
83
 
    super(solrServerUrl, client);
84
 
    queue = new LinkedBlockingQueue<UpdateRequest>(queueSize);
85
 
    this.threadCount = threadCount;
86
 
    runners = new LinkedList<Runner>();
87
 
  }
88
 
 
89
 
  /**
90
 
   * Opens a connection and sends everything...
91
 
   */
92
 
  class Runner implements Runnable {
93
 
    final Lock runnerLock = new ReentrantLock();
94
 
 
95
 
    public void run() {
96
 
      runnerLock.lock();
97
 
 
98
 
      // info is ok since this should only happen once for each thread
99
 
      log.info( "starting runner: {}" , this );
100
 
      PostMethod method = null;
101
 
      try {
102
 
        do {
103
 
          try {
104
 
            RequestEntity request = new RequestEntity() {
105
 
              // we don't know the length
106
 
              public long getContentLength() { return -1; }
107
 
              public String getContentType() { return ClientUtils.TEXT_XML; }
108
 
              public boolean isRepeatable()  { return false; }
109
 
      
110
 
              public void writeRequest(OutputStream out) throws IOException {
111
 
                try {
112
 
                  OutputStreamWriter writer = new OutputStreamWriter(out, "UTF-8");
113
 
                  writer.append( "<stream>" ); // can be anything...
114
 
                  UpdateRequest req = queue.poll( 250, TimeUnit.MILLISECONDS );
115
 
                  while( req != null ) {
116
 
                    log.debug( "sending: {}" , req );
117
 
                    req.writeXML( writer ); 
118
 
                    
119
 
                    // check for commit or optimize
120
 
                    SolrParams params = req.getParams();
121
 
                    if( params != null ) {
122
 
                      String fmt = null;
123
 
                      if( params.getBool( UpdateParams.OPTIMIZE, false ) ) {
124
 
                        fmt = "<optimize waitSearcher=\"%s\" waitFlush=\"%s\" />";
125
 
                      }
126
 
                      else if( params.getBool( UpdateParams.COMMIT, false ) ) {
127
 
                        fmt = "<commit waitSearcher=\"%s\" waitFlush=\"%s\" />";
128
 
                      }
129
 
                      if( fmt != null ) {
130
 
                        log.info( fmt );
131
 
                        writer.write( String.format( fmt, 
132
 
                            params.getBool( UpdateParams.WAIT_SEARCHER, false )+"",
133
 
                            params.getBool( UpdateParams.WAIT_FLUSH, false )+"") );
134
 
                      }
135
 
                    }
136
 
                    
137
 
                    writer.flush();
138
 
                    req = queue.poll( 250, TimeUnit.MILLISECONDS );
139
 
                  }
140
 
                  writer.append( "</stream>" );
141
 
                  writer.flush();
142
 
                }
143
 
                catch (InterruptedException e) {
144
 
                  e.printStackTrace();
145
 
                }
146
 
              }
147
 
            };
148
 
          
149
 
            method = new PostMethod(_baseURL+updateUrl );
150
 
            method.setRequestEntity( request );
151
 
            method.setFollowRedirects( false );
152
 
            method.addRequestHeader( "User-Agent", AGENT );
153
 
            
154
 
            int statusCode = getHttpClient().executeMethod(method);
155
 
            if (statusCode != HttpStatus.SC_OK) {
156
 
              StringBuilder msg = new StringBuilder();
157
 
              msg.append( method.getStatusLine().getReasonPhrase() );
158
 
              msg.append( "\n\n" );
159
 
              msg.append( method.getStatusText() );
160
 
              msg.append( "\n\n" );
161
 
              msg.append( "request: "+method.getURI() );
162
 
              handleError( new Exception( msg.toString() ) );
163
 
            }
164
 
          } finally {
165
 
            try {
166
 
              // make sure to release the connection
167
 
              if(method != null)
168
 
                method.releaseConnection();
169
 
            }
170
 
            catch( Exception ex ){}
171
 
          }
172
 
        } while( ! queue.isEmpty());
173
 
      }
174
 
      catch (Throwable e) {
175
 
        handleError( e );
176
 
      }
177
 
      finally {
178
 
 
179
 
        // remove it from the list of running things unless we are the last runner and the queue is full...
180
 
        // in which case, the next queue.put() would block and there would be no runners to handle it.
181
 
        // This case has been further handled by using offer instead of put, and using a retry loop
182
 
        // to avoid blocking forever (see request()).
183
 
        synchronized (runners) {
184
 
          if (runners.size() == 1 && queue.remainingCapacity() == 0) {
185
 
           // keep this runner alive
186
 
           scheduler.execute(this);
187
 
          } else {
188
 
            runners.remove( this );
189
 
          }
190
 
        }
191
 
 
192
 
        log.info( "finished: {}" , this );
193
 
        runnerLock.unlock();
194
 
      }
195
 
    }
196
 
  }
197
 
  
198
 
  @Override
199
 
  public NamedList<Object> request( final SolrRequest request ) throws SolrServerException, IOException
200
 
  {
201
 
    if( !(request instanceof UpdateRequest) ) {
202
 
      return super.request( request );
203
 
    }
204
 
    UpdateRequest req = (UpdateRequest)request;
205
 
    
206
 
    // this happens for commit...
207
 
    if( req.getDocuments()==null || req.getDocuments().isEmpty() ) {
208
 
      blockUntilFinished();
209
 
      return super.request( request );
210
 
    }
211
 
 
212
 
    SolrParams params = req.getParams();
213
 
    if( params != null ) {
214
 
      // check if it is waiting for the searcher
215
 
      if( params.getBool( UpdateParams.WAIT_SEARCHER, false ) ) {
216
 
        log.info( "blocking for commit/optimize" );
217
 
        blockUntilFinished();  // empty the queue
218
 
        return super.request( request );
219
 
      }
220
 
    }
221
 
 
222
 
    try {
223
 
      CountDownLatch tmpLock = lock;
224
 
      if( tmpLock != null ) {
225
 
        tmpLock.await();
226
 
      }
227
 
 
228
 
      boolean success = queue.offer(req);
229
 
 
230
 
      for(;;) {
231
 
        synchronized( runners ) {
232
 
          if( runners.isEmpty()
233
 
                  || (queue.remainingCapacity() < queue.size()    // queue is half full and we can add more runners
234
 
                  && runners.size() < threadCount) )
235
 
          {
236
 
            // We need more runners, so start a new one.
237
 
            Runner r = new Runner();
238
 
            runners.add( r );
239
 
            scheduler.execute( r );
240
 
          } else {
241
 
            // break out of the retry loop if we added the element to the queue successfully, *and*
242
 
            // while we are still holding the runners lock to prevent race conditions.
243
 
            // race conditions.
244
 
            if (success) break;
245
 
          }
246
 
        }
247
 
 
248
 
        // Retry to add to the queue w/o the runners lock held (else we risk temporary deadlock)
249
 
        // This retry could also fail because
250
 
        // 1) existing runners were not able to take off any new elements in the queue
251
 
        // 2) the queue was filled back up since our last try
252
 
        // If we succeed, the queue may have been completely emptied, and all runners stopped.
253
 
        // In all cases, we should loop back to the top to see if we need to start more runners.
254
 
        //
255
 
        if (!success) {
256
 
          success = queue.offer(req, 100, TimeUnit.MILLISECONDS);
257
 
        }
258
 
 
259
 
      }
260
 
 
261
 
 
262
 
    }
263
 
    catch (InterruptedException e) {
264
 
      log.error( "interrupted", e );
265
 
      throw new IOException( e.getLocalizedMessage() );
266
 
    }
267
 
    
268
 
    // RETURN A DUMMY result
269
 
    NamedList<Object> dummy = new NamedList<Object>();
270
 
    dummy.add( "NOTE", "the request is processed in a background stream" );
271
 
    return dummy;
272
 
  }
273
 
 
274
 
  public synchronized void blockUntilFinished()
275
 
  {
276
 
    lock = new CountDownLatch(1);
277
 
    try {
278
 
      // Wait until no runners are running
279
 
      for(;;) {
280
 
        Runner runner;
281
 
        synchronized(runners) {
282
 
          runner = runners.peek();
283
 
        }
284
 
        if (runner == null) break;
285
 
        runner.runnerLock.lock();
286
 
        runner.runnerLock.unlock();
287
 
      }
288
 
    } finally {
289
 
      lock.countDown();
290
 
      lock=null;
291
 
    }
292
 
  }
293
 
  
294
 
  public void handleError( Throwable ex )
295
 
  {
296
 
    log.error( "error", ex );
297
 
  }
298
 
}