~pythonregexp2.7/python/issue2636-11

« back to all changes in this revision

Viewing changes to Lib/multiprocessing/util.py

  • Committer: Jeffrey C. "The TimeHorse" Jacobs
  • Date: 2008-09-21 17:53:26 UTC
  • mfrom: (39025.1.14 Regexp-2.7)
  • Revision ID: darklord@timehorse.com-20080921175326-92vaej2hc3yuecxb
Merged in changes from the core Regexp branch.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#
 
2
# Module providing various facilities to other parts of the package
 
3
#
 
4
# multiprocessing/util.py
 
5
#
 
6
# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
 
7
#
 
8
 
 
9
import itertools
 
10
import weakref
 
11
import atexit
 
12
import threading        # we want threading to install it's
 
13
                        # cleanup function before multiprocessing does
 
14
 
 
15
from multiprocessing.process import current_process, active_children
 
16
 
 
17
__all__ = [
 
18
    'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
 
19
    'log_to_stderr', 'get_temp_dir', 'register_after_fork',
 
20
    'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal'
 
21
    ]
 
22
 
 
23
#
 
24
# Logging
 
25
#
 
26
 
 
27
NOTSET = 0
 
28
SUBDEBUG = 5
 
29
DEBUG = 10
 
30
INFO = 20
 
31
SUBWARNING = 25
 
32
 
 
33
LOGGER_NAME = 'multiprocessing'
 
34
DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'
 
35
 
 
36
_logger = None
 
37
_log_to_stderr = False
 
38
 
 
39
def sub_debug(msg, *args):
 
40
    if _logger:
 
41
        _logger.log(SUBDEBUG, msg, *args)
 
42
 
 
43
def debug(msg, *args):
 
44
    if _logger:
 
45
        _logger.log(DEBUG, msg, *args)
 
46
 
 
47
def info(msg, *args):
 
48
    if _logger:
 
49
        _logger.log(INFO, msg, *args)
 
50
 
 
51
def sub_warning(msg, *args):
 
52
    if _logger:
 
53
        _logger.log(SUBWARNING, msg, *args)
 
54
 
 
55
def get_logger():
 
56
    '''
 
57
    Returns logger used by multiprocessing
 
58
    '''
 
59
    global _logger
 
60
 
 
61
    if not _logger:
 
62
        import logging, atexit
 
63
 
 
64
        # XXX multiprocessing should cleanup before logging
 
65
        if hasattr(atexit, 'unregister'):
 
66
            atexit.unregister(_exit_function)
 
67
            atexit.register(_exit_function)
 
68
        else:
 
69
            atexit._exithandlers.remove((_exit_function, (), {}))
 
70
            atexit._exithandlers.append((_exit_function, (), {}))
 
71
 
 
72
        _check_logger_class()
 
73
        _logger = logging.getLogger(LOGGER_NAME)
 
74
 
 
75
    return _logger
 
76
 
 
77
def _check_logger_class():
 
78
    '''
 
79
    Make sure process name is recorded when loggers are used
 
80
    '''
 
81
    # XXX This function is unnecessary once logging is patched
 
82
    import logging
 
83
    if hasattr(logging, 'multiprocessing'):
 
84
        return
 
85
 
 
86
    logging._acquireLock()
 
87
    try:
 
88
        OldLoggerClass = logging.getLoggerClass()
 
89
        if not getattr(OldLoggerClass, '_process_aware', False):
 
90
            class ProcessAwareLogger(OldLoggerClass):
 
91
                _process_aware = True
 
92
                def makeRecord(self, *args, **kwds):
 
93
                    record = OldLoggerClass.makeRecord(self, *args, **kwds)
 
94
                    record.processName = current_process()._name
 
95
                    return record
 
96
            logging.setLoggerClass(ProcessAwareLogger)
 
97
    finally:
 
98
        logging._releaseLock()
 
99
 
 
100
def log_to_stderr(level=None):
 
101
    '''
 
102
    Turn on logging and add a handler which prints to stderr
 
103
    '''
 
104
    global _log_to_stderr
 
105
    import logging
 
106
    logger = get_logger()
 
107
    formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
 
108
    handler = logging.StreamHandler()
 
109
    handler.setFormatter(formatter)
 
110
    logger.addHandler(handler)
 
111
    if level is not None:
 
112
        logger.setLevel(level)
 
113
    _log_to_stderr = True
 
114
 
 
115
#
 
116
# Function returning a temp directory which will be removed on exit
 
117
#
 
118
 
 
119
def get_temp_dir():
 
120
    # get name of a temp directory which will be automatically cleaned up
 
121
    if current_process()._tempdir is None:
 
122
        import shutil, tempfile
 
123
        tempdir = tempfile.mkdtemp(prefix='pymp-')
 
124
        info('created temp directory %s', tempdir)
 
125
        Finalize(None, shutil.rmtree, args=[tempdir], exitpriority=-100)
 
126
        current_process()._tempdir = tempdir
 
127
    return current_process()._tempdir
 
128
 
 
129
#
 
130
# Support for reinitialization of objects when bootstrapping a child process
 
131
#
 
132
 
 
133
_afterfork_registry = weakref.WeakValueDictionary()
 
134
_afterfork_counter = itertools.count()
 
135
 
 
136
def _run_after_forkers():
 
137
    items = list(_afterfork_registry.items())
 
138
    items.sort()
 
139
    for (index, ident, func), obj in items:
 
140
        try:
 
141
            func(obj)
 
142
        except Exception, e:
 
143
            info('after forker raised exception %s', e)
 
144
 
 
145
def register_after_fork(obj, func):
 
146
    _afterfork_registry[(_afterfork_counter.next(), id(obj), func)] = obj
 
147
 
 
148
#
 
149
# Finalization using weakrefs
 
150
#
 
151
 
 
152
_finalizer_registry = {}
 
153
_finalizer_counter = itertools.count()
 
154
 
 
155
 
 
156
class Finalize(object):
 
157
    '''
 
158
    Class which supports object finalization using weakrefs
 
159
    '''
 
160
    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
 
161
        assert exitpriority is None or type(exitpriority) is int
 
162
 
 
163
        if obj is not None:
 
164
            self._weakref = weakref.ref(obj, self)
 
165
        else:
 
166
            assert exitpriority is not None
 
167
 
 
168
        self._callback = callback
 
169
        self._args = args
 
170
        self._kwargs = kwargs or {}
 
171
        self._key = (exitpriority, _finalizer_counter.next())
 
172
 
 
173
        _finalizer_registry[self._key] = self
 
174
 
 
175
    def __call__(self, wr=None):
 
176
        '''
 
177
        Run the callback unless it has already been called or cancelled
 
178
        '''
 
179
        try:
 
180
            del _finalizer_registry[self._key]
 
181
        except KeyError:
 
182
            sub_debug('finalizer no longer registered')
 
183
        else:
 
184
            sub_debug('finalizer calling %s with args %s and kwargs %s',
 
185
                     self._callback, self._args, self._kwargs)
 
186
            res = self._callback(*self._args, **self._kwargs)
 
187
            self._weakref = self._callback = self._args = \
 
188
                            self._kwargs = self._key = None
 
189
            return res
 
190
 
 
191
    def cancel(self):
 
192
        '''
 
193
        Cancel finalization of the object
 
194
        '''
 
195
        try:
 
196
            del _finalizer_registry[self._key]
 
197
        except KeyError:
 
198
            pass
 
199
        else:
 
200
            self._weakref = self._callback = self._args = \
 
201
                            self._kwargs = self._key = None
 
202
 
 
203
    def still_active(self):
 
204
        '''
 
205
        Return whether this finalizer is still waiting to invoke callback
 
206
        '''
 
207
        return self._key in _finalizer_registry
 
208
 
 
209
    def __repr__(self):
 
210
        try:
 
211
            obj = self._weakref()
 
212
        except (AttributeError, TypeError):
 
213
            obj = None
 
214
 
 
215
        if obj is None:
 
216
            return '<Finalize object, dead>'
 
217
 
 
218
        x = '<Finalize object, callback=%s' % \
 
219
            getattr(self._callback, '__name__', self._callback)
 
220
        if self._args:
 
221
            x += ', args=' + str(self._args)
 
222
        if self._kwargs:
 
223
            x += ', kwargs=' + str(self._kwargs)
 
224
        if self._key[0] is not None:
 
225
            x += ', exitprority=' + str(self._key[0])
 
226
        return x + '>'
 
227
 
 
228
 
 
229
def _run_finalizers(minpriority=None):
 
230
    '''
 
231
    Run all finalizers whose exit priority is not None and at least minpriority
 
232
 
 
233
    Finalizers with highest priority are called first; finalizers with
 
234
    the same priority will be called in reverse order of creation.
 
235
    '''
 
236
    if minpriority is None:
 
237
        f = lambda p : p[0][0] is not None
 
238
    else:
 
239
        f = lambda p : p[0][0] is not None and p[0][0] >= minpriority
 
240
 
 
241
    items = [x for x in _finalizer_registry.items() if f(x)]
 
242
    items.sort(reverse=True)
 
243
 
 
244
    for key, finalizer in items:
 
245
        sub_debug('calling %s', finalizer)
 
246
        try:
 
247
            finalizer()
 
248
        except Exception:
 
249
            import traceback
 
250
            traceback.print_exc()
 
251
 
 
252
    if minpriority is None:
 
253
        _finalizer_registry.clear()
 
254
 
 
255
#
 
256
# Clean up on exit
 
257
#
 
258
 
 
259
def is_exiting():
 
260
    '''
 
261
    Returns true if the process is shutting down
 
262
    '''
 
263
    return _exiting or _exiting is None
 
264
 
 
265
_exiting = False
 
266
 
 
267
def _exit_function():
 
268
    global _exiting
 
269
 
 
270
    info('process shutting down')
 
271
    debug('running all "atexit" finalizers with priority >= 0')
 
272
    _run_finalizers(0)
 
273
 
 
274
    for p in active_children():
 
275
        if p._daemonic:
 
276
            info('calling terminate() for daemon %s', p.name)
 
277
            p._popen.terminate()
 
278
 
 
279
    for p in active_children():
 
280
        info('calling join() for process %s', p.name)
 
281
        p.join()
 
282
 
 
283
    debug('running the remaining "atexit" finalizers')
 
284
    _run_finalizers()
 
285
 
 
286
atexit.register(_exit_function)
 
287
 
 
288
#
 
289
# Some fork aware types
 
290
#
 
291
 
 
292
class ForkAwareThreadLock(object):
 
293
    def __init__(self):
 
294
        self._lock = threading.Lock()
 
295
        self.acquire = self._lock.acquire
 
296
        self.release = self._lock.release
 
297
        register_after_fork(self, ForkAwareThreadLock.__init__)
 
298
 
 
299
class ForkAwareLocal(threading.local):
 
300
    def __init__(self):
 
301
        register_after_fork(self, lambda obj : obj.__dict__.clear())
 
302
    def __reduce__(self):
 
303
        return type(self), ()