~ps10gel/ubuntu/xenial/trafficserver/6.2.0

« back to all changes in this revision

Viewing changes to iocore/cache/P_CacheInternal.h

  • Committer: Bazaar Package Importer
  • Author(s): Arno Toell
  • Date: 2011-01-13 11:49:18 UTC
  • Revision ID: james.westby@ubuntu.com-20110113114918-vu422h8dknrgkj15
Tags: upstream-2.1.5-unstable
ImportĀ upstreamĀ versionĀ 2.1.5-unstable

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/** @file
 
2
 
 
3
  A brief file description
 
4
 
 
5
  @section license License
 
6
 
 
7
  Licensed to the Apache Software Foundation (ASF) under one
 
8
  or more contributor license agreements.  See the NOTICE file
 
9
  distributed with this work for additional information
 
10
  regarding copyright ownership.  The ASF licenses this file
 
11
  to you under the Apache License, Version 2.0 (the
 
12
  "License"); you may not use this file except in compliance
 
13
  with the License.  You may obtain a copy of the License at
 
14
 
 
15
      http://www.apache.org/licenses/LICENSE-2.0
 
16
 
 
17
  Unless required by applicable law or agreed to in writing, software
 
18
  distributed under the License is distributed on an "AS IS" BASIS,
 
19
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
20
  See the License for the specific language governing permissions and
 
21
  limitations under the License.
 
22
 */
 
23
 
 
24
 
 
25
#ifndef _P_CACHE_INTERNAL_H__
 
26
#define _P_CACHE_INTERNAL_H__
 
27
 
 
28
#include "libts.h"
 
29
 
 
30
#ifdef HTTP_CACHE
 
31
#include "HTTP.h"
 
32
#include "P_CacheHttp.h"
 
33
#endif
 
34
 
 
35
struct EvacuationBlock;
 
36
 
 
37
// Compilation Options
 
38
 
 
39
//#define HIT_EVACUATE                    1
 
40
#define ALTERNATES                      1
 
41
// #define CACHE_LOCK_FAIL_RATE         0.001
 
42
// #define CACHE_AGG_FAIL_RATE          0.005
 
43
// #define CACHE_INSPECTOR_PAGES
 
44
#define MAX_CACHE_VCS_PER_THREAD        500
 
45
 
 
46
#define INTEGRAL_FRAGS                  4
 
47
 
 
48
#ifdef CACHE_INSPECTOR_PAGES
 
49
#ifdef DEBUG
 
50
#define CACHE_STAT_PAGES
 
51
#endif
 
52
#endif
 
53
 
 
54
#ifdef DEBUG
 
55
#define DDebug Debug
 
56
#else
 
57
#define DDebug if (0) dummy_debug
 
58
#endif
 
59
 
 
60
#define AIO_SOFT_FAILURE                -100000
 
61
// retry read from writer delay
 
62
#define WRITER_RETRY_DELAY  HRTIME_MSECONDS(50)
 
63
 
 
64
#define CACHE_READY(_x) (CacheProcessor::cache_ready & (1 << (_x)))
 
65
 
 
66
#ifndef CACHE_LOCK_FAIL_RATE
 
67
#define CACHE_TRY_LOCK(_l, _m, _t) MUTEX_TRY_LOCK(_l, _m, _t)
 
68
#else
 
69
#define CACHE_TRY_LOCK(_l, _m, _t)                             \
 
70
  MUTEX_TRY_LOCK(_l, _m, _t);                                  \
 
71
  if ((uint32_t)_t->generator.random() <                         \
 
72
     (uint32_t)(UINT_MAX *CACHE_LOCK_FAIL_RATE))                 \
 
73
    CACHE_MUTEX_RELEASE(_l)
 
74
#endif
 
75
 
 
76
 
 
77
#define VC_LOCK_RETRY_EVENT() \
 
78
  do { \
 
79
    trigger = mutex->thread_holding->schedule_in_local(this,MUTEX_RETRY_DELAY,event); \
 
80
    return EVENT_CONT; \
 
81
  } while (0)
 
82
 
 
83
#define VC_SCHED_LOCK_RETRY() \
 
84
  do { \
 
85
    trigger = mutex->thread_holding->schedule_in_local(this,MUTEX_RETRY_DELAY); \
 
86
    return EVENT_CONT; \
 
87
  } while (0)
 
88
 
 
89
#define CONT_SCHED_LOCK_RETRY_RET(_c) \
 
90
  do { \
 
91
    _c->mutex->thread_holding->schedule_in_local(_c, MUTEX_RETRY_DELAY); \
 
92
    return EVENT_CONT; \
 
93
  } while (0)
 
94
 
 
95
#define CONT_SCHED_LOCK_RETRY(_c) \
 
96
  _c->mutex->thread_holding->schedule_in_local(_c, MUTEX_RETRY_DELAY)
 
97
 
 
98
#define VC_SCHED_WRITER_RETRY() \
 
99
  do { \
 
100
    ink_assert(!trigger); \
 
101
    writer_lock_retry++; \
 
102
    ink_hrtime _t = WRITER_RETRY_DELAY; \
 
103
    if (writer_lock_retry > 2) \
 
104
      _t = WRITER_RETRY_DELAY * 2; \
 
105
    else if (writer_lock_retry > 5) \
 
106
      _t = WRITER_RETRY_DELAY * 10; \
 
107
    else if (writer_lock_retry > 10) \
 
108
      _t = WRITER_RETRY_DELAY * 100; \
 
109
    trigger = mutex->thread_holding->schedule_in_local(this, _t); \
 
110
    return EVENT_CONT; \
 
111
  } while (0)
 
112
 
 
113
 
 
114
  // cache stats definitions
 
115
enum
 
116
{
 
117
  cache_bytes_used_stat,
 
118
  cache_bytes_total_stat,
 
119
  cache_ram_cache_bytes_stat,
 
120
  cache_ram_cache_bytes_total_stat,
 
121
  cache_direntries_total_stat,
 
122
  cache_direntries_used_stat,
 
123
  cache_ram_cache_hits_stat,
 
124
  cache_ram_cache_misses_stat,
 
125
  cache_pread_count_stat,
 
126
  cache_percent_full_stat,
 
127
  cache_lookup_active_stat,
 
128
  cache_lookup_success_stat,
 
129
  cache_lookup_failure_stat,
 
130
  cache_read_active_stat,
 
131
  cache_read_success_stat,
 
132
  cache_read_failure_stat,
 
133
  cache_write_active_stat,
 
134
  cache_write_success_stat,
 
135
  cache_write_failure_stat,
 
136
  cache_write_backlog_failure_stat,
 
137
  cache_update_active_stat,
 
138
  cache_update_success_stat,
 
139
  cache_update_failure_stat,
 
140
  cache_remove_active_stat,
 
141
  cache_remove_success_stat,
 
142
  cache_remove_failure_stat,
 
143
  cache_evacuate_active_stat,
 
144
  cache_evacuate_success_stat,
 
145
  cache_evacuate_failure_stat,
 
146
  cache_scan_active_stat,
 
147
  cache_scan_success_stat,
 
148
  cache_scan_failure_stat,
 
149
  cache_directory_collision_count_stat,
 
150
  cache_single_fragment_document_count_stat,
 
151
  cache_two_fragment_document_count_stat,
 
152
  cache_three_plus_plus_fragment_document_count_stat,
 
153
  cache_read_busy_success_stat,
 
154
  cache_read_busy_failure_stat,
 
155
  cache_gc_bytes_evacuated_stat,
 
156
  cache_gc_frags_evacuated_stat,
 
157
  cache_write_bytes_stat,
 
158
  cache_hdr_vector_marshal_stat,
 
159
  cache_hdr_marshal_stat,
 
160
  cache_hdr_marshal_bytes_stat,
 
161
  cache_stat_count
 
162
};
 
163
 
 
164
 
 
165
extern RecRawStatBlock *cache_rsb;
 
166
 
 
167
#define GLOBAL_CACHE_SET_DYN_STAT(x,y) \
 
168
        RecSetGlobalRawStatSum(cache_rsb, (x), (y))
 
169
 
 
170
#define CACHE_SET_DYN_STAT(x,y) \
 
171
        RecSetGlobalRawStatSum(cache_rsb, (x), (y)) \
 
172
        RecSetGlobalRawStatSum(part->cache_part->part_rsb, (x), (y))
 
173
 
 
174
#define CACHE_INCREMENT_DYN_STAT(x) \
 
175
        RecIncrRawStat(cache_rsb, mutex->thread_holding, (int) (x), 1); \
 
176
        RecIncrRawStat(part->cache_part->part_rsb, mutex->thread_holding, (int) (x), 1);
 
177
 
 
178
#define CACHE_DECREMENT_DYN_STAT(x) \
 
179
        RecIncrRawStat(cache_rsb, mutex->thread_holding, (int) (x), -1); \
 
180
        RecIncrRawStat(part->cache_part->part_rsb, mutex->thread_holding, (int) (x), -1);
 
181
 
 
182
#define CACHE_PART_SUM_DYN_STAT(x,y) \
 
183
        RecIncrRawStat(part->cache_part->part_rsb, mutex->thread_holding, (int) (x), (int) y);
 
184
 
 
185
#define CACHE_SUM_DYN_STAT(x, y) \
 
186
        RecIncrRawStat(cache_rsb, mutex->thread_holding, (int) (x), (int) (y)); \
 
187
        RecIncrRawStat(part->cache_part->part_rsb, mutex->thread_holding, (int) (x), (int) (y));
 
188
 
 
189
#define CACHE_SUM_DYN_STAT_THREAD(x, y) \
 
190
        RecIncrRawStat(cache_rsb, this_ethread(), (int) (x), (int) (y)); \
 
191
        RecIncrRawStat(part->cache_part->part_rsb, this_ethread(), (int) (x), (int) (y));
 
192
 
 
193
#define GLOBAL_CACHE_SUM_GLOBAL_DYN_STAT(x, y) \
 
194
        RecIncrGlobalRawStatSum(cache_rsb,(x),(y))
 
195
 
 
196
#define CACHE_SUM_GLOBAL_DYN_STAT(x, y) \
 
197
        RecIncrGlobalRawStatSum(cache_rsb,(x),(y)) \
 
198
        RecIncrGlobalRawStatSum(part->cache_part->part_rsb,(x),(y))
 
199
 
 
200
#define CACHE_CLEAR_DYN_STAT(x) \
 
201
do { \
 
202
        RecSetRawStatSum(cache_rsb, (x), 0); \
 
203
        RecSetRawStatCount(cache_rsb, (x), 0); \
 
204
        RecSetRawStatSum(part->cache_part->part_rsb, (x), 0); \
 
205
        RecSetRawStatCount(part->cache_part->part_rsb, (x), 0); \
 
206
} while (0);
 
207
 
 
208
// Configuration
 
209
extern int cache_config_dir_sync_frequency;
 
210
extern int cache_config_http_max_alts;
 
211
extern int cache_config_permit_pinning;
 
212
extern int cache_config_select_alternate;
 
213
extern int cache_config_vary_on_user_agent;
 
214
extern int cache_config_max_doc_size;
 
215
extern int cache_config_min_average_object_size;
 
216
extern int cache_config_agg_write_backlog;
 
217
extern int cache_config_enable_checksum;
 
218
extern int cache_config_alt_rewrite_max_size;
 
219
extern int cache_config_read_while_writer;
 
220
extern char cache_system_config_directory[PATH_NAME_MAX + 1];
 
221
extern int cache_clustering_enabled;
 
222
extern int cache_config_agg_write_backlog;
 
223
extern int cache_config_ram_cache_compress;
 
224
extern int cache_config_ram_cache_compress_percent;
 
225
#ifdef HIT_EVACUATE
 
226
extern int cache_config_hit_evacuate_percent;
 
227
extern int cache_config_hit_evacuate_size_limit;
 
228
#endif
 
229
extern int cache_config_force_sector_size;
 
230
extern int cache_config_target_fragment_size;
 
231
 
 
232
// CacheVC
 
233
struct CacheVC: public CacheVConnection
 
234
{
 
235
  CacheVC();
 
236
 
 
237
  VIO *do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf);
 
238
  VIO *do_io_pread(Continuation *c, int64_t nbytes, MIOBuffer *buf, int64_t offset);
 
239
  VIO *do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *buf, bool owner = false);
 
240
  void do_io_close(int lerrno = -1);
 
241
  void reenable(VIO *avio);
 
242
  void reenable_re(VIO *avio);
 
243
  bool get_data(int i, void *data);
 
244
  bool set_data(int i, void *data);
 
245
  Action *action()
 
246
  {
 
247
    return &_action;
 
248
  }
 
249
  bool is_ram_cache_hit()
 
250
  {
 
251
    ink_assert(vio.op == VIO::READ);
 
252
    return !f.not_from_ram_cache;
 
253
  }
 
254
  int get_header(void **ptr, int *len)
 
255
  {
 
256
    if (first_buf.m_ptr) {
 
257
      Doc *doc = (Doc*)first_buf->data();
 
258
      *ptr = doc->hdr();
 
259
      *len = doc->hlen;
 
260
      return 0;
 
261
    } else
 
262
      return -1;
 
263
  }
 
264
  int set_header(void *ptr, int len)
 
265
  {
 
266
    header_to_write = ptr;
 
267
    header_to_write_len = len;
 
268
    return 0;
 
269
  }
 
270
  int get_single_data(void **ptr, int *len)
 
271
  {
 
272
    if (first_buf.m_ptr) {
 
273
      Doc *doc = (Doc*)first_buf->data();
 
274
      if (doc->data_len() == doc->total_len) {
 
275
        *ptr = doc->data();
 
276
        *len = doc->data_len();
 
277
        return 0;
 
278
      }
 
279
    }
 
280
    return -1;
 
281
  }
 
282
 
 
283
  bool writer_done();
 
284
  int calluser(int event);
 
285
  int callcont(int event);
 
286
  int die();
 
287
  int dead(int event, Event *e);
 
288
 
 
289
  int handleReadDone(int event, Event *e);
 
290
  int handleRead(int event, Event *e);
 
291
  int do_read_call(CacheKey *akey);
 
292
  int handleWrite(int event, Event *e);
 
293
  int handleWriteLock(int event, Event *e);
 
294
  int do_write_call();
 
295
  int do_write_lock();
 
296
  int do_write_lock_call();
 
297
  int do_sync(uint32_t target_write_serial);
 
298
 
 
299
  int openReadClose(int event, Event *e);
 
300
  int openReadReadDone(int event, Event *e);
 
301
  int openReadMain(int event, Event *e);
 
302
  int openReadStartEarliest(int event, Event *e);
 
303
#ifdef HTTP_CACHE
 
304
  int openReadVecWrite(int event, Event *e);
 
305
#endif
 
306
  int openReadStartHead(int event, Event *e);
 
307
  int openReadFromWriter(int event, Event *e);
 
308
  int openReadFromWriterMain(int event, Event *e);
 
309
  int openReadFromWriterFailure(int event, Event *);
 
310
  int openReadChooseWriter(int event, Event *e);
 
311
 
 
312
  int openWriteCloseDir(int event, Event *e);
 
313
  int openWriteCloseHeadDone(int event, Event *e);
 
314
  int openWriteCloseHead(int event, Event *e);
 
315
  int openWriteCloseDataDone(int event, Event *e);
 
316
  int openWriteClose(int event, Event *e);
 
317
  int openWriteRemoveVector(int event, Event *e);
 
318
  int openWriteWriteDone(int event, Event *e);
 
319
  int openWriteOverwrite(int event, Event *e);
 
320
  int openWriteMain(int event, Event *e);
 
321
  int openWriteStartDone(int event, Event *e);
 
322
  int openWriteStartBegin(int event, Event *e);
 
323
 
 
324
  int updateVector(int event, Event *e);
 
325
  int updateReadDone(int event, Event *e);
 
326
  int updateVecWrite(int event, Event *e);
 
327
 
 
328
  int removeEvent(int event, Event *e);
 
329
 
 
330
  int linkWrite(int event, Event *e);
 
331
  int derefRead(int event, Event *e);
 
332
 
 
333
  int scanPart(int event, Event *e);
 
334
  int scanObject(int event, Event *e);
 
335
  int scanUpdateDone(int event, Event *e);
 
336
  int scanOpenWrite(int event, Event *e);
 
337
  int scanRemoveDone(int event, Event *e);
 
338
 
 
339
  int is_io_in_progress()
 
340
  {
 
341
    return io.aiocb.aio_fildes != AIO_NOT_IN_PROGRESS;
 
342
  }
 
343
  void set_io_not_in_progress()
 
344
  {
 
345
    io.aiocb.aio_fildes = AIO_NOT_IN_PROGRESS;
 
346
  }
 
347
  void set_agg_write_in_progress()
 
348
  {
 
349
    io.aiocb.aio_fildes = AIO_AGG_WRITE_IN_PROGRESS;
 
350
  }
 
351
  int evacuateDocDone(int event, Event *e);
 
352
  int evacuateReadHead(int event, Event *e);
 
353
 
 
354
  void cancel_trigger();
 
355
  virtual int64_t get_object_size();
 
356
#ifdef HTTP_CACHE
 
357
  virtual void set_http_info(CacheHTTPInfo *info);
 
358
  virtual void get_http_info(CacheHTTPInfo ** info);
 
359
#endif
 
360
  virtual bool set_pin_in_cache(time_t time_pin);
 
361
  virtual time_t get_pin_in_cache();
 
362
  virtual bool set_disk_io_priority(int priority);
 
363
  virtual int get_disk_io_priority();
 
364
 
 
365
  // offsets from the base stat
 
366
#define CACHE_STAT_ACTIVE  0
 
367
#define CACHE_STAT_SUCCESS 1
 
368
#define CACHE_STAT_FAILURE 2
 
369
 
 
370
  // number of bytes to memset to 0 in the CacheVC when we free
 
371
  // it. All member variables starting from vio are memset to 0.
 
372
  // This variable is initialized in CacheVC constructor.
 
373
  static int size_to_init;
 
374
 
 
375
  // Start Region A
 
376
  // This set of variables are not reset when the cacheVC is freed.
 
377
  // A CacheVC must set these to the correct values whenever needed
 
378
  // These are variables that are always set to the correct values
 
379
  // before being used by the CacheVC
 
380
  CacheKey key, first_key, earliest_key, update_key;
 
381
  Dir dir, earliest_dir, overwrite_dir, first_dir;
 
382
  // end Region A
 
383
 
 
384
  // Start Region B
 
385
  // These variables are individually cleared or reset when the
 
386
  // CacheVC is freed. All these variables must be reset/cleared
 
387
  // in free_CacheVC.
 
388
  Action _action;
 
389
#ifdef HTTP_CACHE
 
390
  CacheHTTPHdr request;
 
391
#endif
 
392
  CacheHTTPInfoVector vector;
 
393
  CacheHTTPInfo alternate;
 
394
  Ptr<IOBufferData> buf;
 
395
  Ptr<IOBufferData> first_buf;
 
396
  Ptr<IOBufferBlock> blocks; // data available to write
 
397
  Ptr<IOBufferBlock> writer_buf;
 
398
 
 
399
  OpenDirEntry *od;
 
400
  AIOCallbackInternal io;
 
401
  int alternate_index;          // preferred position in vector
 
402
  LINK(CacheVC, opendir_link);
 
403
#ifdef CACHE_STAT_PAGES
 
404
  LINK(CacheVC, stat_link);
 
405
#endif
 
406
  // end Region B
 
407
 
 
408
  // Start Region C
 
409
  // These variables are memset to 0 when the structure is freed.
 
410
  // The size of this region is size_to_init which is initialized
 
411
  // in the CacheVC constuctor. It assumes that vio is the start
 
412
  // of this region.
 
413
  // NOTE: NOTE: NOTE: If vio is NOT the start, then CHANGE the
 
414
  // size_to_init initialization
 
415
  VIO vio;
 
416
  EThread *initial_thread;  // initial thread open_XX was called on
 
417
  CacheFragType frag_type;
 
418
  CacheHTTPInfo *info;
 
419
  CacheHTTPInfoVector *write_vector;
 
420
#ifdef HTTP_CACHE
 
421
  CacheLookupHttpConfig *params;
 
422
#endif
 
423
  int header_len;       // for communicating with agg_copy
 
424
  int frag_len;         // for communicating with agg_copy
 
425
  uint32_t write_len;     // for communicating with agg_copy
 
426
  uint32_t agg_len;       // for communicating with aggWrite
 
427
  uint32_t write_serial;  // serial of the final write for SYNC
 
428
  Frag *frag;           // arraylist of fragment offset
 
429
  Frag integral_frags[INTEGRAL_FRAGS];
 
430
  Part *part;
 
431
  Dir *last_collision;
 
432
  Event *trigger;
 
433
  CacheKey *read_key;
 
434
  ContinuationHandler save_handler;
 
435
  uint32_t pin_in_cache;
 
436
  ink_hrtime start_time;
 
437
  int base_stat;
 
438
  int recursive;
 
439
  int closed;
 
440
  int64_t seek_to;                // pread offset
 
441
  int64_t offset;                 // offset into 'blocks' of data to write
 
442
  int64_t writer_offset;          // offset of the writer for reading from a writer
 
443
  int64_t length;                 // length of data available to write
 
444
  int64_t doc_pos;                // read position in 'buf'
 
445
  uint64_t write_pos;             // length written
 
446
  uint64_t total_len;             // total length written and available to write
 
447
  uint64_t doc_len;               // total_length (of the selected alternate for HTTP)
 
448
  uint64_t update_len;
 
449
  int fragment;
 
450
  int scan_msec_delay;
 
451
  CacheVC *write_vc;
 
452
  char *hostname;
 
453
  int host_len;
 
454
  int header_to_write_len;
 
455
  void *header_to_write;
 
456
  short writer_lock_retry;
 
457
 
 
458
  union
 
459
  {
 
460
    uint32_t flags;
 
461
    struct
 
462
    {
 
463
      unsigned int use_first_key:1;
 
464
      unsigned int overwrite:1; // overwrite first_key Dir if it exists
 
465
      unsigned int close_complete:1; // WRITE_COMPLETE is final
 
466
      unsigned int sync:1; // write to be committed to durable storage before WRITE_COMPLETE
 
467
      unsigned int evacuator:1;
 
468
      unsigned int single_fragment:1;
 
469
      unsigned int evac_vector:1;
 
470
      unsigned int lookup:1;
 
471
      unsigned int update:1;
 
472
      unsigned int remove:1;
 
473
      unsigned int remove_aborted_writers:1;
 
474
      unsigned int open_read_timeout:1; // UNUSED
 
475
      unsigned int data_done:1;
 
476
      unsigned int read_from_writer_called:1;
 
477
      unsigned int not_from_ram_cache:1;        // entire object was from ram cache
 
478
      unsigned int rewrite_resident_alt:1;
 
479
      unsigned int readers:1;
 
480
      unsigned int doc_from_ram_cache:1;
 
481
#ifdef HIT_EVACUATE
 
482
      unsigned int hit_evacuate:1;
 
483
#endif
 
484
    } f;
 
485
  };
 
486
  //end region C
 
487
};
 
488
 
 
489
#define PUSH_HANDLER(_x) do {                                           \
 
490
    ink_assert(handler != (ContinuationHandler)(&CacheVC::dead));       \
 
491
    save_handler = handler; handler = (ContinuationHandler)(_x);        \
 
492
} while (0)
 
493
 
 
494
#define POP_HANDLER do {                                          \
 
495
    handler = save_handler;                                       \
 
496
    ink_assert(handler != (ContinuationHandler)(&CacheVC::dead)); \
 
497
  } while (0)
 
498
 
 
499
struct CacheRemoveCont: public Continuation
 
500
{
 
501
  int event_handler(int event, void *data);
 
502
 
 
503
  CacheRemoveCont():Continuation(NULL) { }
 
504
};
 
505
 
 
506
 
 
507
// Global Data
 
508
 
 
509
extern ClassAllocator<CacheVC> cacheVConnectionAllocator;
 
510
extern CacheKey zero_key;
 
511
extern CacheSync *cacheDirSync;
 
512
// Function Prototypes
 
513
#ifdef HTTP_CACHE
 
514
int cache_write(CacheVC *, CacheHTTPInfoVector *);
 
515
int get_alternate_index(CacheHTTPInfoVector *cache_vector, CacheKey key);
 
516
#endif
 
517
int evacuate_segments(CacheKey *key, int force, Part *part);
 
518
CacheVC *new_DocEvacuator(int nbytes, Part *d);
 
519
 
 
520
// inline Functions
 
521
 
 
522
TS_INLINE CacheVC *
 
523
new_CacheVC(Continuation *cont)
 
524
{
 
525
  EThread *t = cont->mutex->thread_holding;
 
526
  CacheVC *c = THREAD_ALLOC(cacheVConnectionAllocator, t);
 
527
#ifdef HTTP_CACHE
 
528
  c->vector.data.data = &c->vector.data.fast_data[0];
 
529
#endif
 
530
  c->_action = cont;
 
531
  c->initial_thread = t;
 
532
  c->mutex = cont->mutex;
 
533
  c->start_time = ink_get_hrtime();
 
534
  ink_assert(c->trigger == NULL);
 
535
  Debug("cache_new", "new %p", c);
 
536
#ifdef CACHE_STAT_PAGES
 
537
  ink_assert(!c->stat_link.next);
 
538
  ink_assert(!c->stat_link.prev);
 
539
#endif
 
540
  dir_clear(&c->dir);
 
541
  return c;
 
542
}
 
543
 
 
544
TS_INLINE int
 
545
free_CacheVC(CacheVC *cont)
 
546
{
 
547
  Debug("cache_free", "free %p", cont);
 
548
  ProxyMutex *mutex = cont->mutex;
 
549
  Part *part = cont->part;
 
550
  CACHE_DECREMENT_DYN_STAT(cont->base_stat + CACHE_STAT_ACTIVE);
 
551
  if (cont->closed > 0) {
 
552
    CACHE_INCREMENT_DYN_STAT(cont->base_stat + CACHE_STAT_SUCCESS);
 
553
  }                             // else abort,cancel
 
554
  ink_debug_assert(mutex->thread_holding == this_ethread());
 
555
  if (cont->trigger)
 
556
    cont->trigger->cancel();
 
557
  ink_assert(!cont->is_io_in_progress());
 
558
  ink_assert(!cont->od);
 
559
  /* calling cont->io.action = NULL causes compile problem on 2.6 solaris
 
560
     release build....wierd??? For now, null out continuation and mutex
 
561
     of the action separately */
 
562
  cont->io.action.continuation = NULL;
 
563
  cont->io.action.mutex = NULL;
 
564
  cont->io.mutex.clear();
 
565
  cont->io.aio_result = 0;
 
566
  cont->io.aiocb.aio_nbytes = 0;
 
567
  cont->io.aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
 
568
#ifdef HTTP_CACHE
 
569
  cont->request.reset();
 
570
  cont->vector.clear();
 
571
#endif
 
572
  cont->vio.buffer.clear();
 
573
  cont->vio.mutex.clear();
 
574
#ifdef HTTP_CACHE
 
575
  if (cont->vio.op == VIO::WRITE && cont->alternate_index == CACHE_ALT_INDEX_DEFAULT)
 
576
    cont->alternate.destroy();
 
577
  else
 
578
    cont->alternate.clear();
 
579
#endif
 
580
  cont->_action.cancelled = 0;
 
581
  cont->_action.mutex.clear();
 
582
  cont->mutex.clear();
 
583
  cont->buf.clear();
 
584
  cont->first_buf.clear();
 
585
  cont->blocks.clear();
 
586
  cont->writer_buf.clear();
 
587
  cont->alternate_index = CACHE_ALT_INDEX_DEFAULT;
 
588
  if (cont->frag && cont->frag != cont->integral_frags)
 
589
    xfree(cont->frag);
 
590
  memset((char *) &cont->vio, 0, cont->size_to_init);
 
591
#ifdef CACHE_STAT_PAGES
 
592
  ink_assert(!cont->stat_link.next && !cont->stat_link.prev);
 
593
#endif
 
594
#ifdef DEBUG
 
595
  SET_CONTINUATION_HANDLER(cont, &CacheVC::dead);
 
596
#endif
 
597
  THREAD_FREE_TO(cont, cacheVConnectionAllocator, this_ethread(), MAX_CACHE_VCS_PER_THREAD);
 
598
  return EVENT_DONE;
 
599
}
 
600
 
 
601
TS_INLINE int
 
602
CacheVC::calluser(int event)
 
603
{
 
604
  recursive++;
 
605
  ink_debug_assert(!part || this_ethread() != part->mutex->thread_holding);
 
606
  vio._cont->handleEvent(event, (void *) &vio);
 
607
  recursive--;
 
608
  if (closed) {
 
609
    die();
 
610
    return EVENT_DONE;
 
611
  }
 
612
  return EVENT_CONT;
 
613
}
 
614
 
 
615
TS_INLINE int
 
616
CacheVC::callcont(int event)
 
617
{
 
618
  recursive++;
 
619
  ink_debug_assert(!part || this_ethread() != part->mutex->thread_holding);
 
620
  _action.continuation->handleEvent(event, this);
 
621
  recursive--;
 
622
  if (closed)
 
623
    die();
 
624
  else if (vio.vc_server)
 
625
    handleEvent(EVENT_IMMEDIATE, 0);
 
626
  return EVENT_DONE;
 
627
}
 
628
 
 
629
TS_INLINE int
 
630
CacheVC::do_read_call(CacheKey *akey)
 
631
{
 
632
  doc_pos = 0;
 
633
  read_key = akey;
 
634
  io.aiocb.aio_nbytes = dir_approx_size(&dir);
 
635
  PUSH_HANDLER(&CacheVC::handleRead);
 
636
  return handleRead(EVENT_CALL, 0);
 
637
}
 
638
 
 
639
TS_INLINE int
 
640
CacheVC::do_write_call()
 
641
{
 
642
  PUSH_HANDLER(&CacheVC::handleWrite);
 
643
  return handleWrite(EVENT_CALL, 0);
 
644
}
 
645
 
 
646
TS_INLINE void
 
647
CacheVC::cancel_trigger()
 
648
{
 
649
  if (trigger) {
 
650
    trigger->cancel_action();
 
651
    trigger = NULL;
 
652
  }
 
653
}
 
654
 
 
655
TS_INLINE int
 
656
CacheVC::die()
 
657
{
 
658
  if (vio.op == VIO::WRITE) {
 
659
#ifdef HTTP_CACHE
 
660
    if (f.update && total_len) {
 
661
      alternate.object_key_set(earliest_key);
 
662
    }
 
663
#endif
 
664
    if (!is_io_in_progress()) {
 
665
      SET_HANDLER(&CacheVC::openWriteClose);
 
666
      if (!recursive)
 
667
        openWriteClose(EVENT_NONE, NULL);
 
668
    }                           // else catch it at the end of openWriteWriteDone
 
669
    return EVENT_CONT;
 
670
  } else {
 
671
    if (is_io_in_progress())
 
672
      save_handler = (ContinuationHandler) & CacheVC::openReadClose;
 
673
    else {
 
674
      SET_HANDLER(&CacheVC::openReadClose);
 
675
      if (!recursive)
 
676
        openReadClose(EVENT_NONE, NULL);
 
677
    }
 
678
    return EVENT_CONT;
 
679
  }
 
680
}
 
681
 
 
682
TS_INLINE int
 
683
CacheVC::handleWriteLock(int event, Event *e)
 
684
{
 
685
  NOWARN_UNUSED(event);
 
686
  cancel_trigger();
 
687
  int ret = 0;
 
688
  {
 
689
    CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
 
690
    if (!lock) {
 
691
      set_agg_write_in_progress();
 
692
      trigger = mutex->thread_holding->schedule_in_local(this, MUTEX_RETRY_DELAY);
 
693
      return EVENT_CONT;
 
694
    }
 
695
    ret = handleWrite(EVENT_CALL, e);
 
696
  }
 
697
  if (ret == EVENT_RETURN)
 
698
    return handleEvent(AIO_EVENT_DONE, 0);
 
699
  return EVENT_CONT;
 
700
}
 
701
 
 
702
TS_INLINE int
 
703
CacheVC::do_write_lock()
 
704
{
 
705
  PUSH_HANDLER(&CacheVC::handleWriteLock);
 
706
  return handleWriteLock(EVENT_NONE, 0);
 
707
}
 
708
 
 
709
TS_INLINE int
 
710
CacheVC::do_write_lock_call()
 
711
{
 
712
  PUSH_HANDLER(&CacheVC::handleWriteLock);
 
713
  return handleWriteLock(EVENT_CALL, 0);
 
714
}
 
715
 
 
716
TS_INLINE bool
 
717
CacheVC::writer_done()
 
718
{
 
719
  OpenDirEntry *cod = od;
 
720
  if (!cod)
 
721
    cod = part->open_read(&first_key);
 
722
  CacheVC *w = (cod) ? cod->writers.head : NULL;
 
723
  // If the write vc started after the reader, then its not the
 
724
  // original writer, since we never choose a writer that started
 
725
  // after the reader. The original writer was deallocated and then
 
726
  // reallocated for the same first_key
 
727
  for (; w && (w != write_vc || w->start_time > start_time); w = (CacheVC *) w->opendir_link.next);
 
728
  if (!w)
 
729
    return true;
 
730
  return false;
 
731
}
 
732
 
 
733
TS_INLINE int
 
734
Part::close_write(CacheVC *cont)
 
735
{
 
736
 
 
737
#ifdef CACHE_STAT_PAGES
 
738
  ink_assert(stat_cache_vcs.head);
 
739
  stat_cache_vcs.remove(cont, cont->stat_link);
 
740
  ink_assert(!cont->stat_link.next && !cont->stat_link.prev);
 
741
#endif
 
742
  return open_dir.close_write(cont);
 
743
}
 
744
 
 
745
// Returns 0 on success or a positive error code on failure
 
746
TS_INLINE int
 
747
Part::open_write(CacheVC *cont, int allow_if_writers, int max_writers)
 
748
{
 
749
  Part *part = this;
 
750
  bool agg_error = false;
 
751
  if (!cont->f.remove) {
 
752
    agg_error = (!cont->f.update && agg_todo_size > cache_config_agg_write_backlog);
 
753
#ifdef CACHE_AGG_FAIL_RATE
 
754
    agg_error = agg_error || ((uint32_t) mutex->thread_holding->generator.random() <
 
755
                              (uint32_t) (UINT_MAX * CACHE_AGG_FAIL_RATE));
 
756
#endif
 
757
  }
 
758
  if (agg_error) {
 
759
    CACHE_INCREMENT_DYN_STAT(cache_write_backlog_failure_stat);
 
760
    return ECACHE_WRITE_FAIL;
 
761
  }
 
762
  if (open_dir.open_write(cont, allow_if_writers, max_writers)) {
 
763
#ifdef CACHE_STAT_PAGES
 
764
    ink_debug_assert(cont->mutex->thread_holding == this_ethread());
 
765
    ink_assert(!cont->stat_link.next && !cont->stat_link.prev);
 
766
    stat_cache_vcs.enqueue(cont, cont->stat_link);
 
767
#endif
 
768
    return 0;
 
769
  }
 
770
  return ECACHE_DOC_BUSY;
 
771
}
 
772
 
 
773
TS_INLINE int
 
774
Part::close_write_lock(CacheVC *cont)
 
775
{
 
776
  EThread *t = cont->mutex->thread_holding;
 
777
  CACHE_TRY_LOCK(lock, mutex, t);
 
778
  if (!lock)
 
779
    return -1;
 
780
  return close_write(cont);
 
781
}
 
782
 
 
783
TS_INLINE int
 
784
Part::open_write_lock(CacheVC *cont, int allow_if_writers, int max_writers)
 
785
{
 
786
  EThread *t = cont->mutex->thread_holding;
 
787
  CACHE_TRY_LOCK(lock, mutex, t);
 
788
  if (!lock)
 
789
    return -1;
 
790
  return open_write(cont, allow_if_writers, max_writers);
 
791
}
 
792
 
 
793
TS_INLINE OpenDirEntry *
 
794
Part::open_read_lock(INK_MD5 *key, EThread *t)
 
795
{
 
796
  CACHE_TRY_LOCK(lock, mutex, t);
 
797
  if (!lock)
 
798
    return NULL;
 
799
  return open_dir.open_read(key);
 
800
}
 
801
 
 
802
TS_INLINE int
 
803
Part::begin_read_lock(CacheVC *cont)
 
804
{
 
805
  // no need for evacuation as the entire document is already in memory
 
806
#ifndef  CACHE_STAT_PAGES
 
807
  if (cont->f.single_fragment)
 
808
    return 0;
 
809
#endif
 
810
  // VC is enqueued in stat_cache_vcs in the begin_read call
 
811
  EThread *t = cont->mutex->thread_holding;
 
812
  CACHE_TRY_LOCK(lock, mutex, t);
 
813
  if (!lock)
 
814
    return -1;
 
815
  return begin_read(cont);
 
816
}
 
817
 
 
818
TS_INLINE int
 
819
Part::close_read_lock(CacheVC *cont)
 
820
{
 
821
  EThread *t = cont->mutex->thread_holding;
 
822
  CACHE_TRY_LOCK(lock, mutex, t);
 
823
  if (!lock)
 
824
    return -1;
 
825
  return close_read(cont);
 
826
}
 
827
 
 
828
TS_INLINE int
 
829
dir_delete_lock(CacheKey *key, Part *d, ProxyMutex *m, Dir *del)
 
830
{
 
831
  EThread *thread = m->thread_holding;
 
832
  CACHE_TRY_LOCK(lock, d->mutex, thread);
 
833
  if (!lock)
 
834
    return -1;
 
835
  return dir_delete(key, d, del);
 
836
}
 
837
 
 
838
TS_INLINE int
 
839
dir_insert_lock(CacheKey *key, Part *d, Dir *to_part, ProxyMutex *m)
 
840
{
 
841
  EThread *thread = m->thread_holding;
 
842
  CACHE_TRY_LOCK(lock, d->mutex, thread);
 
843
  if (!lock)
 
844
    return -1;
 
845
  return dir_insert(key, d, to_part);
 
846
}
 
847
 
 
848
TS_INLINE int
 
849
dir_overwrite_lock(CacheKey *key, Part *d, Dir *to_part, ProxyMutex *m, Dir *overwrite, bool must_overwrite = true)
 
850
{
 
851
  EThread *thread = m->thread_holding;
 
852
  CACHE_TRY_LOCK(lock, d->mutex, thread);
 
853
  if (!lock)
 
854
    return -1;
 
855
  return dir_overwrite(key, d, to_part, overwrite, must_overwrite);
 
856
}
 
857
 
 
858
void TS_INLINE
 
859
rand_CacheKey(CacheKey *next_key, ProxyMutex *mutex)
 
860
{
 
861
  uint32_t *b = (uint32_t *) & next_key->b[0];
 
862
  InkRand & g = mutex->thread_holding->generator;
 
863
  for (int i = 0; i < 4; i++)
 
864
    b[i] = (uint32_t) g.random();
 
865
}
 
866
 
 
867
extern uint8_t CacheKey_next_table[];
 
868
void TS_INLINE
 
869
next_CacheKey(CacheKey *next_key, CacheKey *key)
 
870
{
 
871
  uint8_t *b = (uint8_t *) next_key;
 
872
  uint8_t *k = (uint8_t *) key;
 
873
  b[0] = CacheKey_next_table[k[0]];
 
874
  for (int i = 1; i < 16; i++)
 
875
    b[i] = CacheKey_next_table[(b[i - 1] + k[i]) & 0xFF];
 
876
}
 
877
extern uint8_t CacheKey_prev_table[];
 
878
void TS_INLINE
 
879
prev_CacheKey(CacheKey *prev_key, CacheKey *key)
 
880
{
 
881
  uint8_t *b = (uint8_t *) prev_key;
 
882
  uint8_t *k = (uint8_t *) key;
 
883
  for (int i = 15; i > 0; i--)
 
884
    b[i] = 256 + CacheKey_prev_table[k[i]] - k[i - 1];
 
885
  b[0] = CacheKey_prev_table[k[0]];
 
886
}
 
887
 
 
888
TS_INLINE unsigned int
 
889
next_rand(unsigned int *p)
 
890
{
 
891
  unsigned int seed = *p;
 
892
  seed = 1103515145 * seed + 12345;
 
893
  *p = seed;
 
894
  return seed;
 
895
}
 
896
 
 
897
extern ClassAllocator<CacheRemoveCont> cacheRemoveContAllocator;
 
898
 
 
899
TS_INLINE CacheRemoveCont *
 
900
new_CacheRemoveCont()
 
901
{
 
902
  CacheRemoveCont *cache_rm = cacheRemoveContAllocator.alloc();
 
903
 
 
904
  cache_rm->mutex = new_ProxyMutex();
 
905
  SET_CONTINUATION_HANDLER(cache_rm, &CacheRemoveCont::event_handler);
 
906
  return cache_rm;
 
907
}
 
908
 
 
909
TS_INLINE void
 
910
free_CacheRemoveCont(CacheRemoveCont *cache_rm)
 
911
{
 
912
  cache_rm->mutex = NULL;
 
913
  cacheRemoveContAllocator.free(cache_rm);
 
914
}
 
915
 
 
916
TS_INLINE int
 
917
CacheRemoveCont::event_handler(int event, void *data)
 
918
{
 
919
  (void) event;
 
920
  (void) data;
 
921
  free_CacheRemoveCont(this);
 
922
  return EVENT_DONE;
 
923
}
 
924
 
 
925
int64_t cache_bytes_used(void);
 
926
int64_t cache_bytes_total(void);
 
927
 
 
928
#ifdef DEBUG
 
929
#define CACHE_DEBUG_INCREMENT_DYN_STAT(_x) CACHE_INCREMENT_DYN_STAT(_x)
 
930
#define CACHE_DEBUG_SUM_DYN_STAT(_x,_y) CACHE_SUM_DYN_STAT(_x,_y)
 
931
#else
 
932
#define CACHE_DEBUG_INCREMENT_DYN_STAT(_x)
 
933
#define CACHE_DEBUG_SUM_DYN_STAT(_x,_y)
 
934
#endif
 
935
 
 
936
struct CacheHostRecord;
 
937
struct Part;
 
938
class CacheHostTable;
 
939
 
 
940
struct Cache
 
941
{
 
942
  volatile int cache_read_done;
 
943
  volatile int total_good_npart;
 
944
  int total_npart;
 
945
  volatile int ready;
 
946
  int64_t cache_size;             //in store block size
 
947
  CacheHostTable *hosttable;
 
948
  volatile int total_initialized_part;
 
949
  int scheme;
 
950
 
 
951
  int open(bool reconfigure, bool fix);
 
952
  int close();
 
953
 
 
954
  Action *lookup(Continuation *cont, CacheKey *key, CacheFragType type, char *hostname, int host_len);
 
955
  inkcoreapi Action *open_read(Continuation *cont, CacheKey *key, CacheFragType type, char *hostname, int len);
 
956
  inkcoreapi Action *open_write(Continuation *cont, CacheKey *key,
 
957
                                CacheFragType frag_type, int options = 0,
 
958
                                time_t pin_in_cache = (time_t) 0, char *hostname = 0, int host_len = 0);
 
959
  inkcoreapi Action *remove(Continuation *cont, CacheKey *key,
 
960
                            CacheFragType type = CACHE_FRAG_TYPE_HTTP,
 
961
                            bool user_agents = true, bool link = false,
 
962
                            char *hostname = 0, int host_len = 0);
 
963
  Action *scan(Continuation *cont, char *hostname = 0, int host_len = 0, int KB_per_second = 2500);
 
964
 
 
965
#ifdef HTTP_CACHE
 
966
  Action *lookup(Continuation *cont, URL *url, CacheFragType type);
 
967
  inkcoreapi Action *open_read(Continuation *cont, CacheKey *key,
 
968
                               CacheHTTPHdr *request,
 
969
                               CacheLookupHttpConfig *params, CacheFragType type, char *hostname, int host_len);
 
970
  Action *open_read(Continuation *cont, URL *url, CacheHTTPHdr *request,
 
971
                    CacheLookupHttpConfig *params, CacheFragType type);
 
972
  Action *open_write(Continuation *cont, CacheKey *key,
 
973
                     CacheHTTPInfo *old_info, time_t pin_in_cache = (time_t) 0,
 
974
                     CacheKey *key1 = NULL,
 
975
                     CacheFragType type = CACHE_FRAG_TYPE_HTTP, char *hostname = 0, int host_len = 0);
 
976
  Action *open_write(Continuation *cont, URL *url, CacheHTTPHdr *request,
 
977
                     CacheHTTPInfo *old_info, time_t pin_in_cache = (time_t) 0,
 
978
                     CacheFragType type = CACHE_FRAG_TYPE_HTTP);
 
979
  Action *remove(Continuation *cont, URL *url, CacheFragType type);
 
980
  static void generate_key(INK_MD5 *md5, URL *url, CacheHTTPHdr *request);
 
981
#endif
 
982
 
 
983
  Action *link(Continuation *cont, CacheKey *from, CacheKey *to, CacheFragType type, char *hostname, int host_len);
 
984
  Action *deref(Continuation *cont, CacheKey *key, CacheFragType type, char *hostname, int host_len);
 
985
 
 
986
  void part_initialized(bool result);
 
987
 
 
988
  int open_done();
 
989
 
 
990
  Part *key_to_part(CacheKey *key, char *hostname, int host_len);
 
991
 
 
992
  Cache():cache_read_done(0), total_good_npart(0), total_npart(0), ready(CACHE_INITIALIZING), cache_size(0),  // in store block size
 
993
          hosttable(NULL), total_initialized_part(0), scheme(CACHE_NONE_TYPE)
 
994
    {
 
995
    }
 
996
};
 
997
 
 
998
extern Cache *theCache;
 
999
extern Cache *theStreamCache;
 
1000
inkcoreapi extern Cache *caches[NUM_CACHE_FRAG_TYPES];
 
1001
 
 
1002
#ifdef HTTP_CACHE
 
1003
TS_INLINE Action *
 
1004
Cache::open_read(Continuation *cont, CacheURL *url, CacheHTTPHdr *request,
 
1005
                 CacheLookupHttpConfig *params, CacheFragType type)
 
1006
{
 
1007
  INK_MD5 md5;
 
1008
  int len;
 
1009
  url->MD5_get(&md5);
 
1010
  const char *hostname = url->host_get(&len);
 
1011
  return open_read(cont, &md5, request, params, type, (char *) hostname, len);
 
1012
}
 
1013
 
 
1014
TS_INLINE void
 
1015
Cache::generate_key(INK_MD5 *md5, URL *url, CacheHTTPHdr *request)
 
1016
{
 
1017
  NOWARN_UNUSED(request);
 
1018
#ifdef BROKEN_HACK_FOR_VARY_ON_UA
 
1019
  // We probably should make this configurable, both enabling it and what
 
1020
  // MIME types we want to treat differently. // Leif
 
1021
 
 
1022
  if (cache_config_vary_on_user_agent && request) {
 
1023
    // If we are varying on user-agent, we only want to do
 
1024
    //  this for text content-types since expirence says
 
1025
    //  images do not vary.   Varying on images decimiates
 
1026
    //  the hitrate (INKqa04820)
 
1027
 
 
1028
    // HDR FIX ME - mimeTable needs to be updated to support
 
1029
    //   ptr/len pairs
 
1030
 
 
1031
    // Note: if 'proxy.config.http.global_user_agent_header' is set, we
 
1032
    // should ignore 'cache_config_vary_on_user_agent' flag because
 
1033
    // all requests to OS were sent with one so-called 'global user agent'
 
1034
    // header instead of original client's 'user-agent'
 
1035
 
 
1036
    MimeTableEntry *url_mime_type_entry =
 
1037
//      mimeTable.get_entry_path(url->path_get());
 
1038
      NULL;
 
1039
 
 
1040
    if (url_mime_type_entry && strstr(url_mime_type_entry->mime_type, "text")) {
 
1041
      url->MD5_get(md5);
 
1042
      int ua_len;
 
1043
      const char *value = request->value_get(MIME_FIELD_USER_AGENT,
 
1044
                                             MIME_LEN_USER_AGENT, &ua_len);
 
1045
      if (value) {
 
1046
        INK_DIGEST_CTX context;
 
1047
        // Mix the user-agent and URL INK_MD5's
 
1048
        ink_code_incr_md5_init(&context);
 
1049
        ink_code_incr_md5_update(&context, value, ua_len);
 
1050
        ink_code_incr_md5_update(&context, (char *) md5, sizeof(INK_MD5));
 
1051
        ink_code_incr_md5_final((char *) md5, &context);
 
1052
      }
 
1053
      return;
 
1054
    }
 
1055
  }
 
1056
#endif /* BROKEN_HACK_FOR_VARY_ON_UA */
 
1057
  url->MD5_get(md5);
 
1058
}
 
1059
 
 
1060
TS_INLINE Action *
 
1061
Cache::open_write(Continuation *cont, CacheURL *url, CacheHTTPHdr *request,
 
1062
                  CacheHTTPInfo *old_info, time_t pin_in_cache, CacheFragType type)
 
1063
{
 
1064
  (void) request;
 
1065
  INK_MD5 url_md5;
 
1066
  url->MD5_get(&url_md5);
 
1067
  int len;
 
1068
  const char *hostname = url->host_get(&len);
 
1069
 
 
1070
  return open_write(cont, &url_md5, old_info, pin_in_cache, NULL, type, (char *) hostname, len);
 
1071
}
 
1072
#endif
 
1073
 
 
1074
TS_INLINE unsigned int
 
1075
cache_hash(INK_MD5 & md5)
 
1076
{
 
1077
  uint64_t f = md5.fold();
 
1078
  unsigned int mhash = (unsigned int) (f >> 32);
 
1079
  return mhash;
 
1080
}
 
1081
 
 
1082
#ifdef HTTP_CACHE
 
1083
#define CLUSTER_CACHE
 
1084
#endif
 
1085
#ifdef CLUSTER_CACHE
 
1086
#include "P_Net.h"
 
1087
#include "P_ClusterInternal.h"
 
1088
// Note: This include must occur here in order to avoid numerous forward
 
1089
//       reference problems.
 
1090
#include "P_ClusterInline.h"
 
1091
#endif
 
1092
 
 
1093
TS_INLINE Action *
 
1094
CacheProcessor::lookup(Continuation *cont, CacheKey *key, bool local_only,
 
1095
                       CacheFragType frag_type, char *hostname, int host_len)
 
1096
{
 
1097
  (void) local_only;
 
1098
#ifdef CLUSTER_CACHE
 
1099
  // Try to send remote, if not possible, handle locally
 
1100
  if ((cache_clustering_enabled > 0) && !local_only) {
 
1101
    Action *a = Cluster_lookup(cont, key, frag_type, hostname, host_len);
 
1102
    if (a) {
 
1103
      return a;
 
1104
    }
 
1105
  }
 
1106
#endif
 
1107
  return caches[frag_type]->lookup(cont, key, frag_type, hostname, host_len);
 
1108
}
 
1109
 
 
1110
TS_INLINE inkcoreapi Action *
 
1111
CacheProcessor::open_read(Continuation *cont, CacheKey *key, CacheFragType frag_type, char *hostname, int host_len)
 
1112
{
 
1113
#ifdef CLUSTER_CACHE
 
1114
  if (cache_clustering_enabled > 0) {
 
1115
    return open_read_internal(CACHE_OPEN_READ, cont, (MIOBuffer *) 0,
 
1116
                              (CacheURL *) 0, (CacheHTTPHdr *) 0,
 
1117
                              (CacheLookupHttpConfig *) 0, key, 0, frag_type, hostname, host_len);
 
1118
  }
 
1119
#endif
 
1120
  return caches[frag_type]->open_read(cont, key, frag_type, hostname, host_len);
 
1121
}
 
1122
 
 
1123
TS_INLINE Action *
 
1124
CacheProcessor::open_read_buffer(Continuation *cont, MIOBuffer *buf, CacheKey *key, CacheFragType frag_type,
 
1125
                                 char *hostname, int host_len)
 
1126
{
 
1127
  (void) buf;
 
1128
#ifdef CLUSTER_CACHE
 
1129
  if (cache_clustering_enabled > 0) {
 
1130
    return open_read_internal(CACHE_OPEN_READ_BUFFER, cont, buf,
 
1131
                              (CacheURL *) 0, (CacheHTTPHdr *) 0,
 
1132
                              (CacheLookupHttpConfig *) 0, key, 0, frag_type, hostname, host_len);
 
1133
  }
 
1134
#endif
 
1135
  return caches[frag_type]->open_read(cont, key, frag_type, hostname, host_len);
 
1136
}
 
1137
 
 
1138
 
 
1139
TS_INLINE inkcoreapi Action *
 
1140
CacheProcessor::open_write(Continuation *cont, CacheKey *key, CacheFragType frag_type,
 
1141
                           int expected_size, int options, time_t pin_in_cache,
 
1142
                           char *hostname, int host_len)
 
1143
{
 
1144
  (void) expected_size;
 
1145
#ifdef CLUSTER_CACHE
 
1146
  ClusterMachine *m = cluster_machine_at_depth(cache_hash(*key));
 
1147
 
 
1148
  if (m && (cache_clustering_enabled > 0)) {
 
1149
    return Cluster_write(cont, expected_size, (MIOBuffer *) 0, m,
 
1150
                         key, frag_type, options, pin_in_cache,
 
1151
                         CACHE_OPEN_WRITE, key, (CacheURL *) 0,
 
1152
                         (CacheHTTPHdr *) 0, (CacheHTTPInfo *) 0, hostname, host_len);
 
1153
  }
 
1154
#endif
 
1155
  return caches[frag_type]->open_write(cont, key, frag_type, options, pin_in_cache, hostname, host_len);
 
1156
}
 
1157
 
 
1158
TS_INLINE Action *
 
1159
CacheProcessor::open_write_buffer(Continuation *cont, MIOBuffer *buf, CacheKey *key,
 
1160
                                  CacheFragType frag_type, int options, time_t pin_in_cache,
 
1161
                                  char *hostname, int host_len)
 
1162
{
 
1163
  NOWARN_UNUSED(pin_in_cache);
 
1164
  (void)cont;
 
1165
  (void)buf;
 
1166
  (void)key;
 
1167
  (void)frag_type;
 
1168
  (void)options;
 
1169
  (void)hostname;
 
1170
  (void)host_len;
 
1171
  ink_assert(!"implemented");
 
1172
  return NULL;
 
1173
}
 
1174
 
 
1175
TS_INLINE Action *
 
1176
CacheProcessor::remove(Continuation *cont, CacheKey *key, CacheFragType frag_type,
 
1177
                       bool rm_user_agents, bool rm_link, char *hostname, int host_len)
 
1178
{
 
1179
#ifdef CLUSTER_CACHE
 
1180
  if (cache_clustering_enabled > 0) {
 
1181
    ClusterMachine *m = cluster_machine_at_depth(cache_hash(*key));
 
1182
 
 
1183
    if (m) {
 
1184
      return Cluster_remove(m, cont, key, rm_user_agents, rm_link, frag_type, hostname, host_len);
 
1185
    }
 
1186
  }
 
1187
#endif
 
1188
  return caches[frag_type]->remove(cont, key, frag_type, rm_user_agents, rm_link, hostname, host_len);
 
1189
}
 
1190
 
 
1191
TS_INLINE Action *
 
1192
scan(Continuation *cont, char *hostname = 0, int host_len = 0, int KB_per_second = 2500)
 
1193
{
 
1194
  return caches[CACHE_FRAG_TYPE_HTTP]->scan(cont, hostname, host_len, KB_per_second);
 
1195
}
 
1196
 
 
1197
#ifdef HTTP_CACHE
 
1198
TS_INLINE Action *
 
1199
CacheProcessor::lookup(Continuation *cont, URL *url, bool local_only, CacheFragType frag_type)
 
1200
{
 
1201
  (void) local_only;
 
1202
  INK_MD5 md5;
 
1203
  url->MD5_get(&md5);
 
1204
  int host_len = 0;
 
1205
  const char *hostname = url->host_get(&host_len);
 
1206
 
 
1207
  return lookup(cont, &md5, local_only, frag_type, (char *) hostname, host_len);
 
1208
}
 
1209
 
 
1210
TS_INLINE Action *
 
1211
CacheProcessor::open_read_buffer(Continuation *cont, MIOBuffer *buf,
 
1212
                                 URL *url, CacheHTTPHdr *request, CacheLookupHttpConfig *params, CacheFragType type)
 
1213
{
 
1214
  (void) buf;
 
1215
#ifdef CLUSTER_CACHE
 
1216
  if (cache_clustering_enabled > 0) {
 
1217
    return open_read_internal(CACHE_OPEN_READ_BUFFER_LONG, cont, buf, url,
 
1218
                              request, params, (CacheKey *) 0, 0, type, (char *) 0, 0);
 
1219
  }
 
1220
#endif
 
1221
  return caches[type]->open_read(cont, url, request, params, type);
 
1222
}
 
1223
 
 
1224
TS_INLINE Action *
 
1225
CacheProcessor::open_write_buffer(Continuation * cont, MIOBuffer * buf, URL * url,
 
1226
                                  CacheHTTPHdr * request, CacheHTTPHdr * response, CacheFragType type)
 
1227
{
 
1228
  (void) cont;
 
1229
  (void) buf;
 
1230
  (void) url;
 
1231
  (void) request;
 
1232
  (void) response;
 
1233
  (void) type;
 
1234
  ink_assert(!"implemented");
 
1235
  return NULL;
 
1236
}
 
1237
 
 
1238
#endif
 
1239
 
 
1240
 
 
1241
#ifdef CLUSTER_CACHE
 
1242
TS_INLINE Action *
 
1243
CacheProcessor::open_read_internal(int opcode,
 
1244
                                   Continuation *cont, MIOBuffer *buf,
 
1245
                                   CacheURL *url,
 
1246
                                   CacheHTTPHdr *request,
 
1247
                                   CacheLookupHttpConfig *params,
 
1248
                                   CacheKey *key,
 
1249
                                   time_t pin_in_cache, CacheFragType frag_type, char *hostname, int host_len)
 
1250
{
 
1251
  INK_MD5 url_md5;
 
1252
  if ((opcode == CACHE_OPEN_READ_LONG) || (opcode == CACHE_OPEN_READ_BUFFER_LONG)) {
 
1253
    Cache::generate_key(&url_md5, url, request);
 
1254
  } else {
 
1255
    url_md5 = *key;
 
1256
  }
 
1257
  ClusterMachine *m = cluster_machine_at_depth(cache_hash(url_md5));
 
1258
  ClusterMachine *owner_machine = m ? m : this_cluster_machine();
 
1259
 
 
1260
  if (owner_machine != this_cluster_machine()) {
 
1261
    return Cluster_read(owner_machine, opcode, cont, buf, url,
 
1262
                        request, params, key, pin_in_cache, frag_type, hostname, host_len);
 
1263
  } else {
 
1264
    if ((opcode == CACHE_OPEN_READ_LONG)
 
1265
        || (opcode == CACHE_OPEN_READ_BUFFER_LONG)) {
 
1266
      return caches[frag_type]->open_read(cont, &url_md5, request, params, frag_type, hostname, host_len);
 
1267
    } else {
 
1268
      return caches[frag_type]->open_read(cont, key, frag_type, hostname, host_len);
 
1269
    }
 
1270
  }
 
1271
}
 
1272
#endif
 
1273
 
 
1274
#ifdef CLUSTER_CACHE
 
1275
TS_INLINE Action *
 
1276
CacheProcessor::link(Continuation *cont, CacheKey *from, CacheKey *to,
 
1277
                     CacheFragType type, char *hostname, int host_len)
 
1278
{
 
1279
  if (cache_clustering_enabled > 0) {
 
1280
    // Use INK_MD5 in "from" to determine target machine
 
1281
    ClusterMachine *m = cluster_machine_at_depth(cache_hash(*from));
 
1282
    if (m) {
 
1283
      return Cluster_link(m, cont, from, to, type, hostname, host_len);
 
1284
    }
 
1285
  }
 
1286
  return caches[type]->link(cont, from, to, type, hostname, host_len);
 
1287
}
 
1288
 
 
1289
TS_INLINE Action *
 
1290
CacheProcessor::deref(Continuation *cont, CacheKey *key, CacheFragType type, char *hostname, int host_len)
 
1291
{
 
1292
  if (cache_clustering_enabled > 0) {
 
1293
    ClusterMachine *m = cluster_machine_at_depth(cache_hash(*key));
 
1294
    if (m) {
 
1295
      return Cluster_deref(m, cont, key, type, hostname, host_len);
 
1296
    }
 
1297
  }
 
1298
  return caches[type]->deref(cont, key, type, hostname, host_len);
 
1299
}
 
1300
#endif
 
1301
 
 
1302
TS_INLINE Action *
 
1303
CacheProcessor::scan(Continuation *cont, char *hostname, int host_len, int KB_per_second)
 
1304
{
 
1305
  return caches[CACHE_FRAG_TYPE_HTTP]->scan(cont, hostname, host_len, KB_per_second);
 
1306
}
 
1307
 
 
1308
TS_INLINE int
 
1309
CacheProcessor::IsCacheEnabled()
 
1310
{
 
1311
  return CacheProcessor::initialized;
 
1312
}
 
1313
 
 
1314
TS_INLINE unsigned int
 
1315
CacheProcessor::IsCacheReady(CacheFragType type)
 
1316
{
 
1317
  if (IsCacheEnabled() != CACHE_INITIALIZED)
 
1318
    return 0;
 
1319
  return (cache_ready & (1 << type));
 
1320
}
 
1321
 
 
1322
TS_INLINE Cache *
 
1323
local_cache()
 
1324
{
 
1325
  return theCache;
 
1326
}
 
1327
 
 
1328
LINK_DEFINITION(CacheVC, opendir_link)
 
1329
 
 
1330
#endif /* _P_CACHE_INTERNAL_H__ */