1
1
/*******************************************************************************
2
*Copyright (c) 2009 Eucalyptus Systems, Inc.
4
* This program is free software: you can redistribute it and/or modify
5
* it under the terms of the GNU General Public License as published by
6
* the Free Software Foundation, only version 3 of the License.
9
* This file is distributed in the hope that it will be useful, but WITHOUT
10
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14
* You should have received a copy of the GNU General Public License along
15
* with this program. If not, see <http://www.gnu.org/licenses/>.
17
* Please contact Eucalyptus Systems, Inc., 130 Castilian
18
* Dr., Goleta, CA 93101 USA or visit <http://www.eucalyptus.com/licenses/>
19
* if you need additional information or have any questions.
21
* This file may incorporate work covered under the following copyright and
24
* Software License Agreement (BSD License)
26
* Copyright (c) 2008, Regents of the University of California
27
* All rights reserved.
29
* Redistribution and use of this software in source and binary forms, with
30
* or without modification, are permitted provided that the following
33
* Redistributions of source code must retain the above copyright notice,
34
* this list of conditions and the following disclaimer.
36
* Redistributions in binary form must reproduce the above copyright
37
* notice, this list of conditions and the following disclaimer in the
38
* documentation and/or other materials provided with the distribution.
40
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
41
* IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
42
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
43
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
44
* OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
45
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
46
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
47
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
48
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
49
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
50
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. USERS OF
51
* THIS SOFTWARE ACKNOWLEDGE THE POSSIBLE PRESENCE OF OTHER OPEN SOURCE
52
* LICENSED MATERIAL, COPYRIGHTED MATERIAL OR PATENTED MATERIAL IN THIS
53
* SOFTWARE, AND IF ANY SUCH MATERIAL IS DISCOVERED THE PARTY DISCOVERING
54
* IT MAY INFORM DR. RICH WOLSKI AT THE UNIVERSITY OF CALIFORNIA, SANTA
55
* BARBARA WHO WILL THEN ASCERTAIN THE MOST APPROPRIATE REMEDY, WHICH IN
56
* THE REGENTS' DISCRETION MAY INCLUDE, WITHOUT LIMITATION, REPLACEMENT
57
* OF THE CODE SO IDENTIFIED, LICENSING OF THE CODE SO IDENTIFIED, OR
58
* WITHDRAWAL OF THE CODE CAPABILITY TO THE EXTENT NEEDED TO COMPLY WITH
59
* ANY SUCH LICENSES OR RIGHTS.
60
*******************************************************************************/
2
*Copyright (c) 2009 Eucalyptus Systems, Inc.
4
* This program is free software: you can redistribute it and/or modify
5
* it under the terms of the GNU General Public License as published by
6
* the Free Software Foundation, only version 3 of the License.
9
* This file is distributed in the hope that it will be useful, but WITHOUT
10
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14
* You should have received a copy of the GNU General Public License along
15
* with this program. If not, see <http://www.gnu.org/licenses/>.
17
* Please contact Eucalyptus Systems, Inc., 130 Castilian
18
* Dr., Goleta, CA 93101 USA or visit <http://www.eucalyptus.com/licenses/>
19
* if you need additional information or have any questions.
21
* This file may incorporate work covered under the following copyright and
24
* Software License Agreement (BSD License)
26
* Copyright (c) 2008, Regents of the University of California
27
* All rights reserved.
29
* Redistribution and use of this software in source and binary forms, with
30
* or without modification, are permitted provided that the following
33
* Redistributions of source code must retain the above copyright notice,
34
* this list of conditions and the following disclaimer.
36
* Redistributions in binary form must reproduce the above copyright
37
* notice, this list of conditions and the following disclaimer in the
38
* documentation and/or other materials provided with the distribution.
40
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
41
* IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
42
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
43
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
44
* OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
45
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
46
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
47
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
48
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
49
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
50
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. USERS OF
51
* THIS SOFTWARE ACKNOWLEDGE THE POSSIBLE PRESENCE OF OTHER OPEN SOURCE
52
* LICENSED MATERIAL, COPYRIGHTED MATERIAL OR PATENTED MATERIAL IN THIS
53
* SOFTWARE, AND IF ANY SUCH MATERIAL IS DISCOVERED THE PARTY DISCOVERING
54
* IT MAY INFORM DR. RICH WOLSKI AT THE UNIVERSITY OF CALIFORNIA, SANTA
55
* BARBARA WHO WILL THEN ASCERTAIN THE MOST APPROPRIATE REMEDY, WHICH IN
56
* THE REGENTS' DISCRETION MAY INCLUDE, WITHOUT LIMITATION, REPLACEMENT
57
* OF THE CODE SO IDENTIFIED, LICENSING OF THE CODE SO IDENTIFIED, OR
58
* WITHDRAWAL OF THE CODE CAPABILITY TO THE EXTENT NEEDED TO COMPLY WITH
59
* ANY SUCH LICENSES OR RIGHTS.
60
*******************************************************************************/
63
63
* Author: Sunil Soman sunils@cs.ucsb.edu
68
68
import edu.ucsb.eucalyptus.storage.StorageManager;
69
69
import edu.ucsb.eucalyptus.storage.fs.FileIO;
70
70
import edu.ucsb.eucalyptus.util.WalrusDataMessage;
71
import edu.ucsb.eucalyptus.util.WalrusSemaphore;
71
import edu.ucsb.eucalyptus.util.EucaSemaphore;
72
72
import org.apache.log4j.Logger;
74
import java.nio.ByteBuffer;
74
75
import java.util.concurrent.LinkedBlockingQueue;
76
77
public class ObjectReader extends Thread {
78
private static Logger LOG = Logger.getLogger(ObjectReader.class);
80
private String bucketName;
81
private String objectName;
82
private long objectSize;
83
private LinkedBlockingQueue<WalrusDataMessage> getQueue;
84
private StorageManager storageManager;
85
private long byteRangeStart;
86
private long byteRangeEnd;
87
private boolean compressed;
88
private boolean deleteAfterXfer;
89
private WalrusSemaphore semaphore;
91
public ObjectReader(String bucketName, String objectName, long objectSize, LinkedBlockingQueue<WalrusDataMessage> getQueue, StorageManager storageManager) {
92
this.bucketName = bucketName;
93
this.objectName = objectName;
94
this.objectSize = objectSize;
95
this.getQueue = getQueue;
96
this.storageManager = storageManager;
100
public ObjectReader(String bucketName, String objectName, long objectSize, LinkedBlockingQueue<WalrusDataMessage> getQueue, long byteRangeStart, long byteRangeEnd, StorageManager storageManager) {
101
this.bucketName = bucketName;
102
this.objectName = objectName;
103
this.objectSize = objectSize;
104
this.getQueue = getQueue;
105
this.byteRangeStart = byteRangeStart;
106
this.byteRangeEnd = byteRangeEnd;
107
this.storageManager = storageManager;
110
public ObjectReader(String bucketName, String objectName, long objectSize, LinkedBlockingQueue<WalrusDataMessage> getQueue, boolean deleteAfterXfer, WalrusSemaphore semaphore, StorageManager storageManager) {
111
this(bucketName, objectName, objectSize, getQueue, storageManager);
112
this.deleteAfterXfer = deleteAfterXfer;
113
this.semaphore = semaphore;
118
FileIO fileIO = storageManager.prepareForRead(bucketName, objectName);
120
long bytesRemaining = objectSize;
121
long offset = byteRangeStart;
123
if (byteRangeEnd > 0) {
124
assert (byteRangeEnd <= objectSize);
125
assert (byteRangeEnd >= byteRangeStart);
126
bytesRemaining = byteRangeEnd - byteRangeStart;
127
if (byteRangeEnd > objectSize || (byteRangeStart < 0))
132
LOG.warn("putting data");
133
getQueue.put(WalrusDataMessage.StartOfData(bytesRemaining));
135
while (bytesRemaining > 0) {
136
int bytesRead = fileIO.read(offset);
138
LOG.error("Unable to read object: " + bucketName + "/" + objectName);
141
if ((bytesRemaining - bytesRead) > 0)
142
getQueue.put(WalrusDataMessage.DataMessage(fileIO.getBuffer(), bytesRead));
144
getQueue.put(WalrusDataMessage.DataMessage(fileIO.getBuffer(), (int) bytesRemaining));
146
bytesRemaining -= bytesRead;
150
getQueue.put(WalrusDataMessage.EOF());
151
} catch (Exception ex) {
154
if (semaphore != null) {
156
synchronized (semaphore) {
157
semaphore.notifyAll();
160
if (deleteAfterXfer) {
162
storageManager.deleteObject(bucketName, objectName);
163
} catch (Exception ex) {
167
} catch(Exception ex) {
169
getQueue.put(WalrusDataMessage.EOF());
170
} catch(InterruptedException e) {
79
private static Logger LOG = Logger.getLogger(ObjectReader.class);
81
private String bucketName;
82
private String objectName;
83
private long objectSize;
84
private LinkedBlockingQueue<WalrusDataMessage> getQueue;
85
private StorageManager storageManager;
86
private long byteRangeStart;
87
private long byteRangeEnd;
88
private boolean compressed;
89
private boolean deleteAfterXfer;
90
private EucaSemaphore semaphore;
92
public ObjectReader(String bucketName, String objectName, long objectSize, LinkedBlockingQueue<WalrusDataMessage> getQueue, StorageManager storageManager) {
93
this.bucketName = bucketName;
94
this.objectName = objectName;
95
this.objectSize = objectSize;
96
this.getQueue = getQueue;
97
this.storageManager = storageManager;
101
public ObjectReader(String bucketName, String objectName, long objectSize, LinkedBlockingQueue<WalrusDataMessage> getQueue, long byteRangeStart, long byteRangeEnd, StorageManager storageManager) {
102
this.bucketName = bucketName;
103
this.objectName = objectName;
104
this.objectSize = objectSize;
105
this.getQueue = getQueue;
106
this.byteRangeStart = byteRangeStart;
107
this.byteRangeEnd = byteRangeEnd;
108
this.storageManager = storageManager;
111
public ObjectReader(String bucketName, String objectName, long objectSize, LinkedBlockingQueue<WalrusDataMessage> getQueue, boolean deleteAfterXfer, EucaSemaphore semaphore, StorageManager storageManager) {
112
this(bucketName, objectName, objectSize, getQueue, storageManager);
113
this.deleteAfterXfer = deleteAfterXfer;
114
this.semaphore = semaphore;
119
FileIO fileIO = storageManager.prepareForRead(bucketName, objectName);
121
long bytesRemaining = objectSize;
122
long offset = byteRangeStart;
124
if (byteRangeEnd > 0) {
125
assert (byteRangeEnd <= objectSize);
126
assert (byteRangeEnd >= byteRangeStart);
127
bytesRemaining = byteRangeEnd - byteRangeStart;
128
if (byteRangeEnd > objectSize || (byteRangeStart < 0))
133
LOG.warn("putting data");
134
getQueue.put(WalrusDataMessage.StartOfData(bytesRemaining));
136
while (bytesRemaining > 0) {
137
int bytesRead = fileIO.read(offset);
139
LOG.error("Unable to read object: " + bucketName + "/" + objectName);
142
if ((bytesRemaining - bytesRead) > 0) {
143
ByteBuffer buffer = fileIO.getBuffer();
145
getQueue.put(WalrusDataMessage.DataMessage(buffer, bytesRead));
147
ByteBuffer buffer = fileIO.getBuffer();
149
getQueue.put(WalrusDataMessage.DataMessage(buffer, (int) bytesRemaining));
151
bytesRemaining -= bytesRead;
155
getQueue.put(WalrusDataMessage.EOF());
156
} catch (Exception ex) {
159
if (semaphore != null) {
161
synchronized (semaphore) {
162
semaphore.notifyAll();
165
if (deleteAfterXfer) {
167
storageManager.deleteObject(bucketName, objectName);
168
} catch (Exception ex) {
172
} catch(Exception ex) {
174
getQueue.put(WalrusDataMessage.EOF());
175
} catch(InterruptedException e) {