~jamesh/bzr-dbus/server-signals

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
"""Manage a GLib Mainloop in a thread."""

import sys
import threading

import gobject

from bzrlib.hooks import Hooks
from bzrlib.smart.server import SmartTCPServer


class MainloopThread(object):
    """A class to manage a thread running the glib mainloop.

    The thread is reference counted: each call to start() must be paired
    with a call to stop().  The mainloop thread is created by the first
    start() and shut down by the stop() that brings the count back to zero.
    """

    def __init__(self):
        # Whether gobject.threads_init() has been called by this object.
        self.initialized = False
        # Number of start() calls not yet matched by a stop().
        self.start_count = 0
        # The threading.Thread running the mainloop, or None when stopped.
        self.thread = None
        # The gobject.MainLoop created by the thread, or None when stopped.
        self.main = None
        # Serialises start() and stop().
        self.lock = threading.Lock()
        # Exception raised by a 'thread_started' hook, handed over from the
        # mainloop thread to the start() call that is waiting for it.
        self._start_error = None

    def _initialize(self):
        """Call gobject.threads_init() the first time it is needed."""
        if not self.initialized:
            gobject.threads_init()
        self.initialized = True

    def start(self):
        """Start the mainloop thread.

        Only the first outstanding call actually creates the thread; later
        calls just increment the start count.  The call returns once the
        'thread_started' hooks have run in the new thread.

        If one of the 'thread_started' hooks raises, the thread is
        discarded, the start count is left unchanged and the exception is
        re-raised here.
        """
        self.lock.acquire()
        try:
            # If no thread has been started, start one now.
            if self.start_count == 0:
                assert self.thread is None, 'Thread has already started'
                self._initialize()
                self._start_error = None
                event = threading.Event()
                self.thread = threading.Thread(target=self._run, args=(event,))
                self.thread.setName('MainloopThread')
                self.thread.setDaemon(True)
                self.thread.start()
                event.wait()
                if self._start_error is not None:
                    # The thread gave up before entering the mainloop;
                    # reap it so a later start() can try again.
                    error = self._start_error
                    self._start_error = None
                    self.thread.join()
                    self.thread = None
                    raise error
            self.start_count += 1
        finally:
            self.lock.release()

    def stop(self):
        """Stop the mainloop thread.

        Decrements the start count and, when it reaches zero, asks the
        mainloop to quit and waits for the thread to finish.

        Raises AssertionError if called without a matching start().
        """
        self.lock.acquire()
        try:
            # Checked under the lock so the count cannot change between the
            # check and the decrement.
            assert self.start_count > 0
            self.start_count -= 1
            # Shut down the thread once the last user has stopped it.
            if self.start_count == 0:
                assert self.thread is not None, 'Thread has not been started'
                # Ask for the quit from inside the loop.  A direct
                # self.main.quit() issued before the thread has reached
                # self.main.run() would be lost, leaving join() blocked.
                # MainLoop.quit() returns None, so the idle source is
                # removed after it has run once.
                gobject.idle_add(self.main.quit)
                self.thread.join()
                self.thread = None
        finally:
            self.lock.release()

    def _run(self, event):
        """Body of the mainloop thread.

        Creates the mainloop, calls the 'thread_started' hooks, sets event
        to release the waiting start() call, runs the mainloop until it is
        quit and finally calls the 'thread_stopped' hooks.
        """
        self.main = gobject.MainLoop()
        try:
            try:
                for hook in MainloopThread.hooks['thread_started']:
                    hook(self)
            except Exception:
                # Report the failure to start() instead of leaving it
                # blocked on an event that would never be set.
                self._start_error = sys.exc_info()[1]
                self.main = None
                return
        finally:
            event.set()
        self.main.run()
        for hook in MainloopThread.hooks['thread_stopped']:
            hook(self)
        self.main = None

    def run_in_thread(self, func, *args, **kwargs):
        """Run the given function in the mainloop thread.

        func(*args, **kwargs) is scheduled as an idle callback and the
        caller blocks until it has completed.  The return value of func is
        returned; if func raises an exception, that exception is re-raised
        in the calling thread.

        The call only returns once the callback has been dispatched, so it
        should be used while the mainloop is running and not from within
        the mainloop thread itself.
        """
        results = []
        failures = []
        finished = threading.Event()

        def idle():
            try:
                try:
                    results.append(func(*args, **kwargs))
                except Exception:
                    failures.append(sys.exc_info()[1])
            finally:
                # Always wake the caller, even when func failed.
                finished.set()
            # Returning False removes the idle source after one run.
            return False
        gobject.idle_add(idle)
        finished.wait()
        if failures:
            raise failures[0]
        return results[0]


class MainloopThreadHooks(Hooks):
    """Hooks for the mainloop thread.

    Every hook point starts out as an empty list:

    thread_started -- invoked when the mainloop is started, called from
        the thread where the mainloop will run.
        The api signature is (mainloop thread).
    thread_stopped -- invoked when the mainloop is stopped, called from
        the thread where the mainloop ran.
        The api signature is (mainloop thread).
    server_started -- invoked when a server starts serving a directory,
        from the mainloop thread.
        The api signature is (mainloop thread, backing urls, public url).
    server_stopped -- invoked when a server stops serving a directory,
        from the mainloop thread.
        The api signature is (mainloop thread, backing urls, public url).
    """

    def __init__(self):
        """Create the default hooks."""
        Hooks.__init__(self)
        hook_names = (
            'thread_started',
            'thread_stopped',
            'server_started',
            'server_stopped',
        )
        for hook_name in hook_names:
            # Each hook point gets its own, initially empty, list.
            self[hook_name] = []

# Attach a single hook registry to the class; the code in this module looks
# hooks up through MainloopThread.hooks[...].
MainloopThread.hooks = MainloopThreadHooks()


# Hooks to actually start and stop the thread when the smart server is run.

# The MainloopThread shared by the server hooks below.  It is created on
# demand by on_server_started() and is not reset to None afterwards.
_mainloop_thread = None

def on_server_started(backing_urls, public_url):
    """Start the mainloop thread and announce that a server has started.

    Creates the module-level MainloopThread on first use, starts it, and
    then calls every 'server_started' hook from the mainloop thread with
    (mainloop thread, backing_urls, public_url).
    """
    global _mainloop_thread
    if _mainloop_thread is None:
        _mainloop_thread = MainloopThread()
    mainloop = _mainloop_thread
    mainloop.start()

    def notify_started():
        for callback in MainloopThread.hooks['server_started']:
            callback(mainloop, backing_urls, public_url)

    mainloop.run_in_thread(notify_started)

def on_server_stopped(backing_urls, public_url):
    """Announce that a server has stopped and stop the mainloop thread.

    Calls every 'server_stopped' hook from the mainloop thread with
    (mainloop thread, backing_urls, public_url), then releases this
    server's reference on the mainloop thread by calling stop().
    """
    mainloop = _mainloop_thread
    assert mainloop is not None, (
        "server_stopped hook called before server_started hook")

    def notify_stopped():
        for callback in MainloopThread.hooks['server_stopped']:
            callback(mainloop, backing_urls, public_url)

    mainloop.run_in_thread(notify_stopped)
    mainloop.stop()


def install_hooks():
    """Register the mainloop start/stop callbacks with SmartTCPServer."""
    registrations = (
        ('server_started', on_server_started),
        ('server_stopped', on_server_stopped),
    )
    for hook_name, callback in registrations:
        SmartTCPServer.hooks.install_hook(hook_name, callback)