const Ipc::StoreMapAnchor &
Rock::IoState::readAnchor() const
{
    assert(readableAnchor_);
    return *readableAnchor_;
}

Ipc::StoreMapAnchor &
Rock::IoState::writeAnchor()
{
    assert(writeableAnchor_);
    return *writeableAnchor_;
}

/// convenience wrapper returning the map slot we are reading now
const Ipc::StoreMapSlice &
Rock::IoState::currentReadableSlice() const
{
    return dir->map->readableSlice(swap_filen, sidCurrent);
}

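/// reads up to len bytes at entry offset coreOff by following the entry
/// slice chain; the result is delivered asynchronously via the STRCB callback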
void
Rock::IoState::read_(char *buf, size_t len, off_t coreOff, STRCB *cb, void *data)
{
    debugs(79, 7, swap_filen << " reads from " << coreOff);

    assert(theFile != NULL);
    assert(coreOff >= 0);

    // if we are dealing with the first read or
    // if the offset went backwards, start searching from the beginning
    if (sidCurrent < 0 || coreOff < objOffset) {
        sidCurrent = readAnchor().start;
        objOffset = 0;
    }

    while (sidCurrent >= 0 && coreOff >= objOffset + currentReadableSlice().size) {
        objOffset += currentReadableSlice().size;
        sidCurrent = currentReadableSlice().next;
    }
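    // here, either sidCurrent names the slice containing coreOff, or the
    // chain ended before reaching coreOff (sidCurrent < 0, handled below)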

    assert(read.callback == NULL);
    assert(read.callback_data == NULL);
    read.callback = cb;
    read.callback_data = cbdataReference(data);

    // punt if read offset is too big (because of client bugs or collapsing)
    if (sidCurrent < 0) {
        debugs(79, 5, "no " << coreOff << " in " << *e);
        callReaderBack(buf, 0);
        return;
    }

    offset_ = coreOff;
    // read up to the end of the current slice; Core may ask for more later
    len = min(len,
              static_cast<size_t>(objOffset + currentReadableSlice().size - coreOff));
    const uint64_t diskOffset = dir->diskOffset(sidCurrent);
    theFile->read(new ReadRequest(::ReadRequest(buf,
                  diskOffset + sizeof(DbCellHeader) + coreOff - objOffset, len), this));
}

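/// dials the STRCB callback registered by read_(), if the requester is
/// still around to receive the result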
void
Rock::IoState::callReaderBack(const char *buf, int rlen)
{
    debugs(79, 5, rlen << " bytes for " << *e);
    StoreIOState::STRCB *callb = read.callback;
    assert(callb);
    read.callback = NULL;
    void *cbdata;
    // cbdataReferenceValidDone() checks that the requester is still alive
    // and releases our reference in either case
    if (cbdataReferenceValidDone(read.callback_data, &cbdata))
        callb(cbdata, buf, rlen, this);
}

/// wraps tryWrite() to handle deep write failures centrally and safely
bool
Rock::IoState::write(char const *buf, size_t size, off_t coreOff, FREE *dtor)
{
    bool success = false;
    try {
        tryWrite(buf, size, coreOff);
        success = true;
    } catch (const std::exception &ex) { // TODO: should we catch ... as well?
        debugs(79, 2, "db write error: " << ex.what());
        dir->writeError(*e); // abort a partially stored entry
        finishedWriting(DISK_ERROR);
        // 'this' might be gone beyond this point; fall through to free buf
    }

    // careful: 'this' might be gone here

    if (dtor)
        (dtor)(const_cast<char*>(buf)); // cast due to a broken API?

    return success;
}

/**
 * Possibly send data to be written to disk:
 * We only write data when full slot is accumulated or when close() is called.
 * We buffer, in part, to avoid forcing OS to _read_ old unwritten portions of
 * the slot when the write does not end at the page or sector boundary.
 */
void
Rock::IoState::tryWrite(char const *buf, size_t size, off_t coreOff)
{
    debugs(79, 7, swap_filen << " writes " << size << " more");

    // either this is the first write or append; we do not support write gaps
    assert(!coreOff || coreOff == -1);
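    // (coreOff is 0 on the very first write; Core uses -1 to mean "append")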

    // allocate the first slice during the first write
    if (!coreOff) {
        assert(sidCurrent < 0);
        sidCurrent = reserveSlotForWriting(); // throws on failures
        assert(sidCurrent >= 0);
        writeAnchor().start = sidCurrent;
    }

    // buffer incoming data in slot buffer and write overflowing or final slots
    // quit when no data left or we stopped writing on reentrant error
    while (size > 0 && theFile != NULL) {
        assert(sidCurrent >= 0);
        const size_t processed = writeToBuffer(buf, size);
        buf += processed;
        size -= processed;
        const bool overflow = size > 0;

        // We do not write a full buffer without overflow because
        // we would not yet know what to set the nextSlot to.
        if (overflow) {
            const SlotId sidNext = reserveSlotForWriting(); // throws
            assert(sidNext >= 0);
            writeToDisk(sidNext);
        } else if (Store::Root().transientReaders(*e)) {
            // write partial buffer for all remote hit readers to see
            writeBufToDisk(-1, false);
        }
    }
}

/// Buffers incoming data for the current slot.
/// \return the number of bytes buffered
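/// (the caller advances its buffer by the returned count and calls us
/// again if not everything fit into the current slot)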
size_t
Rock::IoState::writeToBuffer(char const *buf, size_t size)
{
    // do not buffer a cell header for nothing
    if (!size)
        return 0;

    if (!theBuf.size) {
        // will fill the header in writeToDisk when the next slot is known
        theBuf.appended(sizeof(DbCellHeader));
    }

    size_t forCurrentSlot = min(size, static_cast<size_t>(theBuf.spaceSize()));
    theBuf.append(buf, forCurrentSlot);
    offset_ += forCurrentSlot; // so that Core thinks we wrote it
    return forCurrentSlot;
}

/// write what was buffered during write() calls
/// negative sidNext means this is the last write request for this entry
void
Rock::IoState::writeToDisk(const SlotId sidNext)
{
    assert(theFile != NULL);
    assert(theBuf.size >= sizeof(DbCellHeader));

    // TODO: if DiskIO module is mmap-based, we should be writing whole pages
    // to avoid triggering read-page;new_head+old_tail;write-page overheads

    writeBufToDisk(sidNext, sidNext < 0);
    theBuf.clear();

    sidCurrent = sidNext;
}

/// creates and submits a request to write current slot buffer to disk
/// eof is true if and only if this is the last slot
void
Rock::IoState::writeBufToDisk(const SlotId sidNext, bool eof)
{
    // no slots after the last/eof slot (but partial slots may have a nil next)
    assert(!eof || sidNext < 0);

    // finalize db cell header
    DbCellHeader header;
    memcpy(header.key, e->key, sizeof(header.key));
    header.firstSlot = writeAnchor().start;
    header.nextSlot = sidNext;
    header.payloadSize = theBuf.size - sizeof(DbCellHeader);
    header.entrySize = eof ? offset_ : 0; // storeSwapOutFileClosed sets swap_file_sz after write
    header.version = writeAnchor().basics.timestamp;
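
    // on disk, each slot is laid out as [DbCellHeader][payload]; firstSlot
    // and nextSlot link the slots of one entry into a chain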

    // copy finalized db cell header into buffer
    memcpy(theBuf.mem, &header, sizeof(DbCellHeader));
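    // (writeToBuffer() reserved this header space with theBuf.appended()
    // while the slot buffer was still empty)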

    // and now allocate another buffer for the WriteRequest so that
    // we can support concurrent WriteRequests (and to ease cleaning)
    // TODO: should we limit the number of outstanding requests?
    size_t wBufCap = 0;
    void *wBuf = memAllocBuf(theBuf.size, &wBufCap);
    memcpy(wBuf, theBuf.mem, theBuf.size);

    const uint64_t diskOffset = dir->diskOffset(sidCurrent);
    debugs(79, 5, HERE << swap_filen << " at " << diskOffset << '+' <<
           theBuf.size);

    WriteRequest *const r = new WriteRequest(
        ::WriteRequest(static_cast<char*>(wBuf), diskOffset, theBuf.size,
                       memFreeBufFunc(wBufCap)), this);
    r->sidCurrent = sidCurrent;
    r->sidNext = sidNext;
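    // the request carries both slot IDs so that the write-completion
    // handler can update the shared slot map once the disk write succeeds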

    // theFile->write may call writeCompleted immediately
    theFile->write(r);
}

/// finds and returns a free db slot to fill or throws
Rock::SlotId
Rock::IoState::reserveSlotForWriting()
{
    Ipc::Mem::PageId pageId;
    if (dir->useFreeSlot(pageId))
        return pageId.number - 1; // page numbers are 1-based; slot IDs are 0-based

    // This may happen when the number of available db slots is close to the
    // number of concurrent requests reading or writing those slots, which may
    // happen when the db is "small" compared to the request traffic OR when we
    // are rebuilding and have not loaded "many" entries or empty slots yet.
    throw TexcHere("ran out of free db slots");
}

void
Rock::IoState::finishedWriting(const int errFlag)
{
    // we incremented offset_ while accumulating data in write()
    // we do not reset writeableAnchor_ here because we still keep the lock
    CollapsedForwarding::Broadcast(*e);
    callBack(errFlag);
}

void
Rock::IoState::close(int how)
{
    debugs(79, 3, swap_filen << " offset: " << offset_ << " how: " << how <<
           " buf: " << theBuf.size << " callback: " << callback);

    if (!theFile) {
        debugs(79, 3, "I/O already canceled");
        assert(!callback);
        // We keep writeableAnchor_ after callBack() on I/O errors.
        assert(!readableAnchor_);
        return;
    }

    switch (how) {
    case wroteAll:
        assert(theBuf.size > 0); // we never flush last bytes on our own
        writeToDisk(-1); // flush last, yet unwritten slot to disk
        return; // writeCompleted() will callBack()

    case writerGone:
        assert(writeableAnchor_);
        dir->writeError(*e); // abort a partially stored entry
        finishedWriting(DISK_ERROR);
        return;

    case readerDone:
        callBack(0);
        return;
    }
}

/// close callback (STIOCB) dialer: breaks dependencies and