~pythonregexp2.7/python/issue2636-01+09-01

« back to all changes in this revision

Viewing changes to Lib/multiprocessing/util.py

  • Committer: Jeffrey C. "The TimeHorse" Jacobs
  • Date: 2008-09-21 20:54:52 UTC
  • Revision ID: darklord@timehorse.com-20080921205452-d22asn5v93pr6rkl
Merged in changes from the Atomic Grouping / Possessive Qualifiers branch.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
#
2
 
# Module providing various facilities to other parts of the package
3
 
#
4
 
# multiprocessing/util.py
5
 
#
6
 
# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
7
 
#
8
 
 
9
 
import itertools
10
 
import weakref
11
 
import atexit
12
 
import threading        # we want threading to install it's
13
 
                        # cleanup function before multiprocessing does
14
 
 
15
 
from multiprocessing.process import current_process, active_children
16
 
 
17
 
__all__ = [
18
 
    'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
19
 
    'log_to_stderr', 'get_temp_dir', 'register_after_fork',
20
 
    'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal'
21
 
    ]
22
 
 
23
 
#
24
 
# Logging
25
 
#
26
 
 
27
 
NOTSET = 0
28
 
SUBDEBUG = 5
29
 
DEBUG = 10
30
 
INFO = 20
31
 
SUBWARNING = 25
32
 
 
33
 
LOGGER_NAME = 'multiprocessing'
34
 
DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'
35
 
 
36
 
_logger = None
37
 
_log_to_stderr = False
38
 
 
39
 
def sub_debug(msg, *args):
40
 
    if _logger:
41
 
        _logger.log(SUBDEBUG, msg, *args)
42
 
 
43
 
def debug(msg, *args):
44
 
    if _logger:
45
 
        _logger.log(DEBUG, msg, *args)
46
 
 
47
 
def info(msg, *args):
48
 
    if _logger:
49
 
        _logger.log(INFO, msg, *args)
50
 
 
51
 
def sub_warning(msg, *args):
52
 
    if _logger:
53
 
        _logger.log(SUBWARNING, msg, *args)
54
 
 
55
 
def get_logger():
56
 
    '''
57
 
    Returns logger used by multiprocessing
58
 
    '''
59
 
    global _logger
60
 
 
61
 
    if not _logger:
62
 
        import logging, atexit
63
 
 
64
 
        # XXX multiprocessing should cleanup before logging
65
 
        if hasattr(atexit, 'unregister'):
66
 
            atexit.unregister(_exit_function)
67
 
            atexit.register(_exit_function)
68
 
        else:
69
 
            atexit._exithandlers.remove((_exit_function, (), {}))
70
 
            atexit._exithandlers.append((_exit_function, (), {}))
71
 
 
72
 
        _check_logger_class()
73
 
        _logger = logging.getLogger(LOGGER_NAME)
74
 
 
75
 
    return _logger
76
 
 
77
 
def _check_logger_class():
78
 
    '''
79
 
    Make sure process name is recorded when loggers are used
80
 
    '''
81
 
    # XXX This function is unnecessary once logging is patched
82
 
    import logging
83
 
    if hasattr(logging, 'multiprocessing'):
84
 
        return
85
 
 
86
 
    logging._acquireLock()
87
 
    try:
88
 
        OldLoggerClass = logging.getLoggerClass()
89
 
        if not getattr(OldLoggerClass, '_process_aware', False):
90
 
            class ProcessAwareLogger(OldLoggerClass):
91
 
                _process_aware = True
92
 
                def makeRecord(self, *args, **kwds):
93
 
                    record = OldLoggerClass.makeRecord(self, *args, **kwds)
94
 
                    record.processName = current_process()._name
95
 
                    return record
96
 
            logging.setLoggerClass(ProcessAwareLogger)
97
 
    finally:
98
 
        logging._releaseLock()
99
 
 
100
 
def log_to_stderr(level=None):
101
 
    '''
102
 
    Turn on logging and add a handler which prints to stderr
103
 
    '''
104
 
    global _log_to_stderr
105
 
    import logging
106
 
    logger = get_logger()
107
 
    formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
108
 
    handler = logging.StreamHandler()
109
 
    handler.setFormatter(formatter)
110
 
    logger.addHandler(handler)
111
 
    if level is not None:
112
 
        logger.setLevel(level)
113
 
    _log_to_stderr = True
114
 
 
115
 
#
116
 
# Function returning a temp directory which will be removed on exit
117
 
#
118
 
 
119
 
def get_temp_dir():
120
 
    # get name of a temp directory which will be automatically cleaned up
121
 
    if current_process()._tempdir is None:
122
 
        import shutil, tempfile
123
 
        tempdir = tempfile.mkdtemp(prefix='pymp-')
124
 
        info('created temp directory %s', tempdir)
125
 
        Finalize(None, shutil.rmtree, args=[tempdir], exitpriority=-100)
126
 
        current_process()._tempdir = tempdir
127
 
    return current_process()._tempdir
128
 
 
129
 
#
130
 
# Support for reinitialization of objects when bootstrapping a child process
131
 
#
132
 
 
133
 
_afterfork_registry = weakref.WeakValueDictionary()
134
 
_afterfork_counter = itertools.count()
135
 
 
136
 
def _run_after_forkers():
137
 
    items = list(_afterfork_registry.items())
138
 
    items.sort()
139
 
    for (index, ident, func), obj in items:
140
 
        try:
141
 
            func(obj)
142
 
        except Exception, e:
143
 
            info('after forker raised exception %s', e)
144
 
 
145
 
def register_after_fork(obj, func):
146
 
    _afterfork_registry[(_afterfork_counter.next(), id(obj), func)] = obj
147
 
 
148
 
#
149
 
# Finalization using weakrefs
150
 
#
151
 
 
152
 
_finalizer_registry = {}
153
 
_finalizer_counter = itertools.count()
154
 
 
155
 
 
156
 
class Finalize(object):
157
 
    '''
158
 
    Class which supports object finalization using weakrefs
159
 
    '''
160
 
    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
161
 
        assert exitpriority is None or type(exitpriority) is int
162
 
 
163
 
        if obj is not None:
164
 
            self._weakref = weakref.ref(obj, self)
165
 
        else:
166
 
            assert exitpriority is not None
167
 
 
168
 
        self._callback = callback
169
 
        self._args = args
170
 
        self._kwargs = kwargs or {}
171
 
        self._key = (exitpriority, _finalizer_counter.next())
172
 
 
173
 
        _finalizer_registry[self._key] = self
174
 
 
175
 
    def __call__(self, wr=None):
176
 
        '''
177
 
        Run the callback unless it has already been called or cancelled
178
 
        '''
179
 
        try:
180
 
            del _finalizer_registry[self._key]
181
 
        except KeyError:
182
 
            sub_debug('finalizer no longer registered')
183
 
        else:
184
 
            sub_debug('finalizer calling %s with args %s and kwargs %s',
185
 
                     self._callback, self._args, self._kwargs)
186
 
            res = self._callback(*self._args, **self._kwargs)
187
 
            self._weakref = self._callback = self._args = \
188
 
                            self._kwargs = self._key = None
189
 
            return res
190
 
 
191
 
    def cancel(self):
192
 
        '''
193
 
        Cancel finalization of the object
194
 
        '''
195
 
        try:
196
 
            del _finalizer_registry[self._key]
197
 
        except KeyError:
198
 
            pass
199
 
        else:
200
 
            self._weakref = self._callback = self._args = \
201
 
                            self._kwargs = self._key = None
202
 
 
203
 
    def still_active(self):
204
 
        '''
205
 
        Return whether this finalizer is still waiting to invoke callback
206
 
        '''
207
 
        return self._key in _finalizer_registry
208
 
 
209
 
    def __repr__(self):
210
 
        try:
211
 
            obj = self._weakref()
212
 
        except (AttributeError, TypeError):
213
 
            obj = None
214
 
 
215
 
        if obj is None:
216
 
            return '<Finalize object, dead>'
217
 
 
218
 
        x = '<Finalize object, callback=%s' % \
219
 
            getattr(self._callback, '__name__', self._callback)
220
 
        if self._args:
221
 
            x += ', args=' + str(self._args)
222
 
        if self._kwargs:
223
 
            x += ', kwargs=' + str(self._kwargs)
224
 
        if self._key[0] is not None:
225
 
            x += ', exitprority=' + str(self._key[0])
226
 
        return x + '>'
227
 
 
228
 
 
229
 
def _run_finalizers(minpriority=None):
230
 
    '''
231
 
    Run all finalizers whose exit priority is not None and at least minpriority
232
 
 
233
 
    Finalizers with highest priority are called first; finalizers with
234
 
    the same priority will be called in reverse order of creation.
235
 
    '''
236
 
    if minpriority is None:
237
 
        f = lambda p : p[0][0] is not None
238
 
    else:
239
 
        f = lambda p : p[0][0] is not None and p[0][0] >= minpriority
240
 
 
241
 
    items = [x for x in _finalizer_registry.items() if f(x)]
242
 
    items.sort(reverse=True)
243
 
 
244
 
    for key, finalizer in items:
245
 
        sub_debug('calling %s', finalizer)
246
 
        try:
247
 
            finalizer()
248
 
        except Exception:
249
 
            import traceback
250
 
            traceback.print_exc()
251
 
 
252
 
    if minpriority is None:
253
 
        _finalizer_registry.clear()
254
 
 
255
 
#
256
 
# Clean up on exit
257
 
#
258
 
 
259
 
def is_exiting():
260
 
    '''
261
 
    Returns true if the process is shutting down
262
 
    '''
263
 
    return _exiting or _exiting is None
264
 
 
265
 
_exiting = False
266
 
 
267
 
def _exit_function():
268
 
    global _exiting
269
 
 
270
 
    info('process shutting down')
271
 
    debug('running all "atexit" finalizers with priority >= 0')
272
 
    _run_finalizers(0)
273
 
 
274
 
    for p in active_children():
275
 
        if p._daemonic:
276
 
            info('calling terminate() for daemon %s', p.name)
277
 
            p._popen.terminate()
278
 
 
279
 
    for p in active_children():
280
 
        info('calling join() for process %s', p.name)
281
 
        p.join()
282
 
 
283
 
    debug('running the remaining "atexit" finalizers')
284
 
    _run_finalizers()
285
 
 
286
 
atexit.register(_exit_function)
287
 
 
288
 
#
289
 
# Some fork aware types
290
 
#
291
 
 
292
 
class ForkAwareThreadLock(object):
293
 
    def __init__(self):
294
 
        self._lock = threading.Lock()
295
 
        self.acquire = self._lock.acquire
296
 
        self.release = self._lock.release
297
 
        register_after_fork(self, ForkAwareThreadLock.__init__)
298
 
 
299
 
class ForkAwareLocal(threading.local):
300
 
    def __init__(self):
301
 
        register_after_fork(self, lambda obj : obj.__dict__.clear())
302
 
    def __reduce__(self):
303
 
        return type(self), ()