1
"""A multi-producer, multi-consumer queue."""
3
from time import time as _time
4
from collections import deque
7
__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue']
9
class Empty(Exception):
10
"Exception raised by Queue.get(block=0)/get_nowait()."
13
class Full(Exception):
14
"Exception raised by Queue.put(block=0)/put_nowait()."
18
"""Create a queue object with a given maximum size.
20
If maxsize is <= 0, the queue size is infinite.
22
def __init__(self, maxsize=0):
26
import dummy_threading as threading
27
self.maxsize = maxsize
29
# mutex must be held whenever the queue is mutating. All methods
30
# that acquire mutex must release it before returning. mutex
31
# is shared between the three conditions, so acquiring and
32
# releasing the conditions also acquires and releases mutex.
33
self.mutex = threading.Lock()
34
# Notify not_empty whenever an item is added to the queue; a
35
# thread waiting to get is notified then.
36
self.not_empty = threading.Condition(self.mutex)
37
# Notify not_full whenever an item is removed from the queue;
38
# a thread waiting to put is notified then.
39
self.not_full = threading.Condition(self.mutex)
40
# Notify all_tasks_done whenever the number of unfinished tasks
41
# drops to zero; thread waiting to join() is notified to resume
42
self.all_tasks_done = threading.Condition(self.mutex)
43
self.unfinished_tasks = 0
46
"""Indicate that a formerly enqueued task is complete.
48
Used by Queue consumer threads. For each get() used to fetch a task,
49
a subsequent call to task_done() tells the queue that the processing
50
on the task is complete.
52
If a join() is currently blocking, it will resume when all items
53
have been processed (meaning that a task_done() call was received
54
for every item that had been put() into the queue).
56
Raises a ValueError if called more times than there were items
59
self.all_tasks_done.acquire()
61
unfinished = self.unfinished_tasks - 1
64
raise ValueError('task_done() called too many times')
65
self.all_tasks_done.notify_all()
66
self.unfinished_tasks = unfinished
68
self.all_tasks_done.release()
71
"""Blocks until all items in the Queue have been gotten and processed.
73
The count of unfinished tasks goes up whenever an item is added to the
74
queue. The count goes down whenever a consumer thread calls task_done()
75
to indicate the item was retrieved and all work on it is complete.
77
When the count of unfinished tasks drops to zero, join() unblocks.
79
self.all_tasks_done.acquire()
81
while self.unfinished_tasks:
82
self.all_tasks_done.wait()
84
self.all_tasks_done.release()
87
"""Return the approximate size of the queue (not reliable!)."""
94
"""Return True if the queue is empty, False otherwise (not reliable!)."""
101
"""Return True if the queue is full, False otherwise (not reliable!)."""
103
n = 0 < self.maxsize == self._qsize()
107
def put(self, item, block=True, timeout=None):
108
"""Put an item into the queue.
110
If optional args 'block' is true and 'timeout' is None (the default),
111
block if necessary until a free slot is available. If 'timeout' is
112
a positive number, it blocks at most 'timeout' seconds and raises
113
the Full exception if no free slot was available within that time.
114
Otherwise ('block' is false), put an item on the queue if a free slot
115
is immediately available, else raise the Full exception ('timeout'
116
is ignored in that case).
118
self.not_full.acquire()
122
if self._qsize() == self.maxsize:
124
elif timeout is None:
125
while self._qsize() == self.maxsize:
128
raise ValueError("'timeout' must be a positive number")
130
endtime = _time() + timeout
131
while self._qsize() == self.maxsize:
132
remaining = endtime - _time()
135
self.not_full.wait(remaining)
137
self.unfinished_tasks += 1
138
self.not_empty.notify()
140
self.not_full.release()
142
def put_nowait(self, item):
143
"""Put an item into the queue without blocking.
145
Only enqueue the item if a free slot is immediately available.
146
Otherwise raise the Full exception.
148
return self.put(item, False)
150
def get(self, block=True, timeout=None):
151
"""Remove and return an item from the queue.
153
If optional args 'block' is true and 'timeout' is None (the default),
154
block if necessary until an item is available. If 'timeout' is
155
a positive number, it blocks at most 'timeout' seconds and raises
156
the Empty exception if no item was available within that time.
157
Otherwise ('block' is false), return an item if one is immediately
158
available, else raise the Empty exception ('timeout' is ignored
161
self.not_empty.acquire()
164
if not self._qsize():
166
elif timeout is None:
167
while not self._qsize():
168
self.not_empty.wait()
170
raise ValueError("'timeout' must be a positive number")
172
endtime = _time() + timeout
173
while not self._qsize():
174
remaining = endtime - _time()
177
self.not_empty.wait(remaining)
179
self.not_full.notify()
182
self.not_empty.release()
184
def get_nowait(self):
185
"""Remove and return an item from the queue without blocking.
187
Only get an item if one is immediately available. Otherwise
188
raise the Empty exception.
190
return self.get(False)
192
# Override these methods to implement other queue organizations
193
# (e.g. stack or priority queue).
194
# These will only be called with appropriate locks held
196
# Initialize the queue representation
197
def _init(self, maxsize):
200
def _qsize(self, len=len):
201
return len(self.queue)
203
# Put a new item in the queue
204
def _put(self, item):
205
self.queue.append(item)
207
# Get an item from the queue
209
return self.queue.popleft()
212
class PriorityQueue(Queue):
213
'''Variant of Queue that retrieves open entries in priority order (lowest first).
215
Entries are typically tuples of the form: (priority number, data).
218
def _init(self, maxsize):
221
def _qsize(self, len=len):
222
return len(self.queue)
224
def _put(self, item, heappush=heapq.heappush):
225
heappush(self.queue, item)
227
def _get(self, heappop=heapq.heappop):
228
return heappop(self.queue)
231
class LifoQueue(Queue):
232
'''Variant of Queue that retrieves most recently added entries first.'''
234
def _init(self, maxsize):
237
def _qsize(self, len=len):
238
return len(self.queue)
240
def _put(self, item):
241
self.queue.append(item)
244
return self.queue.pop()