~tritone-team/tritone/eucalyptus

« back to all changes in this revision

Viewing changes to clc/modules/walrus/src/main/java/edu/ucsb/eucalyptus/cloud/ws/ObjectReader.java

  • Committer: Bazaar Package Importer
  • Author(s): Dustin Kirkland
  • Date: 2009-12-01 21:09:28 UTC
  • mto: This revision was merged to the branch mainline in revision 75.
  • Revision ID: james.westby@ubuntu.com-20091201210928-o2dvg0ubljhb0ft6
Tags: upstream-1.6.1~bzr1083
Import upstream version 1.6.1~bzr1083

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
/*******************************************************************************
2
 
*Copyright (c) 2009  Eucalyptus Systems, Inc.
3
 
4
 
*  This program is free software: you can redistribute it and/or modify
5
 
*  it under the terms of the GNU General Public License as published by
6
 
*  the Free Software Foundation, only version 3 of the License.
7
 
8
 
9
 
*  This file is distributed in the hope that it will be useful, but WITHOUT
10
 
*  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11
 
*  FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12
 
*  for more details.
13
 
14
 
*  You should have received a copy of the GNU General Public License along
15
 
*  with this program.  If not, see <http://www.gnu.org/licenses/>.
16
 
17
 
*  Please contact Eucalyptus Systems, Inc., 130 Castilian
18
 
*  Dr., Goleta, CA 93101 USA or visit <http://www.eucalyptus.com/licenses/>
19
 
*  if you need additional information or have any questions.
20
 
21
 
*  This file may incorporate work covered under the following copyright and
22
 
*  permission notice:
23
 
24
 
*    Software License Agreement (BSD License)
25
 
26
 
*    Copyright (c) 2008, Regents of the University of California
27
 
*    All rights reserved.
28
 
29
 
*    Redistribution and use of this software in source and binary forms, with
30
 
*    or without modification, are permitted provided that the following
31
 
*    conditions are met:
32
 
33
 
*      Redistributions of source code must retain the above copyright notice,
34
 
*      this list of conditions and the following disclaimer.
35
 
36
 
*      Redistributions in binary form must reproduce the above copyright
37
 
*      notice, this list of conditions and the following disclaimer in the
38
 
*      documentation and/or other materials provided with the distribution.
39
 
40
 
*    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
41
 
*    IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
42
 
*    TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
43
 
*    PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
44
 
*    OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
45
 
*    EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
46
 
*    PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
47
 
*    PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
48
 
*    LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
49
 
*    NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
50
 
*    SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. USERS OF
51
 
*    THIS SOFTWARE ACKNOWLEDGE THE POSSIBLE PRESENCE OF OTHER OPEN SOURCE
52
 
*    LICENSED MATERIAL, COPYRIGHTED MATERIAL OR PATENTED MATERIAL IN THIS
53
 
*    SOFTWARE, AND IF ANY SUCH MATERIAL IS DISCOVERED THE PARTY DISCOVERING
54
 
*    IT MAY INFORM DR. RICH WOLSKI AT THE UNIVERSITY OF CALIFORNIA, SANTA
55
 
*    BARBARA WHO WILL THEN ASCERTAIN THE MOST APPROPRIATE REMEDY, WHICH IN
56
 
*    THE REGENTS’ DISCRETION MAY INCLUDE, WITHOUT LIMITATION, REPLACEMENT
57
 
*    OF THE CODE SO IDENTIFIED, LICENSING OF THE CODE SO IDENTIFIED, OR
58
 
*    WITHDRAWAL OF THE CODE CAPABILITY TO THE EXTENT NEEDED TO COMPLY WITH
59
 
*    ANY SUCH LICENSES OR RIGHTS.
60
 
*******************************************************************************/
 
2
 *Copyright (c) 2009  Eucalyptus Systems, Inc.
 
3
 * 
 
4
 *  This program is free software: you can redistribute it and/or modify
 
5
 *  it under the terms of the GNU General Public License as published by
 
6
 *  the Free Software Foundation, only version 3 of the License.
 
7
 * 
 
8
 * 
 
9
 *  This file is distributed in the hope that it will be useful, but WITHOUT
 
10
 *  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 
11
 *  FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 
12
 *  for more details.
 
13
 * 
 
14
 *  You should have received a copy of the GNU General Public License along
 
15
 *  with this program.  If not, see <http://www.gnu.org/licenses/>.
 
16
 * 
 
17
 *  Please contact Eucalyptus Systems, Inc., 130 Castilian
 
18
 *  Dr., Goleta, CA 93101 USA or visit <http://www.eucalyptus.com/licenses/>
 
19
 *  if you need additional information or have any questions.
 
20
 * 
 
21
 *  This file may incorporate work covered under the following copyright and
 
22
 *  permission notice:
 
23
 * 
 
24
 *    Software License Agreement (BSD License)
 
25
 * 
 
26
 *    Copyright (c) 2008, Regents of the University of California
 
27
 *    All rights reserved.
 
28
 * 
 
29
 *    Redistribution and use of this software in source and binary forms, with
 
30
 *    or without modification, are permitted provided that the following
 
31
 *    conditions are met:
 
32
 * 
 
33
 *      Redistributions of source code must retain the above copyright notice,
 
34
 *      this list of conditions and the following disclaimer.
 
35
 * 
 
36
 *      Redistributions in binary form must reproduce the above copyright
 
37
 *      notice, this list of conditions and the following disclaimer in the
 
38
 *      documentation and/or other materials provided with the distribution.
 
39
 * 
 
40
 *    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
 
41
 *    IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 
42
 *    TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
 
43
 *    PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
 
44
 *    OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 
45
 *    EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 
46
 *    PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 
47
 *    PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 
48
 *    LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 
49
 *    NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 
50
 *    SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. USERS OF
 
51
 *    THIS SOFTWARE ACKNOWLEDGE THE POSSIBLE PRESENCE OF OTHER OPEN SOURCE
 
52
 *    LICENSED MATERIAL, COPYRIGHTED MATERIAL OR PATENTED MATERIAL IN THIS
 
53
 *    SOFTWARE, AND IF ANY SUCH MATERIAL IS DISCOVERED THE PARTY DISCOVERING
 
54
 *    IT MAY INFORM DR. RICH WOLSKI AT THE UNIVERSITY OF CALIFORNIA, SANTA
 
55
 *    BARBARA WHO WILL THEN ASCERTAIN THE MOST APPROPRIATE REMEDY, WHICH IN
 
56
 *    THE REGENTS’ DISCRETION MAY INCLUDE, WITHOUT LIMITATION, REPLACEMENT
 
57
 *    OF THE CODE SO IDENTIFIED, LICENSING OF THE CODE SO IDENTIFIED, OR
 
58
 *    WITHDRAWAL OF THE CODE CAPABILITY TO THE EXTENT NEEDED TO COMPLY WITH
 
59
 *    ANY SUCH LICENSES OR RIGHTS.
 
60
 *******************************************************************************/
61
61
/*
62
62
 *
63
63
 * Author: Sunil Soman sunils@cs.ucsb.edu
68
68
import edu.ucsb.eucalyptus.storage.StorageManager;
69
69
import edu.ucsb.eucalyptus.storage.fs.FileIO;
70
70
import edu.ucsb.eucalyptus.util.WalrusDataMessage;
71
 
import edu.ucsb.eucalyptus.util.WalrusSemaphore;
 
71
import edu.ucsb.eucalyptus.util.EucaSemaphore;
72
72
import org.apache.log4j.Logger;
73
73
 
 
74
import java.nio.ByteBuffer;
74
75
import java.util.concurrent.LinkedBlockingQueue;
75
76
 
76
77
public class ObjectReader extends Thread {
77
78
 
78
 
    private static Logger LOG = Logger.getLogger(ObjectReader.class);
79
 
 
80
 
    private String bucketName;
81
 
    private String objectName;
82
 
    private long objectSize;
83
 
    private LinkedBlockingQueue<WalrusDataMessage> getQueue;
84
 
    private StorageManager storageManager;
85
 
    private long byteRangeStart;
86
 
    private long byteRangeEnd;
87
 
    private boolean compressed;
88
 
    private boolean deleteAfterXfer;
89
 
    private WalrusSemaphore semaphore;
90
 
 
91
 
    public ObjectReader(String bucketName, String objectName, long objectSize, LinkedBlockingQueue<WalrusDataMessage> getQueue, StorageManager storageManager) {
92
 
        this.bucketName = bucketName;
93
 
        this.objectName = objectName;
94
 
        this.objectSize = objectSize;
95
 
        this.getQueue = getQueue;
96
 
        this.storageManager = storageManager;
97
 
    }
98
 
 
99
 
 
100
 
    public ObjectReader(String bucketName, String objectName, long objectSize, LinkedBlockingQueue<WalrusDataMessage> getQueue, long byteRangeStart, long byteRangeEnd, StorageManager storageManager) {
101
 
        this.bucketName = bucketName;
102
 
        this.objectName = objectName;
103
 
        this.objectSize = objectSize;
104
 
        this.getQueue = getQueue;
105
 
        this.byteRangeStart = byteRangeStart;
106
 
        this.byteRangeEnd = byteRangeEnd;
107
 
        this.storageManager = storageManager;
108
 
    }
109
 
 
110
 
    public ObjectReader(String bucketName, String objectName, long objectSize, LinkedBlockingQueue<WalrusDataMessage> getQueue, boolean deleteAfterXfer, WalrusSemaphore semaphore, StorageManager storageManager) {
111
 
        this(bucketName, objectName, objectSize, getQueue, storageManager);
112
 
        this.deleteAfterXfer = deleteAfterXfer;
113
 
        this.semaphore = semaphore;
114
 
    }
115
 
 
116
 
    public void run() {
117
 
        try {
118
 
            FileIO fileIO = storageManager.prepareForRead(bucketName, objectName);
119
 
 
120
 
            long bytesRemaining = objectSize;
121
 
            long offset = byteRangeStart;
122
 
 
123
 
            if (byteRangeEnd > 0) {
124
 
                assert (byteRangeEnd <= objectSize);
125
 
                assert (byteRangeEnd >= byteRangeStart);
126
 
                bytesRemaining = byteRangeEnd - byteRangeStart;
127
 
                if (byteRangeEnd > objectSize || (byteRangeStart < 0))
128
 
                    bytesRemaining = 0;
129
 
            }
130
 
 
131
 
            try {
132
 
                LOG.warn("putting data");
133
 
                getQueue.put(WalrusDataMessage.StartOfData(bytesRemaining));
134
 
 
135
 
                while (bytesRemaining > 0) {
136
 
                    int bytesRead = fileIO.read(offset);
137
 
                    if (bytesRead < 0) {
138
 
                        LOG.error("Unable to read object: " + bucketName + "/" + objectName);
139
 
                        break;
140
 
                    }
141
 
                    if ((bytesRemaining - bytesRead) > 0)
142
 
                        getQueue.put(WalrusDataMessage.DataMessage(fileIO.getBuffer(), bytesRead));
143
 
                    else
144
 
                        getQueue.put(WalrusDataMessage.DataMessage(fileIO.getBuffer(), (int) bytesRemaining));
145
 
 
146
 
                    bytesRemaining -= bytesRead;
147
 
                    offset += bytesRead;
148
 
                }
149
 
                fileIO.finish();
150
 
                getQueue.put(WalrusDataMessage.EOF());
151
 
            } catch (Exception ex) {
152
 
                LOG.error(ex, ex);
153
 
            }
154
 
            if (semaphore != null) {
155
 
                semaphore.release();
156
 
                synchronized (semaphore) {
157
 
                    semaphore.notifyAll();
158
 
                }
159
 
            }
160
 
            if (deleteAfterXfer) {
161
 
                try {
162
 
                    storageManager.deleteObject(bucketName, objectName);
163
 
                } catch (Exception ex) {
164
 
                    LOG.error(ex, ex);
165
 
                }
166
 
            }
167
 
        } catch(Exception ex) {
168
 
            try {
169
 
                getQueue.put(WalrusDataMessage.EOF());
170
 
            } catch(InterruptedException e) {
171
 
                LOG.error(e);
172
 
            }
173
 
        }
174
 
    }
 
79
        private static Logger LOG = Logger.getLogger(ObjectReader.class);
 
80
 
 
81
        private String bucketName;
 
82
        private String objectName;
 
83
        private long objectSize;
 
84
        private LinkedBlockingQueue<WalrusDataMessage> getQueue;
 
85
        private StorageManager storageManager;
 
86
        private long byteRangeStart;
 
87
        private long byteRangeEnd;
 
88
        private boolean compressed;
 
89
        private boolean deleteAfterXfer;
 
90
        private EucaSemaphore semaphore;
 
91
 
 
92
        public ObjectReader(String bucketName, String objectName, long objectSize, LinkedBlockingQueue<WalrusDataMessage> getQueue, StorageManager storageManager) {
 
93
                this.bucketName = bucketName;
 
94
                this.objectName = objectName;
 
95
                this.objectSize = objectSize;
 
96
                this.getQueue = getQueue;
 
97
                this.storageManager = storageManager;
 
98
        }
 
99
 
 
100
 
 
101
        public ObjectReader(String bucketName, String objectName, long objectSize, LinkedBlockingQueue<WalrusDataMessage> getQueue, long byteRangeStart, long byteRangeEnd, StorageManager storageManager) {
 
102
                this.bucketName = bucketName;
 
103
                this.objectName = objectName;
 
104
                this.objectSize = objectSize;
 
105
                this.getQueue = getQueue;
 
106
                this.byteRangeStart = byteRangeStart;
 
107
                this.byteRangeEnd = byteRangeEnd;
 
108
                this.storageManager = storageManager;
 
109
        }
 
110
 
 
111
        public ObjectReader(String bucketName, String objectName, long objectSize, LinkedBlockingQueue<WalrusDataMessage> getQueue, boolean deleteAfterXfer, EucaSemaphore semaphore, StorageManager storageManager) {
 
112
                this(bucketName, objectName, objectSize, getQueue, storageManager);
 
113
                this.deleteAfterXfer = deleteAfterXfer;
 
114
                this.semaphore = semaphore;
 
115
        }
 
116
 
 
117
        public void run() {
 
118
                try {
 
119
                        FileIO fileIO = storageManager.prepareForRead(bucketName, objectName);
 
120
 
 
121
                        long bytesRemaining = objectSize;
 
122
                        long offset = byteRangeStart;
 
123
 
 
124
                        if (byteRangeEnd > 0) {
 
125
                                assert (byteRangeEnd <= objectSize);
 
126
                                assert (byteRangeEnd >= byteRangeStart);
 
127
                                bytesRemaining = byteRangeEnd - byteRangeStart;
 
128
                                if (byteRangeEnd > objectSize || (byteRangeStart < 0))
 
129
                                        bytesRemaining = 0;
 
130
                        }
 
131
 
 
132
                        try {
 
133
                                LOG.warn("putting data");
 
134
                                getQueue.put(WalrusDataMessage.StartOfData(bytesRemaining));
 
135
 
 
136
                                while (bytesRemaining > 0) {
 
137
                                        int bytesRead = fileIO.read(offset);
 
138
                                        if (bytesRead < 0) {
 
139
                                                LOG.error("Unable to read object: " + bucketName + "/" + objectName);
 
140
                                                break;
 
141
                                        }
 
142
                                        if ((bytesRemaining - bytesRead) > 0) {
 
143
                                                ByteBuffer buffer = fileIO.getBuffer();
 
144
                                                if(buffer != null)
 
145
                                                        getQueue.put(WalrusDataMessage.DataMessage(buffer, bytesRead));
 
146
                                        } else {
 
147
                                                ByteBuffer buffer = fileIO.getBuffer();
 
148
                                                if(buffer != null)
 
149
                                                        getQueue.put(WalrusDataMessage.DataMessage(buffer, (int) bytesRemaining));
 
150
                                        }
 
151
                                        bytesRemaining -= bytesRead;
 
152
                                        offset += bytesRead;
 
153
                                }
 
154
                                fileIO.finish();
 
155
                                getQueue.put(WalrusDataMessage.EOF());
 
156
                        } catch (Exception ex) {
 
157
                                LOG.error(ex, ex);
 
158
                        }
 
159
                        if (semaphore != null) {
 
160
                                semaphore.release();
 
161
                                synchronized (semaphore) {
 
162
                                        semaphore.notifyAll();
 
163
                                }
 
164
                        }
 
165
                        if (deleteAfterXfer) {
 
166
                                try {
 
167
                                        storageManager.deleteObject(bucketName, objectName);
 
168
                                } catch (Exception ex) {
 
169
                                        LOG.error(ex, ex);
 
170
                                }
 
171
                        }
 
172
                } catch(Exception ex) {
 
173
                        try {
 
174
                                getQueue.put(WalrusDataMessage.EOF());
 
175
                        } catch(InterruptedException e) {
 
176
                                LOG.error(e);
 
177
                        }
 
178
                }
 
179
        }
175
180
}