2
* Copyright 2003-2010 Terracotta, Inc.
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
* you may not use this file except in compliance with the License.
6
* You may obtain a copy of the License at
8
* http://www.apache.org/licenses/LICENSE-2.0
10
* Unless required by applicable law or agreed to in writing, software
11
* distributed under the License is distributed on an "AS IS" BASIS,
12
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
* See the License for the specific language governing permissions and
14
* limitations under the License.
16
package net.sf.ehcache.transaction.xa;
18
import java.util.HashMap;
19
import java.util.Iterator;
20
import java.util.List;
22
import java.util.Map.Entry;
24
import java.util.concurrent.ConcurrentHashMap;
26
import javax.transaction.RollbackException;
27
import javax.transaction.SystemException;
28
import javax.transaction.Transaction;
29
import javax.transaction.xa.XAException;
31
import net.sf.ehcache.CacheEntry;
32
import net.sf.ehcache.CacheException;
33
import net.sf.ehcache.Ehcache;
34
import net.sf.ehcache.Element;
35
import net.sf.ehcache.search.attribute.AttributeExtractor;
36
import net.sf.ehcache.store.ElementValueComparator;
37
import net.sf.ehcache.store.Store;
38
import net.sf.ehcache.store.compound.ReadWriteCopyStrategy;
39
import net.sf.ehcache.transaction.AbstractTransactionStore;
40
import net.sf.ehcache.transaction.SoftLock;
41
import net.sf.ehcache.transaction.SoftLockFactory;
42
import net.sf.ehcache.transaction.TransactionAwareAttributeExtractor;
43
import net.sf.ehcache.transaction.TransactionException;
44
import net.sf.ehcache.transaction.TransactionIDFactory;
45
import net.sf.ehcache.transaction.TransactionInterruptedException;
46
import net.sf.ehcache.transaction.TransactionTimeoutException;
47
import net.sf.ehcache.transaction.manager.TransactionManagerLookup;
48
import net.sf.ehcache.transaction.xa.commands.StorePutCommand;
49
import net.sf.ehcache.transaction.xa.commands.StoreRemoveCommand;
50
import net.sf.ehcache.util.LargeSet;
51
import net.sf.ehcache.util.SetWrapperList;
52
import net.sf.ehcache.writer.CacheWriterManager;
54
import org.slf4j.Logger;
55
import org.slf4j.LoggerFactory;
58
* @author Ludovic Orban
60
public class XATransactionStore extends AbstractTransactionStore {
62
private static final Logger LOG = LoggerFactory.getLogger(XATransactionStore.class.getName());
63
private static final long MILLISECOND_PER_SECOND = 1000L;
65
private final TransactionManagerLookup transactionManagerLookup;
66
private final TransactionIDFactory transactionIdFactory;
67
private final SoftLockFactory softLockFactory;
68
private final Ehcache cache;
70
private final ConcurrentHashMap<Transaction, EhcacheXAResource> transactionToXAResourceMap =
71
new ConcurrentHashMap<Transaction, EhcacheXAResource>();
72
private final ConcurrentHashMap<Transaction, Long> transactionToTimeoutMap = new ConcurrentHashMap<Transaction, Long>();
76
* @param transactionManagerLookup the transaction manager lookup implementation
77
* @param softLockFactory the soft lock factory
78
* @param transactionIdFactory the transaction ID factory
79
* @param cache the cache
80
* @param store the underlying store
81
* @param copyStrategy the original copy strategy
83
// Wires the XA store on top of the given underlying store and keeps the collaborators
// that are later passed to each newly created EhcacheXAResourceImpl.
public XATransactionStore(TransactionManagerLookup transactionManagerLookup, SoftLockFactory softLockFactory,
84
TransactionIDFactory transactionIdFactory, Ehcache cache, Store store,
85
ReadWriteCopyStrategy<Element> copyStrategy) {
86
super(store, copyStrategy);
87
this.transactionManagerLookup = transactionManagerLookup;
88
this.transactionIdFactory = transactionIdFactory;
89
// fail fast when the lookup cannot provide a JTA transaction manager
if (transactionManagerLookup.getTransactionManager() == null) {
90
throw new TransactionException("no JTA transaction manager could be located, cannot bind twopc cache with JTA");
92
this.softLockFactory = softLockFactory;
96
/**
 * Asks the JTA transaction manager for the transaction it currently reports.
 *
 * @return presumably the looked-up transaction (the return statement is not visible here - TODO confirm)
 * @throws SystemException declared by this method; raised by the transaction manager lookup
 * @throws TransactionException when the transaction manager reports no transaction
 */
private Transaction getCurrentTransaction() throws SystemException {
97
Transaction transaction = transactionManagerLookup.getTransactionManager().getTransaction();
98
// a null transaction means no JTA transaction was started
if (transaction == null) {
99
throw new TransactionException("JTA transaction not started");
105
* Get or create the XAResource of this XA store
106
* @return the EhcacheXAResource of this store
107
* @throws SystemException when something goes wrong with the transaction manager
109
public EhcacheXAResourceImpl getOrCreateXAResource() throws SystemException {
110
Transaction transaction = getCurrentTransaction();
111
// XAResources are cached per JTA transaction in transactionToXAResourceMap
EhcacheXAResourceImpl xaResource = (EhcacheXAResourceImpl) transactionToXAResourceMap.get(transaction);
112
if (xaResource == null) {
113
LOG.debug("creating new XAResource");
114
xaResource = new EhcacheXAResourceImpl(cache, underlyingStore, transactionManagerLookup,
115
softLockFactory, transactionIdFactory, copyStrategy);
116
transactionToXAResourceMap.put(transaction, xaResource);
117
// CleanupXAResource removes this transaction's map entries after commit or rollback
xaResource.addTwoPcExecutionListener(new CleanupXAResource(getCurrentTransaction()));
122
/**
 * Looks up the XA transaction context of the current JTA transaction using only an XAResource
 * that is already present in {@code transactionToXAResourceMap}; no XAResource is created here.
 * Callers in this class ({@code get}, {@code getQuiet}) handle a {@code null} result.
 * NOTE(review): the branch taken when no XAResource is found is not visible here - presumably it returns null; confirm.
 *
 * @return the transaction context in use
 * @throws TransactionException wrapping a SystemException or RollbackException raised along the way
 */
private XATransactionContext getTransactionContext() {
124
Transaction transaction = getCurrentTransaction();
125
EhcacheXAResourceImpl xaResource = (EhcacheXAResourceImpl) transactionToXAResourceMap.get(transaction);
126
if (xaResource == null) {
129
XATransactionContext transactionContext = xaResource.getCurrentTransactionContext();
131
// no context yet: register the resource with the transaction manager lookup and create one
if (transactionContext == null) {
132
transactionManagerLookup.register(xaResource);
133
LOG.debug("creating new XA context");
134
transactionContext = xaResource.createTransactionContext();
135
// UnregisterXAResource undoes the registration after commit or rollback
xaResource.addTwoPcExecutionListener(new UnregisterXAResource());
137
transactionContext = xaResource.getCurrentTransactionContext();
140
LOG.debug("using XA context {}", transactionContext);
141
return transactionContext;
142
} catch (SystemException e) {
143
throw new TransactionException("cannot get the current transaction", e);
144
} catch (RollbackException e) {
145
throw new TransactionException("transaction rolled back", e);
149
/**
 * Returns the XA transaction context of the current JTA transaction, obtaining the XAResource
 * through {@code getOrCreateXAResource()} and creating the context when the resource has none.
 *
 * @return the transaction context in use
 * @throws TransactionException wrapping a SystemException or RollbackException raised along the way
 */
private XATransactionContext getOrCreateTransactionContext() {
151
EhcacheXAResourceImpl xaResource = getOrCreateXAResource();
152
XATransactionContext transactionContext = xaResource.getCurrentTransactionContext();
154
// no context yet: register the resource with the transaction manager lookup and create one
if (transactionContext == null) {
155
transactionManagerLookup.register(xaResource);
156
LOG.debug("creating new XA context");
157
transactionContext = xaResource.createTransactionContext();
158
// UnregisterXAResource undoes the registration after commit or rollback
xaResource.addTwoPcExecutionListener(new UnregisterXAResource());
160
transactionContext = xaResource.getCurrentTransactionContext();
163
LOG.debug("using XA context {}", transactionContext);
164
return transactionContext;
165
} catch (SystemException e) {
166
throw new TransactionException("cannot get the current transaction", e);
167
} catch (RollbackException e) {
168
throw new TransactionException("transaction rolled back", e);
173
* This class is used to clean up the transactionToXAResourceMap after a transaction
174
* committed or rolled back.
176
private final class CleanupXAResource implements XAExecutionListener {
177
private final Transaction transaction;
179
private CleanupXAResource(Transaction transaction) {
180
this.transaction = transaction;
183
public void beforePrepare(EhcacheXAResource xaResource) {
186
public void afterCommitOrRollback(EhcacheXAResource xaResource) {
187
transactionToXAResourceMap.remove(transaction);
188
transactionToTimeoutMap.remove(transaction);
193
* This class is used to unregister the XAResource after a transaction
194
* committed or rolled back.
196
private final class UnregisterXAResource implements XAExecutionListener {
198
public void beforePrepare(EhcacheXAResource xaResource) {
201
public void afterCommitOrRollback(EhcacheXAResource xaResource) {
202
transactionManagerLookup.unregister(xaResource);
208
* @return milliseconds left before timeout
210
private long assertNotTimedOut() {
212
if (Thread.interrupted()) {
213
throw new TransactionInterruptedException("transaction interrupted");
216
Transaction transaction = getCurrentTransaction();
218
EhcacheXAResource xaResource = transactionToXAResourceMap.get(transaction);
219
Long timeoutTimestamp = transactionToTimeoutMap.get(transaction);
220
if (xaResource != null && timeoutTimestamp == null) {
221
int xaResourceTimeout = xaResource.getTransactionTimeout();
223
timeoutTimestamp = System.currentTimeMillis() + (xaResourceTimeout * MILLISECOND_PER_SECOND);
224
transactionToTimeoutMap.put(transaction, timeoutTimestamp);
225
} else if (timeoutTimestamp == null) {
226
int defaultTransactionTimeout = cache.getCacheManager().getTransactionController().getDefaultTransactionTimeout();
227
timeoutTimestamp = System.currentTimeMillis() + (defaultTransactionTimeout * MILLISECOND_PER_SECOND);
228
transactionToTimeoutMap.put(transaction, timeoutTimestamp);
231
if (timeoutTimestamp <= System.currentTimeMillis()) {
232
throw new TransactionTimeoutException("transaction timed out");
235
return timeoutTimestamp - System.currentTimeMillis();
236
} catch (SystemException e) {
237
throw new TransactionException("cannot get the current transaction", e);
238
} catch (XAException e) {
239
throw new TransactionException("cannot get the XAResource transaction timeout", e);
243
/* transactional methods */
248
/**
 * Reads an element as seen from the current transaction.
 * Uses {@code getTransactionContext()}, so no XAResource is created for a read.
 *
 * @param key the key to look up
 * @return the result of {@code copyElementForRead} applied to whatever was found
 */
public Element get(Object key) {
249
LOG.debug("cache {} get {}", cache.getName(), key);
250
XATransactionContext context = getTransactionContext();
252
// without a transaction context only the underlying store can answer
if (context == null) {
253
element = getFromUnderlyingStore(key);
255
// the context's own view of the key is consulted first
element = context.get(key);
256
// fall back to the underlying store unless this transaction removed the key
if (element == null && !context.isRemoved(key)) {
257
element = getFromUnderlyingStore(key);
260
return copyElementForRead(element);
267
/**
 * Same lookup as {@code get(Object)}, but reads the underlying store through
 * {@code getQuietFromUnderlyingStore}.
 *
 * @param key the key to look up
 * @return the result of {@code copyElementForRead} applied to whatever was found
 */
public Element getQuiet(Object key) {
268
LOG.debug("cache {} getQuiet {}", cache.getName(), key);
269
XATransactionContext context = getTransactionContext();
271
// without a transaction context only the underlying store can answer
if (context == null) {
272
element = getQuietFromUnderlyingStore(key);
274
// the context's own view of the key is consulted first
element = context.get(key);
275
// fall back to the underlying store unless this transaction removed the key
if (element == null && !context.isRemoved(key)) {
276
element = getQuietFromUnderlyingStore(key);
279
return copyElementForRead(element);
285
public int getSize() {
286
LOG.debug("cache {} getSize", cache.getName());
287
XATransactionContext context = getOrCreateTransactionContext();
288
int size = underlyingStore.getSize();
289
return size + context.getSizeModifier();
295
public int getTerracottaClusteredSize() {
297
Transaction transaction = transactionManagerLookup.getTransactionManager().getTransaction();
298
if (transaction == null) {
299
return underlyingStore.getTerracottaClusteredSize();
301
} catch (SystemException se) {
302
throw new TransactionException("cannot get the current transaction", se);
305
LOG.debug("cache {} getTerracottaClusteredSize", cache.getName());
306
XATransactionContext context = getOrCreateTransactionContext();
307
int size = underlyingStore.getTerracottaClusteredSize();
308
return size + context.getSizeModifier();
314
public boolean containsKey(Object key) {
315
LOG.debug("cache {} containsKey", cache.getName(), key);
316
XATransactionContext context = getOrCreateTransactionContext();
317
return !context.isRemoved(key) && (context.getAddedKeys().contains(key) || underlyingStore.containsKey(key));
323
/**
 * Builds the key list as seen from the current transaction: the underlying store's keys,
 * plus the keys added by the transaction context, minus the keys it removed.
 *
 * @return the key set wrapped in a {@code SetWrapperList}
 */
public List getKeys() {
324
LOG.debug("cache {} getKeys", cache.getName());
325
XATransactionContext context = getOrCreateTransactionContext();
326
// the LargeSet is backed by the underlying store through the two source callbacks below
Set<Object> keys = new LargeSet<Object>() {
329
public int sourceSize() {
330
return underlyingStore.getSize();
334
public Iterator<Object> sourceIterator() {
335
return underlyingStore.getKeys().iterator();
338
// overlay this transaction's pending additions and removals
keys.addAll(context.getAddedKeys());
339
keys.removeAll(context.getRemovedKeys());
340
return new SetWrapperList(keys);
344
/**
 * Reads a key from the underlying store with {@code get}, after checking the transaction has
 * not timed out. When the stored value is a {@link SoftLock}, the method waits on that lock
 * for at most the time left before the transaction times out.
 * NOTE(review): several control-flow lines of this method are not visible here (what is returned
 * for a missing or unlocked element, and whether the read is retried after waiting) - confirm
 * against the full source.
 *
 * @param key the key to read
 */
private Element getFromUnderlyingStore(final Object key) {
346
// also throws if the thread was interrupted or the transaction timed out
long timeLeft = assertNotTimedOut();
347
LOG.debug("cache {} underlying.get key {} not timed out, time left: " + timeLeft, cache.getName(), key);
349
Element element = underlyingStore.get(key);
350
if (element == null) {
353
Object value = element.getObjectValue();
354
// the underlying store holds a SoftLock as the element value while the key is soft locked
if (value instanceof SoftLock) {
355
SoftLock softLock = (SoftLock) value;
357
LOG.debug("cache {} key {} soft locked, awaiting unlock...", cache.getName(), key);
358
// bounded wait: never block longer than the transaction has left
boolean gotLock = softLock.tryLock(timeLeft);
360
softLock.clearTryLock();
362
} catch (InterruptedException e) {
363
// restore the interrupt flag; the next assertNotTimedOut() call checks it
Thread.currentThread().interrupt();
371
/**
 * Reads a key from the underlying store with {@code getQuiet}, after checking the transaction
 * has not timed out. When the stored value is a {@link SoftLock}, the method waits on that lock
 * for at most the time left before the transaction times out.
 * NOTE(review): several control-flow lines of this method are not visible here (what is returned
 * for a missing or unlocked element, and whether the read is retried after waiting) - confirm
 * against the full source.
 *
 * @param key the key to read
 */
private Element getQuietFromUnderlyingStore(final Object key) {
373
// also throws if the thread was interrupted or the transaction timed out
long timeLeft = assertNotTimedOut();
374
LOG.debug("cache {} underlying.getQuiet key {} not timed out, time left: " + timeLeft, cache.getName(), key);
376
Element element = underlyingStore.getQuiet(key);
377
if (element == null) {
380
Object value = element.getObjectValue();
381
// the underlying store holds a SoftLock as the element value while the key is soft locked
if (value instanceof SoftLock) {
382
SoftLock softLock = (SoftLock) value;
384
LOG.debug("cache {} key {} soft locked, awaiting unlock...", cache.getName(), key);
385
// bounded wait: never block longer than the transaction has left
boolean gotLock = softLock.tryLock(timeLeft);
387
softLock.clearTryLock();
389
} catch (InterruptedException e) {
390
// restore the interrupt flag; the next assertNotTimedOut() call checks it
Thread.currentThread().interrupt();
398
/**
 * Resolves the element currently associated with a key from the point of view of the given
 * transaction context: the context is asked first, and the underlying store is read (quietly)
 * only when the context has no element for the key and did not remove it.
 *
 * @param key     the key to resolve
 * @param context the transaction context to consult first
 */
private Element getCurrentElement(final Object key, final XATransactionContext context) {
399
Element previous = context.get(key);
400
if (previous == null && !context.isRemoved(key)) {
401
previous = getQuietFromUnderlyingStore(key);
409
public boolean put(Element element) throws CacheException {
410
LOG.debug("cache {} put {}", cache.getName(), element);
411
// this forces enlistment so the XA transaction timeout can be propagated to the XA resource
412
getOrCreateTransactionContext();
414
Element oldElement = getQuietFromUnderlyingStore(element.getObjectKey());
415
return internalPut(new StorePutCommand(oldElement, copyElementForWrite(element)));
421
public boolean putWithWriter(Element element, CacheWriterManager writerManager) throws CacheException {
422
LOG.debug("cache {} putWithWriter {}", cache.getName(), element);
423
// this forces enlistment so the XA transaction timeout can be propagated to the XA resource
424
getOrCreateTransactionContext();
426
Element oldElement = getQuietFromUnderlyingStore(element.getObjectKey());
427
if (writerManager != null) {
428
writerManager.put(element);
430
cache.getWriterManager().put(element);
432
return internalPut(new StorePutCommand(oldElement, copyElementForWrite(element)));
435
/**
 * Adds a put command to the current transaction context.
 * NOTE(review): the declaration of {@code isNull}, the handling of a null element and the
 * return statement are not visible here - confirm against the full source.
 * NOTE(review): the lookups use {@code element.getKey()} while other methods of this class use
 * {@code getObjectKey()}; verify that non-Serializable keys are handled as intended.
 *
 * @param putCommand the command carrying the element to put
 */
private boolean internalPut(final StorePutCommand putCommand) {
436
final Element element = putCommand.getElement();
438
if (element == null) {
441
XATransactionContext context = getOrCreateTransactionContext();
442
// In case this key is currently being updated...
443
// first check: is the key absent from the underlying store?
isNull = underlyingStore.get(element.getKey()) == null;
445
// second check: is the key absent from this transaction's context?
isNull = context.get(element.getKey()) == null;
447
context.addCommand(putCommand, element);
455
public Element remove(Object key) {
456
LOG.debug("cache {} remove {}", cache.getName(), key);
457
// this forces enlistment so the XA transaction timeout can be propagated to the XA resource
458
getOrCreateTransactionContext();
460
Element oldElement = getQuietFromUnderlyingStore(key);
461
return removeInternal(new StoreRemoveCommand(key, oldElement));
464
private Element removeInternal(final StoreRemoveCommand command) {
465
Element element = command.getEntry().getElement();
466
getOrCreateTransactionContext().addCommand(command, element);
467
return copyElementForRead(element);
473
public Element removeWithWriter(Object key, CacheWriterManager writerManager) throws CacheException {
474
LOG.debug("cache {} removeWithWriter {}", cache.getName(), key);
475
// this forces enlistment so the XA transaction timeout can be propagated to the XA resource
476
getOrCreateTransactionContext();
478
Element oldElement = getQuietFromUnderlyingStore(key);
479
if (writerManager != null) {
480
writerManager.remove(new CacheEntry(key, null));
482
cache.getWriterManager().remove(new CacheEntry(key, null));
484
return removeInternal(new StoreRemoveCommand(key, oldElement));
490
/**
 * Iterates over the keys visible to the current transaction, as returned by {@code getKeys()}.
 * NOTE(review): the loop body is not visible here - presumably each key is removed through
 * this store; confirm against the full source.
 */
public void removeAll() throws CacheException {
491
LOG.debug("cache {} removeAll", cache.getName());
492
List keys = getKeys();
493
for (Object key : keys) {
501
public Element putIfAbsent(Element element) throws NullPointerException {
502
LOG.debug("cache {} putIfAbsent {}", cache.getName(), element);
503
XATransactionContext context = getOrCreateTransactionContext();
504
Element previous = getCurrentElement(element.getObjectKey(), context);
506
if (previous == null) {
507
Element oldElement = getQuietFromUnderlyingStore(element.getObjectKey());
508
Element elementForWrite = copyElementForWrite(element);
509
context.addCommand(new StorePutCommand(oldElement, elementForWrite), elementForWrite);
512
return copyElementForRead(previous);
518
/**
 * Removes the element only when the current transaction sees an element for its key that the
 * comparator considers equal to a write-copy of the given element.
 * NOTE(review): the statement executed when the comparison fails is not visible here - confirm.
 * NOTE(review): the lookup uses {@code element.getKey()} while the command below uses
 * {@code getObjectKey()}; verify that non-Serializable keys are handled as intended.
 *
 * @param element    the element expected to be present
 * @param comparator decides whether the present element matches
 * @return a read-copy of the removed element when the removal was recorded
 */
public Element removeElement(Element element, ElementValueComparator comparator) throws NullPointerException {
519
LOG.debug("cache {} removeElement {}", cache.getName(), element);
520
XATransactionContext context = getOrCreateTransactionContext();
521
Element previous = getCurrentElement(element.getKey(), context);
523
// the comparison is made against a write-copy of the caller's element
Element elementForWrite = copyElementForWrite(element);
524
if (previous != null && comparator.equals(previous, elementForWrite)) {
525
Element oldElement = getQuietFromUnderlyingStore(element.getObjectKey());
526
context.addCommand(new StoreRemoveCommand(element.getObjectKey(), oldElement), elementForWrite);
527
return copyElementForRead(previous);
535
/**
 * Replaces the element only when the current transaction sees an element for its key that the
 * comparator considers equal to a write-copy of {@code old}.
 * NOTE(review): the lines that update and return {@code replaced} are not visible here - confirm.
 * NOTE(review): the lookup uses {@code element.getKey()} while the command below uses
 * {@code getObjectKey()}; verify that non-Serializable keys are handled as intended.
 *
 * @param old        the element expected to be present
 * @param element    the replacement element
 * @param comparator decides whether the present element matches {@code old}
 */
public boolean replace(Element old, Element element, ElementValueComparator comparator)
536
throws NullPointerException, IllegalArgumentException {
537
LOG.debug("cache {} replace2 {}", cache.getName(), element);
538
XATransactionContext context = getOrCreateTransactionContext();
539
Element previous = getCurrentElement(element.getKey(), context);
541
boolean replaced = false;
542
if (previous != null && comparator.equals(previous, copyElementForWrite(old))) {
543
Element oldElement = getQuietFromUnderlyingStore(element.getObjectKey());
544
Element elementForWrite = copyElementForWrite(element);
545
// the put command carries both the element found in the underlying store and the replacement
context.addCommand(new StorePutCommand(oldElement, elementForWrite), elementForWrite);
554
public Element replace(Element element) throws NullPointerException {
555
LOG.debug("cache {} replace1 {}", cache.getName(), element);
556
XATransactionContext context = getOrCreateTransactionContext();
557
Element previous = getCurrentElement(element.getKey(), context);
559
if (previous != null) {
560
Element oldElement = getQuietFromUnderlyingStore(element.getObjectKey());
561
Element elementForWrite = copyElementForWrite(element);
562
context.addCommand(new StorePutCommand(oldElement, elementForWrite), elementForWrite);
564
return copyElementForRead(previous);
571
public void setAttributeExtractors(Map<String, AttributeExtractor> extractors) {
572
Map<String, AttributeExtractor> wrappedExtractors = new HashMap(extractors.size());
573
for (Entry<String, AttributeExtractor> e : extractors.entrySet()) {
574
wrappedExtractors.put(e.getKey(), new TransactionAwareAttributeExtractor(copyStrategy, e.getValue()));
576
underlyingStore.setAttributeExtractors(wrappedExtractors);