3
import landscape.ui.model.registration.mechanism as mechanism
6
class RegistrationProxy(object):
8
L{RegistrationProxy} allows the use of the L{RegistrationMechanism} via
9
DBus without having to know about DBus. This in turn allows controller
10
code to remain agnostic to the implementation of registration.
13
def __init__(self, on_register_notify=None, on_register_error=None,
14
on_register_succeed=None, on_register_fail=None,
15
on_disable_succeed=None, on_disable_fail=None, bus=None):
17
self._interface = None
18
self._on_register_notify = on_register_notify
19
self._on_register_error = on_register_error
20
self._on_register_succeed = on_register_succeed
21
self._on_register_fail = on_register_fail
22
self._on_disable_succeed = on_disable_succeed
23
self._on_disable_fail = on_disable_fail
24
self._setup_interface(bus)
26
def _setup_interface(self, bus):
    """
    Redefining L{_setup_interface} allows us to bypass DBus for more
    convenient testing in some instances.

    @param bus: The bus to use.  When it is C{None} a new
        C{dbus.SystemBus()} is opened instead.
    """
    # Honour a caller-supplied bus; only fall back to the system bus
    # when none was given.
    if bus is None:
        bus = dbus.SystemBus()
    self._bus = bus
    self._remote_object = self._bus.get_object(mechanism.SERVICE_NAME,
                                               mechanism.OBJECT_PATH)
    self._interface = dbus.Interface(self._remote_object,
                                     mechanism.INTERFACE_NAME)
40
def _exit_handler_wrapper(self, exit_handler):
42
def wrapped_exit_handler(message):
43
self._remove_handlers()
46
return wrapped_exit_handler
48
def _register_handlers(self):
50
if self._on_register_notify:
51
self._handlers.append(
52
self._bus.add_signal_receiver(
53
self._on_register_notify,
54
signal_name="register_notify",
55
dbus_interface=mechanism.INTERFACE_NAME,
57
path=mechanism.OBJECT_PATH))
58
if self._on_register_error:
59
self._handlers.append(
60
self._bus.add_signal_receiver(
61
self._on_register_error,
62
signal_name="register_error",
63
dbus_interface=mechanism.INTERFACE_NAME,
65
path=mechanism.OBJECT_PATH))
66
if self._on_register_succeed:
67
self._handlers.append(
68
self._bus.add_signal_receiver(
69
self._exit_handler_wrapper(self._on_register_succeed),
70
signal_name="register_succeed",
71
dbus_interface=mechanism.INTERFACE_NAME,
73
path=mechanism.OBJECT_PATH))
74
if self._on_register_fail:
75
self._handlers.append(
76
self._bus.add_signal_receiver(
77
self._exit_handler_wrapper(self._on_register_fail),
78
signal_name="register_fail",
79
dbus_interface=mechanism.INTERFACE_NAME,
81
path=mechanism.OBJECT_PATH))
82
if self._on_disable_succeed:
83
self._handlers.append(
84
self._bus.add_signal_receiver(
85
self._exit_handler_wrapper(self._on_disable_succeed),
86
signal_name="disable_succeed",
87
dbus_interface=mechanism.INTERFACE_NAME,
89
path=mechanism.OBJECT_PATH))
90
if self._on_disable_fail:
91
self._handlers.append(
92
self._bus.add_signal_receiver(
93
self._exit_handler_wrapper(self._on_disable_fail),
94
signal_name="disable_fail",
95
dbus_interface=mechanism.INTERFACE_NAME,
97
path=mechanism.OBJECT_PATH))
99
def _remove_handlers(self):
100
for handler in self._handlers:
101
self._bus.remove_signal_receiver(handler)
104
def challenge(self):
    """
    Return the result of calling C{challenge} on the DBus interface.
    """
    # NOTE(review): the C{def} line for this statement was absent from
    # the reviewed text; the method name is inferred from the call it
    # delegates to - confirm against callers.
    return self._interface.challenge()
106
def register(self, config_path):
    """
    Install the signal handlers and ask the DBus interface to register
    using C{config_path}.

    A C{dbus.DBusException} named
    C{org.freedesktop.DBus.Error.NoReply} is treated as a failed
    registration with the message "Registration timed out."; any other
    C{dbus.DBusException} is re-raised.

    On a true result C{on_register_succeed} is called with no
    arguments, otherwise C{on_register_error} is called with the
    message.

    @param config_path: Passed unchanged to the interface's C{register}.
    @return: The result flag.
    """
    self._register_handlers()
    try:
        result, message = self._interface.register(config_path)
    except dbus.DBusException as e:
        if e.get_dbus_name() != "org.freedesktop.DBus.Error.NoReply":
            raise
        # No reply from the mechanism: report it as a failure rather
        # than letting the exception escape.
        result = False
        message = "Registration timed out."
    if result:
        self._on_register_succeed()
    else:
        self._on_register_error(message)
    return result
123
def disable(self):
    """
    Install the signal handlers and call C{disable} on the DBus
    interface.

    On a true result C{on_disable_succeed} is called, otherwise
    C{on_disable_fail} is called; neither receives arguments.

    @return: The result returned by the interface.
    """
    # NOTE(review): the C{def} line for these statements was absent from
    # the reviewed text; the method name is inferred from the interface
    # call it delegates to - confirm against callers.
    self._register_handlers()
    result = self._interface.disable()
    if result:
        self._on_disable_succeed()
    else:
        self._on_disable_fail()
    return result
133
Cause the mechanism to exit.
136
self._interface.exit()
137
except dbus.DBusException, e:
138
if e.get_dbus_name() != "org.freedesktop.DBus.Error.NoReply":