~pythonregexp2.7/python/issue2636

« back to all changes in this revision

Viewing changes to Lib/queue.py

  • Committer: Jeffrey C. "The TimeHorse" Jacobs
  • Date: 2008-05-24 16:05:21 UTC
  • mfrom: (39021.1.401 Regexp-2.6)
  • Revision ID: darklord@timehorse.com-20080524160521-1xenj7p6u3wb89et
Merged in changes from the latest python source snapshot.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""A multi-producer, multi-consumer queue."""
 
2
 
 
3
from time import time as _time
 
4
from collections import deque
 
5
import heapq
 
6
 
 
7
__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue']
 
8
 
 
9
class Empty(Exception):
 
10
    "Exception raised by Queue.get(block=0)/get_nowait()."
 
11
    pass
 
12
 
 
13
class Full(Exception):
 
14
    "Exception raised by Queue.put(block=0)/put_nowait()."
 
15
    pass
 
16
 
 
17
class Queue:
 
18
    """Create a queue object with a given maximum size.
 
19
 
 
20
    If maxsize is <= 0, the queue size is infinite.
 
21
    """
 
22
    def __init__(self, maxsize=0):
 
23
        try:
 
24
            import threading
 
25
        except ImportError:
 
26
            import dummy_threading as threading
 
27
        self.maxsize = maxsize
 
28
        self._init(maxsize)
 
29
        # mutex must be held whenever the queue is mutating.  All methods
 
30
        # that acquire mutex must release it before returning.  mutex
 
31
        # is shared between the three conditions, so acquiring and
 
32
        # releasing the conditions also acquires and releases mutex.
 
33
        self.mutex = threading.Lock()
 
34
        # Notify not_empty whenever an item is added to the queue; a
 
35
        # thread waiting to get is notified then.
 
36
        self.not_empty = threading.Condition(self.mutex)
 
37
        # Notify not_full whenever an item is removed from the queue;
 
38
        # a thread waiting to put is notified then.
 
39
        self.not_full = threading.Condition(self.mutex)
 
40
        # Notify all_tasks_done whenever the number of unfinished tasks
 
41
        # drops to zero; thread waiting to join() is notified to resume
 
42
        self.all_tasks_done = threading.Condition(self.mutex)
 
43
        self.unfinished_tasks = 0
 
44
 
 
45
    def task_done(self):
 
46
        """Indicate that a formerly enqueued task is complete.
 
47
 
 
48
        Used by Queue consumer threads.  For each get() used to fetch a task,
 
49
        a subsequent call to task_done() tells the queue that the processing
 
50
        on the task is complete.
 
51
 
 
52
        If a join() is currently blocking, it will resume when all items
 
53
        have been processed (meaning that a task_done() call was received
 
54
        for every item that had been put() into the queue).
 
55
 
 
56
        Raises a ValueError if called more times than there were items
 
57
        placed in the queue.
 
58
        """
 
59
        self.all_tasks_done.acquire()
 
60
        try:
 
61
            unfinished = self.unfinished_tasks - 1
 
62
            if unfinished <= 0:
 
63
                if unfinished < 0:
 
64
                    raise ValueError('task_done() called too many times')
 
65
                self.all_tasks_done.notifyAll()
 
66
            self.unfinished_tasks = unfinished
 
67
        finally:
 
68
            self.all_tasks_done.release()
 
69
 
 
70
    def join(self):
 
71
        """Blocks until all items in the Queue have been gotten and processed.
 
72
 
 
73
        The count of unfinished tasks goes up whenever an item is added to the
 
74
        queue. The count goes down whenever a consumer thread calls task_done()
 
75
        to indicate the item was retrieved and all work on it is complete.
 
76
 
 
77
        When the count of unfinished tasks drops to zero, join() unblocks.
 
78
        """
 
79
        self.all_tasks_done.acquire()
 
80
        try:
 
81
            while self.unfinished_tasks:
 
82
                self.all_tasks_done.wait()
 
83
        finally:
 
84
            self.all_tasks_done.release()
 
85
 
 
86
    def qsize(self):
 
87
        """Return the approximate size of the queue (not reliable!)."""
 
88
        self.mutex.acquire()
 
89
        n = self._qsize()
 
90
        self.mutex.release()
 
91
        return n
 
92
 
 
93
    def empty(self):
 
94
        """Return True if the queue is empty, False otherwise (not reliable!)."""
 
95
        self.mutex.acquire()
 
96
        n = not self._qsize()
 
97
        self.mutex.release()
 
98
        return n
 
99
 
 
100
    def full(self):
 
101
        """Return True if the queue is full, False otherwise (not reliable!)."""
 
102
        self.mutex.acquire()
 
103
        n = 0 < self.maxsize == self._qsize()
 
104
        self.mutex.release()
 
105
        return n
 
106
 
 
107
    def put(self, item, block=True, timeout=None):
 
108
        """Put an item into the queue.
 
109
 
 
110
        If optional args 'block' is true and 'timeout' is None (the default),
 
111
        block if necessary until a free slot is available. If 'timeout' is
 
112
        a positive number, it blocks at most 'timeout' seconds and raises
 
113
        the Full exception if no free slot was available within that time.
 
114
        Otherwise ('block' is false), put an item on the queue if a free slot
 
115
        is immediately available, else raise the Full exception ('timeout'
 
116
        is ignored in that case).
 
117
        """
 
118
        self.not_full.acquire()
 
119
        try:
 
120
            if self.maxsize > 0:
 
121
                if not block:
 
122
                    if self._qsize() == self.maxsize:
 
123
                        raise Full
 
124
                elif timeout is None:
 
125
                    while self._qsize() == self.maxsize:
 
126
                        self.not_full.wait()
 
127
                elif timeout < 0:
 
128
                    raise ValueError("'timeout' must be a positive number")
 
129
                else:
 
130
                    endtime = _time() + timeout
 
131
                    while self._qsize() == self.maxsize:
 
132
                        remaining = endtime - _time()
 
133
                        if remaining <= 0.0:
 
134
                            raise Full
 
135
                        self.not_full.wait(remaining)
 
136
            self._put(item)
 
137
            self.unfinished_tasks += 1
 
138
            self.not_empty.notify()
 
139
        finally:
 
140
            self.not_full.release()
 
141
 
 
142
    def put_nowait(self, item):
 
143
        """Put an item into the queue without blocking.
 
144
 
 
145
        Only enqueue the item if a free slot is immediately available.
 
146
        Otherwise raise the Full exception.
 
147
        """
 
148
        return self.put(item, False)
 
149
 
 
150
    def get(self, block=True, timeout=None):
 
151
        """Remove and return an item from the queue.
 
152
 
 
153
        If optional args 'block' is true and 'timeout' is None (the default),
 
154
        block if necessary until an item is available. If 'timeout' is
 
155
        a positive number, it blocks at most 'timeout' seconds and raises
 
156
        the Empty exception if no item was available within that time.
 
157
        Otherwise ('block' is false), return an item if one is immediately
 
158
        available, else raise the Empty exception ('timeout' is ignored
 
159
        in that case).
 
160
        """
 
161
        self.not_empty.acquire()
 
162
        try:
 
163
            if not block:
 
164
                if not self._qsize():
 
165
                    raise Empty
 
166
            elif timeout is None:
 
167
                while not self._qsize():
 
168
                    self.not_empty.wait()
 
169
            elif timeout < 0:
 
170
                raise ValueError("'timeout' must be a positive number")
 
171
            else:
 
172
                endtime = _time() + timeout
 
173
                while not self._qsize():
 
174
                    remaining = endtime - _time()
 
175
                    if remaining <= 0.0:
 
176
                        raise Empty
 
177
                    self.not_empty.wait(remaining)
 
178
            item = self._get()
 
179
            self.not_full.notify()
 
180
            return item
 
181
        finally:
 
182
            self.not_empty.release()
 
183
 
 
184
    def get_nowait(self):
 
185
        """Remove and return an item from the queue without blocking.
 
186
 
 
187
        Only get an item if one is immediately available. Otherwise
 
188
        raise the Empty exception.
 
189
        """
 
190
        return self.get(False)
 
191
 
 
192
    # Override these methods to implement other queue organizations
 
193
    # (e.g. stack or priority queue).
 
194
    # These will only be called with appropriate locks held
 
195
 
 
196
    # Initialize the queue representation
 
197
    def _init(self, maxsize):
 
198
        self.queue = deque()
 
199
 
 
200
    def _qsize(self, len=len):
 
201
        return len(self.queue)
 
202
 
 
203
    # Put a new item in the queue
 
204
    def _put(self, item):
 
205
        self.queue.append(item)
 
206
 
 
207
    # Get an item from the queue
 
208
    def _get(self):
 
209
        return self.queue.popleft()
 
210
 
 
211
 
 
212
class PriorityQueue(Queue):
 
213
    '''Variant of Queue that retrieves open entries in priority order (lowest first).
 
214
 
 
215
    Entries are typically tuples of the form:  (priority number, data).
 
216
    '''
 
217
 
 
218
    def _init(self, maxsize):
 
219
        self.queue = []
 
220
 
 
221
    def _qsize(self, len=len):
 
222
        return len(self.queue)
 
223
 
 
224
    def _put(self, item, heappush=heapq.heappush):
 
225
        heappush(self.queue, item)
 
226
 
 
227
    def _get(self, heappop=heapq.heappop):
 
228
        return heappop(self.queue)
 
229
 
 
230
 
 
231
class LifoQueue(Queue):
 
232
    '''Variant of Queue that retrieves most recently added entries first.'''
 
233
 
 
234
    def _init(self, maxsize):
 
235
        self.queue = []
 
236
 
 
237
    def _qsize(self, len=len):
 
238
        return len(self.queue)
 
239
 
 
240
    def _put(self, item):
 
241
        self.queue.append(item)
 
242
 
 
243
    def _get(self):
 
244
        return self.queue.pop()