~tritone-team/tritone/eucalyptus

« back to all changes in this revision

Viewing changes to clc/modules/core/src/main/java/edu/ucsb/eucalyptus/util/WalrusDataMessenger.java

  • Committer: Bazaar Package Importer
  • Author(s): Dustin Kirkland
  • Date: 2009-12-01 21:09:28 UTC
  • mto: This revision was merged to the branch mainline in revision 75.
  • Revision ID: james.westby@ubuntu.com-20091201210928-o2dvg0ubljhb0ft6
Tags: upstream-1.6.1~bzr1083
Import upstream version 1.6.1~bzr1083

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
/*******************************************************************************
2
 
*Copyright (c) 2009  Eucalyptus Systems, Inc.
3
 
4
 
*  This program is free software: you can redistribute it and/or modify
5
 
*  it under the terms of the GNU General Public License as published by
6
 
*  the Free Software Foundation, only version 3 of the License.
7
 
8
 
9
 
*  This file is distributed in the hope that it will be useful, but WITHOUT
10
 
*  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11
 
*  FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12
 
*  for more details.
13
 
14
 
*  You should have received a copy of the GNU General Public License along
15
 
*  with this program.  If not, see <http://www.gnu.org/licenses/>.
16
 
17
 
*  Please contact Eucalyptus Systems, Inc., 130 Castilian
18
 
*  Dr., Goleta, CA 93101 USA or visit <http://www.eucalyptus.com/licenses/>
19
 
*  if you need additional information or have any questions.
20
 
21
 
*  This file may incorporate work covered under the following copyright and
22
 
*  permission notice:
23
 
24
 
*    Software License Agreement (BSD License)
25
 
26
 
*    Copyright (c) 2008, Regents of the University of California
27
 
*    All rights reserved.
28
 
29
 
*    Redistribution and use of this software in source and binary forms, with
30
 
*    or without modification, are permitted provided that the following
31
 
*    conditions are met:
32
 
33
 
*      Redistributions of source code must retain the above copyright notice,
34
 
*      this list of conditions and the following disclaimer.
35
 
36
 
*      Redistributions in binary form must reproduce the above copyright
37
 
*      notice, this list of conditions and the following disclaimer in the
38
 
*      documentation and/or other materials provided with the distribution.
39
 
40
 
*    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
41
 
*    IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
42
 
*    TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
43
 
*    PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
44
 
*    OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
45
 
*    EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
46
 
*    PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
47
 
*    PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
48
 
*    LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
49
 
*    NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
50
 
*    SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. USERS OF
51
 
*    THIS SOFTWARE ACKNOWLEDGE THE POSSIBLE PRESENCE OF OTHER OPEN SOURCE
52
 
*    LICENSED MATERIAL, COPYRIGHTED MATERIAL OR PATENTED MATERIAL IN THIS
53
 
*    SOFTWARE, AND IF ANY SUCH MATERIAL IS DISCOVERED THE PARTY DISCOVERING
54
 
*    IT MAY INFORM DR. RICH WOLSKI AT THE UNIVERSITY OF CALIFORNIA, SANTA
55
 
*    BARBARA WHO WILL THEN ASCERTAIN THE MOST APPROPRIATE REMEDY, WHICH IN
56
 
*    THE REGENTS’ DISCRETION MAY INCLUDE, WITHOUT LIMITATION, REPLACEMENT
57
 
*    OF THE CODE SO IDENTIFIED, LICENSING OF THE CODE SO IDENTIFIED, OR
58
 
*    WITHDRAWAL OF THE CODE CAPABILITY TO THE EXTENT NEEDED TO COMPLY WITH
59
 
*    ANY SUCH LICENSES OR RIGHTS.
60
 
*******************************************************************************/
 
2
 *Copyright (c) 2009  Eucalyptus Systems, Inc.
 
3
 * 
 
4
 *  This program is free software: you can redistribute it and/or modify
 
5
 *  it under the terms of the GNU General Public License as published by
 
6
 *  the Free Software Foundation, only version 3 of the License.
 
7
 * 
 
8
 * 
 
9
 *  This file is distributed in the hope that it will be useful, but WITHOUT
 
10
 *  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 
11
 *  FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 
12
 *  for more details.
 
13
 * 
 
14
 *  You should have received a copy of the GNU General Public License along
 
15
 *  with this program.  If not, see <http://www.gnu.org/licenses/>.
 
16
 * 
 
17
 *  Please contact Eucalyptus Systems, Inc., 130 Castilian
 
18
 *  Dr., Goleta, CA 93101 USA or visit <http://www.eucalyptus.com/licenses/>
 
19
 *  if you need additional information or have any questions.
 
20
 * 
 
21
 *  This file may incorporate work covered under the following copyright and
 
22
 *  permission notice:
 
23
 * 
 
24
 *    Software License Agreement (BSD License)
 
25
 * 
 
26
 *    Copyright (c) 2008, Regents of the University of California
 
27
 *    All rights reserved.
 
28
 * 
 
29
 *    Redistribution and use of this software in source and binary forms, with
 
30
 *    or without modification, are permitted provided that the following
 
31
 *    conditions are met:
 
32
 * 
 
33
 *      Redistributions of source code must retain the above copyright notice,
 
34
 *      this list of conditions and the following disclaimer.
 
35
 * 
 
36
 *      Redistributions in binary form must reproduce the above copyright
 
37
 *      notice, this list of conditions and the following disclaimer in the
 
38
 *      documentation and/or other materials provided with the distribution.
 
39
 * 
 
40
 *    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
 
41
 *    IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 
42
 *    TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
 
43
 *    PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
 
44
 *    OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 
45
 *    EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 
46
 *    PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 
47
 *    PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 
48
 *    LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 
49
 *    NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 
50
 *    SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. USERS OF
 
51
 *    THIS SOFTWARE ACKNOWLEDGE THE POSSIBLE PRESENCE OF OTHER OPEN SOURCE
 
52
 *    LICENSED MATERIAL, COPYRIGHTED MATERIAL OR PATENTED MATERIAL IN THIS
 
53
 *    SOFTWARE, AND IF ANY SUCH MATERIAL IS DISCOVERED THE PARTY DISCOVERING
 
54
 *    IT MAY INFORM DR. RICH WOLSKI AT THE UNIVERSITY OF CALIFORNIA, SANTA
 
55
 *    BARBARA WHO WILL THEN ASCERTAIN THE MOST APPROPRIATE REMEDY, WHICH IN
 
56
 *    THE REGENTS’ DISCRETION MAY INCLUDE, WITHOUT LIMITATION, REPLACEMENT
 
57
 *    OF THE CODE SO IDENTIFIED, LICENSING OF THE CODE SO IDENTIFIED, OR
 
58
 *    WITHDRAWAL OF THE CODE CAPABILITY TO THE EXTENT NEEDED TO COMPLY WITH
 
59
 *    ANY SUCH LICENSES OR RIGHTS.
 
60
 *******************************************************************************/
61
61
/*
62
62
 *
63
63
 * Author: Sunil Soman sunils@cs.ucsb.edu
72
72
// Currently, the queues are a LinkedBlockingQueue and producers/consumers do not timeout.
73
73
 
74
74
public class WalrusDataMessenger {
75
 
    private static Logger LOG = Logger.getLogger( WalrusDataMessenger.class );
76
 
    private static final int DATA_QUEUE_SIZE = 3;
77
 
 
78
 
    private ConcurrentHashMap<String, ConcurrentHashMap<String,LinkedBlockingQueue<WalrusDataMessage>>> queueMap;
79
 
    private ConcurrentHashMap<String, WalrusMonitor> monitorMap;
80
 
    private ConcurrentHashMap<String, WalrusSemaphore> semaphoreMap;
81
 
 
82
 
    public WalrusDataMessenger() {
83
 
        queueMap = new ConcurrentHashMap<String, ConcurrentHashMap<String,LinkedBlockingQueue<WalrusDataMessage>>>();
84
 
        monitorMap = new ConcurrentHashMap<String, WalrusMonitor>();
85
 
        semaphoreMap = new ConcurrentHashMap<String, WalrusSemaphore>();
86
 
    }
87
 
 
88
 
    public LinkedBlockingQueue<WalrusDataMessage> getQueue(String key1, String key2) {
89
 
        ConcurrentHashMap<String,LinkedBlockingQueue<WalrusDataMessage>> queues = queueMap.putIfAbsent(key1, new ConcurrentHashMap<String, LinkedBlockingQueue<WalrusDataMessage>>());
90
 
        if (queues == null) {
91
 
            queues = queueMap.get(key1);
92
 
        }
93
 
        LinkedBlockingQueue<WalrusDataMessage> queue = queues.putIfAbsent(key2, new LinkedBlockingQueue<WalrusDataMessage>(DATA_QUEUE_SIZE));
94
 
        if (queue == null) {
95
 
            queue = queues.get(key2);
96
 
        }
97
 
        return queue;
98
 
    }
99
 
 
100
 
    public LinkedBlockingQueue<WalrusDataMessage> interruptAllAndGetQueue(String key1, String key2) {
101
 
        ConcurrentHashMap<String,LinkedBlockingQueue<WalrusDataMessage>> queues = queueMap.get(key1);
102
 
        if(queues != null) {
103
 
            for (LinkedBlockingQueue<WalrusDataMessage> queue: queues.values()) {
104
 
                try {
105
 
                    queue.put(WalrusDataMessage.InterruptTransaction());
106
 
                } catch(InterruptedException ex) {
107
 
                    LOG.warn(ex, ex);
108
 
                    return null;
109
 
                }
110
 
        
111
 
            }
112
 
        }
113
 
        return getQueue(key1, key2);
114
 
    }
115
 
 
116
 
    public void removeQueue(String key1, String key2) {
117
 
        if(queueMap.containsKey(key1)) {
118
 
            ConcurrentHashMap<String, LinkedBlockingQueue<WalrusDataMessage>> queues = queueMap.get(key1);
119
 
            if(queues.containsKey(key2)) {
120
 
                queues.remove(key2);
121
 
            }
122
 
        }
123
 
    }
124
 
 
125
 
    public WalrusMonitor getMonitor(String key) {
126
 
        WalrusMonitor monitor = monitorMap.putIfAbsent(key, new WalrusMonitor());
127
 
        if (monitor == null) {
128
 
            monitor = monitorMap.get(key);
129
 
        }
130
 
        return monitor;
131
 
    }
132
 
 
133
 
    public void removeMonitor(String key) {
134
 
        if(monitorMap.containsKey(key)) {
135
 
            monitorMap.remove(key);
136
 
        }
137
 
    }
138
 
 
139
 
    public WalrusSemaphore getSemaphore(String key) {
140
 
        WalrusSemaphore semaphore = semaphoreMap.putIfAbsent(key, new WalrusSemaphore());
141
 
        if (semaphore == null) {
142
 
            semaphore = semaphoreMap.get(key);
143
 
        }
144
 
        return semaphore;
145
 
    }
146
 
 
147
 
    public void removeSemaphore(String key) {
148
 
        if(semaphoreMap.containsKey(key)) {
149
 
            semaphoreMap.remove(key);
150
 
        }
151
 
    }
 
75
        private static Logger LOG = Logger.getLogger( WalrusDataMessenger.class );
 
76
        private static final int DATA_QUEUE_SIZE = 3;
 
77
 
 
78
        private ConcurrentHashMap<String, ConcurrentHashMap<String,LinkedBlockingQueue<WalrusDataMessage>>> queueMap;
 
79
        private ConcurrentHashMap<String, WalrusMonitor> monitorMap;
 
80
 
 
81
        public WalrusDataMessenger() {
 
82
                queueMap = new ConcurrentHashMap<String, ConcurrentHashMap<String,LinkedBlockingQueue<WalrusDataMessage>>>();
 
83
                monitorMap = new ConcurrentHashMap<String, WalrusMonitor>();
 
84
        }
 
85
 
 
86
        public LinkedBlockingQueue<WalrusDataMessage> getQueue(String key1, String key2) {
 
87
                ConcurrentHashMap<String,LinkedBlockingQueue<WalrusDataMessage>> queues = queueMap.putIfAbsent(key1, new ConcurrentHashMap<String, LinkedBlockingQueue<WalrusDataMessage>>());
 
88
                if (queues == null) {
 
89
                        queues = queueMap.get(key1);
 
90
                }
 
91
                LinkedBlockingQueue<WalrusDataMessage> queue = queues.putIfAbsent(key2, new LinkedBlockingQueue<WalrusDataMessage>(DATA_QUEUE_SIZE));
 
92
                if (queue == null) {
 
93
                        queue = queues.get(key2);
 
94
                }
 
95
                return queue;
 
96
        }
 
97
 
 
98
        public LinkedBlockingQueue<WalrusDataMessage> interruptAllAndGetQueue(String key1, String key2) {
 
99
                ConcurrentHashMap<String,LinkedBlockingQueue<WalrusDataMessage>> queues = queueMap.get(key1);
 
100
                if(queues != null) {
 
101
                        for (LinkedBlockingQueue<WalrusDataMessage> queue: queues.values()) {
 
102
                                try {
 
103
                                        queue.put(WalrusDataMessage.InterruptTransaction());
 
104
                                } catch(InterruptedException ex) {
 
105
                                        LOG.warn(ex, ex);
 
106
                                        return null;
 
107
                                }
 
108
 
 
109
                        }
 
110
                }
 
111
                return getQueue(key1, key2);
 
112
        }
 
113
 
 
114
        public void removeQueue(String key1, String key2) {
 
115
                if(queueMap.containsKey(key1)) {
 
116
                        ConcurrentHashMap<String, LinkedBlockingQueue<WalrusDataMessage>> queues = queueMap.get(key1);
 
117
                        if(queues.containsKey(key2)) {
 
118
                                queues.remove(key2);
 
119
                        }
 
120
                        queueMap.remove(key1);
 
121
                }
 
122
        }
 
123
 
 
124
        public WalrusMonitor getMonitor(String key) {
 
125
                WalrusMonitor monitor = monitorMap.putIfAbsent(key, new WalrusMonitor());
 
126
                if (monitor == null) {
 
127
                        monitor = monitorMap.get(key);
 
128
                }
 
129
                return monitor;
 
130
        }
 
131
 
 
132
        public void removeMonitor(String key) {
 
133
                if(monitorMap.containsKey(key)) {
 
134
                        monitorMap.remove(key);
 
135
                }
 
136
        }
152
137
}