108
def _fake_make_msg(method, **kwargs):
110
def _fake_make_msg(method, namespace, **kwargs):
109
111
call_info['rpc_method'] = method
110
112
call_info['rpc_kwargs'] = kwargs
111
113
return 'fake-message'
115
117
call_info['cast_kwargs'] = kwargs
117
119
self.stubs.Set(rpc, 'cast_to_server', _fake_cast_to_server)
118
self.stubs.Set(self.driver.intercell_rpcapi, 'make_msg',
120
self.stubs.Set(self.driver.intercell_rpcapi, 'make_namespaced_msg',
120
122
self.stubs.Set(self.driver.intercell_rpcapi, 'cast_to_server',
121
123
_fake_cast_to_server)
146
def _fake_make_msg(method, **kwargs):
148
def _fake_make_msg(method, namespace, **kwargs):
147
149
call_info['rpc_method'] = method
148
150
call_info['rpc_kwargs'] = kwargs
149
151
return 'fake-message'
155
157
self.stubs.Set(rpc, 'fanout_cast_to_server',
156
158
_fake_fanout_cast_to_server)
157
self.stubs.Set(self.driver.intercell_rpcapi, 'make_msg',
159
self.stubs.Set(self.driver.intercell_rpcapi, 'make_namespaced_msg',
159
161
self.stubs.Set(self.driver.intercell_rpcapi,
160
162
'fanout_cast_to_server', _fake_fanout_cast_to_server)
229
231
url = "rabbit://u:p@h:10/virtual?ssl=1"
230
232
self.assertRaises(ValueError, rpc_driver.parse_transport_url, url)
234
def test_query_string_old_urlparse(self):
235
# Test parse_transport_url with urlparse.urlparse behaving as in python
236
# 2.7.3 or below. See https://bugs.launchpad.net/nova/+bug/1202149
237
url = "rabbit://u:p@h:10/virtual?ssl=1"
239
parse_result = urlparse.ParseResult(
240
scheme='rabbit', netloc='u:p@h:10', path='/virtual?ssl=1',
241
params='', query='', fragment=''
244
self.mox.StubOutWithMock(urlparse, 'urlparse')
245
urlparse.urlparse(url).AndReturn(parse_result)
248
self.assertRaises(ValueError, rpc_driver.parse_transport_url, url)
250
def test_query_string_new_urlparse(self):
251
# Test parse_transport_url with urlparse.urlparse behaving as in python
252
# 2.7.4 or above. See https://bugs.launchpad.net/nova/+bug/1202149
253
url = "rabbit://u:p@h:10/virtual?ssl=1"
255
parse_result = urlparse.ParseResult(
256
scheme='rabbit', netloc='u:p@h:10', path='/virtual',
257
params='', query='ssl=1', fragment=''
260
self.mox.StubOutWithMock(urlparse, 'urlparse')
261
urlparse.urlparse(url).AndReturn(parse_result)
264
self.assertRaises(ValueError, rpc_driver.parse_transport_url, url)
232
266
def test_empty(self):