3
A brief file description
5
@section license License
7
Licensed to the Apache Software Foundation (ASF) under one
8
or more contributor license agreements. See the NOTICE file
9
distributed with this work for additional information
10
regarding copyright ownership. The ASF licenses this file
11
to you under the Apache License, Version 2.0 (the
12
"License"); you may not use this file except in compliance
13
with the License. You may obtain a copy of the License at
15
http://www.apache.org/licenses/LICENSE-2.0
17
Unless required by applicable law or agreed to in writing, software
18
distributed under the License is distributed on an "AS IS" BASIS,
19
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20
See the License for the specific language governing permissions and
21
limitations under the License.
24
//-------------------------------------------------------------------------
26
//-------------------------------------------------------------------------
28
#include "ink_config.h"
34
#include <sys/types.h>
36
#include "P_EventSystem.h"
43
#include "LogFormat.h"
44
#include "LogBuffer.h"
46
#include "LogObject.h"
47
#include "LogConfig.h"
50
#include "LogCollationHostSM.h"
52
//-------------------------------------------------------------------------
54
//-------------------------------------------------------------------------
57
LogCollationHostSM::ID = 0;
59
//-------------------------------------------------------------------------
60
// LogCollationHostSM::LogCollationHostSM
61
//-------------------------------------------------------------------------
63
LogCollationHostSM::LogCollationHostSM(NetVConnection * client_vc):
64
Continuation(new_ProxyMutex()),
65
m_client_vc(client_vc),
67
m_client_buffer(NULL),
68
m_client_reader(NULL),
69
m_pending_event(NULL),
70
m_read_buffer(NULL), m_read_bytes_wanted(0), m_read_bytes_received(0), m_client_ip(0), m_client_port(0), m_id(ID++)
73
Debug("log-coll", "[%d]host::constructor", m_id);
75
ink_assert(m_client_vc != NULL);
78
m_client_ip = m_client_vc->get_remote_ip();
79
m_client_port = m_client_vc->get_remote_port();
80
Note("[log-coll] client connected [%d.%d.%d.%d:%d]",
81
((unsigned char *) (&m_client_ip))[0],
82
((unsigned char *) (&m_client_ip))[1],
83
((unsigned char *) (&m_client_ip))[2], ((unsigned char *) (&m_client_ip))[3], m_client_port);
85
SET_HANDLER((LogCollationHostSMHandler) & LogCollationHostSM::host_handler);
86
host_init(LOG_COLL_EVENT_SWITCH, NULL);
90
//-------------------------------------------------------------------------
91
//-------------------------------------------------------------------------
95
//-------------------------------------------------------------------------
96
//-------------------------------------------------------------------------
98
//-------------------------------------------------------------------------
99
// LogCollationHostSM::host_handler
100
//-------------------------------------------------------------------------
103
LogCollationHostSM::host_handler(int event, void *data)
106
switch (m_host_state) {
107
case LOG_COLL_HOST_AUTH:
108
return host_auth(event, data);
109
case LOG_COLL_HOST_DONE:
110
return host_done(event, data);
111
case LOG_COLL_HOST_INIT:
112
return host_init(event, data);
113
case LOG_COLL_HOST_RECV:
114
return host_recv(event, data);
116
ink_assert(!"unexpected state");
122
//-------------------------------------------------------------------------
123
// LogCollationHostSM::read_handler
124
//-------------------------------------------------------------------------
127
LogCollationHostSM::read_handler(int event, void *data)
130
switch (m_read_state) {
131
case LOG_COLL_READ_BODY:
132
return read_body(event, (VIO *) data);
133
case LOG_COLL_READ_HDR:
134
return read_hdr(event, (VIO *) data);
136
ink_assert(!"unexpected state");
142
//-------------------------------------------------------------------------
143
//-------------------------------------------------------------------------
147
//-------------------------------------------------------------------------
148
//-------------------------------------------------------------------------
150
//-------------------------------------------------------------------------
151
// LogCollationHostSM::host_auth
152
// next: host_done || host_recv
153
//-------------------------------------------------------------------------
156
LogCollationHostSM::host_auth(int event, void *data)
159
Debug("log-coll", "[%d]host::host_auth", m_id);
163
case LOG_COLL_EVENT_SWITCH:
164
Debug("log-coll", "[%d]host::host_auth - SWITCH", m_id);
165
m_host_state = LOG_COLL_HOST_AUTH;
168
case LOG_COLL_EVENT_READ_COMPLETE:
169
Debug("log-coll", "[%d]host::host_auth - READ_COMPLETE", m_id);
171
// compare authorization secrets
172
ink_assert(m_read_buffer != NULL);
173
int diff = strncmp(m_read_buffer, Log::config->collation_secret,
174
m_read_bytes_received);
175
delete[]m_read_buffer;
178
Debug("log-coll", "[%d]host::host_auth - authenticated!", m_id);
179
return host_recv(LOG_COLL_EVENT_SWITCH, NULL);
181
Debug("log-coll", "[%d]host::host_auth - authenticated failed!", m_id);
182
Note("[log-coll] authentication failed [%d.%d.%d.%d:%d]",
183
((unsigned char *) (&m_client_ip))[0],
184
((unsigned char *) (&m_client_ip))[1],
185
((unsigned char *) (&m_client_ip))[2], ((unsigned char *) (&m_client_ip))[3], m_client_port);
186
return host_done(LOG_COLL_EVENT_SWITCH, NULL);
191
case LOG_COLL_EVENT_ERROR:
192
Debug("log-coll", "[%d]host::host_auth - ERROR", m_id);
193
return host_done(LOG_COLL_EVENT_SWITCH, NULL);
196
ink_assert(!"unexpected state");
203
//-------------------------------------------------------------------------
204
// LogCollationHostSM::host_done
206
//-------------------------------------------------------------------------
209
LogCollationHostSM::host_done(int event, void *data)
211
NOWARN_UNUSED(event);
213
Debug("log-coll", "[%d]host::host_done", m_id);
217
Debug("log-coll", "[%d]host::host_done - disconnecting!", m_id);
218
m_client_vc->do_io_close();
220
Note("[log-coll] client disconnected [%d.%d.%d.%d:%d]",
221
((unsigned char *) (&m_client_ip))[0],
222
((unsigned char *) (&m_client_ip))[1],
223
((unsigned char *) (&m_client_ip))[2], ((unsigned char *) (&m_client_ip))[3], m_client_port);
226
if (m_client_buffer) {
227
if (m_client_reader) {
228
m_client_buffer->dealloc_reader(m_client_reader);
230
free_MIOBuffer(m_client_buffer);
232
// delete this state machine and return
238
//-------------------------------------------------------------------------
239
// LogCollationHostSM::host_init
240
// next: host_auth || host_done
241
//-------------------------------------------------------------------------
244
LogCollationHostSM::host_init(int event, void *data)
247
Debug("log-coll", "[%d]host::host_init", m_id);
251
case LOG_COLL_EVENT_SWITCH:
252
m_host_state = LOG_COLL_HOST_INIT;
253
m_pending_event = eventProcessor.schedule_imm(this);
256
case EVENT_IMMEDIATE:
258
m_client_buffer = new_MIOBuffer();
259
ink_assert(m_client_buffer != NULL);
260
m_client_reader = m_client_buffer->alloc_reader();
261
ink_assert(m_client_reader != NULL);
262
return host_auth(LOG_COLL_EVENT_SWITCH, NULL);
265
ink_assert(!"unexpected state");
272
//-------------------------------------------------------------------------
273
// LogCollationHostSM::host_recv
274
// next: host_done || host_recv
275
//-------------------------------------------------------------------------
278
LogCollationHostSM::host_recv(int event, void *data)
281
Debug("log-coll", "[%d]host::host_recv", m_id);
285
case LOG_COLL_EVENT_SWITCH:
286
Debug("log-coll", "[%d]host::host_recv - SWITCH", m_id);
287
m_host_state = LOG_COLL_HOST_RECV;
290
case LOG_COLL_EVENT_READ_COMPLETE:
291
Debug("log-coll", "[%d]host::host_recv - READ_COMPLETE", m_id);
293
// grab the log_buffer
294
LogBufferHeader *log_buffer_header;
295
LogBuffer *log_buffer;
296
LogFormat *log_format;
297
LogObject *log_object;
300
ink_assert(m_read_buffer != NULL);
301
ink_assert(m_read_bytes_received >= (int64_t)sizeof(LogBufferHeader));
302
log_buffer_header = (LogBufferHeader *) m_read_buffer;
304
// convert the buffer we just received to host order
305
LogBuffer::convert_to_host_order(log_buffer_header);
307
version = log_buffer_header->version;
308
if (version != LOG_SEGMENT_VERSION) {
309
Note("[log-coll] invalid LogBuffer received; invalid version - "
310
"buffer = %u, current = %u", version, LOG_SEGMENT_VERSION);
311
delete[]m_read_buffer;
314
log_object = Log::match_logobject(log_buffer_header);
316
Note("[log-coll] LogObject not found with fieldlist id; " "writing LogBuffer to scrap file");
317
log_object = Log::global_scrap_object;
319
log_format = log_object->m_format;
320
Debug("log-coll", "[%d]host::host_recv - using format '%s'", m_id, log_format->name());
322
// make a new LogBuffer (log_buffer_header plus subsequent
323
// buffer already converted to host order) and add it to the
324
// object's flush queue
326
log_buffer = NEW(new LogBuffer(log_object, log_buffer_header));
327
log_object->add_to_flush_queue(log_buffer);
328
ink_mutex_acquire(&Log::flush_mutex);
329
Log::flush_counter++;
330
ink_cond_signal(&Log::flush_cond);
331
ink_mutex_release(&Log::flush_mutex);
334
#if defined(LOG_BUFFER_TRACKING)
335
Debug("log-buftrak", "[%d]host::host_recv - network read complete", log_buffer_header->id);
336
#endif // defined(LOG_BUFFER_TRACKING)
338
// get ready for next read (memory may not be freed!!!)
341
return host_recv(LOG_COLL_EVENT_SWITCH, NULL);
345
case LOG_COLL_EVENT_ERROR:
346
Debug("log-coll", "[%d]host::host_recv - ERROR", m_id);
347
return host_done(LOG_COLL_EVENT_SWITCH, NULL);
350
ink_assert(!"unexpected state");
357
//-------------------------------------------------------------------------
358
//-------------------------------------------------------------------------
362
//-------------------------------------------------------------------------
363
//-------------------------------------------------------------------------
365
//-------------------------------------------------------------------------
366
// LogCollationHostSM::read_start
368
//-------------------------------------------------------------------------
371
LogCollationHostSM::read_start()
374
Debug("log-coll", "[%d]host::read_start", m_id);
376
SET_HANDLER((LogCollationHostSMHandler) & LogCollationHostSM::read_handler);
378
ink_assert(!"m_read_buffer still points to something, doh!");
380
return read_hdr(LOG_COLL_EVENT_SWITCH, NULL);
384
//-------------------------------------------------------------------------
385
// LogCollationHostSM::read_hdr
386
// next: read_body || read_done
387
//-------------------------------------------------------------------------
390
LogCollationHostSM::read_hdr(int event, VIO * vio)
393
Debug("log-coll", "[%d]host::read_hdr", m_id);
397
case LOG_COLL_EVENT_SWITCH:
398
Debug("log-coll", "[%d]host:read_hdr - SWITCH", m_id);
399
m_read_state = LOG_COLL_READ_HDR;
401
m_read_bytes_wanted = sizeof(NetMsgHeader);
402
m_read_bytes_received = 0;
403
m_read_buffer = (char *) &m_net_msg_header;
404
ink_assert(m_client_vc != NULL);
405
Debug("log-coll", "[%d]host:read_hdr - do_io_read(%d)", m_id, m_read_bytes_wanted);
406
m_client_vio = m_client_vc->do_io_read(this, m_read_bytes_wanted, m_client_buffer);
407
ink_assert(m_client_vio != NULL);
410
case VC_EVENT_IMMEDIATE:
411
Debug("log-coll", "[%d]host::read_hdr - IMMEDIATE", m_id);
414
case VC_EVENT_READ_READY:
415
Debug("log-coll", "[%d]host::read_hdr - READ_READY", m_id);
419
case VC_EVENT_READ_COMPLETE:
420
Debug("log-coll", "[%d]host::read_hdr - READ_COMPLETE", m_id);
422
ink_assert(m_read_bytes_wanted == m_read_bytes_received);
423
return read_body(LOG_COLL_EVENT_SWITCH, NULL);
427
Debug("log-coll", "[%d]host::read_hdr - EOS|ERROR", m_id);
428
return read_done(LOG_COLL_EVENT_ERROR, NULL);
431
Debug("log-coll", "[%d]host::read_hdr - default %d", m_id, event);
432
return read_done(LOG_COLL_EVENT_ERROR, NULL);
438
//-------------------------------------------------------------------------
439
// LogCollationHostSM::read_body
440
// next: read_body || read_done
441
//-------------------------------------------------------------------------
444
LogCollationHostSM::read_body(int event, VIO * vio)
447
Debug("log-coll", "[%d]host::read_body", m_id);
451
case LOG_COLL_EVENT_SWITCH:
452
Debug("log-coll", "[%d]host:read_body - SWITCH", m_id);
453
m_read_state = LOG_COLL_READ_BODY;
455
m_read_bytes_wanted = ntohl(m_net_msg_header.htonl_size);
456
ink_assert(m_read_bytes_wanted > 0);
457
m_read_bytes_received = 0;
458
m_read_buffer = new char[m_read_bytes_wanted];
459
ink_assert(m_read_buffer != NULL);
460
ink_assert(m_client_vc != NULL);
461
Debug("log-coll", "[%d]host:read_body - do_io_read(%d)", m_id, m_read_bytes_wanted);
462
m_client_vio = m_client_vc->do_io_read(this, m_read_bytes_wanted, m_client_buffer);
463
ink_assert(m_client_vio != NULL);
466
case VC_EVENT_IMMEDIATE:
467
Debug("log-coll", "[%d]host::read_body - IMMEDIATE", m_id);
470
case VC_EVENT_READ_READY:
471
Debug("log-coll", "[%d]host::read_body - READ_READY", m_id);
475
case VC_EVENT_READ_COMPLETE:
476
Debug("log-coll", "[%d]host::read_body - READ_COMPLETE", m_id);
478
ink_assert(m_read_bytes_wanted == m_read_bytes_received);
479
return read_done(LOG_COLL_EVENT_READ_COMPLETE, NULL);
483
Debug("log-coll", "[%d]host::read_body - EOS|ERROR", m_id);
484
return read_done(LOG_COLL_EVENT_ERROR, NULL);
487
Debug("log-coll", "[%d]host::read_body - default %d", m_id, event);
488
return read_done(LOG_COLL_EVENT_ERROR, NULL);
494
//-------------------------------------------------------------------------
495
// LogCollationHostSM::read_done
496
// next: give control back to host state-machine
497
//-------------------------------------------------------------------------
500
LogCollationHostSM::read_done(int event, void *data)
503
SET_HANDLER((LogCollationHostSMHandler) & LogCollationHostSM::host_handler);
504
return host_handler(event, NULL);
508
//-------------------------------------------------------------------------
509
// LogCollationHostSM::read_partial
510
//-------------------------------------------------------------------------
513
LogCollationHostSM::read_partial(VIO * vio)
517
ink_assert(vio != NULL);
518
ink_assert(vio->vc_server == m_client_vc);
519
ink_assert(m_client_buffer != NULL);
520
ink_assert(m_client_reader != NULL);
522
// careful not to read more than we have memory for
523
char *p = &(m_read_buffer[m_read_bytes_received]);
524
int64_t bytes_wanted_now = m_read_bytes_wanted - m_read_bytes_received;
525
int64_t bytes_received_now = m_client_reader->read(p, bytes_wanted_now);
527
m_read_bytes_received += bytes_received_now;
530
LOG_SUM_DYN_STAT(log_stat_bytes_received_from_network_stat, bytes_received_now);