~ps10gel/ubuntu/xenial/trafficserver/6.2.0

« back to all changes in this revision

Viewing changes to plugins/experimental/esi/plugin.cc

  • Committer: Package Import Robot
  • Author(s): Aron Xu
  • Date: 2013-05-09 01:00:04 UTC
  • mto: (1.1.11) (5.3.3 experimental)
  • mto: This revision was merged to the branch mainline in revision 15.
  • Revision ID: package-import@ubuntu.com-20130509010004-9fqq9n0adseg3f8w
Tags: upstream-3.3.2
ImportĀ upstreamĀ versionĀ 3.3.2

Show diffs side-by-side

added added

removed removed

Lines of Context:
38
38
#include "ts/experimental.h"
39
39
#include <ts/remap.h>
40
40
 
 
41
#include "lib/Utils.h"
 
42
#include "lib/gzip.h"
 
43
#include "EsiGzip.h"
41
44
#include "EsiProcessor.h"
42
45
#include "HttpDataFetcher.h"
43
 
#include "Utils.h"
44
46
#include "HandlerManager.h"
45
47
#include "serverIntercept.h"
46
48
#include "Stats.h"
47
 
#include "gzip.h"
48
49
#include "HttpDataFetcherImpl.h"
49
50
#include "FailureInfo.h"
50
51
using std::string;
57
58
  bool packed_node_support;
58
59
  bool private_response;
59
60
  bool disable_gzip_output;
 
61
  bool first_byte_flush;
60
62
};
61
63
 
62
64
static HandlerManager *gHandlerManager = NULL;
63
65
 
64
66
#define DEBUG_TAG "plugin_esi"
65
67
#define PROCESSOR_DEBUG_TAG "plugin_esi_processor"
 
68
#define GZIP_DEBUG_TAG "plugin_esi_gzip"
66
69
#define PARSER_DEBUG_TAG "plugin_esi_parser"
67
70
#define FETCHER_DEBUG_TAG "plugin_esi_fetcher"
68
71
#define VARS_DEBUG_TAG "plugin_esi_vars"
91
94
  STATE curr_state;
92
95
  TSVIO input_vio;
93
96
  TSIOBufferReader input_reader;
 
97
  TSVIO output_vio;
94
98
  TSIOBuffer output_buffer;
95
99
  TSIOBufferReader output_reader;
96
100
  Variables *esi_vars;
97
101
  HttpDataFetcherImpl *data_fetcher;
98
102
  EsiProcessor *esi_proc;
 
103
  EsiGzip *esi_gzip;
99
104
  TSCont contp;
100
105
  TSHttpTxn txnp;
101
106
  const struct OptionInfo *option_info;
116
121
  list<string> post_headers;
117
122
 
118
123
  ContData(TSCont contptr, TSHttpTxn tx)
119
 
    : curr_state(READING_ESI_DOC), input_vio(NULL),
 
124
    : curr_state(READING_ESI_DOC), input_vio(NULL), output_vio(NULL),
120
125
      output_buffer(NULL), output_reader(NULL),
121
 
      esi_vars(NULL), data_fetcher(NULL), esi_proc(NULL),
 
126
      esi_vars(NULL), data_fetcher(NULL), esi_proc(NULL), esi_gzip(NULL), 
122
127
      contp(contptr), txnp(tx), request_url(NULL),
123
128
      input_type(DATA_TYPE_RAW_ESI), packed_node_list(""),
124
129
      gzipped_data(""), gzip_output(false),
212
217
    }
213
218
    input_reader = TSVIOReaderGet(input_vio);
214
219
 
 
220
    // get downstream VIO
 
221
    TSVConn output_conn;
 
222
    output_conn = TSTransformOutputVConnGet(contp);
 
223
    if(!output_conn) {
 
224
      TSError("[%s] Error while getting transform VC", __FUNCTION__);
 
225
      goto lReturn;
 
226
    }
215
227
    output_buffer = TSIOBufferCreate();
216
228
    output_reader = TSIOBufferReaderAlloc(output_buffer);
217
229
 
218
 
    string fetcher_tag, vars_tag, expr_tag, proc_tag;
 
230
    // we don't know how much data we are going to write, so INT_MAX
 
231
    output_vio = TSVConnWrite(output_conn, contp, output_reader, INT64_MAX);
 
232
 
 
233
    string fetcher_tag, vars_tag, expr_tag, proc_tag, gzip_tag;
219
234
    if (!data_fetcher) {
220
235
      data_fetcher = new HttpDataFetcherImpl(contp, client_addr,
221
236
                                             createDebugTag(FETCHER_DEBUG_TAG, contp, fetcher_tag));
228
243
                                createDebugTag(PARSER_DEBUG_TAG, contp, fetcher_tag),
229
244
                                createDebugTag(EXPR_DEBUG_TAG, contp, expr_tag),
230
245
                                &TSDebug, &TSError, *data_fetcher, *esi_vars, *gHandlerManager);
 
246
    
 
247
    esi_gzip = new EsiGzip(createDebugTag(GZIP_DEBUG_TAG, contp, gzip_tag), &TSDebug, &TSError);
 
248
 
231
249
    TSDebug(debug_tag, "[%s] Set input data type to [%s]", __FUNCTION__,
232
250
             DATA_TYPE_NAMES_[input_type]);
233
251
 
471
489
  if (esi_proc) {
472
490
    delete esi_proc;
473
491
  }
 
492
  if(esi_gzip) {
 
493
    delete esi_gzip;
 
494
  }
474
495
}
475
496
 
476
497
static int removeCacheHandler(TSCont contp, TSEvent event, void *edata) {
610
631
      if (!cont_data->data_fetcher->isFetchComplete()) {
611
632
        TSDebug(cont_data->debug_tag,
612
633
                 "[%s] input_vio NULL, but data needs to be fetched. Returning control", __FUNCTION__);
613
 
        return 1;
 
634
        if(!cont_data->option_info->first_byte_flush) {
 
635
          return 1;
 
636
        }
614
637
      } else {
615
638
        TSDebug(cont_data->debug_tag,
616
639
                 "[%s] input_vio NULL, but processing needs to (and can) be completed", __FUNCTION__);
714
737
    }
715
738
  }
716
739
 
717
 
  if (cont_data->curr_state == ContData::FETCHING_DATA) { // retest as state may have changed in previous block
 
740
  if ((cont_data->curr_state == ContData::FETCHING_DATA) &&
 
741
      (!cont_data->option_info->first_byte_flush)) { // retest as state may have changed in previous block
718
742
    if (cont_data->data_fetcher->isFetchComplete()) {
719
743
      TSDebug(cont_data->debug_tag, "[%s] data ready; going to process doc", __FUNCTION__);
720
744
      const char *out_data;
780
804
    }
781
805
  }
782
806
 
 
807
  if ((cont_data->curr_state == ContData::FETCHING_DATA) &&
 
808
      (cont_data->option_info->first_byte_flush)) { // retest as state may have changed in previous block
 
809
    TSDebug(cont_data->debug_tag, "[%s] trying to process doc", __FUNCTION__);
 
810
    string out_data;
 
811
    string cdata;
 
812
    int overall_len;
 
813
    EsiProcessor::ReturnCode retval = cont_data->esi_proc->flush(out_data, overall_len);
 
814
 
 
815
    if (cont_data->data_fetcher->isFetchComplete()) {
 
816
      TSDebug(cont_data->debug_tag, "[%s] data ready; last process() will have finished the entire processing", __FUNCTION__);
 
817
      cont_data->curr_state = ContData::PROCESSING_COMPLETE;
 
818
    }
 
819
 
 
820
    if (retval == EsiProcessor::SUCCESS) {
 
821
      TSDebug(cont_data->debug_tag,
 
822
                 "[%s] ESI processor output document of size %d starting with [%.10s]",
 
823
                 __FUNCTION__, (int) out_data.size(), (out_data.size() ? out_data.data() : "(null)"));
 
824
    } else {
 
825
      TSError("[%s] ESI processor failed to process document; will return empty document", __FUNCTION__);
 
826
      out_data.assign("");
 
827
        
 
828
      if(!cont_data->xform_closed) {
 
829
        TSVIONBytesSet(cont_data->output_vio, 0);
 
830
        TSVIOReenable(cont_data->output_vio);
 
831
      }
 
832
    }
 
833
 
 
834
    // make sure transformation has not been prematurely terminated
 
835
    if (!cont_data->xform_closed && out_data.size() > 0) {
 
836
      if (cont_data->gzip_output) {
 
837
        if (!cont_data->esi_gzip->stream_encode(out_data, cdata)) {
 
838
          TSError("[%s] Error while gzipping content", __FUNCTION__);
 
839
        } else {
 
840
          TSDebug(cont_data->debug_tag, "[%s] Compressed document from size %d to %d bytes",
 
841
                     __FUNCTION__, (int) out_data.size(), (int) cdata.size());
 
842
        }
 
843
      }
 
844
 
 
845
      if (cont_data->gzip_output) {
 
846
        if (TSIOBufferWrite(TSVIOBufferGet(cont_data->output_vio), cdata.data(), cdata.size()) == TS_ERROR) {
 
847
          TSError("[%s] Error while writing bytes to downstream VC", __FUNCTION__);
 
848
          return 0;
 
849
        }
 
850
      } else {
 
851
        if (TSIOBufferWrite(TSVIOBufferGet(cont_data->output_vio), out_data.data(), out_data.size()) == TS_ERROR) {
 
852
          TSError("[%s] Error while writing bytes to downstream VC", __FUNCTION__);
 
853
          return 0;
 
854
        }
 
855
      }
 
856
    }
 
857
    if(!cont_data->xform_closed) {     
 
858
      // should not set any fixed length
 
859
      if(cont_data->curr_state == ContData::PROCESSING_COMPLETE) {
 
860
        if(cont_data->gzip_output) {
 
861
          string cdata;
 
862
          int downstream_length;
 
863
          if(!cont_data->esi_gzip->stream_finish(cdata, downstream_length)) {
 
864
            TSError("[%s] Error while finishing gzip", __FUNCTION__);
 
865
            return 0;  
 
866
          } else {   
 
867
            if (TSIOBufferWrite(TSVIOBufferGet(cont_data->output_vio), cdata.data(), cdata.size()) == TS_ERROR) {
 
868
              TSError("[%s] Error while writing bytes to downstream VC", __FUNCTION__);
 
869
              return 0;
 
870
            }
 
871
            TSDebug(cont_data->debug_tag,
 
872
                 "[%s] ESI processed overall/gzip: %d",
 
873
                 __FUNCTION__, downstream_length );
 
874
            TSVIONBytesSet(cont_data->output_vio, downstream_length);          
 
875
          }
 
876
        } else {
 
877
          TSDebug(cont_data->debug_tag,
 
878
                 "[%s] ESI processed overall: %d",
 
879
                 __FUNCTION__, overall_len );
 
880
          TSVIONBytesSet(cont_data->output_vio, overall_len);
 
881
        }
 
882
      } 
 
883
   
 
884
      // Reenable the output connection so it can read the data we've produced.
 
885
      TSVIOReenable(cont_data->output_vio);
 
886
    }
 
887
  }
 
888
 
783
889
  return 1;
784
890
}
785
891
 
859
965
      transformData(contp);
860
966
      break;
861
967
 
 
968
    case TS_EVENT_VCONN_WRITE_READY:
 
969
      TSDebug(cont_debug_tag, "[%s] WRITE_READY", __FUNCTION__);
 
970
      if(!cont_data->option_info->first_byte_flush) {
 
971
        TSVConnShutdown(TSTransformOutputVConnGet(contp), 0, 1);
 
972
      }
 
973
      break;
 
974
 
862
975
    case TS_EVENT_VCONN_WRITE_COMPLETE:
863
 
    case TS_EVENT_VCONN_WRITE_READY:     // we write only once to downstream VC
864
976
      TSDebug(cont_debug_tag, "[%s] shutting down transformation", __FUNCTION__);
865
977
      TSVConnShutdown(TSTransformOutputVConnGet(contp), 0, 1);
866
978
      break;
874
986
      if (is_fetch_event) {
875
987
        TSDebug(cont_debug_tag, "[%s] Handling fetch event %d...", __FUNCTION__, event);
876
988
        if (cont_data->data_fetcher->handleFetchEvent(event, edata)) {
877
 
          if ((cont_data->curr_state == ContData::FETCHING_DATA) &&
878
 
              cont_data->data_fetcher->isFetchComplete()) {
 
989
          if (cont_data->curr_state == ContData::FETCHING_DATA) {
879
990
            // there's a small chance that fetcher is ready even before
880
991
            // parsing is complete; hence we need to check the state too
881
 
            TSDebug(cont_debug_tag, "[%s] fetcher is ready with data, going into process stage",
 
992
            if(cont_data->option_info->first_byte_flush ||
 
993
               cont_data->data_fetcher->isFetchComplete()){
 
994
              TSDebug(cont_debug_tag, "[%s] fetcher is ready with data, going into process stage",
882
995
                     __FUNCTION__);
883
 
            transformData(contp);
 
996
              transformData(contp);
 
997
            }
884
998
          }
885
999
        } else {
886
1000
          TSError("[%s] Could not handle fetch event!", __FUNCTION__);
1513
1627
  if (argc > 1) {
1514
1628
    int c;
1515
1629
    static const struct option longopts[] = {
1516
 
      { "packed-node-support", no_argument, NULL, 'n' },
1517
 
      { "private-response", no_argument, NULL, 'p' },
1518
 
      { "disable-gzip-output", no_argument, NULL, 'z' },
1519
 
      { "handler-filename", required_argument, NULL, 'f' },
 
1630
      { const_cast<char *>("packed-node-support"), no_argument, NULL, 'n' },
 
1631
      { const_cast<char *>("private-response"), no_argument, NULL, 'p' },
 
1632
      { const_cast<char *>("disable-gzip-output"), no_argument, NULL, 'z' },
 
1633
      { const_cast<char *>("first-byte-flush"), no_argument, NULL, 'b' },
 
1634
      { const_cast<char *>("handler-filename"), required_argument, NULL, 'f' },
1520
1635
      { NULL, 0, NULL, 0 }
1521
1636
    };
1522
1637
 
1523
1638
    optarg = NULL;
1524
1639
    optind = opterr = optopt = 0;
1525
1640
    int longindex = 0;
1526
 
    while ((c = getopt_long(argc, (char * const*) argv, "npzf:", longopts, &longindex)) != -1) {
 
1641
    while ((c = getopt_long(argc, (char * const*) argv, "npzbf:", longopts, &longindex)) != -1) {
1527
1642
      switch (c) {
1528
1643
        case 'n':
1529
1644
          pOptionInfo->packed_node_support = true;
1534
1649
        case 'z':
1535
1650
          pOptionInfo->disable_gzip_output = true;
1536
1651
          break;
 
1652
        case 'b':
 
1653
          pOptionInfo->first_byte_flush = true;
 
1654
          break;
1537
1655
        case 'f':
1538
1656
          {
1539
1657
            Utils::KeyValueMap handler_conf;
1553
1671
    bKeySet = true;
1554
1672
    if ((result=pthread_key_create(&threadKey, NULL)) != 0) {
1555
1673
      TSError("[%s] Could not create key", __FUNCTION__);
 
1674
      TSDebug(DEBUG_TAG, "[%s] Could not create key", __FUNCTION__);
1556
1675
    }
1557
1676
  }
1558
1677
  else {
1562
1681
  if (result == 0) {
1563
1682
    TSDebug(DEBUG_TAG, "[%s] Plugin started%s, " \
1564
1683
        "packed-node-support: %d, private-response: %d, " \
1565
 
        "disable-gzip-output: %d", __FUNCTION__, bKeySet ? " and key is set" : "",
 
1684
        "disable-gzip-output: %d, first-byte-flush: %d ", __FUNCTION__, bKeySet ? " and key is set" : "",
1566
1685
        pOptionInfo->packed_node_support, pOptionInfo->private_response,
1567
 
        pOptionInfo->disable_gzip_output);
 
1686
        pOptionInfo->disable_gzip_output, pOptionInfo->first_byte_flush);
1568
1687
  }
1569
1688
 
1570
1689
  return result;
1600
1719
TSRemapInit(TSRemapInterface* api_info, char *errbuf, int errbuf_size)
1601
1720
{
1602
1721
  if (!api_info) {
1603
 
    TSstrlcpy(errbuf, "[TSRemapInit] - Invalid TSRemapInterface argument", errbuf_size);
 
1722
    snprintf(errbuf, errbuf_size, "[TSRemapInit] - Invalid TSRemapInterface argument");
 
1723
    TSError("[TSRemapInit] - Invalid TSRemapInterface argument");
1604
1724
    return TS_ERROR;
1605
1725
  }
1606
1726
 
1607
1727
  if (api_info->size < sizeof(TSRemapInterface)) {
1608
 
    TSstrlcpy(errbuf, "[TSRemapInit] - Incorrect size of TSRemapInterface structure", errbuf_size);
 
1728
    snprintf(errbuf, errbuf_size, "[TSRemapInit] - Incorrect size of TSRemapInterface structure");
 
1729
    TSError("[TSRemapInit] - Incorrect size of TSRemapInterface structure");
1609
1730
    return TS_ERROR;
1610
1731
  }
1611
1732
 
1612
 
  TSDebug(DEBUG_TAG, "esi remap plugin is succesfully initialized");
 
1733
  TSDebug(DEBUG_TAG, "esi remap plugin is successfully initialized");
1613
1734
  return TS_SUCCESS;
1614
1735
}
1615
1736
 
1616
1737
TSReturnCode
1617
1738
TSRemapNewInstance(int argc, char* argv[], void** ih, char* errbuf, int errbuf_size)
1618
1739
{
1619
 
  if (argc < 3) {
1620
 
    TSError("Unable to create remap instance, need configuration file");
 
1740
  if (argc < 2) {
 
1741
    snprintf(errbuf, errbuf_size, "Unable to create remap instance, " \
 
1742
        "argc: %d < 2", argc);
 
1743
    TSError("Unable to create remap instance! argc: %d < 2", argc);
1621
1744
    return TS_ERROR;
1622
1745
  }
1623
1746
 
1632
1755
 
1633
1756
  struct OptionInfo *pOptionInfo = (struct OptionInfo *)TSmalloc(sizeof(struct OptionInfo));
1634
1757
  if (pOptionInfo == NULL) {
 
1758
    snprintf(errbuf, errbuf_size, "malloc %d bytes fail", (int)sizeof(struct OptionInfo));
1635
1759
    TSError("[%s] malloc %d bytes fail", __FUNCTION__, (int)sizeof(struct OptionInfo));
1636
1760
    return TS_ERROR;
1637
1761
  }