~ubuntu-branches/ubuntu/quantal/ehcache/quantal

« back to all changes in this revision

Viewing changes to src/main/java/net/sf/ehcache/transaction/xa/XATransactionStore.java

  • Committer: Package Import Robot
  • Author(s): Miguel Landaeta
  • Date: 2012-03-01 19:15:46 UTC
  • mfrom: (1.1.6) (2.1.7 sid)
  • Revision ID: package-import@ubuntu.com-20120301191546-rvlk8tomip0c2m9p
Tags: 2.5.0-1
* Team upload.
* New upstream release. (Closes: #661450).
* Bump Standards-Version to 3.9.3. No changes were required.
* Fix lintian warning with copyright file.
* Remove unnecessary dependencies on JRE for libehcache-java.
* Wrap list of dependencies and uploaders.
* Fix clean target by adding a mh_clean call, to allow twice in a row builds.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/**
 
2
 *  Copyright 2003-2010 Terracotta, Inc.
 
3
 *
 
4
 *  Licensed under the Apache License, Version 2.0 (the "License");
 
5
 *  you may not use this file except in compliance with the License.
 
6
 *  You may obtain a copy of the License at
 
7
 *
 
8
 *      http://www.apache.org/licenses/LICENSE-2.0
 
9
 *
 
10
 *  Unless required by applicable law or agreed to in writing, software
 
11
 *  distributed under the License is distributed on an "AS IS" BASIS,
 
12
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
13
 *  See the License for the specific language governing permissions and
 
14
 *  limitations under the License.
 
15
 */
 
16
package net.sf.ehcache.transaction.xa;
 
17
 
 
18
import java.util.HashMap;
 
19
import java.util.Iterator;
 
20
import java.util.List;
 
21
import java.util.Map;
 
22
import java.util.Map.Entry;
 
23
import java.util.Set;
 
24
import java.util.concurrent.ConcurrentHashMap;
 
25
 
 
26
import javax.transaction.RollbackException;
 
27
import javax.transaction.SystemException;
 
28
import javax.transaction.Transaction;
 
29
import javax.transaction.xa.XAException;
 
30
 
 
31
import net.sf.ehcache.CacheEntry;
 
32
import net.sf.ehcache.CacheException;
 
33
import net.sf.ehcache.Ehcache;
 
34
import net.sf.ehcache.Element;
 
35
import net.sf.ehcache.search.attribute.AttributeExtractor;
 
36
import net.sf.ehcache.store.ElementValueComparator;
 
37
import net.sf.ehcache.store.Store;
 
38
import net.sf.ehcache.store.compound.ReadWriteCopyStrategy;
 
39
import net.sf.ehcache.transaction.AbstractTransactionStore;
 
40
import net.sf.ehcache.transaction.SoftLock;
 
41
import net.sf.ehcache.transaction.SoftLockFactory;
 
42
import net.sf.ehcache.transaction.TransactionAwareAttributeExtractor;
 
43
import net.sf.ehcache.transaction.TransactionException;
 
44
import net.sf.ehcache.transaction.TransactionIDFactory;
 
45
import net.sf.ehcache.transaction.TransactionInterruptedException;
 
46
import net.sf.ehcache.transaction.TransactionTimeoutException;
 
47
import net.sf.ehcache.transaction.manager.TransactionManagerLookup;
 
48
import net.sf.ehcache.transaction.xa.commands.StorePutCommand;
 
49
import net.sf.ehcache.transaction.xa.commands.StoreRemoveCommand;
 
50
import net.sf.ehcache.util.LargeSet;
 
51
import net.sf.ehcache.util.SetWrapperList;
 
52
import net.sf.ehcache.writer.CacheWriterManager;
 
53
 
 
54
import org.slf4j.Logger;
 
55
import org.slf4j.LoggerFactory;
 
56
 
 
57
/**
 
58
 * @author Ludovic Orban
 
59
 */
 
60
public class XATransactionStore extends AbstractTransactionStore {
 
61
 
 
62
    private static final Logger LOG = LoggerFactory.getLogger(XATransactionStore.class.getName());
 
63
    private static final long MILLISECOND_PER_SECOND = 1000L;
 
64
 
 
65
    private final TransactionManagerLookup transactionManagerLookup;
 
66
    private final TransactionIDFactory transactionIdFactory;
 
67
    private final SoftLockFactory softLockFactory;
 
68
    private final Ehcache cache;
 
69
 
 
70
    private final ConcurrentHashMap<Transaction, EhcacheXAResource> transactionToXAResourceMap =
 
71
            new ConcurrentHashMap<Transaction, EhcacheXAResource>();
 
72
    private final ConcurrentHashMap<Transaction, Long> transactionToTimeoutMap = new ConcurrentHashMap<Transaction, Long>();
 
73
 
 
74
    /**
 
75
     * Constructor
 
76
     * @param transactionManagerLookup the transaction manager lookup implementation
 
77
     * @param softLockFactory the soft lock factory
 
78
     * @param transactionIdFactory the transaction ID factory
 
79
     * @param cache the cache
 
80
     * @param store the underlying store
 
81
     * @param copyStrategy the original copy strategy
 
82
     */
 
83
    public XATransactionStore(TransactionManagerLookup transactionManagerLookup, SoftLockFactory softLockFactory,
 
84
                              TransactionIDFactory transactionIdFactory, Ehcache cache, Store store,
 
85
                              ReadWriteCopyStrategy<Element> copyStrategy) {
 
86
        super(store, copyStrategy);
 
87
        this.transactionManagerLookup = transactionManagerLookup;
 
88
        this.transactionIdFactory = transactionIdFactory;
 
89
        if (transactionManagerLookup.getTransactionManager() == null) {
 
90
            throw new TransactionException("no JTA transaction manager could be located, cannot bind twopc cache with JTA");
 
91
        }
 
92
        this.softLockFactory = softLockFactory;
 
93
        this.cache = cache;
 
94
    }
 
95
 
 
96
    private Transaction getCurrentTransaction() throws SystemException {
 
97
        Transaction transaction = transactionManagerLookup.getTransactionManager().getTransaction();
 
98
        if (transaction == null) {
 
99
            throw new TransactionException("JTA transaction not started");
 
100
        }
 
101
        return transaction;
 
102
    }
 
103
 
 
104
    /**
 
105
     * Get or create the XAResource of this XA store
 
106
     * @return the EhcacheXAResource of this store
 
107
     * @throws SystemException when something goes wrong with the transaction manager
 
108
     */
 
109
    public EhcacheXAResourceImpl getOrCreateXAResource() throws SystemException {
 
110
        Transaction transaction = getCurrentTransaction();
 
111
        EhcacheXAResourceImpl xaResource = (EhcacheXAResourceImpl) transactionToXAResourceMap.get(transaction);
 
112
        if (xaResource == null) {
 
113
            LOG.debug("creating new XAResource");
 
114
            xaResource = new EhcacheXAResourceImpl(cache, underlyingStore, transactionManagerLookup,
 
115
                    softLockFactory, transactionIdFactory, copyStrategy);
 
116
            transactionToXAResourceMap.put(transaction, xaResource);
 
117
            xaResource.addTwoPcExecutionListener(new CleanupXAResource(getCurrentTransaction()));
 
118
        }
 
119
        return xaResource;
 
120
    }
 
121
 
 
122
    private XATransactionContext getTransactionContext() {
 
123
        try {
 
124
            Transaction transaction = getCurrentTransaction();
 
125
            EhcacheXAResourceImpl xaResource = (EhcacheXAResourceImpl) transactionToXAResourceMap.get(transaction);
 
126
            if (xaResource == null) {
 
127
                return null;
 
128
            }
 
129
            XATransactionContext transactionContext = xaResource.getCurrentTransactionContext();
 
130
 
 
131
            if (transactionContext == null) {
 
132
                transactionManagerLookup.register(xaResource);
 
133
                LOG.debug("creating new XA context");
 
134
                transactionContext = xaResource.createTransactionContext();
 
135
                xaResource.addTwoPcExecutionListener(new UnregisterXAResource());
 
136
            } else {
 
137
                transactionContext = xaResource.getCurrentTransactionContext();
 
138
            }
 
139
 
 
140
            LOG.debug("using XA context {}", transactionContext);
 
141
            return transactionContext;
 
142
        } catch (SystemException e) {
 
143
            throw new TransactionException("cannot get the current transaction", e);
 
144
        } catch (RollbackException e) {
 
145
            throw new TransactionException("transaction rolled back", e);
 
146
        }
 
147
    }
 
148
 
 
149
    private XATransactionContext getOrCreateTransactionContext() {
 
150
        try {
 
151
            EhcacheXAResourceImpl xaResource = getOrCreateXAResource();
 
152
            XATransactionContext transactionContext = xaResource.getCurrentTransactionContext();
 
153
 
 
154
            if (transactionContext == null) {
 
155
                transactionManagerLookup.register(xaResource);
 
156
                LOG.debug("creating new XA context");
 
157
                transactionContext = xaResource.createTransactionContext();
 
158
                xaResource.addTwoPcExecutionListener(new UnregisterXAResource());
 
159
            } else {
 
160
                transactionContext = xaResource.getCurrentTransactionContext();
 
161
            }
 
162
 
 
163
            LOG.debug("using XA context {}", transactionContext);
 
164
            return transactionContext;
 
165
        } catch (SystemException e) {
 
166
            throw new TransactionException("cannot get the current transaction", e);
 
167
        } catch (RollbackException e) {
 
168
            throw new TransactionException("transaction rolled back", e);
 
169
        }
 
170
    }
 
171
 
 
172
    /**
 
173
     * This class is used to clean up the transactionToXAResourceMap after a transaction
 
174
     * committed or rolled back.
 
175
     */
 
176
    private final class CleanupXAResource implements XAExecutionListener {
 
177
        private final Transaction transaction;
 
178
 
 
179
        private CleanupXAResource(Transaction transaction) {
 
180
            this.transaction = transaction;
 
181
        }
 
182
 
 
183
        public void beforePrepare(EhcacheXAResource xaResource) {
 
184
        }
 
185
 
 
186
        public void afterCommitOrRollback(EhcacheXAResource xaResource) {
 
187
            transactionToXAResourceMap.remove(transaction);
 
188
            transactionToTimeoutMap.remove(transaction);
 
189
        }
 
190
    }
 
191
 
 
192
    /**
 
193
     * This class is used to unregister the XAResource after a transaction
 
194
     * committed or rolled back.
 
195
     */
 
196
    private final class UnregisterXAResource implements XAExecutionListener {
 
197
 
 
198
        public void beforePrepare(EhcacheXAResource xaResource) {
 
199
        }
 
200
 
 
201
        public void afterCommitOrRollback(EhcacheXAResource xaResource) {
 
202
            transactionManagerLookup.unregister(xaResource);
 
203
        }
 
204
    }
 
205
 
 
206
 
 
207
    /**
 
208
     * @return milliseconds left before timeout
 
209
     */
 
210
    private long assertNotTimedOut() {
 
211
        try {
 
212
            if (Thread.interrupted()) {
 
213
                throw new TransactionInterruptedException("transaction interrupted");
 
214
            }
 
215
 
 
216
            Transaction transaction = getCurrentTransaction();
 
217
 
 
218
            EhcacheXAResource xaResource = transactionToXAResourceMap.get(transaction);
 
219
            Long timeoutTimestamp = transactionToTimeoutMap.get(transaction);
 
220
            if (xaResource != null && timeoutTimestamp == null) {
 
221
                int xaResourceTimeout = xaResource.getTransactionTimeout();
 
222
 
 
223
                timeoutTimestamp = System.currentTimeMillis() + (xaResourceTimeout * MILLISECOND_PER_SECOND);
 
224
                transactionToTimeoutMap.put(transaction, timeoutTimestamp);
 
225
            } else if (timeoutTimestamp == null) {
 
226
                int defaultTransactionTimeout = cache.getCacheManager().getTransactionController().getDefaultTransactionTimeout();
 
227
                timeoutTimestamp = System.currentTimeMillis() + (defaultTransactionTimeout * MILLISECOND_PER_SECOND);
 
228
                transactionToTimeoutMap.put(transaction, timeoutTimestamp);
 
229
            }
 
230
 
 
231
            if (timeoutTimestamp <= System.currentTimeMillis()) {
 
232
                throw new TransactionTimeoutException("transaction timed out");
 
233
            }
 
234
 
 
235
            return timeoutTimestamp - System.currentTimeMillis();
 
236
        } catch (SystemException e) {
 
237
            throw new TransactionException("cannot get the current transaction", e);
 
238
        } catch (XAException e) {
 
239
            throw new TransactionException("cannot get the XAResource transaction timeout", e);
 
240
        }
 
241
    }
 
242
 
 
243
    /* transactional methods */
 
244
 
 
245
    /**
 
246
     * {@inheritDoc}
 
247
     */
 
248
    public Element get(Object key) {
 
249
        LOG.debug("cache {} get {}", cache.getName(), key);
 
250
        XATransactionContext context = getTransactionContext();
 
251
        Element element;
 
252
        if (context == null) {
 
253
            element = getFromUnderlyingStore(key);
 
254
        } else {
 
255
            element = context.get(key);
 
256
            if (element == null && !context.isRemoved(key)) {
 
257
                element = getFromUnderlyingStore(key);
 
258
            }
 
259
        }
 
260
        return copyElementForRead(element);
 
261
    }
 
262
 
 
263
 
 
264
    /**
 
265
     * {@inheritDoc}
 
266
     */
 
267
    public Element getQuiet(Object key) {
 
268
        LOG.debug("cache {} getQuiet {}", cache.getName(), key);
 
269
        XATransactionContext context = getTransactionContext();
 
270
        Element element;
 
271
        if (context == null) {
 
272
            element = getQuietFromUnderlyingStore(key);
 
273
        } else {
 
274
            element = context.get(key);
 
275
            if (element == null && !context.isRemoved(key)) {
 
276
                element = getQuietFromUnderlyingStore(key);
 
277
            }
 
278
        }
 
279
        return copyElementForRead(element);
 
280
    }
 
281
 
 
282
    /**
 
283
     * {@inheritDoc}
 
284
     */
 
285
    public int getSize() {
 
286
        LOG.debug("cache {} getSize", cache.getName());
 
287
        XATransactionContext context = getOrCreateTransactionContext();
 
288
        int size = underlyingStore.getSize();
 
289
        return size + context.getSizeModifier();
 
290
    }
 
291
 
 
292
    /**
 
293
     * {@inheritDoc}
 
294
     */
 
295
    public int getTerracottaClusteredSize() {
 
296
        try {
 
297
            Transaction transaction = transactionManagerLookup.getTransactionManager().getTransaction();
 
298
            if (transaction == null) {
 
299
                return underlyingStore.getTerracottaClusteredSize();
 
300
            }
 
301
        } catch (SystemException se) {
 
302
            throw new TransactionException("cannot get the current transaction", se);
 
303
        }
 
304
 
 
305
        LOG.debug("cache {} getTerracottaClusteredSize", cache.getName());
 
306
        XATransactionContext context = getOrCreateTransactionContext();
 
307
        int size = underlyingStore.getTerracottaClusteredSize();
 
308
        return size + context.getSizeModifier();
 
309
    }
 
310
 
 
311
    /**
 
312
     * {@inheritDoc}
 
313
     */
 
314
    public boolean containsKey(Object key) {
 
315
        LOG.debug("cache {} containsKey", cache.getName(), key);
 
316
        XATransactionContext context = getOrCreateTransactionContext();
 
317
        return !context.isRemoved(key) && (context.getAddedKeys().contains(key) || underlyingStore.containsKey(key));
 
318
    }
 
319
 
 
320
    /**
 
321
     * {@inheritDoc}
 
322
     */
 
323
    public List getKeys() {
 
324
        LOG.debug("cache {} getKeys", cache.getName());
 
325
        XATransactionContext context = getOrCreateTransactionContext();
 
326
        Set<Object> keys = new LargeSet<Object>() {
 
327
 
 
328
            @Override
 
329
            public int sourceSize() {
 
330
                return underlyingStore.getSize();
 
331
            }
 
332
 
 
333
            @Override
 
334
            public Iterator<Object> sourceIterator() {
 
335
                return underlyingStore.getKeys().iterator();
 
336
            }
 
337
        };
 
338
        keys.addAll(context.getAddedKeys());
 
339
        keys.removeAll(context.getRemovedKeys());
 
340
        return new SetWrapperList(keys);
 
341
    }
 
342
 
 
343
 
 
344
    private Element getFromUnderlyingStore(final Object key) {
 
345
        while (true) {
 
346
            long timeLeft = assertNotTimedOut();
 
347
            LOG.debug("cache {} underlying.get key {} not timed out, time left: " + timeLeft, cache.getName(), key);
 
348
 
 
349
            Element element = underlyingStore.get(key);
 
350
            if (element == null) {
 
351
                return null;
 
352
            }
 
353
            Object value = element.getObjectValue();
 
354
            if (value instanceof SoftLock) {
 
355
                SoftLock softLock = (SoftLock) value;
 
356
                try {
 
357
                    LOG.debug("cache {} key {} soft locked, awaiting unlock...", cache.getName(), key);
 
358
                    boolean gotLock = softLock.tryLock(timeLeft);
 
359
                    if (gotLock) {
 
360
                        softLock.clearTryLock();
 
361
                    }
 
362
                } catch (InterruptedException e) {
 
363
                    Thread.currentThread().interrupt();
 
364
                }
 
365
            } else {
 
366
                return element;
 
367
            }
 
368
        }
 
369
    }
 
370
 
 
371
    private Element getQuietFromUnderlyingStore(final Object key) {
 
372
        while (true) {
 
373
            long timeLeft = assertNotTimedOut();
 
374
            LOG.debug("cache {} underlying.getQuiet key {} not timed out, time left: " + timeLeft, cache.getName(), key);
 
375
 
 
376
            Element element = underlyingStore.getQuiet(key);
 
377
            if (element == null) {
 
378
                return null;
 
379
            }
 
380
            Object value = element.getObjectValue();
 
381
            if (value instanceof SoftLock) {
 
382
                SoftLock softLock = (SoftLock) value;
 
383
                try {
 
384
                    LOG.debug("cache {} key {} soft locked, awaiting unlock...", cache.getName(), key);
 
385
                    boolean gotLock = softLock.tryLock(timeLeft);
 
386
                    if (gotLock) {
 
387
                        softLock.clearTryLock();
 
388
                    }
 
389
                } catch (InterruptedException e) {
 
390
                    Thread.currentThread().interrupt();
 
391
                }
 
392
            } else {
 
393
                return element;
 
394
            }
 
395
        }
 
396
    }
 
397
 
 
398
    private Element getCurrentElement(final Object key, final XATransactionContext context) {
 
399
        Element previous = context.get(key);
 
400
        if (previous == null && !context.isRemoved(key)) {
 
401
            previous = getQuietFromUnderlyingStore(key);
 
402
        }
 
403
        return previous;
 
404
    }
 
405
 
 
406
    /**
 
407
     * {@inheritDoc}
 
408
     */
 
409
    public boolean put(Element element) throws CacheException {
 
410
        LOG.debug("cache {} put {}", cache.getName(), element);
 
411
        // this forces enlistment so the XA transaction timeout can be propagated to the XA resource
 
412
        getOrCreateTransactionContext();
 
413
 
 
414
        Element oldElement = getQuietFromUnderlyingStore(element.getObjectKey());
 
415
        return internalPut(new StorePutCommand(oldElement, copyElementForWrite(element)));
 
416
    }
 
417
 
 
418
    /**
 
419
     * {@inheritDoc}
 
420
     */
 
421
    public boolean putWithWriter(Element element, CacheWriterManager writerManager) throws CacheException {
 
422
        LOG.debug("cache {} putWithWriter {}", cache.getName(), element);
 
423
        // this forces enlistment so the XA transaction timeout can be propagated to the XA resource
 
424
        getOrCreateTransactionContext();
 
425
 
 
426
        Element oldElement = getQuietFromUnderlyingStore(element.getObjectKey());
 
427
        if (writerManager != null) {
 
428
            writerManager.put(element);
 
429
        } else {
 
430
            cache.getWriterManager().put(element);
 
431
        }
 
432
        return internalPut(new StorePutCommand(oldElement, copyElementForWrite(element)));
 
433
    }
 
434
 
 
435
    private boolean internalPut(final StorePutCommand putCommand) {
 
436
        final Element element = putCommand.getElement();
 
437
        boolean isNull;
 
438
        if (element == null) {
 
439
            return true;
 
440
        }
 
441
        XATransactionContext context = getOrCreateTransactionContext();
 
442
        // In case this key is currently being updated...
 
443
        isNull = underlyingStore.get(element.getKey()) == null;
 
444
        if (isNull) {
 
445
            isNull = context.get(element.getKey()) == null;
 
446
        }
 
447
        context.addCommand(putCommand, element);
 
448
        return isNull;
 
449
    }
 
450
 
 
451
 
 
452
    /**
 
453
     * {@inheritDoc}
 
454
     */
 
455
    public Element remove(Object key) {
 
456
        LOG.debug("cache {} remove {}", cache.getName(), key);
 
457
        // this forces enlistment so the XA transaction timeout can be propagated to the XA resource
 
458
        getOrCreateTransactionContext();
 
459
 
 
460
        Element oldElement = getQuietFromUnderlyingStore(key);
 
461
        return removeInternal(new StoreRemoveCommand(key, oldElement));
 
462
    }
 
463
 
 
464
    private Element removeInternal(final StoreRemoveCommand command) {
 
465
        Element element = command.getEntry().getElement();
 
466
        getOrCreateTransactionContext().addCommand(command, element);
 
467
        return copyElementForRead(element);
 
468
    }
 
469
 
 
470
    /**
 
471
     * {@inheritDoc}
 
472
     */
 
473
    public Element removeWithWriter(Object key, CacheWriterManager writerManager) throws CacheException {
 
474
        LOG.debug("cache {} removeWithWriter {}", cache.getName(), key);
 
475
        // this forces enlistment so the XA transaction timeout can be propagated to the XA resource
 
476
        getOrCreateTransactionContext();
 
477
 
 
478
        Element oldElement = getQuietFromUnderlyingStore(key);
 
479
        if (writerManager != null) {
 
480
            writerManager.remove(new CacheEntry(key, null));
 
481
        } else {
 
482
            cache.getWriterManager().remove(new CacheEntry(key, null));
 
483
        }
 
484
        return removeInternal(new StoreRemoveCommand(key, oldElement));
 
485
    }
 
486
 
 
487
    /**
 
488
     * {@inheritDoc}
 
489
     */
 
490
    public void removeAll() throws CacheException {
 
491
        LOG.debug("cache {} removeAll", cache.getName());
 
492
        List keys = getKeys();
 
493
        for (Object key : keys) {
 
494
            remove(key);
 
495
        }
 
496
    }
 
497
 
 
498
    /**
 
499
     * {@inheritDoc}
 
500
     */
 
501
    public Element putIfAbsent(Element element) throws NullPointerException {
 
502
        LOG.debug("cache {} putIfAbsent {}", cache.getName(), element);
 
503
        XATransactionContext context = getOrCreateTransactionContext();
 
504
        Element previous = getCurrentElement(element.getObjectKey(), context);
 
505
 
 
506
        if (previous == null) {
 
507
            Element oldElement = getQuietFromUnderlyingStore(element.getObjectKey());
 
508
            Element elementForWrite = copyElementForWrite(element);
 
509
            context.addCommand(new StorePutCommand(oldElement, elementForWrite), elementForWrite);
 
510
        }
 
511
 
 
512
        return copyElementForRead(previous);
 
513
    }
 
514
 
 
515
    /**
 
516
     * {@inheritDoc}
 
517
     */
 
518
    public Element removeElement(Element element, ElementValueComparator comparator) throws NullPointerException {
 
519
        LOG.debug("cache {} removeElement {}", cache.getName(), element);
 
520
        XATransactionContext context = getOrCreateTransactionContext();
 
521
        Element previous = getCurrentElement(element.getKey(), context);
 
522
 
 
523
        Element elementForWrite = copyElementForWrite(element);
 
524
        if (previous != null && comparator.equals(previous, elementForWrite)) {
 
525
            Element oldElement = getQuietFromUnderlyingStore(element.getObjectKey());
 
526
            context.addCommand(new StoreRemoveCommand(element.getObjectKey(), oldElement), elementForWrite);
 
527
            return copyElementForRead(previous);
 
528
        }
 
529
        return null;
 
530
    }
 
531
 
 
532
    /**
 
533
     * {@inheritDoc}
 
534
     */
 
535
    public boolean replace(Element old, Element element, ElementValueComparator comparator)
 
536
            throws NullPointerException, IllegalArgumentException {
 
537
        LOG.debug("cache {} replace2 {}", cache.getName(), element);
 
538
        XATransactionContext context = getOrCreateTransactionContext();
 
539
        Element previous = getCurrentElement(element.getKey(), context);
 
540
 
 
541
        boolean replaced = false;
 
542
        if (previous != null && comparator.equals(previous, copyElementForWrite(old))) {
 
543
            Element oldElement = getQuietFromUnderlyingStore(element.getObjectKey());
 
544
            Element elementForWrite = copyElementForWrite(element);
 
545
            context.addCommand(new StorePutCommand(oldElement, elementForWrite), elementForWrite);
 
546
            replaced = true;
 
547
        }
 
548
        return replaced;
 
549
    }
 
550
 
 
551
    /**
 
552
     * {@inheritDoc}
 
553
     */
 
554
    public Element replace(Element element) throws NullPointerException {
 
555
        LOG.debug("cache {} replace1 {}", cache.getName(), element);
 
556
        XATransactionContext context = getOrCreateTransactionContext();
 
557
        Element previous = getCurrentElement(element.getKey(), context);
 
558
 
 
559
        if (previous != null) {
 
560
            Element oldElement = getQuietFromUnderlyingStore(element.getObjectKey());
 
561
            Element elementForWrite = copyElementForWrite(element);
 
562
            context.addCommand(new StorePutCommand(oldElement, elementForWrite), elementForWrite);
 
563
        }
 
564
        return copyElementForRead(previous);
 
565
    }
 
566
 
 
567
    /**
 
568
     * {@inheritDoc}
 
569
     */
 
570
    @Override
 
571
    public void setAttributeExtractors(Map<String, AttributeExtractor> extractors) {
 
572
        Map<String, AttributeExtractor> wrappedExtractors = new HashMap(extractors.size());
 
573
        for (Entry<String, AttributeExtractor> e : extractors.entrySet()) {
 
574
            wrappedExtractors.put(e.getKey(), new TransactionAwareAttributeExtractor(copyStrategy, e.getValue()));
 
575
        }
 
576
        underlyingStore.setAttributeExtractors(wrappedExtractors);
 
577
    }
 
578
 
 
579
}