~trbs/phatch/multiprocessing


Viewing changes to phatch/core/execute.py

  • Committer: trbs
  • Date: 2009-06-15 01:28:21 UTC
  • Revision ID: trbs-20090615012821-aanxim44kankcapz
initial commit for multiprocessing patch

# Phatch - Photo Batch Processor
# Copyright (C) 2007-2008 www.stani.be
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program.  If not, see http://www.gnu.org/licenses/
#
# Phatch recommends SPE (http://pythonide.stani.be) for editing python files.

"""Execute actions mainloop queue"""

#License: latest GPL

#---import modules

#multiprocessing
try:
    import __main__
    if getattr(__main__, 'FORCE_THREADS', False):
        raise ImportError("Forcing use of threads")
    from multiprocessing import freeze_support, cpu_count, Manager, Process
    from multiprocessing.queues import Queue, Empty
    HAS_MULTIPROCESSING = True
    Consumer = Process
except ImportError:
    HAS_MULTIPROCESSING = False
    import sys
    import os
    from Queue import Queue, Empty
    from threading import Thread
    Consumer = Thread
    # taken from multiprocessing module
    def cpu_count():
        '''
        Returns the number of CPUs in the system
        '''
        if sys.platform == 'win32':
            try:
                num = int(os.environ['NUMBER_OF_PROCESSORS'])
            except (ValueError, KeyError):
                num = 0
        elif 'bsd' in sys.platform or sys.platform == 'darwin':
            try:
                num = int(os.popen('sysctl -n hw.ncpu').read())
            except ValueError:
                num = 0
        else:
            try:
                num = os.sysconf('SC_NPROCESSORS_ONLN')
            except (ValueError, OSError, AttributeError):
                num = 0

        if num >= 1:
            return num
        else:
            raise NotImplementedError('cannot determine number of cpus')
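
# A minimal sketch (illustrative, not part of this module) of how a launcher
# script could force the threading fallback above: set FORCE_THREADS on
# __main__ before this module is imported for the first time.  The import
# path "core.execute" is an assumption.
#
#     import __main__
#     __main__.FORCE_THREADS = True
#     from core import execute
#     print execute.HAS_MULTIPROCESSING, execute.Consumer    # False, Thread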
 
 
#standard library
import threading
import time

#gui-independent
from api import apply_action, get_photo, flush_metadata, ReadOnlyDict
from message import send

#---constants
try: NR_OF_CPUS = cpu_count()
except NotImplementedError: NR_OF_CPUS = 1

#---classes
class ImageFilesQueue(Queue):
    """ ImageFilesQueue(prepopulate_list=[], maxsize=0) -> queue object

    Near duplicate of a normal multi-producer, multi-consumer queue,
    adding methods to populate the queue from a list and to drain it
    of all queued items.
    """

    def __init__(self, prepopulate_list=[], maxsize=0):
        Queue.__init__(self, maxsize=maxsize)
        self.populate(prepopulate_list)

    def populate(self, populate_list=[]):
        for element in populate_list:
            self.put(element)

    def drain(self):
        while not self.empty():
            try:
                self.get(False, 1)
            except Empty:
                # this exception should not happen, unless something
                # is blocking (deadlocking?) the queue.
                raise Exception("Failed to drain the queue")
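
# A small usage sketch of ImageFilesQueue (illustrative; the file paths are
# made up).  The queue is prepopulated with (index, (folder, image_file))
# pairs, matching how execution_consumer below unpacks items, and can be
# emptied in one call when processing has to stop early.
#
#     queue = ImageFilesQueue(enumerate([('/photos', 'a.jpg'), ('/photos', 'b.jpg')]))
#     index, (folder, image_file) = queue.get(False)
#     queue.drain()       # discard whatever is still queued
#     assert queue.empty()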
 
 
#---functions

def execution_consumer(actions, settings, image_files_queue, shared_state):
    cache                   = {}
    skip_existing_images    = not (settings['overwrite_existing_images'] or \
                                settings['overwrite_existing_images_forced']) and \
                                not settings['no_save']
    copy_metadata           = settings['copy_metadata']
    result                  = {
        'stop_for_errors'   : settings['stop_for_errors'],
        'last_answer'       : None,
    }
    is_done                 = actions[-1].is_done #checking method for resuming
    read_only_settings      = ReadOnlyDict(settings)

    while not image_files_queue.empty():
        if shared_state['stop_processing']: break
        try:
            image_index, (folder, image_file) = image_files_queue.get(False, 1)
        except Empty:
            # TODO: Handle exception (that should not occur) here where
            # the queue should be not-empty and still the consumer is
            # unable to get a new item from the queue. Basically report
            # the error and bail out of the consumer.
            raise

        shared_state['image_index'] = image_index
        shared_state['image_file'] = image_file

        #open image and check for errors
        photo, result = get_photo(image_file, image_index, result,
                                  copy_metadata, folder)

        ####if      result['abort']:  return
        ####elif    result['skip']:   break

        #skip images that are already done (when resuming)
        if skip_existing_images and is_done(photo):
            continue

        #do the actions
        for action_index, action in enumerate(actions):
            if shared_state['stop_processing']: break
            if action.flush_metadata_before:
                photo, result = flush_metadata(photo, image_file, result)
                ####if      result['abort']: return
                ####elif    result['skip']: break
            #apply action
            photo, result = apply_action(action, photo, read_only_settings, cache,
                                         image_file, result)
            ####if      result['abort']: return
            ####elif    result['skip']: break
        photo, result = flush_metadata(photo, image_file, result)
        del photo, action_index, action
        ####if result['abort']: return
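
# A minimal sketch (hypothetical test harness, not part of the patch) of
# driving a single consumer directly, using a plain dict instead of a
# Manager().dict() for the shared state; actions, settings and image_files
# are assumed to come from the caller.
#
#     shared_state = {'stop_processing': False, 'image_index': 0,
#                     'image_file': "..."}
#     queue = ImageFilesQueue(enumerate(image_files))
#     execution_consumer(actions, settings, queue, shared_state)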
 
 
def execute_actions(actions, image_files, settings, image_amount):
    workers = []
    if HAS_MULTIPROCESSING:
        shared_state = Manager().dict()
    else:
        shared_state = dict()
    shared_state.update({
        'stop_processing': False,
        'image_index': 0,
        'image_file': "...",
    })

    current_image_index = 0
    nr_of_consumers = settings['nr_of_consumers']

    if nr_of_consumers == -1:
        nr_of_consumers = NR_OF_CPUS

    #Setup Queue
    image_files_queue = ImageFilesQueue(enumerate(image_files))

    for i in xrange(nr_of_consumers):
        args = (actions, settings, image_files_queue, shared_state)
        worker = Consumer(target=execution_consumer, args=args)
        worker.start()
        workers.append(worker)

    print nr_of_consumers, Consumer, workers

    # Main action queue loop
    try:
        while not image_files_queue.empty():
            #if not multiprocessing:
            #    consumer.consume()

            image_index = shared_state['image_index']
            if image_index > current_image_index:
                current_image_index = image_index
            image_file = shared_state['image_file']

            progress_result = {}
            send.progress_update_filename(progress_result, current_image_index, image_file)
            if progress_result and not progress_result['keepgoing']:
                shared_state['stop_processing'] = True
                send.progress_update_filename({}, image_amount, "Stopping...")
                # drain the queue
                image_files_queue.drain()
                # close dialog
                send.progress_close()
                return False
            time.sleep(0.1)
    finally:
        # join all workers
        for worker in workers:
            worker.join()
    return True
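
# A usage sketch (illustrative only; the surrounding application supplies
# the real actions list, settings and send handlers):
#
#     image_files = [('/photos', 'img001.jpg'), ('/photos', 'img002.jpg')]
#     settings = {
#         'nr_of_consumers': -1,                # -1 -> one worker per CPU
#         'overwrite_existing_images': False,
#         'overwrite_existing_images_forced': False,
#         'no_save': False,
#         'copy_metadata': True,
#         'stop_for_errors': True,
#     }
#     ok = execute_actions(actions, image_files, settings, len(image_files))
#
# On Windows, a frozen executable would additionally need freeze_support()
# (imported above) to be called in its __main__ module before workers start.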