~corey.bryant/ubuntu/trusty/neutron/lp1318721

« back to all changes in this revision

Viewing changes to neutron/tests/unit/test_iptables_manager.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, Corey Bryant
  • Date: 2014-10-06 09:15:06 UTC
  • mfrom: (28.1.4 trusty-proposed)
  • Revision ID: package-import@ubuntu.com-20141006091506-cesvev43moce4y74
Tags: 1:2014.1.3-0ubuntu1
[ Corey Bryant ]
* Resynchronize with stable/icehouse (4a0210e) (LP: #1377136):
  - [3a30d19] Deletes floating ip related connection states
  - [dd4b77f] Forbid regular users to reset admin-only attrs to default values
  - [dc2c893] Add delete operations for the ODL MechanismDriver
  - [b51e2c7] Add missing ml2 plugin to migration 1fcfc149aca4
  - [a17a500] Don't convert numeric protocol values to int
  - [3a85946] NSX: Optionally not enforce nat rule match length check
  - [645f984] Don't spawn metadata-proxy for non-isolated nets
  - [b464d89] Big Switch: Check for 'id' in port before lookup
  - [3116ffa] use TRUE in SQL for boolean var
  - [3520e66] call security_groups_member_updated in port_update
  - [50e1534] Don't allow user to set firewall rule with port and no protocol
  - [0061533] BSN: Add context to backend request for debugging
  - [6de6d61] Improve ODL ML2 Exception Handling
  - [2a4153d] Send network name and uuid to subnet create
  - [b5e3c9a] BSN: Allow concurrent reads to consistency DB
  - [b201432] Big Switch: Retry on 503 errors from backend
  - [f6c47ee] NSX: log request body to NSX as debug
  - [97d622a] Fix metadata agent's auth info caching
  - [255df45] NSX: Correct allowed_address_pair return value on create_port
  - [5bea041] Neutron should not use the neutronclient utils module for import_class
  - [d5314e2] Cisco N1kv plugin to send subtype on network profile creation
  - [f32d1ce] Pass object to policy when finding fields to strip
  - [8b5f6be] Call policy.init() once per API request
  - [9a6d811] Perform policy checks only once on list responses
  - [c48db90] Datacenter moid should not be tuple
  - [161d465] Allow unsharing a network used as gateway/floatingip
  - [9574a2f] Add support for router scheduling in Cisco N1kv Plugin
  - [6f54565] Fix func job hook script permission problems
  - [ea43103] Add hook scripts for the functional infra job
  - [8161cb7] Fixes Hyper-V agent issue on Hyper-V 2008 R2
  - [8e99cfd] Fixes Hyper-V issue due to ML2 RPC versioning
  - [69f9121] Ensure ip6tables are used only if ipv6 is enabled in kernel
  - [399b809] Remove explicit dependency on amqplib
  - [a872143] Clear entries in Cisco N1KV specific tables on rollback
  - [ad82fad] Verify ML2 type driver exists before calling del
  - [af2cc98] Big Switch: Only update hash header on success
  - [b1e5eec] Ignore variable column widths in ovsdb functional tests
  - [4a0210e] VMWare: don't notify on disassociate_floatingips()

Show diffs side-by-side

added

removed

Lines of Context:
69
69
    def setUp(self):
70
70
        super(IptablesManagerStateFulTestCase, self).setUp()
71
71
        self.root_helper = 'sudo'
72
 
        self.iptables = (iptables_manager.
73
 
                         IptablesManager(root_helper=self.root_helper))
 
72
        self.iptables = iptables_manager.IptablesManager(
 
73
            root_helper=self.root_helper)
74
74
        self.execute = mock.patch.object(self.iptables, "execute").start()
75
75
 
76
76
    def test_binary_name(self):
87
87
        self.assertEqual(iptables_manager.get_chain_name(name, wrap=True),
88
88
                         name[:11])
89
89
 
90
 
    def test_add_and_remove_chain_custom_binary_name(self):
 
90
    def _extend_with_ip6tables_filter(self, expected_calls, filter_dump):
 
91
        expected_calls.insert(2, (
 
92
            mock.call(['ip6tables-save', '-c'],
 
93
                      root_helper=self.root_helper),
 
94
            ''))
 
95
        expected_calls.insert(3, (
 
96
            mock.call(['ip6tables-restore', '-c'],
 
97
                      process_input=filter_dump,
 
98
                      root_helper=self.root_helper),
 
99
            None))
 
100
        expected_calls.extend([
 
101
            (mock.call(['ip6tables-save', '-c'],
 
102
                       root_helper=self.root_helper),
 
103
             ''),
 
104
            (mock.call(['ip6tables-restore', '-c'],
 
105
                       process_input=filter_dump,
 
106
                       root_helper=self.root_helper),
 
107
             None)])
 
108
 
 
109
    def _test_add_and_remove_chain_custom_binary_name_helper(self, use_ipv6):
91
110
        bn = ("abcdef" * 5)
92
111
 
93
 
        self.iptables = (iptables_manager.
94
 
                         IptablesManager(root_helper=self.root_helper,
95
 
                                         binary_name=bn))
 
112
        self.iptables = iptables_manager.IptablesManager(
 
113
            root_helper=self.root_helper,
 
114
            binary_name=bn,
 
115
            use_ipv6=use_ipv6)
96
116
        self.execute = mock.patch.object(self.iptables, "execute").start()
97
117
 
98
118
        iptables_args = {'bn': bn[:16]}
114
134
                       'COMMIT\n'
115
135
                       '# Completed by iptables_manager\n' % iptables_args)
116
136
 
 
137
        filter_dump_ipv6 = ('# Generated by iptables_manager\n'
 
138
                            '*filter\n'
 
139
                            ':neutron-filter-top - [0:0]\n'
 
140
                            ':%(bn)s-FORWARD - [0:0]\n'
 
141
                            ':%(bn)s-INPUT - [0:0]\n'
 
142
                            ':%(bn)s-local - [0:0]\n'
 
143
                            ':%(bn)s-OUTPUT - [0:0]\n'
 
144
                            '[0:0] -A FORWARD -j neutron-filter-top\n'
 
145
                            '[0:0] -A OUTPUT -j neutron-filter-top\n'
 
146
                            '[0:0] -A neutron-filter-top -j %(bn)s-local\n'
 
147
                            '[0:0] -A INPUT -j %(bn)s-INPUT\n'
 
148
                            '[0:0] -A OUTPUT -j %(bn)s-OUTPUT\n'
 
149
                            '[0:0] -A FORWARD -j %(bn)s-FORWARD\n'
 
150
                            'COMMIT\n'
 
151
                            '# Completed by iptables_manager\n' %
 
152
                            iptables_args)
 
153
 
117
154
        filter_dump_mod = ('# Generated by iptables_manager\n'
118
155
                           '*filter\n'
119
156
                           ':neutron-filter-top - [0:0]\n'
166
203
                       root_helper=self.root_helper),
167
204
             None),
168
205
        ]
 
206
        if use_ipv6:
 
207
            self._extend_with_ip6tables_filter(expected_calls_and_values,
 
208
                                               filter_dump_ipv6)
 
209
 
169
210
        tools.setup_mock_calls(self.execute, expected_calls_and_values)
170
211
 
171
212
        self.iptables.ipv4['filter'].add_chain('filter')
176
217
 
177
218
        tools.verify_mock_calls(self.execute, expected_calls_and_values)
178
219
 
179
 
    def test_empty_chain_custom_binary_name(self):
 
220
    def test_add_and_remove_chain_custom_binary_name(self):
 
221
        self._test_add_and_remove_chain_custom_binary_name_helper(False)
 
222
 
 
223
    def test_add_and_remove_chain_custom_binary_name_with_ipv6(self):
 
224
        self._test_add_and_remove_chain_custom_binary_name_helper(True)
 
225
 
 
226
    def _test_empty_chain_custom_binary_name_helper(self, use_ipv6):
180
227
        bn = ("abcdef" * 5)[:16]
181
228
 
182
 
        self.iptables = (iptables_manager.
183
 
                         IptablesManager(root_helper=self.root_helper,
184
 
                                         binary_name=bn))
 
229
        self.iptables = iptables_manager.IptablesManager(
 
230
            root_helper=self.root_helper,
 
231
            binary_name=bn,
 
232
            use_ipv6=use_ipv6)
185
233
        self.execute = mock.patch.object(self.iptables, "execute").start()
186
234
 
187
235
        iptables_args = {'bn': bn}
255
303
                       root_helper=self.root_helper),
256
304
             None),
257
305
        ]
 
306
        if use_ipv6:
 
307
            self._extend_with_ip6tables_filter(expected_calls_and_values,
 
308
                                               filter_dump)
 
309
 
258
310
        tools.setup_mock_calls(self.execute, expected_calls_and_values)
259
311
 
260
312
        self.iptables.ipv4['filter'].add_chain('filter')
267
319
 
268
320
        tools.verify_mock_calls(self.execute, expected_calls_and_values)
269
321
 
 
322
    def test_empty_chain_custom_binary_name(self):
 
323
        self._test_empty_chain_custom_binary_name_helper(False)
 
324
 
 
325
    def test_empty_chain_custom_binary_name_with_ipv6(self):
 
326
        self._test_empty_chain_custom_binary_name_helper(True)
 
327
 
 
328
    def _test_add_and_remove_chain_helper(self, use_ipv6):
 
329
        self.iptables = iptables_manager.IptablesManager(
 
330
            root_helper=self.root_helper,
 
331
            use_ipv6=use_ipv6)
 
332
        self.execute = mock.patch.object(self.iptables, "execute").start()
 
333
 
 
334
        filter_dump_mod = ('# Generated by iptables_manager\n'
 
335
                           '*filter\n'
 
336
                           ':neutron-filter-top - [0:0]\n'
 
337
                           ':%(bn)s-FORWARD - [0:0]\n'
 
338
                           ':%(bn)s-INPUT - [0:0]\n'
 
339
                           ':%(bn)s-local - [0:0]\n'
 
340
                           ':%(bn)s-filter - [0:0]\n'
 
341
                           ':%(bn)s-OUTPUT - [0:0]\n'
 
342
                           '[0:0] -A FORWARD -j neutron-filter-top\n'
 
343
                           '[0:0] -A OUTPUT -j neutron-filter-top\n'
 
344
                           '[0:0] -A neutron-filter-top -j %(bn)s-local\n'
 
345
                           '[0:0] -A INPUT -j %(bn)s-INPUT\n'
 
346
                           '[0:0] -A OUTPUT -j %(bn)s-OUTPUT\n'
 
347
                           '[0:0] -A FORWARD -j %(bn)s-FORWARD\n'
 
348
                           'COMMIT\n'
 
349
                           '# Completed by iptables_manager\n'
 
350
                           % IPTABLES_ARG)
 
351
 
 
352
        expected_calls_and_values = [
 
353
            (mock.call(['iptables-save', '-c'],
 
354
                       root_helper=self.root_helper),
 
355
             ''),
 
356
            (mock.call(['iptables-restore', '-c'],
 
357
                       process_input=NAT_DUMP + filter_dump_mod,
 
358
                       root_helper=self.root_helper),
 
359
             None),
 
360
            (mock.call(['iptables-save', '-c'],
 
361
                       root_helper=self.root_helper),
 
362
             ''),
 
363
            (mock.call(['iptables-restore', '-c'],
 
364
                       process_input=NAT_DUMP + FILTER_DUMP,
 
365
                       root_helper=self.root_helper),
 
366
             None),
 
367
        ]
 
368
        if use_ipv6:
 
369
            self._extend_with_ip6tables_filter(expected_calls_and_values,
 
370
                                               FILTER_DUMP)
 
371
 
 
372
        tools.setup_mock_calls(self.execute, expected_calls_and_values)
 
373
 
 
374
        self.iptables.ipv4['filter'].add_chain('filter')
 
375
        self.iptables.apply()
 
376
 
 
377
        self.iptables.ipv4['filter'].remove_chain('filter')
 
378
        self.iptables.apply()
 
379
 
 
380
        tools.verify_mock_calls(self.execute, expected_calls_and_values)
 
381
 
270
382
    def test_add_and_remove_chain(self):
271
 
        filter_dump_mod = ('# Generated by iptables_manager\n'
272
 
                           '*filter\n'
273
 
                           ':neutron-filter-top - [0:0]\n'
274
 
                           ':%(bn)s-FORWARD - [0:0]\n'
275
 
                           ':%(bn)s-INPUT - [0:0]\n'
276
 
                           ':%(bn)s-local - [0:0]\n'
277
 
                           ':%(bn)s-filter - [0:0]\n'
278
 
                           ':%(bn)s-OUTPUT - [0:0]\n'
279
 
                           '[0:0] -A FORWARD -j neutron-filter-top\n'
280
 
                           '[0:0] -A OUTPUT -j neutron-filter-top\n'
281
 
                           '[0:0] -A neutron-filter-top -j %(bn)s-local\n'
282
 
                           '[0:0] -A INPUT -j %(bn)s-INPUT\n'
283
 
                           '[0:0] -A OUTPUT -j %(bn)s-OUTPUT\n'
284
 
                           '[0:0] -A FORWARD -j %(bn)s-FORWARD\n'
285
 
                           'COMMIT\n'
286
 
                           '# Completed by iptables_manager\n'
287
 
                           % IPTABLES_ARG)
288
 
 
289
 
        expected_calls_and_values = [
290
 
            (mock.call(['iptables-save', '-c'],
291
 
                       root_helper=self.root_helper),
292
 
             ''),
293
 
            (mock.call(['iptables-restore', '-c'],
294
 
                       process_input=NAT_DUMP + filter_dump_mod,
295
 
                       root_helper=self.root_helper),
296
 
             None),
297
 
            (mock.call(['iptables-save', '-c'],
298
 
                       root_helper=self.root_helper),
299
 
             ''),
300
 
            (mock.call(['iptables-restore', '-c'],
301
 
                       process_input=NAT_DUMP + FILTER_DUMP,
302
 
                       root_helper=self.root_helper),
303
 
             None),
304
 
        ]
305
 
        tools.setup_mock_calls(self.execute, expected_calls_and_values)
306
 
 
307
 
        self.iptables.ipv4['filter'].add_chain('filter')
308
 
        self.iptables.apply()
309
 
 
310
 
        self.iptables.ipv4['filter'].remove_chain('filter')
311
 
        self.iptables.apply()
312
 
 
313
 
        tools.verify_mock_calls(self.execute, expected_calls_and_values)
314
 
 
315
 
    def test_add_filter_rule(self):
 
383
        self._test_add_and_remove_chain_helper(False)
 
384
 
 
385
    def test_add_and_remove_chain_with_ipv6(self):
 
386
        self._test_add_and_remove_chain_helper(True)
 
387
 
 
388
    def _test_add_filter_rule_helper(self, use_ipv6):
 
389
        self.iptables = iptables_manager.IptablesManager(
 
390
            root_helper=self.root_helper,
 
391
            use_ipv6=use_ipv6)
 
392
        self.execute = mock.patch.object(self.iptables, "execute").start()
 
393
 
316
394
        filter_dump_mod = ('# Generated by iptables_manager\n'
317
395
                           '*filter\n'
318
396
                           ':neutron-filter-top - [0:0]\n'
351
429
                       ),
352
430
             None),
353
431
        ]
 
432
        if use_ipv6:
 
433
            self._extend_with_ip6tables_filter(expected_calls_and_values,
 
434
                                               FILTER_DUMP)
 
435
 
354
436
        tools.setup_mock_calls(self.execute, expected_calls_and_values)
355
437
 
356
438
        self.iptables.ipv4['filter'].add_chain('filter')
371
453
 
372
454
        tools.verify_mock_calls(self.execute, expected_calls_and_values)
373
455
 
374
 
    def test_rule_with_wrap_target(self):
 
456
    def test_add_filter_rule(self):
 
457
        self._test_add_filter_rule_helper(False)
 
458
 
 
459
    def test_add_filter_rule_with_ipv6(self):
 
460
        self._test_add_filter_rule_helper(True)
 
461
 
 
462
    def _test_rule_with_wrap_target_helper(self, use_ipv6):
 
463
        self.iptables = iptables_manager.IptablesManager(
 
464
            root_helper=self.root_helper,
 
465
            use_ipv6=use_ipv6)
 
466
        self.execute = mock.patch.object(self.iptables, "execute").start()
 
467
 
375
468
        name = '0123456789' * 5
376
469
        wrap = "%s-%s" % (iptables_manager.binary_name,
377
470
                          iptables_manager.get_chain_name(name))
415
508
                       root_helper=self.root_helper),
416
509
             None),
417
510
        ]
 
511
        if use_ipv6:
 
512
            self._extend_with_ip6tables_filter(expected_calls_and_values,
 
513
                                               FILTER_DUMP)
 
514
 
418
515
        tools.setup_mock_calls(self.execute, expected_calls_and_values)
419
516
 
420
517
        self.iptables.ipv4['filter'].add_chain(name)
432
529
 
433
530
        tools.verify_mock_calls(self.execute, expected_calls_and_values)
434
531
 
435
 
    def test_add_nat_rule(self):
 
532
    def test_rule_with_wrap_target(self):
 
533
        self._test_rule_with_wrap_target_helper(False)
 
534
 
 
535
    def test_rule_with_wrap_target_with_ipv6(self):
 
536
        self._test_rule_with_wrap_target_helper(True)
 
537
 
 
538
    def _test_add_nat_rule_helper(self, use_ipv6):
 
539
        self.iptables = iptables_manager.IptablesManager(
 
540
            root_helper=self.root_helper,
 
541
            use_ipv6=use_ipv6)
 
542
        self.execute = mock.patch.object(self.iptables, "execute").start()
 
543
 
436
544
        nat_dump = ('# Generated by iptables_manager\n'
437
545
                    '*nat\n'
438
546
                    ':neutron-postrouting-bottom - [0:0]\n'
490
598
                       root_helper=self.root_helper),
491
599
             None),
492
600
        ]
 
601
        if use_ipv6:
 
602
            self._extend_with_ip6tables_filter(expected_calls_and_values,
 
603
                                               FILTER_DUMP)
 
604
 
493
605
        tools.setup_mock_calls(self.execute, expected_calls_and_values)
494
606
 
495
607
        self.iptables.ipv4['nat'].add_chain('nat')
514
626
 
515
627
        tools.verify_mock_calls(self.execute, expected_calls_and_values)
516
628
 
 
629
    def test_add_nat_rule(self):
 
630
        self._test_add_nat_rule_helper(False)
 
631
 
 
632
    def test_add_nat_rule_with_ipv6(self):
 
633
        self._test_add_nat_rule_helper(True)
 
634
 
517
635
    def test_add_rule_to_a_nonexistent_chain(self):
518
636
        self.assertRaises(LookupError, self.iptables.ipv4['filter'].add_rule,
519
637
                          'nonexistent', '-j DROP')
543
661
            'Attempted to get traffic counters of chain %s which '
544
662
            'does not exist', 'chain1')
545
663
 
546
 
    def test_get_traffic_counters(self):
 
664
    def _test_get_traffic_counters_helper(self, use_ipv6):
 
665
        self.iptables = iptables_manager.IptablesManager(
 
666
            root_helper=self.root_helper,
 
667
            use_ipv6=use_ipv6)
 
668
        self.execute = mock.patch.object(self.iptables, "execute").start()
 
669
        exp_packets = 800
 
670
        exp_bytes = 131802
 
671
 
547
672
        iptables_dump = (
548
673
            'Chain OUTPUT (policy ACCEPT 400 packets, 65901 bytes)\n'
549
674
            '    pkts      bytes target     prot opt in     out     source'
562
687
                        '-v', '-x'],
563
688
                       root_helper=self.root_helper),
564
689
             ''),
565
 
            (mock.call(['ip6tables', '-t', 'filter', '-L', 'OUTPUT',
566
 
                        '-n', '-v', '-x'],
567
 
                       root_helper=self.root_helper),
568
 
             iptables_dump),
569
690
        ]
 
691
        if use_ipv6:
 
692
            expected_calls_and_values.append(
 
693
                (mock.call(['ip6tables', '-t', 'filter', '-L', 'OUTPUT',
 
694
                           '-n', '-v', '-x'],
 
695
                           root_helper=self.root_helper),
 
696
                 iptables_dump))
 
697
            exp_packets *= 2
 
698
            exp_bytes *= 2
 
699
 
570
700
        tools.setup_mock_calls(self.execute, expected_calls_and_values)
571
701
 
572
702
        acc = self.iptables.get_traffic_counters('OUTPUT')
573
 
        self.assertEqual(acc['pkts'], 1600)
574
 
        self.assertEqual(acc['bytes'], 263604)
 
703
        self.assertEqual(acc['pkts'], exp_packets)
 
704
        self.assertEqual(acc['bytes'], exp_bytes)
575
705
 
576
706
        tools.verify_mock_calls(self.execute, expected_calls_and_values)
577
707
 
578
 
    def test_get_traffic_counters_with_zero(self):
 
708
    def test_get_traffic_counters(self):
 
709
        self._test_get_traffic_counters_helper(False)
 
710
 
 
711
    def test_get_traffic_counters_with_ipv6(self):
 
712
        self._test_get_traffic_counters_helper(True)
 
713
 
 
714
    def _test_get_traffic_counters_with_zero_helper(self, use_ipv6):
 
715
        self.iptables = iptables_manager.IptablesManager(
 
716
            root_helper=self.root_helper,
 
717
            use_ipv6=use_ipv6)
 
718
        self.execute = mock.patch.object(self.iptables, "execute").start()
 
719
        exp_packets = 800
 
720
        exp_bytes = 131802
 
721
 
579
722
        iptables_dump = (
580
723
            'Chain OUTPUT (policy ACCEPT 400 packets, 65901 bytes)\n'
581
724
            '    pkts      bytes target     prot opt in     out     source'
593
736
            (mock.call(['iptables', '-t', 'nat', '-L', 'OUTPUT', '-n',
594
737
                        '-v', '-x', '-Z'],
595
738
                       root_helper=self.root_helper),
596
 
             ''),
597
 
            (mock.call(['ip6tables', '-t', 'filter', '-L', 'OUTPUT',
598
 
                        '-n', '-v', '-x', '-Z'],
599
 
                       root_helper=self.root_helper),
600
 
             iptables_dump),
 
739
             '')
601
740
        ]
 
741
        if use_ipv6:
 
742
            expected_calls_and_values.append(
 
743
                (mock.call(['ip6tables', '-t', 'filter', '-L', 'OUTPUT',
 
744
                            '-n', '-v', '-x', '-Z'],
 
745
                           root_helper=self.root_helper),
 
746
                 iptables_dump))
 
747
            exp_packets *= 2
 
748
            exp_bytes *= 2
 
749
 
602
750
        tools.setup_mock_calls(self.execute, expected_calls_and_values)
603
751
 
604
752
        acc = self.iptables.get_traffic_counters('OUTPUT', zero=True)
605
 
        self.assertEqual(acc['pkts'], 1600)
606
 
        self.assertEqual(acc['bytes'], 263604)
 
753
        self.assertEqual(acc['pkts'], exp_packets)
 
754
        self.assertEqual(acc['bytes'], exp_bytes)
607
755
 
608
756
        tools.verify_mock_calls(self.execute, expected_calls_and_values)
609
757
 
 
758
    def test_get_traffic_counters_with_zero(self):
 
759
        self._test_get_traffic_counters_with_zero_helper(False)
 
760
 
 
761
    def test_get_traffic_counters_with_zero_with_ipv6(self):
 
762
        self._test_get_traffic_counters_with_zero_helper(True)
 
763
 
610
764
    def _test_find_last_entry(self, find_str):
611
765
        filter_list = [':neutron-filter-top - [0:0]',
612
766
                       ':%(bn)s-FORWARD - [0:0]',