A brief file description

@section license License

Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
27
#define IS_POWER_2(_x) (!((_x)&((_x)-1)))
28
#define UINT_WRAP_LTE(_x, _y) (((_y)-(_x)) < INT_MAX) // exploit overflow
29
#define UINT_WRAP_GTE(_x, _y) (((_x)-(_y)) < INT_MAX) // exploit overflow
30
#define UINT_WRAP_LT(_x, _y) (((_x)-(_y)) >= INT_MAX) // exploit overflow
32
// Given a key, finds the index of the alternate which matches
33
// used to get the alternate which is actually present in the document
36
get_alternate_index(CacheHTTPInfoVector *cache_vector, CacheKey key)
38
int alt_count = cache_vector->count();
42
for (int i = 0; i < alt_count; i++) {
43
obj = cache_vector->get(i);
44
if (obj->compare_object_key(&key)) {
45
// Debug("cache_key", "Resident alternate key %X", key.word(0));
52
// Adds/Deletes alternate to the od->vector (write_vector). If the vector
53
// is empty, deletes the directory entry pointing to the vector. Each
54
// CacheVC must write the vector down to disk after making changes. If we
55
// wait till the last writer, that writer will have the responsibility of
56
// of writing the vector even if the http state machine aborts. This
57
// makes it easier to handle situations where writers abort.
59
CacheVC::updateVector(int event, Event *e)
65
if (od->reading_vec || od->writing_vec)
66
VC_SCHED_LOCK_RETRY();
69
CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
70
if (!lock || od->writing_vec)
71
VC_SCHED_LOCK_RETRY();
73
int vec = alternate.valid();
75
// all Update cases. Need to get the alternate index.
76
alternate_index = get_alternate_index(write_vector, update_key);
77
Debug("cache_update", "updating alternate index %d", alternate_index);
78
// if its an alternate delete
80
ink_assert(!total_len);
81
if (alternate_index >= 0) {
82
write_vector->remove(alternate_index, true);
83
alternate_index = CACHE_ALT_REMOVED;
84
if (!write_vector->count())
85
dir_delete(&first_key, part, &od->first_dir);
87
// the alternate is not there any more. somebody might have
88
// deleted it. Just close this writer
89
if (alternate_index != CACHE_ALT_REMOVED || !write_vector->count()) {
90
SET_HANDLER(&CacheVC::openWriteCloseDir);
91
return openWriteCloseDir(EVENT_IMMEDIATE, 0);
94
if (update_key == od->single_doc_key && (total_len || !vec))
95
od->move_resident_alt = 0;
97
if (cache_config_http_max_alts > 1 && write_vector->count() >= cache_config_http_max_alts && alternate_index < 0) {
98
if (od->move_resident_alt && get_alternate_index(write_vector, od->single_doc_key) == 0)
99
od->move_resident_alt = 0;
100
write_vector->remove(0, true);
103
alternate_index = write_vector->insert(&alternate, alternate_index);
105
if (od->move_resident_alt && first_buf._ptr() && !od->has_multiple_writers()) {
106
Doc *doc = (Doc *) first_buf->data();
107
int small_doc = (int64_t)doc->data_len() < (int64_t)cache_config_alt_rewrite_max_size;
108
int have_res_alt = doc->key == od->single_doc_key;
109
// if the new alternate is not written with the vector
110
// then move the old one with the vector
111
// if its a header only update move the resident alternate
113
// We are sure that the body of the resident alternate that we are
114
// rewriting has not changed and the alternate is not being deleted,
115
// since we set od->move_resident_alt to 0 in that case
117
if (small_doc && have_res_alt && (fragment || (f.update && !total_len))) {
118
// for multiple fragment document, we must have done
119
// CacheVC:openWriteCloseDataDone
120
ink_assert(!fragment || f.data_done);
121
od->move_resident_alt = 0;
122
f.rewrite_resident_alt = 1;
123
write_len = doc->data_len();
124
Debug("cache_update_alt",
125
"rewriting resident alt size: %d key: %X, first_key: %X", write_len, doc->key.word(0), first_key.word(0));
128
header_len = write_vector->marshal_length();
131
SET_HANDLER(&CacheVC::openWriteCloseHeadDone);
132
ret = do_write_call();
134
if (ret == EVENT_RETURN)
135
return handleEvent(AIO_EVENT_DONE, 0);
The following fields of the CacheVC are used when writing down a fragment.
Make sure that each of the fields is set to a valid value before calling
this function.

- frag_type. Checked to see if a vector needs to be marshalled.
- f.use_first_key. To decide if the vector should be marshalled and to set
  the doc->key to the appropriate key (first_key or earliest_key).
- f.evac_vector. If set, the writer is pushed in the beginning of the
  agg queue. And if !f.evac_vector && !f.update the alternate->object_size
  is set to vc->total_len.
- f.readers. If set, assumes that this is an evacuation, so the write
  is not aborted even if part->agg_todo_size > agg_write_backlog.
- f.evacuator. If this is an evacuation.
- f.rewrite_resident_alt. The resident alternate is rewritten.
- f.update. Used only if the write_vector needs to be written to disk.
  Used to set the length of the alternate to total_len.
- write_vector. Used only if frag_type == CACHE_FRAG_TYPE_HTTP &&
  (f.use_first_key || f.evac_vector) is set. Write_vector is written to disk.
- alternate_index. Used only if write_vector needs to be written to disk.
  Used to find out the VC's alternate in the write_vector and set its
  object size.
- write_len. The number of bytes for this fragment.
- total_len. The total number of bytes for the document so far.
  Doc->total_len and alternate's total len is set to this value.
- first_key. Doc's first_key is set to this value.
- pin_in_cache. Doc's pinned value is set to this + ink_get_hrtime().
- earliest_key. If f.use_first_key, Doc's key is set to this value.
- key. If !f.use_first_key, Doc's key is set to this value.
- blocks. Used only if write_len is set. Data to be written.
- offset. Used only if write_len is set. Offset into the block to copy
  the data from.
- buf. Used only if f.evacuator is set. Should point to the old document.
The function sets the length, offset, pinned, head and phase of vc->dir.
175
CacheVC::handleWrite(int event, Event *e)
178
NOWARN_UNUSED(event);
181
ink_assert(!trigger);
182
if (f.use_first_key && fragment) {
183
frag_len = (fragment-1) * sizeof(Frag);
186
set_agg_write_in_progress();
188
agg_len = part->round_to_approx_size(write_len + header_len + frag_len + sizeofDoc);
189
part->agg_todo_size += agg_len;
191
(agg_len > AGG_SIZE || header_len + sizeofDoc > MAX_FRAG_SIZE ||
192
(!f.readers && (part->agg_todo_size > cache_config_agg_write_backlog + AGG_SIZE) && write_len));
193
#ifdef CACHE_AGG_FAIL_RATE
194
agg_error = agg_error || ((uint32_t) mutex->thread_holding->generator.random() <
195
(uint32_t) (UINT_MAX * CACHE_AGG_FAIL_RATE));
197
bool max_doc_error = (cache_config_max_doc_size &&
198
(cache_config_max_doc_size < vio.ndone ||
199
(vio.nbytes != INT64_MAX && (cache_config_max_doc_size < vio.nbytes))));
201
if (agg_error || max_doc_error) {
202
CACHE_INCREMENT_DYN_STAT(cache_write_backlog_failure_stat);
203
CACHE_INCREMENT_DYN_STAT(base_stat + CACHE_STAT_FAILURE);
204
part->agg_todo_size -= agg_len;
205
io.aio_result = AIO_SOFT_FAILURE;
206
if (event == EVENT_CALL)
208
return handleEvent(AIO_EVENT_DONE, 0);
210
ink_assert(agg_len <= AGG_SIZE);
212
part->agg.push(this);
214
part->agg.enqueue(this);
215
if (!part->is_io_in_progress())
216
return part->aggWrite(event, this);
221
iobufferblock_memcpy(char *p, int len, IOBufferBlock *ab, int offset)
223
IOBufferBlock *b = ab;
224
while (b && len >= 0) {
225
char *start = b->_start;
227
int max_bytes = end - start;
229
if (max_bytes <= 0) {
235
if (bytes >= max_bytes)
237
::memcpy(p, start + offset, bytes);
247
Part::force_evacuate_head(Dir *evac_dir, int pinned)
249
// build an evacuation block for the object
250
EvacuationBlock *b = evacuation_block_exists(evac_dir, this);
251
// if we have already started evacuating this document, its too late
252
// to evacuate the head...bad luck
257
b = new_EvacuationBlock(mutex->thread_holding);
259
DDebug("cache_evac", "force: %d, %d", (int) dir_offset(evac_dir), (int) dir_phase(evac_dir));
260
evacuate[dir_evac_bucket(evac_dir)].push(b);
262
b->f.pinned = pinned;
263
b->f.evacuate_head = 1;
264
b->evac_frags.key.set(0, 0); // ensure that the block gets
265
// evacuated no matter what
266
b->readers = 0; // ensure that the block does not disappear
271
Part::scan_for_pinned_documents()
273
if (cache_config_permit_pinning) {
274
// we can't evacuate anything between header->write_pos and
275
// header->write_pos + AGG_SIZE.
276
int ps = offset_to_part_offset(this, header->write_pos + AGG_SIZE);
277
int pe = offset_to_part_offset(this, header->write_pos + 2 * EVACUATION_SIZE + (len / PIN_SCAN_EVERY));
278
int part_end_offset = offset_to_part_offset(this, len + skip);
279
int before_end_of_part = pe < part_end_offset;
280
DDebug("cache_evac", "scan %d %d", ps, pe);
281
for (int i = 0; i < part_direntries(this); i++) {
282
// is it a valid pinned object?
283
if (!dir_is_empty(&dir[i]) && dir_pinned(&dir[i]) && dir_head(&dir[i])) {
284
// select objects only within this PIN_SCAN region
285
int o = dir_offset(&dir[i]);
286
if (dir_phase(&dir[i]) == header->phase) {
287
if (before_end_of_part || o >= (pe - part_end_offset))
293
force_evacuate_head(&dir[i], 1);
294
// DDebug("cache_evac", "scan pinned at offset %d %d %d %d %d %d",
295
// (int)dir_offset(&b->dir), ps, o , pe, i, (int)b->f.done);
301
/* NOTE:: This state can be called by an AIO thread, so DON'T DON'T
302
DON'T schedule any events on this thread using VC_SCHED_XXX or
303
mutex->thread_holding->schedule_xxx_local(). ALWAYS use
304
eventProcessor.schedule_xxx().
307
Part::aggWriteDone(int event, Event *e)
310
NOWARN_UNUSED(event);
314
// ensure we have the cacheDirSync lock if we intend to call it later
315
// retaking the current mutex recursively is a NOOP
316
CACHE_TRY_LOCK(lock, dir_sync_waiting ? cacheDirSync->mutex : mutex, mutex->thread_holding);
318
eventProcessor.schedule_in(this, MUTEX_RETRY_DELAY);
322
header->last_write_pos = header->write_pos;
323
header->write_pos += io.aiocb.aio_nbytes;
324
ink_assert(header->write_pos >= start);
325
DDebug("cache_agg", "Dir %s, Write: %" PRIu64 ", last Write: %" PRIu64 "\n",
326
hash_id, header->write_pos, header->last_write_pos);
327
ink_assert(header->write_pos == header->agg_pos);
328
if (header->write_pos + EVACUATION_SIZE > scan_pos)
331
header->write_serial++;
333
// delete all the directory entries that we inserted
334
// for fragments is this aggregation buffer
335
Debug("cache_disk_error", "Write error on disk %s\n \
336
write range : [%" PRIu64 " - %" PRIu64 " bytes] [%" PRIu64 " - %" PRIu64 " blocks] \n",
337
hash_id, io.aiocb.aio_offset, io.aiocb.aio_offset + io.aiocb.aio_nbytes,
338
io.aiocb.aio_offset / CACHE_BLOCK_SIZE,
339
(io.aiocb.aio_offset + io.aiocb.aio_nbytes) / CACHE_BLOCK_SIZE);
342
for (int done = 0; done < agg_buf_pos;) {
343
Doc *doc = (Doc *) (agg_buffer + done);
344
dir_set_offset(&del_dir, header->write_pos + done);
345
dir_delete(&doc->key, this, &del_dir);
346
done += round_to_approx_size(doc->len);
350
set_io_not_in_progress();
351
// callback ready sync CacheVCs
353
while ((c = sync.dequeue())) {
354
if (UINT_WRAP_LTE(c->write_serial + 2, header->write_serial))
355
c->initial_thread->schedule_imm_signal(c, AIO_EVENT_DONE);
357
sync.push(c); // put it back on the front
361
if (dir_sync_waiting) {
362
dir_sync_waiting = 0;
363
cacheDirSync->handleEvent(EVENT_IMMEDIATE, 0);
365
if (agg.head || sync.head)
366
return aggWrite(event, e);
371
new_DocEvacuator(int nbytes, Part *part)
373
CacheVC *c = new_CacheVC(part);
374
ProxyMutex *mutex = part->mutex;
375
c->base_stat = cache_evacuate_active_stat;
376
CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
377
c->buf = new_IOBufferData(iobuffer_size_to_index(nbytes, MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
380
c->earliest_key = zero_key;
381
SET_CONTINUATION_HANDLER(c, &CacheVC::evacuateDocDone);
386
CacheVC::evacuateReadHead(int event, Event *e)
389
NOWARN_UNUSED(event);
391
// The evacuator vc shares the lock with the partition mutex
392
ink_debug_assert(part->mutex->thread_holding == this_ethread());
394
Doc *doc = (Doc *) buf->data();
396
CacheHTTPInfo *alternate_tmp = 0;
400
// a directory entry which is nolonger valid may have been overwritten
401
if (!dir_valid(part, &dir)) {
402
last_collision = NULL;
405
if (doc->magic != DOC_MAGIC || !(doc->first_key == first_key))
409
if (doc->ftype == CACHE_FRAG_TYPE_HTTP && doc->hlen) {
410
// its an http document
411
if (vector.get_handles(doc->hdr(), doc->hlen) != doc->hlen) {
412
Note("bad vector detected during evacuation");
415
alternate_index = get_alternate_index(&vector, earliest_key);
416
if (alternate_index < 0)
418
alternate_tmp = vector.get(alternate_index);
419
doc_len = alternate_tmp->object_size_get();
420
Debug("cache_evac", "evacuateReadHead http earliest %X first: %X len: %d",
421
first_key.word(0), earliest_key.word(0), doc_len);
427
next_CacheKey(&next_key, &doc->key);
428
if (!(next_key == earliest_key))
430
doc_len = doc->total_len;
432
"evacuateReadHead non-http earliest %X first: %X len: %d", first_key.word(0), earliest_key.word(0), doc_len);
434
if (doc_len == total_len) {
435
// the whole document has been evacuated. Insert the directory
436
// entry in the directory.
437
dir_lookaside_fixup(&earliest_key, part);
438
return free_CacheVC(this);
442
if (dir_probe(&first_key, part, &dir, &last_collision)) {
443
int ret = do_read_call(&first_key);
444
if (ret == EVENT_RETURN)
445
return handleEvent(AIO_EVENT_DONE, 0);
449
dir_lookaside_remove(&earliest_key, part);
450
return free_CacheVC(this);
454
CacheVC::evacuateDocDone(int event, Event *e)
457
NOWARN_UNUSED(event);
459
ink_debug_assert(part->mutex->thread_holding == this_ethread());
460
Doc *doc = (Doc *) buf->data();
461
DDebug("cache_evac", "evacuateDocDone %X o %d p %d new_o %d new_p %d",
462
(int) key.word(0), (int) dir_offset(&overwrite_dir),
463
(int) dir_phase(&overwrite_dir), (int) dir_offset(&dir), (int) dir_phase(&dir));
464
int i = dir_evac_bucket(&overwrite_dir);
465
// nasty beeping race condition, need to have the EvacuationBlock here
466
EvacuationBlock *b = part->evacuate[i].head;
467
for (; b; b = b->link.next) {
468
if (dir_offset(&b->dir) == dir_offset(&overwrite_dir)) {
470
// If the document is single fragment (although not tied to the vector),
471
// then we don't have to put the directory entry in the lookaside
472
// buffer. But, we have no way of finding out if the document is
473
// single fragment. doc->single_fragment() can be true for a multiple
474
// fragment document since total_len and doc->len could be equal at
475
// the time we write the fragment down. To be on the safe side, we
476
// only overwrite the entry in the directory if its not a head.
477
if (!dir_head(&overwrite_dir)) {
478
// find the earliest key
479
EvacuationKey *evac = &b->evac_frags;
480
for (; evac && !(evac->key == doc->key); evac = evac->link.next);
484
if (evac->earliest_key.fold()) {
485
DDebug("cache_evac", "evacdocdone: evacuating key %X earliest %X",
486
evac->key.word(0), evac->earliest_key.word(0));
487
EvacuationBlock *eblock = 0;
489
dir_lookaside_probe(&evac->earliest_key, part, &dir_tmp, &eblock);
491
CacheVC *earliest_evac = eblock->earliest_evacuator;
492
earliest_evac->total_len += doc->data_len();
493
if (earliest_evac->total_len == earliest_evac->doc_len) {
494
dir_lookaside_fixup(&evac->earliest_key, part);
495
free_CacheVC(earliest_evac);
499
dir_overwrite(&doc->key, part, &dir, &overwrite_dir);
501
// if the tag in the overwrite_dir matches the first_key in the
502
// document, then it has to be the vector. We gaurantee that
503
// the first_key and the earliest_key will never collide (see
504
// Cache::open_write). Once we know its the vector, we can
505
// safely overwrite the first_key in the directory.
506
if (dir_head(&overwrite_dir) && b->f.evacuate_head) {
508
"evacuateDocDone evacuate_head %X %X hlen %d offset %d",
509
(int) key.word(0), (int) doc->key.word(0), doc->hlen, (int) dir_offset(&overwrite_dir));
511
if (dir_compare_tag(&overwrite_dir, &doc->first_key)) {
513
DDebug("cache_evac", "evacuating vector: %X %d",
514
(int) doc->first_key.word(0), (int) dir_offset(&overwrite_dir));
515
if ((cod = part->open_read(&doc->first_key))) {
517
DDebug("cache_evac", "overwriting the open directory %X %d %d",
518
(int) doc->first_key.word(0), (int) dir_offset(&cod->first_dir), (int) dir_offset(&dir));
519
ink_assert(dir_pinned(&dir));
520
cod->first_dir = dir;
523
if (dir_overwrite(&doc->first_key, part, &dir, &overwrite_dir)) {
524
part->ram_cache->fixup(&doc->first_key, 0, dir_offset(&overwrite_dir), 0, dir_offset(&dir));
527
DDebug("cache_evac", "evacuating earliest: %X %d", (int) doc->key.word(0), (int) dir_offset(&overwrite_dir));
528
ink_debug_assert(dir_compare_tag(&overwrite_dir, &doc->key));
529
ink_assert(b->earliest_evacuator == this);
530
total_len += doc->data_len();
531
first_key = doc->first_key;
533
if (dir_probe(&first_key, part, &dir, &last_collision) > 0) {
534
dir_lookaside_insert(b, part, &earliest_dir);
536
SET_HANDLER(&CacheVC::evacuateReadHead);
537
int ret = do_read_call(&first_key);
538
if (ret == EVENT_RETURN)
539
return handleEvent(AIO_EVENT_DONE, 0);
547
return free_CacheVC(this);
551
evacuate_fragments(CacheKey *key, CacheKey *earliest_key, int force, Part *part)
553
Dir dir, *last_collision = 0;
555
while (dir_probe(key, part, &dir, &last_collision)) {
556
// next fragment cannot be a head...if it is, it must have been a
557
// directory collision.
560
EvacuationBlock *b = evacuation_block_exists(&dir, part);
562
b = new_EvacuationBlock(part->mutex->thread_holding);
564
b->evac_frags.key = *key;
565
b->evac_frags.earliest_key = *earliest_key;
566
part->evacuate[dir_evac_bucket(&dir)].push(b);
569
ink_assert(dir_offset(&dir) == dir_offset(&b->dir));
570
ink_assert(dir_phase(&dir) == dir_phase(&b->dir));
571
EvacuationKey *evac_frag = evacuationKeyAllocator.alloc();
572
evac_frag->key = *key;
573
evac_frag->earliest_key = *earliest_key;
574
evac_frag->link.next = b->evac_frags.link.next;
575
b->evac_frags.link.next = evac_frag;
580
"next fragment %X Earliest: %X offset %d phase %d force %d",
581
(int) key->word(0), (int) earliest_key->word(0), (int) dir_offset(&dir), (int) dir_phase(&dir), force);
587
Part::evacuateWrite(CacheVC *evacuator, int event, Event *e)
590
NOWARN_UNUSED(event);
592
// push to front of aggregation write list, so it is written first
594
evacuator->agg_len = round_to_approx_size(((Doc *)evacuator->buf->data())->len);
595
agg_todo_size += evacuator->agg_len;
596
/* insert the evacuator after all the other evacuators */
597
CacheVC *cur = (CacheVC *) agg.head;
598
CacheVC *after = NULL;
599
for (; cur && cur->f.evacuator; cur = (CacheVC *) cur->link.next)
601
ink_assert(evacuator->agg_len <= AGG_SIZE);
602
agg.insert(evacuator, after);
603
return aggWrite(event, e);
607
Part::evacuateDocReadDone(int event, Event *e)
612
if (event != AIO_EVENT_DONE)
614
ink_assert(is_io_in_progress());
615
set_io_not_in_progress();
616
ink_debug_assert(mutex->thread_holding == this_ethread());
617
Doc *doc = (Doc *) doc_evacuator->buf->data();
619
EvacuationBlock *b = NULL;
620
if (doc->magic != DOC_MAGIC) {
621
Debug("cache_evac", "DOC magic: %X %d",
622
(int) dir_tag(&doc_evacuator->overwrite_dir), (int) dir_offset(&doc_evacuator->overwrite_dir));
623
ink_assert(doc->magic == DOC_MAGIC);
626
DDebug("cache_evac", "evacuateDocReadDone %X offset %d",
627
(int) doc->key.word(0), (int) dir_offset(&doc_evacuator->overwrite_dir));
629
b = evacuate[dir_evac_bucket(&doc_evacuator->overwrite_dir)].head;
631
if (dir_offset(&b->dir) == dir_offset(&doc_evacuator->overwrite_dir))
637
if ((b->f.pinned && !b->readers) && doc->pinned < (uint32_t) (ink_get_based_hrtime() / HRTIME_SECOND))
640
if (dir_head(&b->dir) && b->f.evacuate_head) {
641
ink_assert(!b->evac_frags.key.fold());
642
// if its a head (vector), evacuation is real simple...we just
643
// need to write this vector down and overwrite the directory entry.
644
if (dir_compare_tag(&b->dir, &doc->first_key)) {
645
doc_evacuator->key = doc->first_key;
646
b->evac_frags.key = doc->first_key;
647
DDebug("cache_evac", "evacuating vector %X offset %d",
648
(int) doc->first_key.word(0), (int) dir_offset(&doc_evacuator->overwrite_dir));
651
// if its an earliest fragment (alternate) evacuation, things get
652
// a little tricky. We have to propagate the earliest key to the next
653
// fragments for this alternate. The last fragment to be evacuated
654
// fixes up the lookaside buffer.
655
doc_evacuator->key = doc->key;
656
doc_evacuator->earliest_key = doc->key;
657
b->evac_frags.key = doc->key;
658
b->evac_frags.earliest_key = doc->key;
659
b->earliest_evacuator = doc_evacuator;
660
DDebug("cache_evac", "evacuating earliest %X %X evac: %X offset: %d",
661
(int) b->evac_frags.key.word(0), (int) doc->key.word(0),
662
doc_evacuator, (int) dir_offset(&doc_evacuator->overwrite_dir));
666
// find which key matches the document
667
EvacuationKey *ek = &b->evac_frags;
668
for (; ek && !(ek->key == doc->key); ek = ek->link.next);
673
doc_evacuator->key = ek->key;
674
doc_evacuator->earliest_key = ek->earliest_key;
675
DDebug("cache_evac", "evacuateDocReadDone key: %X earliest: %X",
676
(int) ek->key.word(0), (int) ek->earliest_key.word(0));
679
// if the tag in the c->dir does match the first_key in the
680
// document, then it has to be the earliest fragment. We gaurantee that
681
// the first_key and the earliest_key will never collide (see
682
// Cache::open_write).
683
if (!dir_head(&b->dir) || !dir_compare_tag(&b->dir, &doc->first_key)) {
684
next_CacheKey(&next_key, &doc->key);
685
evacuate_fragments(&next_key, &doc_evacuator->earliest_key, !b->readers, this);
687
return evacuateWrite(doc_evacuator, event, e);
689
free_CacheVC(doc_evacuator);
691
return aggWrite(event, e);
695
Part::evac_range(off_t low, off_t high, int evac_phase)
697
int s = offset_to_part_offset(this, low);
698
int e = offset_to_part_offset(this, high);
699
int si = dir_offset_evac_bucket(s);
700
int ei = dir_offset_evac_bucket(e);
702
for (int i = si; i <= ei; i++) {
703
EvacuationBlock *b = evacuate[i].head;
704
EvacuationBlock *first = 0;
705
int first_offset = INT_MAX;
706
for (; b; b = b->link.next) {
707
int64_t offset = dir_offset(&b->dir);
708
int phase = dir_phase(&b->dir);
709
if (offset >= s && offset < e && !b->f.done && phase == evac_phase)
710
if (offset < first_offset) {
712
first_offset = offset;
717
io.aiocb.aio_fildes = fd;
718
io.aiocb.aio_nbytes = dir_approx_size(&first->dir);
719
io.aiocb.aio_offset = part_offset(this, &first->dir);
720
if ((off_t)(io.aiocb.aio_offset + io.aiocb.aio_nbytes) > (off_t)(skip + len))
721
io.aiocb.aio_nbytes = skip + len - io.aiocb.aio_offset;
722
doc_evacuator = new_DocEvacuator(io.aiocb.aio_nbytes, this);
723
doc_evacuator->overwrite_dir = first->dir;
725
io.aiocb.aio_buf = doc_evacuator->buf->data();
727
io.thread = AIO_CALLBACK_THREAD_ANY;
728
DDebug("cache_evac", "evac_range evacuating %X %d", (int)dir_tag(&first->dir), (int)dir_offset(&first->dir));
729
SET_HANDLER(&Part::evacuateDocReadDone);
730
ink_assert(ink_aio_read(&io) >= 0);
739
agg_copy(char *p, CacheVC *vc)
741
Part *part = vc->part;
742
off_t o = part->header->write_pos + part->agg_buf_pos;
744
if (!vc->f.evacuator) {
745
Doc *doc = (Doc *) p;
746
IOBufferBlock *res_alt_blk = 0;
748
uint32_t len = vc->write_len + vc->header_len + vc->frag_len + sizeofDoc;
749
ink_assert(vc->frag_type != CACHE_FRAG_TYPE_HTTP || len != sizeofDoc);
750
ink_debug_assert(part->round_to_approx_size(len) == vc->agg_len);
751
// update copy of directory entry for this document
752
dir_set_approx_size(&vc->dir, vc->agg_len);
753
dir_set_offset(&vc->dir, offset_to_part_offset(part, o));
754
ink_assert(part_offset(part, &vc->dir) < (part->skip + part->len));
755
dir_set_phase(&vc->dir, part->header->phase);
757
// fill in document header
758
doc->magic = DOC_MAGIC;
760
doc->hlen = vc->header_len;
761
doc->ftype = vc->frag_type;
762
doc->flen = vc->frag_len;
763
doc->total_len = vc->total_len;
764
doc->first_key = vc->first_key;
765
doc->sync_serial = part->header->sync_serial;
766
vc->write_serial = doc->write_serial = part->header->write_serial;
767
doc->checksum = DOC_NO_CHECKSUM;
768
if (vc->pin_in_cache) {
769
dir_set_pinned(&vc->dir, 1);
770
doc->pinned = (uint32_t) (ink_get_based_hrtime() / HRTIME_SECOND) + vc->pin_in_cache;
772
dir_set_pinned(&vc->dir, 0);
776
if (vc->f.use_first_key) {
778
doc->key = vc->earliest_key;
780
// the vector is being written by itself
781
prev_CacheKey(&doc->key, &vc->earliest_key);
783
dir_set_head(&vc->dir, true);
786
dir_set_head(&vc->dir, !vc->fragment);
789
memcpy(doc->frags(), &vc->frag[0], doc->flen);
792
if (vc->f.rewrite_resident_alt) {
793
ink_assert(vc->f.use_first_key);
794
Doc *res_doc = (Doc *) vc->first_buf->data();
795
res_alt_blk = new_IOBufferBlock(vc->first_buf, res_doc->data_len(), sizeofDoc + res_doc->hlen);
796
doc->key = res_doc->key;
797
doc->total_len = res_doc->data_len();
800
// update the new_info object_key, and total_len and dirinfo
801
if (vc->header_len) {
802
ink_debug_assert(vc->f.use_first_key);
804
if (vc->frag_type == CACHE_FRAG_TYPE_HTTP) {
805
ink_debug_assert(vc->write_vector->count() > 0);
806
if (!vc->f.update && !vc->f.evac_vector) {
807
ink_debug_assert(!(vc->first_key == zero_key));
808
CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
809
http_info->object_size_set(vc->total_len);
811
// update + data_written => Update case (b)
812
// need to change the old alternate's object length
813
if (vc->f.update && vc->total_len) {
814
CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
815
http_info->object_size_set(vc->total_len);
817
ink_assert(!(((uintptr_t) &doc->hdr()[0]) & HDR_PTR_ALIGNMENT_MASK));
818
ink_assert(vc->header_len == vc->write_vector->marshal(doc->hdr(), vc->header_len));
821
memcpy(doc->hdr(), vc->header_to_write, vc->header_len);
822
// the single fragment flag is not used in the write call.
823
// putting it in for completeness.
824
vc->f.single_fragment = doc->single_fragment();
829
ProxyMutex RELEASE_UNUSED *mutex = vc->part->mutex;
830
ink_debug_assert(mutex->thread_holding == this_ethread());
831
CACHE_DEBUG_SUM_DYN_STAT(cache_write_bytes_stat, vc->write_len);
834
if (vc->f.rewrite_resident_alt)
835
iobufferblock_memcpy(doc->data(), vc->write_len, res_alt_blk, 0);
838
iobufferblock_memcpy(doc->data(), vc->write_len, vc->blocks, vc->offset);
839
#ifdef VERIFY_JTEST_DATA
840
if (f.use_first_key && header_len) {
843
new_info.request_get().url_get().print(xx, 500, &ib, &xd);
845
for (int q = 0; q < 3; q++)
846
x = strchr(x + 1, '/');
847
ink_assert(!memcmp(doc->hdr(), x, ib - (x - xx)));
852
if (cache_config_enable_checksum) {
854
for (char *b = doc->hdr(); b < (char *) doc + doc->len; b++)
857
if (vc->frag_type == CACHE_FRAG_TYPE_HTTP && vc->f.single_fragment)
858
ink_assert(doc->hlen);
865
// for evacuated documents, copy the data, and update directory
866
Doc *doc = (Doc *) vc->buf->data();
867
int l = vc->part->round_to_approx_size(doc->len);
869
ProxyMutex RELEASE_UNUSED *mutex = vc->part->mutex;
870
ink_debug_assert(mutex->thread_holding == this_ethread());
871
CACHE_DEBUG_INCREMENT_DYN_STAT(cache_gc_frags_evacuated_stat);
872
CACHE_DEBUG_SUM_DYN_STAT(cache_gc_bytes_evacuated_stat, l);
874
doc->sync_serial = vc->part->header->sync_serial;
875
doc->write_serial = vc->part->header->write_serial;
877
memcpy(p, doc, doc->len);
879
vc->dir = vc->overwrite_dir;
880
dir_set_offset(&vc->dir, offset_to_part_offset(vc->part, o));
881
dir_set_phase(&vc->dir, vc->part->header->phase);
888
Part::evacuate_cleanup_blocks(int i)
890
EvacuationBlock *b = evacuate[i].head;
893
((header->phase != dir_phase(&b->dir) &&
894
header->write_pos > part_offset(this, &b->dir)) ||
895
(header->phase == dir_phase(&b->dir) && header->write_pos <= part_offset(this, &b->dir)))) {
896
EvacuationBlock *x = b;
897
DDebug("cache_evac", "evacuate cleanup free %X offset %d",
898
(int) b->evac_frags.key.word(0), (int) dir_offset(&b->dir));
900
evacuate[i].remove(x);
901
free_EvacuationBlock(x, mutex->thread_holding);
909
Part::evacuate_cleanup()
911
int64_t eo = ((header->write_pos - start) / CACHE_BLOCK_SIZE) + 1;
912
int64_t e = dir_offset_evac_bucket(eo);
913
int64_t sx = e - (evacuate_size / PIN_SCAN_EVERY) - 1;
917
if (e > evacuate_size)
921
for (i = s; i < e; i++)
922
evacuate_cleanup_blocks(i);
924
// if we have wrapped, handle the end bit
926
s = evacuate_size + sx - 2;
929
for (i = s; i < evacuate_size; i++)
930
evacuate_cleanup_blocks(i);
935
Part::periodic_scan()
938
scan_for_pinned_documents();
939
if (header->write_pos == start)
941
scan_pos += len / PIN_SCAN_EVERY;
947
header->write_pos = start;
948
header->phase = !header->phase;
951
header->agg_pos = header->write_pos;
952
dir_lookaside_cleanup(this);
953
dir_clean_part(this);
957
/* NOTE: This state can be called by an AIO thread, so DON'T DON'T
958
DON'T schedule any events on this thread using VC_SCHED_XXX or
959
mutex->thread_holding->schedule_xxx_local(). ALWAYS use
960
eventProcessor.schedule_xxx().
961
// NOTE(review): fragmentary extract -- the bare numeric lines below are
// extraction artifacts (original file line numbers), and the gaps between
// them show that intervening source lines are missing. Not compilable as-is.
Also, make sure that any functions called by this also use
962
the eventProcessor to schedule events
965
// Part::aggWrite -- drains the queue of pending CacheVC writes into the
// aggregation buffer (agg_copy), links sync'd first-key writers onto the
// sync list ordered by write_serial, then issues a single AIO write of
// agg_buffer at header->write_pos for this partition.
Part::aggWrite(int event, void *e)
968
NOWARN_UNUSED(event);
969
ink_assert(!is_io_in_progress());
971
Que(CacheVC, link) tocall;
977
// calculate length of aggregated write
978
for (c = (CacheVC *) agg.head; c;) {
979
int writelen = c->agg_len;
980
ink_assert(writelen < AGG_SIZE);
981
// stop when this write would overflow the buffer or run past the partition
if (agg_buf_pos + writelen > AGG_SIZE ||
982
header->write_pos + agg_buf_pos + writelen > (skip + len))
984
// NOTE(review): debug tag is "agg_read" on a write path -- presumably
// historical; verify against the rest of the file's debug tags.
DDebug("agg_read", "copying: %d, %" PRIu64 ", key: %d",
985
agg_buf_pos, header->write_pos + agg_buf_pos, c->first_key.word(0));
986
int wrotelen = agg_copy(agg_buffer + agg_buf_pos, c);
987
ink_assert(writelen == wrotelen);
988
agg_todo_size -= writelen;
989
agg_buf_pos += writelen;
990
CacheVC *n = (CacheVC *)c->link.next;
992
if (c->f.sync && c->f.use_first_key) {
993
// insert into sync list keeping it ordered by write_serial; UINT_WRAP_LT
// (see header macros) compares serials modulo wraparound
CacheVC *last = sync.tail;
994
while (last && UINT_WRAP_LT(c->write_serial, last->write_serial))
995
last = (CacheVC*)last->link.prev;
996
sync.insert(c, last);
997
} else if (c->f.evacuator)
998
c->handleEvent(AIO_EVENT_DONE, 0);
1004
// if we got nothing...
1006
if (!agg.head && !sync.head) // nothing to get
1008
if (header->write_pos == start) {
1009
// write aggregation too long, bad bad, punt on everything.
1010
Note("write aggregation exceeds part size");
1011
ink_assert(!tocall.head);
1013
while ((c = agg.dequeue())) {
1014
agg_todo_size -= c->agg_len;
1015
c->initial_thread->schedule_imm_signal(c, AIO_EVENT_DONE);
1027
// evacuate what the aggregation write is about to overwrite (plus slack)
off_t end = header->write_pos + agg_buf_pos + EVACUATION_SIZE;
1028
if (evac_range(header->write_pos, end, !header->phase) < 0)
1030
if (end > skip + len)
1031
if (evac_range(start, start + (end - (skip + len)), header->phase))
1034
// if agg.head, then we are near the end of the disk, so
1035
// write down the aggregation in whatever size it is.
1036
if (agg_buf_pos < AGG_HIGH_WATER && !agg.head && !sync.head && !dir_sync_waiting)
1039
// write sync marker
1041
ink_assert(sync.head);
1042
int l = round_to_approx_size(sizeof(Doc));
1044
Doc *d = (Doc*)agg_buffer;
1045
memset(d, 0, sizeof(Doc));
1046
d->magic = DOC_MAGIC;
1048
d->sync_serial = header->sync_serial;
1049
d->write_serial = header->write_serial;
1053
header->agg_pos = header->write_pos + agg_buf_pos;
1055
// set up the single AIO write covering the whole aggregation buffer
io.aiocb.aio_fildes = fd;
1056
io.aiocb.aio_offset = header->write_pos;
1057
io.aiocb.aio_buf = agg_buffer;
1058
io.aiocb.aio_nbytes = agg_buf_pos;
1061
Callback on AIO thread so that we can issue a new write ASAP
1062
as all writes are serialized in the partition. This is not necessary
1063
for reads proceed independently.
1065
io.thread = AIO_CALLBACK_THREAD_AIO;
1066
SET_HANDLER(&Part::aggWriteDone);
1070
int ret = EVENT_CONT;
1071
while ((c = tocall.dequeue())) {
1072
// call inline only if the caller already holds the right lock;
// otherwise re-signal on the VC's original thread
if (event == EVENT_CALL && c->mutex->thread_holding == mutex->thread_holding)
1075
c->initial_thread->schedule_imm_signal(c, AIO_EVENT_DONE);
1081
// CacheVC::openWriteCloseDir -- final close step for a writer: under the part
// lock, releases the write slot (close_write), removes the earliest-key dir
// entry on an aborted fragmented write, emits update/stat debug info, signals
// VC_EVENT_WRITE_COMPLETE when close_complete is set, and frees the CacheVC.
// NOTE(review): fragmentary extract -- bare numeric lines are artifacts and
// intervening source lines are missing; not compilable as-is.
CacheVC::openWriteCloseDir(int event, Event *e)
1084
NOWARN_UNUSED(event);
1088
CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
1090
// lock miss: reschedule this same handler
SET_HANDLER(&CacheVC::openWriteCloseDir);
1091
ink_debug_assert(!is_io_in_progress());
1092
VC_SCHED_LOCK_RETRY();
1094
part->close_write(this);
1095
// aborted (closed < 0) multi-fragment write: drop the earliest dir entry
if (closed < 0 && fragment)
1096
dir_delete(&earliest_key, part, &earliest_dir);
1098
if (is_debug_tag_set("cache_update")) {
1099
if (f.update && closed > 0) {
1100
if (!total_len && alternate_index != CACHE_ALT_REMOVED) {
1101
Debug("cache_update", "header only %d (%" PRIu64 ", %" PRIu64 ")\n",
1102
DIR_MASK_TAG(first_key.word(2)), update_key.b[0], update_key.b[1]);
1104
} else if (total_len && alternate_index != CACHE_ALT_REMOVED) {
1105
Debug("cache_update", "header body, %d, (%" PRIu64 ", %" PRIu64 "), (%" PRIu64 ", %" PRIu64 ")\n",
1106
DIR_MASK_TAG(first_key.word(2)), update_key.b[0], update_key.b[1], earliest_key.b[0], earliest_key.b[1]);
1107
} else if (!total_len && alternate_index == CACHE_ALT_REMOVED) {
1108
Debug("cache_update", "alt delete, %d, (%" PRIu64 ", %" PRIu64 ")\n",
1109
DIR_MASK_TAG(first_key.word(2)), update_key.b[0], update_key.b[1]);
1113
// update the appropriate stat variable
1114
// These variables may not give the current no of documents with
1115
// one, two and three or more fragments. This is because for
1116
// updates we dont decrement the variable corresponding the old
1117
// size of the document
1118
if ((closed == 1) && (total_len > 0)) {
1119
DDebug("cache_stats", "Fragment = %d", fragment)
1121
case 0: CACHE_INCREMENT_DYN_STAT(cache_single_fragment_document_count_stat); break;
1122
case 1: CACHE_INCREMENT_DYN_STAT(cache_two_fragment_document_count_stat); break;
1123
default: CACHE_INCREMENT_DYN_STAT(cache_three_plus_plus_fragment_document_count_stat); break;
1126
if (f.close_complete) {
1128
ink_debug_assert(!part || this_ethread() != part->mutex->thread_holding);
1129
vio._cont->handleEvent(VC_EVENT_WRITE_COMPLETE, (void *) &vio);
1132
return free_CacheVC(this);
1136
// CacheVC::openWriteCloseHeadDone -- runs after the header/vector write
// completes: under the part lock, updates the directory (fresh insert or
// overwrite of od->first_dir), re-inserts a moved resident alternate, and
// records single-fragment state before falling through to openWriteCloseDir.
// NOTE(review): fragmentary extract -- bare numeric lines are artifacts and
// intervening source lines are missing; not compilable as-is.
CacheVC::openWriteCloseHeadDone(int event, Event *e)
1139
if (event == AIO_EVENT_DONE)
1140
set_io_not_in_progress();
1141
else if (is_io_in_progress())
1144
CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
1146
VC_LOCK_RETRY_EVENT();
1147
od->writing_vec = 0;
1150
ink_assert(f.use_first_key);
1151
if (!od->dont_update_directory) {
1152
if (dir_is_empty(&od->first_dir)) {
1153
// first write of this document: brand-new directory entry
dir_insert(&first_key, part, &dir);
1155
// multiple fragment vector write
1156
dir_overwrite(&first_key, part, &dir, &od->first_dir, false);
1157
// insert moved resident alternate
1158
if (od->move_resident_alt) {
1159
if (dir_valid(part, &od->single_doc_dir))
1160
dir_insert(&od->single_doc_key, part, &od->single_doc_dir);
1161
od->move_resident_alt = 0;
1164
od->first_dir = dir;
1165
if (frag_type == CACHE_FRAG_TYPE_HTTP && f.single_fragment) {
1166
// fragment is tied to the vector
1167
od->move_resident_alt = 1;
1168
if (!f.rewrite_resident_alt) {
1169
od->single_doc_key = earliest_key;
1171
dir_assign(&od->single_doc_dir, &dir);
1172
dir_set_tag(&od->single_doc_dir, od->single_doc_key.word(2));
1177
return openWriteCloseDir(event, e);
1181
// CacheVC::openWriteCloseHead -- writes the document head on close: HTTP
// documents go through updateVector (vector rewrite); all others write the
// header directly via do_write_lock with openWriteCloseHeadDone as handler.
// NOTE(review): fragmentary extract -- bare numeric lines are artifacts and
// intervening source lines are missing; not compilable as-is.
CacheVC::openWriteCloseHead(int event, Event *e)
1186
f.use_first_key = 1;
1188
ink_assert(fragment || (length == (int64_t)total_len));
1190
return openWriteCloseDir(event, e);
1196
if (frag_type == CACHE_FRAG_TYPE_HTTP) {
1197
SET_HANDLER(&CacheVC::updateVector);
1198
return updateVector(EVENT_IMMEDIATE, 0);
1201
header_len = header_to_write_len;
1202
SET_HANDLER(&CacheVC::openWriteCloseHeadDone);
1203
return do_write_lock();
1210
// CacheVC::openWriteCloseDataDone -- completion of a data-fragment write
// issued during close: records the fragment offset (growing the frag table
// by doubling at power-of-2 sizes past INTEGRAL_FRAGS), inserts the dir
// entry, advances to the next chained key, then either writes the next
// fragment or finishes with openWriteCloseHead.
// NOTE(review): fragmentary extract -- bare numeric lines are artifacts and
// intervening source lines are missing; not compilable as-is.
CacheVC::openWriteCloseDataDone(int event, Event *e)
1215
if (event == AIO_EVENT_DONE)
1216
set_io_not_in_progress();
1217
else if (is_io_in_progress())
1220
return openWriteCloseDir(event, e);
1222
CACHE_TRY_LOCK(lock, part->mutex, this_ethread());
1224
VC_LOCK_RETRY_EVENT();
1226
ink_assert(key == earliest_key);
1230
frag = &integral_frags[0];
1232
// grow the out-of-line frag array by 2x whenever count hits a power of two
if (fragment-1 >= INTEGRAL_FRAGS && IS_POWER_2((uint32)(fragment-1))) {
1234
frag = (Frag*)xmalloc(sizeof(Frag) * (fragment-1)*2);
1235
memcpy(frag, t, sizeof(Frag) * (fragment-1));
1236
if (t != integral_frags)
1240
frag[fragment-1].offset = write_pos;
1243
write_pos += write_len;
1244
dir_insert(&key, part, &dir);
1245
blocks = iobufferblock_skip(blocks, &offset, &length, write_len);
1246
next_CacheKey(&key, &key);
1249
if (write_len > MAX_FRAG_SIZE)
1250
write_len = MAX_FRAG_SIZE;
1251
if ((ret = do_write_call()) == EVENT_RETURN)
1256
return openWriteCloseHead(event, e); // must be called under part lock from here
1259
return handleEvent(AIO_EVENT_DONE, 0);
1263
// CacheVC::openWriteClose -- entry point when the writer is closed: waits
// out in-flight I/O, turns a zero-length close into an abort (or an HTTP
// vector update), flushes any remaining buffered data through
// openWriteCloseDataDone, otherwise proceeds to writing the head.
// NOTE(review): fragmentary extract -- bare numeric lines are artifacts and
// intervening source lines are missing; not compilable as-is.
CacheVC::openWriteClose(int event, Event *e)
1267
if (is_io_in_progress()) {
1268
if (event != AIO_EVENT_DONE)
1270
set_io_not_in_progress();
1272
return openWriteCloseDir(event, e);
1275
if (total_len == 0) {
1278
return updateVector(event, e);
1280
// If we've been CLOSE'd but nothing has been written then
1281
// this close is transformed into an abort.
1283
return openWriteCloseDir(event, e);
1286
return openWriteCloseDir(event, e);
1289
if (length && (fragment || length > MAX_FRAG_SIZE)) {
1290
SET_HANDLER(&CacheVC::openWriteCloseDataDone);
1292
if (write_len > MAX_FRAG_SIZE)
1293
write_len = MAX_FRAG_SIZE;
1294
return do_write_lock_call();
1296
return openWriteCloseHead(event, e);
1298
return openWriteCloseDir(event, e);
1302
// CacheVC::openWriteWriteDone -- completion of a mid-stream data-fragment
// write: on I/O failure signals VC_EVENT_ERROR to the user; on success
// records the fragment offset (same doubling growth as close path), inserts
// the dir entry, advances the chained key, and resumes openWriteMain.
// NOTE(review): fragmentary extract -- bare numeric lines are artifacts and
// intervening source lines are missing; not compilable as-is.
CacheVC::openWriteWriteDone(int event, Event *e)
1307
if (event == AIO_EVENT_DONE)
1308
set_io_not_in_progress();
1310
if (is_io_in_progress())
1312
// In the event of VC_EVENT_ERROR, the cont must do an io_close
1318
SET_HANDLER(&CacheVC::openWriteMain);
1319
return calluser(VC_EVENT_ERROR);
1322
CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
1324
VC_LOCK_RETRY_EVENT();
1325
// store the earliest directory. Need to remove the earliest dir
1326
// in case the writer aborts.
1328
ink_assert(key == earliest_key);
1332
frag = &integral_frags[0];
1334
// grow the out-of-line frag array by 2x whenever count hits a power of two
if (fragment-1 >= INTEGRAL_FRAGS && IS_POWER_2((uint32_t)(fragment-1))) {
1336
frag = (Frag*)xmalloc(sizeof(Frag) * (fragment-1)*2);
1337
memcpy(frag, t, sizeof(Frag) * (fragment-1));
1338
if (t != integral_frags)
1342
frag[fragment-1].offset = write_pos;
1345
write_pos += write_len;
1346
dir_insert(&key, part, &dir);
1347
DDebug("cache_insert", "WriteDone: %X, %X, %d", key.word(0), first_key.word(0), write_len);
1348
blocks = iobufferblock_skip(blocks, &offset, &length, write_len);
1349
next_CacheKey(&key, &key);
1353
SET_HANDLER(&CacheVC::openWriteMain);
1354
return openWriteMain(event, e);
1357
// Returns the usable payload bytes per fragment: the configured target
// fragment size minus the Doc header overhead (sizeofDoc).
// NOTE(review): closing brace is missing from this extract.
static inline int target_fragment_size() {
1358
return cache_config_target_fragment_size - sizeofDoc;
1362
// CacheVC::openWriteMain -- main streaming-write loop: computes how much
// buffered data to take (capped by vio.ntodo and MAX_FRAG_SIZE), consumes it
// from the user's buffer, chooses write_len near target_fragment_size(), and
// either asks the user for more data (VC_EVENT_WRITE_READY), completes
// (VC_EVENT_WRITE_COMPLETE / openWriteClose), or issues the fragment write
// via do_write_lock_call with openWriteWriteDone as handler.
// NOTE(review): fragmentary extract -- bare numeric lines are artifacts and
// intervening source lines are missing; not compilable as-is.
CacheVC::openWriteMain(int event, Event *e)
1365
NOWARN_UNUSED(event);
1367
int called_user = 0;
1368
ink_debug_assert(!is_io_in_progress());
1370
if (!vio.buffer.writer()) {
1371
if (calluser(VC_EVENT_WRITE_READY) == EVENT_DONE)
1373
if (!vio.buffer.writer())
1376
if (vio.ntodo() <= 0) {
1378
if (calluser(VC_EVENT_WRITE_COMPLETE) == EVENT_DONE)
1380
ink_assert(!f.close_complete || !"close expected after write COMPLETE");
1381
if (vio.ntodo() <= 0)
1384
// towrite = buffered bytes plus what is already carried in `length`
int64_t ntodo = (int64_t)(vio.ntodo() + length);
1385
int64_t total_avail = vio.buffer.reader()->read_avail();
1386
int64_t avail = total_avail;
1387
int64_t towrite = avail + length;
1388
if (towrite > ntodo) {
1389
avail -= (towrite - ntodo);
1392
if (towrite > MAX_FRAG_SIZE) {
1393
avail -= (towrite - MAX_FRAG_SIZE);
1394
towrite = MAX_FRAG_SIZE;
1396
if (!blocks && towrite) {
1397
blocks = vio.buffer.reader()->block;
1398
offset = vio.buffer.reader()->start_offset;
1401
vio.buffer.reader()->consume(avail);
1405
length = (uint64_t)towrite;
1406
// write a full target-size fragment if length is within 25% above target
if (length > target_fragment_size() &&
1407
(length < target_fragment_size() + target_fragment_size() / 4))
1408
write_len = target_fragment_size();
1411
// hold back while below a fragment's worth unless this is the final chunk
bool not_writing = towrite != ntodo && towrite < target_fragment_size();
1415
if (calluser(VC_EVENT_WRITE_READY) == EVENT_DONE)
1418
} else if (vio.ntodo() <= 0)
1423
if (towrite == ntodo && f.close_complete) {
1425
SET_HANDLER(&CacheVC::openWriteClose);
1426
return openWriteClose(EVENT_NONE, NULL);
1428
SET_HANDLER(&CacheVC::openWriteWriteDone);
1429
return do_write_lock_call();
1434
// CacheVC::openWriteOverwrite -- overwrite path: probes the directory for
// first_key, reads the existing document head, verifies doc->first_key
// matches, saves its dir as od->first_dir, then reports
// CACHE_EVENT_OPEN_WRITE to the continuation.
// NOTE(review): fragmentary extract -- bare numeric lines are artifacts and
// intervening source lines are missing; not compilable as-is.
CacheVC::openWriteOverwrite(int event, Event *e)
1439
if (event != AIO_EVENT_DONE) {
1440
if (event == EVENT_IMMEDIATE)
1444
set_io_not_in_progress();
1445
if (_action.cancelled)
1446
return openWriteCloseDir(event, e);
1449
doc = (Doc *) buf->data();
1450
if (!(doc->first_key == first_key))
1452
od->first_dir = dir;
1458
CACHE_TRY_LOCK(lock, part->mutex, this_ethread());
1460
VC_LOCK_RETRY_EVENT();
1461
int res = dir_probe(&first_key, part, &dir, &last_collision);
1463
if ((res = do_read_call(&first_key)) == EVENT_RETURN)
1469
SET_HANDLER(&CacheVC::openWriteMain);
1470
return callcont(CACHE_EVENT_OPEN_WRITE);
1472
return handleEvent(AIO_EVENT_DONE, 0); // hopefully a tail call
1476
// openWriteStartDone handles vector read (addition of alternates)
1479
// CacheVC::openWriteStartDone -- completion handler for the alternate-vector
// read: validates the dir entry and Doc (magic, first_key), unmarshals the
// alternates into write_vector via get_handles, handles stale/overwritten
// directory entries by re-probing, acquires the part write slot, and on
// success reports CACHE_EVENT_OPEN_WRITE; on failure signals
// CACHE_EVENT_OPEN_WRITE_FAILED with -err.
// NOTE(review): fragmentary extract -- bare numeric lines are artifacts and
// intervening source lines are missing; not compilable as-is.
CacheVC::openWriteStartDone(int event, Event *e)
1483
intptr_t err = ECACHE_NO_DOC;
1485
if (is_io_in_progress()) {
1486
if (event != AIO_EVENT_DONE)
1488
set_io_not_in_progress();
1490
if (_action.cancelled && (!od || !od->has_multiple_writers()))
1493
if (event == AIO_EVENT_DONE) { // vector read done
1494
Doc *doc = (Doc *) buf->data();
1496
err = ECACHE_READ_FAIL;
1501
A directory entry which is nolonger valid may have been overwritten.
1502
We need to start afresh from the beginning by setting last_collision
1505
if (!dir_valid(part, &dir)) {
1506
DDebug("cache_write",
1507
"OpenReadStartDone: Dir not valid: Write Head: %d, Dir: %d",
1508
offset_to_part_offset(part, part->header->write_pos), dir_offset(&dir));
1509
last_collision = NULL;
1512
if (!(doc->first_key == first_key))
1514
if (doc->magic != DOC_MAGIC) {
1515
err = ECACHE_BAD_META_DATA;
1519
err = ECACHE_BAD_META_DATA;
1522
ink_assert((((uintptr_t) &doc->hdr()[0]) & HDR_PTR_ALIGNMENT_MASK) == 0);
1524
if (write_vector->get_handles(doc->hdr(), doc->hlen, buf) != doc->hlen) {
1525
err = ECACHE_BAD_META_DATA;
1528
ink_debug_assert(write_vector->count() > 0);
1529
od->first_dir = dir;
1531
if (doc->single_fragment()) {
1532
// fragment is tied to the vector
1533
od->move_resident_alt = 1;
1534
od->single_doc_key = doc->key;
1535
dir_assign(&od->single_doc_dir, &dir);
1536
dir_set_tag(&od->single_doc_dir, od->single_doc_key.word(2));
1544
int if_writers = ((uintptr_t) info == CACHE_ALLOW_MULTIPLE_WRITES);
1545
CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
1547
VC_LOCK_RETRY_EVENT();
1549
if ((err = part->open_write(
1550
this, if_writers, cache_config_http_max_alts > 1 ? cache_config_http_max_alts : 0)) > 0)
1552
if (od->has_multiple_writers()) {
1553
MUTEX_RELEASE(lock);
1554
SET_HANDLER(&CacheVC::openWriteMain);
1555
return callcont(CACHE_EVENT_OPEN_WRITE);
1558
// check for collision
1559
if (dir_probe(&first_key, part, &dir, &last_collision)) {
1560
od->reading_vec = 1;
1561
int ret = do_read_call(&first_key);
1562
if (ret == EVENT_RETURN)
1567
// fail update because vector has been GC'd
1572
od->reading_vec = 0;
1573
if (_action.cancelled)
1575
SET_HANDLER(&CacheVC::openWriteMain);
1576
return callcont(CACHE_EVENT_OPEN_WRITE);
1579
CACHE_INCREMENT_DYN_STAT(base_stat + CACHE_STAT_FAILURE);
1580
_action.continuation->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *) -err);
1583
od->reading_vec = 0;
1584
return openWriteCloseDir(event, e);
1586
return free_CacheVC(this);
1588
return handleEvent(AIO_EVENT_DONE, 0); // hopefully a tail call
1592
// handle lock failures from main Cache::open_write entry points below
1594
// CacheVC::openWriteStartBegin -- retries part->open_write_lock after the
// initial lock attempt in Cache::open_write failed; on permanent failure
// signals CACHE_EVENT_OPEN_WRITE_FAILED, otherwise dispatches to the
// overwrite path or reports CACHE_EVENT_OPEN_WRITE directly.
// NOTE(review): fragmentary extract -- bare numeric lines are artifacts and
// intervening source lines are missing; not compilable as-is.
CacheVC::openWriteStartBegin(int event, Event *e)
1597
NOWARN_UNUSED(event);
1601
if (_action.cancelled)
1602
return free_CacheVC(this);
1603
if (((err = part->open_write_lock(this, false, 1)) > 0)) {
1604
CACHE_INCREMENT_DYN_STAT(base_stat + CACHE_STAT_FAILURE);
1606
_action.continuation->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *) -err);
1610
VC_SCHED_LOCK_RETRY();
1612
SET_HANDLER(&CacheVC::openWriteOverwrite);
1613
return openWriteOverwrite(EVENT_IMMEDIATE, 0);
1616
SET_HANDLER(&CacheVC::openWriteMain);
1617
return callcont(CACHE_EVENT_OPEN_WRITE);
1621
// main entry point for writing of of non-http documents
1623
// Cache::open_write (non-HTTP overload) -- allocates and initializes a
// CacheVC writer: picks the part from the key, derives a fragment key whose
// dir tag differs from first_key's (see comment below), applies option
// flags, attempts the part write lock, and dispatches to openWriteMain or
// openWriteOverwrite (or schedules openWriteStartBegin on lock failure).
// NOTE(review): fragmentary extract -- bare numeric lines are artifacts and
// intervening source lines are missing; not compilable as-is.
Cache::open_write(Continuation *cont, CacheKey *key, CacheFragType frag_type,
1624
int options, time_t apin_in_cache, char *hostname, int host_len)
1627
if (!CACHE_READY(frag_type)) {
1628
cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *) -ECACHE_NOT_READY);
1629
return ACTION_RESULT_DONE;
1632
ink_assert(caches[frag_type] == this);
1635
CacheVC *c = new_CacheVC(cont);
1636
ProxyMutex *mutex = cont->mutex;
1637
MUTEX_LOCK(lock, c->mutex, this_ethread());
1638
c->vio.op = VIO::WRITE;
1639
c->base_stat = cache_write_active_stat;
1640
c->part = key_to_part(key, hostname, host_len);
1641
Part *part = c->part;
1642
CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
1643
c->first_key = c->key = *key;
1644
c->frag_type = frag_type;
1646
The transition from single fragment document to a multi-fragment document
1647
would cause a problem if the key and the first_key collide. In case of
1648
a collision, old vector data could be served to HTTP. Need to avoid that.
1649
Also, when evacuating a fragment, we have to decide if its the first_key
1650
or the earliest_key based on the dir_tag.
1653
rand_CacheKey(&c->key, cont->mutex);
1654
} while (DIR_MASK_TAG(c->key.word(2)) == DIR_MASK_TAG(c->first_key.word(2)));
1655
c->earliest_key = c->key;
1659
c->f.overwrite = (options & CACHE_WRITE_OPT_OVERWRITE) != 0;
1660
c->f.close_complete = (options & CACHE_WRITE_OPT_CLOSE_COMPLETE) != 0;
1661
c->f.sync = (options & CACHE_WRITE_OPT_SYNC) == CACHE_WRITE_OPT_SYNC;
1662
c->pin_in_cache = (uint32_t) apin_in_cache;
1664
if ((res = c->part->open_write_lock(c, false, 1)) > 0) {
1665
// document currently being written, abort
1666
CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_FAILURE);
1667
cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *) -res);
1669
return ACTION_RESULT_DONE;
1672
SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteStartBegin);
1673
c->trigger = CONT_SCHED_LOCK_RETRY(c);
1676
if (!c->f.overwrite) {
1677
SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteMain);
1678
c->callcont(CACHE_EVENT_OPEN_WRITE);
1679
return ACTION_RESULT_DONE;
1681
SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteOverwrite);
1682
if (c->openWriteOverwrite(EVENT_IMMEDIATE, 0) == EVENT_DONE)
1683
return ACTION_RESULT_DONE;
1690
// main entry point for writing of http documents
1692
// Cache::open_write (HTTP overload) -- like the non-HTTP overload, but with
// alternate/update support: when `info` is a real CacheHTTPInfo (not the
// CACHE_ALLOW_MULTIPLE_WRITES sentinel) this is an update of an existing
// alternate (see the three update paths described in the comment block
// below); otherwise a plain write. Probes the directory and, when the
// document exists, reads the alternate vector with openWriteStartDone.
// NOTE(review): fragmentary extract -- bare numeric lines are artifacts and
// intervening source lines are missing; not compilable as-is.
Cache::open_write(Continuation *cont, CacheKey *key, CacheHTTPInfo *info, time_t apin_in_cache,
1693
CacheKey *key1, CacheFragType type, char *hostname, int host_len)
1695
NOWARN_UNUSED(key1);
1697
if (!CACHE_READY(type)) {
1698
cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *) -ECACHE_NOT_READY);
1699
return ACTION_RESULT_DONE;
1702
ink_assert(caches[type] == this);
1704
int if_writers = (uintptr_t) info == CACHE_ALLOW_MULTIPLE_WRITES;
1705
CacheVC *c = new_CacheVC(cont);
1706
ProxyMutex *mutex = cont->mutex;
1707
c->vio.op = VIO::WRITE;
1708
c->first_key = *key;
1710
The transition from single fragment document to a multi-fragment document
1711
would cause a problem if the key and the first_key collide. In case of
1712
a collision, old vector data could be served to HTTP. Need to avoid that.
1713
Also, when evacuating a fragment, we have to decide if its the first_key
1714
or the earliest_key based on the dir_tag.
1717
rand_CacheKey(&c->key, cont->mutex);
1719
while (DIR_MASK_TAG(c->key.word(2)) == DIR_MASK_TAG(c->first_key.word(2)));
1720
c->earliest_key = c->key;
1721
c->frag_type = CACHE_FRAG_TYPE_HTTP;
1722
c->part = key_to_part(key, hostname, host_len);
1723
Part *part = c->part;
1725
if (c->info && (uintptr_t) info != CACHE_ALLOW_MULTIPLE_WRITES) {
1727
Update has the following code paths :
1728
a) Update alternate header only :
1729
In this case the vector has to be rewritten. The content
1730
length(update_len) and the key for the document are set in the
1731
new_info in the set_http_info call.
1733
open_write with info set
1734
set_http_info new_info
1737
b) Update alternate and data
1738
In this case both the vector and the data needs to be rewritten.
1739
This case is similar to the standard write of a document case except
1740
that the new_info is inserted into the vector at the alternate_index
1741
(overwriting the old alternate) rather than the end of the vector.
1743
open_write with info set
1744
set_http_info new_info
1745
do_io_write => (total_len > 0)
1747
c) Delete an alternate
1748
The vector may need to be deleted (if there was only one alternate) or
1749
rewritten (if there were more than one alternate). The deletion of the
1750
vector is done in openWriteRemoveVector.
1752
open_write with info set
1756
c->base_stat = cache_update_active_stat;
1757
DDebug("cache_update", "Update called");
1758
info->object_key_get(&c->update_key);
1759
ink_debug_assert(!(c->update_key == zero_key));
1760
c->update_len = info->object_size_get();
1762
c->base_stat = cache_write_active_stat;
1763
CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
1764
c->pin_in_cache = (uint32_t) apin_in_cache;
1767
CACHE_TRY_LOCK(lock, c->part->mutex, cont->mutex->thread_holding);
1769
if ((err = c->part->open_write(c, if_writers,
1770
cache_config_http_max_alts > 1 ? cache_config_http_max_alts : 0)) > 0)
1772
// If there are multiple writers, then this one cannot be an update.
1773
// Only the first writer can do an update. If that's the case, we can
1774
// return success to the state machine now.;
1775
if (c->od->has_multiple_writers())
1777
if (!dir_probe(key, c->part, &c->dir, &c->last_collision)) {
1779
// fail update because vector has been GC'd
1780
// This situation can also arise in openWriteStartDone
1781
err = ECACHE_NO_DOC;
1784
// document doesn't exist, begin write
1787
c->od->reading_vec = 1;
1788
// document exists, read vector
1789
SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteStartDone);
1790
switch (c->do_read_call(&c->first_key)) {
1791
case EVENT_DONE: return ACTION_RESULT_DONE;
1792
case EVENT_RETURN: goto Lcallreturn;
1793
default: return &c->_action;
1798
SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteStartDone);
1799
CONT_SCHED_LOCK_RETRY(c);
1804
SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteMain);
1805
c->callcont(CACHE_EVENT_OPEN_WRITE);
1806
return ACTION_RESULT_DONE;
1809
CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_FAILURE);
1810
cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *) -err);
1812
c->openWriteCloseDir(EVENT_IMMEDIATE, 0);
1813
return ACTION_RESULT_DONE;
1816
return ACTION_RESULT_DONE;
1819
if (c->handleEvent(AIO_EVENT_DONE, 0) == EVENT_DONE)
1820
return ACTION_RESULT_DONE;