~ps10gel/ubuntu/xenial/trafficserver/6.2.0

« back to all changes in this revision

Viewing changes to iocore/cluster/ClusterCache.cc

  • Committer: Bazaar Package Importer
  • Author(s): Arno Toell
  • Date: 2011-01-13 11:49:18 UTC
  • Revision ID: james.westby@ubuntu.com-20110113114918-vu422h8dknrgkj15
Tags: upstream-2.1.5-unstable
ImportĀ upstreamĀ versionĀ 2.1.5-unstable

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/** @file
 
2
 
 
3
  A brief file description
 
4
 
 
5
  @section license License
 
6
 
 
7
  Licensed to the Apache Software Foundation (ASF) under one
 
8
  or more contributor license agreements.  See the NOTICE file
 
9
  distributed with this work for additional information
 
10
  regarding copyright ownership.  The ASF licenses this file
 
11
  to you under the Apache License, Version 2.0 (the
 
12
  "License"); you may not use this file except in compliance
 
13
  with the License.  You may obtain a copy of the License at
 
14
 
 
15
      http://www.apache.org/licenses/LICENSE-2.0
 
16
 
 
17
  Unless required by applicable law or agreed to in writing, software
 
18
  distributed under the License is distributed on an "AS IS" BASIS,
 
19
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
20
  See the License for the specific language governing permissions and
 
21
  limitations under the License.
 
22
 */
 
23
 
 
24
/****************************************************************************
 
25
 
 
26
  ClusterCache.cc
 
27
****************************************************************************/
 
28
 
 
29
#include "P_Cluster.h"
 
30
 
 
31
#ifdef DEBUG
 
32
#define CLUSTER_TEST_DEBUG      1
 
33
#endif
 
34
 
 
35
#ifdef ENABLE_TIME_TRACE
 
36
int callback_time_dist[TIME_DIST_BUCKETS_SIZE];
 
37
int cache_callbacks = 0;
 
38
 
 
39
int rmt_callback_time_dist[TIME_DIST_BUCKETS_SIZE];
 
40
int rmt_cache_callbacks = 0;
 
41
 
 
42
int lkrmt_callback_time_dist[TIME_DIST_BUCKETS_SIZE];
 
43
int lkrmt_cache_callbacks = 0;
 
44
 
 
45
int cntlck_acquire_time_dist[TIME_DIST_BUCKETS_SIZE];
 
46
int cntlck_acquire_events = 0;
 
47
 
 
48
int open_delay_time_dist[TIME_DIST_BUCKETS_SIZE];
 
49
int open_delay_events = 0;
 
50
 
 
51
#endif // ENABLE_TIME_TRACE
 
52
 
 
53
// default will be read from config
 
54
int cache_migrate_on_demand = false;
 
55
 
 
56
/////////////////
 
57
// Static Data //
 
58
/////////////////
 
59
static ClassAllocator<CacheContinuation> cacheContAllocator("cacheContAllocator");
 
60
 
 
61
static Queue<CacheContinuation> remoteCacheContQueue[REMOTE_CONNECT_HASH];
 
62
static Ptr<ProxyMutex> remoteCacheContQueueMutex[REMOTE_CONNECT_HASH];
 
63
 
 
64
// 0 is an illegal sequence number
 
65
#define CACHE_NO_RESPONSE            0
 
66
static int cluster_sequence_number = 1;
 
67
 
 
68
#ifdef CLUSTER_TEST_DEBUG
 
69
static ink_hrtime cache_cluster_timeout = HRTIME_SECONDS(65536);
 
70
#else
 
71
static ink_hrtime cache_cluster_timeout = CACHE_CLUSTER_TIMEOUT;
 
72
#endif
 
73
 
 
74
///////////////////
 
75
// Declarations  //
 
76
///////////////////
 
77
static CacheContinuation *find_cache_continuation(unsigned int, unsigned int);
 
78
 
 
79
static unsigned int new_cache_sequence_number();
 
80
 
 
81
#define DOT_SEPARATED(_x)                             \
 
82
((unsigned char*)&(_x))[0], ((unsigned char*)&(_x))[1],   \
 
83
  ((unsigned char*)&(_x))[2], ((unsigned char*)&(_x))[3]
 
84
 
 
85
#define ET_CACHE_CONT_SM        ET_NET
 
86
#define ALLOW_THREAD_STEAL      true
 
87
 
 
88
/**********************************************************************/
 
89
#ifdef CACHE_MSG_TRACE
 
90
/**********************************************************************/
 
91
 
 
92
/**********************************************************************/
 
93
// Debug trace support for cache RPC messages
 
94
/**********************************************************************/
 
95
 
 
96
#define MAX_TENTRIES    4096
 
97
struct traceEntry
 
98
{
 
99
  unsigned int seqno;
 
100
  int op;
 
101
  char *type;
 
102
};
 
103
struct traceEntry recvTraceTable[MAX_TENTRIES];
 
104
struct traceEntry sndTraceTable[MAX_TENTRIES];
 
105
 
 
106
static recvTraceTable_index = 0;
 
107
static sndTraceTable_index = 0;
 
108
 
 
109
void
 
110
log_cache_op_msg(unsigned int seqno, int op, char *type)
 
111
{
 
112
  int t = ink_atomic_increment(&recvTraceTable_index, 1);
 
113
  int n = recvTraceTable_index % MAX_TENTRIES;
 
114
  recvTraceTable[n].seqno = seqno;
 
115
  recvTraceTable[n].op = op;
 
116
  recvTraceTable[n].type = type;
 
117
}
 
118
 
 
119
void
 
120
log_cache_op_sndmsg(unsigned int seqno, int op, char *type)
 
121
{
 
122
  int t = ink_atomic_increment(&sndTraceTable_index, 1);
 
123
  int n = sndTraceTable_index % MAX_TENTRIES;
 
124
  sndTraceTable[n].seqno = seqno;
 
125
  sndTraceTable[n].op = op;
 
126
  sndTraceTable[n].type = type;
 
127
}
 
128
 
 
129
void
 
130
dump_recvtrace_table()
 
131
{
 
132
  int n;
 
133
  printf("\n");
 
134
  for (n = 0; n < MAX_TENTRIES; ++n)
 
135
    printf("[%d] seqno=%d, op=%d type=%s\n", n, recvTraceTable[n].seqno,
 
136
           recvTraceTable[n].op, recvTraceTable[n].type ? recvTraceTable[n].type : "");
 
137
}
 
138
 
 
139
void
 
140
dump_sndtrace_table()
 
141
{
 
142
  int n;
 
143
  printf("\n");
 
144
  for (n = 0; n < MAX_TENTRIES; ++n)
 
145
    printf("[%d] seqno=%d, op=%d type=%s\n", n, sndTraceTable[n].seqno,
 
146
           sndTraceTable[n].op, sndTraceTable[n].type ? sndTraceTable[n].type : "");
 
147
}
 
148
 
 
149
/**********************************************************************/
 
150
#endif // CACHE_MSG_TRACE
 
151
/**********************************************************************/
 
152
 
 
153
///////////////////////////////////////////////////////////////////////
 
154
// Cluster write VC cache.
 
155
///////////////////////////////////////////////////////////////////////
 
156
//
 
157
// In the event that a remote open read fails (HTTP only), an
 
158
// open write is issued and if successful a open write connection
 
159
// is returned for the open read.  We cache the open write VC and
 
160
// resolve the subsequent open write locally from the write VC cache
 
161
// using the INK_MD5 of the URL.
 
162
// Note that this is a global per node cache.
 
163
///////////////////////////////////////////////////////////////////////
 
164
 
 
165
class ClusterVConnectionCache
 
166
{
 
167
public:
 
168
  ClusterVConnectionCache()
 
169
  {
 
170
    memset(hash_event, 0, sizeof(hash_event));
 
171
  }
 
172
  void init();
 
173
  int MD5ToIndex(INK_MD5 * p);
 
174
  int insert(INK_MD5 *, ClusterVConnection *);
 
175
  ClusterVConnection *lookup(INK_MD5 *);
 
176
 
 
177
public:
 
178
  struct Entry
 
179
  {
 
180
    LINK(Entry, link);
 
181
    bool mark_for_delete;
 
182
    INK_MD5 key;
 
183
    ClusterVConnection *vc;
 
184
 
 
185
      Entry():mark_for_delete(0), vc(0)
 
186
    {
 
187
    }
 
188
     ~Entry()
 
189
    {
 
190
    }
 
191
  };
 
192
 
 
193
  enum
 
194
  { MAX_TABLE_ENTRIES = 256,    // must be power of 2
 
195
    SCAN_INTERVAL = 10          // seconds
 
196
  };
 
197
  Queue<Entry> hash_table[MAX_TABLE_ENTRIES];
 
198
  Ptr<ProxyMutex> hash_lock[MAX_TABLE_ENTRIES];
 
199
  Event *hash_event[MAX_TABLE_ENTRIES];
 
200
};
 
201
 
 
202
static ClassAllocator <
 
203
  ClusterVConnectionCache::Entry >
 
204
ClusterVCCacheEntryAlloc("ClusterVConnectionCache::Entry");
 
205
 
 
206
ClusterVConnectionCache *GlobalOpenWriteVCcache = 0;
 
207
 
 
208
/////////////////////////////////////////////////////////////////
 
209
// Perform periodic purges of ClusterVConnectionCache entries
 
210
/////////////////////////////////////////////////////////////////
 
211
class ClusterVConnectionCacheEvent:public Continuation
 
212
{
 
213
public:
 
214
  ClusterVConnectionCacheEvent(ClusterVConnectionCache * c, int n)
 
215
  : Continuation(new_ProxyMutex()), cache(c), hash_index(n)
 
216
  {
 
217
    SET_HANDLER(&ClusterVConnectionCacheEvent::eventHandler);
 
218
  }
 
219
  int eventHandler(int, Event *);
 
220
 
 
221
private:
 
222
  ClusterVConnectionCache * cache;
 
223
  int hash_index;
 
224
};
 
225
 
 
226
void
 
227
ClusterVConnectionCache::init()
 
228
{
 
229
  int n;
 
230
  ClusterVConnectionCacheEvent *eh;
 
231
 
 
232
  for (n = 0; n < MAX_TABLE_ENTRIES; ++n) {
 
233
    hash_lock[n] = new_ProxyMutex();
 
234
  }
 
235
  for (n = 0; n < MAX_TABLE_ENTRIES; ++n) {
 
236
    // Setup up periodic purge events on each hash list
 
237
 
 
238
    eh = new ClusterVConnectionCacheEvent(this, n);
 
239
    hash_event[n] =
 
240
      eventProcessor.schedule_in(eh, HRTIME_SECONDS(ClusterVConnectionCache::SCAN_INTERVAL), ET_CACHE_CONT_SM);
 
241
  }
 
242
}
 
243
inline int
 
244
ClusterVConnectionCache::MD5ToIndex(INK_MD5 * p)
 
245
{
 
246
  uint64_t i = p->fold();
 
247
  int32_t h, l;
 
248
 
 
249
  h = i >> 32;
 
250
  l = i & 0xFFFFFFFF;
 
251
  return ((h ^ l) % MAX_TABLE_ENTRIES) & (MAX_TABLE_ENTRIES - 1);
 
252
}
 
253
 
 
254
int
 
255
ClusterVConnectionCache::insert(INK_MD5 * key, ClusterVConnection * vc)
 
256
{
 
257
  int index = MD5ToIndex(key);
 
258
  Entry *e;
 
259
  EThread *thread = this_ethread();
 
260
  ProxyMutex *mutex = thread->mutex;
 
261
 
 
262
  MUTEX_TRY_LOCK(lock, hash_lock[index], thread);
 
263
  if (!lock) {
 
264
    CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_INSERT_LOCK_MISSES_STAT);
 
265
    return 0;                   // lock miss, retry later
 
266
 
 
267
  } else {
 
268
    // Add entry to list
 
269
 
 
270
    e = ClusterVCCacheEntryAlloc.alloc();
 
271
    e->key = *key;
 
272
    e->vc = vc;
 
273
    hash_table[index].enqueue(e);
 
274
    CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_INSERTS_STAT);
 
275
  }
 
276
  return 1;                     // Success
 
277
}
 
278
 
 
279
ClusterVConnection *
 
280
ClusterVConnectionCache::lookup(INK_MD5 * key)
 
281
{
 
282
  int index = MD5ToIndex(key);
 
283
  Entry *e;
 
284
  ClusterVConnection *vc = 0;
 
285
  EThread *thread = this_ethread();
 
286
  ProxyMutex *mutex = thread->mutex;
 
287
 
 
288
  MUTEX_TRY_LOCK(lock, hash_lock[index], thread);
 
289
  if (!lock) {
 
290
    CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_LOOKUP_LOCK_MISSES_STAT);
 
291
    return vc;                  // lock miss, retry later
 
292
 
 
293
  } else {
 
294
    e = hash_table[index].head;
 
295
    while (e) {
 
296
      if (*key == e->key) {     // Hit
 
297
        vc = e->vc;
 
298
        hash_table[index].remove(e);
 
299
        ClusterVCCacheEntryAlloc.free(e);
 
300
        CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_LOOKUP_HITS_STAT);
 
301
        return vc;
 
302
 
 
303
      } else {
 
304
        e = e->link.next;
 
305
      }
 
306
    }
 
307
  }
 
308
  CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_LOOKUP_MISSES_STAT);
 
309
  return (ClusterVConnection *) - 1;    // Miss
 
310
}
 
311
 
 
312
int
 
313
ClusterVConnectionCacheEvent::eventHandler(int event, Event * e)
 
314
{
 
315
  NOWARN_UNUSED(event);
 
316
  CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_SCANS_STAT);
 
317
  MUTEX_TRY_LOCK(lock, cache->hash_lock[hash_index], this_ethread());
 
318
  if (!lock) {
 
319
    CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_SCAN_LOCK_MISSES_STAT);
 
320
    e->schedule_in(HRTIME_MSECONDS(10));
 
321
    return EVENT_DONE;
 
322
  }
 
323
  // Perform purge action on unreferenced VC(s).
 
324
 
 
325
  ClusterVConnectionCache::Entry * entry;
 
326
  ClusterVConnectionCache::Entry * next_entry;
 
327
  entry = cache->hash_table[hash_index].head;
 
328
 
 
329
  while (entry) {
 
330
    if (entry->mark_for_delete) {
 
331
      next_entry = entry->link.next;
 
332
 
 
333
      cache->hash_table[hash_index].remove(entry);
 
334
      entry->vc->allow_remote_close();
 
335
      entry->vc->do_io(VIO::CLOSE);
 
336
 
 
337
      ClusterVCCacheEntryAlloc.free(entry);
 
338
      entry = next_entry;
 
339
      CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_PURGES_STAT);
 
340
 
 
341
    } else {
 
342
      entry->mark_for_delete = true;
 
343
      entry = entry->link.next;
 
344
    }
 
345
  }
 
346
 
 
347
  // Setup for next purge event
 
348
 
 
349
  e->schedule_in(HRTIME_SECONDS(ClusterVConnectionCache::SCAN_INTERVAL), ET_CACHE_CONT_SM);
 
350
  return EVENT_DONE;
 
351
}
 
352
 
 
353
///////////////////////////////////////////////////////////////////////
 
354
 
 
355
////////////////////////////////////////////////////
 
356
// init()
 
357
//   Global initializations for CacheContinuation
 
358
////////////////////////////////////////////////////
 
359
int
 
360
CacheContinuation::init()
 
361
{
 
362
  int n;
 
363
  for (n = 0; n < REMOTE_CONNECT_HASH; ++n)
 
364
    remoteCacheContQueueMutex[n] = new_ProxyMutex();
 
365
 
 
366
  GlobalOpenWriteVCcache = new ClusterVConnectionCache;
 
367
  GlobalOpenWriteVCcache->init();
 
368
  return 0;
 
369
}
 
370
 
 
371
///////////////////////////////////////////////////////////////////////
 
372
// do_op()
 
373
//   Main function to do a cluster cache operation
 
374
///////////////////////////////////////////////////////////////////////
 
375
Action *
 
376
CacheContinuation::do_op(Continuation * c, ClusterMachine * mp, void *args,
 
377
                         int user_opcode, char *data, int data_len, int nbytes, MIOBuffer * b)
 
378
{
 
379
  CacheContinuation *cc = 0;
 
380
  Action *act = 0;
 
381
  char *msg = 0;
 
382
 
 
383
  /////////////////////////////////////////////////////////////////////
 
384
  // Unconditionally map open read buffer interfaces to open read.
 
385
  // open read buffer interfaces are now deprecated.
 
386
  /////////////////////////////////////////////////////////////////////
 
387
  int opcode = user_opcode;
 
388
  switch (opcode) {
 
389
  case CACHE_OPEN_READ_BUFFER:
 
390
    opcode = CACHE_OPEN_READ;
 
391
    break;
 
392
  case CACHE_OPEN_READ_BUFFER_LONG:
 
393
    opcode = CACHE_OPEN_READ_LONG;
 
394
    break;
 
395
  default:
 
396
    break;
 
397
  }
 
398
 
 
399
  if (c) {
 
400
    cc = cacheContAllocator_alloc();
 
401
    cc->target_machine = mp;
 
402
    cc->request_opcode = opcode;
 
403
    cc->mutex = c->mutex;
 
404
    cc->action = c;
 
405
    cc->action.cancelled = false;
 
406
    cc->start_time = ink_get_hrtime();
 
407
    cc->from = mp;
 
408
    cc->result = op_failure(opcode);
 
409
    SET_CONTINUATION_HANDLER(cc, (CacheContHandler)
 
410
                             & CacheContinuation::remoteOpEvent);
 
411
    act = &cc->action;
 
412
 
 
413
    // set up sequence number so we can find this continuation
 
414
 
 
415
    cc->target_ip = mp->ip;
 
416
    cc->seq_number = new_cache_sequence_number();
 
417
 
 
418
    // establish timeout for cache op
 
419
 
 
420
    unsigned int hash = FOLDHASH(cc->target_ip, cc->seq_number);
 
421
    MUTEX_TRY_LOCK(queuelock, remoteCacheContQueueMutex[hash], this_ethread());
 
422
    if (!queuelock) {
 
423
 
 
424
      // failed to acquire lock: no problem, retry later
 
425
      cc->timeout = eventProcessor.schedule_in(cc, CACHE_RETRY_PERIOD, ET_CACHE_CONT_SM);
 
426
    } else {
 
427
      remoteCacheContQueue[hash].enqueue(cc);
 
428
      MUTEX_RELEASE(queuelock);
 
429
      cc->timeout = eventProcessor.schedule_in(cc, cache_cluster_timeout, ET_CACHE_CONT_SM);
 
430
    }
 
431
  }
 
432
  //
 
433
  // Determine the type of the "Over The Wire" (OTW) message header and
 
434
  //   initialize it.
 
435
  //
 
436
#ifdef PURIFY
 
437
  if (data)
 
438
    memset(data, 0, op_to_sizeof_fixedlen_msg(opcode));
 
439
#endif
 
440
  Debug("cache_msg",
 
441
        "do_op opcode=%d seqno=%d Machine=0x%x data=0x%x datalen=%d mio=0x%x",
 
442
        opcode, (c ? cc->seq_number : CACHE_NO_RESPONSE), mp, data, data_len, b);
 
443
 
 
444
  switch (opcode) {
 
445
  case CACHE_OPEN_WRITE_BUFFER:
 
446
  case CACHE_OPEN_WRITE_BUFFER_LONG:
 
447
    {
 
448
      ink_release_assert(!"write buffer not supported");
 
449
      break;
 
450
    }
 
451
  case CACHE_OPEN_READ_BUFFER:
 
452
  case CACHE_OPEN_READ_BUFFER_LONG:
 
453
    {
 
454
      ink_release_assert(!"read buffer not supported");
 
455
      break;
 
456
    }
 
457
  case CACHE_OPEN_WRITE:
 
458
  case CACHE_OPEN_READ:
 
459
    {
 
460
      ink_release_assert(c > 0);
 
461
      //////////////////////
 
462
      // Use short format //
 
463
      //////////////////////
 
464
      if (!data) {
 
465
        data_len = op_to_sizeof_fixedlen_msg(opcode);
 
466
        data = (char *) ALLOCA_DOUBLE(data_len);
 
467
#ifdef PURIFY
 
468
        memset(data, 0, data_len);
 
469
#endif
 
470
      }
 
471
      msg = (char *) data;
 
472
      CacheOpMsg_short *m = (CacheOpMsg_short *) msg;
 
473
      m->init();
 
474
      m->opcode = opcode;
 
475
      m->cfl_flags = ((CacheOpArgs_General *) args)->cfl_flags;
 
476
      m->md5 = *((CacheOpArgs_General *) args)->url_md5;
 
477
      cc->url_md5 = m->md5;
 
478
      m->seq_number = (c ? cc->seq_number : CACHE_NO_RESPONSE);
 
479
      m->frag_type = ((CacheOpArgs_General *) args)->frag_type;
 
480
      if (opcode == CACHE_OPEN_WRITE) {
 
481
        m->nbytes = nbytes;
 
482
        m->data = (uint32_t) ((CacheOpArgs_General *) args)->pin_in_cache;
 
483
      } else {
 
484
        m->nbytes = 0;
 
485
        m->data = 0;
 
486
      }
 
487
 
 
488
      if (opcode == CACHE_OPEN_READ) {
 
489
        //
 
490
        // Set upper limit on initial data received with response
 
491
        // for open read response
 
492
        //
 
493
        m->buffer_size = DEFAULT_MAX_BUFFER_SIZE;
 
494
      } else {
 
495
        m->buffer_size = 0;
 
496
      }
 
497
 
 
498
      //
 
499
      // Establish the local VC
 
500
      //
 
501
      int res = setup_local_vc(msg, data_len, cc, mp, &act);
 
502
      if (!res) {
 
503
        /////////////////////////////////////////////////////
 
504
        // Unable to setup local VC, request aborted.
 
505
        // Remove request from pending list and deallocate.
 
506
        /////////////////////////////////////////////////////
 
507
        cc->remove_and_delete(0, (Event *) 0);
 
508
        return act;
 
509
 
 
510
      } else if (res != -1) {
 
511
        ///////////////////////////////////////
 
512
        // VC established, send request
 
513
        ///////////////////////////////////////
 
514
        break;
 
515
 
 
516
      } else {
 
517
        //////////////////////////////////////////////////////
 
518
        // Unable to setup VC, delay required, await callback
 
519
        //////////////////////////////////////////////////////
 
520
        goto no_send_exit;
 
521
      }
 
522
    }
 
523
 
 
524
  case CACHE_OPEN_READ_LONG:
 
525
  case CACHE_OPEN_WRITE_LONG:
 
526
    {
 
527
      ink_release_assert(c > 0);
 
528
      //////////////////////
 
529
      // Use long format  //
 
530
      //////////////////////
 
531
      msg = data;
 
532
      CacheOpMsg_long *m = (CacheOpMsg_long *) msg;
 
533
      m->init();
 
534
      m->opcode = opcode;
 
535
      m->cfl_flags = ((CacheOpArgs_General *) args)->cfl_flags;
 
536
      m->url_md5 = *((CacheOpArgs_General *) args)->url_md5;
 
537
      cc->url_md5 = m->url_md5;
 
538
      m->seq_number = (c ? cc->seq_number : CACHE_NO_RESPONSE);
 
539
      m->nbytes = nbytes;
 
540
      m->data = (uint32_t) ((CacheOpArgs_General *) args)->pin_in_cache;
 
541
      m->frag_type = (uint32_t) ((CacheOpArgs_General *) args)->frag_type;
 
542
 
 
543
      if (opcode == CACHE_OPEN_READ_LONG) {
 
544
        //
 
545
        // Set upper limit on initial data received with response
 
546
        // for open read response
 
547
        //
 
548
        m->buffer_size = DEFAULT_MAX_BUFFER_SIZE;
 
549
      } else {
 
550
        m->buffer_size = 0;
 
551
      }
 
552
      //
 
553
      // Establish the local VC
 
554
      //
 
555
      int res = setup_local_vc(msg, data_len, cc, mp, &act);
 
556
      if (!res) {
 
557
        /////////////////////////////////////////////////////
 
558
        // Unable to setup local VC, request aborted.
 
559
        // Remove request from pending list and deallocate.
 
560
        /////////////////////////////////////////////////////
 
561
        cc->remove_and_delete(0, (Event *) 0);
 
562
        return act;
 
563
 
 
564
      } else if (res != -1) {
 
565
        ///////////////////////////////////////
 
566
        // VC established, send request
 
567
        ///////////////////////////////////////
 
568
        break;
 
569
 
 
570
      } else {
 
571
        //////////////////////////////////////////////////////
 
572
        // Unable to setup VC, delay required, await callback
 
573
        //////////////////////////////////////////////////////
 
574
        goto no_send_exit;
 
575
      }
 
576
    }
 
577
  case CACHE_UPDATE:
 
578
  case CACHE_REMOVE:
 
579
  case CACHE_DEREF:
 
580
    {
 
581
      //////////////////////
 
582
      // Use short format //
 
583
      //////////////////////
 
584
      msg = data;
 
585
      CacheOpMsg_short *m = (CacheOpMsg_short *) msg;
 
586
      m->init();
 
587
      m->opcode = opcode;
 
588
      m->frag_type = ((CacheOpArgs_Deref *) args)->frag_type;
 
589
      m->cfl_flags = ((CacheOpArgs_Deref *) args)->cfl_flags;
 
590
      if (opcode == CACHE_DEREF)
 
591
        m->md5 = *((CacheOpArgs_Deref *) args)->md5;
 
592
      else
 
593
        m->md5 = *((CacheOpArgs_General *) args)->url_md5;
 
594
      m->seq_number = (c ? cc->seq_number : CACHE_NO_RESPONSE);
 
595
      break;
 
596
    }
 
597
  case CACHE_LINK:
 
598
    {
 
599
      ////////////////////////
 
600
      // Use short_2 format //
 
601
      ////////////////////////
 
602
      msg = data;
 
603
      CacheOpMsg_short_2 *m = (CacheOpMsg_short_2 *) msg;
 
604
      m->init();
 
605
      m->opcode = opcode;
 
606
      m->cfl_flags = ((CacheOpArgs_Link *) args)->cfl_flags;
 
607
      m->md5_1 = *((CacheOpArgs_Link *) args)->from;
 
608
      m->md5_2 = *((CacheOpArgs_Link *) args)->to;
 
609
      m->seq_number = (c ? cc->seq_number : CACHE_NO_RESPONSE);
 
610
      m->frag_type = ((CacheOpArgs_Link *) args)->frag_type;
 
611
      break;
 
612
    }
 
613
  default:
 
614
    msg = 0;
 
615
    break;
 
616
  }
 
617
#ifdef CACHE_MSG_TRACE
 
618
  log_cache_op_sndmsg((c ? cc->seq_number : CACHE_NO_RESPONSE), 0, "do_op");
 
619
#endif
 
620
  clusterProcessor.invoke_remote(mp,
 
621
                                 op_needs_marshalled_coi(opcode) ? CACHE_OP_MALLOCED_CLUSTER_FUNCTION
 
622
                                 : CACHE_OP_CLUSTER_FUNCTION, (char *) msg, data_len);
 
623
 
 
624
no_send_exit:
 
625
  if (c) {
 
626
    return act;
 
627
  } else {
 
628
    return (Action *) 0;
 
629
  }
 
630
}
 
631
 
 
632
int
 
633
CacheContinuation::setup_local_vc(char *data, int data_len, CacheContinuation * cc, ClusterMachine * mp, Action ** act)
 
634
{
 
635
  bool read_op = op_is_read(cc->request_opcode);
 
636
  bool short_msg = op_is_shortform(cc->request_opcode);
 
637
 
 
638
  // Alloc buffer, copy message and attach to continuation
 
639
  cc->setMsgBufferLen(data_len);
 
640
  cc->allocMsgBuffer();
 
641
  memcpy(cc->getMsgBuffer(), data, data_len);
 
642
 
 
643
  SET_CONTINUATION_HANDLER(cc, (CacheContHandler)
 
644
                           & CacheContinuation::localVCsetupEvent);
 
645
 
 
646
  if (short_msg) {
 
647
    Debug("cache_proto", "open_local-s (%s) seqno=%d", (read_op ? "R" : "W"), ((CacheOpMsg_short *) data)->seq_number);
 
648
  } else {
 
649
    Debug("cache_proto", "open_local-l (%s) seqno=%d", (read_op ? "R" : "W"), ((CacheOpMsg_long *) data)->seq_number);
 
650
  }
 
651
 
 
652
  // Create local VC
 
653
  ClusterVConnection *vc;
 
654
 
 
655
  if (!read_op && (cc->request_opcode == CACHE_OPEN_WRITE_LONG)) {
 
656
    // Determine if the open_write has already been established.
 
657
    vc = cc->lookupOpenWriteVC();
 
658
 
 
659
  } else {
 
660
    vc = clusterProcessor.open_local(cc, mp, cc->open_local_token,
 
661
                                     (CLUSTER_OPT_ALLOW_IMMEDIATE |
 
662
                                      (read_op ? CLUSTER_OPT_CONN_READ : CLUSTER_OPT_CONN_WRITE)));
 
663
  }
 
664
  if (!vc) {
 
665
    // Error, abort request
 
666
    if (short_msg) {
 
667
      Debug("cache_proto", "0open_local-s (%s) failed, seqno=%d",
 
668
            (read_op ? "R" : "W"), ((CacheOpMsg_short *) data)->seq_number);
 
669
    } else {
 
670
      Debug("cache_proto", "1open_local-l (%s) failed, seqno=%d",
 
671
            (read_op ? "R" : "W"), ((CacheOpMsg_long *) data)->seq_number);
 
672
    }
 
673
    cc->freeMsgBuffer();
 
674
    if (cc->timeout)
 
675
      cc->timeout->cancel();
 
676
    cc->timeout = NULL;
 
677
 
 
678
    // Post async failure callback on a different continuation.
 
679
    *act = callback_failure(&cc->action, (read_op ? CACHE_EVENT_OPEN_READ_FAILED : CACHE_EVENT_OPEN_WRITE_FAILED), 0);
 
680
    return 0;
 
681
 
 
682
  } else if (vc != CLUSTER_DELAYED_OPEN) {
 
683
    // We have established the VC
 
684
    if (read_op) {
 
685
      cc->read_cluster_vc = vc;
 
686
    } else {
 
687
      cc->write_cluster_vc = vc;
 
688
    }
 
689
    cc->cluster_vc_channel = vc->channel;
 
690
    vc->current_cont = cc;
 
691
 
 
692
    if (short_msg) {
 
693
      CacheOpMsg_short *ms = (CacheOpMsg_short *) data;
 
694
      ms->channel = vc->channel;
 
695
      ms->token = cc->open_local_token;
 
696
      Debug("cache_proto",
 
697
            "0open_local-s (%s) success, seqno=%d chan=%d token=%d,%d VC=0x%x",
 
698
            (read_op ? "R" : "W"), ms->seq_number, vc->channel, ms->token.ip_created, ms->token.sequence_number, vc);
 
699
    } else {
 
700
      CacheOpMsg_long *ml = (CacheOpMsg_long *) data;
 
701
      ml->channel = vc->channel;
 
702
      ml->token = cc->open_local_token;
 
703
      Debug("cache_proto",
 
704
            "1open_local-l (%s) success, seqno=%d chan=%d token=%d,%d VC=0x%x",
 
705
            (read_op ? "R" : "W"), ml->seq_number, vc->channel, ml->token.ip_created, ml->token.sequence_number, vc);
 
706
    }
 
707
    cc->freeMsgBuffer();
 
708
    SET_CONTINUATION_HANDLER(cc, (CacheContHandler)
 
709
                             & CacheContinuation::remoteOpEvent);
 
710
    return 1;
 
711
 
 
712
  } else {
 
713
    //////////////////////////////////////////////////////
 
714
    // Unable to setup VC, delay required, await callback
 
715
    //////////////////////////////////////////////////////
 
716
    return -1;
 
717
  }
 
718
}
 
719
 
 
720
ClusterVConnection *
 
721
CacheContinuation::lookupOpenWriteVC()
 
722
{
 
723
  ///////////////////////////////////////////////////////////////
 
724
  // See if we already have an open_write ClusterVConnection
 
725
  // which was established in a previous remote open_read which
 
726
  // failed.
 
727
  ///////////////////////////////////////////////////////////////
 
728
  ClusterVConnection *vc;
 
729
  CacheOpMsg_long *ml = (CacheOpMsg_long *) getMsgBuffer();
 
730
 
 
731
  vc = GlobalOpenWriteVCcache->lookup(&ml->url_md5);
 
732
 
 
733
  if (vc == ((ClusterVConnection *) 0)) {
 
734
    // Retry lookup
 
735
    SET_CONTINUATION_HANDLER(this, (CacheContHandler)
 
736
                             & CacheContinuation::lookupOpenWriteVCEvent);
 
737
    //
 
738
    // Note: In the lookupOpenWriteVCEvent handler, we use EVENT_IMMEDIATE
 
739
    //       to distinguish the lookup retry from a request timeout
 
740
    //       which uses EVENT_INTERVAL.
 
741
    //
 
742
    lookup_open_write_vc_event = eventProcessor.schedule_imm(this, ET_CACHE_CONT_SM);
 
743
 
 
744
  } else if (vc != ((ClusterVConnection *) - 1)) {
 
745
    // Hit, found open_write VC in cache.
 
746
    // Post open_write completion by simulating a
 
747
    // remote cache op result message.
 
748
 
 
749
    vc->action_ = action;       // establish new continuation
 
750
 
 
751
    SET_CONTINUATION_HANDLER(this, (CacheContHandler)
 
752
                             & CacheContinuation::localVCsetupEvent);
 
753
    this->handleEvent(CLUSTER_EVENT_OPEN_EXISTS, vc);
 
754
 
 
755
    CacheOpReplyMsg msg;
 
756
    int msglen;
 
757
 
 
758
    msglen = CacheOpReplyMsg::sizeof_fixedlen_msg();
 
759
    msg.result = CACHE_EVENT_OPEN_WRITE;
 
760
    msg.seq_number = seq_number;
 
761
    msg.token = vc->token;
 
762
 
 
763
    cache_op_result_ClusterFunction(from, (void *) &msg, msglen);
 
764
 
 
765
  } else {
 
766
    // Miss, establish local VC and send remote open_write request
 
767
 
 
768
    SET_CONTINUATION_HANDLER(this, (CacheContHandler)
 
769
                             & CacheContinuation::localVCsetupEvent);
 
770
    vc = clusterProcessor.open_local(this, from, open_local_token,
 
771
                                     (CLUSTER_OPT_ALLOW_IMMEDIATE | CLUSTER_OPT_CONN_WRITE));
 
772
    if (!vc) {
 
773
      this->handleEvent(CLUSTER_EVENT_OPEN_FAILED, 0);
 
774
 
 
775
    } else if (vc != CLUSTER_DELAYED_OPEN) {
 
776
      this->handleEvent(CLUSTER_EVENT_OPEN, vc);
 
777
    }
 
778
  }
 
779
  return CLUSTER_DELAYED_OPEN;  // force completion in callback
 
780
}
 
781
 
 
782
int
 
783
CacheContinuation::lookupOpenWriteVCEvent(int event, Event * e)
 
784
{
 
785
  if (event == EVENT_IMMEDIATE) {
 
786
    // Retry open_write VC lookup
 
787
    lookupOpenWriteVC();
 
788
 
 
789
  } else {
 
790
    lookup_open_write_vc_event->cancel();
 
791
    SET_CONTINUATION_HANDLER(this, (CacheContHandler)
 
792
                             & CacheContinuation::localVCsetupEvent);
 
793
    this->handleEvent(event, e);
 
794
  }
 
795
  return EVENT_DONE;
 
796
}
 
797
 
 
798
int
 
799
CacheContinuation::remove_and_delete(int event, Event * e)
 
800
{
 
801
  NOWARN_UNUSED(event);
 
802
  unsigned int hash = FOLDHASH(target_ip, seq_number);
 
803
  MUTEX_TRY_LOCK(queuelock, remoteCacheContQueueMutex[hash], this_ethread());
 
804
  if (queuelock) {
 
805
    if (remoteCacheContQueue[hash].in(this)) {
 
806
      remoteCacheContQueue[hash].remove(this);
 
807
    }
 
808
    MUTEX_RELEASE(queuelock);
 
809
    if (use_deferred_callback)
 
810
      callback_failure(&action, result, result_error, this);
 
811
    else
 
812
      cacheContAllocator_free(this);
 
813
 
 
814
  } else {
 
815
    SET_HANDLER((CacheContHandler) & CacheContinuation::remove_and_delete);
 
816
    if (!e) {
 
817
      timeout = eventProcessor.schedule_in(this, cache_cluster_timeout, ET_CACHE_CONT_SM);
 
818
    } else {
 
819
      e->schedule_in(cache_cluster_timeout);
 
820
    }
 
821
  }
 
822
  return EVENT_DONE;
 
823
}
 
824
 
 
825
int
 
826
CacheContinuation::localVCsetupEvent(int event, ClusterVConnection * vc)
 
827
{
 
828
  ink_assert(magicno == (int) MagicNo);
 
829
  ink_assert(getMsgBuffer());
 
830
  bool short_msg = op_is_shortform(request_opcode);
 
831
  bool read_op = op_is_read(request_opcode);
 
832
 
 
833
  if (event == EVENT_INTERVAL) {
 
834
    Event *e = (Event *) vc;
 
835
    unsigned int hash = FOLDHASH(target_ip, seq_number);
 
836
 
 
837
    MUTEX_TRY_LOCK(queuelock, remoteCacheContQueueMutex[hash], e->ethread);
 
838
    if (!queuelock) {
 
839
      e->schedule_in(CACHE_RETRY_PERIOD);
 
840
      return EVENT_CONT;
 
841
    }
 
842
 
 
843
    if (!remoteCacheContQueue[hash].in(this)) {
 
844
      ////////////////////////////////////////////////////
 
845
      // Not yet queued on outstanding operations list
 
846
      ////////////////////////////////////////////////////
 
847
      remoteCacheContQueue[hash].enqueue(this);
 
848
      ink_assert(timeout == e);
 
849
      MUTEX_RELEASE(queuelock);
 
850
      e->schedule_in(cache_cluster_timeout);
 
851
      return EVENT_CONT;
 
852
 
 
853
    } else {
 
854
      /////////////////////////////////////////////////////
 
855
      // Timeout occurred
 
856
      /////////////////////////////////////////////////////
 
857
      remoteCacheContQueue[hash].remove(this);
 
858
      MUTEX_RELEASE(queuelock);
 
859
      Debug("cluster_timeout", "0cluster op timeout %d", seq_number);
 
860
      CLUSTER_INCREMENT_DYN_STAT(CLUSTER_REMOTE_OP_TIMEOUTS_STAT);
 
861
      timeout = (Event *) 1;    // Note timeout
 
862
      /////////////////////////////////////////////////////////////////
 
863
      // Note: Failure callback is sent now, but the deallocation of
 
864
      //       the CacheContinuation is deferred until we receive the
 
865
      //       open_local() callback.
 
866
      /////////////////////////////////////////////////////////////////
 
867
      if (!action.cancelled)
 
868
        action.continuation->handleEvent((read_op ? CACHE_EVENT_OPEN_READ_FAILED : CACHE_EVENT_OPEN_WRITE_FAILED), 0);
 
869
      return EVENT_DONE;
 
870
    }
 
871
 
 
872
  } else if (((event == CLUSTER_EVENT_OPEN) || (event == CLUSTER_EVENT_OPEN_EXISTS))
 
873
             && (((ptrdiff_t) timeout & (ptrdiff_t) 1) == 0)) {
 
874
    ink_hrtime now;
 
875
    now = ink_get_hrtime();
 
876
    CLUSTER_SUM_DYN_STAT(CLUSTER_OPEN_DELAY_TIME_STAT, now - start_time);
 
877
    LOG_EVENT_TIME(start_time, open_delay_time_dist, open_delay_events);
 
878
    if (read_op) {
 
879
      read_cluster_vc = vc;
 
880
    } else {
 
881
      write_cluster_vc = vc;
 
882
    }
 
883
    cluster_vc_channel = vc->channel;
 
884
    vc->current_cont = this;
 
885
 
 
886
    if (short_msg) {
 
887
      CacheOpMsg_short *ms = (CacheOpMsg_short *) getMsgBuffer();
 
888
      ms->channel = vc->channel;
 
889
      ms->token = open_local_token;
 
890
 
 
891
      Debug("cache_proto",
 
892
            "2open_local-s (%s) success, seqno=%d chan=%d token=%d,%d VC=0x%x",
 
893
            (read_op ? "R" : "W"), ms->seq_number, vc->channel, ms->token.ip_created, ms->token.sequence_number, vc);
 
894
 
 
895
    } else {
 
896
      CacheOpMsg_long *ml = (CacheOpMsg_long *) getMsgBuffer();
 
897
      ml->channel = vc->channel;
 
898
      ml->token = open_local_token;
 
899
 
 
900
      Debug("cache_proto",
 
901
            "3open_local-l (%s) success, seqno=%d chan=%d token=%d,%d VC=0x%x",
 
902
            (read_op ? "R" : "W"), ml->seq_number, vc->channel, ml->token.ip_created, ml->token.sequence_number, vc);
 
903
    }
 
904
    SET_HANDLER((CacheContHandler) & CacheContinuation::remoteOpEvent);
 
905
 
 
906
    if (event != CLUSTER_EVENT_OPEN_EXISTS) {
 
907
      // Send request message
 
908
      clusterProcessor.invoke_remote(from,
 
909
                                     (op_needs_marshalled_coi(request_opcode) ?
 
910
                                      CACHE_OP_MALLOCED_CLUSTER_FUNCTION :
 
911
                                      CACHE_OP_CLUSTER_FUNCTION), (char *) getMsgBuffer(), getMsgBufferLen());
 
912
    }
 
913
 
 
914
  } else {
 
915
    int send_failure_callback = 1;
 
916
 
 
917
    if (((ptrdiff_t) timeout & (ptrdiff_t) 1) == 0) {
 
918
      if (short_msg) {
 
919
        Debug("cache_proto", "2open_local-s (%s) failed, seqno=%d",
 
920
              (read_op ? "R" : "W"), ((CacheOpMsg_short *) getMsgBuffer())->seq_number);
 
921
      } else {
 
922
        Debug("cache_proto", "3open_local-l (%s) failed, seqno=%d",
 
923
              (read_op ? "R" : "W"), ((CacheOpMsg_long *) getMsgBuffer())->seq_number);
 
924
      }
 
925
 
 
926
    } else {
 
927
      Debug("cache_proto", "4open_local cancelled due to timeout, seqno=%d", seq_number);
 
928
      this->timeout = 0;
 
929
 
 
930
      // Deallocate VC if successfully acquired
 
931
 
 
932
      if (event == CLUSTER_EVENT_OPEN) {
 
933
        vc->pending_remote_fill = 0;
 
934
        vc->remote_closed = 1;  // avoid remote close msg
 
935
        vc->do_io(VIO::CLOSE);
 
936
      }
 
937
      send_failure_callback = 0;        // already sent.
 
938
    }
 
939
 
 
940
    if (this->timeout)
 
941
      this->timeout->cancel();
 
942
    this->timeout = NULL;
 
943
 
 
944
    freeMsgBuffer();
 
945
    if (send_failure_callback) {
 
946
      //
 
947
      // Action corresponding to "this" already sent back to user,
 
948
      //   use "this" to establish the failure callback after
 
949
      //   removing ourselves from the active list.
 
950
      //
 
951
      this->use_deferred_callback = true;
 
952
      this->result = (read_op ? CACHE_EVENT_OPEN_READ_FAILED : CACHE_EVENT_OPEN_WRITE_FAILED);
 
953
      this->result_error = 0;
 
954
      remove_and_delete(0, (Event *) 0);
 
955
 
 
956
    } else {
 
957
      cacheContAllocator_free(this);
 
958
    }
 
959
    return EVENT_DONE;
 
960
  }
 
961
  // Free message
 
962
  freeMsgBuffer();
 
963
 
 
964
  return EVENT_DONE;
 
965
}
 
966
 
 
967
///////////////////////////////////////////////////////////////////////////
 
968
// cache_op_ClusterFunction()
 
969
//   On the receiving side, handle a general cluster cache operation
 
970
///////////////////////////////////////////////////////////////////////////
 
971
 
 
972
////////////////////////////////////////////////////////////////////////
 
973
// Marshaling functions for OTW message headers
 
974
////////////////////////////////////////////////////////////////////////
 
975
 
 
976
inline CacheOpMsg_long *
 
977
unmarshal_CacheOpMsg_long(void *data, int NeedByteSwap)
 
978
{
 
979
  if (NeedByteSwap)
 
980
    ((CacheOpMsg_long *) data)->SwapBytes();
 
981
  return (CacheOpMsg_long *) data;
 
982
}
 
983
 
 
984
inline CacheOpMsg_short *
 
985
unmarshal_CacheOpMsg_short(void *data, int NeedByteSwap)
 
986
{
 
987
  if (NeedByteSwap)
 
988
    ((CacheOpMsg_short *) data)->SwapBytes();
 
989
  return (CacheOpMsg_short *) data;
 
990
}
 
991
 
 
992
inline CacheOpMsg_short_2 *
 
993
unmarshal_CacheOpMsg_short_2(void *data, int NeedByteSwap)
 
994
{
 
995
  if (NeedByteSwap)
 
996
    ((CacheOpMsg_short_2 *) data)->SwapBytes();
 
997
  return (CacheOpMsg_short_2 *) data;
 
998
}
 
999
 
 
1000
// init_from_long() support routine for cache_op_ClusterFunction()
 
1001
inline void
 
1002
init_from_long(CacheContinuation * cont, CacheOpMsg_long * msg, ClusterMachine * m)
 
1003
{
 
1004
  cont->no_reply_message = (msg->seq_number == CACHE_NO_RESPONSE);
 
1005
  cont->seq_number = msg->seq_number;
 
1006
  cont->cfl_flags = msg->cfl_flags;
 
1007
  cont->from = m;
 
1008
  cont->url_md5 = msg->url_md5;
 
1009
  cont->cluster_vc_channel = msg->channel;
 
1010
  cont->frag_type = (CacheFragType) msg->frag_type;
 
1011
  if ((cont->request_opcode == CACHE_OPEN_WRITE_LONG)
 
1012
      || (cont->request_opcode == CACHE_OPEN_READ_LONG)) {
 
1013
    cont->pin_in_cache = (time_t) msg->data;
 
1014
  } else {
 
1015
    cont->pin_in_cache = 0;
 
1016
  }
 
1017
  cont->token = msg->token;
 
1018
  cont->nbytes = (((int) msg->nbytes < 0) ? 0 : msg->nbytes);
 
1019
 
 
1020
  if (cont->request_opcode == CACHE_OPEN_READ_LONG) {
 
1021
    cont->caller_buf_freebytes = msg->buffer_size;
 
1022
  } else {
 
1023
    cont->caller_buf_freebytes = 0;
 
1024
  }
 
1025
}
 
1026
 
 
1027
// init_from_short() support routine for cache_op_ClusterFunction()
 
1028
inline void
 
1029
init_from_short(CacheContinuation * cont, CacheOpMsg_short * msg, ClusterMachine * m)
 
1030
{
 
1031
  cont->no_reply_message = (msg->seq_number == CACHE_NO_RESPONSE);
 
1032
  cont->seq_number = msg->seq_number;
 
1033
  cont->cfl_flags = msg->cfl_flags;
 
1034
  cont->from = m;
 
1035
  cont->url_md5 = msg->md5;
 
1036
  cont->cluster_vc_channel = msg->channel;
 
1037
  cont->token = msg->token;
 
1038
  cont->nbytes = (((int) msg->nbytes < 0) ? 0 : msg->nbytes);
 
1039
  cont->frag_type = (CacheFragType) msg->frag_type;
 
1040
 
 
1041
  if (cont->request_opcode == CACHE_OPEN_WRITE) {
 
1042
    cont->pin_in_cache = (time_t) msg->data;
 
1043
  } else {
 
1044
    cont->pin_in_cache = 0;
 
1045
  }
 
1046
 
 
1047
  if (cont->request_opcode == CACHE_OPEN_READ) {
 
1048
    cont->caller_buf_freebytes = msg->buffer_size;
 
1049
  } else {
 
1050
    cont->caller_buf_freebytes = 0;
 
1051
  }
 
1052
}
 
1053
 
 
1054
// init_from_short_2() support routine for cache_op_ClusterFunction()
 
1055
inline void
 
1056
init_from_short_2(CacheContinuation * cont, CacheOpMsg_short_2 * msg, ClusterMachine * m)
 
1057
{
 
1058
  cont->no_reply_message = (msg->seq_number == CACHE_NO_RESPONSE);
 
1059
  cont->seq_number = msg->seq_number;
 
1060
  cont->cfl_flags = msg->cfl_flags;
 
1061
  cont->from = m;
 
1062
  cont->url_md5 = msg->md5_1;
 
1063
  cont->frag_type = (CacheFragType) msg->frag_type;
 
1064
}
 
1065
 
 
1066
void
 
1067
cache_op_ClusterFunction(ClusterMachine * from, void *data, int len)
 
1068
{
 
1069
  EThread *thread = this_ethread();
 
1070
  ProxyMutex *mutex = thread->mutex;
 
1071
  ////////////////////////////////////////////////////////
 
1072
  // Note: we are running on the ET_CLUSTER thread
 
1073
  ////////////////////////////////////////////////////////
 
1074
  CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CACHE_OUTSTANDING_STAT);
 
1075
 
 
1076
  int opcode;
 
1077
  ClusterMessageHeader *mh = (ClusterMessageHeader *) data;
 
1078
 
 
1079
  if (mh->GetMsgVersion() != CacheOpMsg_long::CACHE_OP_LONG_MESSAGE_VERSION) {  ////////////////////////////////////////////////
 
1080
    // Convert from old to current message format
 
1081
    ////////////////////////////////////////////////
 
1082
    ink_release_assert(!"cache_op_ClusterFunction() bad msg version");
 
1083
  }
 
1084
  opcode = ((CacheOpMsg_long *) data)->opcode;
 
1085
 
 
1086
  // If necessary, create a continuation to reflect the response back
 
1087
 
 
1088
  CacheContinuation *c = CacheContinuation::cacheContAllocator_alloc();
 
1089
  c->mutex = new_ProxyMutex();
 
1090
  MUTEX_TRY_LOCK(lock, c->mutex, this_ethread());
 
1091
  c->request_opcode = opcode;
 
1092
  c->token.clear();
 
1093
  c->start_time = ink_get_hrtime();
 
1094
  SET_CONTINUATION_HANDLER(c, (CacheContHandler)
 
1095
                           & CacheContinuation::replyOpEvent);
 
1096
 
 
1097
  switch (opcode) {
 
1098
  case CACHE_OPEN_WRITE_BUFFER:
 
1099
  case CACHE_OPEN_WRITE_BUFFER_LONG:
 
1100
    ink_release_assert(!"cache_op_ClusterFunction WRITE_BUFFER not supported");
 
1101
    break;
 
1102
 
 
1103
  case CACHE_OPEN_READ_BUFFER:
 
1104
  case CACHE_OPEN_READ_BUFFER_LONG:
 
1105
    ink_release_assert(!"cache_op_ClusterFunction READ_BUFFER not supported");
 
1106
    break;
 
1107
 
 
1108
  case CACHE_OPEN_READ:
 
1109
    {
 
1110
      CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap());
 
1111
      init_from_short(c, msg, from);
 
1112
      Debug("cache_msg",
 
1113
            "cache_op-s op=%d seqno=%d data=0x%x len=%d machine=0x%x", opcode, c->seq_number, data, len, from);
 
1114
      //
 
1115
      // Establish the remote side of the ClusterVConnection
 
1116
      //
 
1117
      c->write_cluster_vc = clusterProcessor.connect_local((Continuation *) 0,
 
1118
                                                           &c->token,
 
1119
                                                           c->cluster_vc_channel,
 
1120
                                                           (CLUSTER_OPT_IMMEDIATE | CLUSTER_OPT_CONN_READ));
 
1121
      if (!c->write_cluster_vc) {
 
1122
        // Unable to setup channel, abort processing.
 
1123
        CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CHAN_INUSE_STAT);
 
1124
        Debug("chan_inuse",
 
1125
              "1Remote chan=%d inuse tok.ip=%u.%u.%u.%u tok.seqno=%d seqno=%d",
 
1126
              c->cluster_vc_channel, DOT_SEPARATED(c->token.ip_created), c->token.sequence_number, c->seq_number);
 
1127
 
 
1128
        // Send cluster op failed reply
 
1129
        c->replyOpEvent(CACHE_EVENT_OPEN_READ_FAILED, (VConnection *) - ECLUSTER_CHANNEL_INUSE);
 
1130
        break;
 
1131
 
 
1132
      } else {
 
1133
        c->write_cluster_vc->current_cont = c;
 
1134
      }
 
1135
      ink_release_assert(c->write_cluster_vc != CLUSTER_DELAYED_OPEN);
 
1136
      ink_release_assert((opcode == CACHE_OPEN_READ)
 
1137
                         || c->write_cluster_vc->pending_remote_fill);
 
1138
 
 
1139
      SET_CONTINUATION_HANDLER(c, (CacheContHandler)
 
1140
                               & CacheContinuation::setupVCdataRead);
 
1141
      Debug("cache_proto",
 
1142
            "0read op, seqno=%d chan=%d bufsize=%d token=%d,%d",
 
1143
            msg->seq_number, msg->channel, msg->buffer_size, msg->token.ip_created, msg->token.sequence_number);
 
1144
#ifdef CACHE_MSG_TRACE
 
1145
      log_cache_op_msg(msg->seq_number, len, "cache_op_open_read");
 
1146
#endif
 
1147
      CacheKey key(msg->md5);
 
1148
 
 
1149
      char *hostname = NULL;
 
1150
      int host_len = len - op_to_sizeof_fixedlen_msg(opcode);
 
1151
      if (host_len) {
 
1152
        hostname = (char *) msg->moi;
 
1153
      }
 
1154
      Cache *call_cache = caches[c->frag_type];
 
1155
      c->cache_action = call_cache->open_read(c, &key, c->frag_type, hostname, host_len);
 
1156
      break;
 
1157
    }
 
1158
  case CACHE_OPEN_READ_LONG:
 
1159
    {
 
1160
      // Cache needs message data, copy it.
 
1161
      c->setMsgBufferLen(len);
 
1162
      c->allocMsgBuffer();
 
1163
      memcpy(c->getMsgBuffer(), (char *) data, len);
 
1164
 
 
1165
      int flen = CacheOpMsg_long::sizeof_fixedlen_msg();
 
1166
      CacheOpMsg_long *msg = unmarshal_CacheOpMsg_long(c->getMsgBuffer(), mh->NeedByteSwap());
 
1167
      init_from_long(c, msg, from);
 
1168
      Debug("cache_msg",
 
1169
            "cache_op-l op=%d seqno=%d data=0x%x len=%d machine=0x%x", opcode, c->seq_number, data, len, from);
 
1170
#ifdef CACHE_MSG_TRACE
 
1171
      log_cache_op_msg(msg->seq_number, len, "cache_op_open_read_long");
 
1172
#endif
 
1173
      //
 
1174
      // Establish the remote side of the ClusterVConnection
 
1175
      //
 
1176
      c->write_cluster_vc = clusterProcessor.connect_local((Continuation *) 0,
 
1177
                                                           &c->token,
 
1178
                                                           c->cluster_vc_channel,
 
1179
                                                           (CLUSTER_OPT_IMMEDIATE | CLUSTER_OPT_CONN_READ));
 
1180
      if (!c->write_cluster_vc) {
 
1181
        // Unable to setup channel, abort processing.
 
1182
        CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CHAN_INUSE_STAT);
 
1183
        Debug("chan_inuse",
 
1184
              "2Remote chan=%d inuse tok.ip=%u.%u.%u.%u tok.seqno=%d seqno=%d",
 
1185
              c->cluster_vc_channel, DOT_SEPARATED(c->token.ip_created), c->token.sequence_number, c->seq_number);
 
1186
 
 
1187
        // Send cluster op failed reply
 
1188
        c->replyOpEvent(CACHE_EVENT_OPEN_READ_FAILED, (VConnection *) - ECLUSTER_CHANNEL_INUSE);
 
1189
        break;
 
1190
 
 
1191
      } else {
 
1192
        c->write_cluster_vc->current_cont = c;
 
1193
      }
 
1194
      ink_release_assert(c->write_cluster_vc != CLUSTER_DELAYED_OPEN);
 
1195
      ink_release_assert((opcode == CACHE_OPEN_READ_LONG)
 
1196
                         || c->write_cluster_vc->pending_remote_fill);
 
1197
 
 
1198
      SET_CONTINUATION_HANDLER(c, (CacheContHandler)
 
1199
                               & CacheContinuation::setupReadWriteVC);
 
1200
      Debug("cache_proto",
 
1201
            "1read op, seqno=%d chan=%d bufsize=%d token=%d,%d",
 
1202
            msg->seq_number, msg->channel, msg->buffer_size, msg->token.ip_created, msg->token.sequence_number);
 
1203
 
 
1204
      const char *p = (const char *) msg + flen;
 
1205
      int moi_len = len - flen;
 
1206
      int res;
 
1207
 
 
1208
      ink_assert(moi_len > 0);
 
1209
 
 
1210
      // Unmarshal CacheHTTPHdr
 
1211
      res = c->ic_request.unmarshal((char *) p, moi_len, NULL);
 
1212
      ink_assert(res > 0);
 
1213
      ink_assert(c->ic_request.valid());
 
1214
      moi_len -= res;
 
1215
      p += res;
 
1216
      ink_assert(moi_len > 0);
 
1217
      // Unmarshal CacheLookupHttpConfig
 
1218
      c->ic_params = new(CacheLookupHttpConfigAllocator.alloc())
 
1219
        CacheLookupHttpConfig();
 
1220
      res = c->ic_params->unmarshal(&c->ic_arena, (const char *) p, moi_len);
 
1221
      ink_assert(res > 0);
 
1222
 
 
1223
      moi_len -= res;
 
1224
      p += res;
 
1225
 
 
1226
      CacheKey key(msg->url_md5);
 
1227
 
 
1228
      char *hostname = NULL;
 
1229
      int host_len = 0;
 
1230
 
 
1231
      if (moi_len) {
 
1232
        hostname = (char *) p;
 
1233
        host_len = moi_len;
 
1234
 
 
1235
        // Save hostname and attach it to the continuation since we may
 
1236
        //  need it if we convert this to an open_write.
 
1237
 
 
1238
        c->ic_hostname = new_IOBufferData(iobuffer_size_to_index(host_len));
 
1239
        c->ic_hostname_len = host_len;
 
1240
 
 
1241
        memcpy(c->ic_hostname->data(), hostname, host_len);
 
1242
      }
 
1243
 
 
1244
      Cache *call_cache = caches[c->frag_type];
 
1245
      Action *a = call_cache->open_read(c, &key, &c->ic_request,
 
1246
                                        c->ic_params,
 
1247
                                        c->frag_type, hostname, host_len);
 
1248
      // Get rid of purify warnings since 'c' can be freed by open_read.
 
1249
      if (a != ACTION_RESULT_DONE) {
 
1250
        c->cache_action = a;
 
1251
      }
 
1252
      break;
 
1253
    }
 
1254
  case CACHE_OPEN_WRITE:
 
1255
    {
 
1256
      CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap());
 
1257
      init_from_short(c, msg, from);
 
1258
      Debug("cache_msg",
 
1259
            "cache_op-s op=%d seqno=%d data=0x%x len=%d machine=0x%x", opcode, c->seq_number, data, len, from);
 
1260
#ifdef CACHE_MSG_TRACE
 
1261
      log_cache_op_msg(msg->seq_number, len, "cache_op_open_write");
 
1262
#endif
 
1263
      //
 
1264
      // Establish the remote side of the ClusterVConnection
 
1265
      //
 
1266
      c->read_cluster_vc = clusterProcessor.connect_local((Continuation *) 0,
 
1267
                                                          &c->token,
 
1268
                                                          c->cluster_vc_channel,
 
1269
                                                          (CLUSTER_OPT_IMMEDIATE | CLUSTER_OPT_CONN_WRITE));
 
1270
      if (!c->read_cluster_vc) {
 
1271
        // Unable to setup channel, abort processing.
 
1272
        CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CHAN_INUSE_STAT);
 
1273
        Debug("chan_inuse",
 
1274
              "3Remote chan=%d inuse tok.ip=%u.%u.%u.%u tok.seqno=%d seqno=%d",
 
1275
              c->cluster_vc_channel, DOT_SEPARATED(c->token.ip_created), c->token.sequence_number, c->seq_number);
 
1276
 
 
1277
        // Send cluster op failed reply
 
1278
        c->replyOpEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (VConnection *) - ECLUSTER_CHANNEL_INUSE);
 
1279
        break;
 
1280
 
 
1281
      } else {
 
1282
        c->read_cluster_vc->current_cont = c;
 
1283
      }
 
1284
      ink_release_assert(c->read_cluster_vc != CLUSTER_DELAYED_OPEN);
 
1285
 
 
1286
      CacheKey key(msg->md5);
 
1287
 
 
1288
      char *hostname = NULL;
 
1289
      int host_len = len - op_to_sizeof_fixedlen_msg(opcode);
 
1290
      if (host_len) {
 
1291
        hostname = (char *) msg->moi;
 
1292
      }
 
1293
 
 
1294
      Cache *call_cache = caches[c->frag_type];
 
1295
      Action *a = call_cache->open_write(c, &key, c->frag_type,
 
1296
                                         !!(c->cfl_flags & CFL_OVERWRITE_ON_WRITE),
 
1297
                                         c->pin_in_cache, hostname, host_len);
 
1298
      if (a != ACTION_RESULT_DONE) {
 
1299
        c->cache_action = a;
 
1300
      }
 
1301
      break;
 
1302
    }
 
1303
  case CACHE_OPEN_WRITE_LONG:
 
1304
    {
 
1305
      // Cache needs message data, copy it.
 
1306
      c->setMsgBufferLen(len);
 
1307
      c->allocMsgBuffer();
 
1308
      memcpy(c->getMsgBuffer(), (char *) data, len);
 
1309
 
 
1310
      int flen = CacheOpMsg_long::sizeof_fixedlen_msg();
 
1311
      CacheOpMsg_long *msg = unmarshal_CacheOpMsg_long(c->getMsgBuffer(), mh->NeedByteSwap());
 
1312
      init_from_long(c, msg, from);
 
1313
      Debug("cache_msg",
 
1314
            "cache_op-l op=%d seqno=%d data=0x%x len=%d machine=0x%x", opcode, c->seq_number, data, len, from);
 
1315
#ifdef CACHE_MSG_TRACE
 
1316
      log_cache_op_msg(msg->seq_number, len, "cache_op_open_write_long");
 
1317
#endif
 
1318
      //
 
1319
      // Establish the remote side of the ClusterVConnection
 
1320
      //
 
1321
      c->read_cluster_vc = clusterProcessor.connect_local((Continuation *) 0,
 
1322
                                                          &c->token,
 
1323
                                                          c->cluster_vc_channel,
 
1324
                                                          (CLUSTER_OPT_IMMEDIATE | CLUSTER_OPT_CONN_WRITE));
 
1325
      if (!c->read_cluster_vc) {
 
1326
        // Unable to setup channel, abort processing.
 
1327
        CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CHAN_INUSE_STAT);
 
1328
        Debug("chan_inuse",
 
1329
              "4Remote chan=%d inuse tok.ip=%u.%u.%u.%u tok.seqno=%d seqno=%d",
 
1330
              c->cluster_vc_channel, DOT_SEPARATED(c->token.ip_created), c->token.sequence_number, c->seq_number);
 
1331
 
 
1332
        // Send cluster op failed reply
 
1333
        c->replyOpEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (VConnection *) - ECLUSTER_CHANNEL_INUSE);
 
1334
        break;
 
1335
 
 
1336
      } else {
 
1337
        c->read_cluster_vc->current_cont = c;
 
1338
      }
 
1339
      ink_release_assert(c->read_cluster_vc != CLUSTER_DELAYED_OPEN);
 
1340
 
 
1341
      CacheHTTPInfo *ci = 0;
 
1342
      const char *p;
 
1343
      int res = 0;
 
1344
      int moi_len = len - flen;
 
1345
 
 
1346
      if (moi_len && c->cfl_flags & CFL_LOPENWRITE_HAVE_OLDINFO) {
 
1347
        p = (const char *) msg + flen;
 
1348
 
 
1349
        // Unmarshal old CacheHTTPInfo
 
1350
        res = HTTPInfo::unmarshal((char *) p, moi_len, NULL);
 
1351
        ink_assert(res > 0);
 
1352
        c->ic_old_info.get_handle((char *) p, moi_len);
 
1353
        ink_assert(c->ic_old_info.valid());
 
1354
        ci = &c->ic_old_info;
 
1355
      } else {
 
1356
        p = (const char *) 0;
 
1357
      }
 
1358
      if (c->cfl_flags & CFL_ALLOW_MULTIPLE_WRITES) {
 
1359
        ink_assert(!ci);
 
1360
        ci = (CacheHTTPInfo *) CACHE_ALLOW_MULTIPLE_WRITES;
 
1361
      }
 
1362
      moi_len -= res;
 
1363
      p += res;
 
1364
 
 
1365
      CacheKey key(msg->url_md5);
 
1366
      char *hostname = NULL;
 
1367
 
 
1368
      if (moi_len) {
 
1369
        hostname = (char *) p;
 
1370
      }
 
1371
 
 
1372
      Cache *call_cache = caches[c->frag_type];
 
1373
      Action *a = call_cache->open_write(c, &key, ci, c->pin_in_cache,
 
1374
                                         NULL, c->frag_type, hostname, len);
 
1375
      if (a != ACTION_RESULT_DONE) {
 
1376
        c->cache_action = a;
 
1377
      }
 
1378
      break;
 
1379
    }
 
1380
  case CACHE_REMOVE:
 
1381
    {
 
1382
      CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap());
 
1383
      init_from_short(c, msg, from);
 
1384
      Debug("cache_msg",
 
1385
            "cache_op op=%d seqno=%d data=0x%x len=%d machine=0x%x", opcode, c->seq_number, data, len, from);
 
1386
#ifdef CACHE_MSG_TRACE
 
1387
      log_cache_op_msg(msg->seq_number, len, "cache_op_remove");
 
1388
#endif
 
1389
      CacheKey key(msg->md5);
 
1390
 
 
1391
      char *hostname = NULL;
 
1392
      int host_len = len - op_to_sizeof_fixedlen_msg(opcode);
 
1393
      if (host_len) {
 
1394
        hostname = (char *) msg->moi;
 
1395
      }
 
1396
 
 
1397
      Cache *call_cache = caches[c->frag_type];
 
1398
      Action *a = call_cache->remove(c, &key, c->frag_type,
 
1399
                                     !!(c->cfl_flags & CFL_REMOVE_USER_AGENTS),
 
1400
                                     !!(c->cfl_flags & CFL_REMOVE_LINK),
 
1401
                                     hostname, host_len);
 
1402
      if (a != ACTION_RESULT_DONE) {
 
1403
        c->cache_action = a;
 
1404
      }
 
1405
      break;
 
1406
    }
 
1407
  case CACHE_LINK:
 
1408
    {
 
1409
      CacheOpMsg_short_2 *msg = unmarshal_CacheOpMsg_short_2(data, mh->NeedByteSwap());
 
1410
      init_from_short_2(c, msg, from);
 
1411
      Debug("cache_msg",
 
1412
            "cache_op op=%d seqno=%d data=0x%x len=%d machine=0x%x", opcode, c->seq_number, data, len, from);
 
1413
#ifdef CACHE_MSG_TRACE
 
1414
      log_cache_op_msg(msg->seq_number, len, "cache_op_link");
 
1415
#endif
 
1416
 
 
1417
      CacheKey key1(msg->md5_1);
 
1418
      CacheKey key2(msg->md5_2);
 
1419
 
 
1420
      char *hostname = NULL;
 
1421
      int host_len = len - op_to_sizeof_fixedlen_msg(opcode);
 
1422
      if (host_len) {
 
1423
        hostname = (char *) msg->moi;
 
1424
      }
 
1425
 
 
1426
      Cache *call_cache = caches[c->frag_type];
 
1427
      Action *a = call_cache->link(c, &key1, &key2, c->frag_type,
 
1428
                                   hostname, host_len);
 
1429
      if (a != ACTION_RESULT_DONE) {
 
1430
        c->cache_action = a;
 
1431
      }
 
1432
      break;
 
1433
    }
 
1434
  case CACHE_DEREF:
 
1435
    {
 
1436
      CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap());
 
1437
      init_from_short(c, msg, from);
 
1438
      Debug("cache_msg",
 
1439
            "cache_op op=%d seqno=%d data=0x%x len=%d machine=0x%x", opcode, c->seq_number, data, len, from);
 
1440
#ifdef CACHE_MSG_TRACE
 
1441
      log_cache_op_msg(msg->seq_number, len, "cache_op_deref");
 
1442
#endif
 
1443
 
 
1444
      CacheKey key(msg->md5);
 
1445
 
 
1446
      char *hostname = NULL;
 
1447
      int host_len = len - op_to_sizeof_fixedlen_msg(opcode);
 
1448
      if (host_len) {
 
1449
        hostname = (char *) msg->moi;
 
1450
      }
 
1451
 
 
1452
      Cache *call_cache = caches[c->frag_type];
 
1453
      Action *a = call_cache->deref(c, &key, c->frag_type,
 
1454
                                    hostname, host_len);
 
1455
      if (a != ACTION_RESULT_DONE) {
 
1456
        c->cache_action = a;
 
1457
      }
 
1458
      break;
 
1459
    }
 
1460
 
 
1461
  default:
 
1462
    {
 
1463
      ink_release_assert(0);
 
1464
    }
 
1465
  }                             // End of switch
 
1466
}
 
1467
 
 
1468
void
 
1469
cache_op_malloc_ClusterFunction(ClusterMachine * from, void *data, int len)
 
1470
{
 
1471
  cache_op_ClusterFunction(from, data, len);
 
1472
  // We own the message data, free it back to the Cluster subsystem
 
1473
  clusterProcessor.free_remote_data((char *) data, len);
 
1474
}
 
1475
 
 
1476
int
 
1477
CacheContinuation::setupVCdataRead(int event, VConnection * vc)
 
1478
{
 
1479
  ink_assert(magicno == (int) MagicNo);
 
1480
  //
 
1481
  // Setup the initial data read for the given Cache VC.
 
1482
  // This data is sent back in the response message.
 
1483
  //
 
1484
  if (event == CACHE_EVENT_OPEN_READ) {
 
1485
    //////////////////////////////////////////
 
1486
    // Allocate buffer and initiate read.
 
1487
    //////////////////////////////////////////
 
1488
    Debug("cache_proto", "setupVCdataRead CACHE_EVENT_OPEN_READ seqno=%d", seq_number);
 
1489
    ink_release_assert(caller_buf_freebytes);
 
1490
    SET_HANDLER((CacheContHandler) & CacheContinuation::VCdataRead);
 
1491
 
 
1492
    int64_t size_index = iobuffer_size_to_index(caller_buf_freebytes);
 
1493
    MIOBuffer *buf = new_MIOBuffer(size_index);
 
1494
    readahead_reader = buf->alloc_reader();
 
1495
 
 
1496
    MUTEX_TRY_LOCK(lock, mutex, this_ethread());        // prevent immediate callback
 
1497
    readahead_vio = vc->do_io_read(this, caller_buf_freebytes, buf);
 
1498
    return EVENT_DONE;
 
1499
 
 
1500
  } else {
 
1501
    // Error case, deflect processing to replyOpEvent.
 
1502
    SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent);
 
1503
    return handleEvent(event, vc);
 
1504
  }
 
1505
}
 
1506
 
 
1507
int
 
1508
CacheContinuation::VCdataRead(int event, VIO * target_vio)
 
1509
{
 
1510
  ink_release_assert(magicno == (int) MagicNo);
 
1511
  ink_release_assert(readahead_vio == target_vio);
 
1512
 
 
1513
  VConnection *vc = target_vio->vc_server;
 
1514
  int reply = CACHE_EVENT_OPEN_READ;
 
1515
  int32_t object_size;
 
1516
 
 
1517
  switch (event) {
 
1518
  case VC_EVENT_EOS:
 
1519
    {
 
1520
      if (!target_vio->ndone) {
 
1521
        // Doc with zero byte body, handle as read failure
 
1522
        goto read_failed;
 
1523
      }
 
1524
      // Fall through
 
1525
    }
 
1526
  case VC_EVENT_READ_READY:
 
1527
  case VC_EVENT_READ_COMPLETE:
 
1528
    {
 
1529
      int clone_bytes;
 
1530
      int current_ndone = target_vio->ndone;
 
1531
 
 
1532
      ink_assert(current_ndone);
 
1533
      ink_assert(current_ndone <= readahead_reader->read_avail());
 
1534
 
 
1535
      object_size = getObjectSize(vc, request_opcode, &cache_vc_info);
 
1536
      have_all_data = ((object_size <= caller_buf_freebytes) && (object_size == current_ndone));
 
1537
 
 
1538
      // Use no more than the caller's max buffer limit
 
1539
 
 
1540
      clone_bytes = current_ndone;
 
1541
      if (!have_all_data) {
 
1542
        if (current_ndone > caller_buf_freebytes) {
 
1543
          clone_bytes = caller_buf_freebytes;
 
1544
        }
 
1545
      }
 
1546
      // Clone data
 
1547
 
 
1548
      IOBufferBlock *tail;
 
1549
      readahead_data = clone_IOBufferBlockList(readahead_reader->get_current_block(),
 
1550
                                               readahead_reader->start_offset, clone_bytes, &tail);
 
1551
 
 
1552
      if (have_all_data) {
 
1553
        // Close VC, since no more data and also to avoid VC_EVENT_EOS
 
1554
 
 
1555
        MIOBuffer *mbuf = target_vio->buffer.writer();
 
1556
        vc->do_io(VIO::CLOSE);
 
1557
        free_MIOBuffer(mbuf);
 
1558
        readahead_vio = 0;
 
1559
      }
 
1560
      SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent);
 
1561
      handleEvent(reply, vc);
 
1562
      return EVENT_CONT;
 
1563
    }
 
1564
  case VC_EVENT_ERROR:
 
1565
  case VC_EVENT_INACTIVITY_TIMEOUT:
 
1566
  case VC_EVENT_ACTIVE_TIMEOUT:
 
1567
  default:
 
1568
    {
 
1569
    read_failed:
 
1570
      // Read failed, deflect to replyOpEvent.
 
1571
 
 
1572
      MIOBuffer * mbuf = target_vio->buffer.writer();
 
1573
      vc->do_io(VIO::CLOSE);
 
1574
      free_MIOBuffer(mbuf);
 
1575
      readahead_vio = 0;
 
1576
      reply = CACHE_EVENT_OPEN_READ_FAILED;
 
1577
 
 
1578
      SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent);
 
1579
      handleEvent(reply, (VConnection *) - ECLUSTER_ORB_DATA_READ);
 
1580
      return EVENT_DONE;
 
1581
    }
 
1582
  }                             // End of switch
 
1583
}
 
1584
 
 
1585
int
 
1586
CacheContinuation::setupReadWriteVC(int event, VConnection * vc)
 
1587
{
 
1588
  // Only handles OPEN_READ_LONG processing.
 
1589
 
 
1590
  switch (event) {
 
1591
  case CACHE_EVENT_OPEN_READ:
 
1592
    {
 
1593
      // setup readahead
 
1594
 
 
1595
      SET_HANDLER((CacheContHandler) & CacheContinuation::setupVCdataRead);
 
1596
      return handleEvent(event, vc);
 
1597
      break;
 
1598
    }
 
1599
  case CACHE_EVENT_OPEN_READ_FAILED:
 
1600
    {
 
1601
      if (frag_type == CACHE_FRAG_TYPE_HTTP) {
 
1602
        // HTTP open read failed, attempt open write now to avoid an additional
 
1603
        //  message round trip
 
1604
 
 
1605
        CacheKey key(url_md5);
 
1606
 
 
1607
        Cache *call_cache = caches[frag_type];
 
1608
        Action *a = call_cache->open_write(this, &key, 0, pin_in_cache,
 
1609
                                           NULL, frag_type, ic_hostname->data(),
 
1610
                                           ic_hostname_len);
 
1611
        if (a != ACTION_RESULT_DONE) {
 
1612
          cache_action = a;
 
1613
        }
 
1614
      } else {
 
1615
        SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent);
 
1616
        return handleEvent(CACHE_EVENT_OPEN_READ_FAILED, 0);
 
1617
      }
 
1618
      break;
 
1619
    }
 
1620
  case CACHE_EVENT_OPEN_WRITE:
 
1621
    {
 
1622
      // Convert from read to write connection
 
1623
 
 
1624
      ink_assert(!read_cluster_vc && write_cluster_vc);
 
1625
      read_cluster_vc = write_cluster_vc;
 
1626
      read_cluster_vc->set_type(CLUSTER_OPT_CONN_WRITE);
 
1627
      write_cluster_vc = 0;
 
1628
 
 
1629
      SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent);
 
1630
      return handleEvent(event, vc);
 
1631
      break;
 
1632
    }
 
1633
  case CACHE_EVENT_OPEN_WRITE_FAILED:
 
1634
  default:
 
1635
    {
 
1636
      SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent);
 
1637
      return handleEvent(CACHE_EVENT_OPEN_READ_FAILED, 0);
 
1638
      break;
 
1639
    }
 
1640
  }                             // end of switch
 
1641
 
 
1642
  return EVENT_DONE;
 
1643
}
 
1644
 
 
1645
/////////////////////////////////////////////////////////////////////////
 
1646
// replyOpEvent()
 
1647
//   Reflect the (local) reply back to the (remote) requesting node.
 
1648
/////////////////////////////////////////////////////////////////////////
 
1649
int
 
1650
CacheContinuation::replyOpEvent(int event, VConnection * cvc)
 
1651
{
 
1652
  ink_assert(magicno == (int) MagicNo);
 
1653
  Debug("cache_proto", "replyOpEvent(this=%x,event=%d,VC=%x)", this, event, cvc);
 
1654
  ink_hrtime now;
 
1655
  now = ink_get_hrtime();
 
1656
  CLUSTER_SUM_DYN_STAT(CLUSTER_CACHE_CALLBACK_TIME_STAT, now - start_time);
 
1657
  LOG_EVENT_TIME(start_time, callback_time_dist, cache_callbacks);
 
1658
  ink_release_assert(expect_cache_callback);
 
1659
  expect_cache_callback = false;        // make sure we are called back exactly once
 
1660
 
 
1661
 
 
1662
  result = event;
 
1663
  bool open = event_is_open(event);
 
1664
  bool read_op = op_is_read(request_opcode);
 
1665
  bool open_read_now_open_write = false;
 
1666
 
 
1667
  // Reply message initializations
 
1668
  CacheOpReplyMsg rmsg;
 
1669
  CacheOpReplyMsg *msg = &rmsg;
 
1670
  msg->result = event;
 
1671
 
 
1672
  if ((request_opcode == CACHE_OPEN_READ_LONG)
 
1673
      && cvc && (event == CACHE_EVENT_OPEN_WRITE)) {
 
1674
    //////////////////////////////////////////////////////////////////////////
 
1675
    // open read failed, but open write succeeded, set result to
 
1676
    // CACHE_EVENT_OPEN_READ_FAILED and make result token non zero to
 
1677
    // signal to the remote node that we have established a write connection.
 
1678
    //////////////////////////////////////////////////////////////////////////
 
1679
    msg->result = CACHE_EVENT_OPEN_READ_FAILED;
 
1680
    open_read_now_open_write = true;
 
1681
  }
 
1682
 
 
1683
  msg->seq_number = seq_number;
 
1684
  int flen = CacheOpReplyMsg::sizeof_fixedlen_msg();    // include token
 
1685
  int len = 0;
 
1686
  int vers = 0;
 
1687
 
 
1688
  int results_expected = 1;
 
1689
 
 
1690
  if (no_reply_message)         // CACHE_NO_RESPONSE request
 
1691
    goto free_exit;
 
1692
 
 
1693
  if (open) {
 
1694
 
 
1695
    // prepare for CACHE_OPEN_EVENT
 
1696
 
 
1697
    results_expected = 2;
 
1698
    cache_vc = cvc;
 
1699
    cache_read = (event == CACHE_EVENT_OPEN_READ);
 
1700
 
 
1701
    if (read_op && !open_read_now_open_write) {
 
1702
      ink_release_assert(write_cluster_vc->pending_remote_fill);
 
1703
      ink_assert(have_all_data || (readahead_vio == &((CacheVC *) cache_vc)->vio));
 
1704
      Debug("cache_proto", "connect_local success seqno=%d alldata=%d", seq_number, (have_all_data ? 1 : 0));
 
1705
 
 
1706
      if (have_all_data) {
 
1707
        msg->token.clear();     // Tell sender no conn established
 
1708
 
 
1709
      } else {
 
1710
        msg->token = token;     // Tell sender conn established
 
1711
        setupReadBufTunnel(cache_vc, write_cluster_vc);
 
1712
      }
 
1713
 
 
1714
    } else {
 
1715
      Debug("cache_proto", "cache_open [%s] success seqno=%d", (cache_read ? "R" : "W"), seq_number);
 
1716
      msg->token = token;       // Tell sender conn established
 
1717
 
 
1718
      OneWayTunnel *pOWT = OneWayTunnel::OneWayTunnel_alloc();
 
1719
      pOWT->init(read_cluster_vc, cache_vc, NULL, nbytes ? nbytes : DEFAULT_MAX_BUFFER_SIZE, this->mutex);
 
1720
      read_cluster_vc->allow_remote_close();
 
1721
      results_expected--;
 
1722
    }
 
1723
 
 
1724
    // For cache reads, marshal the associated CacheHTTPInfo in the reply
 
1725
    if (cache_read) {
 
1726
      int res;
 
1727
 
 
1728
      if (!cache_vc_info.valid()) {
 
1729
        (void) getObjectSize(cache_vc, request_opcode, &cache_vc_info);
 
1730
      }
 
1731
      // Determine data length and allocate
 
1732
      len = cache_vc_info.marshal_length();
 
1733
      CacheOpReplyMsg *reply = (CacheOpReplyMsg *) ALLOCA_DOUBLE(flen + len);
 
1734
 
 
1735
      // Initialize reply message header
 
1736
      *reply = *msg;
 
1737
 
 
1738
      // Marshal response data into reply message
 
1739
      res = cache_vc_info.marshal((char *) reply + flen, len);
 
1740
      ink_assert(res >= 0 && res <= len);
 
1741
 
 
1742
      // Make reply message the current message
 
1743
      msg = reply;
 
1744
    }
 
1745
 
 
1746
  } else {
 
1747
    Debug("cache_proto", "cache operation failed result=%d seqno=%d (this=%x)", event, seq_number, this);
 
1748
    msg->token.clear();         // Tell sender no conn established
 
1749
 
 
1750
    // Reallocate reply message, allowing for marshalled data
 
1751
    len += sizeof(int32_t);
 
1752
    CacheOpReplyMsg *reply = (CacheOpReplyMsg *) ALLOCA_DOUBLE(flen + len);
 
1753
 
 
1754
    // Initialize reply message header
 
1755
    *reply = *msg;
 
1756
 
 
1757
    if (request_opcode != CACHE_LINK) {
 
1758
      //
 
1759
      // open read/write failed, close preallocated VC
 
1760
      //
 
1761
      if (read_cluster_vc) {
 
1762
        read_cluster_vc->remote_closed = 1;     // avoid remote close msg
 
1763
        read_cluster_vc->do_io(VIO::CLOSE);
 
1764
      }
 
1765
      if (write_cluster_vc) {
 
1766
        write_cluster_vc->pending_remote_fill = 0;
 
1767
        write_cluster_vc->remote_closed = 1;    // avoid remote close msg
 
1768
        write_cluster_vc->do_io(VIO::CLOSE);
 
1769
      }
 
1770
      *((int32_t *) reply->moi) = (int32_t) ((uintptr_t) cvc & 0xffffffff);    // code describing failure
 
1771
    }
 
1772
    // Make reply message the current message
 
1773
    msg = reply;
 
1774
  }
 
1775
  CLUSTER_DECREMENT_DYN_STAT(CLUSTER_CACHE_OUTSTANDING_STAT);
 
1776
 
 
1777
  //
 
1778
  // Send reply message
 
1779
  //
 
1780
#ifdef CACHE_MSG_TRACE
 
1781
  log_cache_op_sndmsg(msg->seq_number, 0, "replyOpEvent");
 
1782
#endif
 
1783
  vers = CacheOpReplyMsg::protoToVersion(from->msg_proto_major);
 
1784
  if (vers == CacheOpReplyMsg::CACHE_OP_REPLY_MESSAGE_VERSION) {
 
1785
    if (read_op) {
 
1786
      // Transmit reply message and object data in same cluster message
 
1787
      Debug("cache_proto", "Sending reply/data seqno=%d buflen=%d",
 
1788
            seq_number, readahead_data ? bytes_IOBufferBlockList(readahead_data, 1) : 0);
 
1789
      clusterProcessor.invoke_remote_data(from,
 
1790
                                          CACHE_OP_RESULT_CLUSTER_FUNCTION,
 
1791
                                          (void *) msg, (flen + len),
 
1792
                                          readahead_data,
 
1793
                                          cluster_vc_channel, &token,
 
1794
                                          &CacheContinuation::disposeOfDataBuffer, (void *) this, CLUSTER_OPT_STEAL);
 
1795
    } else {
 
1796
      Debug("cache_proto", "Sending reply seqno=%d, (this=%x)", seq_number, this);
 
1797
      clusterProcessor.invoke_remote(from, CACHE_OP_RESULT_CLUSTER_FUNCTION,
 
1798
                                     (void *) msg, (flen + len), CLUSTER_OPT_STEAL);
 
1799
    }
 
1800
 
 
1801
  } else {
 
1802
    //////////////////////////////////////////////////////////////
 
1803
    // Create the specified down rev version of this message
 
1804
    //////////////////////////////////////////////////////////////
 
1805
    ink_release_assert(!"replyOpEvent() bad msg version");
 
1806
  }
 
1807
 
 
1808
free_exit:
 
1809
  results_expected--;
 
1810
  if (results_expected <= 0) {
 
1811
    Debug("cache_proto", "replyOpEvent: freeing this=%x", this, event, cvc);
 
1812
    cacheContAllocator_free(this);
 
1813
  }
 
1814
  return EVENT_DONE;
 
1815
}
 
1816
 
 
1817
void
 
1818
CacheContinuation::setupReadBufTunnel(VConnection * cache_read_vc, VConnection * cluster_write_vc)
 
1819
{
 
1820
  ////////////////////////////////////////////////////////////
 
1821
  // Setup OneWayTunnel and tunnel close event handler.
 
1822
  // Used in readahead processing on open read connections.
 
1823
  ////////////////////////////////////////////////////////////
 
1824
  tunnel_cont = cacheContAllocator_alloc();
 
1825
  tunnel_cont->mutex = this->mutex;
 
1826
  SET_CONTINUATION_HANDLER(tunnel_cont, (CacheContHandler)
 
1827
                           & CacheContinuation::tunnelClosedEvent);
 
1828
  int64_t ravail = bytes_IOBufferBlockList(readahead_data, 1);
 
1829
 
 
1830
  tunnel_mutex = tunnel_cont->mutex;
 
1831
  tunnel_closed = false;
 
1832
 
 
1833
  tunnel = OneWayTunnel::OneWayTunnel_alloc();
 
1834
  readahead_reader->consume(ravail);    // allow for bytes sent in initial reply
 
1835
  tunnel->init(cache_read_vc, cluster_write_vc, tunnel_cont, readahead_vio, readahead_reader);
 
1836
  tunnel_cont->action = this;
 
1837
  tunnel_cont->tunnel = tunnel;
 
1838
  tunnel_cont->tunnel_cont = tunnel_cont;
 
1839
 
 
1840
  // Disable cluster_write_vc
 
1841
  ((ClusterVConnection *) cluster_write_vc)->write.enabled = 0;
 
1842
 
 
1843
  // Disable cache read VC
 
1844
  readahead_vio->nbytes = readahead_vio->ndone;
 
1845
 
 
1846
  /////////////////////////////////////////////////////////////////////
 
1847
  // At this point, the OneWayTunnel is blocked awaiting a reenable
 
1848
  // on both the source and target VCs. Reenable occurs after the
 
1849
  // message containing the initial data and open read reply are sent.
 
1850
  /////////////////////////////////////////////////////////////////////
 
1851
}
 
1852
 
 
1853
///////////////////////////////////////////////////////////////////////
 
1854
// Tunnnel exited event handler, used for readahead on open read.
 
1855
///////////////////////////////////////////////////////////////////////
 
1856
int
 
1857
CacheContinuation::tunnelClosedEvent(int event, void *c)
 
1858
{
 
1859
  NOWARN_UNUSED(event);
 
1860
  ink_assert(magicno == (int) MagicNo);
 
1861
  // Note: We are called with the tunnel_mutex held.
 
1862
  CacheContinuation *tc = (CacheContinuation *) c;
 
1863
  ink_release_assert(tc->tunnel_cont == tc);
 
1864
  CacheContinuation *real_cc = (CacheContinuation *) tc->action.continuation;
 
1865
 
 
1866
  if (real_cc) {
 
1867
    // Notify the real continuation of the tunnel closed event
 
1868
    real_cc->tunnel = 0;
 
1869
    real_cc->tunnel_cont = 0;
 
1870
    real_cc->tunnel_closed = true;
 
1871
  }
 
1872
  OneWayTunnel::OneWayTunnel_free(tc->tunnel);
 
1873
  cacheContAllocator_free(tc);
 
1874
 
 
1875
  return EVENT_DONE;
 
1876
}
 
1877
 
 
1878
////////////////////////////////////////////////////////////
 
1879
// Retry DisposeOfDataBuffer continuation
 
1880
////////////////////////////////////////////////////////////
 
1881
struct retryDisposeOfDataBuffer;
 
1882
typedef int (retryDisposeOfDataBuffer::*rtryDisOfDBufHandler) (int, void *);
 
1883
struct retryDisposeOfDataBuffer:public Continuation
 
1884
{
 
1885
  CacheContinuation *c;
 
1886
 
 
1887
  int handleRetryEvent(int event, Event * e)
 
1888
  {
 
1889
    if (CacheContinuation::handleDisposeEvent(event, c) == EVENT_DONE) {
 
1890
      delete this;
 
1891
        return EVENT_DONE;
 
1892
    } else
 
1893
    {
 
1894
      e->schedule_in(HRTIME_MSECONDS(10));
 
1895
      return EVENT_CONT;
 
1896
    }
 
1897
  }
 
1898
  retryDisposeOfDataBuffer(CacheContinuation * cont)
 
1899
:  Continuation(new_ProxyMutex()), c(cont) {
 
1900
    SET_HANDLER((rtryDisOfDBufHandler)
 
1901
                & retryDisposeOfDataBuffer::handleRetryEvent);
 
1902
  }
 
1903
};
 
1904
 
 
1905
//////////////////////////////////////////////////////////////////
 
1906
// Callback from cluster to dispose of data passed in
 
1907
// call to invoke_remote_data().
 
1908
//////////////////////////////////////////////////////////////////
 
1909
void
 
1910
CacheContinuation::disposeOfDataBuffer(void *d)
 
1911
{
 
1912
  ink_assert(d);
 
1913
  CacheContinuation *cc = (CacheContinuation *) d;
 
1914
  ink_assert(cc->have_all_data || cc->readahead_vio);
 
1915
  ink_assert(cc->have_all_data || (cc->readahead_vio == &((CacheVC *) cc->cache_vc)->vio));
 
1916
 
 
1917
  if (cc->have_all_data) {
 
1918
    //
 
1919
    // All object data resides in the buffer, no OneWayTunnel
 
1920
    // started and the Cache VConnection has already been closed.
 
1921
    // Close write_cluster_vc and set remote close to avoid send of
 
1922
    // close message to remote node.
 
1923
    //
 
1924
    cc->write_cluster_vc->pending_remote_fill = 0;
 
1925
    cc->write_cluster_vc->remote_closed = 1;
 
1926
    cc->write_cluster_vc->do_io(VIO::CLOSE);
 
1927
    cc->readahead_data = 0;
 
1928
 
 
1929
    cacheContAllocator_free(cc);
 
1930
 
 
1931
  } else {
 
1932
    cc->write_cluster_vc->pending_remote_fill = 0;
 
1933
    cc->write_cluster_vc->allow_remote_close();
 
1934
    if (handleDisposeEvent(0, cc) == EVENT_CONT) {
 
1935
      // Setup retry continuation.
 
1936
      retryDisposeOfDataBuffer *retryCont = NEW(new retryDisposeOfDataBuffer(cc));
 
1937
      eventProcessor.schedule_in(retryCont, HRTIME_MSECONDS(10), ET_CALL);
 
1938
    }
 
1939
  }
 
1940
}
 
1941
 
 
1942
int
 
1943
CacheContinuation::handleDisposeEvent(int event, CacheContinuation * cc)
 
1944
{
 
1945
  NOWARN_UNUSED(event);
 
1946
  ink_assert(cc->magicno == (int) MagicNo);
 
1947
  MUTEX_TRY_LOCK(lock, cc->tunnel_mutex, this_ethread());
 
1948
  if (lock) {
 
1949
    // Write of initial object data is complete.
 
1950
 
 
1951
    if (!cc->tunnel_closed) {
 
1952
      // Start tunnel by reenabling source and target VCs.
 
1953
 
 
1954
      cc->tunnel->vioSource->nbytes = getObjectSize(cc->tunnel->vioSource->vc_server, cc->request_opcode, 0);
 
1955
      cc->tunnel->vioSource->reenable_re();
 
1956
      cc->tunnel->vioTarget->reenable();
 
1957
 
 
1958
      // Tell tunnel event we are gone
 
1959
      cc->tunnel_cont->action.continuation = 0;
 
1960
    }
 
1961
    cacheContAllocator_free(cc);
 
1962
    return EVENT_DONE;
 
1963
 
 
1964
  } else {
 
1965
    // Lock acquire failed, retry operation.
 
1966
    return EVENT_CONT;
 
1967
  }
 
1968
}
 
1969
 
 
1970
/////////////////////////////////////////////////////////////////////////////
 
1971
// cache_op_result_ClusterFunction()
 
1972
//   Invoked on the machine which initiated a remote op, this
 
1973
//   unmarshals the result and calls a continuation in the requesting thread.
 
1974
/////////////////////////////////////////////////////////////////////////////
 
1975
void
 
1976
cache_op_result_ClusterFunction(ClusterMachine * from, void *d, int l)
 
1977
{
 
1978
  (void) from;
 
1979
  ////////////////////////////////////////////////////////
 
1980
  // Note: we are running on the ET_CACHE_CONT_SM thread
 
1981
  ////////////////////////////////////////////////////////
 
1982
 
 
1983
  // Copy reply message data
 
1984
  Ptr<IOBufferData> iob = new_IOBufferData(iobuffer_size_to_index(l));
 
1985
  memcpy(iob->data(), (char *) d, l);
 
1986
  char *data = iob->data();
 
1987
  int flen, len = l;
 
1988
  CacheHTTPInfo ci;
 
1989
  CacheOpReplyMsg *msg = (CacheOpReplyMsg *) data;
 
1990
  int32_t op_result_error = 0;
 
1991
  ClusterMessageHeader *mh = (ClusterMessageHeader *) data;
 
1992
 
 
1993
  if (mh->GetMsgVersion() != CacheOpReplyMsg::CACHE_OP_REPLY_MESSAGE_VERSION) { ////////////////////////////////////////////////
 
1994
    // Convert from old to current message format
 
1995
    ////////////////////////////////////////////////
 
1996
    ink_release_assert(!"cache_op_result_ClusterFunction() bad msg version");
 
1997
  }
 
1998
 
 
1999
  flen = CacheOpReplyMsg::sizeof_fixedlen_msg();
 
2000
  if (mh->NeedByteSwap())
 
2001
    msg->SwapBytes();
 
2002
 
 
2003
  Debug("cluster_cache", "received cache op result, seqno=%d result=%d", msg->seq_number, msg->result);
 
2004
 
 
2005
  // If applicable, unmarshal any response data
 
2006
  if ((len > flen) && event_reply_may_have_moi(msg->result)) {
 
2007
    switch (msg->result) {
 
2008
    case CACHE_EVENT_OPEN_READ:
 
2009
      {
 
2010
        char *p = (char *) msg + flen;
 
2011
        int res;
 
2012
 
 
2013
        // Unmarshal CacheHTTPInfo
 
2014
        res = HTTPInfo::unmarshal(p, len, NULL);
 
2015
        ci.get_handle(p, len);
 
2016
        ink_assert(res > 0);
 
2017
        ink_assert(ci.valid());
 
2018
        break;
 
2019
      }
 
2020
    case CACHE_EVENT_LINK:
 
2021
    case CACHE_EVENT_LINK_FAILED:
 
2022
      break;
 
2023
    case CACHE_EVENT_OPEN_READ_FAILED:
 
2024
    case CACHE_EVENT_OPEN_WRITE_FAILED:
 
2025
    case CACHE_EVENT_REMOVE_FAILED:
 
2026
    case CACHE_EVENT_UPDATE_FAILED:
 
2027
    case CACHE_EVENT_DEREF_FAILED:
 
2028
      {
 
2029
        // Unmarshal the error code
 
2030
        ink_assert(((len - flen) == sizeof(int32_t)));
 
2031
        op_result_error = *(int32_t *) msg->moi;
 
2032
        if (mh->NeedByteSwap())
 
2033
          swap32((uint32_t *) & op_result_error);
 
2034
        op_result_error = -op_result_error;
 
2035
        break;
 
2036
      }
 
2037
    default:
 
2038
      {
 
2039
        ink_release_assert(!"invalid moi data for received msg");
 
2040
        break;
 
2041
      }
 
2042
    }                           // end of switch
 
2043
  }
 
2044
  // See if this response is still expected (expected case == yes)
 
2045
 
 
2046
  unsigned int hash = FOLDHASH(from->ip, msg->seq_number);
 
2047
  EThread *thread = this_ethread();
 
2048
  ProxyMutex *mutex = thread->mutex;
 
2049
  if (MUTEX_TAKE_TRY_LOCK(remoteCacheContQueueMutex[hash], thread)) {
 
2050
 
 
2051
    // Find it in pending list
 
2052
 
 
2053
    CacheContinuation *c = find_cache_continuation(msg->seq_number,
 
2054
                                                   from->ip);
 
2055
    if (!c) {
 
2056
      // Reply took to long, response no longer expected.
 
2057
      MUTEX_UNTAKE_LOCK(remoteCacheContQueueMutex[hash], thread);
 
2058
      Debug("cluster_timeout", "0cache reply timeout: %d", msg->seq_number);
 
2059
      CLUSTER_INCREMENT_DYN_STAT(CLUSTER_REMOTE_OP_REPLY_TIMEOUTS_STAT);
 
2060
      if (ci.valid())
 
2061
        ci.destroy();
 
2062
      return;
 
2063
    }
 
2064
    // Try to send the message
 
2065
 
 
2066
    MUTEX_TRY_LOCK(lock, c->mutex, thread);
 
2067
 
 
2068
    // Failed to acquire lock, defer
 
2069
 
 
2070
    if (!lock) {
 
2071
      MUTEX_UNTAKE_LOCK(remoteCacheContQueueMutex[hash], thread);
 
2072
      goto Lretry;
 
2073
    }
 
2074
    c->result_error = op_result_error;
 
2075
 
 
2076
    // send message, release lock
 
2077
 
 
2078
    c->freeMsgBuffer();
 
2079
    if (ci.valid()) {
 
2080
      // Unmarshaled CacheHTTPInfo contained in reply message, copy it.
 
2081
      c->setMsgBufferLen(len, iob);
 
2082
      c->ic_new_info = ci;
 
2083
    }
 
2084
    msg->seq_number = len;      // HACK ALERT: reusing variable
 
2085
    c->handleEvent(CACHE_EVENT_RESPONSE_MSG, data);
 
2086
 
 
2087
  } else {
 
2088
 
 
2089
    // Failed to wake it up, defer by creating a timed continuation
 
2090
 
 
2091
  Lretry:
 
2092
    CacheContinuation * c = CacheContinuation::cacheContAllocator_alloc();
 
2093
    c->mutex = new_ProxyMutex();
 
2094
    c->seq_number = msg->seq_number;
 
2095
    c->target_ip = from->ip;
 
2096
    SET_CONTINUATION_HANDLER(c, (CacheContHandler)
 
2097
                             & CacheContinuation::handleReplyEvent);
 
2098
    c->start_time = ink_get_hrtime();
 
2099
    c->result = msg->result;
 
2100
    if (event_is_open(msg->result))
 
2101
      c->token = msg->token;
 
2102
    if (ci.valid()) {
 
2103
      // Unmarshaled CacheHTTPInfo contained in reply message, copy it.
 
2104
      c->setMsgBufferLen(len, iob);
 
2105
      c->ic_new_info = ci;
 
2106
    }
 
2107
    c->result_error = op_result_error;
 
2108
    eventProcessor.schedule_in(c, CACHE_RETRY_PERIOD, ET_CACHE_CONT_SM);
 
2109
  }
 
2110
}
 
2111
 
 
2112
////////////////////////////////////////////////////////////////////////
 
2113
// handleReplyEvent()
 
2114
//   If we cannot acquire any of the locks to handle the response
 
2115
//   inline, it is defered and later handled by this function.
 
2116
////////////////////////////////////////////////////////////////////////
 
2117
int
 
2118
CacheContinuation::handleReplyEvent(int event, Event * e)
 
2119
{
 
2120
  (void) event;
 
2121
 
 
2122
  // take lock on outstanding message queue
 
2123
 
 
2124
  EThread *t = e->ethread;
 
2125
  unsigned int hash = FOLDHASH(target_ip, seq_number);
 
2126
 
 
2127
  if (!MUTEX_TAKE_TRY_LOCK(remoteCacheContQueueMutex[hash], t)) {
 
2128
    e->schedule_in(CACHE_RETRY_PERIOD);
 
2129
    return EVENT_CONT;
 
2130
  }
 
2131
 
 
2132
  LOG_EVENT_TIME(start_time, cntlck_acquire_time_dist, cntlck_acquire_events);
 
2133
 
 
2134
  // See if this response is still expected
 
2135
 
 
2136
  CacheContinuation *c = find_cache_continuation(seq_number, target_ip);
 
2137
  if (c) {
 
2138
 
 
2139
    // Acquire the lock to the continuation mutex
 
2140
 
 
2141
    MUTEX_TRY_LOCK(lock, c->mutex, e->ethread);
 
2142
    if (!lock) {
 
2143
 
 
2144
      // If we fail to acquire the lock, reschedule
 
2145
 
 
2146
      MUTEX_UNTAKE_LOCK(remoteCacheContQueueMutex[hash], t);
 
2147
      e->schedule_in(CACHE_RETRY_PERIOD);
 
2148
      return EVENT_CONT;
 
2149
    }
 
2150
 
 
2151
    // If unmarshalled CacheHTTPInfo exists, pass it along
 
2152
 
 
2153
    if (ic_new_info.valid()) {
 
2154
      c->freeMsgBuffer();
 
2155
      c->setMsgBufferLen(getMsgBufferLen(), getMsgBufferIOBData());
 
2156
      c->ic_new_info = ic_new_info;
 
2157
      ic_new_info.clear();
 
2158
    }
 
2159
    // send message, release lock
 
2160
 
 
2161
    c->handleEvent(CACHE_EVENT_RESPONSE, this);
 
2162
 
 
2163
  } else {
 
2164
    MUTEX_UNTAKE_LOCK(remoteCacheContQueueMutex[hash], t);
 
2165
    Debug("cluster_timeout", "cache reply timeout: %d", seq_number);
 
2166
    CLUSTER_INCREMENT_DYN_STAT(CLUSTER_REMOTE_OP_REPLY_TIMEOUTS_STAT);
 
2167
  }
 
2168
 
 
2169
  // Free this continuation
 
2170
 
 
2171
  cacheContAllocator_free(this);
 
2172
  return EVENT_DONE;
 
2173
}
 
2174
 
 
2175
//////////////////////////////////////////////////////////////////////////
 
2176
// remoteOpEvent()
 
2177
//   On the requesting node, handle the timeout and response to the user.
 
2178
//   There may be two CacheContinuations involved:
 
2179
//    1) One waiting to respond to the user.
 
2180
//       This case is CACHE_EVENT_RESPONSE_MSG which is handled
 
2181
//       inline (without delay).
 
2182
//    2) One which is carrying the response from the remote machine which
 
2183
//       has been delayed for a lock.  This case is CACHE_EVENT_RESPONSE.
 
2184
//////////////////////////////////////////////////////////////////////////
 
2185
int
 
2186
CacheContinuation::remoteOpEvent(int event_code, Event * e)
 
2187
{
 
2188
  ink_assert(magicno == (int) MagicNo);
 
2189
  int event = event_code;
 
2190
  ink_hrtime now;
 
2191
  if (start_time) {
 
2192
    int res;
 
2193
    if (event != EVENT_INTERVAL) {
 
2194
      if (event == CACHE_EVENT_RESPONSE) {
 
2195
        CacheContinuation *ccont = (CacheContinuation *) e;
 
2196
        res = ccont->result;
 
2197
      } else {
 
2198
        CacheOpReplyMsg *rmsg = (CacheOpReplyMsg *) e;
 
2199
        res = rmsg->result;
 
2200
      }
 
2201
      if ((res == CACHE_EVENT_LOOKUP) || (res == CACHE_EVENT_LOOKUP_FAILED)) {
 
2202
        now = ink_get_hrtime();
 
2203
        CLUSTER_SUM_DYN_STAT(CLUSTER_CACHE_LKRMT_CALLBACK_TIME_STAT, now - start_time);
 
2204
        LOG_EVENT_TIME(start_time, lkrmt_callback_time_dist, lkrmt_cache_callbacks);
 
2205
      } else {
 
2206
        now = ink_get_hrtime();
 
2207
        CLUSTER_SUM_DYN_STAT(CLUSTER_CACHE_RMT_CALLBACK_TIME_STAT, now - start_time);
 
2208
        LOG_EVENT_TIME(start_time, rmt_callback_time_dist, rmt_cache_callbacks);
 
2209
      }
 
2210
    }
 
2211
    start_time = 0;
 
2212
  }
 
2213
  // for CACHE_EVENT_RESPONSE/XXX the lock was acquired at the higher level
 
2214
  intptr_t return_error = 0;
 
2215
  ClusterVCToken *pToken = NULL;
 
2216
 
 
2217
retry:
 
2218
 
 
2219
  switch (event) {
 
2220
  default:
 
2221
    ink_assert(!"bad case");
 
2222
    return EVENT_DONE;
 
2223
 
 
2224
  case EVENT_INTERVAL:{
 
2225
 
 
2226
      unsigned int hash = FOLDHASH(target_ip, seq_number);
 
2227
 
 
2228
      MUTEX_TRY_LOCK(queuelock, remoteCacheContQueueMutex[hash], e->ethread);
 
2229
      if (!queuelock) {
 
2230
        e->schedule_in(CACHE_RETRY_PERIOD);
 
2231
        return EVENT_CONT;
 
2232
      }
 
2233
      // we are not yet enqueued on the list of outstanding operations
 
2234
 
 
2235
      if (!remoteCacheContQueue[hash].in(this)) {
 
2236
        remoteCacheContQueue[hash].enqueue(this);
 
2237
        ink_assert(timeout == e);
 
2238
        MUTEX_RELEASE(queuelock);
 
2239
        e->schedule_in(cache_cluster_timeout);
 
2240
        return EVENT_CONT;
 
2241
      }
 
2242
      // a timeout has occurred
 
2243
 
 
2244
      if (find_cache_continuation(seq_number, target_ip)) {
 
2245
        // Valid timeout
 
2246
        MUTEX_RELEASE(queuelock);
 
2247
 
 
2248
        Debug("cluster_timeout", "cluster op timeout %d", seq_number);
 
2249
        CLUSTER_INCREMENT_DYN_STAT(CLUSTER_REMOTE_OP_TIMEOUTS_STAT);
 
2250
        request_timeout = true;
 
2251
        timeout = 0;
 
2252
        //
 
2253
        // Post error completion now and defer deallocation of
 
2254
        // the continuation until we receive the reply or the
 
2255
        // target node goes down.
 
2256
        //
 
2257
        if (!action.cancelled)
 
2258
          action.continuation->handleEvent(result, (void *) -ECLUSTER_OP_TIMEOUT);
 
2259
        action.cancelled = 1;
 
2260
 
 
2261
        if (target_machine->dead) {
 
2262
          event = CACHE_EVENT_RESPONSE_MSG;
 
2263
          goto retry;
 
2264
        } else {
 
2265
          timeout = e;
 
2266
          e->schedule_in(cache_cluster_timeout);
 
2267
          return EVENT_DONE;
 
2268
        }
 
2269
 
 
2270
      } else {
 
2271
        // timeout not expected for continuation; log and ignore
 
2272
        MUTEX_RELEASE(queuelock);
 
2273
        Debug("cluster_timeout", "unknown cluster op timeout %d", seq_number);
 
2274
        Note("Unexpected CacheCont timeout, [%u.%u.%u.%u] seqno=%d", DOT_SEPARATED(target_ip), seq_number);
 
2275
        CLUSTER_INCREMENT_DYN_STAT(CLUSTER_REMOTE_OP_TIMEOUTS_STAT);
 
2276
        return EVENT_DONE;
 
2277
      }
 
2278
    }
 
2279
 
 
2280
  case CACHE_EVENT_RESPONSE:
 
2281
  case CACHE_EVENT_RESPONSE_MSG:{
 
2282
 
 
2283
      // the response has arrived, cancel timeout
 
2284
 
 
2285
      if (timeout) {
 
2286
        timeout->cancel();
 
2287
        timeout = 0;
 
2288
      }
 
2289
      // remove from the pending queue
 
2290
      unsigned int hash = FOLDHASH(target_ip, seq_number);
 
2291
 
 
2292
      remoteCacheContQueue[hash].remove(this);
 
2293
      MUTEX_UNTAKE_LOCK(remoteCacheContQueueMutex[hash], this_ethread());
 
2294
      // Fall through
 
2295
    }
 
2296
 
 
2297
  case CACHE_EVENT_RESPONSE_RETRY:{
 
2298
 
 
2299
      // determine result code
 
2300
 
 
2301
      CacheContinuation *c = (CacheContinuation *) e;
 
2302
      CacheOpReplyMsg *msg = (CacheOpReplyMsg *) e;
 
2303
      if (event == CACHE_EVENT_RESPONSE_MSG) {
 
2304
        result = (request_timeout ? result : msg->result);
 
2305
        pToken = (request_timeout ? &token : &msg->token);
 
2306
      } else if (event == CACHE_EVENT_RESPONSE) {
 
2307
        result = (request_timeout ? result : c->result);
 
2308
        pToken = &c->token;
 
2309
      } else if (event == CACHE_EVENT_RESPONSE_RETRY) {
 
2310
        pToken = &token;
 
2311
      } else {
 
2312
        ink_release_assert(!"remoteOpEvent bad event code");
 
2313
      }
 
2314
 
 
2315
      // handle response
 
2316
 
 
2317
      if (result == CACHE_EVENT_LOOKUP) {
 
2318
        callback_user(result, 0);
 
2319
        return EVENT_DONE;
 
2320
 
 
2321
      } else if (event_is_open(result)) {
 
2322
        bool read_op = ((request_opcode == CACHE_OPEN_READ)
 
2323
                        || (request_opcode == CACHE_OPEN_READ_LONG));
 
2324
        if (read_op) {
 
2325
          ink_release_assert(read_cluster_vc->pending_remote_fill > 1);
 
2326
          read_cluster_vc->pending_remote_fill = 0;
 
2327
 
 
2328
          have_all_data = pToken->is_clear();   // no conn implies all data
 
2329
          if (have_all_data) {
 
2330
            read_cluster_vc->have_all_data = 1;
 
2331
          } else {
 
2332
            read_cluster_vc->have_all_data = 0;
 
2333
          }
 
2334
          // Move CacheHTTPInfo reply data into VC
 
2335
          read_cluster_vc->marshal_buf = this->getMsgBufferIOBData();
 
2336
          read_cluster_vc->alternate = this->ic_new_info;
 
2337
          this->ic_new_info.clear();
 
2338
          ink_release_assert(read_cluster_vc->alternate.object_size_get());
 
2339
 
 
2340
          if (!action.cancelled) {
 
2341
            ClusterVConnection *target_vc = read_cluster_vc;
 
2342
            callback_user(result, target_vc);   // "this" is deallocated
 
2343
            target_vc->allow_remote_close();
 
2344
          } else {
 
2345
            read_cluster_vc->allow_remote_close();
 
2346
            read_cluster_vc->do_io(VIO::ABORT);
 
2347
            cacheContAllocator_free(this);
 
2348
          }
 
2349
 
 
2350
        } else {
 
2351
          ink_assert(result == CACHE_EVENT_OPEN_WRITE);
 
2352
          ink_assert(!pToken->is_clear());
 
2353
 
 
2354
          ClusterVConnection *result_vc = write_cluster_vc;
 
2355
          if (!action.cancelled) {
 
2356
            callback_user(result, result_vc);
 
2357
            result_vc->allow_remote_close();
 
2358
          } else {
 
2359
            result_vc->allow_remote_close();
 
2360
            result_vc->do_io(VIO::ABORT);
 
2361
            cacheContAllocator_free(this);
 
2362
          }
 
2363
        }
 
2364
        return EVENT_DONE;
 
2365
      }
 
2366
      break;
 
2367
    }                           // End of case
 
2368
  }                             // End of switch
 
2369
 
 
2370
  // Handle failure cases
 
2371
 
 
2372
  if (result == CACHE_EVENT_LOOKUP_FAILED) {
 
2373
 
 
2374
 
 
2375
    // check for local probes
 
2376
 
 
2377
    ClusterMachine *m = cluster_machine_at_depth(cache_hash(url_md5));
 
2378
 
 
2379
    // if the current configuration indicates that this
 
2380
    // machine is the master (or the owner machine has failed), go to
 
2381
    // the local machine.  Also if PROBE_LOCAL_CACHE_LAST.
 
2382
    //
 
2383
    int len = getMsgBufferLen();
 
2384
    char *hostname = (len ? getMsgBuffer() : 0);
 
2385
 
 
2386
    if (!m || PROBE_LOCAL_CACHE_LAST) {
 
2387
      SET_HANDLER((CacheContHandler) & CacheContinuation::probeLookupEvent);
 
2388
      CacheKey key(url_md5);
 
2389
 
 
2390
      Cache *call_cache = caches[frag_type];
 
2391
      call_cache->lookup(this, &key, frag_type, hostname, len);
 
2392
      return EVENT_DONE;
 
2393
    }
 
2394
    if (PROBE_LOCAL_CACHE_FIRST) {
 
2395
      callback_user(CACHE_EVENT_LOOKUP_FAILED, 0);
 
2396
    } else {
 
2397
      SET_HANDLER((CacheContHandler) & CacheContinuation::probeLookupEvent);
 
2398
      CacheKey key(url_md5);
 
2399
 
 
2400
      Cache *call_cache = caches[frag_type];
 
2401
      call_cache->lookup(this, &key, frag_type, hostname, len);
 
2402
    }
 
2403
    return EVENT_DONE;
 
2404
 
 
2405
  } else {
 
2406
    // Handle failure of all ops except for lookup
 
2407
 
 
2408
    ClusterVConnection *cacheable_vc = 0;
 
2409
    if ((request_opcode == CACHE_OPEN_READ_LONG) && !pToken->is_clear()) {
 
2410
      ink_assert(read_cluster_vc && !write_cluster_vc);
 
2411
      //
 
2412
      // OPEN_READ_LONG has failed, but the remote node was able to
 
2413
      // establish an OPEN_WRITE_LONG connection.
 
2414
      // Convert the cluster read VC to a write VC and insert it
 
2415
      // into the global write VC cache.  This will allow us to
 
2416
      // locally resolve the subsequent OPEN_WRITE_LONG request.
 
2417
      //
 
2418
 
 
2419
      // Note: We do not allow remote close on this VC while
 
2420
      //       it resides in cache
 
2421
      //
 
2422
      read_cluster_vc->set_type(CLUSTER_OPT_CONN_WRITE);
 
2423
      // FIX ME. ajitb 12/21/99
 
2424
      // Compiler bug in CC: WorkShop Compilers 5.0 98/12/15 C++ 5.0
 
2425
      // Does not accept assignment of ((Continuation *) NULL)
 
2426
      {
 
2427
        Continuation *temp = NULL;
 
2428
        read_cluster_vc->action_ = temp;
 
2429
      }
 
2430
      if (!GlobalOpenWriteVCcache->insert(&url_md5, read_cluster_vc)) {
 
2431
        // Unable to insert VC into cache, try later
 
2432
        cacheable_vc = read_cluster_vc;
 
2433
      }
 
2434
      read_cluster_vc = 0;
 
2435
    }
 
2436
    if (read_cluster_vc) {
 
2437
      read_cluster_vc->remote_closed = 0;       // send remote close
 
2438
      read_cluster_vc->allow_remote_close();
 
2439
      read_cluster_vc->do_io(VIO::ABORT);
 
2440
      read_cluster_vc = 0;
 
2441
    }
 
2442
    if (write_cluster_vc) {
 
2443
      write_cluster_vc->remote_closed = 0;      // send remote close
 
2444
      write_cluster_vc->allow_remote_close();
 
2445
      write_cluster_vc->do_io(VIO::ABORT);
 
2446
      write_cluster_vc = 0;
 
2447
    }
 
2448
    if (!request_timeout) {
 
2449
      if (!return_error) {
 
2450
        return_error = result_error;
 
2451
      }
 
2452
      if (cacheable_vc) {
 
2453
        insert_cache_callback_user(cacheable_vc, result, (void *) return_error);
 
2454
      } else {
 
2455
        callback_user(result, (void *) return_error);
 
2456
      }
 
2457
    } else {
 
2458
      // callback already made at timeout, just free continuation
 
2459
      if (cacheable_vc) {
 
2460
        cacheable_vc->allow_remote_close();
 
2461
        cacheable_vc->do_io(VIO::CLOSE);
 
2462
        cacheable_vc = 0;
 
2463
      }
 
2464
      cacheContAllocator_free(this);
 
2465
    }
 
2466
    return EVENT_DONE;
 
2467
  }
 
2468
}
 
2469
 
 
2470
//////////////////////////////////////////////////////////////////////////
 
2471
// probeLookupEvent()
 
2472
//   After a local probe, return the response to the client and cleanup.
 
2473
//////////////////////////////////////////////////////////////////////////
 
2474
 
 
2475
int
 
2476
CacheContinuation::probeLookupEvent(int event, void *d)
 
2477
{
 
2478
  NOWARN_UNUSED(d);
 
2479
  ink_assert(magicno == (int) MagicNo);
 
2480
  callback_user(event, 0);
 
2481
  return EVENT_DONE;
 
2482
}
 
2483
 
 
2484
///////////////////////////////////////////////////////////
 
2485
// lookupEvent()
 
2486
//   Result of a local lookup for PROBE_LOCAL_CACHE_FIRST
 
2487
///////////////////////////////////////////////////////////
 
2488
int
 
2489
CacheContinuation::lookupEvent(int event, void *d)
 
2490
{
 
2491
  NOWARN_UNUSED(event);
 
2492
  NOWARN_UNUSED(d);
 
2493
  ink_release_assert(!"Invalid call CacheContinuation::lookupEvent");
 
2494
  return EVENT_DONE;
 
2495
 
 
2496
}
 
2497
 
 
2498
 
 
2499
 
 
2500
//////////////////////////////////////////////////////////////////////////
 
2501
// do_remote_lookup()
 
2502
//   If the object is supposed to be on a remote machine, probe there.
 
2503
//   Returns: Non zero (Action *) if a probe was initiated
 
2504
//            Zero (Action *) if no probe
 
2505
//////////////////////////////////////////////////////////////////////////
 
2506
Action *
 
2507
CacheContinuation::do_remote_lookup(Continuation * cont, CacheKey * key,
 
2508
                                    CacheContinuation * c, CacheFragType ft, char *hostname, int hostname_len)
 
2509
{
 
2510
  int probe_depth = 0;
 
2511
  ClusterMachine *past_probes[CONFIGURATION_HISTORY_PROBE_DEPTH] = { 0 };
 
2512
  int mlen = op_to_sizeof_fixedlen_msg(CACHE_LOOKUP_OP) + ((hostname && hostname_len) ? hostname_len : 0);
 
2513
  CacheLookupMsg *msg = (CacheLookupMsg *) ALLOCA_DOUBLE(mlen);
 
2514
#ifdef PURIFY
 
2515
  memset((char *) msg, 0, mlen);
 
2516
#endif
 
2517
  msg->init();
 
2518
 
 
2519
 
 
2520
  if (key) {
 
2521
    msg->url_md5 = *key;
 
2522
  } else {
 
2523
    ink_assert(c);
 
2524
    msg->url_md5 = c->url_md5;
 
2525
  }
 
2526
 
 
2527
  ClusterMachine *m = NULL;
 
2528
 
 
2529
  if (cache_migrate_on_demand) {
 
2530
    m = cluster_machine_at_depth(cache_hash(msg->url_md5),
 
2531
                                 c ? &c->probe_depth : &probe_depth, c ? c->past_probes : past_probes);
 
2532
  } else {
 
2533
 
 
2534
    // If migrate-on-demand is off, do not probe beyond one level.
 
2535
 
 
2536
    if (c && c->probe_depth)
 
2537
      return (Action *) 0;
 
2538
    m = cluster_machine_at_depth(cache_hash(msg->url_md5));
 
2539
    if (c)
 
2540
      c->probe_depth = 1;
 
2541
  }
 
2542
 
 
2543
  if (!m)
 
2544
    return (Action *) 0;
 
2545
 
 
2546
  // If we do not have a continuation, build one
 
2547
 
 
2548
  if (!c) {
 
2549
    c = cacheContAllocator_alloc();
 
2550
    c->mutex = cont->mutex;
 
2551
    c->probe_depth = probe_depth;
 
2552
    memcpy(c->past_probes, past_probes, sizeof(past_probes));
 
2553
  }
 
2554
  // Save hostname data in case we need to do a local lookup.
 
2555
  if (hostname && hostname_len) {
 
2556
    // Alloc buffer, copy hostname data and attach to continuation
 
2557
    c->setMsgBufferLen(hostname_len);
 
2558
    c->allocMsgBuffer();
 
2559
    memcpy(c->getMsgBuffer(), hostname, hostname_len);
 
2560
  }
 
2561
 
 
2562
  c->url_md5 = msg->url_md5;
 
2563
  c->action.cancelled = false;
 
2564
  c->action = cont;
 
2565
  c->start_time = ink_get_hrtime();
 
2566
  SET_CONTINUATION_HANDLER(c, (CacheContHandler)
 
2567
                           & CacheContinuation::remoteOpEvent);
 
2568
  c->result = CACHE_EVENT_LOOKUP_FAILED;
 
2569
 
 
2570
  // set up sequence number so we can find this continuation
 
2571
 
 
2572
  c->target_ip = m->ip;
 
2573
  c->seq_number = new_cache_sequence_number();
 
2574
  msg->seq_number = c->seq_number;
 
2575
  c->frag_type = ft;
 
2576
  msg->frag_type = ft;
 
2577
 
 
2578
  // establish timeout for lookup
 
2579
 
 
2580
  unsigned int hash = FOLDHASH(c->target_ip, c->seq_number);
 
2581
  MUTEX_TRY_LOCK(queuelock, remoteCacheContQueueMutex[hash], this_ethread());
 
2582
  if (!queuelock) {
 
2583
    // failed to acquire lock: no problem, retry later
 
2584
    c->timeout = eventProcessor.schedule_in(c, CACHE_RETRY_PERIOD, ET_CACHE_CONT_SM);
 
2585
  } else {
 
2586
    remoteCacheContQueue[hash].enqueue(c);
 
2587
    MUTEX_RELEASE(queuelock);
 
2588
    c->timeout = eventProcessor.schedule_in(c, cache_cluster_timeout, ET_CACHE_CONT_SM);
 
2589
  }
 
2590
 
 
2591
  char *data;
 
2592
  int len;
 
2593
  int vers = CacheLookupMsg::protoToVersion(m->msg_proto_major);
 
2594
 
 
2595
  if (vers == CacheLookupMsg::CACHE_LOOKUP_MESSAGE_VERSION) {
 
2596
    msg->seq_number = c->seq_number;
 
2597
    data = (char *) msg;
 
2598
    len = mlen;
 
2599
    if (hostname && hostname_len) {
 
2600
      memcpy(msg->moi, hostname, hostname_len);
 
2601
    }
 
2602
  } else {
 
2603
    //////////////////////////////////////////////////////////////
 
2604
    // Create the specified down rev version of this message
 
2605
    //////////////////////////////////////////////////////////////
 
2606
    ink_release_assert(!"CacheLookupMsg bad msg version");
 
2607
  }
 
2608
 
 
2609
  // send the message
 
2610
 
 
2611
#ifdef CACHE_MSG_TRACE
 
2612
  log_cache_op_sndmsg(msg.seq_number, 0, "cache_lookup");
 
2613
#endif
 
2614
  clusterProcessor.invoke_remote(m, CACHE_LOOKUP_CLUSTER_FUNCTION, data, len);
 
2615
  return &c->action;
 
2616
}
 
2617
 
 
2618
 
 
2619
////////////////////////////////////////////////////////////////////////////
 
2620
// cache_lookup_ClusterFunction()
 
2621
//   This function is invoked on a remote machine to do a remote lookup.
 
2622
//   It unmarshals the URL and does a local lookup, with its own
 
2623
//   continuation set to CacheContinuation::replyLookupEvent()
 
2624
////////////////////////////////////////////////////////////////////////////
 
2625
void
 
2626
cache_lookup_ClusterFunction(ClusterMachine * from, void *data, int len)
 
2627
{
 
2628
  (void) from;
 
2629
  (void) len;
 
2630
  EThread *thread = this_ethread();
 
2631
  ProxyMutex *mutex = thread->mutex;
 
2632
  ////////////////////////////////////////////////////////
 
2633
  // Note: we are running on the ET_CLUSTER thread
 
2634
  ////////////////////////////////////////////////////////
 
2635
 
 
2636
  CacheLookupMsg *msg = (CacheLookupMsg *) data;
 
2637
  ClusterMessageHeader *mh = (ClusterMessageHeader *) data;
 
2638
 
 
2639
  if (mh->GetMsgVersion() != CacheLookupMsg::CACHE_LOOKUP_MESSAGE_VERSION) {    ////////////////////////////////////////////////
 
2640
    // Convert from old to current message format
 
2641
    ////////////////////////////////////////////////
 
2642
    ink_release_assert(!"cache_lookup_ClusterFunction() bad msg version");
 
2643
  }
 
2644
 
 
2645
  if (mh->NeedByteSwap())
 
2646
    msg->SwapBytes();
 
2647
 
 
2648
  CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CACHE_OUTSTANDING_STAT);
 
2649
 
 
2650
  CacheContinuation *c = CacheContinuation::cacheContAllocator_alloc();
 
2651
  c->mutex = new_ProxyMutex();
 
2652
  MUTEX_TRY_LOCK(lock, c->mutex, this_ethread());
 
2653
  c->no_reply_message = (msg->seq_number == CACHE_NO_RESPONSE);
 
2654
  c->seq_number = msg->seq_number;
 
2655
  c->from = from;
 
2656
  c->url_md5 = msg->url_md5;
 
2657
  SET_CONTINUATION_HANDLER(c, (CacheContHandler)
 
2658
                           & CacheContinuation::replyLookupEvent);
 
2659
 
 
2660
  CacheKey key(msg->url_md5);
 
2661
#ifdef CACHE_MSG_TRACE
 
2662
  log_cache_op_msg(msg->seq_number, 0, "cache_lookup");
 
2663
#endif
 
2664
 
 
2665
  // Extract hostname data if passed.
 
2666
 
 
2667
  char *hostname;
 
2668
  int hostname_len = len - op_to_sizeof_fixedlen_msg(CACHE_LOOKUP_OP);
 
2669
  hostname = (hostname_len ? (char *) msg->moi : 0);
 
2670
 
 
2671
  // Note: Hostname data invalid after return from lookup
 
2672
  Cache *call_cache = caches[msg->frag_type];
 
2673
  call_cache->lookup(c, &key, (CacheFragType) msg->frag_type, hostname, hostname_len);
 
2674
}
 
2675
 
 
2676
/////////////////////////////////////////////////////////////////////////
 
2677
// replyLookupEvent()
 
2678
//   This function handles the result of a lookup on a remote machine.
 
2679
//   It packages up the result and sends it back to the calling machine.
 
2680
/////////////////////////////////////////////////////////////////////////
 
2681
int
 
2682
CacheContinuation::replyLookupEvent(int event, void *d)
 
2683
{
 
2684
  NOWARN_UNUSED(d);
 
2685
  ink_hrtime now;
 
2686
  now = ink_get_hrtime();
 
2687
  CLUSTER_SUM_DYN_STAT(CLUSTER_CACHE_CALLBACK_TIME_STAT, now - start_time);
 
2688
  LOG_EVENT_TIME(start_time, callback_time_dist, cache_callbacks);
 
2689
 
 
2690
  int vers = CacheOpReplyMsg::protoToVersion(from->msg_proto_major);
 
2691
  if (vers == CacheOpReplyMsg::CACHE_OP_REPLY_MESSAGE_VERSION) {
 
2692
    CacheOpReplyMsg *msg;
 
2693
    int flen = CacheOpReplyMsg::sizeof_fixedlen_msg();
 
2694
#ifdef PURIFY
 
2695
    msg = (CacheOpReplyMsg *) xmalloc(flen);
 
2696
#else
 
2697
    msg = (CacheOpReplyMsg *) ALLOCA_DOUBLE(flen);
 
2698
#endif
 
2699
    msg->init();
 
2700
    CLUSTER_DECREMENT_DYN_STAT(CLUSTER_CACHE_OUTSTANDING_STAT);
 
2701
    int len = flen - sizeof(msg->token);
 
2702
 
 
2703
    if (!no_reply_message) {
 
2704
      msg->seq_number = seq_number;
 
2705
      msg->result = event;
 
2706
#ifdef CACHE_MSG_TRACE
 
2707
      log_cache_op_sndmsg(seq_number, event, "cache_result");
 
2708
#endif
 
2709
      clusterProcessor.invoke_remote(from, CACHE_OP_RESULT_CLUSTER_FUNCTION, msg, len);
 
2710
#ifdef PURIFY
 
2711
      xfree(msg);
 
2712
#endif
 
2713
    }
 
2714
  } else {
 
2715
    //////////////////////////////////////////////////////////////
 
2716
    // Create the specified down rev version of this message
 
2717
    //////////////////////////////////////////////////////////////
 
2718
    ink_release_assert(!"replyLookupEvent() bad msg version");
 
2719
  }
 
2720
 
 
2721
  // Free up everything
 
2722
 
 
2723
  cacheContAllocator_free(this);
 
2724
  return EVENT_DONE;
 
2725
}
 
2726
 
 
2727
int32_t CacheContinuation::getObjectSize(VConnection * vc, int opcode, CacheHTTPInfo * ret_ci)
 
2728
{
 
2729
  CacheHTTPInfo *ci = 0;
 
2730
  int64_t object_size = 0;
 
2731
 
 
2732
  if ((opcode == CACHE_OPEN_READ_LONG)
 
2733
      || (opcode == CACHE_OPEN_READ_BUFFER_LONG)) {
 
2734
 
 
2735
    ((CacheVC *) vc)->get_http_info(&ci);
 
2736
    if (ci) {
 
2737
      object_size = ci->object_size_get();
 
2738
 
 
2739
    } else {
 
2740
      ci = 0;
 
2741
      object_size = 0;
 
2742
    }
 
2743
 
 
2744
  } else {
 
2745
    object_size = ((CacheVC *)vc)->get_object_size();
 
2746
  }
 
2747
 
 
2748
  if (ret_ci && !ret_ci->valid()) {
 
2749
    CacheHTTPInfo
 
2750
      new_ci;
 
2751
    new_ci.create();
 
2752
    if (ci) {
 
2753
      // Initialize copy
 
2754
      new_ci.copy(ci);
 
2755
    } else {
 
2756
      new_ci.object_size_set(object_size);
 
2757
    }
 
2758
    new_ci.m_alt->m_writeable = 1;
 
2759
    ret_ci->copy_shallow(&new_ci);
 
2760
  }
 
2761
  ink_release_assert(object_size);
 
2762
  return object_size;
 
2763
}
 
2764
 
 
2765
//////////////////////////////////////////////////////////////////////////
 
2766
// insert_cache_callback_user()
 
2767
//  Insert write VC into global cache prior to performing user callback.
 
2768
//////////////////////////////////////////////////////////////////////////
 
2769
void
 
2770
CacheContinuation::insert_cache_callback_user(ClusterVConnection * vc, int res, void *e)
 
2771
{
 
2772
  if (GlobalOpenWriteVCcache->insert(&url_md5, vc)) {
 
2773
    // Inserted
 
2774
    callback_user(res, e);
 
2775
 
 
2776
  } else {
 
2777
    // Unable to insert, try later
 
2778
    result = res;
 
2779
    callback_data = e;
 
2780
    callback_data_2 = (void *) vc;
 
2781
    SET_HANDLER((CacheContHandler) & CacheContinuation::insertCallbackEvent);
 
2782
    eventProcessor.schedule_imm(this, ET_CACHE_CONT_SM);
 
2783
  }
 
2784
}
 
2785
 
 
2786
int
 
2787
CacheContinuation::insertCallbackEvent(int event, Event * e)
 
2788
{
 
2789
  NOWARN_UNUSED(event);
 
2790
  NOWARN_UNUSED(e);
 
2791
  if (GlobalOpenWriteVCcache->insert(&url_md5, (ClusterVConnection *)
 
2792
                                     callback_data_2)) {
 
2793
    // Inserted
 
2794
    callback_user(result, callback_data);
 
2795
 
 
2796
  } else {
 
2797
    // Unable to insert, try later
 
2798
    eventProcessor.schedule_imm(this, ET_CACHE_CONT_SM);
 
2799
  }
 
2800
  return EVENT_DONE;
 
2801
}
 
2802
 
 
2803
///////////////////////////////////////////////////////////////////
 
2804
// callback_user()
 
2805
//  Invoke handleEvent on the given continuation (cont) with
 
2806
//    considerations for Action.
 
2807
///////////////////////////////////////////////////////////////////
 
2808
void
 
2809
CacheContinuation::callback_user(int res, void *e)
 
2810
{
 
2811
  EThread *et = this_ethread();
 
2812
 
 
2813
  if (!is_ClusterThread(et)) {
 
2814
    MUTEX_TRY_LOCK(lock, mutex, et);
 
2815
    if (lock) {
 
2816
      if (!action.cancelled) {
 
2817
        action.continuation->handleEvent(res, e);
 
2818
      }
 
2819
      cacheContAllocator_free(this);
 
2820
 
 
2821
    } else {
 
2822
      // Unable to acquire lock, retry later
 
2823
      defer_callback_result(res, e);
 
2824
    }
 
2825
  } else {
 
2826
    // Can not post completion on ET_CLUSTER thread.
 
2827
    defer_callback_result(res, e);
 
2828
  }
 
2829
}
 
2830
 
 
2831
void
 
2832
CacheContinuation::defer_callback_result(int r, void *e)
 
2833
{
 
2834
  result = r;
 
2835
  callback_data = e;
 
2836
  SET_HANDLER((CacheContHandler) & CacheContinuation::callbackResultEvent);
 
2837
  eventProcessor.schedule_imm(this, ET_CACHE_CONT_SM);
 
2838
}
 
2839
 
 
2840
int
 
2841
CacheContinuation::callbackResultEvent(int event, Event * e)
 
2842
{
 
2843
  NOWARN_UNUSED(event);
 
2844
  NOWARN_UNUSED(e);
 
2845
  if (!action.cancelled)
 
2846
    action.continuation->handleEvent(result, callback_data);
 
2847
  cacheContAllocator_free(this);
 
2848
  return EVENT_DONE;
 
2849
}
 
2850
 
 
2851
//-----------------------------------------------------------------
 
2852
// CacheContinuation static member functions
 
2853
//-----------------------------------------------------------------
 
2854
 
 
2855
///////////////////////////////////////////////////////////////////////
 
2856
// cacheContAllocator_alloc()
 
2857
///////////////////////////////////////////////////////////////////////
 
2858
CacheContinuation *
 
2859
CacheContinuation::cacheContAllocator_alloc()
 
2860
{
 
2861
  return cacheContAllocator.alloc();
 
2862
}
 
2863
 
 
2864
 
 
2865
///////////////////////////////////////////////////////////////////////
 
2866
// cacheContAllocator_free()
 
2867
///////////////////////////////////////////////////////////////////////
 
2868
void
 
2869
CacheContinuation::cacheContAllocator_free(CacheContinuation * c)
 
2870
{
 
2871
  ink_assert(c->magicno == (int) MagicNo);
 
2872
//  ink_assert(!c->cache_op_ClusterFunction);
 
2873
  c->magicno = -1;
 
2874
#ifdef ENABLE_TIME_TRACE
 
2875
  c->start_time = 0;
 
2876
#endif
 
2877
  c->free();
 
2878
  c->mutex = NULL;
 
2879
  // FIX ME. ajitb 12/21/99
 
2880
  // Compiler bug in CC: WorkShop Compilers 5.0 98/12/15 C++ 5.0
 
2881
  // Does not accept assignment of ((Continuation *) NULL)
 
2882
  {
 
2883
    Continuation *temp = NULL;
 
2884
    c->action = temp;
 
2885
  }
 
2886
  c->tunnel_mutex = NULL;
 
2887
  cacheContAllocator.free(c);
 
2888
}
 
2889
 
 
2890
/////////////////////////////////////////////////////////////////////////
 
2891
// callback_failure()
 
2892
//   Post error completion using a continuation.
 
2893
/////////////////////////////////////////////////////////////////////////
 
2894
Action *
 
2895
CacheContinuation::callback_failure(Action * a, int result, int err, CacheContinuation * this_cc)
 
2896
{
 
2897
  CacheContinuation *cc;
 
2898
  if (!this_cc) {
 
2899
    cc = cacheContAllocator_alloc();
 
2900
    cc->mutex = a->mutex;
 
2901
    cc->action = *a;
 
2902
 
 
2903
  } else {
 
2904
    cc = this_cc;
 
2905
  }
 
2906
  cc->result = result;
 
2907
  cc->result_error = err;
 
2908
  SET_CONTINUATION_HANDLER(cc, (CacheContHandler)
 
2909
                           & CacheContinuation::callbackEvent);
 
2910
  eventProcessor.schedule_imm(cc, ET_CACHE_CONT_SM);
 
2911
  return &cc->action;
 
2912
}
 
2913
 
 
2914
///////////////////////////////////////////////////////////////////////
 
2915
// callbackEvent()
 
2916
//  Invoke callback and deallocate continuation.
 
2917
///////////////////////////////////////////////////////////////////////
 
2918
int
 
2919
CacheContinuation::callbackEvent(int event, Event * e)
 
2920
{
 
2921
  NOWARN_UNUSED(event);
 
2922
  NOWARN_UNUSED(e);
 
2923
  if (!action.cancelled)
 
2924
    action.continuation->handleEvent(result, (void *)(intptr_t)result_error);
 
2925
  cacheContAllocator_free(this);
 
2926
  return EVENT_DONE;
 
2927
}
 
2928
 
 
2929
//------------------------------------------------------------------
 
2930
// File static functions
 
2931
//------------------------------------------------------------------
 
2932
 
 
2933
////////////////////////////////////////////////////////////////////////
 
2934
// find_cache_continuation()
 
2935
//   Find a currently pending cache continuation expecting a response.
 
2936
//   Requires taking the lock on the remoteCacheContQueueMutex first.
 
2937
////////////////////////////////////////////////////////////////////////
 
2938
static CacheContinuation *
 
2939
find_cache_continuation(unsigned int seq_number, unsigned int from_ip)
 
2940
{
 
2941
  unsigned int hash = FOLDHASH(from_ip, seq_number);
 
2942
  CacheContinuation *c = NULL;
 
2943
  CacheContinuation *lastc = NULL;
 
2944
  for (c = (CacheContinuation *) remoteCacheContQueue[hash].head; c; c = (CacheContinuation *) c->link.next) {
 
2945
    if (seq_number == c->seq_number && from_ip == c->target_ip) {
 
2946
      if (lastc) {
 
2947
        ink_release_assert(c->link.prev == lastc);
 
2948
      } else {
 
2949
        ink_release_assert(!c->link.prev);
 
2950
      }
 
2951
      break;
 
2952
    }
 
2953
 
 
2954
    lastc = c;
 
2955
  }
 
2956
  return c;
 
2957
}
 
2958
 
 
2959
/////////////////////////////////////////////////////////////////////////////
 
2960
// new_cache_sequence_number()
 
2961
//  Generate unique request sequence numbers
 
2962
/////////////////////////////////////////////////////////////////////////////
 
2963
static unsigned int
 
2964
new_cache_sequence_number()
 
2965
{
 
2966
  unsigned int res = 0;
 
2967
 
 
2968
  do {
 
2969
    res = (unsigned int) ink_atomic_increment(&cluster_sequence_number, 1);
 
2970
  } while (!res);
 
2971
 
 
2972
 
 
2973
 
 
2974
  return res;
 
2975
}
 
2976
 
 
2977
/***************************************************************************/
 
2978
#ifdef OMIT
 
2979
/***************************************************************************/
 
2980
/////////////////////////////////////////////////////////////////////////////
 
2981
// forwardEvent()
 
2982
//   for migrate-on-demand, make a connection between the
 
2983
//   the node which has the object and the node which should have it.
 
2984
//
 
2985
//   prepared for either OPEN_READ (from current owner)
 
2986
//   or OPEN_WRITE (from new owner)
 
2987
/////////////////////////////////////////////////////////////////////////////
 
2988
int
 
2989
CacheContinuation::forwardEvent(int event, VConnection * c)
 
2990
{
 
2991
  int ret = EVENT_CONT;
 
2992
  cluster_vc = 0;
 
2993
 
 
2994
  cache_read = false;
 
2995
  switch (event) {
 
2996
  default:
 
2997
    ink_assert(!"bad case");
 
2998
  case CACHE_EVENT_OPEN_WRITE_FAILED:
 
2999
    ret = EVENT_DONE;
 
3000
    break;
 
3001
  case CACHE_EVENT_OPEN_WRITE:
 
3002
    cluster_vc = c;
 
3003
    break;
 
3004
  case CACHE_EVENT_OPEN_READ_FAILED:
 
3005
    cache_read = true;
 
3006
    ret = EVENT_DONE;
 
3007
    break;
 
3008
  case CACHE_EVENT_OPEN_READ:
 
3009
    cache_read = true;
 
3010
    cluster_vc = c;
 
3011
    break;
 
3012
  }
 
3013
  SET_HANDLER((CacheContHandler) & CacheContinuation::forwardWaitEvent);
 
3014
  return ret;
 
3015
}
 
3016
 
 
3017
////////////////////////////////////////////////////////////////////////
 
3018
// forwardWaitEvent()
 
3019
//   For migrate-on-demand, make a connection as above (forwardEvent)
 
3020
//   second either OPEN_READ or OPEN_WRITE,
 
3021
//   the data for the first is stored in (cluster_vc,cache_read)
 
3022
////////////////////////////////////////////////////////////////////////
 
3023
int
 
3024
CacheContinuation::forwardWaitEvent(int event, VConnection * c)
 
3025
{
 
3026
  int ret = EVENT_CONT;
 
3027
  int res = CACHE_EVENT_OPEN_READ_FAILED;
 
3028
  void *res_data = NULL;
 
3029
  VConnection *vc = NULL;
 
3030
 
 
3031
  switch (event) {
 
3032
  default:
 
3033
    ink_assert(!"bad case");
 
3034
  case CACHE_EVENT_OPEN_WRITE_FAILED:
 
3035
  case CACHE_EVENT_OPEN_READ_FAILED:
 
3036
    ret = EVENT_DONE;
 
3037
    break;
 
3038
  case CACHE_EVENT_OPEN_WRITE:
 
3039
  case CACHE_EVENT_OPEN_READ:
 
3040
    vc = c;
 
3041
    break;
 
3042
 
 
3043
  }
 
3044
  VConnection *read_vc = (cache_read ? cluster_vc : vc);
 
3045
  VConnection *write_vc = (!cache_read ? cluster_vc : vc);
 
3046
 
 
3047
  res = read_vc ? CACHE_EVENT_OPEN_READ : CACHE_EVENT_OPEN_READ_FAILED;
 
3048
  res_data = read_vc;
 
3049
 
 
3050
  // if the read and write are sucessful, tunnel the read to the write
 
3051
  if (read_vc && write_vc) {
 
3052
    res_data = NEW(new VCTee(read_vc, write_vc, vio));
 
3053
    if (vio) {                  // CACHE_EVENT_OPEN_READ_VIO
 
3054
      res = event;
 
3055
      res_data = &((VCTee *) read_vc)->vio;
 
3056
    }
 
3057
  }
 
3058
  // if the read is sucessful return it to the user
 
3059
  //
 
3060
  c->handleEvent(res, res_data);
 
3061
  return ret;
 
3062
}
 
3063
 
 
3064
/////////////////////////////////////////////////////////////////////
 
3065
// tunnelEvent()
 
3066
//   If the reply requires data, tunnel the data from the cache
 
3067
//   to the cluster.
 
3068
/////////////////////////////////////////////////////////////////////
 
3069
int
 
3070
CacheContinuation::tunnelEvent(int event, VConnection * vc)
 
3071
{
 
3072
  int ret = EVENT_DONE;
 
3073
  int flen = CacheOpReplyMsg::sizeof_fixedlen_msg();    // include token
 
3074
  int len = 0;
 
3075
  bool read_buf = ((request_opcode == CACHE_OPEN_READ_BUFFER)
 
3076
                   || (request_opcode == CACHE_OPEN_READ_BUFFER_LONG));
 
3077
  ink_release_assert(!read_buf);
 
3078
 
 
3079
  CacheOpReplyMsg rmsg;
 
3080
  CacheOpReplyMsg *msg = &rmsg;
 
3081
  msg->result = result;
 
3082
  msg->seq_number = seq_number;
 
3083
  msg->token = token;
 
3084
  int expect_reply = 1;
 
3085
 
 
3086
  if (event == CLUSTER_EVENT_OPEN) {
 
3087
    if (cache_read) {
 
3088
      if (read_buf) {
 
3089
        ink_assert(have_all_data || (readahead_vio == &((CacheVConnection *) cluster_vc)->vio));
 
3090
        write_cluster_vc = (ClusterVConnection *) vc;
 
3091
 
 
3092
        if (have_all_data) {
 
3093
          msg->token.clear();   // Tell sender no conn established
 
3094
        } else {
 
3095
          msg->token = token;   // Tell sender conn established
 
3096
          setupReadBufTunnel(cluster_vc, vc);
 
3097
        }
 
3098
 
 
3099
      } else {
 
3100
        OneWayTunnel *pOWT = OneWayTunnel::OneWayTunnel_alloc();
 
3101
        pOWT->init(cluster_vc, vc, NULL, nbytes, this->mutex);
 
3102
        --expect_reply;
 
3103
      }
 
3104
 
 
3105
      ////////////////////////////////////////////////////////
 
3106
      // cache_read requires CacheHTTPInfo in reply message.
 
3107
      ////////////////////////////////////////////////////////
 
3108
      int res;
 
3109
      CacheHTTPInfo *ci;
 
3110
 
 
3111
      if (!cache_vc_info) {
 
3112
        // OPEN_READ case
 
3113
        (void) getObjectSize(cluster_vc, request_opcode, &cache_vc_info);
 
3114
      }
 
3115
      ci = cache_vc_info;
 
3116
 
 
3117
      // Determine data length and allocate
 
3118
      len = ci->marshal_length();
 
3119
      CacheOpReplyMsg *reply = (CacheOpReplyMsg *) ALLOCA_DOUBLE(flen + len);
 
3120
 
 
3121
      // Initialize reply message header
 
3122
      *reply = *msg;
 
3123
 
 
3124
      // Marshal response data into reply message
 
3125
      res = ci->marshal((char *) reply->moi, len);
 
3126
      ink_assert(res > 0);
 
3127
 
 
3128
      // Make reply message the current message
 
3129
      msg = reply;
 
3130
 
 
3131
    } else {
 
3132
      OneWayTunnel *pOWT = OneWayTunnelAllocator.alloc();
 
3133
      pOWT->init(vc, cluster_vc, NULL, nbytes, this->mutex);
 
3134
      --expect_reply;
 
3135
    }
 
3136
    ret = EVENT_CONT;
 
3137
  } else {
 
3138
    ink_release_assert(event == CLUSTER_EVENT_OPEN_FAILED);
 
3139
    msg->result = CACHE_EVENT_SET_FAILED(result);
 
3140
 
 
3141
    if (read_buf) {
 
3142
      Debug("cluster_timeout", "unable to make cluster connection2");
 
3143
      initial_buf = 0;          // Do not send data
 
3144
      initial_bufsize = 0;
 
3145
 
 
3146
      if (!have_all_data) {
 
3147
        // Shutdown cache connection and free MIOBuffer
 
3148
        MIOBuffer *mbuf = readahead_vio->buffer.writer();
 
3149
        cluster_vc->do_io(VIO::CLOSE);
 
3150
        free_MIOBuffer(mbuf);
 
3151
      }
 
3152
    } else {
 
3153
      Debug("cluster_timeout", "unable to make cluster connection2A");
 
3154
      cluster_vc->do_io(VIO::CLOSE);
 
3155
    }
 
3156
    len = 0 - (int) sizeof(msg->token);
 
3157
    --expect_reply;
 
3158
  }
 
3159
 
 
3160
  int vers = CacheOpReplyMsg::protoToVersion(from->msg_proto_major);
 
3161
  if (vers == CacheOpReplyMsg::CACHE_OP_REPLY_MESSAGE_VERSION) {
 
3162
    if (read_buf) {
 
3163
      // Transmit reply message and object data in same cluster message
 
3164
      clusterProcessor.invoke_remote_data(from,
 
3165
                                          CACHE_OP_RESULT_CLUSTER_FUNCTION,
 
3166
                                          (void *) msg, (flen + len),
 
3167
                                          initial_buf, initial_bufsize,
 
3168
                                          cluster_vc_channel, &token,
 
3169
                                          &CacheContinuation::disposeOfDataBuffer, (void *) this, CLUSTER_OPT_STEAL);
 
3170
 
 
3171
    } else {
 
3172
      clusterProcessor.invoke_remote(from, CACHE_OP_RESULT_CLUSTER_FUNCTION,
 
3173
                                     (void *) msg, (flen + len), CLUSTER_OPT_STEAL);
 
3174
    }
 
3175
  } else {
 
3176
    //////////////////////////////////////////////////////////////
 
3177
    // Create the specified down rev version of this message
 
3178
    //////////////////////////////////////////////////////////////
 
3179
    ink_release_assert(!"tunnelEvent() bad msg version");
 
3180
  }
 
3181
  if (expect_reply <= 0)
 
3182
    cacheContAllocator_free(this);
 
3183
  return ret;
 
3184
}
 
3185
 
 
3186
/////////////////////////////////////////////////////////////////////
 
3187
// remoteConnectEvent()
 
3188
//   If this was an open, make a connection on this side before
 
3189
//   responding to the user.
 
3190
/////////////////////////////////////////////////////////////////////
 
3191
int
 
3192
CacheContinuation::remoteConnectEvent(int event, VConnection * cvc)
 
3193
{
 
3194
  ClusterVConnection *vc = (ClusterVConnection *) cvc;
 
3195
 
 
3196
  if (event == CLUSTER_EVENT_OPEN) {
 
3197
    if (result == CACHE_EVENT_OPEN_READ) {
 
3198
      // Move CacheHTTPInfo reply data into VC
 
3199
      vc->alternate = this->ic_new_info;
 
3200
      this->ic_new_info.clear();
 
3201
    }
 
3202
    callback_user(result, vc);
 
3203
    return EVENT_CONT;
 
3204
  } else {
 
3205
    Debug("cluster_cache", "unable to make cluster connection");
 
3206
    callback_user(CACHE_EVENT_SET_FAILED(result), vc);
 
3207
    return EVENT_DONE;
 
3208
  }
 
3209
}
 
3210
 
 
3211
/***************************************************************************/
 
3212
#endif // OMIT
 
3213
/***************************************************************************/
 
3214
 
 
3215
// End of ClusterCache.cc