~free.ekanayaka/landscape-client/config-failure

« back to all changes in this revision

Viewing changes to landscape/monitor/usermonitor.py

  • Committer: Free Ekanayaka
  • Date: 2010-04-28 13:38:56 UTC
  • mfrom: (182.1.30 amp-trunk)
  • Revision ID: free.ekanayaka@canonical.com-20100428133856-wo697ypl3e96cd6i
Merge amp-trunk [trivial]

Migrate our inter-process communication system from D-Bus to a Twisted AMP-based
protocol.

The change has been splitted into serveral branches that have been reviewed
separately in the LP bugs 570763, 570192, 568983, 568499, 567152, 564587,
564620, 562330, 561471, 559312, 558455, 556603, 552414, 546825, 544070, 539658,
537538, 535227, 499176, 531480, 499121, 514250, 499225 and 499018.

For sake of convenience here follows a summary of the changes introduced by
each branch:

  - Cleanup for having the base classes for monitor and manager plugins
    in landscape.monitor.plugin and landscape.manager.plugin.
  
  - Install the dbus bpickle extensions when starting a LandscapeService, letting
    plugins like the HAL/hardware one send dbus data types over AMP.
  
  - Make the monitor and manager test cases inherit from LandscapeTest instead of
    LandscapeIsolatedTest, as they don't need DBus anymore.
  
  - Make the landscape-config script use the AMP-based protocol to communicate
    with the broker. 

 -  Change the MethodCall protocol to split arguments in chunks of
    64k and transparently send them over several AMP commands.

 -  Make all task handlers derived from PackageTaskHandlers use the AMP-based
    protocol instead of D-Bus to communicate with the broker.
  
  - Port the behavior introduced in Bug #542215 to the AMP-based
    BrokerServer, which now broadcasts package-data-changed events.
  
  - Make the BrokerServiceHelper provide a 'live' RemoteBroker instead
    of a FakeRemoteBroker. The former BrokerServiceHelper has been renamed
    to FakeBrokerServiceHelper.
  
  - Migrate the watchdog to AMP, replacing the existing DBus-based
    communication mechanism with the AMP-based one.

  - Change is in the shutdown logic in the watchdog to be fully asynchronous,
    while before was relying on DBus being synchronous.
  
  - Add a 'factor' parameter to the RemoteComponentConnector.connect
    mehtod, for setting the pace at which service will try to reconnect.
  
  - Make services pass "wantPID" to the reactor.listenUNIX method, which
    cleans up left-over unix sockets on the filesystem (e.g. the former
    process died).
  
  - Change the landscape-{broker,monitor,manager} scripts to use the AMP-based versions
    of the associated services.
  
  - Move the BrokerConfiguration from landscape.broker.deployment to landscape.broker.config,
    for consistency with the monitor and the manager equivalent classes.

  - Replace the DBus-based MonitorHelper and ManagerHelper with their
    AMP-based equivalent.
  
  - Make the BrokerServer subscribe to the exchanger events, like "message",
   "impending-exchange", "exchange-failed", "registration-done", etc. and
    notifiy all connected clients about them.
        
  - Convert the usermanager and usermonitor communication mechanism from
    D-Bus to our AMP-based protocol.
  
  - Add a ManagerService class that takes care of starting the manager
    and make it listen to the correct Unix socket for incoming AMP
    connections.
 
  - Add a MonitorService class that takes care of starting the monitor
    and make it listen to the correct Unix socket for incoming AMP
    connections.

  - Add a BrokerService class that starts listening for incoming AMP connections
    on a Unix socket.
  
  - Add a BrokerClientProtocol for exposing the methods of a broker client
    to the broker server.

  - Decouple the broker client API logic from DBus, adding a new BrokerClient
    class implementing the behavior the broker expects for its clients.

  - Add a BrokerServerProtocol class that can be used by the broker
    clients (monitor and manager) to perform remote method calls
    on the BrokerServer.

  - Add a MethodCall-based protocol for basic communication between the
    various Landscape components.
  
  - Add a BrokerServer class implementing the same methods and
    interfaces of BrokerDBusObject, but not publishing them on
    DBus. The methods of this class will be exposed to the broker
    clients (the monitor and the manager) by an AMP-based protocol.

  - Add a reconnection mechanism to the MethodCall protocol makes it:
  
    * Keep trying to connect if the very fist attempt failed (for instance the
      remote process we're trying to contact is not yet ready to accept
      connections).
  
    * Try to reconnect if the connection drops (for instance the remote
      process died and the watchdog started it again).
  
    * Try to re-perform a failed MethodCall request as soon as the connection
      is up again, in case it failed because it was issued during a connection 
      blackout (for instance a monitor plugin tries to call a 
      RemoteBroker.send_message while the broker is restarting).
  
  - Make the MethodCall protocol able to cope with remote methods returning
    deferreds.
  
  - Add an AMP-based protocol to easily expose objects over AMP and perform
    remote method calls on them.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import os
 
2
 
1
3
from twisted.internet.defer import maybeDeferred
2
4
 
3
5
from landscape.lib.log import log_failure
 
6
from landscape.lib.amp import RemoteObject
 
7
from landscape.amp import (
 
8
    ComponentProtocol, ComponentProtocolFactory, RemoteComponentConnector)
4
9
 
5
 
from landscape.lib.dbus_util import method, get_object, Object
6
 
from landscape.monitor.monitor import (MonitorPlugin, BUS_NAME, OBJECT_PATH,
7
 
                                       IFACE_NAME)
 
10
from landscape.monitor.plugin import MonitorPlugin
8
11
from landscape.user.changes import UserChanges
9
12
from landscape.user.provider import UserProvider
10
13
 
11
14
 
12
 
class UserMonitorDBusObject(Object):
13
 
    """
14
 
    A DBUS object which exposes an API for getting the monitor to detect user
15
 
    changes and upload them to the Landscape server.
16
 
    """
17
 
    object_path = OBJECT_PATH + "/UserMonitor"
18
 
    iface_name = IFACE_NAME + ".UserMonitor"
19
 
    bus_name = BUS_NAME
20
 
 
21
 
    def __init__(self, bus, plugin):
22
 
        super(UserMonitorDBusObject, self).__init__(bus)
23
 
        self._plugin = plugin
24
 
 
25
 
    @method(iface_name)
26
 
    def detect_changes(self, operation_id=None):
27
 
        return self._plugin.run(operation_id)
28
 
 
29
 
 
30
15
class UserMonitor(MonitorPlugin):
31
16
    """
32
17
    A plugin which monitors the system user databases.
34
19
 
35
20
    persist_name = "users"
36
21
    run_interval = 3600 # 1 hour
 
22
    name = "usermonitor"
37
23
 
38
24
    def __init__(self, provider=None):
39
25
        if provider is None:
40
26
            provider = UserProvider()
41
27
        self._provider = provider
 
28
        self._port = None
42
29
 
43
30
    def register(self, registry):
44
31
        super(UserMonitor, self).register(registry)
 
32
 
45
33
        self.registry.reactor.call_on("resynchronize", self._resynchronize)
46
34
        self.call_on_accepted("users", self._run_detect_changes, None)
47
 
        self._dbus_object = UserMonitorDBusObject(registry.bus, self)
 
35
 
 
36
        factory = UserMonitorProtocolFactory(object=self)
 
37
        socket = os.path.join(self.registry.config.data_path,
 
38
                              self.name + ".sock")
 
39
        self._port = self.registry.reactor.listen_unix(socket, factory)
 
40
        from landscape.manager.usermanager import RemoteUserManagerConnector
 
41
        self._user_manager_connector = RemoteUserManagerConnector(
 
42
            self.registry.reactor, self.registry.config)
 
43
 
 
44
    def stop(self):
 
45
        """Stop listening for incoming AMP connections."""
 
46
        if self._port:
 
47
            self._port.stopListening()
 
48
            self._port = None
48
49
 
49
50
    def _resynchronize(self):
50
51
        """Resynchronize user and group data."""
56
57
        return self.registry.broker.call_if_accepted(
57
58
            "users", self._run_detect_changes, operation_id)
58
59
 
 
60
    detect_changes = run
 
61
 
59
62
    def _run_detect_changes(self, operation_id=None):
60
63
        """
61
64
        If changes are detected an C{urgent-exchange} is fired to send
64
67
        @param operation_id: When present it will be included in the
65
68
            C{operation-id} field.
66
69
        """
67
 
        from landscape.manager.usermanager import UserManagerDBusObject
68
70
        # We'll skip checking the locked users if we're in monitor-only mode.
69
71
        if getattr(self.registry.config, "monitor_only", False):
70
72
            result = maybeDeferred(self._detect_changes,
71
73
                                   [], operation_id)
72
74
        else:
73
 
            remote_service = get_object(self.registry.bus,
74
 
                                        UserManagerDBusObject.bus_name,
75
 
                                        UserManagerDBusObject.object_path)
76
 
 
77
 
            result = remote_service.get_locked_usernames()
 
75
 
 
76
            def get_locked_usernames(user_manager):
 
77
                return user_manager.get_locked_usernames()
 
78
 
 
79
            def disconnect(locked_usernames):
 
80
                self._user_manager_connector.disconnect()
 
81
                return locked_usernames
 
82
 
 
83
            result = self._user_manager_connector.connect()
 
84
            result.addCallback(get_locked_usernames)
 
85
            result.addCallback(disconnect)
78
86
            result.addCallback(self._detect_changes, operation_id)
79
87
            result.addErrback(lambda f: self._detect_changes([], operation_id))
80
88
        return result
81
89
 
82
90
    def _detect_changes(self, locked_users, operation_id=None):
 
91
 
83
92
        def update_snapshot(result):
84
93
            changes.snapshot()
85
94
            return result
91
100
        self._provider.locked_users = locked_users
92
101
        changes = UserChanges(self._persist, self._provider)
93
102
        message = changes.create_diff()
 
103
 
94
104
        if message:
95
105
            message["type"] = "users"
96
106
            if operation_id:
99
109
            result.addCallback(update_snapshot)
100
110
            result.addErrback(log_error)
101
111
            return result
 
112
 
 
113
 
 
114
class UserMonitorProtocol(ComponentProtocol):
 
115
    """L{AMP}-based protocol for calling L{UserMonitor}'s methods remotely."""
 
116
 
 
117
    methods = ["detect_changes"]
 
118
 
 
119
 
 
120
class UserMonitorProtocolFactory(ComponentProtocolFactory):
 
121
 
 
122
    protocol = UserMonitorProtocol
 
123
 
 
124
 
 
125
class RemoteUserMonitor(RemoteObject):
 
126
    """A connected remote L{UserMonitor}."""
 
127
 
 
128
 
 
129
class RemoteUserMonitorConnector(RemoteComponentConnector):
 
130
 
 
131
    factory = ComponentProtocolFactory
 
132
    remote = RemoteUserMonitor
 
133
    component = UserMonitor