~awuerl/blitzortung-tracker/0.9

« back to all changes in this revision

Viewing changes to source/lib/DataWorker.cc

  • Committer: Andreas Wuerl
  • Date: 2012-03-08 22:02:31 UTC
  • Revision ID: andi@debian-20120308220231-8dc444kpv5b4s72p
backported most of the 1.0 features

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
 
2
 
#include "DataThread.h"
 
2
#include "DataWorker.h"
3
3
#include "exception/Base.h"
4
4
 
5
5
namespace blitzortung {
6
6
 
7
 
  DataThread::DataThread(Queue<data::Event>& sampleQueue, EventCountBuffer& eventCountBuffer, network::transfer::Base& transfer, output::Base& output) :
 
7
  DataWorker::DataWorker(Queue<data::Event>& sampleQueue, EventCountBuffer& eventCountBuffer, network::transfer::Base& transfer, output::Base& output) :
8
8
    sampleQueue_(sampleQueue),
9
9
    eventCountBuffer_(eventCountBuffer),
10
10
    transfer_(transfer),
11
11
    events_(new data::Events()),
12
12
    output_(output),
13
 
    logger_("DataThread")
 
13
    logger_("DataWorker")
14
14
  {
15
15
    eventRateLimit_ = 1.0;
16
16
 
18
18
      logger_.debugStream() << "initalized(eventRateLimit: " << eventRateLimit_ << ")";
19
19
  }
20
20
 
21
 
  DataThread::~DataThread() {
 
21
  DataWorker::~DataWorker() {
22
22
    if (logger_.isDebugEnabled())
23
23
      logger_.debugStream() << "deleted";
24
24
  }
25
25
 
26
 
  void DataThread::setEventRateLimit(const double eventRateLimit) {
 
26
  void DataWorker::setEventRateLimit(const double eventRateLimit) {
27
27
    eventRateLimit_ = eventRateLimit;
28
28
  }
29
29
 
30
 
  data::Events::AP DataThread::prepareData() {
 
30
  data::Events::AP DataWorker::prepareData(int eventsPerSecond) {
31
31
    data::Events::AP deletedEvents(new data::Events());
32
32
 
33
33
    double secondsElapsed = eventCountBuffer_.getActualSize();
34
 
    double eventRate = double(eventCountBuffer_.getSum()) / secondsElapsed;
 
34
    double eventRate = double(eventCountBuffer_.getSum() + eventsPerSecond) / (secondsElapsed + 1);
35
35
 
36
36
    if (logger_.isInfoEnabled())
37
37
      logger_.infoStream() << "prepareData() " << events_->size() << " events (rate " << eventRate << " events/second)";
43
43
 
44
44
      deletedEvents->transfer(deletedEvents->end(), events_->begin(), events_->end(), *events_);
45
45
 
46
 
        logger_.warnStream() << "prepareData() limit " << eventRateLimit_ << " reached, interval seconds: " << secondsElapsed << ", erasing to " << sampleLimit << " elements (new # of elements: " << events_->size() << ")" << " deleted # " << deletedEvents->size();
 
46
      logger_.warnStream() << "prepareData() limit " << eventRateLimit_ << " reached, interval seconds: " << secondsElapsed << ", erasing to " << sampleLimit << " elements (new # of elements: " << events_->size() << ")" << " deleted # " << deletedEvents->size();
47
47
 
48
48
      // time sort events
49
49
      events_->sort();
52
52
    return deletedEvents;
53
53
  }
54
54
 
55
 
  void DataThread::operator()() {
 
55
  void DataWorker::operator()() {
56
56
 
57
57
    if (logger_.isInfoEnabled())
58
58
      logger_.infoStream() << "() started";
59
59
 
 
60
    pt::ptime second(getSecond());
 
61
    int eventsPerSecond = 0;
60
62
    while (true) {
61
63
      boost::xtime xt;
62
64
      boost::xtime_get(&xt, boost::TIME_UTC);
86
88
          events_->add(sampleQueue_.pop());
87
89
        }
88
90
 
 
91
        pt::ptime currentSecond(getSecond());
 
92
        if (currentSecond > second) {
 
93
          // record number of events for the actual second
 
94
          if (logger_.isDebugEnabled())
 
95
            logger_.debugStream() << second << " : " << eventsPerSecond << " events per second";
 
96
          eventCountBuffer_.add(eventsPerSecond);
 
97
          eventsPerSecond = 0;
 
98
          second = currentSecond;
 
99
        }
 
100
 
 
101
 
89
102
        if (events_->size() > 0) {
90
103
 
91
104
          if (logger_.isDebugEnabled())
92
105
            logger_.debug("() transmitting/saving data");
93
106
 
94
107
          // prepare data for transmission
95
 
          data::Events::AP deletedEvents = prepareData();
 
108
          data::Events::AP deletedEvents = prepareData(eventsPerSecond);
 
109
 
 
110
          eventsPerSecond += events_->size();
96
111
 
97
112
          // transmit data
98
113
          transfer_.send(*events_);
119
134
          output_.output(*events_);
120
135
        }
121
136
 
122
 
        // record number of events for the actual second
123
 
        eventCountBuffer_.add(events_->size());
124
 
 
125
137
        // delete all events
126
138
        events_->clear();
127
139
 
133
145
      logger_.infoStream() << "() terminated";
134
146
  }
135
147
 
 
148
  pt::ptime DataWorker::getSecond() const {
 
149
    return pt::second_clock::universal_time();
 
150
  }
 
151
 
136
152
}