1
1
/*******************************************************************************
2
*Copyright (c) 2009 Eucalyptus Systems, Inc.
4
* This program is free software: you can redistribute it and/or modify
5
* it under the terms of the GNU General Public License as published by
6
* the Free Software Foundation, only version 3 of the License.
9
* This file is distributed in the hope that it will be useful, but WITHOUT
10
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14
* You should have received a copy of the GNU General Public License along
15
* with this program. If not, see <http://www.gnu.org/licenses/>.
17
* Please contact Eucalyptus Systems, Inc., 130 Castilian
18
* Dr., Goleta, CA 93101 USA or visit <http://www.eucalyptus.com/licenses/>
19
* if you need additional information or have any questions.
21
* This file may incorporate work covered under the following copyright and
24
* Software License Agreement (BSD License)
26
* Copyright (c) 2008, Regents of the University of California
27
* All rights reserved.
29
* Redistribution and use of this software in source and binary forms, with
30
* or without modification, are permitted provided that the following
33
* Redistributions of source code must retain the above copyright notice,
34
* this list of conditions and the following disclaimer.
36
* Redistributions in binary form must reproduce the above copyright
37
* notice, this list of conditions and the following disclaimer in the
38
* documentation and/or other materials provided with the distribution.
40
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
41
* IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
42
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
43
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
44
* OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
45
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
46
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
47
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
48
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
49
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
50
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. USERS OF
51
* THIS SOFTWARE ACKNOWLEDGE THE POSSIBLE PRESENCE OF OTHER OPEN SOURCE
52
* LICENSED MATERIAL, COPYRIGHTED MATERIAL OR PATENTED MATERIAL IN THIS
53
* SOFTWARE, AND IF ANY SUCH MATERIAL IS DISCOVERED THE PARTY DISCOVERING
54
* IT MAY INFORM DR. RICH WOLSKI AT THE UNIVERSITY OF CALIFORNIA, SANTA
55
* BARBARA WHO WILL THEN ASCERTAIN THE MOST APPROPRIATE REMEDY, WHICH IN
56
* THE REGENTS’ DISCRETION MAY INCLUDE, WITHOUT LIMITATION, REPLACEMENT
57
* OF THE CODE SO IDENTIFIED, LICENSING OF THE CODE SO IDENTIFIED, OR
58
* WITHDRAWAL OF THE CODE CAPABILITY TO THE EXTENT NEEDED TO COMPLY WITH
59
* ANY SUCH LICENSES OR RIGHTS.
60
*******************************************************************************/
2
*Copyright (c) 2009 Eucalyptus Systems, Inc.
4
* This program is free software: you can redistribute it and/or modify
5
* it under the terms of the GNU General Public License as published by
6
* the Free Software Foundation, only version 3 of the License.
9
* This file is distributed in the hope that it will be useful, but WITHOUT
10
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14
* You should have received a copy of the GNU General Public License along
15
* with this program. If not, see <http://www.gnu.org/licenses/>.
17
* Please contact Eucalyptus Systems, Inc., 130 Castilian
18
* Dr., Goleta, CA 93101 USA or visit <http://www.eucalyptus.com/licenses/>
19
* if you need additional information or have any questions.
21
* This file may incorporate work covered under the following copyright and
24
* Software License Agreement (BSD License)
26
* Copyright (c) 2008, Regents of the University of California
27
* All rights reserved.
29
* Redistribution and use of this software in source and binary forms, with
30
* or without modification, are permitted provided that the following
33
* Redistributions of source code must retain the above copyright notice,
34
* this list of conditions and the following disclaimer.
36
* Redistributions in binary form must reproduce the above copyright
37
* notice, this list of conditions and the following disclaimer in the
38
* documentation and/or other materials provided with the distribution.
40
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
41
* IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
42
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
43
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
44
* OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
45
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
46
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
47
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
48
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
49
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
50
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. USERS OF
51
* THIS SOFTWARE ACKNOWLEDGE THE POSSIBLE PRESENCE OF OTHER OPEN SOURCE
52
* LICENSED MATERIAL, COPYRIGHTED MATERIAL OR PATENTED MATERIAL IN THIS
53
* SOFTWARE, AND IF ANY SUCH MATERIAL IS DISCOVERED THE PARTY DISCOVERING
54
* IT MAY INFORM DR. RICH WOLSKI AT THE UNIVERSITY OF CALIFORNIA, SANTA
55
* BARBARA WHO WILL THEN ASCERTAIN THE MOST APPROPRIATE REMEDY, WHICH IN
56
* THE REGENTS’ DISCRETION MAY INCLUDE, WITHOUT LIMITATION, REPLACEMENT
57
* OF THE CODE SO IDENTIFIED, LICENSING OF THE CODE SO IDENTIFIED, OR
58
* WITHDRAWAL OF THE CODE CAPABILITY TO THE EXTENT NEEDED TO COMPLY WITH
59
* ANY SUCH LICENSES OR RIGHTS.
60
*******************************************************************************/
63
63
* Author: Sunil Soman sunils@cs.ucsb.edu
72
72
// Currently, the queues are a LinkedBlockingQueue and producers/consumers do not timeout.
74
74
public class WalrusDataMessenger {
75
private static Logger LOG = Logger.getLogger( WalrusDataMessenger.class );
76
private static final int DATA_QUEUE_SIZE = 3;
78
private ConcurrentHashMap<String, ConcurrentHashMap<String,LinkedBlockingQueue<WalrusDataMessage>>> queueMap;
79
private ConcurrentHashMap<String, WalrusMonitor> monitorMap;
80
private ConcurrentHashMap<String, WalrusSemaphore> semaphoreMap;
82
public WalrusDataMessenger() {
83
queueMap = new ConcurrentHashMap<String, ConcurrentHashMap<String,LinkedBlockingQueue<WalrusDataMessage>>>();
84
monitorMap = new ConcurrentHashMap<String, WalrusMonitor>();
85
semaphoreMap = new ConcurrentHashMap<String, WalrusSemaphore>();
88
public LinkedBlockingQueue<WalrusDataMessage> getQueue(String key1, String key2) {
89
ConcurrentHashMap<String,LinkedBlockingQueue<WalrusDataMessage>> queues = queueMap.putIfAbsent(key1, new ConcurrentHashMap<String, LinkedBlockingQueue<WalrusDataMessage>>());
91
queues = queueMap.get(key1);
93
LinkedBlockingQueue<WalrusDataMessage> queue = queues.putIfAbsent(key2, new LinkedBlockingQueue<WalrusDataMessage>(DATA_QUEUE_SIZE));
95
queue = queues.get(key2);
100
public LinkedBlockingQueue<WalrusDataMessage> interruptAllAndGetQueue(String key1, String key2) {
101
ConcurrentHashMap<String,LinkedBlockingQueue<WalrusDataMessage>> queues = queueMap.get(key1);
103
for (LinkedBlockingQueue<WalrusDataMessage> queue: queues.values()) {
105
queue.put(WalrusDataMessage.InterruptTransaction());
106
} catch(InterruptedException ex) {
113
return getQueue(key1, key2);
116
public void removeQueue(String key1, String key2) {
117
if(queueMap.containsKey(key1)) {
118
ConcurrentHashMap<String, LinkedBlockingQueue<WalrusDataMessage>> queues = queueMap.get(key1);
119
if(queues.containsKey(key2)) {
125
public WalrusMonitor getMonitor(String key) {
126
WalrusMonitor monitor = monitorMap.putIfAbsent(key, new WalrusMonitor());
127
if (monitor == null) {
128
monitor = monitorMap.get(key);
133
public void removeMonitor(String key) {
134
if(monitorMap.containsKey(key)) {
135
monitorMap.remove(key);
139
public WalrusSemaphore getSemaphore(String key) {
140
WalrusSemaphore semaphore = semaphoreMap.putIfAbsent(key, new WalrusSemaphore());
141
if (semaphore == null) {
142
semaphore = semaphoreMap.get(key);
147
public void removeSemaphore(String key) {
148
if(semaphoreMap.containsKey(key)) {
149
semaphoreMap.remove(key);
75
private static Logger LOG = Logger.getLogger( WalrusDataMessenger.class );
76
private static final int DATA_QUEUE_SIZE = 3;
78
private ConcurrentHashMap<String, ConcurrentHashMap<String,LinkedBlockingQueue<WalrusDataMessage>>> queueMap;
79
private ConcurrentHashMap<String, WalrusMonitor> monitorMap;
81
public WalrusDataMessenger() {
82
queueMap = new ConcurrentHashMap<String, ConcurrentHashMap<String,LinkedBlockingQueue<WalrusDataMessage>>>();
83
monitorMap = new ConcurrentHashMap<String, WalrusMonitor>();
86
public LinkedBlockingQueue<WalrusDataMessage> getQueue(String key1, String key2) {
87
ConcurrentHashMap<String,LinkedBlockingQueue<WalrusDataMessage>> queues = queueMap.putIfAbsent(key1, new ConcurrentHashMap<String, LinkedBlockingQueue<WalrusDataMessage>>());
89
queues = queueMap.get(key1);
91
LinkedBlockingQueue<WalrusDataMessage> queue = queues.putIfAbsent(key2, new LinkedBlockingQueue<WalrusDataMessage>(DATA_QUEUE_SIZE));
93
queue = queues.get(key2);
98
public LinkedBlockingQueue<WalrusDataMessage> interruptAllAndGetQueue(String key1, String key2) {
99
ConcurrentHashMap<String,LinkedBlockingQueue<WalrusDataMessage>> queues = queueMap.get(key1);
101
for (LinkedBlockingQueue<WalrusDataMessage> queue: queues.values()) {
103
queue.put(WalrusDataMessage.InterruptTransaction());
104
} catch(InterruptedException ex) {
111
return getQueue(key1, key2);
114
public void removeQueue(String key1, String key2) {
115
if(queueMap.containsKey(key1)) {
116
ConcurrentHashMap<String, LinkedBlockingQueue<WalrusDataMessage>> queues = queueMap.get(key1);
117
if(queues.containsKey(key2)) {
120
queueMap.remove(key1);
124
public WalrusMonitor getMonitor(String key) {
125
WalrusMonitor monitor = monitorMap.putIfAbsent(key, new WalrusMonitor());
126
if (monitor == null) {
127
monitor = monitorMap.get(key);
132
public void removeMonitor(String key) {
133
if(monitorMap.containsKey(key)) {
134
monitorMap.remove(key);