~ps10gel/ubuntu/xenial/trafficserver/6.2.0

« back to all changes in this revision

Viewing changes to iocore/utils/OneWayTunnel.cc

  • Committer: Bazaar Package Importer
  • Author(s): Arno Toell
  • Date: 2011-01-13 11:49:18 UTC
  • Revision ID: james.westby@ubuntu.com-20110113114918-vu422h8dknrgkj15
Tags: upstream-2.1.5-unstable
ImportĀ upstreamĀ versionĀ 2.1.5-unstable

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/** @file
 
2
 
 
3
  A brief file description
 
4
 
 
5
  @section license License
 
6
 
 
7
  Licensed to the Apache Software Foundation (ASF) under one
 
8
  or more contributor license agreements.  See the NOTICE file
 
9
  distributed with this work for additional information
 
10
  regarding copyright ownership.  The ASF licenses this file
 
11
  to you under the Apache License, Version 2.0 (the
 
12
  "License"); you may not use this file except in compliance
 
13
  with the License.  You may obtain a copy of the License at
 
14
 
 
15
      http://www.apache.org/licenses/LICENSE-2.0
 
16
 
 
17
  Unless required by applicable law or agreed to in writing, software
 
18
  distributed under the License is distributed on an "AS IS" BASIS,
 
19
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
20
  See the License for the specific language governing permissions and
 
21
  limitations under the License.
 
22
 */
 
23
 
 
24
/****************************************************************************
 
25
 
 
26
   OneWayTunnel.cc
 
27
 
 
28
   A OneWayTunnel is a module that connects two virtual connections, a
 
29
   source vc and a target vc, and copies the data between source and target.
 
30
 
 
31
   This class used to be called HttpTunnelVC, but it doesn't seem to have
 
32
   anything to do with HTTP, so it has been renamed to OneWayTunnel.
 
33
 ****************************************************************************/
 
34
 
 
35
#include "P_EventSystem.h"
 
36
#include "I_OneWayTunnel.h"
 
37
 
 
38
// #define TEST
 
39
 
 
40
//////////////////////////////////////////////////////////////////////////////
 
41
//
 
42
//      OneWayTunnel::OneWayTunnel()
 
43
//
 
44
//////////////////////////////////////////////////////////////////////////////
 
45
 
 
46
ClassAllocator<OneWayTunnel> OneWayTunnelAllocator("OneWayTunnelAllocator");
 
47
 
 
48
inline void
 
49
transfer_data(MIOBufferAccessor & in_buf, MIOBufferAccessor & out_buf)
 
50
{
 
51
  ink_release_assert(!"Not Implemented.");
 
52
 
 
53
  int64_t n = in_buf.reader()->read_avail();
 
54
  int64_t o = out_buf.writer()->write_avail();
 
55
 
 
56
  if (n > o)
 
57
    n = o;
 
58
  if (!n)
 
59
    return;
 
60
  memcpy(in_buf.reader()->start(), out_buf.writer()->end(), n);
 
61
  in_buf.reader()->consume(n);
 
62
  out_buf.writer()->fill(n);
 
63
}
 
64
 
 
65
OneWayTunnel::OneWayTunnel():Continuation(0),
 
66
vioSource(0), vioTarget(0), cont(0), manipulate_fn(0),
 
67
n_connections(0), lerrno(0), single_buffer(0),
 
68
close_source(0), close_target(0), tunnel_till_done(0), tunnel_peer(0), free_vcs(true)
 
69
{
 
70
}
 
71
 
 
72
OneWayTunnel *
 
73
OneWayTunnel::OneWayTunnel_alloc()
 
74
{
 
75
  return OneWayTunnelAllocator.alloc();
 
76
}
 
77
 
 
78
void
 
79
OneWayTunnel::OneWayTunnel_free(OneWayTunnel * pOWT)
 
80
{
 
81
 
 
82
  pOWT->mutex = NULL;
 
83
  OneWayTunnelAllocator.free(pOWT);
 
84
}
 
85
 
 
86
void
 
87
OneWayTunnel::SetupTwoWayTunnel(OneWayTunnel * east, OneWayTunnel * west)
 
88
{
 
89
  //make sure the both use the same mutex
 
90
  ink_assert(east->mutex == west->mutex);
 
91
 
 
92
  east->tunnel_peer = west;
 
93
  west->tunnel_peer = east;
 
94
}
 
95
 
 
96
OneWayTunnel::~OneWayTunnel()
 
97
{
 
98
}
 
99
 
 
100
OneWayTunnel::OneWayTunnel(Continuation * aCont, Transform_fn aManipulate_fn, bool aclose_source, bool aclose_target)
 
101
:
 
102
Continuation(aCont
 
103
             ? (ProxyMutex *) aCont->mutex
 
104
             : new_ProxyMutex()),
 
105
cont(aCont),
 
106
manipulate_fn(aManipulate_fn),
 
107
n_connections(2),
 
108
lerrno(0),
 
109
single_buffer(true), close_source(aclose_source), close_target(aclose_target), tunnel_till_done(false), free_vcs(false)
 
110
{
 
111
  ink_assert(!"This form of OneWayTunnel() constructor not supported");
 
112
}
 
113
 
 
114
void
 
115
OneWayTunnel::init(VConnection * vcSource,
 
116
                   VConnection * vcTarget,
 
117
                   Continuation * aCont,
 
118
                   int size_estimate,
 
119
                   ProxyMutex * aMutex,
 
120
                   int64_t nbytes,
 
121
                   bool asingle_buffer,
 
122
                   bool aclose_source, bool aclose_target, Transform_fn aManipulate_fn, int water_mark)
 
123
{
 
124
  mutex = aCont ? (ProxyMutex *) aCont->mutex : (aMutex ? aMutex : new_ProxyMutex());
 
125
  cont = aMutex ? NULL : aCont;
 
126
  single_buffer = asingle_buffer;
 
127
  manipulate_fn = aManipulate_fn;
 
128
  n_connections = 2;
 
129
  close_source = aclose_source;
 
130
  close_target = aclose_target;
 
131
  lerrno = 0;
 
132
  tunnel_till_done = (nbytes == TUNNEL_TILL_DONE);
 
133
 
 
134
  SET_HANDLER(&OneWayTunnel::startEvent);
 
135
 
 
136
  int64_t size_index = 0;
 
137
 
 
138
  if (size_estimate)
 
139
    size_index = buffer_size_to_index(size_estimate);
 
140
  else
 
141
    size_index = default_large_iobuffer_size;
 
142
 
 
143
  Debug("one_way_tunnel", "buffer size index [%d] [%d]\n", size_index, size_estimate);
 
144
 
 
145
  // enqueue read request on vcSource.
 
146
  MIOBuffer *buf1 = new_MIOBuffer(size_index);
 
147
  MIOBuffer *buf2 = NULL;
 
148
  if (single_buffer)
 
149
    buf2 = buf1;
 
150
  else
 
151
    buf2 = new_MIOBuffer(size_index);
 
152
 
 
153
  buf1->water_mark = water_mark;
 
154
 
 
155
  MUTEX_LOCK(lock, mutex, this_ethread());
 
156
  vioSource = vcSource->do_io_read(this, nbytes, buf1);
 
157
  vioTarget = vcTarget->do_io_write(this, nbytes, buf2->alloc_reader(), 0);
 
158
  ink_assert(vioSource && vioTarget);
 
159
 
 
160
  return;
 
161
}
 
162
 
 
163
void
 
164
OneWayTunnel::init(VConnection * vcSource,
 
165
                   VConnection * vcTarget,
 
166
                   Continuation * aCont,
 
167
                   VIO * SourceVio, IOBufferReader * reader, bool aclose_source, bool aclose_target)
 
168
{
 
169
  (void) vcSource;
 
170
  mutex = aCont ? (ProxyMutex *) aCont->mutex : new_ProxyMutex();
 
171
  cont = aCont;
 
172
  single_buffer = true;
 
173
  manipulate_fn = 0;
 
174
  n_connections = 2;
 
175
  close_source = aclose_source;
 
176
  close_target = aclose_target;
 
177
  tunnel_till_done = true;
 
178
 
 
179
  // Prior to constructing the OneWayTunnel, we initiated a do_io(VIO::READ)
 
180
  // on the source VC.  We wish to use the same MIO buffer in the tunnel.
 
181
 
 
182
  // do_io() read already posted on vcSource.
 
183
  SET_HANDLER(&OneWayTunnel::startEvent);
 
184
 
 
185
  SourceVio->set_continuation(this);
 
186
  MUTEX_LOCK(lock, mutex, this_ethread());
 
187
  vioSource = SourceVio;
 
188
 
 
189
  vioTarget = vcTarget->do_io_write(this, TUNNEL_TILL_DONE, reader, 0);
 
190
  ink_assert(vioSource && vioTarget);
 
191
}
 
192
 
 
193
void
 
194
OneWayTunnel::init(Continuation * aCont, VIO * SourceVio, VIO * TargetVio, bool aclose_source, bool aclose_target)
 
195
{
 
196
  mutex = aCont ? (ProxyMutex *) aCont->mutex : new_ProxyMutex();
 
197
  cont = aCont;
 
198
  single_buffer = true;
 
199
  manipulate_fn = 0;
 
200
  n_connections = 2;
 
201
  close_source = aclose_source;
 
202
  close_target = aclose_target;
 
203
  tunnel_till_done = true;
 
204
 
 
205
  // do_io_read() read already posted on vcSource.
 
206
  // do_io_write() already posted on vcTarget
 
207
  SET_HANDLER(&OneWayTunnel::startEvent);
 
208
 
 
209
  ink_assert(SourceVio && TargetVio);
 
210
 
 
211
  SourceVio->set_continuation(this);
 
212
  TargetVio->set_continuation(this);
 
213
  vioSource = SourceVio;
 
214
  vioTarget = TargetVio;
 
215
}
 
216
 
 
217
 
 
218
void
 
219
OneWayTunnel::transform(MIOBufferAccessor & in_buf, MIOBufferAccessor & out_buf)
 
220
{
 
221
  if (manipulate_fn)
 
222
    manipulate_fn(in_buf, out_buf);
 
223
  else if (in_buf.writer() != out_buf.writer())
 
224
    transfer_data(in_buf, out_buf);
 
225
}
 
226
 
 
227
//////////////////////////////////////////////////////////////////////////////
 
228
//
 
229
//      int OneWayTunnel::startEvent()
 
230
//
 
231
//////////////////////////////////////////////////////////////////////////////
 
232
 
 
233
//
 
234
// tunnel was invoked with an event
 
235
//
 
236
int
 
237
OneWayTunnel::startEvent(int event, void *data)
 
238
{
 
239
  VIO *vio = (VIO *) data;
 
240
  int ret = VC_EVENT_DONE;
 
241
  int result = 0;
 
242
 
 
243
#ifdef TEST
 
244
  const char *event_origin = (vio == vioSource ? "source" : "target"), *event_name = get_vc_event_name(event);
 
245
  printf("OneWayTunnel --- %s received from %s VC\n", event_name, event_origin);
 
246
#endif
 
247
 
 
248
  if (!vioTarget)
 
249
    goto Lerror;
 
250
 
 
251
  // handle the event
 
252
  //
 
253
  switch (event) {
 
254
 
 
255
  case ONE_WAY_TUNNEL_EVENT_PEER_CLOSE:
 
256
    /* This event is sent out by our peer */
 
257
    ink_assert(tunnel_peer);
 
258
    tunnel_peer = NULL;
 
259
    free_vcs = false;
 
260
    goto Ldone;
 
261
 
 
262
  case VC_EVENT_READ_READY:
 
263
    transform(vioSource->buffer, vioTarget->buffer);
 
264
    vioTarget->reenable();
 
265
    ret = VC_EVENT_CONT;
 
266
    break;
 
267
 
 
268
  case VC_EVENT_WRITE_READY:
 
269
    if (vioSource)
 
270
      vioSource->reenable();
 
271
    ret = VC_EVENT_CONT;
 
272
    break;
 
273
 
 
274
  case VC_EVENT_EOS:
 
275
    if (!tunnel_till_done && vio->ntodo())
 
276
      goto Lerror;
 
277
    if (vio == vioSource) {
 
278
      transform(vioSource->buffer, vioTarget->buffer);
 
279
      goto Lread_complete;
 
280
    } else
 
281
      goto Ldone;
 
282
 
 
283
  Lread_complete:
 
284
  case VC_EVENT_READ_COMPLETE:
 
285
    // set write nbytes to the current buffer size
 
286
    //
 
287
    vioTarget->nbytes = vioTarget->ndone + vioTarget->buffer.reader()->read_avail();
 
288
    if (vioTarget->nbytes == vioTarget->ndone)
 
289
      goto Ldone;
 
290
    vioTarget->reenable();
 
291
    if (!tunnel_peer)
 
292
      close_source_vio(0);
 
293
    break;
 
294
 
 
295
  Lerror:
 
296
  case VC_EVENT_ERROR:
 
297
    lerrno = ((VIO *) data)->vc_server->lerrno;
 
298
  case VC_EVENT_INACTIVITY_TIMEOUT:
 
299
  case VC_EVENT_ACTIVE_TIMEOUT:
 
300
    result = -1;
 
301
  Ldone:
 
302
  case VC_EVENT_WRITE_COMPLETE:
 
303
    if (tunnel_peer) {
 
304
      //inform the peer:
 
305
      tunnel_peer->startEvent(ONE_WAY_TUNNEL_EVENT_PEER_CLOSE, data);
 
306
    }
 
307
    close_source_vio(result);
 
308
    close_target_vio(result);
 
309
    connection_closed(result);
 
310
    break;
 
311
 
 
312
  default:
 
313
    ink_assert(!"bad case");
 
314
    ret = VC_EVENT_CONT;
 
315
    break;
 
316
  }
 
317
#ifdef TEST
 
318
  printf("    (OneWayTunnel returning value: %s)\n", (ret == VC_EVENT_DONE ? "VC_EVENT_DONE" : "VC_EVENT_CONT"));
 
319
#endif
 
320
  return ret;
 
321
}
 
322
 
 
323
// If result is Non-zero, the vc should be aborted.
 
324
void
 
325
OneWayTunnel::close_source_vio(int result)
 
326
{
 
327
 
 
328
  if (vioSource) {
 
329
    if (last_connection() || !single_buffer)
 
330
      free_MIOBuffer(vioSource->buffer.mbuf);
 
331
    if (close_source && free_vcs)
 
332
      vioSource->vc_server->do_io_close(result ? lerrno : -1);
 
333
    vioSource = NULL;
 
334
    n_connections--;
 
335
  }
 
336
}
 
337
 
 
338
void
 
339
OneWayTunnel::close_target_vio(int result, VIO * vio)
 
340
{
 
341
 
 
342
  (void) vio;
 
343
  if (vioTarget) {
 
344
    if (last_connection() || !single_buffer)
 
345
      free_MIOBuffer(vioTarget->buffer.mbuf);
 
346
    if (close_target && free_vcs)
 
347
      vioTarget->vc_server->do_io_close(result ? lerrno : -1);
 
348
    vioTarget = NULL;
 
349
    n_connections--;
 
350
  }
 
351
}
 
352
 
 
353
//////////////////////////////////////////////////////////////////////////////
 
354
//
 
355
//      void OneWayTunnel::connection_closed
 
356
//
 
357
//////////////////////////////////////////////////////////////////////////////
 
358
void
 
359
OneWayTunnel::connection_closed(int result)
 
360
{
 
361
  if (cont) {
 
362
#ifdef TEST
 
363
    cout << "OneWayTunnel::connection_closed() ... calling cont" << endl;
 
364
#endif
 
365
    cont->handleEvent(result ? VC_EVENT_ERROR : VC_EVENT_EOS, cont);
 
366
  } else {
 
367
    OneWayTunnel_free(this);
 
368
  }
 
369
}
 
370
 
 
371
void
 
372
OneWayTunnel::reenable_all()
 
373
{
 
374
  if (vioSource)
 
375
    vioSource->reenable();
 
376
  if (vioTarget)
 
377
    vioTarget->reenable();
 
378
}
 
379
 
 
380
bool
 
381
OneWayTunnel::last_connection()
 
382
{
 
383
  return n_connections == 1;
 
384
}