~ps10gel/ubuntu/xenial/trafficserver/6.2.0

« back to all changes in this revision

Viewing changes to iocore/cache/CacheWrite.cc

  • Committer: Bazaar Package Importer
  • Author(s): Arno Toell
  • Date: 2011-01-13 11:49:18 UTC
  • Revision ID: james.westby@ubuntu.com-20110113114918-vu422h8dknrgkj15
Tags: upstream-2.1.5-unstable
ImportĀ upstreamĀ versionĀ 2.1.5-unstable

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/** @file
 
2
 
 
3
  A brief file description
 
4
 
 
5
  @section license License
 
6
 
 
7
  Licensed to the Apache Software Foundation (ASF) under one
 
8
  or more contributor license agreements.  See the NOTICE file
 
9
  distributed with this work for additional information
 
10
  regarding copyright ownership.  The ASF licenses this file
 
11
  to you under the Apache License, Version 2.0 (the
 
12
  "License"); you may not use this file except in compliance
 
13
  with the License.  You may obtain a copy of the License at
 
14
 
 
15
      http://www.apache.org/licenses/LICENSE-2.0
 
16
 
 
17
  Unless required by applicable law or agreed to in writing, software
 
18
  distributed under the License is distributed on an "AS IS" BASIS,
 
19
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
20
  See the License for the specific language governing permissions and
 
21
  limitations under the License.
 
22
 */
 
23
 
 
24
 
 
25
#include "P_Cache.h"
 
26
 
 
27
#define IS_POWER_2(_x) (!((_x)&((_x)-1)))
 
28
#define UINT_WRAP_LTE(_x, _y) (((_y)-(_x)) < INT_MAX) // exploit overflow
 
29
#define UINT_WRAP_GTE(_x, _y) (((_x)-(_y)) < INT_MAX) // exploit overflow
 
30
#define UINT_WRAP_LT(_x, _y) (((_x)-(_y)) >= INT_MAX) // exploit overflow
 
31
 
 
32
// Given a key, finds the index of the alternate which matches
 
33
// used to get the alternate which is actually present in the document
 
34
#ifdef HTTP_CACHE
 
35
int
 
36
get_alternate_index(CacheHTTPInfoVector *cache_vector, CacheKey key)
 
37
{
 
38
  int alt_count = cache_vector->count();
 
39
  CacheHTTPInfo *obj;
 
40
  if (!alt_count)
 
41
    return -1;
 
42
  for (int i = 0; i < alt_count; i++) {
 
43
    obj = cache_vector->get(i);
 
44
    if (obj->compare_object_key(&key)) {
 
45
      // Debug("cache_key", "Resident alternate key  %X", key.word(0));
 
46
      return i;
 
47
    }
 
48
  }
 
49
  return -1;
 
50
}
 
51
 
 
52
// Adds/Deletes alternate to the od->vector (write_vector). If the vector
 
53
// is empty, deletes the directory entry pointing to the vector. Each
 
54
// CacheVC must write the vector down to disk after making changes. If we
 
55
// wait till the last writer, that writer will have the responsibility of
 
56
// of writing the vector even if the http state machine aborts.  This
 
57
// makes it easier to handle situations where writers abort.
 
58
int
 
59
CacheVC::updateVector(int event, Event *e)
 
60
{
 
61
  NOWARN_UNUSED(e);
 
62
  NOWARN_UNUSED(event);
 
63
 
 
64
  cancel_trigger();
 
65
  if (od->reading_vec || od->writing_vec)
 
66
    VC_SCHED_LOCK_RETRY();
 
67
  int ret = 0;
 
68
  {
 
69
    CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
 
70
    if (!lock || od->writing_vec)
 
71
      VC_SCHED_LOCK_RETRY();
 
72
 
 
73
    int vec = alternate.valid();
 
74
    if (f.update) {
 
75
      // all Update cases. Need to get the alternate index.
 
76
      alternate_index = get_alternate_index(write_vector, update_key);
 
77
      Debug("cache_update", "updating alternate index %d", alternate_index);
 
78
      // if its an alternate delete
 
79
      if (!vec) {
 
80
        ink_assert(!total_len);
 
81
        if (alternate_index >= 0) {
 
82
          write_vector->remove(alternate_index, true);
 
83
          alternate_index = CACHE_ALT_REMOVED;
 
84
          if (!write_vector->count())
 
85
            dir_delete(&first_key, part, &od->first_dir);
 
86
        }
 
87
        // the alternate is not there any more. somebody might have
 
88
        // deleted it. Just close this writer
 
89
        if (alternate_index != CACHE_ALT_REMOVED || !write_vector->count()) {
 
90
          SET_HANDLER(&CacheVC::openWriteCloseDir);
 
91
          return openWriteCloseDir(EVENT_IMMEDIATE, 0);
 
92
        }
 
93
      }
 
94
      if (update_key == od->single_doc_key && (total_len || !vec))
 
95
        od->move_resident_alt = 0;
 
96
    }
 
97
    if (cache_config_http_max_alts > 1 && write_vector->count() >= cache_config_http_max_alts && alternate_index < 0) {
 
98
      if (od->move_resident_alt && get_alternate_index(write_vector, od->single_doc_key) == 0)
 
99
        od->move_resident_alt = 0;
 
100
      write_vector->remove(0, true);
 
101
    }
 
102
    if (vec)
 
103
      alternate_index = write_vector->insert(&alternate, alternate_index);
 
104
 
 
105
    if (od->move_resident_alt && first_buf._ptr() && !od->has_multiple_writers()) {
 
106
      Doc *doc = (Doc *) first_buf->data();
 
107
      int small_doc = (int64_t)doc->data_len() < (int64_t)cache_config_alt_rewrite_max_size;
 
108
      int have_res_alt = doc->key == od->single_doc_key;
 
109
      // if the new alternate is not written with the vector
 
110
      // then move the old one with the vector
 
111
      // if its a header only update move the resident alternate
 
112
      // with the vector.
 
113
      // We are sure that the body of the resident alternate that we are
 
114
      // rewriting has not changed and the alternate is not being deleted,
 
115
      // since we set od->move_resident_alt  to 0 in that case
 
116
      // (in updateVector)
 
117
      if (small_doc && have_res_alt && (fragment || (f.update && !total_len))) {
 
118
        // for multiple fragment document, we must have done
 
119
        // CacheVC:openWriteCloseDataDone
 
120
        ink_assert(!fragment || f.data_done);
 
121
        od->move_resident_alt = 0;
 
122
        f.rewrite_resident_alt = 1;
 
123
        write_len = doc->data_len();
 
124
        Debug("cache_update_alt",
 
125
              "rewriting resident alt size: %d key: %X, first_key: %X", write_len, doc->key.word(0), first_key.word(0));
 
126
      }
 
127
    }
 
128
    header_len = write_vector->marshal_length();
 
129
    od->writing_vec = 1;
 
130
    f.use_first_key = 1;
 
131
    SET_HANDLER(&CacheVC::openWriteCloseHeadDone);
 
132
    ret = do_write_call();
 
133
  }
 
134
  if (ret == EVENT_RETURN)
 
135
    return handleEvent(AIO_EVENT_DONE, 0);
 
136
  return ret;
 
137
}
 
138
#endif
 
139
/*
 
140
   The following fields of the CacheVC are used when writing down a fragment.
 
141
   Make sure that each of the fields is set to a valid value before calling
 
142
   this function
 
143
   - frag_type. Checked to see if a vector needs to be marshalled.
 
144
   - f.use_first_key. To decide if the vector should be marshalled and to set
 
145
     the doc->key to the appropriate key (first_key or earliest_key)
 
146
   - f.evac_vector. If set, the writer is pushed in the beginning of the
 
147
     agg queue. And if !f.evac_vector && !f.update the alternate->object_size
 
148
     is set to vc->total_len
 
149
   - f.readers.  If set, assumes that this is an evacuation, so the write
 
150
     is not aborted even if part->agg_todo_size > agg_write_backlog
 
151
   - f.evacuator. If this is an evacuation.
 
152
   - f.rewrite_resident_alt. The resident alternate is rewritten.
 
153
   - f.update. Used only if the write_vector needs to be written to disk.
 
154
     Used to set the length of the alternate to total_len.
 
155
   - write_vector. Used only if frag_type == CACHE_FRAG_TYPE_HTTP &&
 
156
     (f.use_fist_key || f.evac_vector) is set. Write_vector is written to disk
 
157
   - alternate_index. Used only if write_vector needs to be written to disk.
 
158
     Used to find out the VC's alternate in the write_vector and set its
 
159
     length to tatal_len.
 
160
   - write_len. The number of bytes for this fragment.
 
161
   - total_len. The total number of bytes for the document so far.
 
162
     Doc->total_len and alternate's total len is set to this value.
 
163
   - first_key. Doc's first_key is set to this value.
 
164
   - pin_in_cache. Doc's pinned value is set to this + ink_get_hrtime().
 
165
   - earliest_key. If f.use_first_key, Doc's key is set to this value.
 
166
   - key. If !f.use_first_key, Doc's key is set to this value.
 
167
   - blocks. Used only if write_len is set. Data to be written
 
168
   - offset. Used only if write_len is set. offset into the block to copy
 
169
     the data from.
 
170
   - buf. Used only if f.evacuator is set. Should point to the old document.
 
171
   The functions sets the length, offset, pinned, head and phase of vc->dir.
 
172
   */
 
173
 
 
174
int
 
175
CacheVC::handleWrite(int event, Event *e)
 
176
{
 
177
  NOWARN_UNUSED(e);
 
178
  NOWARN_UNUSED(event);
 
179
 
 
180
  // plain write case
 
181
  ink_assert(!trigger);
 
182
  if (f.use_first_key && fragment) {
 
183
    frag_len = (fragment-1) * sizeof(Frag);
 
184
  } else
 
185
    frag_len = 0;
 
186
  set_agg_write_in_progress();
 
187
  POP_HANDLER;
 
188
  agg_len = part->round_to_approx_size(write_len + header_len + frag_len + sizeofDoc);
 
189
  part->agg_todo_size += agg_len;
 
190
  bool agg_error =
 
191
    (agg_len > AGG_SIZE || header_len + sizeofDoc > MAX_FRAG_SIZE ||
 
192
     (!f.readers && (part->agg_todo_size > cache_config_agg_write_backlog + AGG_SIZE) && write_len));
 
193
#ifdef CACHE_AGG_FAIL_RATE
 
194
  agg_error = agg_error || ((uint32_t) mutex->thread_holding->generator.random() <
 
195
                            (uint32_t) (UINT_MAX * CACHE_AGG_FAIL_RATE));
 
196
#endif
 
197
  bool max_doc_error = (cache_config_max_doc_size &&
 
198
                        (cache_config_max_doc_size < vio.ndone ||
 
199
                         (vio.nbytes != INT64_MAX && (cache_config_max_doc_size < vio.nbytes))));
 
200
 
 
201
  if (agg_error || max_doc_error) {
 
202
    CACHE_INCREMENT_DYN_STAT(cache_write_backlog_failure_stat);
 
203
    CACHE_INCREMENT_DYN_STAT(base_stat + CACHE_STAT_FAILURE);
 
204
    part->agg_todo_size -= agg_len;
 
205
    io.aio_result = AIO_SOFT_FAILURE;
 
206
    if (event == EVENT_CALL)
 
207
      return EVENT_RETURN;
 
208
    return handleEvent(AIO_EVENT_DONE, 0);
 
209
  }
 
210
  ink_assert(agg_len <= AGG_SIZE);
 
211
  if (f.evac_vector)
 
212
    part->agg.push(this);
 
213
  else
 
214
    part->agg.enqueue(this);
 
215
  if (!part->is_io_in_progress())
 
216
    return part->aggWrite(event, this);
 
217
  return EVENT_CONT;
 
218
}
 
219
 
 
220
static char *
 
221
iobufferblock_memcpy(char *p, int len, IOBufferBlock *ab, int offset)
 
222
{
 
223
  IOBufferBlock *b = ab;
 
224
  while (b && len >= 0) {
 
225
    char *start = b->_start;
 
226
    char *end = b->_end;
 
227
    int max_bytes = end - start;
 
228
    max_bytes -= offset;
 
229
    if (max_bytes <= 0) {
 
230
      offset = -max_bytes;
 
231
      b = b->next;
 
232
      continue;
 
233
    }
 
234
    int bytes = len;
 
235
    if (bytes >= max_bytes)
 
236
      bytes = max_bytes;
 
237
    ::memcpy(p, start + offset, bytes);
 
238
    p += bytes;
 
239
    len -= bytes;
 
240
    b = b->next;
 
241
    offset = 0;
 
242
  }
 
243
  return p;
 
244
}
 
245
 
 
246
EvacuationBlock *
 
247
Part::force_evacuate_head(Dir *evac_dir, int pinned)
 
248
{
 
249
  // build an evacuation block for the object
 
250
  EvacuationBlock *b = evacuation_block_exists(evac_dir, this);
 
251
  // if we have already started evacuating this document, its too late
 
252
  // to evacuate the head...bad luck
 
253
  if (b && b->f.done)
 
254
    return b;
 
255
 
 
256
  if (!b) {
 
257
    b = new_EvacuationBlock(mutex->thread_holding);
 
258
    b->dir = *evac_dir;
 
259
    DDebug("cache_evac", "force: %d, %d", (int) dir_offset(evac_dir), (int) dir_phase(evac_dir));
 
260
    evacuate[dir_evac_bucket(evac_dir)].push(b);
 
261
  }
 
262
  b->f.pinned = pinned;
 
263
  b->f.evacuate_head = 1;
 
264
  b->evac_frags.key.set(0, 0);  // ensure that the block gets
 
265
  // evacuated no matter what
 
266
  b->readers = 0;             // ensure that the block does not disappear
 
267
  return b;
 
268
}
 
269
 
 
270
void
 
271
Part::scan_for_pinned_documents()
 
272
{
 
273
  if (cache_config_permit_pinning) {
 
274
    // we can't evacuate anything between header->write_pos and
 
275
    // header->write_pos + AGG_SIZE.
 
276
    int ps = offset_to_part_offset(this, header->write_pos + AGG_SIZE);
 
277
    int pe = offset_to_part_offset(this, header->write_pos + 2 * EVACUATION_SIZE + (len / PIN_SCAN_EVERY));
 
278
    int part_end_offset = offset_to_part_offset(this, len + skip);
 
279
    int before_end_of_part = pe < part_end_offset;
 
280
    DDebug("cache_evac", "scan %d %d", ps, pe);
 
281
    for (int i = 0; i < part_direntries(this); i++) {
 
282
      // is it a valid pinned object?
 
283
      if (!dir_is_empty(&dir[i]) && dir_pinned(&dir[i]) && dir_head(&dir[i])) {
 
284
        // select objects only within this PIN_SCAN region
 
285
        int o = dir_offset(&dir[i]);
 
286
        if (dir_phase(&dir[i]) == header->phase) {
 
287
          if (before_end_of_part || o >= (pe - part_end_offset))
 
288
            continue;
 
289
        } else {
 
290
          if (o<ps || o>= pe)
 
291
            continue;
 
292
        }
 
293
        force_evacuate_head(&dir[i], 1);
 
294
        //      DDebug("cache_evac", "scan pinned at offset %d %d %d %d %d %d",
 
295
        //            (int)dir_offset(&b->dir), ps, o , pe, i, (int)b->f.done);
 
296
      }
 
297
    }
 
298
  }
 
299
}
 
300
 
 
301
/* NOTE:: This state can be called by an AIO thread, so DON'T DON'T
 
302
   DON'T schedule any events on this thread using VC_SCHED_XXX or
 
303
   mutex->thread_holding->schedule_xxx_local(). ALWAYS use
 
304
   eventProcessor.schedule_xxx().
 
305
   */
 
306
int
 
307
Part::aggWriteDone(int event, Event *e)
 
308
{
 
309
  NOWARN_UNUSED(e);
 
310
  NOWARN_UNUSED(event);
 
311
 
 
312
  cancel_trigger();
 
313
 
 
314
  // ensure we have the cacheDirSync lock if we intend to call it later
 
315
  // retaking the current mutex recursively is a NOOP
 
316
  CACHE_TRY_LOCK(lock, dir_sync_waiting ? cacheDirSync->mutex : mutex, mutex->thread_holding);
 
317
  if (!lock) {
 
318
    eventProcessor.schedule_in(this, MUTEX_RETRY_DELAY);
 
319
    return EVENT_CONT;
 
320
  }
 
321
  if (io.ok()) {
 
322
    header->last_write_pos = header->write_pos;
 
323
    header->write_pos += io.aiocb.aio_nbytes;
 
324
    ink_assert(header->write_pos >= start);
 
325
    DDebug("cache_agg", "Dir %s, Write: %" PRIu64 ", last Write: %" PRIu64 "\n",
 
326
          hash_id, header->write_pos, header->last_write_pos);
 
327
    ink_assert(header->write_pos == header->agg_pos);
 
328
    if (header->write_pos + EVACUATION_SIZE > scan_pos)
 
329
      periodic_scan();
 
330
    agg_buf_pos = 0;
 
331
    header->write_serial++;
 
332
  } else {
 
333
    // delete all the directory entries that we inserted
 
334
    // for fragments is this aggregation buffer
 
335
    Debug("cache_disk_error", "Write error on disk %s\n \
 
336
              write range : [%" PRIu64 " - %" PRIu64 " bytes]  [%" PRIu64 " - %" PRIu64 " blocks] \n",
 
337
          hash_id, io.aiocb.aio_offset, io.aiocb.aio_offset + io.aiocb.aio_nbytes,
 
338
          io.aiocb.aio_offset / CACHE_BLOCK_SIZE,
 
339
          (io.aiocb.aio_offset + io.aiocb.aio_nbytes) / CACHE_BLOCK_SIZE);
 
340
    Dir del_dir;
 
341
    dir_clear(&del_dir);
 
342
    for (int done = 0; done < agg_buf_pos;) {
 
343
      Doc *doc = (Doc *) (agg_buffer + done);
 
344
      dir_set_offset(&del_dir, header->write_pos + done);
 
345
      dir_delete(&doc->key, this, &del_dir);
 
346
      done += round_to_approx_size(doc->len);
 
347
    }
 
348
    agg_buf_pos = 0;
 
349
  }
 
350
  set_io_not_in_progress();
 
351
  // callback ready sync CacheVCs
 
352
  CacheVC *c = 0;
 
353
  while ((c = sync.dequeue())) {
 
354
    if (UINT_WRAP_LTE(c->write_serial + 2, header->write_serial))
 
355
      c->initial_thread->schedule_imm_signal(c, AIO_EVENT_DONE);
 
356
    else {
 
357
      sync.push(c); // put it back on the front
 
358
      break;
 
359
    }
 
360
  }
 
361
  if (dir_sync_waiting) {
 
362
    dir_sync_waiting = 0;
 
363
    cacheDirSync->handleEvent(EVENT_IMMEDIATE, 0);
 
364
  }
 
365
  if (agg.head || sync.head)
 
366
    return aggWrite(event, e);
 
367
  return EVENT_CONT;
 
368
}
 
369
 
 
370
CacheVC *
 
371
new_DocEvacuator(int nbytes, Part *part)
 
372
{
 
373
  CacheVC *c = new_CacheVC(part);
 
374
  ProxyMutex *mutex = part->mutex;
 
375
  c->base_stat = cache_evacuate_active_stat;
 
376
  CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
 
377
  c->buf = new_IOBufferData(iobuffer_size_to_index(nbytes, MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
 
378
  c->part = part;
 
379
  c->f.evacuator = 1;
 
380
  c->earliest_key = zero_key;
 
381
  SET_CONTINUATION_HANDLER(c, &CacheVC::evacuateDocDone);
 
382
  return c;
 
383
}
 
384
 
 
385
int
 
386
CacheVC::evacuateReadHead(int event, Event *e)
 
387
{
 
388
  NOWARN_UNUSED(e);
 
389
  NOWARN_UNUSED(event);
 
390
 
 
391
  // The evacuator vc shares the lock with the partition mutex
 
392
  ink_debug_assert(part->mutex->thread_holding == this_ethread());
 
393
  cancel_trigger();
 
394
  Doc *doc = (Doc *) buf->data();
 
395
#ifdef HTTP_CACHE
 
396
  CacheHTTPInfo *alternate_tmp = 0;
 
397
#endif
 
398
  if (!io.ok())
 
399
    goto Ldone;
 
400
  // a directory entry which is nolonger valid may have been overwritten
 
401
  if (!dir_valid(part, &dir)) {
 
402
    last_collision = NULL;
 
403
    goto Lcollision;
 
404
  }
 
405
  if (doc->magic != DOC_MAGIC || !(doc->first_key == first_key))
 
406
    goto Lcollision;
 
407
#ifdef HTTP_CACHE
 
408
  alternate_tmp = 0;
 
409
  if (doc->ftype == CACHE_FRAG_TYPE_HTTP && doc->hlen) {
 
410
    // its an http document
 
411
    if (vector.get_handles(doc->hdr(), doc->hlen) != doc->hlen) {
 
412
      Note("bad vector detected during evacuation");
 
413
      goto Ldone;
 
414
    }
 
415
    alternate_index = get_alternate_index(&vector, earliest_key);
 
416
    if (alternate_index < 0)
 
417
      goto Ldone;
 
418
    alternate_tmp = vector.get(alternate_index);
 
419
    doc_len = alternate_tmp->object_size_get();
 
420
    Debug("cache_evac", "evacuateReadHead http earliest %X first: %X len: %d",
 
421
          first_key.word(0), earliest_key.word(0), doc_len);
 
422
  } else
 
423
#endif
 
424
  {
 
425
    // non-http document
 
426
    CacheKey next_key;
 
427
    next_CacheKey(&next_key, &doc->key);
 
428
    if (!(next_key == earliest_key))
 
429
      goto Ldone;
 
430
    doc_len = doc->total_len;
 
431
    DDebug("cache_evac",
 
432
          "evacuateReadHead non-http earliest %X first: %X len: %d", first_key.word(0), earliest_key.word(0), doc_len);
 
433
  }
 
434
  if (doc_len == total_len) {
 
435
    // the whole document has been evacuated. Insert the directory
 
436
    // entry in the directory.
 
437
    dir_lookaside_fixup(&earliest_key, part);
 
438
    return free_CacheVC(this);
 
439
  }
 
440
  return EVENT_CONT;
 
441
Lcollision:
 
442
  if (dir_probe(&first_key, part, &dir, &last_collision)) {
 
443
    int ret = do_read_call(&first_key);
 
444
    if (ret == EVENT_RETURN)
 
445
      return handleEvent(AIO_EVENT_DONE, 0);
 
446
    return ret;
 
447
  }
 
448
Ldone:
 
449
  dir_lookaside_remove(&earliest_key, part);
 
450
  return free_CacheVC(this);
 
451
}
 
452
 
 
453
int
 
454
CacheVC::evacuateDocDone(int event, Event *e)
 
455
{
 
456
  NOWARN_UNUSED(e);
 
457
  NOWARN_UNUSED(event);
 
458
 
 
459
  ink_debug_assert(part->mutex->thread_holding == this_ethread());
 
460
  Doc *doc = (Doc *) buf->data();
 
461
  DDebug("cache_evac", "evacuateDocDone %X o %d p %d new_o %d new_p %d",
 
462
        (int) key.word(0), (int) dir_offset(&overwrite_dir),
 
463
        (int) dir_phase(&overwrite_dir), (int) dir_offset(&dir), (int) dir_phase(&dir));
 
464
  int i = dir_evac_bucket(&overwrite_dir);
 
465
  // nasty beeping race condition, need to have the EvacuationBlock here
 
466
  EvacuationBlock *b = part->evacuate[i].head;
 
467
  for (; b; b = b->link.next) {
 
468
    if (dir_offset(&b->dir) == dir_offset(&overwrite_dir)) {
 
469
 
 
470
      // If the document is single fragment (although not tied to the vector),
 
471
      // then we don't have to put the directory entry in the lookaside
 
472
      // buffer. But, we have no way of finding out if the document is
 
473
      // single fragment. doc->single_fragment() can be true for a multiple
 
474
      // fragment document since total_len and doc->len could be equal at
 
475
      // the time we write the fragment down. To be on the safe side, we
 
476
      // only overwrite the entry in the directory if its not a head.
 
477
      if (!dir_head(&overwrite_dir)) {
 
478
        // find the earliest key
 
479
        EvacuationKey *evac = &b->evac_frags;
 
480
        for (; evac && !(evac->key == doc->key); evac = evac->link.next);
 
481
        ink_assert(evac);
 
482
        if (!evac)
 
483
          break;
 
484
        if (evac->earliest_key.fold()) {
 
485
          DDebug("cache_evac", "evacdocdone: evacuating key %X earliest %X",
 
486
                evac->key.word(0), evac->earliest_key.word(0));
 
487
          EvacuationBlock *eblock = 0;
 
488
          Dir dir_tmp;
 
489
          dir_lookaside_probe(&evac->earliest_key, part, &dir_tmp, &eblock);
 
490
          if (eblock) {
 
491
            CacheVC *earliest_evac = eblock->earliest_evacuator;
 
492
            earliest_evac->total_len += doc->data_len();
 
493
            if (earliest_evac->total_len == earliest_evac->doc_len) {
 
494
              dir_lookaside_fixup(&evac->earliest_key, part);
 
495
              free_CacheVC(earliest_evac);
 
496
            }
 
497
          }
 
498
        }
 
499
        dir_overwrite(&doc->key, part, &dir, &overwrite_dir);
 
500
      }
 
501
      // if the tag in the overwrite_dir matches the first_key in the
 
502
      // document, then it has to be the vector. We gaurantee that
 
503
      // the first_key and the earliest_key will never collide (see
 
504
      // Cache::open_write). Once we know its the vector, we can
 
505
      // safely overwrite the first_key in the directory.
 
506
      if (dir_head(&overwrite_dir) && b->f.evacuate_head) {
 
507
        DDebug("cache_evac",
 
508
              "evacuateDocDone evacuate_head %X %X hlen %d offset %d",
 
509
              (int) key.word(0), (int) doc->key.word(0), doc->hlen, (int) dir_offset(&overwrite_dir));
 
510
 
 
511
        if (dir_compare_tag(&overwrite_dir, &doc->first_key)) {
 
512
          OpenDirEntry *cod;
 
513
          DDebug("cache_evac", "evacuating vector: %X %d",
 
514
                (int) doc->first_key.word(0), (int) dir_offset(&overwrite_dir));
 
515
          if ((cod = part->open_read(&doc->first_key))) {
 
516
            // writer  exists
 
517
            DDebug("cache_evac", "overwriting the open directory %X %d %d",
 
518
                  (int) doc->first_key.word(0), (int) dir_offset(&cod->first_dir), (int) dir_offset(&dir));
 
519
            ink_assert(dir_pinned(&dir));
 
520
            cod->first_dir = dir;
 
521
 
 
522
          }
 
523
          if (dir_overwrite(&doc->first_key, part, &dir, &overwrite_dir)) {
 
524
            part->ram_cache->fixup(&doc->first_key, 0, dir_offset(&overwrite_dir), 0, dir_offset(&dir));
 
525
          }
 
526
        } else {
 
527
          DDebug("cache_evac", "evacuating earliest: %X %d", (int) doc->key.word(0), (int) dir_offset(&overwrite_dir));
 
528
          ink_debug_assert(dir_compare_tag(&overwrite_dir, &doc->key));
 
529
          ink_assert(b->earliest_evacuator == this);
 
530
          total_len += doc->data_len();
 
531
          first_key = doc->first_key;
 
532
          earliest_dir = dir;
 
533
          if (dir_probe(&first_key, part, &dir, &last_collision) > 0) {
 
534
            dir_lookaside_insert(b, part, &earliest_dir);
 
535
            // read the vector
 
536
            SET_HANDLER(&CacheVC::evacuateReadHead);
 
537
            int ret = do_read_call(&first_key);
 
538
            if (ret == EVENT_RETURN)
 
539
              return handleEvent(AIO_EVENT_DONE, 0);
 
540
            return ret;
 
541
          }
 
542
        }
 
543
      }
 
544
      break;
 
545
    }
 
546
  }
 
547
  return free_CacheVC(this);
 
548
}
 
549
 
 
550
static int
 
551
evacuate_fragments(CacheKey *key, CacheKey *earliest_key, int force, Part *part)
 
552
{
 
553
  Dir dir, *last_collision = 0;
 
554
  int i = 0;
 
555
  while (dir_probe(key, part, &dir, &last_collision)) {
 
556
    // next fragment cannot be a head...if it is, it must have been a
 
557
    // directory collision.
 
558
    if (dir_head(&dir))
 
559
      continue;
 
560
    EvacuationBlock *b = evacuation_block_exists(&dir, part);
 
561
    if (!b) {
 
562
      b = new_EvacuationBlock(part->mutex->thread_holding);
 
563
      b->dir = dir;
 
564
      b->evac_frags.key = *key;
 
565
      b->evac_frags.earliest_key = *earliest_key;
 
566
      part->evacuate[dir_evac_bucket(&dir)].push(b);
 
567
      i++;
 
568
    } else {
 
569
      ink_assert(dir_offset(&dir) == dir_offset(&b->dir));
 
570
      ink_assert(dir_phase(&dir) == dir_phase(&b->dir));
 
571
      EvacuationKey *evac_frag = evacuationKeyAllocator.alloc();
 
572
      evac_frag->key = *key;
 
573
      evac_frag->earliest_key = *earliest_key;
 
574
      evac_frag->link.next = b->evac_frags.link.next;
 
575
      b->evac_frags.link.next = evac_frag;
 
576
    }
 
577
    if (force)
 
578
      b->readers = 0;
 
579
    DDebug("cache_evac",
 
580
          "next fragment %X Earliest: %X offset %d phase %d force %d",
 
581
          (int) key->word(0), (int) earliest_key->word(0), (int) dir_offset(&dir), (int) dir_phase(&dir), force);
 
582
  }
 
583
  return i;
 
584
}
 
585
 
 
586
int
 
587
Part::evacuateWrite(CacheVC *evacuator, int event, Event *e)
 
588
{
 
589
  NOWARN_UNUSED(e);
 
590
  NOWARN_UNUSED(event);
 
591
 
 
592
  // push to front of aggregation write list, so it is written first
 
593
 
 
594
  evacuator->agg_len = round_to_approx_size(((Doc *)evacuator->buf->data())->len);
 
595
  agg_todo_size += evacuator->agg_len;
 
596
  /* insert the evacuator after all the other evacuators */
 
597
  CacheVC *cur = (CacheVC *) agg.head;
 
598
  CacheVC *after = NULL;
 
599
  for (; cur && cur->f.evacuator; cur = (CacheVC *) cur->link.next)
 
600
    after = cur;
 
601
  ink_assert(evacuator->agg_len <= AGG_SIZE);
 
602
  agg.insert(evacuator, after);
 
603
  return aggWrite(event, e);
 
604
}
 
605
 
 
606
int
 
607
Part::evacuateDocReadDone(int event, Event *e)
 
608
{
 
609
  NOWARN_UNUSED(e);
 
610
 
 
611
  cancel_trigger();
 
612
  if (event != AIO_EVENT_DONE)
 
613
    return EVENT_DONE;
 
614
  ink_assert(is_io_in_progress());
 
615
  set_io_not_in_progress();
 
616
  ink_debug_assert(mutex->thread_holding == this_ethread());
 
617
  Doc *doc = (Doc *) doc_evacuator->buf->data();
 
618
  CacheKey next_key;
 
619
  EvacuationBlock *b = NULL;
 
620
  if (doc->magic != DOC_MAGIC) {
 
621
    Debug("cache_evac", "DOC magic: %X %d",
 
622
          (int) dir_tag(&doc_evacuator->overwrite_dir), (int) dir_offset(&doc_evacuator->overwrite_dir));
 
623
    ink_assert(doc->magic == DOC_MAGIC);
 
624
    goto Ldone;
 
625
  }
 
626
  DDebug("cache_evac", "evacuateDocReadDone %X offset %d",
 
627
        (int) doc->key.word(0), (int) dir_offset(&doc_evacuator->overwrite_dir));
 
628
 
 
629
  b = evacuate[dir_evac_bucket(&doc_evacuator->overwrite_dir)].head;
 
630
  while (b) {
 
631
    if (dir_offset(&b->dir) == dir_offset(&doc_evacuator->overwrite_dir))
 
632
      break;
 
633
    b = b->link.next;
 
634
  }
 
635
  if (!b)
 
636
    goto Ldone;
 
637
  if ((b->f.pinned && !b->readers) && doc->pinned < (uint32_t) (ink_get_based_hrtime() / HRTIME_SECOND))
 
638
    goto Ldone;
 
639
 
 
640
  if (dir_head(&b->dir) && b->f.evacuate_head) {
 
641
    ink_assert(!b->evac_frags.key.fold());
 
642
    // if its a head (vector), evacuation is real simple...we just
 
643
    // need to write this vector down and overwrite the directory entry.
 
644
    if (dir_compare_tag(&b->dir, &doc->first_key)) {
 
645
      doc_evacuator->key = doc->first_key;
 
646
      b->evac_frags.key = doc->first_key;
 
647
      DDebug("cache_evac", "evacuating vector %X offset %d",
 
648
            (int) doc->first_key.word(0), (int) dir_offset(&doc_evacuator->overwrite_dir));
 
649
      b->f.unused = 57;
 
650
    } else {
 
651
      // if its an earliest fragment (alternate) evacuation, things get
 
652
      // a little tricky. We have to propagate the earliest key to the next
 
653
      // fragments for this alternate. The last fragment to be evacuated
 
654
      // fixes up the lookaside buffer.
 
655
      doc_evacuator->key = doc->key;
 
656
      doc_evacuator->earliest_key = doc->key;
 
657
      b->evac_frags.key = doc->key;
 
658
      b->evac_frags.earliest_key = doc->key;
 
659
      b->earliest_evacuator = doc_evacuator;
 
660
      DDebug("cache_evac", "evacuating earliest %X %X evac: %X offset: %d",
 
661
            (int) b->evac_frags.key.word(0), (int) doc->key.word(0),
 
662
            doc_evacuator, (int) dir_offset(&doc_evacuator->overwrite_dir));
 
663
      b->f.unused = 67;
 
664
    }
 
665
  } else {
 
666
    // find which key matches the document
 
667
    EvacuationKey *ek = &b->evac_frags;
 
668
    for (; ek && !(ek->key == doc->key); ek = ek->link.next);
 
669
    if (!ek) {
 
670
      b->f.unused = 77;
 
671
      goto Ldone;
 
672
    }
 
673
    doc_evacuator->key = ek->key;
 
674
    doc_evacuator->earliest_key = ek->earliest_key;
 
675
    DDebug("cache_evac", "evacuateDocReadDone key: %X earliest: %X",
 
676
          (int) ek->key.word(0), (int) ek->earliest_key.word(0));
 
677
    b->f.unused = 87;
 
678
  }
 
679
  // if the tag in the c->dir does match the first_key in the
 
680
  // document, then it has to be the earliest fragment. We gaurantee that
 
681
  // the first_key and the earliest_key will never collide (see
 
682
  // Cache::open_write).
 
683
  if (!dir_head(&b->dir) || !dir_compare_tag(&b->dir, &doc->first_key)) {
 
684
    next_CacheKey(&next_key, &doc->key);
 
685
    evacuate_fragments(&next_key, &doc_evacuator->earliest_key, !b->readers, this);
 
686
  }
 
687
  return evacuateWrite(doc_evacuator, event, e);
 
688
Ldone:
 
689
  free_CacheVC(doc_evacuator);
 
690
  doc_evacuator = 0;
 
691
  return aggWrite(event, e);
 
692
}
 
693
 
 
694
int
 
695
Part::evac_range(off_t low, off_t high, int evac_phase)
 
696
{
 
697
  int s = offset_to_part_offset(this, low);
 
698
  int e = offset_to_part_offset(this, high);
 
699
  int si = dir_offset_evac_bucket(s);
 
700
  int ei = dir_offset_evac_bucket(e);
 
701
 
 
702
  for (int i = si; i <= ei; i++) {
 
703
    EvacuationBlock *b = evacuate[i].head;
 
704
    EvacuationBlock *first = 0;
 
705
    int first_offset = INT_MAX;
 
706
    for (; b; b = b->link.next) {
 
707
      int64_t offset = dir_offset(&b->dir);
 
708
      int phase = dir_phase(&b->dir);
 
709
      if (offset >= s && offset < e && !b->f.done && phase == evac_phase)
 
710
        if (offset < first_offset) {
 
711
          first = b;
 
712
          first_offset = offset;
 
713
        }
 
714
    }
 
715
    if (first) {
 
716
      first->f.done = 1;
 
717
      io.aiocb.aio_fildes = fd;
 
718
      io.aiocb.aio_nbytes = dir_approx_size(&first->dir);
 
719
      io.aiocb.aio_offset = part_offset(this, &first->dir);
 
720
      if ((off_t)(io.aiocb.aio_offset + io.aiocb.aio_nbytes) > (off_t)(skip + len))
 
721
        io.aiocb.aio_nbytes = skip + len - io.aiocb.aio_offset;
 
722
      doc_evacuator = new_DocEvacuator(io.aiocb.aio_nbytes, this);
 
723
      doc_evacuator->overwrite_dir = first->dir;
 
724
 
 
725
      io.aiocb.aio_buf = doc_evacuator->buf->data();
 
726
      io.action = this;
 
727
      io.thread = AIO_CALLBACK_THREAD_ANY;
 
728
      DDebug("cache_evac", "evac_range evacuating %X %d", (int)dir_tag(&first->dir), (int)dir_offset(&first->dir));
 
729
      SET_HANDLER(&Part::evacuateDocReadDone);
 
730
      ink_assert(ink_aio_read(&io) >= 0);
 
731
      return -1;
 
732
    }
 
733
  }
 
734
  return 0;
 
735
}
 
736
 
 
737
 
 
738
static int
 
739
agg_copy(char *p, CacheVC *vc)
 
740
{
 
741
  Part *part = vc->part;
 
742
  off_t o = part->header->write_pos + part->agg_buf_pos;
 
743
 
 
744
  if (!vc->f.evacuator) {
 
745
    Doc *doc = (Doc *) p;
 
746
    IOBufferBlock *res_alt_blk = 0;
 
747
 
 
748
    uint32_t len = vc->write_len + vc->header_len + vc->frag_len + sizeofDoc;
 
749
    ink_assert(vc->frag_type != CACHE_FRAG_TYPE_HTTP || len != sizeofDoc);
 
750
    ink_debug_assert(part->round_to_approx_size(len) == vc->agg_len);
 
751
    // update copy of directory entry for this document
 
752
    dir_set_approx_size(&vc->dir, vc->agg_len);
 
753
    dir_set_offset(&vc->dir, offset_to_part_offset(part, o));
 
754
    ink_assert(part_offset(part, &vc->dir) < (part->skip + part->len));
 
755
    dir_set_phase(&vc->dir, part->header->phase);
 
756
 
 
757
    // fill in document header
 
758
    doc->magic = DOC_MAGIC;
 
759
    doc->len = len;
 
760
    doc->hlen = vc->header_len;
 
761
    doc->ftype = vc->frag_type;
 
762
    doc->flen = vc->frag_len;
 
763
    doc->total_len = vc->total_len;
 
764
    doc->first_key = vc->first_key;
 
765
    doc->sync_serial = part->header->sync_serial;
 
766
    vc->write_serial = doc->write_serial = part->header->write_serial;
 
767
    doc->checksum = DOC_NO_CHECKSUM;
 
768
    if (vc->pin_in_cache) {
 
769
      dir_set_pinned(&vc->dir, 1);
 
770
      doc->pinned = (uint32_t) (ink_get_based_hrtime() / HRTIME_SECOND) + vc->pin_in_cache;
 
771
    } else {
 
772
      dir_set_pinned(&vc->dir, 0);
 
773
      doc->pinned = 0;
 
774
    }
 
775
 
 
776
    if (vc->f.use_first_key) {
 
777
      if (doc->data_len())
 
778
        doc->key = vc->earliest_key;
 
779
      else {
 
780
        // the vector is being written by itself
 
781
        prev_CacheKey(&doc->key, &vc->earliest_key);
 
782
      }
 
783
      dir_set_head(&vc->dir, true);
 
784
    } else {
 
785
      doc->key = vc->key;
 
786
      dir_set_head(&vc->dir, !vc->fragment);
 
787
    }
 
788
    if (doc->flen)
 
789
      memcpy(doc->frags(), &vc->frag[0], doc->flen);
 
790
 
 
791
#ifdef HTTP_CACHE
 
792
    if (vc->f.rewrite_resident_alt) {
 
793
      ink_assert(vc->f.use_first_key);
 
794
      Doc *res_doc = (Doc *) vc->first_buf->data();
 
795
      res_alt_blk = new_IOBufferBlock(vc->first_buf, res_doc->data_len(), sizeofDoc + res_doc->hlen);
 
796
      doc->key = res_doc->key;
 
797
      doc->total_len = res_doc->data_len();
 
798
    }
 
799
#endif
 
800
    // update the new_info object_key, and total_len and dirinfo
 
801
    if (vc->header_len) {
 
802
      ink_debug_assert(vc->f.use_first_key);
 
803
#ifdef HTTP_CACHE
 
804
      if (vc->frag_type == CACHE_FRAG_TYPE_HTTP) {
 
805
        ink_debug_assert(vc->write_vector->count() > 0);
 
806
        if (!vc->f.update && !vc->f.evac_vector) {
 
807
          ink_debug_assert(!(vc->first_key == zero_key));
 
808
          CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
 
809
          http_info->object_size_set(vc->total_len);
 
810
        }
 
811
        // update + data_written =>  Update case (b)
 
812
        // need to change the old alternate's object length
 
813
        if (vc->f.update && vc->total_len) {
 
814
          CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
 
815
          http_info->object_size_set(vc->total_len);
 
816
        }
 
817
        ink_assert(!(((uintptr_t) &doc->hdr()[0]) & HDR_PTR_ALIGNMENT_MASK));
 
818
        ink_assert(vc->header_len == vc->write_vector->marshal(doc->hdr(), vc->header_len));
 
819
      } else
 
820
#endif
 
821
        memcpy(doc->hdr(), vc->header_to_write, vc->header_len);
 
822
      // the single fragment flag is not used in the write call.
 
823
      // putting it in for completeness.
 
824
      vc->f.single_fragment = doc->single_fragment();
 
825
    }
 
826
    // move data
 
827
    if (vc->write_len) {
 
828
      {
 
829
        ProxyMutex RELEASE_UNUSED *mutex = vc->part->mutex;
 
830
        ink_debug_assert(mutex->thread_holding == this_ethread());
 
831
        CACHE_DEBUG_SUM_DYN_STAT(cache_write_bytes_stat, vc->write_len);
 
832
      }
 
833
#ifdef HTTP_CACHE
 
834
      if (vc->f.rewrite_resident_alt)
 
835
        iobufferblock_memcpy(doc->data(), vc->write_len, res_alt_blk, 0);
 
836
      else
 
837
#endif
 
838
        iobufferblock_memcpy(doc->data(), vc->write_len, vc->blocks, vc->offset);
 
839
#ifdef VERIFY_JTEST_DATA
 
840
      if (f.use_first_key && header_len) {
 
841
        int ib = 0, xd = 0;
 
842
        char xx[500];
 
843
        new_info.request_get().url_get().print(xx, 500, &ib, &xd);
 
844
        char *x = xx;
 
845
        for (int q = 0; q < 3; q++)
 
846
          x = strchr(x + 1, '/');
 
847
        ink_assert(!memcmp(doc->hdr(), x, ib - (x - xx)));
 
848
      }
 
849
#endif
 
850
 
 
851
    }
 
852
    if (cache_config_enable_checksum) {
 
853
      doc->checksum = 0;
 
854
      for (char *b = doc->hdr(); b < (char *) doc + doc->len; b++)
 
855
        doc->checksum += *b;
 
856
    }
 
857
    if (vc->frag_type == CACHE_FRAG_TYPE_HTTP && vc->f.single_fragment)
 
858
      ink_assert(doc->hlen);
 
859
 
 
860
    if (res_alt_blk)
 
861
      res_alt_blk->free();
 
862
 
 
863
    return vc->agg_len;
 
864
  } else {
 
865
    // for evacuated documents, copy the data, and update directory
 
866
    Doc *doc = (Doc *) vc->buf->data();
 
867
    int l = vc->part->round_to_approx_size(doc->len);
 
868
    {
 
869
      ProxyMutex RELEASE_UNUSED *mutex = vc->part->mutex;
 
870
      ink_debug_assert(mutex->thread_holding == this_ethread());
 
871
      CACHE_DEBUG_INCREMENT_DYN_STAT(cache_gc_frags_evacuated_stat);
 
872
      CACHE_DEBUG_SUM_DYN_STAT(cache_gc_bytes_evacuated_stat, l);
 
873
    }
 
874
    doc->sync_serial = vc->part->header->sync_serial;
 
875
    doc->write_serial = vc->part->header->write_serial;
 
876
 
 
877
    memcpy(p, doc, doc->len);
 
878
 
 
879
    vc->dir = vc->overwrite_dir;
 
880
    dir_set_offset(&vc->dir, offset_to_part_offset(vc->part, o));
 
881
    dir_set_phase(&vc->dir, vc->part->header->phase);
 
882
 
 
883
    return l;
 
884
  }
 
885
}
 
886
 
 
887
inline void
 
888
Part::evacuate_cleanup_blocks(int i)
 
889
{
 
890
  EvacuationBlock *b = evacuate[i].head;
 
891
  while (b) {
 
892
    if (b->f.done &&
 
893
        ((header->phase != dir_phase(&b->dir) &&
 
894
          header->write_pos > part_offset(this, &b->dir)) ||
 
895
         (header->phase == dir_phase(&b->dir) && header->write_pos <= part_offset(this, &b->dir)))) {
 
896
      EvacuationBlock *x = b;
 
897
      DDebug("cache_evac", "evacuate cleanup free %X offset %d",
 
898
            (int) b->evac_frags.key.word(0), (int) dir_offset(&b->dir));
 
899
      b = b->link.next;
 
900
      evacuate[i].remove(x);
 
901
      free_EvacuationBlock(x, mutex->thread_holding);
 
902
      continue;
 
903
    }
 
904
    b = b->link.next;
 
905
  }
 
906
}
 
907
 
 
908
void
 
909
Part::evacuate_cleanup()
 
910
{
 
911
  int64_t eo = ((header->write_pos - start) / CACHE_BLOCK_SIZE) + 1;
 
912
  int64_t e = dir_offset_evac_bucket(eo);
 
913
  int64_t sx = e - (evacuate_size / PIN_SCAN_EVERY) - 1;
 
914
  int64_t s = sx;
 
915
  int i;
 
916
 
 
917
  if (e > evacuate_size)
 
918
    e = evacuate_size;
 
919
  if (sx < 0)
 
920
    s = 0;
 
921
  for (i = s; i < e; i++)
 
922
    evacuate_cleanup_blocks(i);
 
923
 
 
924
  // if we have wrapped, handle the end bit
 
925
  if (sx <= 0) {
 
926
    s = evacuate_size + sx - 2;
 
927
    if (s < 0)
 
928
      s = 0;
 
929
    for (i = s; i < evacuate_size; i++)
 
930
      evacuate_cleanup_blocks(i);
 
931
  }
 
932
}
 
933
 
 
934
void
 
935
Part::periodic_scan()
 
936
{
 
937
  evacuate_cleanup();
 
938
  scan_for_pinned_documents();
 
939
  if (header->write_pos == start)
 
940
    scan_pos = start;
 
941
  scan_pos += len / PIN_SCAN_EVERY;
 
942
}
 
943
 
 
944
void
 
945
Part::agg_wrap()
 
946
{
 
947
  header->write_pos = start;
 
948
  header->phase = !header->phase;
 
949
 
 
950
  header->cycle++;
 
951
  header->agg_pos = header->write_pos;
 
952
  dir_lookaside_cleanup(this);
 
953
  dir_clean_part(this);
 
954
  periodic_scan();
 
955
}
 
956
 
 
957
/* NOTE: This state can be called by an AIO thread, so DON'T DON'T
 
958
   DON'T schedule any events on this thread using VC_SCHED_XXX or
 
959
   mutex->thread_holding->schedule_xxx_local(). ALWAYS use
 
960
   eventProcessor.schedule_xxx().
 
961
   Also, make sure that any functions called by this also use
 
962
   the eventProcessor to schedule events
 
963
*/
 
964
int
 
965
Part::aggWrite(int event, void *e)
 
966
{
 
967
  NOWARN_UNUSED(e);
 
968
  NOWARN_UNUSED(event);
 
969
  ink_assert(!is_io_in_progress());
 
970
 
 
971
  Que(CacheVC, link) tocall;
 
972
  CacheVC *c;
 
973
 
 
974
  cancel_trigger();
 
975
 
 
976
Lagain:
 
977
  // calculate length of aggregated write
 
978
  for (c = (CacheVC *) agg.head; c;) {
 
979
    int writelen = c->agg_len;
 
980
    ink_assert(writelen < AGG_SIZE);
 
981
    if (agg_buf_pos + writelen > AGG_SIZE ||
 
982
        header->write_pos + agg_buf_pos + writelen > (skip + len))
 
983
      break;
 
984
    DDebug("agg_read", "copying: %d, %" PRIu64 ", key: %d",
 
985
          agg_buf_pos, header->write_pos + agg_buf_pos, c->first_key.word(0));
 
986
    int wrotelen = agg_copy(agg_buffer + agg_buf_pos, c);
 
987
    ink_assert(writelen == wrotelen);
 
988
    agg_todo_size -= writelen;
 
989
    agg_buf_pos += writelen;
 
990
    CacheVC *n = (CacheVC *)c->link.next;
 
991
    agg.dequeue();
 
992
    if (c->f.sync && c->f.use_first_key) {
 
993
      CacheVC *last = sync.tail;
 
994
      while (last && UINT_WRAP_LT(c->write_serial, last->write_serial))
 
995
        last = (CacheVC*)last->link.prev;
 
996
      sync.insert(c, last);
 
997
    } else if (c->f.evacuator)
 
998
      c->handleEvent(AIO_EVENT_DONE, 0);
 
999
    else
 
1000
      tocall.enqueue(c);
 
1001
    c = n;
 
1002
  }
 
1003
 
 
1004
  // if we got nothing...
 
1005
  if (!agg_buf_pos) {
 
1006
    if (!agg.head && !sync.head) // nothing to get
 
1007
      return EVENT_CONT;
 
1008
    if (header->write_pos == start) {
 
1009
      // write aggregation too long, bad bad, punt on everything.
 
1010
      Note("write aggregation exceeds part size");
 
1011
      ink_assert(!tocall.head);
 
1012
      ink_assert(false);
 
1013
      while ((c = agg.dequeue())) {
 
1014
        agg_todo_size -= c->agg_len;
 
1015
        c->initial_thread->schedule_imm_signal(c, AIO_EVENT_DONE);
 
1016
      }
 
1017
      return EVENT_CONT;
 
1018
    }
 
1019
    // start back
 
1020
    if (agg.head) {
 
1021
      agg_wrap();
 
1022
      goto Lagain;
 
1023
    }
 
1024
  }
 
1025
 
 
1026
  // evacuate space
 
1027
  off_t end = header->write_pos + agg_buf_pos + EVACUATION_SIZE;
 
1028
  if (evac_range(header->write_pos, end, !header->phase) < 0)
 
1029
    goto Lwait;
 
1030
  if (end > skip + len)
 
1031
    if (evac_range(start, start + (end - (skip + len)), header->phase))
 
1032
      goto Lwait;
 
1033
 
 
1034
  // if agg.head, then we are near the end of the disk, so
 
1035
  // write down the aggregation in whatever size it is.
 
1036
  if (agg_buf_pos < AGG_HIGH_WATER && !agg.head && !sync.head && !dir_sync_waiting)
 
1037
    goto Lwait;
 
1038
 
 
1039
  // write sync marker
 
1040
  if (!agg_buf_pos) {
 
1041
    ink_assert(sync.head);
 
1042
    int l = round_to_approx_size(sizeof(Doc));
 
1043
    agg_buf_pos = l;
 
1044
    Doc *d = (Doc*)agg_buffer;
 
1045
    memset(d, 0, sizeof(Doc));
 
1046
    d->magic = DOC_MAGIC;
 
1047
    d->len = l;
 
1048
    d->sync_serial = header->sync_serial;
 
1049
    d->write_serial = header->write_serial;
 
1050
  }
 
1051
 
 
1052
  // set write limit
 
1053
  header->agg_pos = header->write_pos + agg_buf_pos;
 
1054
 
 
1055
  io.aiocb.aio_fildes = fd;
 
1056
  io.aiocb.aio_offset = header->write_pos;
 
1057
  io.aiocb.aio_buf = agg_buffer;
 
1058
  io.aiocb.aio_nbytes = agg_buf_pos;
 
1059
  io.action = this;
 
1060
  /*
 
1061
    Callback on AIO thread so that we can issue a new write ASAP
 
1062
    as all writes are serialized in the partition.  This is not necessary
 
1063
    for reads proceed independently.
 
1064
   */
 
1065
  io.thread = AIO_CALLBACK_THREAD_AIO;
 
1066
  SET_HANDLER(&Part::aggWriteDone);
 
1067
  ink_aio_write(&io);
 
1068
 
 
1069
Lwait:
 
1070
  int ret = EVENT_CONT;
 
1071
  while ((c = tocall.dequeue())) {
 
1072
    if (event == EVENT_CALL && c->mutex->thread_holding == mutex->thread_holding)
 
1073
      ret = EVENT_RETURN;
 
1074
    else
 
1075
      c->initial_thread->schedule_imm_signal(c, AIO_EVENT_DONE);
 
1076
  }
 
1077
  return ret;
 
1078
}
 
1079
 
 
1080
int
 
1081
CacheVC::openWriteCloseDir(int event, Event *e)
 
1082
{
 
1083
  NOWARN_UNUSED(e);
 
1084
  NOWARN_UNUSED(event);
 
1085
 
 
1086
  cancel_trigger();
 
1087
  {
 
1088
    CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
 
1089
    if (!lock) {
 
1090
      SET_HANDLER(&CacheVC::openWriteCloseDir);
 
1091
      ink_debug_assert(!is_io_in_progress());
 
1092
      VC_SCHED_LOCK_RETRY();
 
1093
    }
 
1094
    part->close_write(this);
 
1095
    if (closed < 0 && fragment)
 
1096
      dir_delete(&earliest_key, part, &earliest_dir);
 
1097
  }
 
1098
  if (is_debug_tag_set("cache_update")) {
 
1099
    if (f.update && closed > 0) {
 
1100
      if (!total_len && alternate_index != CACHE_ALT_REMOVED) {
 
1101
        Debug("cache_update", "header only %d (%" PRIu64 ", %" PRIu64 ")\n",
 
1102
              DIR_MASK_TAG(first_key.word(2)), update_key.b[0], update_key.b[1]);
 
1103
 
 
1104
      } else if (total_len && alternate_index != CACHE_ALT_REMOVED) {
 
1105
        Debug("cache_update", "header body, %d, (%" PRIu64 ", %" PRIu64 "), (%" PRIu64 ", %" PRIu64 ")\n",
 
1106
              DIR_MASK_TAG(first_key.word(2)), update_key.b[0], update_key.b[1], earliest_key.b[0], earliest_key.b[1]);
 
1107
      } else if (!total_len && alternate_index == CACHE_ALT_REMOVED) {
 
1108
        Debug("cache_update", "alt delete, %d, (%" PRIu64 ", %" PRIu64 ")\n",
 
1109
              DIR_MASK_TAG(first_key.word(2)), update_key.b[0], update_key.b[1]);
 
1110
      }
 
1111
    }
 
1112
  }
 
1113
  // update the appropriate stat variable
 
1114
  // These variables may not give the current no of documents with
 
1115
  // one, two and three or more fragments. This is because for
 
1116
  // updates we dont decrement the variable corresponding the old
 
1117
  // size of the document
 
1118
  if ((closed == 1) && (total_len > 0)) {
 
1119
    DDebug("cache_stats", "Fragment = %d", fragment);
 
1120
    switch (fragment) {
 
1121
      case 0: CACHE_INCREMENT_DYN_STAT(cache_single_fragment_document_count_stat); break;
 
1122
      case 1: CACHE_INCREMENT_DYN_STAT(cache_two_fragment_document_count_stat); break;
 
1123
      default: CACHE_INCREMENT_DYN_STAT(cache_three_plus_plus_fragment_document_count_stat); break;
 
1124
    }
 
1125
  }
 
1126
  if (f.close_complete) {
 
1127
    recursive++;
 
1128
    ink_debug_assert(!part || this_ethread() != part->mutex->thread_holding);
 
1129
    vio._cont->handleEvent(VC_EVENT_WRITE_COMPLETE, (void *) &vio);
 
1130
    recursive--;
 
1131
  }
 
1132
  return free_CacheVC(this);
 
1133
}
 
1134
 
 
1135
int
 
1136
CacheVC::openWriteCloseHeadDone(int event, Event *e)
 
1137
{
 
1138
  NOWARN_UNUSED(e);
 
1139
  if (event == AIO_EVENT_DONE)
 
1140
    set_io_not_in_progress();
 
1141
  else if (is_io_in_progress())
 
1142
    return EVENT_CONT;
 
1143
  {
 
1144
    CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
 
1145
    if (!lock)
 
1146
      VC_LOCK_RETRY_EVENT();
 
1147
    od->writing_vec = 0;
 
1148
    if (!io.ok())
 
1149
      goto Lclose;
 
1150
    ink_assert(f.use_first_key);
 
1151
    if (!od->dont_update_directory) {
 
1152
      if (dir_is_empty(&od->first_dir)) {
 
1153
        dir_insert(&first_key, part, &dir);
 
1154
      } else {
 
1155
        // multiple fragment vector write
 
1156
        dir_overwrite(&first_key, part, &dir, &od->first_dir, false);
 
1157
        // insert moved resident alternate
 
1158
        if (od->move_resident_alt) {
 
1159
          if (dir_valid(part, &od->single_doc_dir))
 
1160
            dir_insert(&od->single_doc_key, part, &od->single_doc_dir);
 
1161
          od->move_resident_alt = 0;
 
1162
        }
 
1163
      }
 
1164
      od->first_dir = dir;
 
1165
      if (frag_type == CACHE_FRAG_TYPE_HTTP && f.single_fragment) {
 
1166
        // fragment is tied to the vector
 
1167
        od->move_resident_alt = 1;
 
1168
        if (!f.rewrite_resident_alt) {
 
1169
          od->single_doc_key = earliest_key;
 
1170
        }
 
1171
        dir_assign(&od->single_doc_dir, &dir);
 
1172
        dir_set_tag(&od->single_doc_dir, od->single_doc_key.word(2));
 
1173
      }
 
1174
    }
 
1175
  }
 
1176
Lclose:
 
1177
  return openWriteCloseDir(event, e);
 
1178
}
 
1179
 
 
1180
int
 
1181
CacheVC::openWriteCloseHead(int event, Event *e)
 
1182
{
 
1183
  NOWARN_UNUSED(e);
 
1184
 
 
1185
  cancel_trigger();
 
1186
  f.use_first_key = 1;
 
1187
  if (io.ok())
 
1188
    ink_assert(fragment || (length == (int64_t)total_len));
 
1189
  else
 
1190
    return openWriteCloseDir(event, e);
 
1191
  if (f.data_done)
 
1192
    write_len = 0;
 
1193
  else
 
1194
    write_len = length;
 
1195
#ifdef HTTP_CACHE
 
1196
  if (frag_type == CACHE_FRAG_TYPE_HTTP) {
 
1197
    SET_HANDLER(&CacheVC::updateVector);
 
1198
    return updateVector(EVENT_IMMEDIATE, 0);
 
1199
  } else {
 
1200
#endif
 
1201
    header_len = header_to_write_len;
 
1202
    SET_HANDLER(&CacheVC::openWriteCloseHeadDone);
 
1203
    return do_write_lock();
 
1204
#ifdef HTTP_CACHE
 
1205
  }
 
1206
#endif
 
1207
}
 
1208
 
 
1209
int
 
1210
CacheVC::openWriteCloseDataDone(int event, Event *e)
 
1211
{
 
1212
  NOWARN_UNUSED(e);
 
1213
  int ret = 0;
 
1214
 
 
1215
  if (event == AIO_EVENT_DONE)
 
1216
    set_io_not_in_progress();
 
1217
  else if (is_io_in_progress())
 
1218
    return EVENT_CONT;
 
1219
  if (!io.ok())
 
1220
    return openWriteCloseDir(event, e);
 
1221
  {
 
1222
    CACHE_TRY_LOCK(lock, part->mutex, this_ethread());
 
1223
    if (!lock)
 
1224
      VC_LOCK_RETRY_EVENT();
 
1225
    if (!fragment) {
 
1226
      ink_assert(key == earliest_key);
 
1227
      earliest_dir = dir;
 
1228
    } else {
 
1229
      if (!frag)
 
1230
        frag = &integral_frags[0];
 
1231
      else {
 
1232
        if (fragment-1 >= INTEGRAL_FRAGS && IS_POWER_2((uint32)(fragment-1))) {
 
1233
          Frag *t = frag;
 
1234
          frag = (Frag*)xmalloc(sizeof(Frag) * (fragment-1)*2);
 
1235
          memcpy(frag, t, sizeof(Frag) * (fragment-1));
 
1236
          if (t != integral_frags)
 
1237
            xfree(t);
 
1238
        }
 
1239
      }
 
1240
      frag[fragment-1].offset = write_pos;
 
1241
    }
 
1242
    fragment++;
 
1243
    write_pos += write_len;
 
1244
    dir_insert(&key, part, &dir);
 
1245
    blocks = iobufferblock_skip(blocks, &offset, &length, write_len);
 
1246
    next_CacheKey(&key, &key);
 
1247
    if (length) {
 
1248
      write_len = length;
 
1249
      if (write_len > MAX_FRAG_SIZE)
 
1250
        write_len = MAX_FRAG_SIZE;
 
1251
      if ((ret = do_write_call()) == EVENT_RETURN)
 
1252
        goto Lcallreturn;
 
1253
      return ret;
 
1254
    }
 
1255
    f.data_done = 1;
 
1256
    return openWriteCloseHead(event, e); // must be called under part lock from here
 
1257
  }
 
1258
Lcallreturn:
 
1259
  return handleEvent(AIO_EVENT_DONE, 0);
 
1260
}
 
1261
 
 
1262
int
 
1263
CacheVC::openWriteClose(int event, Event *e)
 
1264
{
 
1265
  NOWARN_UNUSED(e);
 
1266
  cancel_trigger();
 
1267
  if (is_io_in_progress()) {
 
1268
    if (event != AIO_EVENT_DONE)
 
1269
      return EVENT_CONT;
 
1270
    set_io_not_in_progress();
 
1271
    if (!io.ok())
 
1272
      return openWriteCloseDir(event, e);
 
1273
  }
 
1274
  if (closed > 0) {
 
1275
    if (total_len == 0) {
 
1276
#ifdef HTTP_CACHE
 
1277
      if (f.update) {
 
1278
        return updateVector(event, e);
 
1279
      } else {
 
1280
        // If we've been CLOSE'd but nothing has been written then
 
1281
        // this close is transformed into an abort.
 
1282
        closed = -1;
 
1283
        return openWriteCloseDir(event, e);
 
1284
      }
 
1285
#else
 
1286
      return openWriteCloseDir(event, e);
 
1287
#endif
 
1288
    }
 
1289
    if (length && (fragment || length > MAX_FRAG_SIZE)) {
 
1290
      SET_HANDLER(&CacheVC::openWriteCloseDataDone);
 
1291
      write_len = length;
 
1292
      if (write_len > MAX_FRAG_SIZE)
 
1293
        write_len = MAX_FRAG_SIZE;
 
1294
      return do_write_lock_call();
 
1295
    } else
 
1296
      return openWriteCloseHead(event, e);
 
1297
  } else
 
1298
    return openWriteCloseDir(event, e);
 
1299
}
 
1300
 
 
1301
int
 
1302
CacheVC::openWriteWriteDone(int event, Event *e)
 
1303
{
 
1304
  NOWARN_UNUSED(e);
 
1305
 
 
1306
  cancel_trigger();
 
1307
  if (event == AIO_EVENT_DONE)
 
1308
    set_io_not_in_progress();
 
1309
  else
 
1310
    if (is_io_in_progress())
 
1311
      return EVENT_CONT;
 
1312
  // In the event of VC_EVENT_ERROR, the cont must do an io_close
 
1313
  if (!io.ok()) {
 
1314
    if (closed) {
 
1315
      closed = -1;
 
1316
      return die();
 
1317
    }
 
1318
    SET_HANDLER(&CacheVC::openWriteMain);
 
1319
    return calluser(VC_EVENT_ERROR);
 
1320
  }
 
1321
  {
 
1322
    CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
 
1323
    if (!lock)
 
1324
      VC_LOCK_RETRY_EVENT();
 
1325
    // store the earliest directory. Need to remove the earliest dir
 
1326
    // in case the writer aborts.
 
1327
    if (!fragment) {
 
1328
      ink_assert(key == earliest_key);
 
1329
      earliest_dir = dir;
 
1330
    } else {
 
1331
      if (!frag)
 
1332
        frag = &integral_frags[0];
 
1333
      else {
 
1334
        if (fragment-1 >= INTEGRAL_FRAGS && IS_POWER_2((uint32_t)(fragment-1))) {
 
1335
          Frag *t = frag;
 
1336
          frag = (Frag*)xmalloc(sizeof(Frag) * (fragment-1)*2);
 
1337
          memcpy(frag, t, sizeof(Frag) * (fragment-1));
 
1338
          if (t != integral_frags)
 
1339
            xfree(t);
 
1340
        }
 
1341
      }
 
1342
      frag[fragment-1].offset = write_pos;
 
1343
    }
 
1344
    fragment++;
 
1345
    write_pos += write_len;
 
1346
    dir_insert(&key, part, &dir);
 
1347
    DDebug("cache_insert", "WriteDone: %X, %X, %d", key.word(0), first_key.word(0), write_len);
 
1348
    blocks = iobufferblock_skip(blocks, &offset, &length, write_len);
 
1349
    next_CacheKey(&key, &key);
 
1350
  }
 
1351
  if (closed)
 
1352
    return die();
 
1353
  SET_HANDLER(&CacheVC::openWriteMain);
 
1354
  return openWriteMain(event, e);
 
1355
}
 
1356
 
 
1357
static inline int target_fragment_size() {
 
1358
  return cache_config_target_fragment_size - sizeofDoc;
 
1359
}
 
1360
 
 
1361
int
 
1362
CacheVC::openWriteMain(int event, Event *e)
 
1363
{
 
1364
  NOWARN_UNUSED(e);
 
1365
  NOWARN_UNUSED(event);
 
1366
  cancel_trigger();
 
1367
  int called_user = 0;
 
1368
  ink_debug_assert(!is_io_in_progress());
 
1369
Lagain:
 
1370
  if (!vio.buffer.writer()) {
 
1371
    if (calluser(VC_EVENT_WRITE_READY) == EVENT_DONE)
 
1372
      return EVENT_DONE;
 
1373
    if (!vio.buffer.writer())
 
1374
      return EVENT_CONT;
 
1375
  }
 
1376
  if (vio.ntodo() <= 0) {
 
1377
    called_user = 1;
 
1378
    if (calluser(VC_EVENT_WRITE_COMPLETE) == EVENT_DONE)
 
1379
      return EVENT_DONE;
 
1380
    ink_assert(!f.close_complete || !"close expected after write COMPLETE");
 
1381
    if (vio.ntodo() <= 0)
 
1382
      return EVENT_CONT;
 
1383
  }
 
1384
  int64_t ntodo = (int64_t)(vio.ntodo() + length);
 
1385
  int64_t total_avail = vio.buffer.reader()->read_avail();
 
1386
  int64_t avail = total_avail;
 
1387
  int64_t towrite = avail + length;
 
1388
  if (towrite > ntodo) {
 
1389
    avail -= (towrite - ntodo);
 
1390
    towrite = ntodo;
 
1391
  }
 
1392
  if (towrite > MAX_FRAG_SIZE) {
 
1393
    avail -= (towrite - MAX_FRAG_SIZE);
 
1394
    towrite = MAX_FRAG_SIZE;
 
1395
  }
 
1396
  if (!blocks && towrite) {
 
1397
    blocks = vio.buffer.reader()->block;
 
1398
    offset = vio.buffer.reader()->start_offset;
 
1399
  }
 
1400
  if (avail > 0) {
 
1401
    vio.buffer.reader()->consume(avail);
 
1402
    vio.ndone += avail;
 
1403
    total_len += avail;
 
1404
  }
 
1405
  length = (uint64_t)towrite;
 
1406
  if (length > target_fragment_size() && 
 
1407
      (length < target_fragment_size() + target_fragment_size() / 4))
 
1408
    write_len = target_fragment_size();
 
1409
  else
 
1410
    write_len = length;
 
1411
  bool not_writing = towrite != ntodo && towrite < target_fragment_size();
 
1412
  if (!called_user) {
 
1413
    if (not_writing) {
 
1414
      called_user = 1;
 
1415
      if (calluser(VC_EVENT_WRITE_READY) == EVENT_DONE)
 
1416
        return EVENT_DONE;
 
1417
      goto Lagain;
 
1418
    } else if (vio.ntodo() <= 0)
 
1419
      goto Lagain;
 
1420
  }
 
1421
  if (not_writing)
 
1422
    return EVENT_CONT;
 
1423
  if (towrite == ntodo && f.close_complete) {
 
1424
    closed = 1;
 
1425
    SET_HANDLER(&CacheVC::openWriteClose);
 
1426
    return openWriteClose(EVENT_NONE, NULL);
 
1427
  }
 
1428
  SET_HANDLER(&CacheVC::openWriteWriteDone);
 
1429
  return do_write_lock_call();
 
1430
}
 
1431
 
 
1432
// begin overwrite
 
1433
int
 
1434
CacheVC::openWriteOverwrite(int event, Event *e)
 
1435
{
 
1436
  NOWARN_UNUSED(e);
 
1437
 
 
1438
  cancel_trigger();
 
1439
  if (event != AIO_EVENT_DONE) {
 
1440
    if (event == EVENT_IMMEDIATE)
 
1441
      last_collision = 0;
 
1442
  } else {
 
1443
    Doc *doc = NULL;
 
1444
    set_io_not_in_progress();
 
1445
    if (_action.cancelled)
 
1446
      return openWriteCloseDir(event, e);
 
1447
    if (!io.ok())
 
1448
      goto Ldone;
 
1449
    doc = (Doc *) buf->data();
 
1450
    if (!(doc->first_key == first_key))
 
1451
      goto Lcollision;
 
1452
    od->first_dir = dir;
 
1453
    first_buf = buf;
 
1454
    goto Ldone;
 
1455
  }
 
1456
Lcollision:
 
1457
  {
 
1458
    CACHE_TRY_LOCK(lock, part->mutex, this_ethread());
 
1459
    if (!lock)
 
1460
      VC_LOCK_RETRY_EVENT();
 
1461
    int res = dir_probe(&first_key, part, &dir, &last_collision);
 
1462
    if (res > 0) {
 
1463
      if ((res = do_read_call(&first_key)) == EVENT_RETURN)
 
1464
        goto Lcallreturn;
 
1465
      return res;
 
1466
    }
 
1467
  }
 
1468
Ldone:
 
1469
  SET_HANDLER(&CacheVC::openWriteMain);
 
1470
  return callcont(CACHE_EVENT_OPEN_WRITE);
 
1471
Lcallreturn:
 
1472
  return handleEvent(AIO_EVENT_DONE, 0); // hopefully a tail call
 
1473
}
 
1474
 
 
1475
#ifdef HTTP_CACHE
 
1476
// openWriteStartDone handles vector read (addition of alternates)
 
1477
// and lock misses
 
1478
int
 
1479
CacheVC::openWriteStartDone(int event, Event *e)
 
1480
{
 
1481
  NOWARN_UNUSED(e);
 
1482
 
 
1483
  intptr_t err = ECACHE_NO_DOC;
 
1484
  cancel_trigger();
 
1485
  if (is_io_in_progress()) {
 
1486
    if (event != AIO_EVENT_DONE)
 
1487
      return EVENT_CONT;
 
1488
    set_io_not_in_progress();
 
1489
  }
 
1490
  if (_action.cancelled && (!od || !od->has_multiple_writers()))
 
1491
    goto Lcancel;
 
1492
 
 
1493
  if (event == AIO_EVENT_DONE) {        // vector read done
 
1494
    Doc *doc = (Doc *) buf->data();
 
1495
    if (!io.ok()) {
 
1496
      err = ECACHE_READ_FAIL;
 
1497
      goto Lfailure;
 
1498
    }
 
1499
 
 
1500
    /* INKqa07123.
 
1501
       A directory entry which is nolonger valid may have been overwritten.
 
1502
       We need to start afresh from the beginning by setting last_collision
 
1503
       to NULL.
 
1504
     */
 
1505
    if (!dir_valid(part, &dir)) {
 
1506
      DDebug("cache_write",
 
1507
            "OpenReadStartDone: Dir not valid: Write Head: %d, Dir: %d",
 
1508
            offset_to_part_offset(part, part->header->write_pos), dir_offset(&dir));
 
1509
      last_collision = NULL;
 
1510
      goto Lcollision;
 
1511
    }
 
1512
    if (!(doc->first_key == first_key))
 
1513
      goto Lcollision;
 
1514
    if (doc->magic != DOC_MAGIC) {
 
1515
      err = ECACHE_BAD_META_DATA;
 
1516
      goto Lfailure;
 
1517
    }
 
1518
    if (!doc->hlen) {
 
1519
      err = ECACHE_BAD_META_DATA;
 
1520
      goto Lfailure;
 
1521
    }
 
1522
    ink_assert((((uintptr_t) &doc->hdr()[0]) & HDR_PTR_ALIGNMENT_MASK) == 0);
 
1523
 
 
1524
    if (write_vector->get_handles(doc->hdr(), doc->hlen, buf) != doc->hlen) {
 
1525
      err = ECACHE_BAD_META_DATA;
 
1526
      goto Lfailure;
 
1527
    }
 
1528
    ink_debug_assert(write_vector->count() > 0);
 
1529
    od->first_dir = dir;
 
1530
    first_dir = dir;
 
1531
    if (doc->single_fragment()) {
 
1532
      // fragment is tied to the vector
 
1533
      od->move_resident_alt = 1;
 
1534
      od->single_doc_key = doc->key;
 
1535
      dir_assign(&od->single_doc_dir, &dir);
 
1536
      dir_set_tag(&od->single_doc_dir, od->single_doc_key.word(2));
 
1537
    }
 
1538
    first_buf = buf;
 
1539
    goto Lsuccess;
 
1540
  }
 
1541
 
 
1542
Lcollision:
 
1543
  {
 
1544
    int if_writers = ((uintptr_t) info == CACHE_ALLOW_MULTIPLE_WRITES);
 
1545
    CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
 
1546
    if (!lock)
 
1547
      VC_LOCK_RETRY_EVENT();
 
1548
    if (!od) {
 
1549
      if ((err = part->open_write(
 
1550
             this, if_writers, cache_config_http_max_alts > 1 ? cache_config_http_max_alts : 0)) > 0)
 
1551
        goto Lfailure;
 
1552
      if (od->has_multiple_writers()) {
 
1553
        MUTEX_RELEASE(lock);
 
1554
        SET_HANDLER(&CacheVC::openWriteMain);
 
1555
        return callcont(CACHE_EVENT_OPEN_WRITE);
 
1556
      }
 
1557
    }
 
1558
    // check for collision
 
1559
    if (dir_probe(&first_key, part, &dir, &last_collision)) {
 
1560
      od->reading_vec = 1;
 
1561
      int ret = do_read_call(&first_key);
 
1562
      if (ret == EVENT_RETURN)
 
1563
        goto Lcallreturn;
 
1564
      return ret;
 
1565
    }
 
1566
    if (f.update) {
 
1567
      // fail update because vector has been GC'd
 
1568
      goto Lfailure;
 
1569
    }
 
1570
  }
 
1571
Lsuccess:
 
1572
  od->reading_vec = 0;
 
1573
  if (_action.cancelled)
 
1574
    goto Lcancel;
 
1575
  SET_HANDLER(&CacheVC::openWriteMain);
 
1576
  return callcont(CACHE_EVENT_OPEN_WRITE);
 
1577
 
 
1578
Lfailure:
 
1579
  CACHE_INCREMENT_DYN_STAT(base_stat + CACHE_STAT_FAILURE);
 
1580
  _action.continuation->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *) -err);
 
1581
Lcancel:
 
1582
  if (od) {
 
1583
    od->reading_vec = 0;
 
1584
    return openWriteCloseDir(event, e);
 
1585
  } else
 
1586
    return free_CacheVC(this);
 
1587
Lcallreturn:
 
1588
  return handleEvent(AIO_EVENT_DONE, 0); // hopefully a tail call
 
1589
}
 
1590
#endif
 
1591
 
 
1592
// handle lock failures from main Cache::open_write entry points below
 
1593
int
 
1594
CacheVC::openWriteStartBegin(int event, Event *e)
 
1595
{
 
1596
  NOWARN_UNUSED(e);
 
1597
  NOWARN_UNUSED(event);
 
1598
 
 
1599
  intptr_t err;
 
1600
  cancel_trigger();
 
1601
  if (_action.cancelled)
 
1602
    return free_CacheVC(this);
 
1603
  if (((err = part->open_write_lock(this, false, 1)) > 0)) {
 
1604
    CACHE_INCREMENT_DYN_STAT(base_stat + CACHE_STAT_FAILURE);
 
1605
    free_CacheVC(this);
 
1606
    _action.continuation->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *) -err);
 
1607
    return EVENT_DONE;
 
1608
  }
 
1609
  if (err < 0)
 
1610
    VC_SCHED_LOCK_RETRY();
 
1611
  if (f.overwrite) {
 
1612
    SET_HANDLER(&CacheVC::openWriteOverwrite);
 
1613
    return openWriteOverwrite(EVENT_IMMEDIATE, 0);
 
1614
  } else {
 
1615
    // write by key
 
1616
    SET_HANDLER(&CacheVC::openWriteMain);
 
1617
    return callcont(CACHE_EVENT_OPEN_WRITE);
 
1618
  }
 
1619
}
 
1620
 
 
1621
// main entry point for writing of of non-http documents
 
1622
Action *
 
1623
Cache::open_write(Continuation *cont, CacheKey *key, CacheFragType frag_type,
 
1624
                  int options, time_t apin_in_cache, char *hostname, int host_len)
 
1625
{
 
1626
 
 
1627
  if (!CACHE_READY(frag_type)) {
 
1628
    cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *) -ECACHE_NOT_READY);
 
1629
    return ACTION_RESULT_DONE;
 
1630
  }
 
1631
 
 
1632
  ink_assert(caches[frag_type] == this);
 
1633
 
 
1634
  intptr_t res = 0;
 
1635
  CacheVC *c = new_CacheVC(cont);
 
1636
  ProxyMutex *mutex = cont->mutex;
 
1637
  MUTEX_LOCK(lock, c->mutex, this_ethread());
 
1638
  c->vio.op = VIO::WRITE;
 
1639
  c->base_stat = cache_write_active_stat;
 
1640
  c->part = key_to_part(key, hostname, host_len);
 
1641
  Part *part = c->part;
 
1642
  CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
 
1643
  c->first_key = c->key = *key;
 
1644
  c->frag_type = frag_type;
 
1645
  /*
 
1646
     The transition from single fragment document to a multi-fragment document
 
1647
     would cause a problem if the key and the first_key collide. In case of
 
1648
     a collision, old vector data could be served to HTTP. Need to avoid that.
 
1649
     Also, when evacuating a fragment, we have to decide if its the first_key
 
1650
     or the earliest_key based on the dir_tag.
 
1651
   */
 
1652
  do {
 
1653
    rand_CacheKey(&c->key, cont->mutex);
 
1654
  } while (DIR_MASK_TAG(c->key.word(2)) == DIR_MASK_TAG(c->first_key.word(2)));
 
1655
  c->earliest_key = c->key;
 
1656
#ifdef HTTP_CACHE
 
1657
  c->info = 0;
 
1658
#endif
 
1659
  c->f.overwrite = (options & CACHE_WRITE_OPT_OVERWRITE) != 0;
 
1660
  c->f.close_complete = (options & CACHE_WRITE_OPT_CLOSE_COMPLETE) != 0;
 
1661
  c->f.sync = (options & CACHE_WRITE_OPT_SYNC) == CACHE_WRITE_OPT_SYNC;
 
1662
  c->pin_in_cache = (uint32_t) apin_in_cache;
 
1663
 
 
1664
  if ((res = c->part->open_write_lock(c, false, 1)) > 0) {
 
1665
    // document currently being written, abort
 
1666
    CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_FAILURE);
 
1667
    cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *) -res);
 
1668
    free_CacheVC(c);
 
1669
    return ACTION_RESULT_DONE;
 
1670
  }
 
1671
  if (res < 0) {
 
1672
    SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteStartBegin);
 
1673
    c->trigger = CONT_SCHED_LOCK_RETRY(c);
 
1674
    return &c->_action;
 
1675
  }
 
1676
  if (!c->f.overwrite) {
 
1677
    SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteMain);
 
1678
    c->callcont(CACHE_EVENT_OPEN_WRITE);
 
1679
    return ACTION_RESULT_DONE;
 
1680
  } else {
 
1681
    SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteOverwrite);
 
1682
    if (c->openWriteOverwrite(EVENT_IMMEDIATE, 0) == EVENT_DONE)
 
1683
      return ACTION_RESULT_DONE;
 
1684
    else
 
1685
      return &c->_action;
 
1686
  }
 
1687
}
 
1688
 
 
1689
#ifdef HTTP_CACHE
 
1690
// main entry point for writing of http documents
 
1691
Action *
 
1692
Cache::open_write(Continuation *cont, CacheKey *key, CacheHTTPInfo *info, time_t apin_in_cache,
 
1693
                  CacheKey *key1, CacheFragType type, char *hostname, int host_len)
 
1694
{
 
1695
  NOWARN_UNUSED(key1);
 
1696
 
 
1697
  if (!CACHE_READY(type)) {
 
1698
    cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *) -ECACHE_NOT_READY);
 
1699
    return ACTION_RESULT_DONE;
 
1700
  }
 
1701
 
 
1702
  ink_assert(caches[type] == this);
 
1703
  intptr_t err = 0;
 
1704
  int if_writers = (uintptr_t) info == CACHE_ALLOW_MULTIPLE_WRITES;
 
1705
  CacheVC *c = new_CacheVC(cont);
 
1706
  ProxyMutex *mutex = cont->mutex;
 
1707
  c->vio.op = VIO::WRITE;
 
1708
  c->first_key = *key;
 
1709
  /*
 
1710
     The transition from single fragment document to a multi-fragment document
 
1711
     would cause a problem if the key and the first_key collide. In case of
 
1712
     a collision, old vector data could be served to HTTP. Need to avoid that.
 
1713
     Also, when evacuating a fragment, we have to decide if its the first_key
 
1714
     or the earliest_key based on the dir_tag.
 
1715
   */
 
1716
  do {
 
1717
    rand_CacheKey(&c->key, cont->mutex);
 
1718
  }
 
1719
  while (DIR_MASK_TAG(c->key.word(2)) == DIR_MASK_TAG(c->first_key.word(2)));
 
1720
  c->earliest_key = c->key;
 
1721
  c->frag_type = CACHE_FRAG_TYPE_HTTP;
 
1722
  c->part = key_to_part(key, hostname, host_len);
 
1723
  Part *part = c->part;
 
1724
  c->info = info;
 
1725
  if (c->info && (uintptr_t) info != CACHE_ALLOW_MULTIPLE_WRITES) {
 
1726
    /*
 
1727
       Update has the following code paths :
 
1728
       a) Update alternate header only :
 
1729
       In this case the vector has to be rewritten. The content
 
1730
       length(update_len) and the key for the document are set in the
 
1731
       new_info in the set_http_info call.
 
1732
       HTTP OPERATIONS
 
1733
       open_write with info set
 
1734
       set_http_info new_info
 
1735
       (total_len == 0)
 
1736
       close
 
1737
       b) Update alternate and data
 
1738
       In this case both the vector and the data needs to be rewritten.
 
1739
       This case is similar to the standard write of a document case except
 
1740
       that the new_info is inserted into the vector at the alternate_index
 
1741
       (overwriting the old alternate) rather than the end of the vector.
 
1742
       HTTP OPERATIONS
 
1743
       open_write with info set
 
1744
       set_http_info new_info
 
1745
       do_io_write =>  (total_len > 0)
 
1746
       close
 
1747
       c) Delete an alternate
 
1748
       The vector may need to be deleted (if there was only one alternate) or
 
1749
       rewritten (if there were more than one alternate). The deletion of the
 
1750
       vector is done in openWriteRemoveVector.
 
1751
       HTTP OPERATIONS
 
1752
       open_write with info set
 
1753
       close
 
1754
     */
 
1755
    c->f.update = 1;
 
1756
    c->base_stat = cache_update_active_stat;
 
1757
    DDebug("cache_update", "Update called");
 
1758
    info->object_key_get(&c->update_key);
 
1759
    ink_debug_assert(!(c->update_key == zero_key));
 
1760
    c->update_len = info->object_size_get();
 
1761
  } else
 
1762
    c->base_stat = cache_write_active_stat;
 
1763
  CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
 
1764
  c->pin_in_cache = (uint32_t) apin_in_cache;
 
1765
 
 
1766
  {
 
1767
    CACHE_TRY_LOCK(lock, c->part->mutex, cont->mutex->thread_holding);
 
1768
    if (lock) {
 
1769
      if ((err = c->part->open_write(c, if_writers,
 
1770
                                     cache_config_http_max_alts > 1 ? cache_config_http_max_alts : 0)) > 0)
 
1771
        goto Lfailure;
 
1772
      // If there are multiple writers, then this one cannot be an update.
 
1773
      // Only the first writer can do an update. If that's the case, we can
 
1774
      // return success to the state machine now.;
 
1775
      if (c->od->has_multiple_writers())
 
1776
        goto Lmiss;
 
1777
      if (!dir_probe(key, c->part, &c->dir, &c->last_collision)) {
 
1778
        if (c->f.update) {
 
1779
          // fail update because vector has been GC'd
 
1780
          // This situation can also arise in openWriteStartDone
 
1781
          err = ECACHE_NO_DOC;
 
1782
          goto Lfailure;
 
1783
        }
 
1784
        // document doesn't exist, begin write
 
1785
        goto Lmiss;
 
1786
      } else {
 
1787
        c->od->reading_vec = 1;
 
1788
        // document exists, read vector
 
1789
        SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteStartDone);
 
1790
        switch (c->do_read_call(&c->first_key)) {
 
1791
          case EVENT_DONE: return ACTION_RESULT_DONE;
 
1792
          case EVENT_RETURN: goto Lcallreturn;
 
1793
          default: return &c->_action;
 
1794
        }
 
1795
      }
 
1796
    }
 
1797
    // missed lock
 
1798
    SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteStartDone);
 
1799
    CONT_SCHED_LOCK_RETRY(c);
 
1800
    return &c->_action;
 
1801
  }
 
1802
 
 
1803
Lmiss:
 
1804
  SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteMain);
 
1805
  c->callcont(CACHE_EVENT_OPEN_WRITE);
 
1806
  return ACTION_RESULT_DONE;
 
1807
 
 
1808
Lfailure:
 
1809
  CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_FAILURE);
 
1810
  cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *) -err);
 
1811
  if (c->od) {
 
1812
    c->openWriteCloseDir(EVENT_IMMEDIATE, 0);
 
1813
    return ACTION_RESULT_DONE;
 
1814
  }
 
1815
  free_CacheVC(c);
 
1816
  return ACTION_RESULT_DONE;
 
1817
 
 
1818
Lcallreturn:
 
1819
  if (c->handleEvent(AIO_EVENT_DONE, 0) == EVENT_DONE)
 
1820
    return ACTION_RESULT_DONE;
 
1821
  return &c->_action;
 
1822
}
 
1823
#endif