87
87
self.assertEqual(iptables_manager.get_chain_name(name, wrap=True),
90
def test_add_and_remove_chain_custom_binary_name(self):
90
def _extend_with_ip6tables_filter(self, expected_calls, filter_dump):
91
expected_calls.insert(2, (
92
mock.call(['ip6tables-save', '-c'],
93
root_helper=self.root_helper),
95
expected_calls.insert(3, (
96
mock.call(['ip6tables-restore', '-c'],
97
process_input=filter_dump,
98
root_helper=self.root_helper),
100
expected_calls.extend([
101
(mock.call(['ip6tables-save', '-c'],
102
root_helper=self.root_helper),
104
(mock.call(['ip6tables-restore', '-c'],
105
process_input=filter_dump,
106
root_helper=self.root_helper),
109
def _test_add_and_remove_chain_custom_binary_name_helper(self, use_ipv6):
91
110
bn = ("abcdef" * 5)
93
self.iptables = (iptables_manager.
94
IptablesManager(root_helper=self.root_helper,
112
self.iptables = iptables_manager.IptablesManager(
113
root_helper=self.root_helper,
96
116
self.execute = mock.patch.object(self.iptables, "execute").start()
98
118
iptables_args = {'bn': bn[:16]}
115
135
'# Completed by iptables_manager\n' % iptables_args)
137
filter_dump_ipv6 = ('# Generated by iptables_manager\n'
139
':neutron-filter-top - [0:0]\n'
140
':%(bn)s-FORWARD - [0:0]\n'
141
':%(bn)s-INPUT - [0:0]\n'
142
':%(bn)s-local - [0:0]\n'
143
':%(bn)s-OUTPUT - [0:0]\n'
144
'[0:0] -A FORWARD -j neutron-filter-top\n'
145
'[0:0] -A OUTPUT -j neutron-filter-top\n'
146
'[0:0] -A neutron-filter-top -j %(bn)s-local\n'
147
'[0:0] -A INPUT -j %(bn)s-INPUT\n'
148
'[0:0] -A OUTPUT -j %(bn)s-OUTPUT\n'
149
'[0:0] -A FORWARD -j %(bn)s-FORWARD\n'
151
'# Completed by iptables_manager\n' %
117
154
filter_dump_mod = ('# Generated by iptables_manager\n'
119
156
':neutron-filter-top - [0:0]\n'
268
320
tools.verify_mock_calls(self.execute, expected_calls_and_values)
322
def test_empty_chain_custom_binary_name(self):
323
self._test_empty_chain_custom_binary_name_helper(False)
325
def test_empty_chain_custom_binary_name_with_ipv6(self):
326
self._test_empty_chain_custom_binary_name_helper(True)
328
def _test_add_and_remove_chain_helper(self, use_ipv6):
329
self.iptables = iptables_manager.IptablesManager(
330
root_helper=self.root_helper,
332
self.execute = mock.patch.object(self.iptables, "execute").start()
334
filter_dump_mod = ('# Generated by iptables_manager\n'
336
':neutron-filter-top - [0:0]\n'
337
':%(bn)s-FORWARD - [0:0]\n'
338
':%(bn)s-INPUT - [0:0]\n'
339
':%(bn)s-local - [0:0]\n'
340
':%(bn)s-filter - [0:0]\n'
341
':%(bn)s-OUTPUT - [0:0]\n'
342
'[0:0] -A FORWARD -j neutron-filter-top\n'
343
'[0:0] -A OUTPUT -j neutron-filter-top\n'
344
'[0:0] -A neutron-filter-top -j %(bn)s-local\n'
345
'[0:0] -A INPUT -j %(bn)s-INPUT\n'
346
'[0:0] -A OUTPUT -j %(bn)s-OUTPUT\n'
347
'[0:0] -A FORWARD -j %(bn)s-FORWARD\n'
349
'# Completed by iptables_manager\n'
352
expected_calls_and_values = [
353
(mock.call(['iptables-save', '-c'],
354
root_helper=self.root_helper),
356
(mock.call(['iptables-restore', '-c'],
357
process_input=NAT_DUMP + filter_dump_mod,
358
root_helper=self.root_helper),
360
(mock.call(['iptables-save', '-c'],
361
root_helper=self.root_helper),
363
(mock.call(['iptables-restore', '-c'],
364
process_input=NAT_DUMP + FILTER_DUMP,
365
root_helper=self.root_helper),
369
self._extend_with_ip6tables_filter(expected_calls_and_values,
372
tools.setup_mock_calls(self.execute, expected_calls_and_values)
374
self.iptables.ipv4['filter'].add_chain('filter')
375
self.iptables.apply()
377
self.iptables.ipv4['filter'].remove_chain('filter')
378
self.iptables.apply()
380
tools.verify_mock_calls(self.execute, expected_calls_and_values)
270
382
def test_add_and_remove_chain(self):
271
filter_dump_mod = ('# Generated by iptables_manager\n'
273
':neutron-filter-top - [0:0]\n'
274
':%(bn)s-FORWARD - [0:0]\n'
275
':%(bn)s-INPUT - [0:0]\n'
276
':%(bn)s-local - [0:0]\n'
277
':%(bn)s-filter - [0:0]\n'
278
':%(bn)s-OUTPUT - [0:0]\n'
279
'[0:0] -A FORWARD -j neutron-filter-top\n'
280
'[0:0] -A OUTPUT -j neutron-filter-top\n'
281
'[0:0] -A neutron-filter-top -j %(bn)s-local\n'
282
'[0:0] -A INPUT -j %(bn)s-INPUT\n'
283
'[0:0] -A OUTPUT -j %(bn)s-OUTPUT\n'
284
'[0:0] -A FORWARD -j %(bn)s-FORWARD\n'
286
'# Completed by iptables_manager\n'
289
expected_calls_and_values = [
290
(mock.call(['iptables-save', '-c'],
291
root_helper=self.root_helper),
293
(mock.call(['iptables-restore', '-c'],
294
process_input=NAT_DUMP + filter_dump_mod,
295
root_helper=self.root_helper),
297
(mock.call(['iptables-save', '-c'],
298
root_helper=self.root_helper),
300
(mock.call(['iptables-restore', '-c'],
301
process_input=NAT_DUMP + FILTER_DUMP,
302
root_helper=self.root_helper),
305
tools.setup_mock_calls(self.execute, expected_calls_and_values)
307
self.iptables.ipv4['filter'].add_chain('filter')
308
self.iptables.apply()
310
self.iptables.ipv4['filter'].remove_chain('filter')
311
self.iptables.apply()
313
tools.verify_mock_calls(self.execute, expected_calls_and_values)
315
def test_add_filter_rule(self):
383
self._test_add_and_remove_chain_helper(False)
385
def test_add_and_remove_chain_with_ipv6(self):
386
self._test_add_and_remove_chain_helper(True)
388
def _test_add_filter_rule_helper(self, use_ipv6):
389
self.iptables = iptables_manager.IptablesManager(
390
root_helper=self.root_helper,
392
self.execute = mock.patch.object(self.iptables, "execute").start()
316
394
filter_dump_mod = ('# Generated by iptables_manager\n'
318
396
':neutron-filter-top - [0:0]\n'
563
688
root_helper=self.root_helper),
565
(mock.call(['ip6tables', '-t', 'filter', '-L', 'OUTPUT',
567
root_helper=self.root_helper),
692
expected_calls_and_values.append(
693
(mock.call(['ip6tables', '-t', 'filter', '-L', 'OUTPUT',
695
root_helper=self.root_helper),
570
700
tools.setup_mock_calls(self.execute, expected_calls_and_values)
572
702
acc = self.iptables.get_traffic_counters('OUTPUT')
573
self.assertEqual(acc['pkts'], 1600)
574
self.assertEqual(acc['bytes'], 263604)
703
self.assertEqual(acc['pkts'], exp_packets)
704
self.assertEqual(acc['bytes'], exp_bytes)
576
706
tools.verify_mock_calls(self.execute, expected_calls_and_values)
578
def test_get_traffic_counters_with_zero(self):
708
def test_get_traffic_counters(self):
709
self._test_get_traffic_counters_helper(False)
711
def test_get_traffic_counters_with_ipv6(self):
712
self._test_get_traffic_counters_helper(True)
714
def _test_get_traffic_counters_with_zero_helper(self, use_ipv6):
715
self.iptables = iptables_manager.IptablesManager(
716
root_helper=self.root_helper,
718
self.execute = mock.patch.object(self.iptables, "execute").start()
579
722
iptables_dump = (
580
723
'Chain OUTPUT (policy ACCEPT 400 packets, 65901 bytes)\n'
581
724
' pkts bytes target prot opt in out source'
593
736
(mock.call(['iptables', '-t', 'nat', '-L', 'OUTPUT', '-n',
594
737
'-v', '-x', '-Z'],
595
738
root_helper=self.root_helper),
597
(mock.call(['ip6tables', '-t', 'filter', '-L', 'OUTPUT',
598
'-n', '-v', '-x', '-Z'],
599
root_helper=self.root_helper),
742
expected_calls_and_values.append(
743
(mock.call(['ip6tables', '-t', 'filter', '-L', 'OUTPUT',
744
'-n', '-v', '-x', '-Z'],
745
root_helper=self.root_helper),
602
750
tools.setup_mock_calls(self.execute, expected_calls_and_values)
604
752
acc = self.iptables.get_traffic_counters('OUTPUT', zero=True)
605
self.assertEqual(acc['pkts'], 1600)
606
self.assertEqual(acc['bytes'], 263604)
753
self.assertEqual(acc['pkts'], exp_packets)
754
self.assertEqual(acc['bytes'], exp_bytes)
608
756
tools.verify_mock_calls(self.execute, expected_calls_and_values)
758
def test_get_traffic_counters_with_zero(self):
759
self._test_get_traffic_counters_with_zero_helper(False)
761
def test_get_traffic_counters_with_zero_with_ipv6(self):
762
self._test_get_traffic_counters_with_zero_helper(True)
610
764
def _test_find_last_entry(self, find_str):
611
765
filter_list = [':neutron-filter-top - [0:0]',
612
766
':%(bn)s-FORWARD - [0:0]',