~ubuntu-branches/ubuntu/raring/ipython/raring

« back to all changes in this revision

Viewing changes to IPython/kernel/controllerservice.py

  • Committer: Package Import Robot
  • Author(s): Julian Taylor
  • Date: 2011-11-22 23:40:57 UTC
  • mfrom: (6.1.5 sid)
  • Revision ID: package-import@ubuntu.com-20111122234057-ta86ocdahnhwmnd8
Tags: 0.11-2
* upload to unstable
* add patch fix-version-checks-for-pyzmq-2.1.10.patch
* fix debianize-error-messages.patch to reraise unknown exceptions
* suggest python-zmq for ipython package
* use dh_sphinxdoc
  - bump sphinx dependency to >= 1.0.7+dfsg-1~, replace libjs-jquery
    dependency with ${sphinxdoc:Depends} and drop ipython-doc.links
* remove empty directory from ipython
* link duplicate images in ipython-doc
* remove obsolete Conflicts and Replaces

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# encoding: utf-8
2
 
# -*- test-case-name: IPython.kernel.test.test_controllerservice -*-
3
 
 
4
 
"""A Twisted Service for the IPython Controller.
5
 
 
6
 
The IPython Controller:
7
 
 
8
 
* Listens for Engines to connect and then manages access to those engines.
9
 
* Listens for clients and passes commands from client to the Engines.
10
 
* Exposes an asynchronous interfaces to the Engines which themselves can block.
11
 
* Acts as a gateway to the Engines.
12
 
 
13
 
The design of the controller is somewhat abstract to allow flexibility in how 
14
 
the controller is presented to clients.  This idea is that there is a basic
15
 
ControllerService class that allows engines to connect to it.  But, this 
16
 
basic class has no client interfaces.  To expose client interfaces developers
17
 
provide an adapter that makes the ControllerService look like something.  For 
18
 
example, one client interface might support task farming and another might
19
 
support interactive usage.  The important thing is that by using interfaces
20
 
and adapters, a single controller can be accessed from multiple interfaces.
21
 
Furthermore, by adapting various client interfaces to various network
22
 
protocols, each client interface can be exposed to multiple network protocols.
23
 
See multiengine.py for an example of how to adapt the ControllerService
24
 
to a client interface.
25
 
"""
26
 
 
27
 
__docformat__ = "restructuredtext en"
28
 
 
29
 
#-------------------------------------------------------------------------------
30
 
#  Copyright (C) 2008  The IPython Development Team
31
 
#
32
 
#  Distributed under the terms of the BSD License.  The full license is in
33
 
#  the file COPYING, distributed as part of this software.
34
 
#-------------------------------------------------------------------------------
35
 
 
36
 
#-------------------------------------------------------------------------------
37
 
# Imports
38
 
#-------------------------------------------------------------------------------
39
 
 
40
 
import os, sys
41
 
 
42
 
from twisted.application import service
43
 
from twisted.internet import defer, reactor
44
 
from twisted.python import log, components
45
 
from zope.interface import Interface, implements, Attribute
46
 
import zope.interface as zi
47
 
 
48
 
from IPython.kernel.engineservice import \
49
 
    IEngineCore, \
50
 
    IEngineSerialized, \
51
 
    IEngineQueued
52
 
    
53
 
from IPython.genutils import get_ipython_dir
54
 
from IPython.kernel import codeutil
55
 
 
56
 
#-------------------------------------------------------------------------------
57
 
# Interfaces for the Controller
58
 
#-------------------------------------------------------------------------------
59
 
 
60
 
class IControllerCore(Interface):
61
 
    """Basic methods any controller must have.
62
 
    
63
 
    This is basically the aspect of the controller relevant to the 
64
 
    engines and does not assume anything about how the engines will
65
 
    be presented to a client.
66
 
    """
67
 
        
68
 
    engines = Attribute("A dict of engine ids and engine instances.")
69
 
        
70
 
    def register_engine(remoteEngine, id=None, ip=None, port=None, 
71
 
        pid=None):
72
 
        """Register new remote engine.
73
 
        
74
 
        The controller can use the ip, port, pid of the engine to do useful things
75
 
        like kill the engines.
76
 
        
77
 
        :Parameters:
78
 
            remoteEngine
79
 
                An implementer of IEngineCore, IEngineSerialized and IEngineQueued.
80
 
            id : int
81
 
                Requested id.
82
 
            ip : str
83
 
                IP address the engine is running on.
84
 
            port : int
85
 
                Port the engine is on.
86
 
            pid : int
87
 
                pid of the running engine.
88
 
        
89
 
        :Returns: A dict of {'id':id} and possibly other key, value pairs.
90
 
        """
91
 
    
92
 
    def unregister_engine(id):
93
 
        """Handle a disconnecting engine.
94
 
        
95
 
        :Parameters:
96
 
            id
97
 
                The integer engine id of the engine to unregister.
98
 
        """
99
 
        
100
 
    def on_register_engine_do(f, includeID, *args, **kwargs):
101
 
        """Call ``f(*args, **kwargs)`` when an engine is registered.
102
 
        
103
 
        :Parameters:
104
 
            includeID : int
105
 
                If True the first argument to f will be the id of the engine.
106
 
        """
107
 
            
108
 
    def on_unregister_engine_do(f, includeID, *args, **kwargs):
109
 
        """Call ``f(*args, **kwargs)`` when an engine is unregistered.
110
 
        
111
 
        :Parameters:
112
 
            includeID : int
113
 
                If True the first argument to f will be the id of the engine.
114
 
        """
115
 
    
116
 
    def on_register_engine_do_not(f):
117
 
        """Stop calling f on engine registration"""
118
 
    
119
 
    def on_unregister_engine_do_not(f):
120
 
        """Stop calling f on engine unregistration"""
121
 
        
122
 
    def on_n_engines_registered_do(n, f, *arg, **kwargs):
123
 
        """Call f(*args, **kwargs) the first time the nth engine registers."""
124
 
                    
125
 
class IControllerBase(IControllerCore):
126
 
    """The basic controller interface."""
127
 
    pass 
128
 
 
129
 
 
130
 
#-------------------------------------------------------------------------------
131
 
# Implementation of the ControllerService
132
 
#-------------------------------------------------------------------------------
133
 
 
134
 
class ControllerService(object, service.Service):
135
 
    """A basic Controller represented as a Twisted Service.
136
 
    
137
 
    This class doesn't implement any client notification mechanism.  That
138
 
    is up to adapted subclasses.
139
 
    """
140
 
    
141
 
    # I also pick up the IService interface by inheritance from service.Service
142
 
    implements(IControllerBase)
143
 
    name = 'ControllerService'
144
 
    
145
 
    def __init__(self, maxEngines=511, saveIDs=False):
146
 
        self.saveIDs = saveIDs
147
 
        self.engines = {}
148
 
        self.availableIDs = range(maxEngines,-1,-1)   # [511,...,0]
149
 
        self._onRegister = []
150
 
        self._onUnregister = []
151
 
        self._onNRegistered = []
152
 
    
153
 
    #---------------------------------------------------------------------------
154
 
    # Methods used to save the engine info to a log file
155
 
    #---------------------------------------------------------------------------
156
 
    
157
 
    def _buildEngineInfoString(self, id, ip, port, pid):
158
 
        if id is None:
159
 
            id = -99
160
 
        if ip is None:
161
 
            ip = "-99"
162
 
        if port is None:
163
 
            port = -99
164
 
        if pid is None:
165
 
            pid = -99
166
 
        return "Engine Info: %d %s %d %d" % (id, ip , port, pid)
167
 
        
168
 
    def _logEngineInfo(self, id, ip, port, pid):
169
 
        log.msg(self._buildEngineInfoString(id,ip,port,pid))
170
 
    
171
 
    def _getEngineInfoLogFile(self):
172
 
        # Store all logs inside the ipython directory
173
 
        ipdir = get_ipython_dir()
174
 
        pjoin = os.path.join
175
 
        logdir_base = pjoin(ipdir,'log')
176
 
        if not os.path.isdir(logdir_base):
177
 
            os.makedirs(logdir_base)
178
 
        logfile = os.path.join(logdir_base,'ipcontroller-%s-engine-info.log' % os.getpid())
179
 
        return logfile
180
 
    
181
 
    def _logEngineInfoToFile(self, id, ip, port, pid):
182
 
        """Log info about an engine to a log file.
183
 
        
184
 
        When an engine registers with a ControllerService, the ControllerService
185
 
        saves information about the engine to a log file.  That information
186
 
        can be useful for various purposes, such as killing hung engines, etc.
187
 
        
188
 
        This method takes the assigned id, ip/port and pid of the engine
189
 
        and saves it to a file of the form:
190
 
        
191
 
        ~/.ipython/log/ipcontroller-###-engine-info.log
192
 
        
193
 
        where ### is the pid of the controller.
194
 
        
195
 
        Each line of this file has the form:
196
 
        
197
 
        Engine Info: ip ip port pid
198
 
        
199
 
        If any of the entries are not known, they are replaced by -99.
200
 
        """
201
 
        
202
 
        fname = self._getEngineInfoLogFile()
203
 
        f = open(fname, 'a')
204
 
        s = self._buildEngineInfoString(id,ip,port,pid)
205
 
        f.write(s + '\n')
206
 
        f.close()
207
 
    
208
 
    #---------------------------------------------------------------------------
209
 
    # IControllerCore methods
210
 
    #---------------------------------------------------------------------------
211
 
        
212
 
    def register_engine(self, remoteEngine, id=None,
213
 
        ip=None, port=None, pid=None):
214
 
        """Register new engine connection"""
215
 
        
216
 
        # What happens if these assertions fail?
217
 
        assert IEngineCore.providedBy(remoteEngine), \
218
 
            "engine passed to register_engine doesn't provide IEngineCore"
219
 
        assert IEngineSerialized.providedBy(remoteEngine), \
220
 
            "engine passed to register_engine doesn't provide IEngineSerialized"
221
 
        assert IEngineQueued.providedBy(remoteEngine), \
222
 
            "engine passed to register_engine doesn't provide IEngineQueued"
223
 
        assert isinstance(id, int) or id is None, \
224
 
            "id to register_engine must be an integer or None"
225
 
        assert isinstance(ip, str) or ip is None, \
226
 
            "ip to register_engine must be a string or None"
227
 
        assert isinstance(port, int) or port is None, \
228
 
            "port to register_engine must be an integer or None"
229
 
        assert isinstance(pid, int) or pid is None, \
230
 
            "pid to register_engine must be an integer or None"
231
 
            
232
 
        desiredID = id
233
 
        if desiredID in self.engines.keys():
234
 
            desiredID = None
235
 
            
236
 
        if desiredID in self.availableIDs:
237
 
            getID = desiredID
238
 
            self.availableIDs.remove(desiredID)
239
 
        else:
240
 
            getID = self.availableIDs.pop()
241
 
        remoteEngine.id = getID
242
 
        remoteEngine.service = self
243
 
        self.engines[getID] = remoteEngine
244
 
 
245
 
        # Log the Engine Information for monitoring purposes
246
 
        self._logEngineInfoToFile(getID, ip, port, pid)
247
 
 
248
 
        msg = "registered engine with id: %i" %getID
249
 
        log.msg(msg)
250
 
        
251
 
        for i in range(len(self._onRegister)):
252
 
            (f,args,kwargs,ifid) = self._onRegister[i]
253
 
            try:
254
 
                if ifid:
255
 
                    f(getID, *args, **kwargs)
256
 
                else:
257
 
                    f(*args, **kwargs)
258
 
            except:
259
 
                self._onRegister.pop(i)
260
 
        
261
 
        # Call functions when the nth engine is registered and them remove them
262
 
        for i, (n, f, args, kwargs) in enumerate(self._onNRegistered):
263
 
            if len(self.engines.keys()) == n:
264
 
                try:
265
 
                    try:
266
 
                        f(*args, **kwargs)
267
 
                    except:
268
 
                        log.msg("Function %r failed when the %ith engine registered" % (f, n))
269
 
                finally:
270
 
                    self._onNRegistered.pop(i)
271
 
        
272
 
        return {'id':getID}
273
 
    
274
 
    def unregister_engine(self, id):
275
 
        """Unregister engine by id."""
276
 
        
277
 
        assert isinstance(id, int) or id is None, \
278
 
            "id to unregister_engine must be an integer or None"
279
 
        
280
 
        msg = "unregistered engine with id: %i" %id
281
 
        log.msg(msg)
282
 
        try:
283
 
            del self.engines[id]
284
 
        except KeyError:
285
 
            log.msg("engine with id %i was not registered" % id)
286
 
        else:
287
 
            if not self.saveIDs:
288
 
                self.availableIDs.append(id)
289
 
                # Sort to assign lower ids first
290
 
                self.availableIDs.sort(reverse=True) 
291
 
            else:
292
 
                log.msg("preserving id %i" %id)
293
 
            
294
 
            for i in range(len(self._onUnregister)):
295
 
                (f,args,kwargs,ifid) = self._onUnregister[i]
296
 
                try:
297
 
                    if ifid:
298
 
                        f(id, *args, **kwargs)
299
 
                    else:
300
 
                        f(*args, **kwargs)
301
 
                except:
302
 
                    self._onUnregister.pop(i)
303
 
    
304
 
    def on_register_engine_do(self, f, includeID, *args, **kwargs):
305
 
        assert callable(f), "f must be callable"
306
 
        self._onRegister.append((f,args,kwargs,includeID))
307
 
 
308
 
    def on_unregister_engine_do(self, f, includeID, *args, **kwargs):
309
 
        assert callable(f), "f must be callable"
310
 
        self._onUnregister.append((f,args,kwargs,includeID))
311
 
    
312
 
    def on_register_engine_do_not(self, f):
313
 
        for i in range(len(self._onRegister)):
314
 
            g = self._onRegister[i][0]
315
 
            if f == g:
316
 
                self._onRegister.pop(i)
317
 
                return
318
 
    
319
 
    def on_unregister_engine_do_not(self, f):
320
 
        for i in range(len(self._onUnregister)):
321
 
            g = self._onUnregister[i][0]
322
 
            if f == g:
323
 
                self._onUnregister.pop(i)
324
 
                return
325
 
 
326
 
    def on_n_engines_registered_do(self, n, f, *args, **kwargs):
327
 
        if len(self.engines.keys()) >= n:
328
 
            f(*args, **kwargs)
329
 
        else:
330
 
            self._onNRegistered.append((n,f,args,kwargs))
331
 
            
332
 
 
333
 
#-------------------------------------------------------------------------------
334
 
# Base class for adapting controller to different client APIs
335
 
#-------------------------------------------------------------------------------
336
 
 
337
 
class ControllerAdapterBase(object):
338
 
    """All Controller adapters should inherit from this class.
339
 
    
340
 
    This class provides a wrapped version of the IControllerBase interface that
341
 
    can be used to easily create new custom controllers.  Subclasses of this
342
 
    will provide a full implementation of IControllerBase.
343
 
    
344
 
    This class doesn't implement any client notification mechanism.  That
345
 
    is up to subclasses.
346
 
    """
347
 
    
348
 
    implements(IControllerBase)
349
 
    
350
 
    def __init__(self, controller):
351
 
        self.controller = controller
352
 
        # Needed for IControllerCore
353
 
        self.engines = self.controller.engines
354
 
        
355
 
    def register_engine(self, remoteEngine, id=None,
356
 
        ip=None, port=None, pid=None):
357
 
        return self.controller.register_engine(remoteEngine, 
358
 
            id, ip, port, pid)
359
 
    
360
 
    def unregister_engine(self, id):
361
 
        return self.controller.unregister_engine(id)
362
 
 
363
 
    def on_register_engine_do(self, f, includeID, *args, **kwargs):
364
 
        return self.controller.on_register_engine_do(f, includeID, *args, **kwargs)
365
 
 
366
 
    def on_unregister_engine_do(self, f, includeID, *args, **kwargs):
367
 
        return self.controller.on_unregister_engine_do(f, includeID, *args, **kwargs)        
368
 
    
369
 
    def on_register_engine_do_not(self, f):
370
 
        return self.controller.on_register_engine_do_not(f)
371
 
    
372
 
    def on_unregister_engine_do_not(self, f):
373
 
        return self.controller.on_unregister_engine_do_not(f)        
374
 
 
375
 
    def on_n_engines_registered_do(self, n, f, *args, **kwargs):
376
 
        return self.controller.on_n_engines_registered_do(n, f, *args, **kwargs)