~measurement-factory/squid/shared-ssl-sessions

« back to all changes in this revision

Viewing changes to src/fs/rock/RockIoState.cc

  • Committer: Christos Tsantilas
  • Date: 2014-01-08 11:31:19 UTC
  • mfrom: (12732.1.478 trunk)
  • Revision ID: chtsanti@users.sourceforge.net-20140108113119-8yl3ad4upjj687sm
mergeĀ fromĀ trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
3
3
 */
4
4
 
5
5
#include "squid.h"
6
 
#include "MemObject.h"
7
 
#include "Parsing.h"
 
6
#include "base/TextException.h"
 
7
#include "CollapsedForwarding.h"
8
8
#include "DiskIO/DiskIOModule.h"
9
9
#include "DiskIO/DiskIOStrategy.h"
10
10
#include "DiskIO/WriteRequest.h"
 
11
#include "fs/rock/RockIoRequests.h"
11
12
#include "fs/rock/RockIoState.h"
12
 
#include "fs/rock/RockIoRequests.h"
13
13
#include "fs/rock/RockSwapDir.h"
14
14
#include "globals.h"
 
15
#include "Mem.h"
 
16
#include "MemObject.h"
 
17
#include "Parsing.h"
 
18
#include "Transients.h"
15
19
 
16
 
Rock::IoState::IoState(SwapDir *dir,
 
20
Rock::IoState::IoState(Rock::SwapDir::Pointer &aDir,
17
21
                       StoreEntry *anEntry,
18
22
                       StoreIOState::STFNCB *cbFile,
19
23
                       StoreIOState::STIOCB *cbIo,
20
24
                       void *data):
21
 
        slotSize(0),
22
 
        diskOffset(-1),
23
 
        payloadEnd(-1)
 
25
        readableAnchor_(NULL),
 
26
        writeableAnchor_(NULL),
 
27
        sidCurrent(-1),
 
28
        dir(aDir),
 
29
        slotSize(dir->slotSize),
 
30
        objOffset(0),
 
31
        theBuf(dir->slotSize)
24
32
{
25
33
    e = anEntry;
26
 
    // swap_filen, swap_dirn, diskOffset, and payloadEnd are set by the caller
27
 
    slotSize = dir->maxObjectSize();
 
34
    e->lock("rock I/O");
 
35
    // anchor, swap_filen, and swap_dirn are set by the caller
28
36
    file_callback = cbFile;
29
37
    callback = cbIo;
30
38
    callback_data = cbdataReference(data);
35
43
Rock::IoState::~IoState()
36
44
{
37
45
    --store_open_disk_fd;
 
46
 
 
47
    // The dir map entry may still be open for reading at the point because
 
48
    // the map entry lock is associated with StoreEntry, not IoState.
 
49
    // assert(!readableAnchor_);
 
50
    assert(shutting_down || !writeableAnchor_);
 
51
 
38
52
    if (callback_data)
39
53
        cbdataReferenceDone(callback_data);
40
54
    theFile = NULL;
 
55
 
 
56
    e->unlock("rock I/O");
41
57
}
42
58
 
43
59
void
48
64
    theFile = aFile;
49
65
}
50
66
 
 
67
const Ipc::StoreMapAnchor &
 
68
Rock::IoState::readAnchor() const
 
69
{
 
70
    assert(readableAnchor_);
 
71
    return *readableAnchor_;
 
72
}
 
73
 
 
74
Ipc::StoreMapAnchor &
 
75
Rock::IoState::writeAnchor()
 
76
{
 
77
    assert(writeableAnchor_);
 
78
    return *writeableAnchor_;
 
79
}
 
80
 
 
81
/// convenience wrapper returning the map slot we are reading now
 
82
const Ipc::StoreMapSlice &
 
83
Rock::IoState::currentReadableSlice() const
 
84
{
 
85
    return dir->map->readableSlice(swap_filen, sidCurrent);
 
86
}
 
87
 
51
88
void
52
89
Rock::IoState::read_(char *buf, size_t len, off_t coreOff, STRCB *cb, void *data)
53
90
{
 
91
    debugs(79, 7, swap_filen << " reads from " << coreOff);
 
92
 
54
93
    assert(theFile != NULL);
55
94
    assert(coreOff >= 0);
56
 
    offset_ = coreOff;
57
 
 
58
 
    // we skip our cell header; it is only read when building the map
59
 
    const int64_t cellOffset = sizeof(DbCellHeader) +
60
 
                               static_cast<int64_t>(coreOff);
61
 
    assert(cellOffset <= payloadEnd);
62
 
 
63
 
    // Core specifies buffer length, but we must not exceed stored entry size
64
 
    if (cellOffset + (int64_t)len > payloadEnd)
65
 
        len = payloadEnd - cellOffset;
 
95
 
 
96
    // if we are dealing with the first read or
 
97
    // if the offset went backwords, start searching from the beginning
 
98
    if (sidCurrent < 0 || coreOff < objOffset) {
 
99
        sidCurrent = readAnchor().start;
 
100
        objOffset = 0;
 
101
    }
 
102
 
 
103
    while (sidCurrent >= 0 && coreOff >= objOffset + currentReadableSlice().size) {
 
104
        objOffset += currentReadableSlice().size;
 
105
        sidCurrent = currentReadableSlice().next;
 
106
    }
66
107
 
67
108
    assert(read.callback == NULL);
68
109
    assert(read.callback_data == NULL);
69
110
    read.callback = cb;
70
111
    read.callback_data = cbdataReference(data);
71
112
 
72
 
    theFile->read(new ReadRequest(
73
 
                      ::ReadRequest(buf, diskOffset + cellOffset, len), this));
 
113
    // punt if read offset is too big (because of client bugs or collapsing)
 
114
    if (sidCurrent < 0) {
 
115
        debugs(79, 5, "no " << coreOff << " in " << *e);
 
116
        callReaderBack(buf, 0);
 
117
        return;
 
118
    }
 
119
 
 
120
    offset_ = coreOff;
 
121
    len = min(len,
 
122
              static_cast<size_t>(objOffset + currentReadableSlice().size - coreOff));
 
123
    const uint64_t diskOffset = dir->diskOffset(sidCurrent);
 
124
    theFile->read(new ReadRequest(::ReadRequest(buf,
 
125
                                  diskOffset + sizeof(DbCellHeader) + coreOff - objOffset, len), this));
74
126
}
75
127
 
76
 
// We only buffer data here; we actually write when close() is called.
77
 
// We buffer, in part, to avoid forcing OS to _read_ old unwritten portions
78
 
// of the slot when the write does not end at the page or sector boundary.
79
128
void
 
129
Rock::IoState::callReaderBack(const char *buf, int rlen)
 
130
{
 
131
    debugs(79, 5, rlen << " bytes for " << *e);
 
132
    StoreIOState::STRCB *callb = read.callback;
 
133
    assert(callb);
 
134
    read.callback = NULL;
 
135
    void *cbdata;
 
136
    if (cbdataReferenceValidDone(read.callback_data, &cbdata))
 
137
        callb(cbdata, buf, rlen, this);
 
138
}
 
139
 
 
140
/// wraps tryWrite() to handle deep write failures centrally and safely
 
141
bool
80
142
Rock::IoState::write(char const *buf, size_t size, off_t coreOff, FREE *dtor)
81
143
{
82
 
    // TODO: move to create?
83
 
    if (!coreOff) {
84
 
        assert(theBuf.isNull());
85
 
        assert(payloadEnd <= slotSize);
86
 
        theBuf.init(min(payloadEnd, slotSize), slotSize);
87
 
        // start with our header; TODO: consider making it a trailer
88
 
        DbCellHeader header;
89
 
        assert(static_cast<int64_t>(sizeof(header)) <= payloadEnd);
90
 
        header.payloadSize = payloadEnd - sizeof(header);
91
 
        theBuf.append(reinterpret_cast<const char*>(&header), sizeof(header));
92
 
    } else {
93
 
        // Core uses -1 offset as "append". Sigh.
94
 
        assert(coreOff == -1);
95
 
        assert(!theBuf.isNull());
 
144
    bool success = false;
 
145
    try {
 
146
        tryWrite(buf, size, coreOff);
 
147
        success = true;
 
148
    } catch (const std::exception &ex) { // TODO: should we catch ... as well?
 
149
        debugs(79, 2, "db write error: " << ex.what());
 
150
        dir->writeError(*e);
 
151
        finishedWriting(DISK_ERROR);
 
152
        // 'this' might be gone beyond this point; fall through to free buf
96
153
    }
97
154
 
98
 
    theBuf.append(buf, size);
99
 
    offset_ += size; // so that Core thinks we wrote it
 
155
    // careful: 'this' might be gone here
100
156
 
101
157
    if (dtor)
102
158
        (dtor)(const_cast<char*>(buf)); // cast due to a broken API?
103
 
}
104
 
 
105
 
// write what was buffered during write() calls
106
 
void
107
 
Rock::IoState::startWriting()
 
159
 
 
160
    return success;
 
161
}
 
162
 
 
163
/**
 
164
 * Possibly send data to be written to disk:
 
165
 * We only write data when full slot is accumulated or when close() is called.
 
166
 * We buffer, in part, to avoid forcing OS to _read_ old unwritten portions of
 
167
 * the slot when the write does not end at the page or sector boundary.
 
168
 */
 
169
void
 
170
Rock::IoState::tryWrite(char const *buf, size_t size, off_t coreOff)
 
171
{
 
172
    debugs(79, 7, swap_filen << " writes " << size << " more");
 
173
 
 
174
    // either this is the first write or append; we do not support write gaps
 
175
    assert(!coreOff || coreOff == -1);
 
176
 
 
177
    // allocate the first slice during the first write
 
178
    if (!coreOff) {
 
179
        assert(sidCurrent < 0);
 
180
        sidCurrent = reserveSlotForWriting(); // throws on failures
 
181
        assert(sidCurrent >= 0);
 
182
        writeAnchor().start = sidCurrent;
 
183
    }
 
184
 
 
185
    // buffer incoming data in slot buffer and write overflowing or final slots
 
186
    // quit when no data left or we stopped writing on reentrant error
 
187
    while (size > 0 && theFile != NULL) {
 
188
        assert(sidCurrent >= 0);
 
189
        const size_t processed = writeToBuffer(buf, size);
 
190
        buf += processed;
 
191
        size -= processed;
 
192
        const bool overflow = size > 0;
 
193
 
 
194
        // We do not write a full buffer without overflow because
 
195
        // we would not yet know what to set the nextSlot to.
 
196
        if (overflow) {
 
197
            const SlotId sidNext = reserveSlotForWriting(); // throws
 
198
            assert(sidNext >= 0);
 
199
            writeToDisk(sidNext);
 
200
        } else if (Store::Root().transientReaders(*e)) {
 
201
            // write partial buffer for all remote hit readers to see
 
202
            writeBufToDisk(-1, false);
 
203
        }
 
204
    }
 
205
 
 
206
}
 
207
 
 
208
/// Buffers incoming data for the current slot.
 
209
/// \return the number of bytes buffered
 
210
size_t
 
211
Rock::IoState::writeToBuffer(char const *buf, size_t size)
 
212
{
 
213
    // do not buffer a cell header for nothing
 
214
    if (!size)
 
215
        return 0;
 
216
 
 
217
    if (!theBuf.size) {
 
218
        // will fill the header in writeToDisk when the next slot is known
 
219
        theBuf.appended(sizeof(DbCellHeader));
 
220
    }
 
221
 
 
222
    size_t forCurrentSlot = min(size, static_cast<size_t>(theBuf.spaceSize()));
 
223
    theBuf.append(buf, forCurrentSlot);
 
224
    offset_ += forCurrentSlot; // so that Core thinks we wrote it
 
225
    return forCurrentSlot;
 
226
}
 
227
 
 
228
/// write what was buffered during write() calls
 
229
/// negative sidNext means this is the last write request for this entry
 
230
void
 
231
Rock::IoState::writeToDisk(const SlotId sidNext)
108
232
{
109
233
    assert(theFile != NULL);
110
 
    assert(!theBuf.isNull());
 
234
    assert(theBuf.size >= sizeof(DbCellHeader));
111
235
 
112
236
    // TODO: if DiskIO module is mmap-based, we should be writing whole pages
113
237
    // to avoid triggering read-page;new_head+old_tail;write-page overheads
114
238
 
 
239
    writeBufToDisk(sidNext, sidNext < 0);
 
240
    theBuf.clear();
 
241
 
 
242
    sidCurrent = sidNext;
 
243
}
 
244
 
 
245
/// creates and submits a request to write current slot buffer to disk
 
246
/// eof is true if and only this is the last slot
 
247
void
 
248
Rock::IoState::writeBufToDisk(const SlotId sidNext, bool eof)
 
249
{
 
250
    // no slots after the last/eof slot (but partial slots may have a nil next)
 
251
    assert(!eof || sidNext < 0);
 
252
 
 
253
    // finalize db cell header
 
254
    DbCellHeader header;
 
255
    memcpy(header.key, e->key, sizeof(header.key));
 
256
    header.firstSlot = writeAnchor().start;
 
257
    header.nextSlot = sidNext;
 
258
    header.payloadSize = theBuf.size - sizeof(DbCellHeader);
 
259
    header.entrySize = eof ? offset_ : 0; // storeSwapOutFileClosed sets swap_file_sz after write
 
260
    header.version = writeAnchor().basics.timestamp;
 
261
 
 
262
    // copy finalized db cell header into buffer
 
263
    memcpy(theBuf.mem, &header, sizeof(DbCellHeader));
 
264
 
 
265
    // and now allocate another buffer for the WriteRequest so that
 
266
    // we can support concurrent WriteRequests (and to ease cleaning)
 
267
    // TODO: should we limit the number of outstanding requests?
 
268
    size_t wBufCap = 0;
 
269
    void *wBuf = memAllocBuf(theBuf.size, &wBufCap);
 
270
    memcpy(wBuf, theBuf.mem, theBuf.size);
 
271
 
 
272
    const uint64_t diskOffset = dir->diskOffset(sidCurrent);
115
273
    debugs(79, 5, HERE << swap_filen << " at " << diskOffset << '+' <<
116
 
           theBuf.contentSize());
117
 
 
118
 
    assert(theBuf.contentSize() <= slotSize);
 
274
           theBuf.size);
 
275
 
 
276
    WriteRequest *const r = new WriteRequest(
 
277
        ::WriteRequest(static_cast<char*>(wBuf), diskOffset, theBuf.size,
 
278
                       memFreeBufFunc(wBufCap)), this);
 
279
    r->sidCurrent = sidCurrent;
 
280
    r->sidNext = sidNext;
 
281
    r->eof = eof;
 
282
 
119
283
    // theFile->write may call writeCompleted immediatelly
120
 
    theFile->write(new WriteRequest(::WriteRequest(theBuf.content(),
121
 
                                    diskOffset, theBuf.contentSize(), theBuf.freeFunc()), this));
122
 
}
123
 
 
124
 
//
 
284
    theFile->write(r);
 
285
}
 
286
 
 
287
/// finds and returns a free db slot to fill or throws
 
288
Rock::SlotId
 
289
Rock::IoState::reserveSlotForWriting()
 
290
{
 
291
    Ipc::Mem::PageId pageId;
 
292
    if (dir->useFreeSlot(pageId))
 
293
        return pageId.number-1;
 
294
 
 
295
    // This may happen when the number of available db slots is close to the
 
296
    // number of concurrent requests reading or writing those slots, which may
 
297
    // happen when the db is "small" compared to the request traffic OR when we
 
298
    // are rebuilding and have not loaded "many" entries or empty slots yet.
 
299
    throw TexcHere("ran out of free db slots");
 
300
}
 
301
 
125
302
void
126
303
Rock::IoState::finishedWriting(const int errFlag)
127
304
{
128
305
    // we incremented offset_ while accumulating data in write()
 
306
    // we do not reset writeableAnchor_ here because we still keep the lock
 
307
    CollapsedForwarding::Broadcast(*e);
129
308
    callBack(errFlag);
130
309
}
131
310
 
132
311
void
133
312
Rock::IoState::close(int how)
134
313
{
135
 
    debugs(79, 3, HERE << swap_filen << " accumulated: " << offset_ <<
136
 
           " how=" << how);
137
 
    if (how == wroteAll && !theBuf.isNull())
138
 
        startWriting();
139
 
    else
140
 
        callBack(how == writerGone ? DISK_ERROR : 0); // TODO: add DISK_CALLER_GONE
 
314
    debugs(79, 3, swap_filen << " offset: " << offset_ << " how: " << how <<
 
315
           " buf: " << theBuf.size << " callback: " << callback);
 
316
 
 
317
    if (!theFile) {
 
318
        debugs(79, 3, "I/O already canceled");
 
319
        assert(!callback);
 
320
        // We keep writeableAnchor_ after callBack() on I/O errors.
 
321
        assert(!readableAnchor_);
 
322
        return;
 
323
    }
 
324
 
 
325
    switch (how) {
 
326
    case wroteAll:
 
327
        assert(theBuf.size > 0); // we never flush last bytes on our own
 
328
        writeToDisk(-1); // flush last, yet unwritten slot to disk
 
329
        return; // writeCompleted() will callBack()
 
330
 
 
331
    case writerGone:
 
332
        assert(writeableAnchor_);
 
333
        dir->writeError(*e); // abort a partially stored entry
 
334
        finishedWriting(DISK_ERROR);
 
335
        return;
 
336
 
 
337
    case readerDone:
 
338
        callBack(0);
 
339
        return;
 
340
    }
141
341
}
142
342
 
143
343
/// close callback (STIOCB) dialer: breaks dependencies and