"""
A monitor that collects data on network activity and sends messages
with the inbound/outbound traffic per interface per step interval.
"""

import time

from landscape.lib.network import get_network_traffic
from landscape.accumulate import Accumulator

from landscape.monitor.plugin import MonitorPlugin


class NetworkActivity(MonitorPlugin):
    """
    Collect data regarding a machine's network activity.

    Each sample reads per-interface byte counters, turns them into
    deltas against the previous sample, and feeds the deltas to an
    accumulator. Whenever the accumulator returns step data for both
    directions of an interface, an entry is queued for the next
    "network-activity" message.
    """

    message_type = "network-activity"
    persist_name = message_type

    def __init__(self, network_activity_file="/proc/net/dev",
                 create_time=time.time):
        """
        @param network_activity_file: Path handed to
            L{get_network_traffic} on every sample.
        @param create_time: Zero-argument callable returning the current
            time; its result is truncated with C{int()} when sampling.
        """
        self._source_file = network_activity_file
        # accumulated values for sending out via message
        self._network_activity = {}
        # our last traffic sample for calculating a traffic delta
        self._last_activity = {}
        self._create_time = create_time

    def register(self, registry):
        """
        Register with C{registry}, build the accumulator from the
        plugin's persist data and the registry step size, and arrange
        for an urgent exchange once "network-activity" is accepted.
        """
        super(NetworkActivity, self).register(registry)
        self._accumulate = Accumulator(self._persist, self.registry.step_size)
        self.call_on_accepted("network-activity", self.exchange, True)

    def create_message(self):
        """
        Build the pending "network-activity" message and reset the
        pending data.

        @return: A message dict with C{type} and C{activities} keys, or
            C{None} when nothing has been collected.
        """
        network_activity = self._network_activity
        if not network_activity:
            # NOTE(review): the body of this guard was missing from the
            # reviewed text; restored as a bare return -- confirm.
            return
        # Hand the collected data to the message and start a fresh batch.
        self._network_activity = {}
        return {"type": "network-activity", "activities": network_activity}

    def send_message(self, urgent):
        """
        Create a message and pass it to the broker.

        @param urgent: Forwarded to the broker as the C{urgent} keyword.
        """
        message = self.create_message()
        # NOTE(review): the reviewed text shows no check between creating
        # and sending the message, although create_message() may produce
        # None when nothing is pending; lines appear to be missing here.
        # Confirm against version control whether an empty-message guard
        # belongs at this point.
        self.registry.broker.send_message(message, urgent=urgent)

    def exchange(self, urgent=False):
        """
        Ask the broker to call L{send_message} if the "network-activity"
        message type is accepted.
        """
        self.registry.broker.call_if_accepted("network-activity",
                                              self.send_message, urgent)

    def _traffic_delta(self, new_traffic):
        """
        Given network activity metrics across all interfaces, calculate
        the data transferred since the previous sample for outbound and
        inbound traffic.

        This is a generator yielding C{(interface, delta_out, delta_in)}
        tuples. An interface seen for the first time yields nothing; its
        counters are only recorded as the baseline for the next call.

        @param new_traffic: Mapping of interface name to a mapping that
            provides C{"send_bytes"} and C{"recv_bytes"} values.
        """
        for interface in new_traffic:
            traffic = new_traffic[interface]
            if interface in self._last_activity:
                previous_out, previous_in = self._last_activity[interface]
                delta_out = traffic["send_bytes"] - previous_out
                delta_in = traffic["recv_bytes"] - previous_in
                if not delta_out and not delta_in:
                    # Idle interface: nothing to report, and the stored
                    # baseline already equals the current counters.
                    # NOTE(review): this guard's body was missing from
                    # the reviewed text; restored as `continue`.
                    continue
                yield interface, delta_out, delta_in
            # Remember the raw counters for the next delta computation.
            self._last_activity[interface] = (
                traffic["send_bytes"], traffic["recv_bytes"])

    # NOTE(review): the `def` line of this method was missing from the
    # reviewed text; the name `run` is assumed -- confirm against the
    # MonitorPlugin interface.
    def run(self):
        """
        Sample network traffic statistics and store them into the
        accumulator, recording step data.
        """
        new_timestamp = int(self._create_time())
        new_traffic = get_network_traffic(self._source_file)
        for interface, delta_out, delta_in in self._traffic_delta(new_traffic):
            out_step_data = self._accumulate(
                new_timestamp, delta_out, "delta-out-%s" % interface)

            in_step_data = self._accumulate(
                new_timestamp, delta_in, "delta-in-%s" % interface)

            # there's only data when we cross a step boundary
            if not (in_step_data and out_step_data):
                continue

            steps = self._network_activity.setdefault(interface, [])
            # Entry layout: first element of the inbound step data
            # (presumably the step timestamp -- verify against
            # Accumulator), then the inbound and outbound values as ints.
            steps.append(
                (in_step_data[0], int(in_step_data[1]), int(out_step_data[1])))