1
# Phatch - Photo Batch Processor
2
# Copyright (C) 2007-2008 www.stani.be
4
# This program is free software: you can redistribute it and/or modify
5
# it under the terms of the GNU General Public License as published by
6
# the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12
# GNU General Public License for more details.
14
# You should have received a copy of the GNU General Public License
15
# along with this program. If not, see http://www.gnu.org/licenses/
17
# Phatch recommends SPE (http://pythonide.stani.be) for editing python files.
19
"""Execute actions mainloop queue"""
28
# Backend selection: use multiprocessing when it can be imported, otherwise
# fall back to the thread-based standard library equivalents.
# NOTE(review): the `try:` / `except ImportError:` lines that presumably wrap
# the two branches are not visible in this excerpt -- confirm in the full file.
#
# A truthy FORCE_THREADS attribute on the __main__ module deliberately raises
# ImportError before multiprocessing is imported.
if getattr(__main__, 'FORCE_THREADS', False):
29
raise ImportError("Forcing use of threads")
30
from multiprocessing import freeze_support, cpu_count, Manager, Process
31
from multiprocessing.queues import Queue, Empty
32
HAS_MULTIPROCESSING = True
35
# Thread-based branch: `Queue` is the Python 2 name of the standard queue
# module; Queue/Empty are bound to the same names as in the branch above.
HAS_MULTIPROCESSING = False
38
from Queue import Queue, Empty
39
from threading import Thread
41
# taken from multiprocessing module
# NOTE(review): the `def` header and docstring quotes of this helper are not
# visible in this excerpt; presumably it is the `cpu_count()` fallback used
# when multiprocessing is unavailable -- confirm in the full file.
44
Returns the number of CPUs in the system
46
# Windows: read the NUMBER_OF_PROCESSORS environment variable.
if sys.platform == 'win32':
48
num = int(os.environ['NUMBER_OF_PROCESSORS'])
49
# Variable missing (KeyError) or not an integer (ValueError).
except (ValueError, KeyError):
51
# BSD variants and macOS: ask `sysctl -n hw.ncpu` through a shell pipe.
elif 'bsd' in sys.platform or sys.platform == 'darwin':
53
num = int(os.popen('sysctl -n hw.ncpu').read())
58
# Presumably the branch for all remaining platforms (the `else:` line is not
# visible here): query the number of online processors via sysconf.
num = os.sysconf('SC_NPROCESSORS_ONLN')
59
except (ValueError, OSError, AttributeError):
65
# Raised when no usable count was obtained (the guarding condition is not
# visible in this excerpt); the module-level caller catches this exception.
raise NotImplementedError('cannot determine number of cpus')
72
from api import apply_action, get_photo, flush_metadata, ReadOnlyDict
73
from message import send
76
# Default number of consumers: one per CPU, or a single consumer when the
# CPU count cannot be determined on this platform.
try:
    NR_OF_CPUS = cpu_count()
except NotImplementedError:
    NR_OF_CPUS = 1
80
# Queue subclass whose constructor fills the queue from an iterable and which
# is drained in one call by the main loop (`image_files_queue.drain()`).
# `Queue` is whichever class the backend-selection imports bound: the
# multiprocessing queue or the thread-based one.
# NOTE(review): several lines of this class (closing docstring quotes, the
# body of the populate loop, the `def drain` header and its try/except) are
# not visible in this excerpt.
class ImageFilesQueue(Queue):
81
""" ImageFilesQueue(prepopulate_list=[], maxsize=0) -> queue object
83
Near duplicate of a normal multi-producer, multi-consumer queue
84
adding functions to prepopulate or populate the queue with a list
85
and draining the queue of all queued items.
88
# Initialise the base queue, then enqueue the initial items.
# NOTE(review): mutable default argument `[]`; the visible code only
# iterates over it, so it is harmless as long as it is never mutated.
def __init__(self, prepopulate_list=[], maxsize=0):
89
Queue.__init__(self, maxsize=maxsize)
90
self.populate(prepopulate_list)
92
# Iterate over `populate_list`; the loop body (presumably a put of each
# element) is not visible in this excerpt.
def populate(self, populate_list=[]):
93
for element in populate_list:
97
# Presumably the body of `drain()`: loop until the queue reports empty.
while not self.empty():
101
# this exception should not happen, unless something
102
# is blocking (deadlocking?) the queue.
103
raise Exception("Failed to drain the queue")
107
# Worker loop run by each consumer: take (index, (folder, image_file)) items
# from `image_files_queue`, publish the current index and file name in
# `shared_state`, open the photo and run every action on it, until the queue
# is empty or shared_state['stop_processing'] is set.
#
# actions           -- sequence of action objects; the last one supplies
#                      `is_done` (raises IndexError if `actions` is empty)
# settings          -- mapping of settings; the keys read here are
#                      'overwrite_existing_images',
#                      'overwrite_existing_images_forced', 'no_save',
#                      'copy_metadata' and 'stop_for_errors'
# image_files_queue -- queue of (image_index, (folder, image_file)) items
# shared_state      -- mapping shared with the main loop
#                      ('stop_processing', 'image_index', 'image_file')
#
# NOTE(review): several lines are missing from this excerpt (the opener of the
# `result` dict, the try/except around `get`, the tail of the `apply_action`
# call, the definition of `cache`) -- confirm details against the full file.
def execution_consumer(actions, settings, image_files_queue, shared_state):
109
# Skip images only when neither overwrite setting is enabled and saving is
# not disabled.
skip_existing_images = not (settings['overwrite_existing_images'] or\
110
settings['overwrite_existing_images_forced']) and\
111
not settings['no_save']
112
copy_metadata = settings['copy_metadata']
114
# Entries of a dict literal, presumably the initial `result` passed to
# get_photo below (its opening line is not visible).
'stop_for_errors' : settings['stop_for_errors'],
115
'last_answer' : None,
117
is_done = actions[-1].is_done #checking method for resuming
118
# Actions receive the settings wrapped in ReadOnlyDict.
read_only_settings = ReadOnlyDict(settings)
120
# NOTE(review): with several consumers, empty() can be False here and the
# queue can still be empty by the time get() runs; see the TODO below.
while not image_files_queue.empty():
121
if shared_state['stop_processing']: break
123
# Non-blocking get: with block=False the timeout argument (1) is ignored
# and Empty is raised immediately when no item is available.
image_index, (folder, image_file) = image_files_queue.get(False, 1)
125
# TODO: Handle exception (that should not occur) here where
126
# the queue should be not-empty and still the consumer is
127
# unable to get a new item from the queue. Basically report
128
# the error and bail out of consumer
131
# Publish progress for the main loop, which reads these two keys.
shared_state['image_index'] = image_index
132
shared_state['image_file'] = image_file
134
# open image and check for errors
135
photo, result = get_photo(image_file, image_index, result,
136
copy_metadata, folder)
138
####if result['abort']: return
139
####elif result['skip']: break
141
# check if not already done
142
if skip_existing_images and is_done(photo):
146
# Apply every action in order, honouring a stop request between actions.
for action_index, action in enumerate(actions):
147
if shared_state['stop_processing']: break
148
# Some actions ask for a metadata flush before they run.
if action.flush_metadata_before:
149
photo, result = flush_metadata(photo, image_file, result)
150
####if result['abort']: return
151
####elif result['skip']: break
153
photo, result = apply_action(action, photo, read_only_settings, cache,
155
####if result['abort']: return
156
####elif result['skip']: break
157
photo, result = flush_metadata(photo, image_file, result)
158
# Drop the per-image references.
del photo, action_index, action
159
####if result['abort']: return
161
def execute_actions(actions, image_files, settings, image_amount):
163
if HAS_MULTIPROCESSING:
164
shared_state = Manager().dict()
166
shared_state = dict()
167
shared_state.update({
168
'stop_processing': False,
173
current_image_index = 0
174
nr_of_consumers = settings['nr_of_consumers']
176
if nr_of_consumers == -1:
177
nr_of_consumers = NR_OF_CPUS
180
image_files_queue = ImageFilesQueue(enumerate(image_files))
182
for i in xrange(nr_of_consumers):
183
args = (actions, settings, image_files_queue, shared_state)
184
worker = Consumer(target=execution_consumer, args=args)
186
workers.append(worker)
188
print nr_of_consumers, Consumer, workers
190
# Main action queue loop
192
while not image_files_queue.empty():
193
#if not multiprocessing:
196
image_index = shared_state['image_index']
197
if image_index > current_image_index:
198
current_image_index = image_index
199
image_file = shared_state['image_file']
202
send.progress_update_filename(progress_result, current_image_index, image_file)
203
if progress_result and not progress_result['keepgoing']:
204
shared_state['stop_processing'] = True
205
send.progress_update_filename({}, image_amount, "Stopping...")
207
image_files_queue.drain()
209
send.progress_close()
214
for worker in workers: