3
A brief file description
5
@section license License
7
Licensed to the Apache Software Foundation (ASF) under one
8
or more contributor license agreements. See the NOTICE file
9
distributed with this work for additional information
10
regarding copyright ownership. The ASF licenses this file
11
to you under the Apache License, Version 2.0 (the
12
"License"); you may not use this file except in compliance
13
with the License. You may obtain a copy of the License at
15
http://www.apache.org/licenses/LICENSE-2.0
17
Unless required by applicable law or agreed to in writing, software
18
distributed under the License is distributed on an "AS IS" BASIS,
19
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20
See the License for the specific language governing permissions and
21
limitations under the License.
24
/****************************************************************************
28
A OneWayTunnel is a module that connects two virtual connections, a
29
source vc and a target vc, and copies the data between source and target.
31
This class used to be called HttpTunnelVC, but it doesn't seem to have
32
anything to do with HTTP, so it has been renamed to OneWayTunnel.
33
****************************************************************************/
35
#include "P_EventSystem.h"
36
#include "I_OneWayTunnel.h"
40
//////////////////////////////////////////////////////////////////////////////
42
// OneWayTunnel::OneWayTunnel()
44
//////////////////////////////////////////////////////////////////////////////
46
ClassAllocator<OneWayTunnel> OneWayTunnelAllocator("OneWayTunnelAllocator");
49
transfer_data(MIOBufferAccessor & in_buf, MIOBufferAccessor & out_buf)
51
ink_release_assert(!"Not Implemented.");
53
int64_t n = in_buf.reader()->read_avail();
54
int64_t o = out_buf.writer()->write_avail();
60
memcpy(in_buf.reader()->start(), out_buf.writer()->end(), n);
61
in_buf.reader()->consume(n);
62
out_buf.writer()->fill(n);
65
OneWayTunnel::OneWayTunnel():Continuation(0),
66
vioSource(0), vioTarget(0), cont(0), manipulate_fn(0),
67
n_connections(0), lerrno(0), single_buffer(0),
68
close_source(0), close_target(0), tunnel_till_done(0), tunnel_peer(0), free_vcs(true)
73
OneWayTunnel::OneWayTunnel_alloc()
75
return OneWayTunnelAllocator.alloc();
79
OneWayTunnel::OneWayTunnel_free(OneWayTunnel * pOWT)
83
OneWayTunnelAllocator.free(pOWT);
87
OneWayTunnel::SetupTwoWayTunnel(OneWayTunnel * east, OneWayTunnel * west)
89
//make sure the both use the same mutex
90
ink_assert(east->mutex == west->mutex);
92
east->tunnel_peer = west;
93
west->tunnel_peer = east;
96
OneWayTunnel::~OneWayTunnel()
100
OneWayTunnel::OneWayTunnel(Continuation * aCont, Transform_fn aManipulate_fn, bool aclose_source, bool aclose_target)
103
? (ProxyMutex *) aCont->mutex
106
manipulate_fn(aManipulate_fn),
109
single_buffer(true), close_source(aclose_source), close_target(aclose_target), tunnel_till_done(false), free_vcs(false)
111
ink_assert(!"This form of OneWayTunnel() constructor not supported");
115
OneWayTunnel::init(VConnection * vcSource,
116
VConnection * vcTarget,
117
Continuation * aCont,
122
bool aclose_source, bool aclose_target, Transform_fn aManipulate_fn, int water_mark)
124
mutex = aCont ? (ProxyMutex *) aCont->mutex : (aMutex ? aMutex : new_ProxyMutex());
125
cont = aMutex ? NULL : aCont;
126
single_buffer = asingle_buffer;
127
manipulate_fn = aManipulate_fn;
129
close_source = aclose_source;
130
close_target = aclose_target;
132
tunnel_till_done = (nbytes == TUNNEL_TILL_DONE);
134
SET_HANDLER(&OneWayTunnel::startEvent);
136
int64_t size_index = 0;
139
size_index = buffer_size_to_index(size_estimate);
141
size_index = default_large_iobuffer_size;
143
Debug("one_way_tunnel", "buffer size index [%d] [%d]\n", size_index, size_estimate);
145
// enqueue read request on vcSource.
146
MIOBuffer *buf1 = new_MIOBuffer(size_index);
147
MIOBuffer *buf2 = NULL;
151
buf2 = new_MIOBuffer(size_index);
153
buf1->water_mark = water_mark;
155
MUTEX_LOCK(lock, mutex, this_ethread());
156
vioSource = vcSource->do_io_read(this, nbytes, buf1);
157
vioTarget = vcTarget->do_io_write(this, nbytes, buf2->alloc_reader(), 0);
158
ink_assert(vioSource && vioTarget);
164
OneWayTunnel::init(VConnection * vcSource,
165
VConnection * vcTarget,
166
Continuation * aCont,
167
VIO * SourceVio, IOBufferReader * reader, bool aclose_source, bool aclose_target)
170
mutex = aCont ? (ProxyMutex *) aCont->mutex : new_ProxyMutex();
172
single_buffer = true;
175
close_source = aclose_source;
176
close_target = aclose_target;
177
tunnel_till_done = true;
179
// Prior to constructing the OneWayTunnel, we initiated a do_io(VIO::READ)
180
// on the source VC. We wish to use the same MIO buffer in the tunnel.
182
// do_io() read already posted on vcSource.
183
SET_HANDLER(&OneWayTunnel::startEvent);
185
SourceVio->set_continuation(this);
186
MUTEX_LOCK(lock, mutex, this_ethread());
187
vioSource = SourceVio;
189
vioTarget = vcTarget->do_io_write(this, TUNNEL_TILL_DONE, reader, 0);
190
ink_assert(vioSource && vioTarget);
194
OneWayTunnel::init(Continuation * aCont, VIO * SourceVio, VIO * TargetVio, bool aclose_source, bool aclose_target)
196
mutex = aCont ? (ProxyMutex *) aCont->mutex : new_ProxyMutex();
198
single_buffer = true;
201
close_source = aclose_source;
202
close_target = aclose_target;
203
tunnel_till_done = true;
205
// do_io_read() read already posted on vcSource.
206
// do_io_write() already posted on vcTarget
207
SET_HANDLER(&OneWayTunnel::startEvent);
209
ink_assert(SourceVio && TargetVio);
211
SourceVio->set_continuation(this);
212
TargetVio->set_continuation(this);
213
vioSource = SourceVio;
214
vioTarget = TargetVio;
219
OneWayTunnel::transform(MIOBufferAccessor & in_buf, MIOBufferAccessor & out_buf)
222
manipulate_fn(in_buf, out_buf);
223
else if (in_buf.writer() != out_buf.writer())
224
transfer_data(in_buf, out_buf);
227
//////////////////////////////////////////////////////////////////////////////
229
// int OneWayTunnel::startEvent()
231
//////////////////////////////////////////////////////////////////////////////
234
// tunnel was invoked with an event
237
OneWayTunnel::startEvent(int event, void *data)
239
VIO *vio = (VIO *) data;
240
int ret = VC_EVENT_DONE;
244
const char *event_origin = (vio == vioSource ? "source" : "target"), *event_name = get_vc_event_name(event);
245
printf("OneWayTunnel --- %s received from %s VC\n", event_name, event_origin);
255
case ONE_WAY_TUNNEL_EVENT_PEER_CLOSE:
256
/* This event is sent out by our peer */
257
ink_assert(tunnel_peer);
262
case VC_EVENT_READ_READY:
263
transform(vioSource->buffer, vioTarget->buffer);
264
vioTarget->reenable();
268
case VC_EVENT_WRITE_READY:
270
vioSource->reenable();
275
if (!tunnel_till_done && vio->ntodo())
277
if (vio == vioSource) {
278
transform(vioSource->buffer, vioTarget->buffer);
284
case VC_EVENT_READ_COMPLETE:
285
// set write nbytes to the current buffer size
287
vioTarget->nbytes = vioTarget->ndone + vioTarget->buffer.reader()->read_avail();
288
if (vioTarget->nbytes == vioTarget->ndone)
290
vioTarget->reenable();
297
lerrno = ((VIO *) data)->vc_server->lerrno;
298
case VC_EVENT_INACTIVITY_TIMEOUT:
299
case VC_EVENT_ACTIVE_TIMEOUT:
302
case VC_EVENT_WRITE_COMPLETE:
305
tunnel_peer->startEvent(ONE_WAY_TUNNEL_EVENT_PEER_CLOSE, data);
307
close_source_vio(result);
308
close_target_vio(result);
309
connection_closed(result);
313
ink_assert(!"bad case");
318
printf(" (OneWayTunnel returning value: %s)\n", (ret == VC_EVENT_DONE ? "VC_EVENT_DONE" : "VC_EVENT_CONT"));
323
// If result is Non-zero, the vc should be aborted.
325
OneWayTunnel::close_source_vio(int result)
329
if (last_connection() || !single_buffer)
330
free_MIOBuffer(vioSource->buffer.mbuf);
331
if (close_source && free_vcs)
332
vioSource->vc_server->do_io_close(result ? lerrno : -1);
339
OneWayTunnel::close_target_vio(int result, VIO * vio)
344
if (last_connection() || !single_buffer)
345
free_MIOBuffer(vioTarget->buffer.mbuf);
346
if (close_target && free_vcs)
347
vioTarget->vc_server->do_io_close(result ? lerrno : -1);
353
//////////////////////////////////////////////////////////////////////////////
355
// void OneWayTunnel::connection_closed
357
//////////////////////////////////////////////////////////////////////////////
359
OneWayTunnel::connection_closed(int result)
363
cout << "OneWayTunnel::connection_closed() ... calling cont" << endl;
365
cont->handleEvent(result ? VC_EVENT_ERROR : VC_EVENT_EOS, cont);
367
OneWayTunnel_free(this);
372
OneWayTunnel::reenable_all()
375
vioSource->reenable();
377
vioTarget->reenable();
381
OneWayTunnel::last_connection()
383
return n_connections == 1;