/** @file

  A brief file description

  @section license License

  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership.  The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
 */

/****************************************************************************

 ****************************************************************************/
#include "P_Cluster.h"

// NOTE(review): CLUSTER_TEST_DEBUG was left unconditionally defined here,
// which switches cache_cluster_timeout (below) to HRTIME_SECONDS(65536),
// i.e. ~18 hours, instead of the production CACHE_CLUSTER_TIMEOUT.  Test
// scaffolding must not ship enabled; the define is disabled.
// #define CLUSTER_TEST_DEBUG 1
#ifdef ENABLE_TIME_TRACE
// Latency histograms and event counters, compiled in only when time
// tracing is enabled.  Buckets are defined by TIME_DIST_BUCKETS_SIZE.
int callback_time_dist[TIME_DIST_BUCKETS_SIZE];
int cache_callbacks = 0;

int rmt_callback_time_dist[TIME_DIST_BUCKETS_SIZE];
int rmt_cache_callbacks = 0;

int lkrmt_callback_time_dist[TIME_DIST_BUCKETS_SIZE];
int lkrmt_cache_callbacks = 0;

int cntlck_acquire_time_dist[TIME_DIST_BUCKETS_SIZE];
int cntlck_acquire_events = 0;

int open_delay_time_dist[TIME_DIST_BUCKETS_SIZE];
int open_delay_events = 0;
#endif // ENABLE_TIME_TRACE

// Whether cache objects migrate between nodes on demand; this default is
// overwritten by the value read from configuration at startup.
int cache_migrate_on_demand = false;
59
static ClassAllocator<CacheContinuation> cacheContAllocator("cacheContAllocator");
61
static Queue<CacheContinuation> remoteCacheContQueue[REMOTE_CONNECT_HASH];
62
static Ptr<ProxyMutex> remoteCacheContQueueMutex[REMOTE_CONNECT_HASH];
64
// 0 is an illegal sequence number
65
#define CACHE_NO_RESPONSE 0
66
static int cluster_sequence_number = 1;
68
#ifdef CLUSTER_TEST_DEBUG
69
static ink_hrtime cache_cluster_timeout = HRTIME_SECONDS(65536);
71
static ink_hrtime cache_cluster_timeout = CACHE_CLUSTER_TIMEOUT;
77
static CacheContinuation *find_cache_continuation(unsigned int, unsigned int);
79
static unsigned int new_cache_sequence_number();
81
#define DOT_SEPARATED(_x) \
82
((unsigned char*)&(_x))[0], ((unsigned char*)&(_x))[1], \
83
((unsigned char*)&(_x))[2], ((unsigned char*)&(_x))[3]
85
#define ET_CACHE_CONT_SM ET_NET
86
#define ALLOW_THREAD_STEAL true
/**********************************************************************/
#ifdef CACHE_MSG_TRACE
/**********************************************************************/

/**********************************************************************/
// Debug trace support for cache RPC messages
/**********************************************************************/

#define MAX_TENTRIES 4096

// NOTE(review): the traceEntry definition was lost in this copy of the
// file; reconstructed from the fields referenced below -- confirm against
// the original source.
struct traceEntry {
  unsigned int seqno;
  int op;
  char *type;
};
struct traceEntry recvTraceTable[MAX_TENTRIES];
struct traceEntry sndTraceTable[MAX_TENTRIES];

// NOTE(review): these were declared "static recvTraceTable_index = 0;",
// relying on implicit int, which is not valid C++.  Explicit type restored.
static int recvTraceTable_index = 0;
static int sndTraceTable_index = 0;

// Record a received cache RPC message in the circular receive trace table.
void
log_cache_op_msg(unsigned int seqno, int op, char *type)
{
  // Use the atomically claimed value (t) for the slot, not a racy re-read
  // of the shared index, so concurrent callers get distinct slots.
  int t = ink_atomic_increment(&recvTraceTable_index, 1);
  int n = t % MAX_TENTRIES;
  recvTraceTable[n].seqno = seqno;
  recvTraceTable[n].op = op;
  recvTraceTable[n].type = type;
}

// Record a sent cache RPC message in the circular send trace table.
void
log_cache_op_sndmsg(unsigned int seqno, int op, char *type)
{
  int t = ink_atomic_increment(&sndTraceTable_index, 1);
  int n = t % MAX_TENTRIES;
  sndTraceTable[n].seqno = seqno;
  sndTraceTable[n].op = op;
  sndTraceTable[n].type = type;
}

// Dump the receive-side trace table to stdout (debugger aid).
void
dump_recvtrace_table()
{
  int n;
  for (n = 0; n < MAX_TENTRIES; ++n)
    printf("[%d] seqno=%d, op=%d type=%s\n", n, recvTraceTable[n].seqno,
           recvTraceTable[n].op, recvTraceTable[n].type ? recvTraceTable[n].type : "");
}

// Dump the send-side trace table to stdout (debugger aid).
void
dump_sndtrace_table()
{
  int n;
  for (n = 0; n < MAX_TENTRIES; ++n)
    printf("[%d] seqno=%d, op=%d type=%s\n", n, sndTraceTable[n].seqno,
           sndTraceTable[n].op, sndTraceTable[n].type ? sndTraceTable[n].type : "");
}

/**********************************************************************/
#endif // CACHE_MSG_TRACE
/**********************************************************************/
153
///////////////////////////////////////////////////////////////////////
154
// Cluster write VC cache.
155
///////////////////////////////////////////////////////////////////////
157
// In the event that a remote open read fails (HTTP only), an
158
// open write is issued and if successful a open write connection
159
// is returned for the open read. We cache the open write VC and
160
// resolve the subsequent open write locally from the write VC cache
161
// using the INK_MD5 of the URL.
162
// Note that this is a global per node cache.
163
///////////////////////////////////////////////////////////////////////
165
class ClusterVConnectionCache
168
ClusterVConnectionCache()
170
memset(hash_event, 0, sizeof(hash_event));
173
int MD5ToIndex(INK_MD5 * p);
174
int insert(INK_MD5 *, ClusterVConnection *);
175
ClusterVConnection *lookup(INK_MD5 *);
181
bool mark_for_delete;
183
ClusterVConnection *vc;
185
Entry():mark_for_delete(0), vc(0)
194
{ MAX_TABLE_ENTRIES = 256, // must be power of 2
195
SCAN_INTERVAL = 10 // seconds
197
Queue<Entry> hash_table[MAX_TABLE_ENTRIES];
198
Ptr<ProxyMutex> hash_lock[MAX_TABLE_ENTRIES];
199
Event *hash_event[MAX_TABLE_ENTRIES];
202
static ClassAllocator <
203
ClusterVConnectionCache::Entry >
204
ClusterVCCacheEntryAlloc("ClusterVConnectionCache::Entry");
206
ClusterVConnectionCache *GlobalOpenWriteVCcache = 0;
208
/////////////////////////////////////////////////////////////////
209
// Perform periodic purges of ClusterVConnectionCache entries
210
/////////////////////////////////////////////////////////////////
211
class ClusterVConnectionCacheEvent:public Continuation
214
ClusterVConnectionCacheEvent(ClusterVConnectionCache * c, int n)
215
: Continuation(new_ProxyMutex()), cache(c), hash_index(n)
217
SET_HANDLER(&ClusterVConnectionCacheEvent::eventHandler);
219
int eventHandler(int, Event *);
222
ClusterVConnectionCache * cache;
227
ClusterVConnectionCache::init()
230
ClusterVConnectionCacheEvent *eh;
232
for (n = 0; n < MAX_TABLE_ENTRIES; ++n) {
233
hash_lock[n] = new_ProxyMutex();
235
for (n = 0; n < MAX_TABLE_ENTRIES; ++n) {
236
// Setup up periodic purge events on each hash list
238
eh = new ClusterVConnectionCacheEvent(this, n);
240
eventProcessor.schedule_in(eh, HRTIME_SECONDS(ClusterVConnectionCache::SCAN_INTERVAL), ET_CACHE_CONT_SM);
244
ClusterVConnectionCache::MD5ToIndex(INK_MD5 * p)
246
uint64_t i = p->fold();
251
return ((h ^ l) % MAX_TABLE_ENTRIES) & (MAX_TABLE_ENTRIES - 1);
255
ClusterVConnectionCache::insert(INK_MD5 * key, ClusterVConnection * vc)
257
int index = MD5ToIndex(key);
259
EThread *thread = this_ethread();
260
ProxyMutex *mutex = thread->mutex;
262
MUTEX_TRY_LOCK(lock, hash_lock[index], thread);
264
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_INSERT_LOCK_MISSES_STAT);
265
return 0; // lock miss, retry later
270
e = ClusterVCCacheEntryAlloc.alloc();
273
hash_table[index].enqueue(e);
274
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_INSERTS_STAT);
280
ClusterVConnectionCache::lookup(INK_MD5 * key)
282
int index = MD5ToIndex(key);
284
ClusterVConnection *vc = 0;
285
EThread *thread = this_ethread();
286
ProxyMutex *mutex = thread->mutex;
288
MUTEX_TRY_LOCK(lock, hash_lock[index], thread);
290
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_LOOKUP_LOCK_MISSES_STAT);
291
return vc; // lock miss, retry later
294
e = hash_table[index].head;
296
if (*key == e->key) { // Hit
298
hash_table[index].remove(e);
299
ClusterVCCacheEntryAlloc.free(e);
300
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_LOOKUP_HITS_STAT);
308
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_LOOKUP_MISSES_STAT);
309
return (ClusterVConnection *) - 1; // Miss
313
ClusterVConnectionCacheEvent::eventHandler(int event, Event * e)
315
NOWARN_UNUSED(event);
316
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_SCANS_STAT);
317
MUTEX_TRY_LOCK(lock, cache->hash_lock[hash_index], this_ethread());
319
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_SCAN_LOCK_MISSES_STAT);
320
e->schedule_in(HRTIME_MSECONDS(10));
323
// Perform purge action on unreferenced VC(s).
325
ClusterVConnectionCache::Entry * entry;
326
ClusterVConnectionCache::Entry * next_entry;
327
entry = cache->hash_table[hash_index].head;
330
if (entry->mark_for_delete) {
331
next_entry = entry->link.next;
333
cache->hash_table[hash_index].remove(entry);
334
entry->vc->allow_remote_close();
335
entry->vc->do_io(VIO::CLOSE);
337
ClusterVCCacheEntryAlloc.free(entry);
339
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_PURGES_STAT);
342
entry->mark_for_delete = true;
343
entry = entry->link.next;
347
// Setup for next purge event
349
e->schedule_in(HRTIME_SECONDS(ClusterVConnectionCache::SCAN_INTERVAL), ET_CACHE_CONT_SM);
353
///////////////////////////////////////////////////////////////////////
355
////////////////////////////////////////////////////
357
// Global initializations for CacheContinuation
358
////////////////////////////////////////////////////
360
CacheContinuation::init()
363
for (n = 0; n < REMOTE_CONNECT_HASH; ++n)
364
remoteCacheContQueueMutex[n] = new_ProxyMutex();
366
GlobalOpenWriteVCcache = new ClusterVConnectionCache;
367
GlobalOpenWriteVCcache->init();
371
///////////////////////////////////////////////////////////////////////
373
// Main function to do a cluster cache operation
374
///////////////////////////////////////////////////////////////////////
376
CacheContinuation::do_op(Continuation * c, ClusterMachine * mp, void *args,
377
int user_opcode, char *data, int data_len, int nbytes, MIOBuffer * b)
379
CacheContinuation *cc = 0;
383
/////////////////////////////////////////////////////////////////////
384
// Unconditionally map open read buffer interfaces to open read.
385
// open read buffer interfaces are now deprecated.
386
/////////////////////////////////////////////////////////////////////
387
int opcode = user_opcode;
389
case CACHE_OPEN_READ_BUFFER:
390
opcode = CACHE_OPEN_READ;
392
case CACHE_OPEN_READ_BUFFER_LONG:
393
opcode = CACHE_OPEN_READ_LONG;
400
cc = cacheContAllocator_alloc();
401
cc->target_machine = mp;
402
cc->request_opcode = opcode;
403
cc->mutex = c->mutex;
405
cc->action.cancelled = false;
406
cc->start_time = ink_get_hrtime();
408
cc->result = op_failure(opcode);
409
SET_CONTINUATION_HANDLER(cc, (CacheContHandler)
410
& CacheContinuation::remoteOpEvent);
413
// set up sequence number so we can find this continuation
415
cc->target_ip = mp->ip;
416
cc->seq_number = new_cache_sequence_number();
418
// establish timeout for cache op
420
unsigned int hash = FOLDHASH(cc->target_ip, cc->seq_number);
421
MUTEX_TRY_LOCK(queuelock, remoteCacheContQueueMutex[hash], this_ethread());
424
// failed to acquire lock: no problem, retry later
425
cc->timeout = eventProcessor.schedule_in(cc, CACHE_RETRY_PERIOD, ET_CACHE_CONT_SM);
427
remoteCacheContQueue[hash].enqueue(cc);
428
MUTEX_RELEASE(queuelock);
429
cc->timeout = eventProcessor.schedule_in(cc, cache_cluster_timeout, ET_CACHE_CONT_SM);
433
// Determine the type of the "Over The Wire" (OTW) message header and
438
memset(data, 0, op_to_sizeof_fixedlen_msg(opcode));
441
"do_op opcode=%d seqno=%d Machine=0x%x data=0x%x datalen=%d mio=0x%x",
442
opcode, (c ? cc->seq_number : CACHE_NO_RESPONSE), mp, data, data_len, b);
445
case CACHE_OPEN_WRITE_BUFFER:
446
case CACHE_OPEN_WRITE_BUFFER_LONG:
448
ink_release_assert(!"write buffer not supported");
451
case CACHE_OPEN_READ_BUFFER:
452
case CACHE_OPEN_READ_BUFFER_LONG:
454
ink_release_assert(!"read buffer not supported");
457
case CACHE_OPEN_WRITE:
458
case CACHE_OPEN_READ:
460
ink_release_assert(c > 0);
461
//////////////////////
462
// Use short format //
463
//////////////////////
465
data_len = op_to_sizeof_fixedlen_msg(opcode);
466
data = (char *) ALLOCA_DOUBLE(data_len);
468
memset(data, 0, data_len);
472
CacheOpMsg_short *m = (CacheOpMsg_short *) msg;
475
m->cfl_flags = ((CacheOpArgs_General *) args)->cfl_flags;
476
m->md5 = *((CacheOpArgs_General *) args)->url_md5;
477
cc->url_md5 = m->md5;
478
m->seq_number = (c ? cc->seq_number : CACHE_NO_RESPONSE);
479
m->frag_type = ((CacheOpArgs_General *) args)->frag_type;
480
if (opcode == CACHE_OPEN_WRITE) {
482
m->data = (uint32_t) ((CacheOpArgs_General *) args)->pin_in_cache;
488
if (opcode == CACHE_OPEN_READ) {
490
// Set upper limit on initial data received with response
491
// for open read response
493
m->buffer_size = DEFAULT_MAX_BUFFER_SIZE;
499
// Establish the local VC
501
int res = setup_local_vc(msg, data_len, cc, mp, &act);
503
/////////////////////////////////////////////////////
504
// Unable to setup local VC, request aborted.
505
// Remove request from pending list and deallocate.
506
/////////////////////////////////////////////////////
507
cc->remove_and_delete(0, (Event *) 0);
510
} else if (res != -1) {
511
///////////////////////////////////////
512
// VC established, send request
513
///////////////////////////////////////
517
//////////////////////////////////////////////////////
518
// Unable to setup VC, delay required, await callback
519
//////////////////////////////////////////////////////
524
case CACHE_OPEN_READ_LONG:
525
case CACHE_OPEN_WRITE_LONG:
527
ink_release_assert(c > 0);
528
//////////////////////
529
// Use long format //
530
//////////////////////
532
CacheOpMsg_long *m = (CacheOpMsg_long *) msg;
535
m->cfl_flags = ((CacheOpArgs_General *) args)->cfl_flags;
536
m->url_md5 = *((CacheOpArgs_General *) args)->url_md5;
537
cc->url_md5 = m->url_md5;
538
m->seq_number = (c ? cc->seq_number : CACHE_NO_RESPONSE);
540
m->data = (uint32_t) ((CacheOpArgs_General *) args)->pin_in_cache;
541
m->frag_type = (uint32_t) ((CacheOpArgs_General *) args)->frag_type;
543
if (opcode == CACHE_OPEN_READ_LONG) {
545
// Set upper limit on initial data received with response
546
// for open read response
548
m->buffer_size = DEFAULT_MAX_BUFFER_SIZE;
553
// Establish the local VC
555
int res = setup_local_vc(msg, data_len, cc, mp, &act);
557
/////////////////////////////////////////////////////
558
// Unable to setup local VC, request aborted.
559
// Remove request from pending list and deallocate.
560
/////////////////////////////////////////////////////
561
cc->remove_and_delete(0, (Event *) 0);
564
} else if (res != -1) {
565
///////////////////////////////////////
566
// VC established, send request
567
///////////////////////////////////////
571
//////////////////////////////////////////////////////
572
// Unable to setup VC, delay required, await callback
573
//////////////////////////////////////////////////////
581
//////////////////////
582
// Use short format //
583
//////////////////////
585
CacheOpMsg_short *m = (CacheOpMsg_short *) msg;
588
m->frag_type = ((CacheOpArgs_Deref *) args)->frag_type;
589
m->cfl_flags = ((CacheOpArgs_Deref *) args)->cfl_flags;
590
if (opcode == CACHE_DEREF)
591
m->md5 = *((CacheOpArgs_Deref *) args)->md5;
593
m->md5 = *((CacheOpArgs_General *) args)->url_md5;
594
m->seq_number = (c ? cc->seq_number : CACHE_NO_RESPONSE);
599
////////////////////////
600
// Use short_2 format //
601
////////////////////////
603
CacheOpMsg_short_2 *m = (CacheOpMsg_short_2 *) msg;
606
m->cfl_flags = ((CacheOpArgs_Link *) args)->cfl_flags;
607
m->md5_1 = *((CacheOpArgs_Link *) args)->from;
608
m->md5_2 = *((CacheOpArgs_Link *) args)->to;
609
m->seq_number = (c ? cc->seq_number : CACHE_NO_RESPONSE);
610
m->frag_type = ((CacheOpArgs_Link *) args)->frag_type;
617
#ifdef CACHE_MSG_TRACE
618
log_cache_op_sndmsg((c ? cc->seq_number : CACHE_NO_RESPONSE), 0, "do_op");
620
clusterProcessor.invoke_remote(mp,
621
op_needs_marshalled_coi(opcode) ? CACHE_OP_MALLOCED_CLUSTER_FUNCTION
622
: CACHE_OP_CLUSTER_FUNCTION, (char *) msg, data_len);
633
CacheContinuation::setup_local_vc(char *data, int data_len, CacheContinuation * cc, ClusterMachine * mp, Action ** act)
635
bool read_op = op_is_read(cc->request_opcode);
636
bool short_msg = op_is_shortform(cc->request_opcode);
638
// Alloc buffer, copy message and attach to continuation
639
cc->setMsgBufferLen(data_len);
640
cc->allocMsgBuffer();
641
memcpy(cc->getMsgBuffer(), data, data_len);
643
SET_CONTINUATION_HANDLER(cc, (CacheContHandler)
644
& CacheContinuation::localVCsetupEvent);
647
Debug("cache_proto", "open_local-s (%s) seqno=%d", (read_op ? "R" : "W"), ((CacheOpMsg_short *) data)->seq_number);
649
Debug("cache_proto", "open_local-l (%s) seqno=%d", (read_op ? "R" : "W"), ((CacheOpMsg_long *) data)->seq_number);
653
ClusterVConnection *vc;
655
if (!read_op && (cc->request_opcode == CACHE_OPEN_WRITE_LONG)) {
656
// Determine if the open_write has already been established.
657
vc = cc->lookupOpenWriteVC();
660
vc = clusterProcessor.open_local(cc, mp, cc->open_local_token,
661
(CLUSTER_OPT_ALLOW_IMMEDIATE |
662
(read_op ? CLUSTER_OPT_CONN_READ : CLUSTER_OPT_CONN_WRITE)));
665
// Error, abort request
667
Debug("cache_proto", "0open_local-s (%s) failed, seqno=%d",
668
(read_op ? "R" : "W"), ((CacheOpMsg_short *) data)->seq_number);
670
Debug("cache_proto", "1open_local-l (%s) failed, seqno=%d",
671
(read_op ? "R" : "W"), ((CacheOpMsg_long *) data)->seq_number);
675
cc->timeout->cancel();
678
// Post async failure callback on a different continuation.
679
*act = callback_failure(&cc->action, (read_op ? CACHE_EVENT_OPEN_READ_FAILED : CACHE_EVENT_OPEN_WRITE_FAILED), 0);
682
} else if (vc != CLUSTER_DELAYED_OPEN) {
683
// We have established the VC
685
cc->read_cluster_vc = vc;
687
cc->write_cluster_vc = vc;
689
cc->cluster_vc_channel = vc->channel;
690
vc->current_cont = cc;
693
CacheOpMsg_short *ms = (CacheOpMsg_short *) data;
694
ms->channel = vc->channel;
695
ms->token = cc->open_local_token;
697
"0open_local-s (%s) success, seqno=%d chan=%d token=%d,%d VC=0x%x",
698
(read_op ? "R" : "W"), ms->seq_number, vc->channel, ms->token.ip_created, ms->token.sequence_number, vc);
700
CacheOpMsg_long *ml = (CacheOpMsg_long *) data;
701
ml->channel = vc->channel;
702
ml->token = cc->open_local_token;
704
"1open_local-l (%s) success, seqno=%d chan=%d token=%d,%d VC=0x%x",
705
(read_op ? "R" : "W"), ml->seq_number, vc->channel, ml->token.ip_created, ml->token.sequence_number, vc);
708
SET_CONTINUATION_HANDLER(cc, (CacheContHandler)
709
& CacheContinuation::remoteOpEvent);
713
//////////////////////////////////////////////////////
714
// Unable to setup VC, delay required, await callback
715
//////////////////////////////////////////////////////
721
CacheContinuation::lookupOpenWriteVC()
723
///////////////////////////////////////////////////////////////
724
// See if we already have an open_write ClusterVConnection
725
// which was established in a previous remote open_read which
727
///////////////////////////////////////////////////////////////
728
ClusterVConnection *vc;
729
CacheOpMsg_long *ml = (CacheOpMsg_long *) getMsgBuffer();
731
vc = GlobalOpenWriteVCcache->lookup(&ml->url_md5);
733
if (vc == ((ClusterVConnection *) 0)) {
735
SET_CONTINUATION_HANDLER(this, (CacheContHandler)
736
& CacheContinuation::lookupOpenWriteVCEvent);
738
// Note: In the lookupOpenWriteVCEvent handler, we use EVENT_IMMEDIATE
739
// to distinguish the lookup retry from a request timeout
740
// which uses EVENT_INTERVAL.
742
lookup_open_write_vc_event = eventProcessor.schedule_imm(this, ET_CACHE_CONT_SM);
744
} else if (vc != ((ClusterVConnection *) - 1)) {
745
// Hit, found open_write VC in cache.
746
// Post open_write completion by simulating a
747
// remote cache op result message.
749
vc->action_ = action; // establish new continuation
751
SET_CONTINUATION_HANDLER(this, (CacheContHandler)
752
& CacheContinuation::localVCsetupEvent);
753
this->handleEvent(CLUSTER_EVENT_OPEN_EXISTS, vc);
758
msglen = CacheOpReplyMsg::sizeof_fixedlen_msg();
759
msg.result = CACHE_EVENT_OPEN_WRITE;
760
msg.seq_number = seq_number;
761
msg.token = vc->token;
763
cache_op_result_ClusterFunction(from, (void *) &msg, msglen);
766
// Miss, establish local VC and send remote open_write request
768
SET_CONTINUATION_HANDLER(this, (CacheContHandler)
769
& CacheContinuation::localVCsetupEvent);
770
vc = clusterProcessor.open_local(this, from, open_local_token,
771
(CLUSTER_OPT_ALLOW_IMMEDIATE | CLUSTER_OPT_CONN_WRITE));
773
this->handleEvent(CLUSTER_EVENT_OPEN_FAILED, 0);
775
} else if (vc != CLUSTER_DELAYED_OPEN) {
776
this->handleEvent(CLUSTER_EVENT_OPEN, vc);
779
return CLUSTER_DELAYED_OPEN; // force completion in callback
783
CacheContinuation::lookupOpenWriteVCEvent(int event, Event * e)
785
if (event == EVENT_IMMEDIATE) {
786
// Retry open_write VC lookup
790
lookup_open_write_vc_event->cancel();
791
SET_CONTINUATION_HANDLER(this, (CacheContHandler)
792
& CacheContinuation::localVCsetupEvent);
793
this->handleEvent(event, e);
799
CacheContinuation::remove_and_delete(int event, Event * e)
801
NOWARN_UNUSED(event);
802
unsigned int hash = FOLDHASH(target_ip, seq_number);
803
MUTEX_TRY_LOCK(queuelock, remoteCacheContQueueMutex[hash], this_ethread());
805
if (remoteCacheContQueue[hash].in(this)) {
806
remoteCacheContQueue[hash].remove(this);
808
MUTEX_RELEASE(queuelock);
809
if (use_deferred_callback)
810
callback_failure(&action, result, result_error, this);
812
cacheContAllocator_free(this);
815
SET_HANDLER((CacheContHandler) & CacheContinuation::remove_and_delete);
817
timeout = eventProcessor.schedule_in(this, cache_cluster_timeout, ET_CACHE_CONT_SM);
819
e->schedule_in(cache_cluster_timeout);
826
CacheContinuation::localVCsetupEvent(int event, ClusterVConnection * vc)
828
ink_assert(magicno == (int) MagicNo);
829
ink_assert(getMsgBuffer());
830
bool short_msg = op_is_shortform(request_opcode);
831
bool read_op = op_is_read(request_opcode);
833
if (event == EVENT_INTERVAL) {
834
Event *e = (Event *) vc;
835
unsigned int hash = FOLDHASH(target_ip, seq_number);
837
MUTEX_TRY_LOCK(queuelock, remoteCacheContQueueMutex[hash], e->ethread);
839
e->schedule_in(CACHE_RETRY_PERIOD);
843
if (!remoteCacheContQueue[hash].in(this)) {
844
////////////////////////////////////////////////////
845
// Not yet queued on outstanding operations list
846
////////////////////////////////////////////////////
847
remoteCacheContQueue[hash].enqueue(this);
848
ink_assert(timeout == e);
849
MUTEX_RELEASE(queuelock);
850
e->schedule_in(cache_cluster_timeout);
854
/////////////////////////////////////////////////////
856
/////////////////////////////////////////////////////
857
remoteCacheContQueue[hash].remove(this);
858
MUTEX_RELEASE(queuelock);
859
Debug("cluster_timeout", "0cluster op timeout %d", seq_number);
860
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_REMOTE_OP_TIMEOUTS_STAT);
861
timeout = (Event *) 1; // Note timeout
862
/////////////////////////////////////////////////////////////////
863
// Note: Failure callback is sent now, but the deallocation of
864
// the CacheContinuation is deferred until we receive the
865
// open_local() callback.
866
/////////////////////////////////////////////////////////////////
867
if (!action.cancelled)
868
action.continuation->handleEvent((read_op ? CACHE_EVENT_OPEN_READ_FAILED : CACHE_EVENT_OPEN_WRITE_FAILED), 0);
872
} else if (((event == CLUSTER_EVENT_OPEN) || (event == CLUSTER_EVENT_OPEN_EXISTS))
873
&& (((ptrdiff_t) timeout & (ptrdiff_t) 1) == 0)) {
875
now = ink_get_hrtime();
876
CLUSTER_SUM_DYN_STAT(CLUSTER_OPEN_DELAY_TIME_STAT, now - start_time);
877
LOG_EVENT_TIME(start_time, open_delay_time_dist, open_delay_events);
879
read_cluster_vc = vc;
881
write_cluster_vc = vc;
883
cluster_vc_channel = vc->channel;
884
vc->current_cont = this;
887
CacheOpMsg_short *ms = (CacheOpMsg_short *) getMsgBuffer();
888
ms->channel = vc->channel;
889
ms->token = open_local_token;
892
"2open_local-s (%s) success, seqno=%d chan=%d token=%d,%d VC=0x%x",
893
(read_op ? "R" : "W"), ms->seq_number, vc->channel, ms->token.ip_created, ms->token.sequence_number, vc);
896
CacheOpMsg_long *ml = (CacheOpMsg_long *) getMsgBuffer();
897
ml->channel = vc->channel;
898
ml->token = open_local_token;
901
"3open_local-l (%s) success, seqno=%d chan=%d token=%d,%d VC=0x%x",
902
(read_op ? "R" : "W"), ml->seq_number, vc->channel, ml->token.ip_created, ml->token.sequence_number, vc);
904
SET_HANDLER((CacheContHandler) & CacheContinuation::remoteOpEvent);
906
if (event != CLUSTER_EVENT_OPEN_EXISTS) {
907
// Send request message
908
clusterProcessor.invoke_remote(from,
909
(op_needs_marshalled_coi(request_opcode) ?
910
CACHE_OP_MALLOCED_CLUSTER_FUNCTION :
911
CACHE_OP_CLUSTER_FUNCTION), (char *) getMsgBuffer(), getMsgBufferLen());
915
int send_failure_callback = 1;
917
if (((ptrdiff_t) timeout & (ptrdiff_t) 1) == 0) {
919
Debug("cache_proto", "2open_local-s (%s) failed, seqno=%d",
920
(read_op ? "R" : "W"), ((CacheOpMsg_short *) getMsgBuffer())->seq_number);
922
Debug("cache_proto", "3open_local-l (%s) failed, seqno=%d",
923
(read_op ? "R" : "W"), ((CacheOpMsg_long *) getMsgBuffer())->seq_number);
927
Debug("cache_proto", "4open_local cancelled due to timeout, seqno=%d", seq_number);
930
// Deallocate VC if successfully acquired
932
if (event == CLUSTER_EVENT_OPEN) {
933
vc->pending_remote_fill = 0;
934
vc->remote_closed = 1; // avoid remote close msg
935
vc->do_io(VIO::CLOSE);
937
send_failure_callback = 0; // already sent.
941
this->timeout->cancel();
942
this->timeout = NULL;
945
if (send_failure_callback) {
947
// Action corresponding to "this" already sent back to user,
948
// use "this" to establish the failure callback after
949
// removing ourselves from the active list.
951
this->use_deferred_callback = true;
952
this->result = (read_op ? CACHE_EVENT_OPEN_READ_FAILED : CACHE_EVENT_OPEN_WRITE_FAILED);
953
this->result_error = 0;
954
remove_and_delete(0, (Event *) 0);
957
cacheContAllocator_free(this);
967
///////////////////////////////////////////////////////////////////////////
968
// cache_op_ClusterFunction()
969
// On the receiving side, handle a general cluster cache operation
970
///////////////////////////////////////////////////////////////////////////
972
////////////////////////////////////////////////////////////////////////
973
// Marshaling functions for OTW message headers
974
////////////////////////////////////////////////////////////////////////
976
inline CacheOpMsg_long *
977
unmarshal_CacheOpMsg_long(void *data, int NeedByteSwap)
980
((CacheOpMsg_long *) data)->SwapBytes();
981
return (CacheOpMsg_long *) data;
984
inline CacheOpMsg_short *
985
unmarshal_CacheOpMsg_short(void *data, int NeedByteSwap)
988
((CacheOpMsg_short *) data)->SwapBytes();
989
return (CacheOpMsg_short *) data;
992
inline CacheOpMsg_short_2 *
993
unmarshal_CacheOpMsg_short_2(void *data, int NeedByteSwap)
996
((CacheOpMsg_short_2 *) data)->SwapBytes();
997
return (CacheOpMsg_short_2 *) data;
1000
// init_from_long() support routine for cache_op_ClusterFunction()
1002
init_from_long(CacheContinuation * cont, CacheOpMsg_long * msg, ClusterMachine * m)
1004
cont->no_reply_message = (msg->seq_number == CACHE_NO_RESPONSE);
1005
cont->seq_number = msg->seq_number;
1006
cont->cfl_flags = msg->cfl_flags;
1008
cont->url_md5 = msg->url_md5;
1009
cont->cluster_vc_channel = msg->channel;
1010
cont->frag_type = (CacheFragType) msg->frag_type;
1011
if ((cont->request_opcode == CACHE_OPEN_WRITE_LONG)
1012
|| (cont->request_opcode == CACHE_OPEN_READ_LONG)) {
1013
cont->pin_in_cache = (time_t) msg->data;
1015
cont->pin_in_cache = 0;
1017
cont->token = msg->token;
1018
cont->nbytes = (((int) msg->nbytes < 0) ? 0 : msg->nbytes);
1020
if (cont->request_opcode == CACHE_OPEN_READ_LONG) {
1021
cont->caller_buf_freebytes = msg->buffer_size;
1023
cont->caller_buf_freebytes = 0;
1027
// init_from_short() support routine for cache_op_ClusterFunction()
1029
init_from_short(CacheContinuation * cont, CacheOpMsg_short * msg, ClusterMachine * m)
1031
cont->no_reply_message = (msg->seq_number == CACHE_NO_RESPONSE);
1032
cont->seq_number = msg->seq_number;
1033
cont->cfl_flags = msg->cfl_flags;
1035
cont->url_md5 = msg->md5;
1036
cont->cluster_vc_channel = msg->channel;
1037
cont->token = msg->token;
1038
cont->nbytes = (((int) msg->nbytes < 0) ? 0 : msg->nbytes);
1039
cont->frag_type = (CacheFragType) msg->frag_type;
1041
if (cont->request_opcode == CACHE_OPEN_WRITE) {
1042
cont->pin_in_cache = (time_t) msg->data;
1044
cont->pin_in_cache = 0;
1047
if (cont->request_opcode == CACHE_OPEN_READ) {
1048
cont->caller_buf_freebytes = msg->buffer_size;
1050
cont->caller_buf_freebytes = 0;
1054
// init_from_short_2() support routine for cache_op_ClusterFunction()
1056
init_from_short_2(CacheContinuation * cont, CacheOpMsg_short_2 * msg, ClusterMachine * m)
1058
cont->no_reply_message = (msg->seq_number == CACHE_NO_RESPONSE);
1059
cont->seq_number = msg->seq_number;
1060
cont->cfl_flags = msg->cfl_flags;
1062
cont->url_md5 = msg->md5_1;
1063
cont->frag_type = (CacheFragType) msg->frag_type;
1067
cache_op_ClusterFunction(ClusterMachine * from, void *data, int len)
1069
EThread *thread = this_ethread();
1070
ProxyMutex *mutex = thread->mutex;
1071
////////////////////////////////////////////////////////
1072
// Note: we are running on the ET_CLUSTER thread
1073
////////////////////////////////////////////////////////
1074
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CACHE_OUTSTANDING_STAT);
1077
ClusterMessageHeader *mh = (ClusterMessageHeader *) data;
1079
if (mh->GetMsgVersion() != CacheOpMsg_long::CACHE_OP_LONG_MESSAGE_VERSION) { ////////////////////////////////////////////////
1080
// Convert from old to current message format
1081
////////////////////////////////////////////////
1082
ink_release_assert(!"cache_op_ClusterFunction() bad msg version");
1084
opcode = ((CacheOpMsg_long *) data)->opcode;
1086
// If necessary, create a continuation to reflect the response back
1088
CacheContinuation *c = CacheContinuation::cacheContAllocator_alloc();
1089
c->mutex = new_ProxyMutex();
1090
MUTEX_TRY_LOCK(lock, c->mutex, this_ethread());
1091
c->request_opcode = opcode;
1093
c->start_time = ink_get_hrtime();
1094
SET_CONTINUATION_HANDLER(c, (CacheContHandler)
1095
& CacheContinuation::replyOpEvent);
1098
case CACHE_OPEN_WRITE_BUFFER:
1099
case CACHE_OPEN_WRITE_BUFFER_LONG:
1100
ink_release_assert(!"cache_op_ClusterFunction WRITE_BUFFER not supported");
1103
case CACHE_OPEN_READ_BUFFER:
1104
case CACHE_OPEN_READ_BUFFER_LONG:
1105
ink_release_assert(!"cache_op_ClusterFunction READ_BUFFER not supported");
1108
case CACHE_OPEN_READ:
1110
CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap());
1111
init_from_short(c, msg, from);
1113
"cache_op-s op=%d seqno=%d data=0x%x len=%d machine=0x%x", opcode, c->seq_number, data, len, from);
1115
// Establish the remote side of the ClusterVConnection
1117
c->write_cluster_vc = clusterProcessor.connect_local((Continuation *) 0,
1119
c->cluster_vc_channel,
1120
(CLUSTER_OPT_IMMEDIATE | CLUSTER_OPT_CONN_READ));
1121
if (!c->write_cluster_vc) {
1122
// Unable to setup channel, abort processing.
1123
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CHAN_INUSE_STAT);
1125
"1Remote chan=%d inuse tok.ip=%u.%u.%u.%u tok.seqno=%d seqno=%d",
1126
c->cluster_vc_channel, DOT_SEPARATED(c->token.ip_created), c->token.sequence_number, c->seq_number);
1128
// Send cluster op failed reply
1129
c->replyOpEvent(CACHE_EVENT_OPEN_READ_FAILED, (VConnection *) - ECLUSTER_CHANNEL_INUSE);
1133
c->write_cluster_vc->current_cont = c;
1135
ink_release_assert(c->write_cluster_vc != CLUSTER_DELAYED_OPEN);
1136
ink_release_assert((opcode == CACHE_OPEN_READ)
1137
|| c->write_cluster_vc->pending_remote_fill);
1139
SET_CONTINUATION_HANDLER(c, (CacheContHandler)
1140
& CacheContinuation::setupVCdataRead);
1141
Debug("cache_proto",
1142
"0read op, seqno=%d chan=%d bufsize=%d token=%d,%d",
1143
msg->seq_number, msg->channel, msg->buffer_size, msg->token.ip_created, msg->token.sequence_number);
1144
#ifdef CACHE_MSG_TRACE
1145
log_cache_op_msg(msg->seq_number, len, "cache_op_open_read");
1147
CacheKey key(msg->md5);
1149
char *hostname = NULL;
1150
int host_len = len - op_to_sizeof_fixedlen_msg(opcode);
1152
hostname = (char *) msg->moi;
1154
Cache *call_cache = caches[c->frag_type];
1155
c->cache_action = call_cache->open_read(c, &key, c->frag_type, hostname, host_len);
1158
case CACHE_OPEN_READ_LONG:
1160
// Cache needs message data, copy it.
1161
c->setMsgBufferLen(len);
1162
c->allocMsgBuffer();
1163
memcpy(c->getMsgBuffer(), (char *) data, len);
1165
int flen = CacheOpMsg_long::sizeof_fixedlen_msg();
1166
CacheOpMsg_long *msg = unmarshal_CacheOpMsg_long(c->getMsgBuffer(), mh->NeedByteSwap());
1167
init_from_long(c, msg, from);
1169
"cache_op-l op=%d seqno=%d data=0x%x len=%d machine=0x%x", opcode, c->seq_number, data, len, from);
1170
#ifdef CACHE_MSG_TRACE
1171
log_cache_op_msg(msg->seq_number, len, "cache_op_open_read_long");
1174
// Establish the remote side of the ClusterVConnection
1176
c->write_cluster_vc = clusterProcessor.connect_local((Continuation *) 0,
1178
c->cluster_vc_channel,
1179
(CLUSTER_OPT_IMMEDIATE | CLUSTER_OPT_CONN_READ));
1180
if (!c->write_cluster_vc) {
1181
// Unable to setup channel, abort processing.
1182
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CHAN_INUSE_STAT);
1184
"2Remote chan=%d inuse tok.ip=%u.%u.%u.%u tok.seqno=%d seqno=%d",
1185
c->cluster_vc_channel, DOT_SEPARATED(c->token.ip_created), c->token.sequence_number, c->seq_number);
1187
// Send cluster op failed reply
1188
c->replyOpEvent(CACHE_EVENT_OPEN_READ_FAILED, (VConnection *) - ECLUSTER_CHANNEL_INUSE);
1192
c->write_cluster_vc->current_cont = c;
1194
ink_release_assert(c->write_cluster_vc != CLUSTER_DELAYED_OPEN);
1195
ink_release_assert((opcode == CACHE_OPEN_READ_LONG)
1196
|| c->write_cluster_vc->pending_remote_fill);
1198
SET_CONTINUATION_HANDLER(c, (CacheContHandler)
1199
& CacheContinuation::setupReadWriteVC);
1200
Debug("cache_proto",
1201
"1read op, seqno=%d chan=%d bufsize=%d token=%d,%d",
1202
msg->seq_number, msg->channel, msg->buffer_size, msg->token.ip_created, msg->token.sequence_number);
1204
const char *p = (const char *) msg + flen;
1205
int moi_len = len - flen;
1208
ink_assert(moi_len > 0);
1210
// Unmarshal CacheHTTPHdr
1211
res = c->ic_request.unmarshal((char *) p, moi_len, NULL);
1212
ink_assert(res > 0);
1213
ink_assert(c->ic_request.valid());
1216
ink_assert(moi_len > 0);
1217
// Unmarshal CacheLookupHttpConfig
1218
c->ic_params = new(CacheLookupHttpConfigAllocator.alloc())
1219
CacheLookupHttpConfig();
1220
res = c->ic_params->unmarshal(&c->ic_arena, (const char *) p, moi_len);
1221
ink_assert(res > 0);
1226
CacheKey key(msg->url_md5);
1228
char *hostname = NULL;
1232
hostname = (char *) p;
1235
// Save hostname and attach it to the continuation since we may
1236
// need it if we convert this to an open_write.
1238
c->ic_hostname = new_IOBufferData(iobuffer_size_to_index(host_len));
1239
c->ic_hostname_len = host_len;
1241
memcpy(c->ic_hostname->data(), hostname, host_len);
1244
Cache *call_cache = caches[c->frag_type];
1245
Action *a = call_cache->open_read(c, &key, &c->ic_request,
1247
c->frag_type, hostname, host_len);
1248
// Get rid of purify warnings since 'c' can be freed by open_read.
1249
if (a != ACTION_RESULT_DONE) {
1250
c->cache_action = a;
1254
case CACHE_OPEN_WRITE:
1256
CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap());
1257
init_from_short(c, msg, from);
1259
"cache_op-s op=%d seqno=%d data=0x%x len=%d machine=0x%x", opcode, c->seq_number, data, len, from);
1260
#ifdef CACHE_MSG_TRACE
1261
log_cache_op_msg(msg->seq_number, len, "cache_op_open_write");
1264
// Establish the remote side of the ClusterVConnection
1266
c->read_cluster_vc = clusterProcessor.connect_local((Continuation *) 0,
1268
c->cluster_vc_channel,
1269
(CLUSTER_OPT_IMMEDIATE | CLUSTER_OPT_CONN_WRITE));
1270
if (!c->read_cluster_vc) {
1271
// Unable to setup channel, abort processing.
1272
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CHAN_INUSE_STAT);
1274
"3Remote chan=%d inuse tok.ip=%u.%u.%u.%u tok.seqno=%d seqno=%d",
1275
c->cluster_vc_channel, DOT_SEPARATED(c->token.ip_created), c->token.sequence_number, c->seq_number);
1277
// Send cluster op failed reply
1278
c->replyOpEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (VConnection *) - ECLUSTER_CHANNEL_INUSE);
1282
c->read_cluster_vc->current_cont = c;
1284
ink_release_assert(c->read_cluster_vc != CLUSTER_DELAYED_OPEN);
1286
CacheKey key(msg->md5);
1288
char *hostname = NULL;
1289
int host_len = len - op_to_sizeof_fixedlen_msg(opcode);
1291
hostname = (char *) msg->moi;
1294
Cache *call_cache = caches[c->frag_type];
1295
Action *a = call_cache->open_write(c, &key, c->frag_type,
1296
!!(c->cfl_flags & CFL_OVERWRITE_ON_WRITE),
1297
c->pin_in_cache, hostname, host_len);
1298
if (a != ACTION_RESULT_DONE) {
1299
c->cache_action = a;
1303
case CACHE_OPEN_WRITE_LONG:
1305
// Cache needs message data, copy it.
1306
c->setMsgBufferLen(len);
1307
c->allocMsgBuffer();
1308
memcpy(c->getMsgBuffer(), (char *) data, len);
1310
int flen = CacheOpMsg_long::sizeof_fixedlen_msg();
1311
CacheOpMsg_long *msg = unmarshal_CacheOpMsg_long(c->getMsgBuffer(), mh->NeedByteSwap());
1312
init_from_long(c, msg, from);
1314
"cache_op-l op=%d seqno=%d data=0x%x len=%d machine=0x%x", opcode, c->seq_number, data, len, from);
1315
#ifdef CACHE_MSG_TRACE
1316
log_cache_op_msg(msg->seq_number, len, "cache_op_open_write_long");
1319
// Establish the remote side of the ClusterVConnection
1321
c->read_cluster_vc = clusterProcessor.connect_local((Continuation *) 0,
1323
c->cluster_vc_channel,
1324
(CLUSTER_OPT_IMMEDIATE | CLUSTER_OPT_CONN_WRITE));
1325
if (!c->read_cluster_vc) {
1326
// Unable to setup channel, abort processing.
1327
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CHAN_INUSE_STAT);
1329
"4Remote chan=%d inuse tok.ip=%u.%u.%u.%u tok.seqno=%d seqno=%d",
1330
c->cluster_vc_channel, DOT_SEPARATED(c->token.ip_created), c->token.sequence_number, c->seq_number);
1332
// Send cluster op failed reply
1333
c->replyOpEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (VConnection *) - ECLUSTER_CHANNEL_INUSE);
1337
c->read_cluster_vc->current_cont = c;
1339
ink_release_assert(c->read_cluster_vc != CLUSTER_DELAYED_OPEN);
1341
CacheHTTPInfo *ci = 0;
1344
int moi_len = len - flen;
1346
if (moi_len && c->cfl_flags & CFL_LOPENWRITE_HAVE_OLDINFO) {
1347
p = (const char *) msg + flen;
1349
// Unmarshal old CacheHTTPInfo
1350
res = HTTPInfo::unmarshal((char *) p, moi_len, NULL);
1351
ink_assert(res > 0);
1352
c->ic_old_info.get_handle((char *) p, moi_len);
1353
ink_assert(c->ic_old_info.valid());
1354
ci = &c->ic_old_info;
1356
p = (const char *) 0;
1358
if (c->cfl_flags & CFL_ALLOW_MULTIPLE_WRITES) {
1360
ci = (CacheHTTPInfo *) CACHE_ALLOW_MULTIPLE_WRITES;
1365
CacheKey key(msg->url_md5);
1366
char *hostname = NULL;
1369
hostname = (char *) p;
1372
Cache *call_cache = caches[c->frag_type];
1373
Action *a = call_cache->open_write(c, &key, ci, c->pin_in_cache,
1374
NULL, c->frag_type, hostname, len);
1375
if (a != ACTION_RESULT_DONE) {
1376
c->cache_action = a;
1382
CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap());
1383
init_from_short(c, msg, from);
1385
"cache_op op=%d seqno=%d data=0x%x len=%d machine=0x%x", opcode, c->seq_number, data, len, from);
1386
#ifdef CACHE_MSG_TRACE
1387
log_cache_op_msg(msg->seq_number, len, "cache_op_remove");
1389
CacheKey key(msg->md5);
1391
char *hostname = NULL;
1392
int host_len = len - op_to_sizeof_fixedlen_msg(opcode);
1394
hostname = (char *) msg->moi;
1397
Cache *call_cache = caches[c->frag_type];
1398
Action *a = call_cache->remove(c, &key, c->frag_type,
1399
!!(c->cfl_flags & CFL_REMOVE_USER_AGENTS),
1400
!!(c->cfl_flags & CFL_REMOVE_LINK),
1401
hostname, host_len);
1402
if (a != ACTION_RESULT_DONE) {
1403
c->cache_action = a;
1409
CacheOpMsg_short_2 *msg = unmarshal_CacheOpMsg_short_2(data, mh->NeedByteSwap());
1410
init_from_short_2(c, msg, from);
1412
"cache_op op=%d seqno=%d data=0x%x len=%d machine=0x%x", opcode, c->seq_number, data, len, from);
1413
#ifdef CACHE_MSG_TRACE
1414
log_cache_op_msg(msg->seq_number, len, "cache_op_link");
1417
CacheKey key1(msg->md5_1);
1418
CacheKey key2(msg->md5_2);
1420
char *hostname = NULL;
1421
int host_len = len - op_to_sizeof_fixedlen_msg(opcode);
1423
hostname = (char *) msg->moi;
1426
Cache *call_cache = caches[c->frag_type];
1427
Action *a = call_cache->link(c, &key1, &key2, c->frag_type,
1428
hostname, host_len);
1429
if (a != ACTION_RESULT_DONE) {
1430
c->cache_action = a;
1436
CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap());
1437
init_from_short(c, msg, from);
1439
"cache_op op=%d seqno=%d data=0x%x len=%d machine=0x%x", opcode, c->seq_number, data, len, from);
1440
#ifdef CACHE_MSG_TRACE
1441
log_cache_op_msg(msg->seq_number, len, "cache_op_deref");
1444
CacheKey key(msg->md5);
1446
char *hostname = NULL;
1447
int host_len = len - op_to_sizeof_fixedlen_msg(opcode);
1449
hostname = (char *) msg->moi;
1452
Cache *call_cache = caches[c->frag_type];
1453
Action *a = call_cache->deref(c, &key, c->frag_type,
1454
hostname, host_len);
1455
if (a != ACTION_RESULT_DONE) {
1456
c->cache_action = a;
1463
ink_release_assert(0);
1469
// Cluster RPC entry point: dispatches the cache op exactly like
// cache_op_ClusterFunction(), but this variant owns the incoming message
// buffer, so after dispatch it hands the buffer back to the Cluster
// subsystem via free_remote_data().
// NOTE(review): this chunk is corrupted -- the bare numeric lines below
// (e.g. "1471") appear to be interleaved original line numbers, and the
// return type / braces of this definition are not visible. Restore from VCS.
cache_op_malloc_ClusterFunction(ClusterMachine * from, void *data, int len)
1471
cache_op_ClusterFunction(from, data, len);
1472
// We own the message data, free it back to the Cluster subsystem
1473
clusterProcessor.free_remote_data((char *) data, len);
1477
// Start the initial "readahead" data read on the Cache VC obtained from a
// successful local open-read performed on behalf of a remote node. The data
// read here (up to caller_buf_freebytes) is later included in the reply
// message. Any event other than CACHE_EVENT_OPEN_READ is deflected to
// replyOpEvent() unchanged.
// NOTE(review): bare numeric lines are extraction artifacts; jumps in the
// numbering (e.g. 1497 -> 1501) indicate source lines missing from this view.
CacheContinuation::setupVCdataRead(int event, VConnection * vc)
1479
ink_assert(magicno == (int) MagicNo);
1481
// Setup the initial data read for the given Cache VC.
1482
// This data is sent back in the response message.
1484
if (event == CACHE_EVENT_OPEN_READ) {
1485
//////////////////////////////////////////
1486
// Allocate buffer and initiate read.
1487
//////////////////////////////////////////
1488
Debug("cache_proto", "setupVCdataRead CACHE_EVENT_OPEN_READ seqno=%d", seq_number);
1489
ink_release_assert(caller_buf_freebytes);
1490
// Subsequent read events are handled by VCdataRead().
SET_HANDLER((CacheContHandler) & CacheContinuation::VCdataRead);
1492
int64_t size_index = iobuffer_size_to_index(caller_buf_freebytes);
1493
MIOBuffer *buf = new_MIOBuffer(size_index);
1494
readahead_reader = buf->alloc_reader();
1496
MUTEX_TRY_LOCK(lock, mutex, this_ethread()); // prevent immediate callback
1497
readahead_vio = vc->do_io_read(this, caller_buf_freebytes, buf);
// NOTE(review): the branch between the success path above and the error
// path below is missing from this view (numbering jumps 1497 -> 1501).
1501
// Error case, deflect processing to replyOpEvent.
1502
SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent);
1503
return handleEvent(event, vc);
1508
// Completion handler for the readahead read started by setupVCdataRead().
// On READ_READY/READ_COMPLETE it clones up to caller_buf_freebytes of the
// buffered object data into readahead_data for the reply message; if the
// whole object fit (have_all_data) the Cache VC is closed immediately.
// On error/timeout events the reply is converted to OPEN_READ_FAILED.
// Either way, control is handed to replyOpEvent().
// NOTE(review): bare numeric lines are extraction artifacts; the enclosing
// switch header, braces and break statements are missing from this view.
CacheContinuation::VCdataRead(int event, VIO * target_vio)
1510
ink_release_assert(magicno == (int) MagicNo);
1511
ink_release_assert(readahead_vio == target_vio);
1513
VConnection *vc = target_vio->vc_server;
1514
int reply = CACHE_EVENT_OPEN_READ;
1515
int32_t object_size;
1520
if (!target_vio->ndone) {
1521
// Doc with zero byte body, handle as read failure
1526
case VC_EVENT_READ_READY:
1527
case VC_EVENT_READ_COMPLETE:
1530
int current_ndone = target_vio->ndone;
1532
ink_assert(current_ndone);
1533
ink_assert(current_ndone <= readahead_reader->read_avail());
1535
object_size = getObjectSize(vc, request_opcode, &cache_vc_info);
1536
// have_all_data: entire object fit into the caller's buffer budget.
have_all_data = ((object_size <= caller_buf_freebytes) && (object_size == current_ndone));
1538
// Use no more than the caller's max buffer limit
1540
clone_bytes = current_ndone;
1541
if (!have_all_data) {
1542
if (current_ndone > caller_buf_freebytes) {
1543
clone_bytes = caller_buf_freebytes;
1548
IOBufferBlock *tail;
1549
readahead_data = clone_IOBufferBlockList(readahead_reader->get_current_block(),
1550
readahead_reader->start_offset, clone_bytes, &tail);
1552
if (have_all_data) {
1553
// Close VC, since no more data and also to avoid VC_EVENT_EOS
1555
MIOBuffer *mbuf = target_vio->buffer.writer();
1556
vc->do_io(VIO::CLOSE);
1557
free_MIOBuffer(mbuf);
1560
SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent);
1561
handleEvent(reply, vc);
1564
case VC_EVENT_ERROR:
1565
case VC_EVENT_INACTIVITY_TIMEOUT:
1566
case VC_EVENT_ACTIVE_TIMEOUT:
1570
// Read failed, deflect to replyOpEvent.
1572
MIOBuffer * mbuf = target_vio->buffer.writer();
1573
vc->do_io(VIO::CLOSE);
1574
free_MIOBuffer(mbuf);
1576
reply = CACHE_EVENT_OPEN_READ_FAILED;
1578
SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent);
1579
handleEvent(reply, (VConnection *) - ECLUSTER_ORB_DATA_READ);
1586
// Handler for the result of a remote CACHE_OPEN_READ_LONG request:
//  - OPEN_READ: forward to setupVCdataRead() to start the readahead.
//  - OPEN_READ_FAILED (HTTP only): immediately retry as an open_write on
//    the same key, saving the remote node an extra round trip.
//  - OPEN_WRITE (from that retry): repurpose the preallocated write-side
//    cluster VC as the read side, then reply via replyOpEvent().
//  - OPEN_WRITE_FAILED: report OPEN_READ_FAILED to the requester.
// NOTE(review): bare numeric lines are extraction artifacts; the switch
// header, braces, break statements and part of the open_write() argument
// list are missing from this view.
CacheContinuation::setupReadWriteVC(int event, VConnection * vc)
1588
// Only handles OPEN_READ_LONG processing.
1591
case CACHE_EVENT_OPEN_READ:
1595
SET_HANDLER((CacheContHandler) & CacheContinuation::setupVCdataRead);
1596
return handleEvent(event, vc);
1599
case CACHE_EVENT_OPEN_READ_FAILED:
1601
if (frag_type == CACHE_FRAG_TYPE_HTTP) {
1602
// HTTP open read failed, attempt open write now to avoid an additional
1603
// message round trip
1605
CacheKey key(url_md5);
1607
Cache *call_cache = caches[frag_type];
1608
Action *a = call_cache->open_write(this, &key, 0, pin_in_cache,
1609
NULL, frag_type, ic_hostname->data(),
1611
if (a != ACTION_RESULT_DONE) {
1615
SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent);
1616
return handleEvent(CACHE_EVENT_OPEN_READ_FAILED, 0);
1620
case CACHE_EVENT_OPEN_WRITE:
1622
// Convert from read to write connection
1624
ink_assert(!read_cluster_vc && write_cluster_vc);
1625
read_cluster_vc = write_cluster_vc;
1626
read_cluster_vc->set_type(CLUSTER_OPT_CONN_WRITE);
1627
write_cluster_vc = 0;
1629
SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent);
1630
return handleEvent(event, vc);
1633
case CACHE_EVENT_OPEN_WRITE_FAILED:
1636
SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent);
1637
return handleEvent(CACHE_EVENT_OPEN_READ_FAILED, 0);
1645
/////////////////////////////////////////////////////////////////////////
1647
// Reflect the (local) reply back to the (remote) requesting node.
1648
/////////////////////////////////////////////////////////////////////////
1650
// Build and transmit the reply for a locally executed cache op back to the
// remote requesting node. Responsibilities visible in this view:
//  - record callback latency stats and enforce exactly-one callback;
//  - translate OPEN_READ_LONG-turned-OPEN_WRITE into OPEN_READ_FAILED with
//    a non-empty token (signals an established write connection);
//  - for reads, either clear the token (all data fits in the reply) or
//    start the readahead tunnel / OneWayTunnel for streaming;
//  - marshal CacheHTTPInfo (success) or a failure code (failure) after the
//    fixed-length header, closing any preallocated cluster VCs on failure;
//  - send via invoke_remote_data() (reply + object data in one cluster
//    message) or invoke_remote(), then free this continuation when no more
//    results are expected.
// NOTE(review): bare numeric lines are extraction artifacts; numbering gaps
// show that many lines (if/else joins, braces, early returns) are missing
// from this view -- do not infer control flow beyond what is shown.
CacheContinuation::replyOpEvent(int event, VConnection * cvc)
1652
ink_assert(magicno == (int) MagicNo);
1653
Debug("cache_proto", "replyOpEvent(this=%x,event=%d,VC=%x)", this, event, cvc);
1655
now = ink_get_hrtime();
1656
CLUSTER_SUM_DYN_STAT(CLUSTER_CACHE_CALLBACK_TIME_STAT, now - start_time);
1657
LOG_EVENT_TIME(start_time, callback_time_dist, cache_callbacks);
1658
ink_release_assert(expect_cache_callback);
1659
expect_cache_callback = false; // make sure we are called back exactly once
1663
bool open = event_is_open(event);
1664
bool read_op = op_is_read(request_opcode);
1665
bool open_read_now_open_write = false;
1667
// Reply message initializations
1668
CacheOpReplyMsg rmsg;
1669
CacheOpReplyMsg *msg = &rmsg;
1670
msg->result = event;
1672
if ((request_opcode == CACHE_OPEN_READ_LONG)
1673
&& cvc && (event == CACHE_EVENT_OPEN_WRITE)) {
1674
//////////////////////////////////////////////////////////////////////////
1675
// open read failed, but open write succeeded, set result to
1676
// CACHE_EVENT_OPEN_READ_FAILED and make result token non zero to
1677
// signal to the remote node that we have established a write connection.
1678
//////////////////////////////////////////////////////////////////////////
1679
msg->result = CACHE_EVENT_OPEN_READ_FAILED;
1680
open_read_now_open_write = true;
1683
msg->seq_number = seq_number;
1684
int flen = CacheOpReplyMsg::sizeof_fixedlen_msg(); // include token
1688
int results_expected = 1;
1690
if (no_reply_message) // CACHE_NO_RESPONSE request
1695
// prepare for CACHE_OPEN_EVENT
1697
results_expected = 2;
1699
cache_read = (event == CACHE_EVENT_OPEN_READ);
1701
if (read_op && !open_read_now_open_write) {
1702
ink_release_assert(write_cluster_vc->pending_remote_fill);
1703
ink_assert(have_all_data || (readahead_vio == &((CacheVC *) cache_vc)->vio));
1704
Debug("cache_proto", "connect_local success seqno=%d alldata=%d", seq_number, (have_all_data ? 1 : 0));
1706
if (have_all_data) {
1707
msg->token.clear(); // Tell sender no conn established
1710
msg->token = token; // Tell sender conn established
1711
setupReadBufTunnel(cache_vc, write_cluster_vc);
1715
Debug("cache_proto", "cache_open [%s] success seqno=%d", (cache_read ? "R" : "W"), seq_number);
1716
msg->token = token; // Tell sender conn established
1718
OneWayTunnel *pOWT = OneWayTunnel::OneWayTunnel_alloc();
1719
pOWT->init(read_cluster_vc, cache_vc, NULL, nbytes ? nbytes : DEFAULT_MAX_BUFFER_SIZE, this->mutex);
1720
read_cluster_vc->allow_remote_close();
1724
// For cache reads, marshal the associated CacheHTTPInfo in the reply
1728
if (!cache_vc_info.valid()) {
1729
(void) getObjectSize(cache_vc, request_opcode, &cache_vc_info);
1731
// Determine data length and allocate
1732
len = cache_vc_info.marshal_length();
1733
CacheOpReplyMsg *reply = (CacheOpReplyMsg *) ALLOCA_DOUBLE(flen + len);
1735
// Initialize reply message header
1738
// Marshal response data into reply message
1739
res = cache_vc_info.marshal((char *) reply + flen, len);
1740
ink_assert(res >= 0 && res <= len);
1742
// Make reply message the current message
1747
Debug("cache_proto", "cache operation failed result=%d seqno=%d (this=%x)", event, seq_number, this);
1748
msg->token.clear(); // Tell sender no conn established
1750
// Reallocate reply message, allowing for marshalled data
1751
len += sizeof(int32_t);
1752
CacheOpReplyMsg *reply = (CacheOpReplyMsg *) ALLOCA_DOUBLE(flen + len);
1754
// Initialize reply message header
1757
if (request_opcode != CACHE_LINK) {
1759
// open read/write failed, close preallocated VC
1761
if (read_cluster_vc) {
1762
read_cluster_vc->remote_closed = 1; // avoid remote close msg
1763
read_cluster_vc->do_io(VIO::CLOSE);
1765
if (write_cluster_vc) {
1766
write_cluster_vc->pending_remote_fill = 0;
1767
write_cluster_vc->remote_closed = 1; // avoid remote close msg
1768
write_cluster_vc->do_io(VIO::CLOSE);
1770
*((int32_t *) reply->moi) = (int32_t) ((uintptr_t) cvc & 0xffffffff); // code describing failure
1772
// Make reply message the current message
1775
CLUSTER_DECREMENT_DYN_STAT(CLUSTER_CACHE_OUTSTANDING_STAT);
1778
// Send reply message
1780
#ifdef CACHE_MSG_TRACE
1781
log_cache_op_sndmsg(msg->seq_number, 0, "replyOpEvent");
1783
vers = CacheOpReplyMsg::protoToVersion(from->msg_proto_major);
1784
if (vers == CacheOpReplyMsg::CACHE_OP_REPLY_MESSAGE_VERSION) {
1786
// Transmit reply message and object data in same cluster message
1787
Debug("cache_proto", "Sending reply/data seqno=%d buflen=%d",
1788
seq_number, readahead_data ? bytes_IOBufferBlockList(readahead_data, 1) : 0);
1789
clusterProcessor.invoke_remote_data(from,
1790
CACHE_OP_RESULT_CLUSTER_FUNCTION,
1791
(void *) msg, (flen + len),
1793
cluster_vc_channel, &token,
1794
&CacheContinuation::disposeOfDataBuffer, (void *) this, CLUSTER_OPT_STEAL);
1796
Debug("cache_proto", "Sending reply seqno=%d, (this=%x)", seq_number, this);
1797
clusterProcessor.invoke_remote(from, CACHE_OP_RESULT_CLUSTER_FUNCTION,
1798
(void *) msg, (flen + len), CLUSTER_OPT_STEAL);
1802
//////////////////////////////////////////////////////////////
1803
// Create the specified down rev version of this message
1804
//////////////////////////////////////////////////////////////
1805
ink_release_assert(!"replyOpEvent() bad msg version");
1810
if (results_expected <= 0) {
1811
Debug("cache_proto", "replyOpEvent: freeing this=%x", this, event, cvc);
1812
cacheContAllocator_free(this);
1818
// Create the OneWayTunnel (cache read VC -> cluster write VC) used to
// stream the remainder of an object after the initial readahead bytes have
// been folded into the reply message. A helper continuation (tunnel_cont)
// is installed to observe tunnel shutdown via tunnelClosedEvent(). Both
// VCs are left disabled here; they are reenabled once the reply message
// carrying the initial data has been sent (see handleDisposeEvent()).
// NOTE(review): bare numeric lines are extraction artifacts; braces and a
// few lines are missing from this view.
CacheContinuation::setupReadBufTunnel(VConnection * cache_read_vc, VConnection * cluster_write_vc)
1820
////////////////////////////////////////////////////////////
1821
// Setup OneWayTunnel and tunnel close event handler.
1822
// Used in readahead processing on open read connections.
1823
////////////////////////////////////////////////////////////
1824
tunnel_cont = cacheContAllocator_alloc();
1825
tunnel_cont->mutex = this->mutex;
1826
SET_CONTINUATION_HANDLER(tunnel_cont, (CacheContHandler)
1827
& CacheContinuation::tunnelClosedEvent);
1828
int64_t ravail = bytes_IOBufferBlockList(readahead_data, 1);
1830
tunnel_mutex = tunnel_cont->mutex;
1831
tunnel_closed = false;
1833
tunnel = OneWayTunnel::OneWayTunnel_alloc();
1834
readahead_reader->consume(ravail); // allow for bytes sent in initial reply
1835
tunnel->init(cache_read_vc, cluster_write_vc, tunnel_cont, readahead_vio, readahead_reader);
1836
tunnel_cont->action = this;
1837
tunnel_cont->tunnel = tunnel;
1838
tunnel_cont->tunnel_cont = tunnel_cont;
1840
// Disable cluster_write_vc
1841
((ClusterVConnection *) cluster_write_vc)->write.enabled = 0;
1843
// Disable cache read VC
1844
readahead_vio->nbytes = readahead_vio->ndone;
1846
/////////////////////////////////////////////////////////////////////
1847
// At this point, the OneWayTunnel is blocked awaiting a reenable
1848
// on both the source and target VCs. Reenable occurs after the
1849
// message containing the initial data and open read reply are sent.
1850
/////////////////////////////////////////////////////////////////////
1853
///////////////////////////////////////////////////////////////////////
1854
// Tunnel exited event handler, used for readahead on open read.
1855
///////////////////////////////////////////////////////////////////////
1857
// Shutdown notification for the readahead OneWayTunnel. 'c' is the helper
// continuation installed by setupReadBufTunnel(); if its owning ("real")
// continuation is still attached, clear the owner's tunnel state and mark
// the tunnel closed, then free the tunnel and the helper continuation.
// Called with tunnel_mutex held (see comment below).
// NOTE(review): bare numeric lines are extraction artifacts; the null-check
// branch around real_cc (numbering gap 1864 -> 1867) is missing from view.
CacheContinuation::tunnelClosedEvent(int event, void *c)
1859
NOWARN_UNUSED(event);
1860
ink_assert(magicno == (int) MagicNo);
1861
// Note: We are called with the tunnel_mutex held.
1862
CacheContinuation *tc = (CacheContinuation *) c;
1863
ink_release_assert(tc->tunnel_cont == tc);
1864
CacheContinuation *real_cc = (CacheContinuation *) tc->action.continuation;
1867
// Notify the real continuation of the tunnel closed event
1868
real_cc->tunnel = 0;
1869
real_cc->tunnel_cont = 0;
1870
real_cc->tunnel_closed = true;
1872
OneWayTunnel::OneWayTunnel_free(tc->tunnel);
1873
cacheContAllocator_free(tc);
1878
////////////////////////////////////////////////////////////
1879
// Retry DisposeOfDataBuffer continuation
1880
////////////////////////////////////////////////////////////
1881
// Retry continuation used by disposeOfDataBuffer(): when the tunnel mutex
// cannot be acquired immediately, this continuation re-invokes
// CacheContinuation::handleDisposeEvent() every 10ms until it succeeds.
// NOTE(review): bare numeric lines are extraction artifacts; braces and the
// success branch of handleRetryEvent() are missing from this view.
struct retryDisposeOfDataBuffer;
1882
typedef int (retryDisposeOfDataBuffer::*rtryDisOfDBufHandler) (int, void *);
1883
struct retryDisposeOfDataBuffer:public Continuation
1885
CacheContinuation *c;
1887
int handleRetryEvent(int event, Event * e)
1889
if (CacheContinuation::handleDisposeEvent(event, c) == EVENT_DONE) {
1894
e->schedule_in(HRTIME_MSECONDS(10));
1898
retryDisposeOfDataBuffer(CacheContinuation * cont)
1899
: Continuation(new_ProxyMutex()), c(cont) {
1900
SET_HANDLER((rtryDisOfDBufHandler)
1901
& retryDisposeOfDataBuffer::handleRetryEvent);
1905
//////////////////////////////////////////////////////////////////
1906
// Callback from cluster to dispose of data passed in
1907
// call to invoke_remote_data().
1908
//////////////////////////////////////////////////////////////////
1910
// Cluster-subsystem callback invoked after invoke_remote_data() has sent
// the reply message containing the initial object data ('d' is the owning
// CacheContinuation). If the whole object fit in the reply (have_all_data),
// close write_cluster_vc with remote_closed set (no close message to the
// peer) and free the continuation; otherwise start the deferred tunnel via
// handleDisposeEvent(), retrying on a 10ms timer if its lock is contended.
// NOTE(review): bare numeric lines are extraction artifacts; the if/else
// join between the two paths is missing from this view.
CacheContinuation::disposeOfDataBuffer(void *d)
1913
CacheContinuation *cc = (CacheContinuation *) d;
1914
ink_assert(cc->have_all_data || cc->readahead_vio);
1915
ink_assert(cc->have_all_data || (cc->readahead_vio == &((CacheVC *) cc->cache_vc)->vio));
1917
if (cc->have_all_data) {
1919
// All object data resides in the buffer, no OneWayTunnel
1920
// started and the Cache VConnection has already been closed.
1921
// Close write_cluster_vc and set remote close to avoid send of
1922
// close message to remote node.
1924
cc->write_cluster_vc->pending_remote_fill = 0;
1925
cc->write_cluster_vc->remote_closed = 1;
1926
cc->write_cluster_vc->do_io(VIO::CLOSE);
1927
cc->readahead_data = 0;
1929
cacheContAllocator_free(cc);
1932
cc->write_cluster_vc->pending_remote_fill = 0;
1933
cc->write_cluster_vc->allow_remote_close();
1934
if (handleDisposeEvent(0, cc) == EVENT_CONT) {
1935
// Setup retry continuation.
1936
retryDisposeOfDataBuffer *retryCont = NEW(new retryDisposeOfDataBuffer(cc));
1937
eventProcessor.schedule_in(retryCont, HRTIME_MSECONDS(10), ET_CALL);
1943
// Attempt to start the deferred readahead tunnel for 'cc'. Requires
// cc->tunnel_mutex; if the try-lock fails the caller must retry (the
// fall-through comment at the bottom marks that path). With the lock held:
// if the tunnel is still open, set the source VIO's nbytes to the full
// object size and reenable both ends; either way detach from tunnel_cont
// and free the continuation.
// NOTE(review): bare numeric lines are extraction artifacts; the lock-held
// branch structure and return statements are missing from this view.
CacheContinuation::handleDisposeEvent(int event, CacheContinuation * cc)
1945
NOWARN_UNUSED(event);
1946
ink_assert(cc->magicno == (int) MagicNo);
1947
MUTEX_TRY_LOCK(lock, cc->tunnel_mutex, this_ethread());
1949
// Write of initial object data is complete.
1951
if (!cc->tunnel_closed) {
1952
// Start tunnel by reenabling source and target VCs.
1954
cc->tunnel->vioSource->nbytes = getObjectSize(cc->tunnel->vioSource->vc_server, cc->request_opcode, 0);
1955
cc->tunnel->vioSource->reenable_re();
1956
cc->tunnel->vioTarget->reenable();
1958
// Tell tunnel event we are gone
1959
cc->tunnel_cont->action.continuation = 0;
1961
cacheContAllocator_free(cc);
1965
// Lock acquire failed, retry operation.
1970
/////////////////////////////////////////////////////////////////////////////
1971
// cache_op_result_ClusterFunction()
1972
// Invoked on the machine which initiated a remote op, this
1973
// unmarshals the result and calls a continuation in the requesting thread.
1974
/////////////////////////////////////////////////////////////////////////////
1976
// Cluster RPC handler run on the node that ORIGINATED a remote cache op.
// Copies the raw reply ('d', length 'l') into an IOBufferData, validates
// the message version, byte-swaps if needed, and unmarshals either a
// CacheHTTPInfo (OPEN_READ success) or an error code (failure results)
// from the variable-length tail. It then locates the waiting
// CacheContinuation by (sender ip, seq_number): if both the queue lock and
// the continuation's lock can be taken, the result is delivered inline as
// CACHE_EVENT_RESPONSE_MSG; otherwise a fresh continuation is scheduled on
// ET_CACHE_CONT_SM to retry delivery via handleReplyEvent().
// NOTE(review): bare numeric lines are extraction artifacts; braces, break
// statements and several branch joins are missing from this view.
cache_op_result_ClusterFunction(ClusterMachine * from, void *d, int l)
1979
////////////////////////////////////////////////////////
1980
// Note: we are running on the ET_CACHE_CONT_SM thread
1981
////////////////////////////////////////////////////////
1983
// Copy reply message data
1984
Ptr<IOBufferData> iob = new_IOBufferData(iobuffer_size_to_index(l));
1985
memcpy(iob->data(), (char *) d, l);
1986
char *data = iob->data();
1989
CacheOpReplyMsg *msg = (CacheOpReplyMsg *) data;
1990
int32_t op_result_error = 0;
1991
ClusterMessageHeader *mh = (ClusterMessageHeader *) data;
1993
if (mh->GetMsgVersion() != CacheOpReplyMsg::CACHE_OP_REPLY_MESSAGE_VERSION) { ////////////////////////////////////////////////
1994
// Convert from old to current message format
1995
////////////////////////////////////////////////
1996
ink_release_assert(!"cache_op_result_ClusterFunction() bad msg version");
1999
flen = CacheOpReplyMsg::sizeof_fixedlen_msg();
2000
if (mh->NeedByteSwap())
2003
Debug("cluster_cache", "received cache op result, seqno=%d result=%d", msg->seq_number, msg->result);
2005
// If applicable, unmarshal any response data
2006
if ((len > flen) && event_reply_may_have_moi(msg->result)) {
2007
switch (msg->result) {
2008
case CACHE_EVENT_OPEN_READ:
2010
char *p = (char *) msg + flen;
2013
// Unmarshal CacheHTTPInfo
2014
res = HTTPInfo::unmarshal(p, len, NULL);
2015
ci.get_handle(p, len);
2016
ink_assert(res > 0);
2017
ink_assert(ci.valid());
2020
case CACHE_EVENT_LINK:
2021
case CACHE_EVENT_LINK_FAILED:
2023
case CACHE_EVENT_OPEN_READ_FAILED:
2024
case CACHE_EVENT_OPEN_WRITE_FAILED:
2025
case CACHE_EVENT_REMOVE_FAILED:
2026
case CACHE_EVENT_UPDATE_FAILED:
2027
case CACHE_EVENT_DEREF_FAILED:
2029
// Unmarshal the error code
2030
ink_assert(((len - flen) == sizeof(int32_t)));
2031
op_result_error = *(int32_t *) msg->moi;
2032
if (mh->NeedByteSwap())
2033
swap32((uint32_t *) & op_result_error);
2034
// Negate: wire format carries a positive errno-style code.
op_result_error = -op_result_error;
2039
ink_release_assert(!"invalid moi data for received msg");
2044
// See if this response is still expected (expected case == yes)
2046
unsigned int hash = FOLDHASH(from->ip, msg->seq_number);
2047
EThread *thread = this_ethread();
2048
ProxyMutex *mutex = thread->mutex;
2049
if (MUTEX_TAKE_TRY_LOCK(remoteCacheContQueueMutex[hash], thread)) {
2051
// Find it in pending list
2053
CacheContinuation *c = find_cache_continuation(msg->seq_number,
2056
// Reply took to long, response no longer expected.
2057
MUTEX_UNTAKE_LOCK(remoteCacheContQueueMutex[hash], thread);
2058
Debug("cluster_timeout", "0cache reply timeout: %d", msg->seq_number);
2059
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_REMOTE_OP_REPLY_TIMEOUTS_STAT);
2064
// Try to send the message
2066
MUTEX_TRY_LOCK(lock, c->mutex, thread);
2068
// Failed to acquire lock, defer
2071
MUTEX_UNTAKE_LOCK(remoteCacheContQueueMutex[hash], thread);
2074
c->result_error = op_result_error;
2076
// send message, release lock
2080
// Unmarshaled CacheHTTPInfo contained in reply message, copy it.
2081
c->setMsgBufferLen(len, iob);
2082
c->ic_new_info = ci;
2084
msg->seq_number = len; // HACK ALERT: reusing variable
2085
c->handleEvent(CACHE_EVENT_RESPONSE_MSG, data);
2089
// Failed to wake it up, defer by creating a timed continuation
2092
CacheContinuation * c = CacheContinuation::cacheContAllocator_alloc();
2093
c->mutex = new_ProxyMutex();
2094
c->seq_number = msg->seq_number;
2095
c->target_ip = from->ip;
2096
SET_CONTINUATION_HANDLER(c, (CacheContHandler)
2097
& CacheContinuation::handleReplyEvent);
2098
c->start_time = ink_get_hrtime();
2099
c->result = msg->result;
2100
if (event_is_open(msg->result))
2101
c->token = msg->token;
2103
// Unmarshaled CacheHTTPInfo contained in reply message, copy it.
2104
c->setMsgBufferLen(len, iob);
2105
c->ic_new_info = ci;
2107
c->result_error = op_result_error;
2108
eventProcessor.schedule_in(c, CACHE_RETRY_PERIOD, ET_CACHE_CONT_SM);
2112
////////////////////////////////////////////////////////////////////////
2113
// handleReplyEvent()
2114
// If we cannot acquire any of the locks to handle the response
2115
// inline, it is defered and later handled by this function.
2116
////////////////////////////////////////////////////////////////////////
2118
// Deferred delivery of a remote op reply: when cache_op_result_...()
// could not take the needed locks inline, it scheduled this continuation.
// Re-takes the outstanding-message queue lock (rescheduling on failure),
// re-finds the waiting continuation by (seq_number, target_ip), tries its
// lock (rescheduling on failure), transfers any unmarshalled CacheHTTPInfo,
// and delivers CACHE_EVENT_RESPONSE; if the waiter is gone, the reply is
// counted as a timeout. This carrier continuation frees itself at the end.
// NOTE(review): bare numeric lines are extraction artifacts; braces,
// returns and the not-found branch join are missing from this view.
CacheContinuation::handleReplyEvent(int event, Event * e)
2122
// take lock on outstanding message queue
2124
EThread *t = e->ethread;
2125
unsigned int hash = FOLDHASH(target_ip, seq_number);
2127
if (!MUTEX_TAKE_TRY_LOCK(remoteCacheContQueueMutex[hash], t)) {
2128
e->schedule_in(CACHE_RETRY_PERIOD);
2132
LOG_EVENT_TIME(start_time, cntlck_acquire_time_dist, cntlck_acquire_events);
2134
// See if this response is still expected
2136
CacheContinuation *c = find_cache_continuation(seq_number, target_ip);
2139
// Acquire the lock to the continuation mutex
2141
MUTEX_TRY_LOCK(lock, c->mutex, e->ethread);
2144
// If we fail to acquire the lock, reschedule
2146
MUTEX_UNTAKE_LOCK(remoteCacheContQueueMutex[hash], t);
2147
e->schedule_in(CACHE_RETRY_PERIOD);
2151
// If unmarshalled CacheHTTPInfo exists, pass it along
2153
if (ic_new_info.valid()) {
2155
c->setMsgBufferLen(getMsgBufferLen(), getMsgBufferIOBData());
2156
c->ic_new_info = ic_new_info;
2157
ic_new_info.clear();
2159
// send message, release lock
2161
c->handleEvent(CACHE_EVENT_RESPONSE, this);
2164
MUTEX_UNTAKE_LOCK(remoteCacheContQueueMutex[hash], t);
2165
Debug("cluster_timeout", "cache reply timeout: %d", seq_number);
2166
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_REMOTE_OP_REPLY_TIMEOUTS_STAT);
2169
// Free this continuation
2171
cacheContAllocator_free(this);
2175
//////////////////////////////////////////////////////////////////////////
2177
// On the requesting node, handle the timeout and response to the user.
2178
// There may be two CacheContinuations involved:
2179
// 1) One waiting to respond to the user.
2180
// This case is CACHE_EVENT_RESPONSE_MSG which is handled
2181
// inline (without delay).
2182
// 2) One which is carrying the response from the remote machine which
2183
// has been delayed for a lock. This case is CACHE_EVENT_RESPONSE.
2184
//////////////////////////////////////////////////////////////////////////
2186
// remoteOpEvent()
// Requesting-node handler: processes the reply to a remote cache
// operation, or a retry/timeout interval event while the reply is
// outstanding.  See the banner above for the two CacheContinuation
// cases (inline CACHE_EVENT_RESPONSE_MSG vs delayed CACHE_EVENT_RESPONSE).
// NOTE(review): this listing appears truncated (interleaved line-number
// artifacts, missing braces/labels) — code left byte-identical; verify
// structure against the full source.
CacheContinuation::remoteOpEvent(int event_code, Event * e)
2188
ink_assert(magicno == (int) MagicNo);
2189
int event = event_code;
2193
if (event != EVENT_INTERVAL) {
2194
if (event == CACHE_EVENT_RESPONSE) {
2195
CacheContinuation *ccont = (CacheContinuation *) e;
2196
res = ccont->result;
2198
CacheOpReplyMsg *rmsg = (CacheOpReplyMsg *) e;
2201
// Record remote-callback latency stats (lookup vs other ops)
if ((res == CACHE_EVENT_LOOKUP) || (res == CACHE_EVENT_LOOKUP_FAILED)) {
2202
now = ink_get_hrtime();
2203
CLUSTER_SUM_DYN_STAT(CLUSTER_CACHE_LKRMT_CALLBACK_TIME_STAT, now - start_time);
2204
LOG_EVENT_TIME(start_time, lkrmt_callback_time_dist, lkrmt_cache_callbacks);
2206
now = ink_get_hrtime();
2207
CLUSTER_SUM_DYN_STAT(CLUSTER_CACHE_RMT_CALLBACK_TIME_STAT, now - start_time);
2208
LOG_EVENT_TIME(start_time, rmt_callback_time_dist, rmt_cache_callbacks);
2213
// for CACHE_EVENT_RESPONSE/XXX the lock was acquired at the higher level
2214
intptr_t return_error = 0;
2215
ClusterVCToken *pToken = NULL;
2221
ink_assert(!"bad case");
2224
case EVENT_INTERVAL:{
2226
unsigned int hash = FOLDHASH(target_ip, seq_number);
2228
MUTEX_TRY_LOCK(queuelock, remoteCacheContQueueMutex[hash], e->ethread);
2230
// Lock miss: no problem, retry shortly
e->schedule_in(CACHE_RETRY_PERIOD);
2233
// we are not yet enqueued on the list of outstanding operations
2235
if (!remoteCacheContQueue[hash].in(this)) {
2236
remoteCacheContQueue[hash].enqueue(this);
2237
ink_assert(timeout == e);
2238
MUTEX_RELEASE(queuelock);
2239
e->schedule_in(cache_cluster_timeout);
2242
// a timeout has occurred
2244
if (find_cache_continuation(seq_number, target_ip)) {
2246
MUTEX_RELEASE(queuelock);
2248
Debug("cluster_timeout", "cluster op timeout %d", seq_number);
2249
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_REMOTE_OP_TIMEOUTS_STAT);
2250
request_timeout = true;
2253
// Post error completion now and defer deallocation of
2254
// the continuation until we receive the reply or the
2255
// target node goes down.
2257
if (!action.cancelled)
2258
action.continuation->handleEvent(result, (void *) -ECLUSTER_OP_TIMEOUT);
2259
action.cancelled = 1;
2261
if (target_machine->dead) {
2262
// Target node is gone — synthesize the reply path locally
event = CACHE_EVENT_RESPONSE_MSG;
2266
e->schedule_in(cache_cluster_timeout);
2271
// timeout not expected for continuation; log and ignore
2272
MUTEX_RELEASE(queuelock);
2273
Debug("cluster_timeout", "unknown cluster op timeout %d", seq_number);
2274
Note("Unexpected CacheCont timeout, [%u.%u.%u.%u] seqno=%d", DOT_SEPARATED(target_ip), seq_number);
2275
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_REMOTE_OP_TIMEOUTS_STAT);
2280
case CACHE_EVENT_RESPONSE:
2281
case CACHE_EVENT_RESPONSE_MSG:{
2283
// the response has arrived, cancel timeout
2289
// remove from the pending queue
2290
unsigned int hash = FOLDHASH(target_ip, seq_number);
2292
remoteCacheContQueue[hash].remove(this);
2293
MUTEX_UNTAKE_LOCK(remoteCacheContQueueMutex[hash], this_ethread());
2297
case CACHE_EVENT_RESPONSE_RETRY:{
2299
// determine result code
2301
CacheContinuation *c = (CacheContinuation *) e;
2302
CacheOpReplyMsg *msg = (CacheOpReplyMsg *) e;
2303
if (event == CACHE_EVENT_RESPONSE_MSG) {
2304
// On a prior timeout keep the locally-posted result, not the reply's
result = (request_timeout ? result : msg->result);
2305
pToken = (request_timeout ? &token : &msg->token);
2306
} else if (event == CACHE_EVENT_RESPONSE) {
2307
result = (request_timeout ? result : c->result);
2309
} else if (event == CACHE_EVENT_RESPONSE_RETRY) {
2312
ink_release_assert(!"remoteOpEvent bad event code");
2317
if (result == CACHE_EVENT_LOOKUP) {
2318
callback_user(result, 0);
2321
} else if (event_is_open(result)) {
2322
bool read_op = ((request_opcode == CACHE_OPEN_READ)
2323
|| (request_opcode == CACHE_OPEN_READ_LONG));
2325
ink_release_assert(read_cluster_vc->pending_remote_fill > 1);
2326
read_cluster_vc->pending_remote_fill = 0;
2328
have_all_data = pToken->is_clear(); // no conn implies all data
2329
if (have_all_data) {
2330
read_cluster_vc->have_all_data = 1;
2332
read_cluster_vc->have_all_data = 0;
2334
// Move CacheHTTPInfo reply data into VC
2335
read_cluster_vc->marshal_buf = this->getMsgBufferIOBData();
2336
read_cluster_vc->alternate = this->ic_new_info;
2337
this->ic_new_info.clear();
2338
ink_release_assert(read_cluster_vc->alternate.object_size_get());
2340
if (!action.cancelled) {
2341
ClusterVConnection *target_vc = read_cluster_vc;
2342
callback_user(result, target_vc); // "this" is deallocated
2343
target_vc->allow_remote_close();
2345
read_cluster_vc->allow_remote_close();
2346
read_cluster_vc->do_io(VIO::ABORT);
2347
cacheContAllocator_free(this);
2351
ink_assert(result == CACHE_EVENT_OPEN_WRITE);
2352
ink_assert(!pToken->is_clear());
2354
ClusterVConnection *result_vc = write_cluster_vc;
2355
if (!action.cancelled) {
2356
callback_user(result, result_vc);
2357
result_vc->allow_remote_close();
2359
result_vc->allow_remote_close();
2360
result_vc->do_io(VIO::ABORT);
2361
cacheContAllocator_free(this);
2370
// Handle failure cases
2372
if (result == CACHE_EVENT_LOOKUP_FAILED) {
2375
// check for local probes
2377
ClusterMachine *m = cluster_machine_at_depth(cache_hash(url_md5));
2379
// if the current configuration indicates that this
2380
// machine is the master (or the owner machine has failed), go to
2381
// the local machine. Also if PROBE_LOCAL_CACHE_LAST.
2383
int len = getMsgBufferLen();
2384
char *hostname = (len ? getMsgBuffer() : 0);
2386
if (!m || PROBE_LOCAL_CACHE_LAST) {
2387
SET_HANDLER((CacheContHandler) & CacheContinuation::probeLookupEvent);
2388
CacheKey key(url_md5);
2390
Cache *call_cache = caches[frag_type];
2391
call_cache->lookup(this, &key, frag_type, hostname, len);
2394
if (PROBE_LOCAL_CACHE_FIRST) {
2395
// Local probe already ran first; remote miss is the final answer
callback_user(CACHE_EVENT_LOOKUP_FAILED, 0);
2397
SET_HANDLER((CacheContHandler) & CacheContinuation::probeLookupEvent);
2398
CacheKey key(url_md5);
2400
Cache *call_cache = caches[frag_type];
2401
call_cache->lookup(this, &key, frag_type, hostname, len);
2406
// Handle failure of all ops except for lookup
2408
ClusterVConnection *cacheable_vc = 0;
2409
if ((request_opcode == CACHE_OPEN_READ_LONG) && !pToken->is_clear()) {
2410
ink_assert(read_cluster_vc && !write_cluster_vc);
2412
// OPEN_READ_LONG has failed, but the remote node was able to
2413
// establish an OPEN_WRITE_LONG connection.
2414
// Convert the cluster read VC to a write VC and insert it
2415
// into the global write VC cache. This will allow us to
2416
// locally resolve the subsequent OPEN_WRITE_LONG request.
2419
// Note: We do not allow remote close on this VC while
2420
// it resides in cache
2422
read_cluster_vc->set_type(CLUSTER_OPT_CONN_WRITE);
2423
// FIX ME. ajitb 12/21/99
2424
// Compiler bug in CC: WorkShop Compilers 5.0 98/12/15 C++ 5.0
2425
// Does not accept assignment of ((Continuation *) NULL)
2427
Continuation *temp = NULL;
2428
read_cluster_vc->action_ = temp;
2430
if (!GlobalOpenWriteVCcache->insert(&url_md5, read_cluster_vc)) {
2431
// Unable to insert VC into cache, try later
2432
cacheable_vc = read_cluster_vc;
2434
read_cluster_vc = 0;
2436
if (read_cluster_vc) {
2437
read_cluster_vc->remote_closed = 0; // send remote close
2438
read_cluster_vc->allow_remote_close();
2439
read_cluster_vc->do_io(VIO::ABORT);
2440
read_cluster_vc = 0;
2442
if (write_cluster_vc) {
2443
write_cluster_vc->remote_closed = 0; // send remote close
2444
write_cluster_vc->allow_remote_close();
2445
write_cluster_vc->do_io(VIO::ABORT);
2446
write_cluster_vc = 0;
2448
if (!request_timeout) {
2449
if (!return_error) {
2450
return_error = result_error;
2453
insert_cache_callback_user(cacheable_vc, result, (void *) return_error);
2455
callback_user(result, (void *) return_error);
2458
// callback already made at timeout, just free continuation
2460
cacheable_vc->allow_remote_close();
2461
cacheable_vc->do_io(VIO::CLOSE);
2464
cacheContAllocator_free(this);
2470
//////////////////////////////////////////////////////////////////////////
2471
// probeLookupEvent()
2472
// After a local probe, return the response to the client and cleanup.
2473
//////////////////////////////////////////////////////////////////////////
2476
// probeLookupEvent()
// Handler for the local cache probe issued after a remote lookup:
// forwards the probe's event code to the user and cleans up
// (callback_user deallocates this continuation).
CacheContinuation::probeLookupEvent(int event, void *d)
2479
ink_assert(magicno == (int) MagicNo);
2480
callback_user(event, 0);
2484
///////////////////////////////////////////////////////////
2486
// Result of a local lookup for PROBE_LOCAL_CACHE_FIRST
2487
///////////////////////////////////////////////////////////
2489
// lookupEvent()
// Placeholder for the PROBE_LOCAL_CACHE_FIRST local-lookup result;
// must never be invoked — the release assert always fires.
CacheContinuation::lookupEvent(int event, void *d)
2491
NOWARN_UNUSED(event);
2493
ink_release_assert(!"Invalid call CacheContinuation::lookupEvent");
2500
//////////////////////////////////////////////////////////////////////////
2501
// do_remote_lookup()
2502
// If the object is supposed to be on a remote machine, probe there.
2503
// Returns: Non zero (Action *) if a probe was initiated
2504
// Zero (Action *) if no probe
2505
//////////////////////////////////////////////////////////////////////////
2507
// do_remote_lookup()
// Probe a remote node for the object keyed by `key` (or `c->url_md5`).
// Builds a CacheLookupMsg, allocates/fills a CacheContinuation when the
// caller passed none, arms the retry/timeout events on the outstanding
// queue, and sends the message via the cluster processor.
// Returns non-zero (Action *) if a probe was initiated, zero otherwise.
// NOTE(review): listing appears truncated; code left byte-identical.
CacheContinuation::do_remote_lookup(Continuation * cont, CacheKey * key,
2508
CacheContinuation * c, CacheFragType ft, char *hostname, int hostname_len)
2510
int probe_depth = 0;
2511
ClusterMachine *past_probes[CONFIGURATION_HISTORY_PROBE_DEPTH] = { 0 };
2512
// Message length = fixed header + optional trailing hostname bytes
int mlen = op_to_sizeof_fixedlen_msg(CACHE_LOOKUP_OP) + ((hostname && hostname_len) ? hostname_len : 0);
2513
CacheLookupMsg *msg = (CacheLookupMsg *) ALLOCA_DOUBLE(mlen);
2515
memset((char *) msg, 0, mlen);
2521
msg->url_md5 = *key;
2524
msg->url_md5 = c->url_md5;
2527
ClusterMachine *m = NULL;
2529
if (cache_migrate_on_demand) {
2530
m = cluster_machine_at_depth(cache_hash(msg->url_md5),
2531
c ? &c->probe_depth : &probe_depth, c ? c->past_probes : past_probes);
2534
// If migrate-on-demand is off, do not probe beyond one level.
2536
if (c && c->probe_depth)
2537
return (Action *) 0;
2538
m = cluster_machine_at_depth(cache_hash(msg->url_md5));
2544
return (Action *) 0;
2546
// If we do not have a continuation, build one
2549
c = cacheContAllocator_alloc();
2550
c->mutex = cont->mutex;
2551
c->probe_depth = probe_depth;
2552
memcpy(c->past_probes, past_probes, sizeof(past_probes));
2554
// Save hostname data in case we need to do a local lookup.
2555
if (hostname && hostname_len) {
2556
// Alloc buffer, copy hostname data and attach to continuation
2557
c->setMsgBufferLen(hostname_len);
2558
c->allocMsgBuffer();
2559
memcpy(c->getMsgBuffer(), hostname, hostname_len);
2562
c->url_md5 = msg->url_md5;
2563
c->action.cancelled = false;
2565
c->start_time = ink_get_hrtime();
2566
SET_CONTINUATION_HANDLER(c, (CacheContHandler)
2567
& CacheContinuation::remoteOpEvent);
2568
// Default result if the remote never answers
c->result = CACHE_EVENT_LOOKUP_FAILED;
2570
// set up sequence number so we can find this continuation
2572
c->target_ip = m->ip;
2573
c->seq_number = new_cache_sequence_number();
2574
msg->seq_number = c->seq_number;
2576
msg->frag_type = ft;
2578
// establish timeout for lookup
2580
unsigned int hash = FOLDHASH(c->target_ip, c->seq_number);
2581
MUTEX_TRY_LOCK(queuelock, remoteCacheContQueueMutex[hash], this_ethread());
2583
// failed to acquire lock: no problem, retry later
2584
c->timeout = eventProcessor.schedule_in(c, CACHE_RETRY_PERIOD, ET_CACHE_CONT_SM);
2586
remoteCacheContQueue[hash].enqueue(c);
2587
MUTEX_RELEASE(queuelock);
2588
c->timeout = eventProcessor.schedule_in(c, cache_cluster_timeout, ET_CACHE_CONT_SM);
2593
int vers = CacheLookupMsg::protoToVersion(m->msg_proto_major);
2595
if (vers == CacheLookupMsg::CACHE_LOOKUP_MESSAGE_VERSION) {
2596
msg->seq_number = c->seq_number;
2597
data = (char *) msg;
2599
if (hostname && hostname_len) {
2600
memcpy(msg->moi, hostname, hostname_len);
2603
//////////////////////////////////////////////////////////////
2604
// Create the specified down rev version of this message
2605
//////////////////////////////////////////////////////////////
2606
ink_release_assert(!"CacheLookupMsg bad msg version");
2611
#ifdef CACHE_MSG_TRACE
2612
log_cache_op_sndmsg(msg.seq_number, 0, "cache_lookup");
2614
clusterProcessor.invoke_remote(m, CACHE_LOOKUP_CLUSTER_FUNCTION, data, len);
2619
////////////////////////////////////////////////////////////////////////////
2620
// cache_lookup_ClusterFunction()
2621
// This function is invoked on a remote machine to do a remote lookup.
2622
// It unmarshals the URL and does a local lookup, with its own
2623
// continuation set to CacheContinuation::replyLookupEvent()
2624
////////////////////////////////////////////////////////////////////////////
2626
// cache_lookup_ClusterFunction()
// Runs on the remote (owner) node: unmarshals the CacheLookupMsg and
// issues a local cache lookup whose result is returned to the caller
// via CacheContinuation::replyLookupEvent().
cache_lookup_ClusterFunction(ClusterMachine * from, void *data, int len)
2630
EThread *thread = this_ethread();
2631
ProxyMutex *mutex = thread->mutex;
2632
////////////////////////////////////////////////////////
2633
// Note: we are running on the ET_CLUSTER thread
2634
////////////////////////////////////////////////////////
2636
CacheLookupMsg *msg = (CacheLookupMsg *) data;
2637
ClusterMessageHeader *mh = (ClusterMessageHeader *) data;
2639
if (mh->GetMsgVersion() != CacheLookupMsg::CACHE_LOOKUP_MESSAGE_VERSION) { ////////////////////////////////////////////////
2640
// Convert from old to current message format
2641
////////////////////////////////////////////////
2642
ink_release_assert(!"cache_lookup_ClusterFunction() bad msg version");
2645
if (mh->NeedByteSwap())
2648
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CACHE_OUTSTANDING_STAT);
2650
CacheContinuation *c = CacheContinuation::cacheContAllocator_alloc();
2651
c->mutex = new_ProxyMutex();
2652
MUTEX_TRY_LOCK(lock, c->mutex, this_ethread());
2653
// seq_number == CACHE_NO_RESPONSE (0) means the caller wants no reply
c->no_reply_message = (msg->seq_number == CACHE_NO_RESPONSE);
2654
c->seq_number = msg->seq_number;
2656
c->url_md5 = msg->url_md5;
2657
SET_CONTINUATION_HANDLER(c, (CacheContHandler)
2658
& CacheContinuation::replyLookupEvent);
2660
CacheKey key(msg->url_md5);
2661
#ifdef CACHE_MSG_TRACE
2662
log_cache_op_msg(msg->seq_number, 0, "cache_lookup");
2665
// Extract hostname data if passed.
2668
int hostname_len = len - op_to_sizeof_fixedlen_msg(CACHE_LOOKUP_OP);
2669
hostname = (hostname_len ? (char *) msg->moi : 0);
2671
// Note: Hostname data invalid after return from lookup
2672
Cache *call_cache = caches[msg->frag_type];
2673
call_cache->lookup(c, &key, (CacheFragType) msg->frag_type, hostname, hostname_len);
2676
/////////////////////////////////////////////////////////////////////////
2677
// replyLookupEvent()
2678
// This function handles the result of a lookup on a remote machine.
2679
// It packages up the result and sends it back to the calling machine.
2680
/////////////////////////////////////////////////////////////////////////
2682
// replyLookupEvent()
// Remote-node handler for the local lookup result: packages the event
// code into a CacheOpReplyMsg and sends it back to the requesting node
// (unless no_reply_message is set), then frees this continuation.
CacheContinuation::replyLookupEvent(int event, void *d)
2686
now = ink_get_hrtime();
2687
CLUSTER_SUM_DYN_STAT(CLUSTER_CACHE_CALLBACK_TIME_STAT, now - start_time);
2688
LOG_EVENT_TIME(start_time, callback_time_dist, cache_callbacks);
2690
int vers = CacheOpReplyMsg::protoToVersion(from->msg_proto_major);
2691
if (vers == CacheOpReplyMsg::CACHE_OP_REPLY_MESSAGE_VERSION) {
2692
CacheOpReplyMsg *msg;
2693
int flen = CacheOpReplyMsg::sizeof_fixedlen_msg();
2695
msg = (CacheOpReplyMsg *) xmalloc(flen);
2697
msg = (CacheOpReplyMsg *) ALLOCA_DOUBLE(flen);
2700
CLUSTER_DECREMENT_DYN_STAT(CLUSTER_CACHE_OUTSTANDING_STAT);
2701
// Lookup replies carry no ClusterVCToken, so trim it from the length
int len = flen - sizeof(msg->token);
2703
if (!no_reply_message) {
2704
msg->seq_number = seq_number;
2705
// The lookup event code itself is the result payload
msg->result = event;
2706
#ifdef CACHE_MSG_TRACE
2707
log_cache_op_sndmsg(seq_number, event, "cache_result");
2709
clusterProcessor.invoke_remote(from, CACHE_OP_RESULT_CLUSTER_FUNCTION, msg, len);
2715
//////////////////////////////////////////////////////////////
2716
// Create the specified down rev version of this message
2717
//////////////////////////////////////////////////////////////
2718
ink_release_assert(!"replyLookupEvent() bad msg version");
2721
// Free up everything
2723
cacheContAllocator_free(this);
2727
// getObjectSize()
// Return the object size for the given cache VC.  For the LONG read
// opcodes the size comes from the VC's CacheHTTPInfo, which is also
// copied into *ret_ci when the caller's copy is not yet valid (a
// minimal CacheHTTPInfo is synthesized in that path).
int32_t CacheContinuation::getObjectSize(VConnection * vc, int opcode, CacheHTTPInfo * ret_ci)
2729
CacheHTTPInfo *ci = 0;
2730
int64_t object_size = 0;
2732
if ((opcode == CACHE_OPEN_READ_LONG)
2733
|| (opcode == CACHE_OPEN_READ_BUFFER_LONG)) {
2735
((CacheVC *) vc)->get_http_info(&ci);
2737
object_size = ci->object_size_get();
2745
object_size = ((CacheVC *)vc)->get_object_size();
2748
if (ret_ci && !ret_ci->valid()) {
2756
new_ci.object_size_set(object_size);
2758
new_ci.m_alt->m_writeable = 1;
2759
ret_ci->copy_shallow(&new_ci);
2761
ink_release_assert(object_size);
2765
//////////////////////////////////////////////////////////////////////////
2766
// insert_cache_callback_user()
2767
// Insert write VC into global cache prior to performing user callback.
2768
//////////////////////////////////////////////////////////////////////////
2770
// insert_cache_callback_user()
// Insert the write VC into the global open-write VC cache, then make
// the user callback.  On insert failure, stash the VC and retry from
// insertCallbackEvent on an ET_CACHE_CONT_SM thread.
CacheContinuation::insert_cache_callback_user(ClusterVConnection * vc, int res, void *e)
2772
if (GlobalOpenWriteVCcache->insert(&url_md5, vc)) {
2774
callback_user(res, e);
2777
// Unable to insert, try later
2780
callback_data_2 = (void *) vc;
2781
SET_HANDLER((CacheContHandler) & CacheContinuation::insertCallbackEvent);
2782
eventProcessor.schedule_imm(this, ET_CACHE_CONT_SM);
2787
// insertCallbackEvent()
// Retry handler for insert_cache_callback_user(): keeps rescheduling
// itself until the VC can be placed into the global open-write cache,
// then posts the saved result to the user.
CacheContinuation::insertCallbackEvent(int event, Event * e)
2789
NOWARN_UNUSED(event);
2791
if (GlobalOpenWriteVCcache->insert(&url_md5, (ClusterVConnection *)
2794
callback_user(result, callback_data);
2797
// Unable to insert, try later
2798
eventProcessor.schedule_imm(this, ET_CACHE_CONT_SM);
2803
///////////////////////////////////////////////////////////////////
2805
// Invoke handleEvent on the given continuation (cont) with
2806
// considerations for Action.
2807
///////////////////////////////////////////////////////////////////
2809
// callback_user()
// Deliver (res, e) to the user's continuation, honoring Action
// cancellation, then free this continuation.  Completion cannot be
// posted from an ET_CLUSTER thread, and requires the user's mutex;
// both of those cases are deferred via defer_callback_result().
CacheContinuation::callback_user(int res, void *e)
2811
EThread *et = this_ethread();
2813
if (!is_ClusterThread(et)) {
2814
MUTEX_TRY_LOCK(lock, mutex, et);
2816
if (!action.cancelled) {
2817
action.continuation->handleEvent(res, e);
2819
cacheContAllocator_free(this);
2822
// Unable to acquire lock, retry later
2823
defer_callback_result(res, e);
2826
// Can not post completion on ET_CLUSTER thread.
2827
defer_callback_result(res, e);
2832
// defer_callback_result()
// Reschedule delivery of the pending user callback onto an
// ET_CACHE_CONT_SM thread (see callbackResultEvent).
CacheContinuation::defer_callback_result(int r, void *e)
2836
SET_HANDLER((CacheContHandler) & CacheContinuation::callbackResultEvent);
2837
eventProcessor.schedule_imm(this, ET_CACHE_CONT_SM);
2841
// callbackResultEvent()
// Deferred completion: post the saved (result, callback_data) to the
// user unless the Action was cancelled, then free this continuation.
CacheContinuation::callbackResultEvent(int event, Event * e)
2843
NOWARN_UNUSED(event);
2845
if (!action.cancelled)
2846
action.continuation->handleEvent(result, callback_data);
2847
cacheContAllocator_free(this);
2851
//-----------------------------------------------------------------
2852
// CacheContinuation static member functions
2853
//-----------------------------------------------------------------
2855
///////////////////////////////////////////////////////////////////////
2856
// cacheContAllocator_alloc()
2857
///////////////////////////////////////////////////////////////////////
2859
// cacheContAllocator_alloc()
// Allocate a CacheContinuation from the file-static class allocator.
CacheContinuation::cacheContAllocator_alloc()
2861
return cacheContAllocator.alloc();
2865
///////////////////////////////////////////////////////////////////////
2866
// cacheContAllocator_free()
2867
///////////////////////////////////////////////////////////////////////
2869
// cacheContAllocator_free()
// Sanity-check the magic number, clear per-instance pointers, and
// return `c` to the class allocator.
CacheContinuation::cacheContAllocator_free(CacheContinuation * c)
2871
ink_assert(c->magicno == (int) MagicNo);
2872
// ink_assert(!c->cache_op_ClusterFunction);
2874
#ifdef ENABLE_TIME_TRACE
2879
// FIX ME. ajitb 12/21/99
2880
// Compiler bug in CC: WorkShop Compilers 5.0 98/12/15 C++ 5.0
2881
// Does not accept assignment of ((Continuation *) NULL)
2883
Continuation *temp = NULL;
2886
c->tunnel_mutex = NULL;
2887
cacheContAllocator.free(c);
2890
/////////////////////////////////////////////////////////////////////////
2891
// callback_failure()
2892
// Post error completion using a continuation.
2893
/////////////////////////////////////////////////////////////////////////
2895
// callback_failure()
// Post an error completion (result, err) for Action `a` through a
// freshly allocated CacheContinuation scheduled on ET_CACHE_CONT_SM
// (delivery happens in callbackEvent).
CacheContinuation::callback_failure(Action * a, int result, int err, CacheContinuation * this_cc)
2897
CacheContinuation *cc;
2899
cc = cacheContAllocator_alloc();
2900
cc->mutex = a->mutex;
2906
cc->result = result;
2907
cc->result_error = err;
2908
SET_CONTINUATION_HANDLER(cc, (CacheContHandler)
2909
& CacheContinuation::callbackEvent);
2910
eventProcessor.schedule_imm(cc, ET_CACHE_CONT_SM);
2914
///////////////////////////////////////////////////////////////////////
2916
// Invoke callback and deallocate continuation.
2917
///////////////////////////////////////////////////////////////////////
2919
// callbackEvent()
// Invoke the user's continuation with (result, result_error) unless
// the Action was cancelled, then deallocate this continuation.
CacheContinuation::callbackEvent(int event, Event * e)
2921
NOWARN_UNUSED(event);
2923
if (!action.cancelled)
2924
action.continuation->handleEvent(result, (void *)(intptr_t)result_error);
2925
cacheContAllocator_free(this);
2929
//------------------------------------------------------------------
2930
// File static functions
2931
//------------------------------------------------------------------
2933
////////////////////////////////////////////////////////////////////////
2934
// find_cache_continuation()
2935
// Find a currently pending cache continuation expecting a response.
2936
// Requires taking the lock on the remoteCacheContQueueMutex first.
2937
////////////////////////////////////////////////////////////////////////
2938
// find_cache_continuation()
// Scan the outstanding-operation hash bucket for the continuation
// matching (seq_number, from_ip), verifying list linkage as it goes.
// Caller must hold remoteCacheContQueueMutex[hash] (see banner above).
static CacheContinuation *
2939
find_cache_continuation(unsigned int seq_number, unsigned int from_ip)
2941
unsigned int hash = FOLDHASH(from_ip, seq_number);
2942
CacheContinuation *c = NULL;
2943
CacheContinuation *lastc = NULL;
2944
for (c = (CacheContinuation *) remoteCacheContQueue[hash].head; c; c = (CacheContinuation *) c->link.next) {
2945
if (seq_number == c->seq_number && from_ip == c->target_ip) {
2947
ink_release_assert(c->link.prev == lastc);
2949
ink_release_assert(!c->link.prev);
2959
/////////////////////////////////////////////////////////////////////////////
2960
// new_cache_sequence_number()
2961
// Generate unique request sequence numbers
2962
/////////////////////////////////////////////////////////////////////////////
2964
// new_cache_sequence_number()
// Generate a request sequence number via an atomic increment of the
// global counter.  0 is reserved as CACHE_NO_RESPONSE (see top of file).
new_cache_sequence_number()
2966
unsigned int res = 0;
2969
res = (unsigned int) ink_atomic_increment(&cluster_sequence_number, 1);
2977
/***************************************************************************/
2979
/***************************************************************************/
2980
/////////////////////////////////////////////////////////////////////////////
2982
// for migrate-on-demand, make a connection between the
2983
// the node which has the object and the node which should have it.
2985
// prepared for either OPEN_READ (from current owner)
2986
// or OPEN_WRITE (from new owner)
2987
/////////////////////////////////////////////////////////////////////////////
2989
// forwardEvent()
// Migrate-on-demand: handle the first of the two cache-open results
// (OPEN_READ/OPEN_WRITE and their failure variants), then switch the
// handler to forwardWaitEvent for the second result.
CacheContinuation::forwardEvent(int event, VConnection * c)
2991
int ret = EVENT_CONT;
2997
ink_assert(!"bad case");
2998
case CACHE_EVENT_OPEN_WRITE_FAILED:
3001
case CACHE_EVENT_OPEN_WRITE:
3004
case CACHE_EVENT_OPEN_READ_FAILED:
3008
case CACHE_EVENT_OPEN_READ:
3013
SET_HANDLER((CacheContHandler) & CacheContinuation::forwardWaitEvent);
3017
////////////////////////////////////////////////////////////////////////
3018
// forwardWaitEvent()
3019
// For migrate-on-demand, make a connection as above (forwardEvent)
3020
// second either OPEN_READ or OPEN_WRITE,
3021
// the data for the first is stored in (cluster_vc,cache_read)
3022
////////////////////////////////////////////////////////////////////////
3024
// forwardWaitEvent()
// Migrate-on-demand: combine this open result with the one saved by
// forwardEvent (cluster_vc / cache_read).  When both the read and the
// write VC are available, tee the read into the write; the read result
// is what gets reported back to the user continuation.
CacheContinuation::forwardWaitEvent(int event, VConnection * c)
3026
int ret = EVENT_CONT;
3027
int res = CACHE_EVENT_OPEN_READ_FAILED;
3028
void *res_data = NULL;
3029
VConnection *vc = NULL;
3033
ink_assert(!"bad case");
3034
case CACHE_EVENT_OPEN_WRITE_FAILED:
3035
case CACHE_EVENT_OPEN_READ_FAILED:
3038
case CACHE_EVENT_OPEN_WRITE:
3039
case CACHE_EVENT_OPEN_READ:
3044
// cache_read records which side (saved vs incoming) is the reader
VConnection *read_vc = (cache_read ? cluster_vc : vc);
3045
VConnection *write_vc = (!cache_read ? cluster_vc : vc);
3047
res = read_vc ? CACHE_EVENT_OPEN_READ : CACHE_EVENT_OPEN_READ_FAILED;
3050
// if the read and write are successful, tunnel the read to the write
3051
if (read_vc && write_vc) {
3052
res_data = NEW(new VCTee(read_vc, write_vc, vio));
3053
if (vio) { // CACHE_EVENT_OPEN_READ_VIO
3055
res_data = &((VCTee *) read_vc)->vio;
3058
// if the read is successful return it to the user
3060
c->handleEvent(res, res_data);
3064
/////////////////////////////////////////////////////////////////////
3066
// If the reply requires data, tunnel the data from the cache
3068
/////////////////////////////////////////////////////////////////////
3070
// tunnelEvent()
// Handler for the cluster-connection attempt made when a reply must
// carry object data.  On CLUSTER_EVENT_OPEN, set up the data tunnel
// (or signal via a cleared token that all data is inline) and marshal
// the CacheOpReplyMsg — including CacheHTTPInfo for reads — before
// sending it; on CLUSTER_EVENT_OPEN_FAILED, send a failure reply and
// tear down the cache VC.  Frees this continuation when no further
// reply is expected.
// NOTE(review): listing appears truncated; code left byte-identical.
CacheContinuation::tunnelEvent(int event, VConnection * vc)
3072
int ret = EVENT_DONE;
3073
int flen = CacheOpReplyMsg::sizeof_fixedlen_msg(); // include token
3075
bool read_buf = ((request_opcode == CACHE_OPEN_READ_BUFFER)
3076
|| (request_opcode == CACHE_OPEN_READ_BUFFER_LONG));
3077
ink_release_assert(!read_buf);
3079
CacheOpReplyMsg rmsg;
3080
CacheOpReplyMsg *msg = &rmsg;
3081
msg->result = result;
3082
msg->seq_number = seq_number;
3084
int expect_reply = 1;
3086
if (event == CLUSTER_EVENT_OPEN) {
3089
ink_assert(have_all_data || (readahead_vio == &((CacheVConnection *) cluster_vc)->vio));
3090
write_cluster_vc = (ClusterVConnection *) vc;
3092
if (have_all_data) {
3093
msg->token.clear(); // Tell sender no conn established
3095
msg->token = token; // Tell sender conn established
3096
setupReadBufTunnel(cluster_vc, vc);
3100
OneWayTunnel *pOWT = OneWayTunnel::OneWayTunnel_alloc();
3101
pOWT->init(cluster_vc, vc, NULL, nbytes, this->mutex);
3105
////////////////////////////////////////////////////////
3106
// cache_read requires CacheHTTPInfo in reply message.
3107
////////////////////////////////////////////////////////
3111
if (!cache_vc_info) {
3113
(void) getObjectSize(cluster_vc, request_opcode, &cache_vc_info);
3117
// Determine data length and allocate
3118
len = ci->marshal_length();
3119
CacheOpReplyMsg *reply = (CacheOpReplyMsg *) ALLOCA_DOUBLE(flen + len);
3121
// Initialize reply message header
3124
// Marshal response data into reply message
3125
res = ci->marshal((char *) reply->moi, len);
3126
ink_assert(res > 0);
3128
// Make reply message the current message
3132
OneWayTunnel *pOWT = OneWayTunnelAllocator.alloc();
3133
pOWT->init(vc, cluster_vc, NULL, nbytes, this->mutex);
3138
ink_release_assert(event == CLUSTER_EVENT_OPEN_FAILED);
3139
msg->result = CACHE_EVENT_SET_FAILED(result);
3142
Debug("cluster_timeout", "unable to make cluster connection2");
3143
initial_buf = 0; // Do not send data
3144
initial_bufsize = 0;
3146
if (!have_all_data) {
3147
// Shutdown cache connection and free MIOBuffer
3148
MIOBuffer *mbuf = readahead_vio->buffer.writer();
3149
cluster_vc->do_io(VIO::CLOSE);
3150
free_MIOBuffer(mbuf);
3153
Debug("cluster_timeout", "unable to make cluster connection2A");
3154
cluster_vc->do_io(VIO::CLOSE);
3156
// Failure reply carries no token payload
len = 0 - (int) sizeof(msg->token);
3160
int vers = CacheOpReplyMsg::protoToVersion(from->msg_proto_major);
3161
if (vers == CacheOpReplyMsg::CACHE_OP_REPLY_MESSAGE_VERSION) {
3163
// Transmit reply message and object data in same cluster message
3164
clusterProcessor.invoke_remote_data(from,
3165
CACHE_OP_RESULT_CLUSTER_FUNCTION,
3166
(void *) msg, (flen + len),
3167
initial_buf, initial_bufsize,
3168
cluster_vc_channel, &token,
3169
&CacheContinuation::disposeOfDataBuffer, (void *) this, CLUSTER_OPT_STEAL);
3172
clusterProcessor.invoke_remote(from, CACHE_OP_RESULT_CLUSTER_FUNCTION,
3173
(void *) msg, (flen + len), CLUSTER_OPT_STEAL);
3176
//////////////////////////////////////////////////////////////
3177
// Create the specified down rev version of this message
3178
//////////////////////////////////////////////////////////////
3179
ink_release_assert(!"tunnelEvent() bad msg version");
3181
if (expect_reply <= 0)
3182
cacheContAllocator_free(this);
3186
/////////////////////////////////////////////////////////////////////
3187
// remoteConnectEvent()
3188
// If this was an open, make a connection on this side before
3189
// responding to the user.
3190
/////////////////////////////////////////////////////////////////////
3192
// remoteConnectEvent()
// After an open, complete the cluster connection on this side before
// responding to the user.  On success, for reads the reply's
// CacheHTTPInfo is moved into the VC before the callback; on failure
// the result is reported as the corresponding FAILED event.
CacheContinuation::remoteConnectEvent(int event, VConnection * cvc)
3194
ClusterVConnection *vc = (ClusterVConnection *) cvc;
3196
if (event == CLUSTER_EVENT_OPEN) {
3197
if (result == CACHE_EVENT_OPEN_READ) {
3198
// Move CacheHTTPInfo reply data into VC
3199
vc->alternate = this->ic_new_info;
3200
this->ic_new_info.clear();
3202
callback_user(result, vc);
3205
Debug("cluster_cache", "unable to make cluster connection");
3206
callback_user(CACHE_EVENT_SET_FAILED(result), vc);
3211
/***************************************************************************/
3213
/***************************************************************************/
3215
// End of ClusterCache.cc