2
# -*- test-case-name: IPython.kernel.test.test_controllerservice -*-
4
"""A Twisted Service for the IPython Controller.
6
The IPython Controller:
8
* Listens for Engines to connect and then manages access to those engines.
9
* Listens for clients and passes commands from client to the Engines.
10
* Exposes an asynchronous interfaces to the Engines which themselves can block.
11
* Acts as a gateway to the Engines.
13
The design of the controller is somewhat abstract to allow flexibility in how
14
the controller is presented to clients. This idea is that there is a basic
15
ControllerService class that allows engines to connect to it. But, this
16
basic class has no client interfaces. To expose client interfaces developers
17
provide an adapter that makes the ControllerService look like something. For
18
example, one client interface might support task farming and another might
19
support interactive usage. The important thing is that by using interfaces
20
and adapters, a single controller can be accessed from multiple interfaces.
21
Furthermore, by adapting various client interfaces to various network
22
protocols, each client interface can be exposed to multiple network protocols.
23
See multiengine.py for an example of how to adapt the ControllerService
24
to a client interface.
27
__docformat__ = "restructuredtext en"
29
#-------------------------------------------------------------------------------
30
# Copyright (C) 2008 The IPython Development Team
32
# Distributed under the terms of the BSD License. The full license is in
33
# the file COPYING, distributed as part of this software.
34
#-------------------------------------------------------------------------------
36
#-------------------------------------------------------------------------------
38
#-------------------------------------------------------------------------------
42
from twisted.application import service
43
from twisted.internet import defer, reactor
44
from twisted.python import log, components
45
from zope.interface import Interface, implements, Attribute
46
import zope.interface as zi
48
from IPython.kernel.engineservice import \
53
from IPython.genutils import get_ipython_dir
54
from IPython.kernel import codeutil
56
#-------------------------------------------------------------------------------
57
# Interfaces for the Controller
58
#-------------------------------------------------------------------------------
60
class IControllerCore(Interface):
61
"""Basic methods any controller must have.
63
This is basically the aspect of the controller relevant to the
64
engines and does not assume anything about how the engines will
65
be presented to a client.
68
engines = Attribute("A dict of engine ids and engine instances.")
70
def register_engine(remoteEngine, id=None, ip=None, port=None,
72
"""Register new remote engine.
74
The controller can use the ip, port, pid of the engine to do useful things
75
like kill the engines.
79
An implementer of IEngineCore, IEngineSerialized and IEngineQueued.
83
IP address the engine is running on.
85
Port the engine is on.
87
pid of the running engine.
89
:Returns: A dict of {'id':id} and possibly other key, value pairs.
92
def unregister_engine(id):
93
"""Handle a disconnecting engine.
97
The integer engine id of the engine to unregister.
100
def on_register_engine_do(f, includeID, *args, **kwargs):
101
"""Call ``f(*args, **kwargs)`` when an engine is registered.
105
If True the first argument to f will be the id of the engine.
108
def on_unregister_engine_do(f, includeID, *args, **kwargs):
109
"""Call ``f(*args, **kwargs)`` when an engine is unregistered.
113
If True the first argument to f will be the id of the engine.
116
def on_register_engine_do_not(f):
117
"""Stop calling f on engine registration"""
119
def on_unregister_engine_do_not(f):
120
"""Stop calling f on engine unregistration"""
122
def on_n_engines_registered_do(n, f, *arg, **kwargs):
123
"""Call f(*args, **kwargs) the first time the nth engine registers."""
125
class IControllerBase(IControllerCore):
126
"""The basic controller interface."""
130
#-------------------------------------------------------------------------------
131
# Implementation of the ControllerService
132
#-------------------------------------------------------------------------------
134
class ControllerService(object, service.Service):
135
"""A basic Controller represented as a Twisted Service.
137
This class doesn't implement any client notification mechanism. That
138
is up to adapted subclasses.
141
# I also pick up the IService interface by inheritance from service.Service
142
implements(IControllerBase)
143
name = 'ControllerService'
145
def __init__(self, maxEngines=511, saveIDs=False):
146
self.saveIDs = saveIDs
148
self.availableIDs = range(maxEngines,-1,-1) # [511,...,0]
149
self._onRegister = []
150
self._onUnregister = []
151
self._onNRegistered = []
153
#---------------------------------------------------------------------------
154
# Methods used to save the engine info to a log file
155
#---------------------------------------------------------------------------
157
def _buildEngineInfoString(self, id, ip, port, pid):
166
return "Engine Info: %d %s %d %d" % (id, ip , port, pid)
168
def _logEngineInfo(self, id, ip, port, pid):
169
log.msg(self._buildEngineInfoString(id,ip,port,pid))
171
def _getEngineInfoLogFile(self):
172
# Store all logs inside the ipython directory
173
ipdir = get_ipython_dir()
175
logdir_base = pjoin(ipdir,'log')
176
if not os.path.isdir(logdir_base):
177
os.makedirs(logdir_base)
178
logfile = os.path.join(logdir_base,'ipcontroller-%s-engine-info.log' % os.getpid())
181
def _logEngineInfoToFile(self, id, ip, port, pid):
182
"""Log info about an engine to a log file.
184
When an engine registers with a ControllerService, the ControllerService
185
saves information about the engine to a log file. That information
186
can be useful for various purposes, such as killing hung engines, etc.
188
This method takes the assigned id, ip/port and pid of the engine
189
and saves it to a file of the form:
191
~/.ipython/log/ipcontroller-###-engine-info.log
193
where ### is the pid of the controller.
195
Each line of this file has the form:
197
Engine Info: ip ip port pid
199
If any of the entries are not known, they are replaced by -99.
202
fname = self._getEngineInfoLogFile()
204
s = self._buildEngineInfoString(id,ip,port,pid)
208
#---------------------------------------------------------------------------
209
# IControllerCore methods
210
#---------------------------------------------------------------------------
212
def register_engine(self, remoteEngine, id=None,
213
ip=None, port=None, pid=None):
214
"""Register new engine connection"""
216
# What happens if these assertions fail?
217
assert IEngineCore.providedBy(remoteEngine), \
218
"engine passed to register_engine doesn't provide IEngineCore"
219
assert IEngineSerialized.providedBy(remoteEngine), \
220
"engine passed to register_engine doesn't provide IEngineSerialized"
221
assert IEngineQueued.providedBy(remoteEngine), \
222
"engine passed to register_engine doesn't provide IEngineQueued"
223
assert isinstance(id, int) or id is None, \
224
"id to register_engine must be an integer or None"
225
assert isinstance(ip, str) or ip is None, \
226
"ip to register_engine must be a string or None"
227
assert isinstance(port, int) or port is None, \
228
"port to register_engine must be an integer or None"
229
assert isinstance(pid, int) or pid is None, \
230
"pid to register_engine must be an integer or None"
233
if desiredID in self.engines.keys():
236
if desiredID in self.availableIDs:
238
self.availableIDs.remove(desiredID)
240
getID = self.availableIDs.pop()
241
remoteEngine.id = getID
242
remoteEngine.service = self
243
self.engines[getID] = remoteEngine
245
# Log the Engine Information for monitoring purposes
246
self._logEngineInfoToFile(getID, ip, port, pid)
248
msg = "registered engine with id: %i" %getID
251
for i in range(len(self._onRegister)):
252
(f,args,kwargs,ifid) = self._onRegister[i]
255
f(getID, *args, **kwargs)
259
self._onRegister.pop(i)
261
# Call functions when the nth engine is registered and them remove them
262
for i, (n, f, args, kwargs) in enumerate(self._onNRegistered):
263
if len(self.engines.keys()) == n:
268
log.msg("Function %r failed when the %ith engine registered" % (f, n))
270
self._onNRegistered.pop(i)
274
def unregister_engine(self, id):
275
"""Unregister engine by id."""
277
assert isinstance(id, int) or id is None, \
278
"id to unregister_engine must be an integer or None"
280
msg = "unregistered engine with id: %i" %id
285
log.msg("engine with id %i was not registered" % id)
288
self.availableIDs.append(id)
289
# Sort to assign lower ids first
290
self.availableIDs.sort(reverse=True)
292
log.msg("preserving id %i" %id)
294
for i in range(len(self._onUnregister)):
295
(f,args,kwargs,ifid) = self._onUnregister[i]
298
f(id, *args, **kwargs)
302
self._onUnregister.pop(i)
304
def on_register_engine_do(self, f, includeID, *args, **kwargs):
305
assert callable(f), "f must be callable"
306
self._onRegister.append((f,args,kwargs,includeID))
308
def on_unregister_engine_do(self, f, includeID, *args, **kwargs):
309
assert callable(f), "f must be callable"
310
self._onUnregister.append((f,args,kwargs,includeID))
312
def on_register_engine_do_not(self, f):
313
for i in range(len(self._onRegister)):
314
g = self._onRegister[i][0]
316
self._onRegister.pop(i)
319
def on_unregister_engine_do_not(self, f):
320
for i in range(len(self._onUnregister)):
321
g = self._onUnregister[i][0]
323
self._onUnregister.pop(i)
326
def on_n_engines_registered_do(self, n, f, *args, **kwargs):
327
if len(self.engines.keys()) >= n:
330
self._onNRegistered.append((n,f,args,kwargs))
333
#-------------------------------------------------------------------------------
334
# Base class for adapting controller to different client APIs
335
#-------------------------------------------------------------------------------
337
class ControllerAdapterBase(object):
338
"""All Controller adapters should inherit from this class.
340
This class provides a wrapped version of the IControllerBase interface that
341
can be used to easily create new custom controllers. Subclasses of this
342
will provide a full implementation of IControllerBase.
344
This class doesn't implement any client notification mechanism. That
348
implements(IControllerBase)
350
def __init__(self, controller):
351
self.controller = controller
352
# Needed for IControllerCore
353
self.engines = self.controller.engines
355
def register_engine(self, remoteEngine, id=None,
356
ip=None, port=None, pid=None):
357
return self.controller.register_engine(remoteEngine,
360
def unregister_engine(self, id):
361
return self.controller.unregister_engine(id)
363
def on_register_engine_do(self, f, includeID, *args, **kwargs):
364
return self.controller.on_register_engine_do(f, includeID, *args, **kwargs)
366
def on_unregister_engine_do(self, f, includeID, *args, **kwargs):
367
return self.controller.on_unregister_engine_do(f, includeID, *args, **kwargs)
369
def on_register_engine_do_not(self, f):
370
return self.controller.on_register_engine_do_not(f)
372
def on_unregister_engine_do_not(self, f):
373
return self.controller.on_unregister_engine_do_not(f)
375
def on_n_engines_registered_do(self, n, f, *args, **kwargs):
376
return self.controller.on_n_engines_registered_do(n, f, *args, **kwargs)