163
163
'not_used': [1, 2, 3]}
164
164
expected_networks = ('network_id1',)
166
'oslo.utils.timeutils.utcnow_ts', return_value=0):
166
'oslo_utils.timeutils.utcnow_ts', return_value=0):
167
167
mock_list_ports = self.qclient.return_value.list_ports
168
168
mock_list_ports.return_value = ports
169
169
networks = self.handler._get_router_networks(router_id)
510
510
2, self.qclient.return_value.list_ports.call_count)
513
class TestUnixDomainHttpProtocol(base.BaseTestCase):
514
def test_init_empty_client(self):
515
u = agent.UnixDomainHttpProtocol(mock.Mock(), '', mock.Mock())
516
self.assertEqual(u.client_address, ('<local>', 0))
518
def test_init_with_client(self):
519
u = agent.UnixDomainHttpProtocol(mock.Mock(), 'foo', mock.Mock())
520
self.assertEqual(u.client_address, 'foo')
523
class TestUnixDomainWSGIServer(base.BaseTestCase):
525
super(TestUnixDomainWSGIServer, self).setUp()
526
self.eventlet_p = mock.patch.object(agent, 'eventlet')
527
self.eventlet = self.eventlet_p.start()
528
self.server = agent.UnixDomainWSGIServer('test')
530
def test_start(self):
531
mock_app = mock.Mock()
532
with mock.patch.object(self.server, '_launch') as launcher:
533
self.server.start(mock_app, '/the/path', workers=5, backlog=128)
534
self.eventlet.assert_has_calls([
537
family=socket.AF_UNIX,
541
launcher.assert_called_once_with(mock_app, workers=5)
544
with mock.patch.object(agent, 'logging') as logging:
545
self.server._run('app', 'sock')
547
self.eventlet.wsgi.server.assert_called_once_with(
550
protocol=agent.UnixDomainHttpProtocol,
552
custom_pool=self.server.pool
554
self.assertTrue(len(logging.mock_calls))
557
513
class TestUnixDomainMetadataProxy(base.BaseTestCase):
559
515
super(TestUnixDomainMetadataProxy, self).setUp()
566
522
self.cfg.CONF.metadata_workers = 0
567
523
self.cfg.CONF.metadata_backlog = 128
569
def test_init_doesnot_exists(self):
570
with mock.patch('os.path.isdir') as isdir:
571
with mock.patch('os.makedirs') as makedirs:
572
isdir.return_value = False
573
agent.UnixDomainMetadataProxy(mock.Mock())
575
isdir.assert_called_once_with('/the')
576
makedirs.assert_called_once_with('/the', 0o755)
525
@mock.patch.object(agent_utils, 'ensure_dir')
526
def test_init_doesnot_exists(self, ensure_dir):
527
agent.UnixDomainMetadataProxy(mock.Mock())
528
ensure_dir.assert_called_once_with('/the')
578
530
def test_init_exists(self):
579
531
with mock.patch('os.path.isdir') as isdir:
580
532
with mock.patch('os.unlink') as unlink:
581
533
isdir.return_value = True
582
534
agent.UnixDomainMetadataProxy(mock.Mock())
584
isdir.assert_called_once_with('/the')
585
535
unlink.assert_called_once_with('/the/path')
587
537
def test_init_exists_unlink_no_file(self):
593
543
unlink.side_effect = OSError
595
545
agent.UnixDomainMetadataProxy(mock.Mock())
597
isdir.assert_called_once_with('/the')
598
546
unlink.assert_called_once_with('/the/path')
599
exists.assert_called_once_with('/the/path')
601
548
def test_init_exists_unlink_fails_file_still_exists(self):
602
549
with mock.patch('os.path.isdir') as isdir:
609
556
with testtools.ExpectedException(OSError):
610
557
agent.UnixDomainMetadataProxy(mock.Mock())
612
isdir.assert_called_once_with('/the')
613
558
unlink.assert_called_once_with('/the/path')
614
exists.assert_called_once_with('/the/path')
617
with mock.patch.object(agent, 'MetadataProxyHandler') as handler:
618
with mock.patch.object(agent, 'UnixDomainWSGIServer') as server:
619
with mock.patch('os.path.isdir') as isdir:
620
with mock.patch('os.makedirs') as makedirs:
621
isdir.return_value = False
623
p = agent.UnixDomainMetadataProxy(self.cfg.CONF)
626
isdir.assert_called_once_with('/the')
627
makedirs.assert_called_once_with('/the', 0o755)
628
server.assert_has_calls([
629
mock.call('neutron-metadata-agent'),
630
mock.call().start(handler.return_value,
631
'/the/path', workers=0,
560
@mock.patch.object(agent, 'MetadataProxyHandler')
561
@mock.patch.object(agent_utils, 'UnixDomainWSGIServer')
562
@mock.patch.object(agent_utils, 'ensure_dir')
563
def test_run(self, ensure_dir, server, handler):
564
p = agent.UnixDomainMetadataProxy(self.cfg.CONF)
567
ensure_dir.assert_called_once_with('/the')
568
server.assert_has_calls([
569
mock.call('neutron-metadata-agent'),
570
mock.call().start(handler.return_value,
571
'/the/path', workers=0,
636
576
def test_main(self):
637
577
with mock.patch.object(agent, 'UnixDomainMetadataProxy') as proxy: