# Copyright (C) 2009 Canonical Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Work with files on disk."""
import errno
import gzip
import subprocess
import sys

try:
    import multiprocessing
except ImportError:
    # multiprocessing is optional (not available on older pythons);
    # _open_mprocess falls back to in-process gzip when it is missing.
    multiprocessing = None
def open_file(filename):
    """Open a file which might be a regular file or a gzip.

    Detection is done by attempting to read a line through GzipFile; if
    that fails, the file is treated as plain text.  For gzip files we
    prefer an external ``gzip`` subprocess (fast, runs concurrently),
    then a ``multiprocessing`` child, then in-process ``gzip.GzipFile``.

    :param filename: Path of the file to open.
    :return: An iterator of lines (bytes), and a cleanup function (or
        None when no cleanup is needed).
    """
    source = open(filename, 'rb')
    gzip_source = gzip.GzipFile(mode='rb', fileobj=source)
    try:
        gzip_source.readline()
    except KeyboardInterrupt:
        raise
    except Exception:
        # probably not a gzip file: GzipFile failed to decode the header,
        # so rewind and hand back the plain file itself.
        source.seek(0)
        return source, source.close
    else:
        # We don't need these anymore, so close them out in case the rest of
        # the code raises an exception.
        gzip_source.close()
        source.close()
    # The file is gzip compressed.  Decompress it with, in order of
    # preference - a gzip subprocess
    if sys.platform == 'win32':
        close_fds = False # not supported
    else:
        close_fds = True
    try:
        process = subprocess.Popen(['gzip', '-d', '-c', filename],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, close_fds=close_fds)
    except OSError as e:
        if e.errno == errno.ENOENT:
            # no 'gzip' binary on this system;
            # failing that, use another python process
            return _open_mprocess(filename)
        raise
    # make reading from stdin, or writing errors cause immediate aborts
    process.stdin.close()
    process.stderr.close()
    terminate = getattr(process, 'terminate', None)
    # terminate is a py2.6 thing
    if terminate is not None:
        return process.stdout, terminate
    else:
        # We would like to use process.wait() but that can cause a deadlock
        # if the child is still writing.
        # The other alternative is process.communicate, but we closed
        # stderr, and communicate wants to read from it. (We get:
        # ValueError: I/O operation on closed file
        # if we try it here. Also, for large files, this may be many GB
        # worth of data.)
        # So for now, live with the deadlock...
        return process.stdout, process.wait
def _stream_file(filename, child):
    """Child-process worker: decompress *filename* and stream its lines.

    Each line is sent over the *child* end of a multiprocessing pipe,
    followed by a ``None`` sentinel so the reader knows the stream ended.

    :param filename: Path of a gzip-compressed file.
    :param child: Writable multiprocessing.Connection (``child.send``).
    """
    gzip_source = gzip.GzipFile(filename, 'rb')
    for line in gzip_source:
        child.send(line)
    # NOTE(review): the end-of-stream sentinel was lost in the garbled
    # source; None matches the reader loop in _open_mprocess -- confirm.
    child.send(None)
def _open_mprocess(filename):
    """Decompress *filename* in a separate python process.

    Used when no external ``gzip`` binary is available.  If the
    multiprocessing module itself is unavailable, falls back to
    decompressing in-process.

    :param filename: Path of a gzip-compressed file.
    :return: An iterator of lines, and a cleanup function (or None).
    """
    if multiprocessing is None:
        # can't multiprocess, use inprocess gzip.
        return gzip.GzipFile(filename, mode='rb'), None
    # Pipe(False) gives a one-way pipe: child writes, parent reads.
    parent, child = multiprocessing.Pipe(False)
    process = multiprocessing.Process(target=_stream_file,
                                      args=(filename, child))
    process.start()
    def iter_pipe():
        # Yield lines until the None end-of-stream sentinel sent by
        # _stream_file is seen.
        while True:
            line = parent.recv()
            if line is None:
                break
            yield line
    return iter_pipe(), process.join