1
"""Event loop and event loop policy."""
3
# Names exported by ``from <this module> import *``.
__all__ = ['AbstractEventLoopPolicy',
           'AbstractEventLoop', 'AbstractServer',
           'Handle', 'TimerHandle',
           'get_event_loop_policy', 'set_event_loop_policy',
           'get_event_loop', 'set_event_loop', 'new_event_loop',
           'get_child_watcher', 'set_child_watcher',
           ]
16
from .log import logger
20
"""Object returned by callback registration methods."""
22
def __init__(self, callback, args):
23
self._callback = callback
25
self._cancelled = False
28
res = 'Handle({}, {})'.format(self._callback, self._args)
34
self._cancelled = True
38
self._callback(*self._args)
40
logger.exception('Exception in callback %s %r',
41
self._callback, self._args)
42
self = None # Needed to break cycles when an exception occurs.
45
def make_handle(callback, args):
    """Build a Handle for *callback* and its positional *args*.

    Asserts that *callback* is not already a Handle.
    """
    # TODO: Inline this? Or make it a private EventLoop method?
    assert not isinstance(callback, Handle), 'A Handle is not a callback'
    handle = Handle(callback, args)
    return handle
51
class TimerHandle(Handle):
52
"""Object returned by timed callback registration methods."""
54
def __init__(self, when, callback, args):
55
assert when is not None
56
super().__init__(callback, args)
61
res = 'TimerHandle({}, {}, {})'.format(self._when,
70
return hash(self._when)
72
def __lt__(self, other):
    """Return whether this handle's ``_when`` is before *other*'s."""
    mine, theirs = self._when, other._when
    return mine < theirs
75
def __le__(self, other):
    """Return whether this handle is scheduled no later than *other*.

    A strictly earlier ``_when`` answers True straight away; otherwise
    the answer is whatever ``__eq__`` reports for the two handles.
    """
    if self._when < other._when:
        # Without an explicit True here the strictly-earlier case falls
        # through to the equality test and is reported as not <=.
        return True
    return self.__eq__(other)
80
def __gt__(self, other):
    """Return whether this handle's ``_when`` is after *other*'s."""
    mine, theirs = self._when, other._when
    return mine > theirs
83
def __ge__(self, other):
    """Return whether this handle is scheduled no earlier than *other*.

    A strictly later ``_when`` answers True straight away; otherwise
    the answer is whatever ``__eq__`` reports for the two handles.
    """
    if self._when > other._when:
        # Without an explicit True here the strictly-later case falls
        # through to the equality test and is reported as not >=.
        return True
    return self.__eq__(other)
88
def __eq__(self, other):
    """Compare two timer handles field by field.

    Two TimerHandle objects are equal when their scheduled time,
    callback, arguments and cancelled flag all match.  For any other
    operand type NotImplemented is returned so that Python can try the
    reflected comparison.
    """
    if isinstance(other, TimerHandle):
        return (self._when == other._when and
                self._callback == other._callback and
                self._args == other._args and
                self._cancelled == other._cancelled)
    # Falling off the end would return None, which callers that test
    # for NotImplemented would misread as "not equal".
    return NotImplemented
96
def __ne__(self, other):
    """Negate ``__eq__``, passing NotImplemented through unchanged."""
    outcome = self.__eq__(other)
    if outcome is NotImplemented:
        return NotImplemented
    return not outcome
101
class AbstractServer:
102
"""Abstract server returned by create_service()."""
105
"""Stop serving. This leaves existing connections open."""
106
return NotImplemented
108
def wait_closed(self):
    """Coroutine to wait until service is closed.

    This abstract version does no waiting and simply hands back
    NotImplemented.
    """
    placeholder = NotImplemented
    return placeholder
113
class AbstractEventLoop:
114
"""Abstract event loop."""
116
# Running and stopping the event loop.
118
def run_forever(self):
    """Keep the event loop running until stop() is called.

    Abstract: this version always raises NotImplementedError.
    """
    raise NotImplementedError
122
def run_until_complete(self, future):
    """Run the event loop until a Future is done.

    Return the Future's result, or raise its exception.

    Abstract: this version always raises NotImplementedError.
    """
    raise NotImplementedError
130
"""Stop the event loop as soon as reasonable.
132
Exactly how soon that is may depend on the implementation, but
133
no more I/O callbacks should be scheduled.
135
raise NotImplementedError
137
def is_running(self):
    """Report whether the event loop is currently running.

    Abstract: this version always raises NotImplementedError.
    """
    raise NotImplementedError
144
The loop should not be running.
146
This is idempotent and irreversible.
148
No other methods should be called after this one.
150
raise NotImplementedError
152
# Methods scheduling callbacks. All these return Handles.
154
def call_soon(self, callback, *args):
    """Schedule *callback* by delegating to call_later() with delay 0.

    Returns whatever call_later() returns.
    """
    no_delay = 0
    return self.call_later(no_delay, callback, *args)
157
def call_later(self, delay, callback, *args):
    """Abstract callback-scheduling method taking a *delay*.

    Per the section comment, scheduling methods return Handles.  This
    version always raises NotImplementedError.
    """
    raise NotImplementedError
160
def call_at(self, when, callback, *args):
    """Abstract callback-scheduling method taking a time *when*.

    Per the section comment, scheduling methods return Handles.  This
    version always raises NotImplementedError.
    """
    raise NotImplementedError
164
raise NotImplementedError
166
# Methods for interacting with threads.
168
def call_soon_threadsafe(self, callback, *args):
    """Abstract thread-interaction method for scheduling *callback*.

    This version always raises NotImplementedError.
    """
    raise NotImplementedError
171
def run_in_executor(self, executor, callback, *args):
    """Abstract thread-interaction method taking an *executor*.

    This version always raises NotImplementedError.
    """
    raise NotImplementedError
174
def set_default_executor(self, executor):
    """Abstract thread-interaction method taking an *executor*.

    This version always raises NotImplementedError.
    """
    raise NotImplementedError
177
# Network I/O methods returning Futures.
179
def getaddrinfo(self, host, port, *, family=0, type=0, proto=0, flags=0):
    """Abstract network I/O method; the section comment says these
    return Futures.

    This version always raises NotImplementedError.
    """
    raise NotImplementedError
182
def getnameinfo(self, sockaddr, flags=0):
    """Abstract network I/O method; the section comment says these
    return Futures.

    This version always raises NotImplementedError.
    """
    raise NotImplementedError
185
def create_connection(self, protocol_factory, host=None, port=None, *,
                      ssl=None, family=0, proto=0, flags=0, sock=None,
                      local_addr=None, server_hostname=None):
    """Abstract network I/O method; the section comment says these
    return Futures.

    This version always raises NotImplementedError.
    """
    raise NotImplementedError
190
def create_server(self, protocol_factory, host=None, port=None, *,
                  family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
                  sock=None, backlog=100, ssl=None, reuse_address=None):
    """A coroutine which creates a TCP server bound to host and port.

    The return value is a Server object which can be used to stop
    serving.

    If host is an empty string or None all interfaces are assumed
    and a list of multiple sockets will be returned (most likely
    one for IPv4 and another one for IPv6).

    family can be set to either AF_INET or AF_INET6 to force the
    socket to use IPv4 or IPv6. If not set it will be determined
    from host (defaults to AF_UNSPEC).

    flags is a bitmask for getaddrinfo().

    sock can optionally be specified in order to use a preexisting
    socket.

    backlog is the maximum number of queued connections passed to
    listen() (defaults to 100).

    ssl can be set to an SSLContext to enable SSL over the
    accepted connections.

    reuse_address tells the kernel to reuse a local socket in
    TIME_WAIT state, without waiting for its natural timeout to
    expire. If not specified it is set automatically.
    NOTE(review): the earlier text said "set to True on" a platform
    whose name is not recoverable here -- confirm against the asyncio
    documentation.

    Abstract: this version always raises NotImplementedError.
    """
    raise NotImplementedError
224
def create_datagram_endpoint(self, protocol_factory,
                             local_addr=None, remote_addr=None, *,
                             family=0, proto=0, flags=0):
    """Abstract network I/O method; the section comment says these
    return Futures.

    This version always raises NotImplementedError.
    """
    raise NotImplementedError
229
# Pipes and subprocesses.
231
def connect_read_pipe(self, protocol_factory, pipe):
    """Register a read pipe with the event loop.

    protocol_factory should create an object implementing the
    Protocol interface.  pipe is a file-like object that has already
    been switched to non-blocking mode.

    Return a (transport, protocol) pair whose transport supports the
    ReadTransport ABC.
    NOTE(review): "ReadTransport" is filled in by analogy with
    connect_write_pipe(); the original wording was cut off -- confirm.
    """
    # A file-like object is accepted rather than a bare file
    # descriptor because the transport has to own the pipe and close it
    # when it finishes.  Handing over f.fileno() invites confusing
    # errors: the transport may close the fd and then f gets closed,
    # or vice versa.
    raise NotImplementedError
244
def connect_write_pipe(self, protocol_factory, pipe):
    """Register a write pipe with the event loop.

    protocol_factory should create an object implementing the
    BaseProtocol interface.  pipe is a file-like object that has
    already been switched to non-blocking mode.

    Return a (transport, protocol) pair whose transport supports the
    WriteTransport ABC.
    """
    # A file-like object is accepted rather than a bare file
    # descriptor because the transport has to own the pipe and close it
    # when it finishes.  Handing over f.fileno() invites confusing
    # errors: the transport may close the fd and then f gets closed,
    # or vice versa.
    raise NotImplementedError
257
def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
258
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
260
raise NotImplementedError
262
def subprocess_exec(self, protocol_factory, *args, stdin=subprocess.PIPE,
263
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
265
raise NotImplementedError
267
# Ready-based callback registration methods.
268
# The add_*() methods return None.
269
# The remove_*() methods return True if something was removed,
270
# False if there was nothing to delete.
272
def add_reader(self, fd, callback, *args):
    """Abstract ready-based registration of a reader callback for *fd*.

    Per the section comment, add_*() methods return None.  This version
    always raises NotImplementedError.
    """
    raise NotImplementedError
275
def remove_reader(self, fd):
    """Abstract removal of the reader callback for *fd*.

    Per the section comment, remove_*() methods return True if
    something was removed and False otherwise.  This version always
    raises NotImplementedError.
    """
    raise NotImplementedError
278
def add_writer(self, fd, callback, *args):
    """Abstract ready-based registration of a writer callback for *fd*.

    Per the section comment, add_*() methods return None.  This version
    always raises NotImplementedError.
    """
    raise NotImplementedError
281
def remove_writer(self, fd):
    """Abstract removal of the writer callback for *fd*.

    Per the section comment, remove_*() methods return True if
    something was removed and False otherwise.  This version always
    raises NotImplementedError.
    """
    raise NotImplementedError
284
# Completion based I/O methods returning Futures.
286
def sock_recv(self, sock, nbytes):
    """Abstract completion-based I/O method; the section comment says
    these return Futures.

    This version always raises NotImplementedError.
    """
    raise NotImplementedError
289
def sock_sendall(self, sock, data):
    """Abstract completion-based I/O method; the section comment says
    these return Futures.

    This version always raises NotImplementedError.
    """
    raise NotImplementedError
292
def sock_connect(self, sock, address):
    """Abstract completion-based I/O method; the section comment says
    these return Futures.

    This version always raises NotImplementedError.
    """
    raise NotImplementedError
295
def sock_accept(self, sock):
    """Abstract completion-based I/O method; the section comment says
    these return Futures.

    This version always raises NotImplementedError.
    """
    raise NotImplementedError
300
def add_signal_handler(self, sig, callback, *args):
    """Abstract registration of *callback* (with *args*) for *sig*.

    This version always raises NotImplementedError.
    """
    raise NotImplementedError
303
def remove_signal_handler(self, sig):
    """Abstract removal of the handler registered for *sig*.

    This version always raises NotImplementedError.
    """
    raise NotImplementedError
307
class AbstractEventLoopPolicy:
    """Interface every event loop policy has to provide.

    Each method here raises NotImplementedError; the module-level
    helpers of the same names forward to the installed policy.
    """

    def get_event_loop(self):
        """Abstract hook behind the module-level get_event_loop()."""
        raise NotImplementedError

    def set_event_loop(self, loop):
        """Abstract hook behind the module-level set_event_loop()."""
        raise NotImplementedError

    def new_event_loop(self):
        """Abstract hook behind the module-level new_event_loop()."""
        raise NotImplementedError

    # Child processes handling (Unix only).

    def get_child_watcher(self):
        """Abstract hook behind the module-level get_child_watcher()."""
        raise NotImplementedError

    def set_child_watcher(self, watcher):
        """Abstract hook behind the module-level set_child_watcher()."""
        raise NotImplementedError
333
class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
334
"""Default policy implementation for accessing the event loop.
336
In this policy, each thread has its own event loop. However, we
337
only automatically create an event loop by default for the main
338
thread; other threads by default have no event loop.
340
Other policies may have different rules (e.g. a single global
341
event loop, or automatically creating an event loop per thread, or
342
using some other notion of context to which an event loop is
348
class _Local(threading.local):
353
self._local = self._Local()
355
def get_event_loop(self):
    """Get the event loop stored for the current thread.

    When no loop is stored, set_event_loop() has not been called in
    this thread, and the caller is the main thread, a loop is created
    with new_event_loop() and remembered.  If there is still no loop
    after that, the assertion below fails with a message naming the
    thread -- so a successful call never returns None.
    """
    if (self._local._loop is None and
        not self._local._set_called and
        isinstance(threading.current_thread(), threading._MainThread)):
        self._local._loop = self.new_event_loop()
    assert self._local._loop is not None, \
           ('There is no current event loop in thread %r.' %
            threading.current_thread().name)
    return self._local._loop
369
def set_event_loop(self, loop):
    """Store *loop* (an AbstractEventLoop or None) as the event loop.

    The "set was called" flag is raised before the type assertion, so
    it stays set even if the assertion rejects *loop*.
    """
    local = self._local
    local._set_called = True
    assert loop is None or isinstance(loop, AbstractEventLoop)
    local._loop = loop
375
def new_event_loop(self):
    """Create a new event loop.

    You must call set_event_loop() to make this the current event
    loop.

    The loop is produced by calling ``self._loop_factory()``.
    """
    return self._loop_factory()
384
# Event loop policy. The policy itself is always global, even if the
385
# policy's rules say that there is an event loop per thread (or other
386
# notion of context). The default policy is installed by the first
387
# call to get_event_loop_policy().
388
_event_loop_policy = None
390
# Lock for protecting the on-the-fly creation of the event loop policy.
391
_lock = threading.Lock()
394
def _init_event_loop_policy():
    """Install the default event loop policy if none is installed yet.

    The check-and-create step runs while holding the module-level
    ``_lock`` so that two threads arriving here together cannot both
    create a policy.
    """
    global _event_loop_policy
    with _lock:
        # Re-check under the lock: another thread may have installed a
        # policy between the caller's test and our acquiring the lock.
        if _event_loop_policy is None:  # pragma: no branch
            from . import DefaultEventLoopPolicy
            _event_loop_policy = DefaultEventLoopPolicy()
402
def get_event_loop_policy():
    """Return the global event loop policy, creating it on first use."""
    if _event_loop_policy is not None:
        return _event_loop_policy
    _init_event_loop_policy()
    return _event_loop_policy
409
def set_event_loop_policy(policy):
    """Replace the global event loop policy.

    *policy* must be None or an AbstractEventLoopPolicy instance; this
    is checked with an assertion.
    """
    global _event_loop_policy
    assert policy is None or isinstance(policy, AbstractEventLoopPolicy)
    _event_loop_policy = policy
416
def get_event_loop():
    """Equivalent to calling get_event_loop_policy().get_event_loop()."""
    policy = get_event_loop_policy()
    return policy.get_event_loop()
421
def set_event_loop(loop):
    """Equivalent to calling get_event_loop_policy().set_event_loop(loop).

    The policy's return value is discarded.
    """
    policy = get_event_loop_policy()
    policy.set_event_loop(loop)
426
def new_event_loop():
    """Equivalent to calling get_event_loop_policy().new_event_loop()."""
    policy = get_event_loop_policy()
    return policy.new_event_loop()
431
def get_child_watcher():
    """Equivalent to calling get_event_loop_policy().get_child_watcher()."""
    policy = get_event_loop_policy()
    return policy.get_child_watcher()
436
def set_child_watcher(watcher):
    """Equivalent to calling
    get_event_loop_policy().set_child_watcher(watcher).

    The policy's return value is passed back to the caller.
    """
    policy = get_event_loop_policy()
    return policy.set_child_watcher(watcher)