1
package org.apache.solr.update;
4
* Licensed to the Apache Software Foundation (ASF) under one or more
5
* contributor license agreements. See the NOTICE file distributed with
6
* this work for additional information regarding copyright ownership.
7
* The ASF licenses this file to You under the Apache License, Version 2.0
8
* (the "License"); you may not use this file except in compliance with
9
* the License. You may obtain a copy of the License at
11
* http://www.apache.org/licenses/LICENSE-2.0
13
* Unless required by applicable law or agreed to in writing, software
14
* distributed under the License is distributed on an "AS IS" BASIS,
15
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
* See the License for the specific language governing permissions and
17
* limitations under the License.
20
import java.util.concurrent.Executors;
21
import java.util.concurrent.ScheduledExecutorService;
22
import java.util.concurrent.ScheduledFuture;
23
import java.util.concurrent.TimeUnit;
24
import java.util.concurrent.atomic.AtomicInteger;
25
import java.util.concurrent.atomic.AtomicLong;
27
import org.apache.solr.common.SolrException;
28
import org.apache.solr.common.params.ModifiableSolrParams;
29
import org.apache.solr.core.SolrCore;
30
import org.apache.solr.request.LocalSolrQueryRequest;
31
import org.apache.solr.request.SolrQueryRequest;
32
import org.slf4j.Logger;
33
import org.slf4j.LoggerFactory;
36
* Helper class for tracking autoCommit state.
38
* Note: This is purely an implementation detail of autoCommit and will
39
* definitely change in the future, so the interface should not be relied-upon
41
* Note: all access must be synchronized.
43
final class CommitTracker implements Runnable {
44
protected final static Logger log = LoggerFactory.getLogger(CommitTracker.class);
46
// scheduler delay for maxDoc-triggered autocommits
47
public final int DOC_COMMIT_DELAY_MS = 1;
49
// settings, not final so we can change them in testing
50
private int docsUpperBound;
51
private long timeUpperBound;
53
private final ScheduledExecutorService scheduler = Executors
54
.newScheduledThreadPool(1);
55
private ScheduledFuture pending;
58
private AtomicLong docsSinceCommit = new AtomicLong(0);
59
private AtomicInteger autoCommitCount = new AtomicInteger(0);
61
private final SolrCore core;
63
private final boolean softCommit;
64
private final boolean waitSearcher;
68
public CommitTracker(String name, SolrCore core, int docsUpperBound, int timeUpperBound, boolean waitSearcher, boolean softCommit) {
73
this.docsUpperBound = docsUpperBound;
74
this.timeUpperBound = timeUpperBound;
76
this.softCommit = softCommit;
77
this.waitSearcher = waitSearcher;
79
SolrCore.log.info(name + " AutoCommit: " + this);
82
public synchronized void close() {
83
if (pending != null) {
87
scheduler.shutdownNow();
90
/** schedule individual commits */
91
public void scheduleCommitWithin(long commitMaxTime) {
92
_scheduleCommitWithin(commitMaxTime);
95
private void _scheduleCommitWithin(long commitMaxTime) {
96
if (commitMaxTime <= 0) return;
98
if (pending != null && pending.getDelay(TimeUnit.MILLISECONDS) <= commitMaxTime) {
99
// There is already a pending commit that will happen first, so
100
// nothing else to do here.
101
// log.info("###returning since getDelay()==" + pending.getDelay(TimeUnit.MILLISECONDS) + " less than " + commitMaxTime);
106
if (pending != null) {
107
// we need to schedule a commit to happen sooner than the existing one,
108
// so lets try to cancel the existing one first.
109
boolean canceled = pending.cancel(false);
111
// It looks like we can't cancel... it must have just started running!
112
// this is possible due to thread scheduling delays and a low commitMaxTime.
113
// Nothing else to do since we obviously can't schedule our commit *before*
114
// the one that just started running (or has just completed).
115
// log.info("###returning since cancel failed");
120
// log.info("###scheduling for " + commitMaxTime);
122
// schedule our new commit
123
pending = scheduler.schedule(this, commitMaxTime, TimeUnit.MILLISECONDS);
128
* Indicate that documents have been added
130
public void addedDocument(int commitWithin) {
131
// maxDocs-triggered autoCommit. Use == instead of > so we only trigger once on the way up
132
if (docsUpperBound > 0) {
133
long docs = docsSinceCommit.incrementAndGet();
134
if (docs == docsUpperBound + 1) {
135
// reset the count here instead of run() so we don't miss other documents being added
136
docsSinceCommit.set(0);
137
_scheduleCommitWithin(DOC_COMMIT_DELAY_MS);
141
// maxTime-triggered autoCommit
142
long ctime = (commitWithin > 0) ? commitWithin : timeUpperBound;
145
_scheduleCommitWithin(ctime);
149
/** Inform tracker that a commit has occurred */
150
public void didCommit() {
153
/** Inform tracker that a rollback has occurred, cancel any pending commits */
154
public void didRollback() {
155
synchronized (this) {
156
if (pending != null) {
157
pending.cancel(false);
158
pending = null; // let it start another one
160
docsSinceCommit.set(0);
164
/** This is the worker part for the ScheduledFuture **/
166
synchronized (this) {
167
// log.info("###start commit. pending=null");
168
pending = null; // allow a new commit to be scheduled
171
SolrQueryRequest req = new LocalSolrQueryRequest(core,
172
new ModifiableSolrParams());
174
CommitUpdateCommand command = new CommitUpdateCommand(false);
175
command.waitSearcher = waitSearcher;
176
// no need for command.maxOptimizeSegments = 1; since it is not optimizing
178
// we increment this *before* calling commit because it was causing a race
179
// in the tests (the new searcher was registered and the test proceeded
180
// to check the commit count before we had incremented it.)
181
autoCommitCount.incrementAndGet();
183
core.getUpdateHandler().commit(command);
184
} catch (Exception e) {
185
SolrException.log(log, "auto commit error...", e);
187
// log.info("###done committing");
192
// to facilitate testing: blocks if called during commit
193
public int getCommitCount() {
194
return autoCommitCount.get();
198
public String toString() {
199
if (timeUpperBound > 0 || docsUpperBound > 0) {
200
return (timeUpperBound > 0 ? ("if uncommited for " + timeUpperBound + "ms; ")
202
+ (docsUpperBound > 0 ? ("if " + docsUpperBound + " uncommited docs ")
210
public long getTimeUpperBound() {
211
return timeUpperBound;
214
int getDocsUpperBound() {
215
return docsUpperBound;
218
void setDocsUpperBound(int docsUpperBound) {
219
this.docsUpperBound = docsUpperBound;
222
void setTimeUpperBound(long timeUpperBound) {
223
this.timeUpperBound = timeUpperBound;