3
A brief file description
5
@section license License
7
Licensed to the Apache Software Foundation (ASF) under one
8
or more contributor license agreements. See the NOTICE file
9
distributed with this work for additional information
10
regarding copyright ownership. The ASF licenses this file
11
to you under the Apache License, Version 2.0 (the
12
"License"); you may not use this file except in compliance
13
with the License. You may obtain a copy of the License at
15
http://www.apache.org/licenses/LICENSE-2.0
17
Unless required by applicable law or agreed to in writing, software
18
distributed under the License is distributed on an "AS IS" BASIS,
19
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20
See the License for the specific language governing permissions and
21
limitations under the License.
24
/****************************************************************************
28
Description: Allows bi-directional transfer for data from one
29
continuation to another via a mechanism that impersonates a
30
NetVC. Should implement all external attributes of NetVConnections.
32
Since data is transferred within Traffic Server, this is a two
33
headed beast. One NetVC on initiating side (active side) and
34
one NetVC on the receiving side (passive side).
36
The two NetVC subclasses, PluginVC, are part PluginVCCore object. All
37
three objects share the same mutex. That mutex is required
38
for doing operations that affect the shared buffers,
39
read state from the PluginVC on the other side or deal with deallocation.
41
To simplify the code, all data passing through the system goes initially
42
into a shared buffer. There are two shared buffers, one for each
43
direction of the connection. While it's more efficient to transfer
44
the data from one buffer to another directly, this creates a lot
45
of tricky conditions since you must be holding the lock for both
46
sides, in addition to this VC's lock. Additionally, issues like
47
watermarks are very hard to deal with. Since we try
48
to move data by IOBufferData references the efficiency penalty shouldn't
49
be too bad and if it is a big penalty, a brave soul can reimplement
50
to move the data directly without the intermediate buffer.
52
Locking is difficult issue for this multi-headed beast. In each
53
PluginVC, there are two locks. The one we got from our PluginVCCore and
54
the lock from the state machine using the PluginVC. The read side
55
lock & the write side lock must be the same. The regular net processor has
56
this constraint as well. In order to handle scheduling of retry events cleanly,
57
we have two event pointers, one for each lock. sm_lock_retry_event can only
58
be changed while holding the using state machine's lock and
59
core_lock_retry_event can only be manipulated while holding the PluginVC's
60
lock. On entry to PluginVC::main_handler, we obtain all the locks
61
before looking at the events. If we can't get all the locks
62
we reschedule the event for further retries. Since all the locks are
63
obtained in the beginning of the handler, we know we are running
64
exclusively in the later parts of the handler and we will
65
be free from do_io or reenable calls on the PluginVC.
67
The assumption is made (consistent with IO Core spec) that any close,
68
shutdown, reenable, or do_io_{read,write} operation is done by the callee
69
while holding the lock for that side of the operation.
72
****************************************************************************/
75
#include "P_EventSystem.h"
77
#include "Regression.h"
79
#define PVC_LOCK_RETRY_TIME HRTIME_MSECONDS(10)
81
#define MIN(x,y) (((x) <= (y)) ? (x) : (y))
83
#define MAX(x,y) (((x) >= (y)) ? (x) : (y))
85
#define PVC_DEFAULT_MAX_BYTES 32768
86
#define MIN_BLOCK_TRANSFER_BYTES 128
88
#define EVENT_PTR_LOCKED (void*) 0x1
89
#define EVENT_PTR_CLOSED (void*) 0x2
91
#define PVC_TYPE ((vc_type == PLUGIN_VC_ACTIVE) ? "Active" : "Passive")
92
#define PVC_ID (core_obj? core_obj->id : (unsigned)-1)
96
magic(PLUGIN_VC_MAGIC_ALIVE), vc_type(PLUGIN_VC_UNKNOWN), core_obj(NULL),
97
other_side(NULL), read_state(), write_state(),
98
need_read_process(false), need_write_process(false),
99
closed(false), sm_lock_retry_event(NULL), core_lock_retry_event(NULL),
100
deletable(false), reentrancy_count(0), active_timeout(0), inactive_timeout(0), active_event(NULL), inactive_event(NULL)
102
SET_HANDLER(&PluginVC::main_handler);
105
PluginVC::~PluginVC()
111
PluginVC::main_handler(int event, void *data)
114
Debug("pvc_event", "[%u] %s: Received event %d", PVC_ID, PVC_TYPE, event);
116
ink_release_assert(event == EVENT_INTERVAL || event == EVENT_IMMEDIATE);
117
ink_release_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
118
ink_debug_assert(!deletable);
119
ink_debug_assert(data != NULL);
121
Event *call_event = (Event *) data;
122
EThread *my_ethread = mutex->thread_holding;
123
ink_release_assert(my_ethread != NULL);
125
bool read_mutex_held = false;
126
bool write_mutex_held = false;
127
Ptr<ProxyMutex> read_side_mutex = read_state.vio.mutex;
128
Ptr<ProxyMutex> write_side_mutex = write_state.vio.mutex;
130
if (read_side_mutex) {
131
read_mutex_held = MUTEX_TAKE_TRY_LOCK(read_side_mutex, my_ethread);
133
if (!read_mutex_held) {
134
call_event->schedule_in(PVC_LOCK_RETRY_TIME);
138
if (read_side_mutex.m_ptr != read_state.vio.mutex.m_ptr) {
139
// It's possible someone swapped the mutex on us before
140
// we were able to grab it
141
Mutex_unlock(read_side_mutex, my_ethread);
142
call_event->schedule_in(PVC_LOCK_RETRY_TIME);
147
if (write_side_mutex) {
148
write_mutex_held = MUTEX_TAKE_TRY_LOCK(write_side_mutex, my_ethread);
150
if (!write_mutex_held) {
151
if (read_mutex_held) {
152
Mutex_unlock(read_side_mutex, my_ethread);
154
call_event->schedule_in(PVC_LOCK_RETRY_TIME);
158
if (write_side_mutex.m_ptr != write_state.vio.mutex.m_ptr) {
159
// It's possible someone swapped the mutex on us before
160
// we were able to grab it
161
Mutex_unlock(write_side_mutex, my_ethread);
162
if (read_mutex_held) {
163
Mutex_unlock(read_side_mutex, my_ethread);
165
call_event->schedule_in(PVC_LOCK_RETRY_TIME);
169
// We've got all the locks so there should not be any
170
// other calls active
171
ink_release_assert(reentrancy_count == 0);
177
// We can get closed while we're calling back the
178
// continuation. Set the reentrancy count so we know
179
// we could be calling the continuation and that we
180
// need to defer close processing
183
if (call_event == active_event) {
184
process_timeout(call_event, VC_EVENT_ACTIVE_TIMEOUT, &active_event);
185
} else if (call_event == inactive_event) {
186
process_timeout(call_event, VC_EVENT_INACTIVITY_TIMEOUT, &inactive_event);
188
if (call_event == sm_lock_retry_event) {
189
sm_lock_retry_event = NULL;
191
ink_release_assert(call_event == core_lock_retry_event);
192
core_lock_retry_event = NULL;
195
if (need_read_process) {
196
process_read_side(false);
199
if (need_write_process && !closed) {
200
process_write_side(false);
210
if (read_mutex_held) {
211
Mutex_unlock(read_side_mutex, my_ethread);
214
if (write_mutex_held) {
215
Mutex_unlock(write_side_mutex, my_ethread);
222
PluginVC::do_io_read(Continuation * c, int64_t nbytes, MIOBuffer * buf)
226
ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
229
read_state.vio.buffer.writer_for(buf);
231
read_state.vio.buffer.clear();
234
// Note: we set vio.op last because process_read_side looks at it to
235
// tell if the VConnection is active.
236
read_state.vio.mutex = c->mutex;
237
read_state.vio._cont = c;
238
read_state.vio.nbytes = nbytes;
239
read_state.vio.ndone = 0;
240
read_state.vio.vc_server = (VConnection *) this;
241
read_state.vio.op = VIO::READ;
243
Debug("pvc", "[%u] %s: do_io_read for %d bytes", PVC_ID, PVC_TYPE, nbytes);
245
// Since reentrant callbacks are not allowed from do_io
246
// functions, schedule ourselves to get on a different stack
247
need_read_process = true;
248
setup_event_cb(0, &sm_lock_retry_event);
250
return &read_state.vio;
254
PluginVC::do_io_write(Continuation * c, int64_t nbytes, IOBufferReader * abuffer, bool owner)
258
ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
262
write_state.vio.buffer.reader_for(abuffer);
264
write_state.vio.buffer.clear();
267
// Note: we set vio.op last because process_write_side looks at it to
268
// tell if the VConnection is active.
269
write_state.vio.mutex = c->mutex;
270
write_state.vio._cont = c;
271
write_state.vio.nbytes = nbytes;
272
write_state.vio.ndone = 0;
273
write_state.vio.vc_server = (VConnection *) this;
274
write_state.vio.op = VIO::WRITE;
276
Debug("pvc", "[%u] %s: do_io_write for %d bytes", PVC_ID, PVC_TYPE, nbytes);
278
// Since reentrant callbacks are not allowed from do_io
279
// functions, schedule ourselves to get on a different stack
280
need_write_process = true;
281
setup_event_cb(0, &sm_lock_retry_event);
283
return &write_state.vio;
287
PluginVC::reenable(VIO * vio)
291
ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
292
ink_debug_assert(vio->mutex->thread_holding == this_ethread());
294
Debug("pvc", "[%u] %s: reenable %s", PVC_ID, PVC_TYPE, (vio->op == VIO::WRITE) ? "Write" : "Read");
296
if (vio->op == VIO::WRITE) {
297
ink_assert(vio == &write_state.vio);
298
need_write_process = true;
299
} else if (vio->op == VIO::READ) {
300
need_read_process = true;
302
ink_release_assert(0);
304
setup_event_cb(0, &sm_lock_retry_event);
308
PluginVC::reenable_re(VIO * vio)
312
ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
313
ink_debug_assert(vio->mutex->thread_holding == this_ethread());
315
Debug("pvc", "[%u] %s: reenable_re %s", PVC_ID, PVC_TYPE, (vio->op == VIO::WRITE) ? "Write" : "Read");
317
MUTEX_TRY_LOCK(lock, this->mutex, this_ethread());
319
if (vio->op == VIO::WRITE) {
320
need_write_process = true;
322
need_read_process = true;
324
setup_event_cb(PVC_LOCK_RETRY_TIME, &sm_lock_retry_event);
330
if (vio->op == VIO::WRITE) {
331
ink_assert(vio == &write_state.vio);
332
process_write_side(false);
333
} else if (vio->op == VIO::READ) {
334
ink_assert(vio == &read_state.vio);
335
process_read_side(false);
337
ink_release_assert(0);
342
// To process the close, we need the lock
343
// for the PluginVC. Schedule an event
344
// to make sure we get it
346
setup_event_cb(0, &sm_lock_retry_event);
351
PluginVC::do_io_close(int flag)
354
ink_assert(closed == false);
355
ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
357
Debug("pvc", "[%u] %s: do_io_close", PVC_ID, PVC_TYPE);
359
if (reentrancy_count > 0) {
360
// Do nothing since deallocating ourselves
361
// now will lead to us running on a dead
362
// PluginVC since we are being called
368
MUTEX_TRY_LOCK(lock, mutex, this_ethread());
371
setup_event_cb(PVC_LOCK_RETRY_TIME, &sm_lock_retry_event);
382
PluginVC::do_io_shutdown(ShutdownHowTo_t howto)
386
ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
389
case IO_SHUTDOWN_READ:
390
read_state.shutdown = true;
392
case IO_SHUTDOWN_WRITE:
393
write_state.shutdown = true;
395
case IO_SHUTDOWN_READWRITE:
396
read_state.shutdown = true;
397
write_state.shutdown = true;
402
// int PluginVC::transfer_bytes(MIOBuffer* transfer_to,
403
// IOBufferReader* transfer_from, int act_on)
405
// Takes care of transferring bytes from a reader to another buffer
406
// In the case of large transfers, we move blocks. In the case
407
// of small transfers we copy data so as to not build too many
411
// transfer_to: buffer to copy to
412
// transfer_from: buffer_copy_from
413
// act_on: is the max number of bytes we are to copy. There must
414
// be at least act_on bytes available from transfer_from
416
// Returns number of bytes transferred
419
PluginVC::transfer_bytes(MIOBuffer * transfer_to, IOBufferReader * transfer_from, int64_t act_on)
422
int64_t total_added = 0;
424
ink_debug_assert(act_on <= transfer_from->read_avail());
427
int64_t block_read_avail = transfer_from->block_read_avail();
428
int64_t to_move = MIN(act_on, block_read_avail);
435
if (to_move >= MIN_BLOCK_TRANSFER_BYTES) {
436
moved = transfer_to->write(transfer_from, to_move, 0);
438
// We have a really small amount of data. To make
439
// sure we don't get a huge build up of blocks which
440
// can lead to stack overflows if the buffer is destroyed
441
// before we read from it, we need copy over to the new
442
// buffer instead of doing a block transfer
443
moved = transfer_to->write(transfer_from->start(), to_move);
446
// We are out of buffer space
452
transfer_from->consume(moved);
453
total_added += moved;
459
// void PluginVC::process_write_side(bool cb_ok)
461
// This function may only be called while holding
462
// this->mutex & while it is ok to callback the
463
// write side continuation
465
// Does write side processing
468
PluginVC::process_write_side(bool other_side_call)
471
ink_assert(!deletable);
472
ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
474
MIOBuffer *core_buffer = (vc_type == PLUGIN_VC_ACTIVE) ? core_obj->a_to_p_buffer : core_obj->p_to_a_buffer;
476
need_write_process = false;
478
if (write_state.vio.op != VIO::WRITE || closed || write_state.shutdown) {
481
// Acquire the lock of the write side continuation
482
EThread *my_ethread = mutex->thread_holding;
483
ink_assert(my_ethread != NULL);
484
MUTEX_TRY_LOCK(lock, write_state.vio.mutex, my_ethread);
486
Debug("pvc_event", "[%u] %s: process_write_side lock miss, retrying", PVC_ID, PVC_TYPE);
488
need_write_process = true;
489
setup_event_cb(PVC_LOCK_RETRY_TIME, &core_lock_retry_event);
493
Debug("pvc", "[%u] %s: process_write_side", PVC_ID, PVC_TYPE);
494
need_write_process = false;
497
// Check the state of our write buffer as well as ntodo
498
int64_t ntodo = write_state.vio.ntodo();
503
IOBufferReader *reader = write_state.vio.get_reader();
504
int64_t bytes_avail = reader->read_avail();
505
int64_t act_on = MIN(bytes_avail, ntodo);
507
Debug("pvc", "[%u] %s: process_write_side; act_on %d", PVC_ID, PVC_TYPE, act_on);
509
if (other_side->closed || other_side->read_state.shutdown) {
510
write_state.vio._cont->handleEvent(VC_EVENT_ERROR, &write_state.vio);
516
// Notify the continuation that we are "disabling"
517
// ourselves due to nothing to write
518
write_state.vio._cont->handleEvent(VC_EVENT_WRITE_READY, &write_state.vio);
522
// Bytes available, try to transfer to the PluginVCCore
523
// intermediate buffer
525
int64_t buf_space = PVC_DEFAULT_MAX_BYTES - core_buffer->max_read_avail();
526
if (buf_space <= 0) {
527
Debug("pvc", "[%u] %s: process_write_side no buffer space", PVC_ID, PVC_TYPE);
530
act_on = MIN(act_on, buf_space);
532
int64_t added = transfer_bytes(core_buffer, reader, act_on);
534
// Couldn't actually get the buffer space. This only
535
// happens on small transfers with the above
536
// PVC_DEFAULT_MAX_BYTES factor doesn't apply
537
Debug("pvc", "[%u] %s: process_write_side out of buffer space", PVC_ID, PVC_TYPE);
541
write_state.vio.ndone += added;
543
Debug("pvc", "[%u] %s: process_write_side; added %d", PVC_ID, PVC_TYPE, added);
545
if (write_state.vio.ntodo() == 0) {
546
write_state.vio._cont->handleEvent(VC_EVENT_WRITE_COMPLETE, &write_state.vio);
548
write_state.vio._cont->handleEvent(VC_EVENT_WRITE_READY, &write_state.vio);
551
update_inactive_time();
553
// Wake up the read side on the other side to process these bytes
554
if (!other_side->closed) {
555
if (!other_side_call) {
556
other_side->process_read_side(true);
558
other_side->read_state.vio.reenable();
564
// void PluginVC::process_read_side()
566
// This function may only be called while holding
567
// this->mutex & while it is ok to callback the
568
// read side continuation
570
// Does read side processing
573
PluginVC::process_read_side(bool other_side_call)
576
ink_assert(!deletable);
577
ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
579
// TODO: Never used??
580
//MIOBuffer *core_buffer;
582
IOBufferReader *core_reader;
584
if (vc_type == PLUGIN_VC_ACTIVE) {
585
//core_buffer = core_obj->p_to_a_buffer;
586
core_reader = core_obj->p_to_a_reader;
588
ink_assert(vc_type == PLUGIN_VC_PASSIVE);
589
//core_buffer = core_obj->a_to_p_buffer;
590
core_reader = core_obj->a_to_p_reader;
593
need_read_process = false;
595
if (read_state.vio.op != VIO::READ || closed || read_state.shutdown) {
598
// Acquire the lock of the read side continuation
599
EThread *my_ethread = mutex->thread_holding;
600
ink_assert(my_ethread != NULL);
601
MUTEX_TRY_LOCK(lock, read_state.vio.mutex, my_ethread);
603
Debug("pvc_event", "[%u] %s: process_read_side lock miss, retrying", PVC_ID, PVC_TYPE);
605
need_read_process = true;
606
setup_event_cb(PVC_LOCK_RETRY_TIME, &core_lock_retry_event);
610
Debug("pvc", "[%u] %s: process_read_side", PVC_ID, PVC_TYPE);
611
need_read_process = false;
613
// Check the state of our read buffer as well as ntodo
614
int64_t ntodo = read_state.vio.ntodo();
619
int64_t bytes_avail = core_reader->read_avail();
620
int64_t act_on = MIN(bytes_avail, ntodo);
622
Debug("pvc", "[%u] %s: process_read_side; act_on %d", PVC_ID, PVC_TYPE, act_on);
625
if (other_side->closed || other_side->write_state.shutdown) {
626
read_state.vio._cont->handleEvent(VC_EVENT_EOS, &read_state.vio);
630
// Bytes available, try to transfer from the PluginVCCore
631
// intermediate buffer
633
MIOBuffer *output_buffer = read_state.vio.get_writer();
635
int64_t water_mark = output_buffer->water_mark;
636
water_mark = MAX(water_mark, PVC_DEFAULT_MAX_BYTES);
637
int64_t buf_space = water_mark - output_buffer->max_read_avail();
638
if (buf_space <= 0) {
639
Debug("pvc", "[%u] %s: process_read_side no buffer space", PVC_ID, PVC_TYPE);
642
act_on = MIN(act_on, buf_space);
644
int64_t added = transfer_bytes(output_buffer, core_reader, act_on);
646
// Couldn't actually get the buffer space. This only
647
// happens on small transfers with the above
648
// PVC_DEFAULT_MAX_BYTES factor doesn't apply
649
Debug("pvc", "[%u] %s: process_read_side out of buffer space", PVC_ID, PVC_TYPE);
653
read_state.vio.ndone += added;
655
Debug("pvc", "[%u] %s: process_read_side; added %d", PVC_ID, PVC_TYPE, added);
657
if (read_state.vio.ntodo() == 0) {
658
read_state.vio._cont->handleEvent(VC_EVENT_READ_COMPLETE, &read_state.vio);
660
read_state.vio._cont->handleEvent(VC_EVENT_READ_READY, &read_state.vio);
663
update_inactive_time();
665
// Wake up the other side so it knows there is space available in
666
// intermediate buffer
667
if (!other_side->closed) {
668
if (!other_side_call) {
669
other_side->process_write_side(true);
671
other_side->write_state.vio.reenable();
676
// void PluginVC::process_read_close()
678
// This function may only be called while holding
681
// Tries to close and dealloc the vc
684
PluginVC::process_close()
687
ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
689
Debug("pvc", "[%u] %s: process_close", PVC_ID, PVC_TYPE);
695
if (sm_lock_retry_event) {
696
sm_lock_retry_event->cancel();
697
sm_lock_retry_event = NULL;
700
if (core_lock_retry_event) {
701
core_lock_retry_event->cancel();
702
core_lock_retry_event = NULL;
706
active_event->cancel();
710
if (inactive_event) {
711
inactive_event->cancel();
712
inactive_event = NULL;
714
// If the other side of the PluginVC is not closed
715
// we need to force it to process both living sides
716
// of the connection in order that it recognizes
718
if (!other_side->closed && core_obj->connected) {
719
other_side->need_write_process = true;
720
other_side->need_read_process = true;
721
other_side->setup_event_cb(0, &other_side->core_lock_retry_event);
724
core_obj->attempt_delete();
727
// void PluginVC::process_timeout(Event* e, int event_to_send, Event** our_eptr)
729
// Handles sending timeout event to the VConnection. e is the event we got
730
// which indicates the timeout. event_to_send is the event to the
731
// vc user. our_eptr is a pointer to our event, either inactive_event,
732
// or active_event. If we successfully send the timeout to vc user,
733
// we clear the pointer, otherwise we reschedule it.
735
// Because of the possibility of reentrant close from the vc user, we don't want to
736
// touch any state after making the call back
739
PluginVC::process_timeout(Event * e, int event_to_send, Event ** our_eptr)
742
ink_assert(e = *our_eptr);
744
if (read_state.vio.op == VIO::READ && !read_state.shutdown && read_state.vio.ntodo() > 0) {
745
MUTEX_TRY_LOCK(lock, read_state.vio.mutex, e->ethread);
747
e->schedule_in(PVC_LOCK_RETRY_TIME);
751
read_state.vio._cont->handleEvent(event_to_send, &read_state.vio);
752
} else if (write_state.vio.op == VIO::WRITE && !write_state.shutdown && write_state.vio.ntodo() > 0) {
753
MUTEX_TRY_LOCK(lock, write_state.vio.mutex, e->ethread);
755
e->schedule_in(PVC_LOCK_RETRY_TIME);
759
write_state.vio._cont->handleEvent(event_to_send, &write_state.vio);
766
PluginVC::update_inactive_time()
768
if (inactive_event) {
769
inactive_event->cancel();
770
inactive_event = eventProcessor.schedule_in(this, inactive_timeout);
774
// void PluginVC::setup_event_cb(ink_hrtime in)
776
// Setup up the event processor to call us back.
777
// We've got two different event pointers to handle
781
PluginVC::setup_event_cb(ink_hrtime in, Event ** e_ptr)
784
ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
786
if (*e_ptr == NULL) {
788
// We locked the pointer so we can now allocate an event
791
*e_ptr = eventProcessor.schedule_imm(this);
793
*e_ptr = eventProcessor.schedule_in(this, in);
799
PluginVC::set_active_timeout(ink_hrtime timeout_in)
801
active_timeout = timeout_in;
803
// FIX - Do we need to handle the case where the timeout is set
804
// but no io has been done?
806
ink_assert(!active_event->cancelled);
807
active_event->cancel();
811
if (active_timeout > 0) {
812
active_event = eventProcessor.schedule_in(this, active_timeout);
817
PluginVC::set_inactivity_timeout(ink_hrtime timeout_in)
819
inactive_timeout = timeout_in;
821
// FIX - Do we need to handle the case where the timeout is set
822
// but no io has been done?
823
if (inactive_event) {
824
ink_assert(!inactive_event->cancelled);
825
inactive_event->cancel();
826
inactive_event = NULL;
829
if (inactive_timeout > 0) {
830
inactive_event = eventProcessor.schedule_in(this, inactive_timeout);
835
PluginVC::cancel_active_timeout()
837
set_active_timeout(0);
841
PluginVC::cancel_inactivity_timeout()
843
set_inactivity_timeout(0);
847
PluginVC::get_active_timeout()
849
return active_timeout;
853
PluginVC::get_inactivity_timeout()
855
return inactive_timeout;
859
PluginVC::get_socket()
865
PluginVC::set_local_addr()
867
if (vc_type == PLUGIN_VC_ACTIVE) {
868
local_addr = core_obj->active_addr_struct;
870
local_addr = core_obj->passive_addr_struct;
875
PluginVC::set_remote_addr()
877
if (vc_type == PLUGIN_VC_ACTIVE) {
878
remote_addr = core_obj->passive_addr_struct;
880
remote_addr = core_obj->active_addr_struct;
886
PluginVC::get_data(int id, void *data)
892
case PLUGIN_VC_DATA_LOCAL:
893
if (vc_type == PLUGIN_VC_ACTIVE) {
894
*(void **) data = core_obj->active_data;
896
*(void **) data = core_obj->passive_data;
899
case PLUGIN_VC_DATA_REMOTE:
900
if (vc_type == PLUGIN_VC_ACTIVE) {
901
*(void **) data = core_obj->passive_data;
903
*(void **) data = core_obj->active_data;
907
*(void **) data = NULL;
913
PluginVC::set_data(int id, void *data)
916
case PLUGIN_VC_DATA_LOCAL:
917
if (vc_type == PLUGIN_VC_ACTIVE) {
918
core_obj->active_data = data;
920
core_obj->passive_data = data;
923
case PLUGIN_VC_DATA_REMOTE:
924
if (vc_type == PLUGIN_VC_ACTIVE) {
925
core_obj->passive_data = data;
927
core_obj->active_data = data;
938
PluginVCCore::nextid = 0;
940
PluginVCCore::~PluginVCCore()
945
PluginVCCore::alloc()
947
PluginVCCore *pvc = NEW(new PluginVCCore);
955
mutex = new_ProxyMutex();
957
active_vc.vc_type = PLUGIN_VC_ACTIVE;
958
active_vc.other_side = &passive_vc;
959
active_vc.core_obj = this;
960
active_vc.mutex = mutex;
961
active_vc.thread = this_ethread();
963
passive_vc.vc_type = PLUGIN_VC_PASSIVE;
964
passive_vc.other_side = &active_vc;
965
passive_vc.core_obj = this;
966
passive_vc.mutex = mutex;
967
passive_vc.thread = active_vc.thread;
969
p_to_a_buffer = new_MIOBuffer(BUFFER_SIZE_INDEX_32K);
970
p_to_a_reader = p_to_a_buffer->alloc_reader();
972
a_to_p_buffer = new_MIOBuffer(BUFFER_SIZE_INDEX_32K);
973
a_to_p_reader = a_to_p_buffer->alloc_reader();
975
Debug("pvc", "[%u] Created PluginVCCore at 0x%X, active 0x%X, passive 0x%X", id, this, &active_vc, &passive_vc);
979
PluginVCCore::destroy()
982
Debug("pvc", "[%u] Destroying PluginVCCore at 0x%X", id, this);
984
ink_assert(active_vc.closed == true || !connected);
985
active_vc.mutex = NULL;
986
active_vc.read_state.vio.buffer.clear();
987
active_vc.write_state.vio.buffer.clear();
988
active_vc.magic = PLUGIN_VC_MAGIC_DEAD;
990
ink_assert(passive_vc.closed == true || !connected);
991
passive_vc.mutex = NULL;
992
passive_vc.read_state.vio.buffer.clear();
993
passive_vc.write_state.vio.buffer.clear();
994
passive_vc.magic = PLUGIN_VC_MAGIC_DEAD;
997
free_MIOBuffer(p_to_a_buffer);
998
p_to_a_buffer = NULL;
1001
if (a_to_p_buffer) {
1002
free_MIOBuffer(a_to_p_buffer);
1003
a_to_p_buffer = NULL;
1011
PluginVCCore::set_accept_cont(Continuation * c)
1015
// FIX ME - must return action
1019
PluginVCCore::connect()
1022
// Make sure there is another end to connect to
1023
if (connect_to == NULL) {
1028
state_send_accept(EVENT_IMMEDIATE, NULL);
1034
PluginVCCore::connect_re(Continuation * c)
1037
// Make sure there is another end to connect to
1038
if (connect_to == NULL) {
1042
EThread *my_thread = this_ethread();
1043
MUTEX_TAKE_LOCK(this->mutex, my_thread);
1046
state_send_accept(EVENT_IMMEDIATE, NULL);
1048
// We have to take out our mutex because rest of the
1049
// system expects the VC mutex to held when calling back.
1050
// We can use take lock here instead of try lock because the
1051
// lock should never already be held.
1053
c->handleEvent(NET_EVENT_OPEN, &active_vc);
1054
MUTEX_UNTAKE_LOCK(this->mutex, my_thread);
1056
return ACTION_RESULT_DONE;
1060
PluginVCCore::state_send_accept_failed(int event, void *data)
1062
NOWARN_UNUSED(event);
1063
NOWARN_UNUSED(data);
1064
MUTEX_TRY_LOCK(lock, connect_to->mutex, this_ethread());
1067
connect_to->handleEvent(NET_EVENT_ACCEPT_FAILED, NULL);
1070
SET_HANDLER(&PluginVCCore::state_send_accept_failed);
1071
eventProcessor.schedule_in(this, PVC_LOCK_RETRY_TIME);
1079
PluginVCCore::state_send_accept(int event, void *data)
1081
NOWARN_UNUSED(event);
1082
NOWARN_UNUSED(data);
1083
MUTEX_TRY_LOCK(lock, connect_to->mutex, this_ethread());
1086
connect_to->handleEvent(NET_EVENT_ACCEPT, &passive_vc);
1088
SET_HANDLER(&PluginVCCore::state_send_accept);
1089
eventProcessor.schedule_in(this, PVC_LOCK_RETRY_TIME);
1096
// void PluginVCCore::attempt_delete()
1098
// Mutex must be held when calling this function
1101
PluginVCCore::attempt_delete()
1104
if (active_vc.deletable) {
1105
if (passive_vc.deletable) {
1107
} else if (!connected) {
1108
state_send_accept_failed(EVENT_IMMEDIATE, NULL);
1113
// void PluginVCCore::kill_no_connect()
1115
// Called to kill the PluginVCCore when the
1116
// connect call hasn't been made yet
1119
PluginVCCore::kill_no_connect()
1121
ink_assert(!connected);
1122
ink_assert(!active_vc.closed);
1123
active_vc.do_io_close();
1127
PluginVCCore::set_passive_addr(uint32_t ip, int port)
1129
((struct sockaddr_in *)&(passive_addr_struct))->sin_addr.s_addr = htonl(ip);
1130
((struct sockaddr_in *)&(passive_addr_struct))->sin_port = htons(port);
1134
PluginVCCore::set_active_addr(uint32_t ip, int port)
1136
((struct sockaddr_in *)&(active_addr_struct))->sin_addr.s_addr = htonl(ip);
1137
((struct sockaddr_in *)&(active_addr_struct))->sin_port = htons(port);
1141
PluginVCCore::set_passive_data(void *data)
1143
passive_data = data;
1147
PluginVCCore::set_active_data(void *data)
1152
/*************************************************************
1154
* REGRESSION TEST STUFF
1156
**************************************************************/
1159
class PVCTestDriver:public NetTestDriver
1165
void start_tests(RegressionTest * r_arg, int *pstatus_arg);
1166
void run_next_test();
1167
int main_handler(int event, void *data);
1171
int completions_received;
1174
PVCTestDriver::PVCTestDriver():
1175
NetTestDriver(), i(0), completions_received(0)
1179
PVCTestDriver::~PVCTestDriver()
1185
PVCTestDriver::start_tests(RegressionTest * r_arg, int *pstatus_arg)
1187
mutex = new_ProxyMutex();
1188
MUTEX_TRY_LOCK(lock, mutex, this_ethread());
1191
pstatus = pstatus_arg;
1195
SET_HANDLER(&PVCTestDriver::main_handler);
1199
PVCTestDriver::run_next_test()
1202
int a_index = i * 2;
1203
int p_index = a_index + 1;
1205
if (p_index >= num_netvc_tests) {
1206
// We are done - // FIX - PASS or FAIL?
1208
*pstatus = REGRESSION_TEST_PASSED;
1210
*pstatus = REGRESSION_TEST_FAILED;
1215
completions_received = 0;
1218
Debug("pvc_test", "Starting test %s", netvc_tests_def[a_index].test_name);
1220
NetVCTest *p = NEW(new NetVCTest);
1221
NetVCTest *a = NEW(new NetVCTest);
1222
PluginVCCore *core = PluginVCCore::alloc();
1223
core->set_accept_cont(p);
1225
p->init_test(NET_VC_TEST_PASSIVE, this, NULL, r, &netvc_tests_def[p_index], "PluginVC", "pvc_test_detail");
1226
PluginVC *a_vc = core->connect();
1228
a->init_test(NET_VC_TEST_ACTIVE, this, a_vc, r, &netvc_tests_def[a_index], "PluginVC", "pvc_test_detail");
1232
PVCTestDriver::main_handler(int event, void *data)
1234
NOWARN_UNUSED(event);
1235
NOWARN_UNUSED(data);
1236
completions_received++;
1238
if (completions_received == 2) {
1245
EXCLUSIVE_REGRESSION_TEST(PVC) (RegressionTest * t, int atype, int *pstatus)
1247
NOWARN_UNUSED(atype);
1248
PVCTestDriver *driver = NEW(new PVCTestDriver);
1249
driver->start_tests(t, pstatus);