16
16
package net.sf.ehcache.concurrent;
18
import java.util.ArrayList;
18
import java.util.Arrays;
19
import java.util.Collections;
19
20
import java.util.List;
21
import java.util.SortedMap;
22
import java.util.TreeMap;
23
import java.util.concurrent.TimeoutException;
24
import java.util.concurrent.atomic.AtomicInteger;
21
import java.util.concurrent.locks.ReadWriteLock;
26
23
import net.sf.ehcache.CacheException;
66
62
/**
 * Creates the striped lock set, allocating one {@link ReadWriteLockSync} per stripe.
 *
 * @param numberOfStripes - must be a power of two; odd, zero and negative values are rejected
 * @throws CacheException if {@code numberOfStripes} fails any of the validations below
 */
public StripedReadWriteLockSync(int numberOfStripes) {
    if (numberOfStripes % 2 != 0) {
        throw new CacheException("Cannot create a CacheLockProvider with an odd number of stripes");
    }
    // n & (n - 1) clears the lowest set bit; the result is zero only when at most one bit is set.
    if ((numberOfStripes & (numberOfStripes - 1)) != 0) {
        throw new CacheException("Cannot create a CacheLockProvider with a non power-of-two number of stripes");
    }
    // Zero has no bits set, so it slips through the bit test above and needs its own check.
    if (numberOfStripes == 0) {
        throw new CacheException("A zero size CacheLockProvider does not have useful semantics.");
    }
    // Integer.MIN_VALUE has exactly one bit set and is even, so it also passes the checks above;
    // reject it here instead of failing with NegativeArraySizeException at the allocation below.
    if (numberOfStripes < 0) {
        throw new CacheException("Cannot create a CacheLockProvider with a negative number of stripes");
    }

    this.numberOfStripes = numberOfStripes;
    mutexes = new ReadWriteLockSync[numberOfStripes];
    for (int i = 0; i < mutexes.length; i++) {
        mutexes[i] = new ReadWriteLockSync();
    }
    // Read-only list view over the stripe array.
    mutexesAsList = Collections.unmodifiableList(Arrays.asList(mutexes));
}
91
86
* @return one of a limited number of Sync's.
93
88
public ReadWriteLockSync getSyncForKey(final Object key) {
94
int lockNumber = ConcurrencyUtil.selectLock(key, numberOfStripes);
89
int lockNumber = ConcurrencyUtil.selectLock(key, mutexes.length);
95
90
return mutexes[lockNumber];
101
public Sync[] getAndWriteLockAllSyncForKeys(Object... keys) {
102
SortedMap<ReadWriteLockSync, AtomicInteger> locks = getLockMap(keys);
104
Sync[] syncs = new Sync[locks.size()];
106
for (Map.Entry<ReadWriteLockSync, AtomicInteger> entry : locks.entrySet()) {
107
while (entry.getValue().getAndDecrement() > 0) {
108
entry.getKey().lock(LockType.WRITE);
110
syncs[i++] = entry.getKey();
118
public Sync[] getAndWriteLockAllSyncForKeys(long timeout, Object... keys) throws TimeoutException {
119
SortedMap<ReadWriteLockSync, AtomicInteger> locks = getLockMap(keys);
122
List<ReadWriteLockSync> heldLocks = new ArrayList<ReadWriteLockSync>();
124
Sync[] syncs = new Sync[locks.size()];
126
for (Map.Entry<ReadWriteLockSync, AtomicInteger> entry : locks.entrySet()) {
127
while (entry.getValue().getAndDecrement() > 0) {
129
ReadWriteLockSync writeLockSync = entry.getKey();
130
lockHeld = writeLockSync.tryLock(LockType.WRITE, timeout);
132
heldLocks.add(writeLockSync);
134
} catch (InterruptedException e) {
139
for (int j = heldLocks.size() - 1; j >= 0; j--) {
140
ReadWriteLockSync readWriteLockSync = heldLocks.get(j);
141
readWriteLockSync.unlock(LockType.WRITE);
143
throw new TimeoutException("could not acquire all locks in " + timeout + " ms");
146
syncs[i++] = entry.getKey();
154
public void unlockWriteLockForAllKeys(Object... keys) {
155
SortedMap<ReadWriteLockSync, AtomicInteger> locks = getLockMap(keys);
157
for (Map.Entry<ReadWriteLockSync, AtomicInteger> entry : locks.entrySet()) {
158
while (entry.getValue().getAndDecrement() > 0) {
159
entry.getKey().unlock(LockType.WRITE);
164
private SortedMap<ReadWriteLockSync, AtomicInteger> getLockMap(final Object... keys) {
165
SortedMap<ReadWriteLockSync, AtomicInteger> locks = new TreeMap<ReadWriteLockSync, AtomicInteger>();
166
for (Object key : keys) {
167
ReadWriteLockSync syncForKey = getSyncForKey(key);
168
if (locks.containsKey(syncForKey)) {
169
locks.get(syncForKey).incrementAndGet();
171
locks.put(syncForKey, new AtomicInteger(1));
94
* Gets the RWL Stripe to use for a given key.
96
* This lookup must always return the same RWL for a given key.
99
* @return one of a limited number of RWLs.
101
public ReadWriteLock getLockForKey(final Object key) {
102
int lockNumber = ConcurrencyUtil.selectLock(key, mutexes.length);
103
return mutexes[lockNumber].getReadWriteLock();
107
* Returns all internal syncs
108
* @return all internal syncs
110
public List<ReadWriteLockSync> getAllSyncs() {
111
return mutexesAsList;