~pythonregexp2.7/python/issue2636-01

« back to all changes in this revision

Viewing changes to Lib/test/test_queue.py

  • Committer: Jeffrey C. "The TimeHorse" Jacobs
  • Date: 2008-06-09 14:37:21 UTC
  • mfrom: (39022.1.14 Regexp-2.6)
  • Revision ID: darklord@timehorse.com-20080609143721-bj0g1mwta28038da
Merged in changes from the core Regexp branch.

Show diffs side-by-side

added

removed

Lines of Context:
1
1
# Some simple queue module tests, plus some failure conditions
2
2
# to ensure the Queue locks remain stable.
3
 
import queue
 
3
import Queue
4
4
import sys
5
5
import threading
6
6
import time
107
107
        try:
108
108
            q.put("full", block=0)
109
109
            self.fail("Didn't appear to block with a full queue")
110
 
        except queue.Full:
 
110
        except Queue.Full:
111
111
            pass
112
112
        try:
113
113
            q.put("full", timeout=0.01)
114
114
            self.fail("Didn't appear to time-out with a full queue")
115
 
        except queue.Full:
 
115
        except Queue.Full:
116
116
            pass
117
117
        # Test a blocking put
118
118
        self.do_blocking_test(q.put, ("full",), q.get, ())
124
124
        try:
125
125
            q.get(block=0)
126
126
            self.fail("Didn't appear to block with an empty queue")
127
 
        except queue.Empty:
 
127
        except Queue.Empty:
128
128
            pass
129
129
        try:
130
130
            q.get(timeout=0.01)
131
131
            self.fail("Didn't appear to time-out with an empty queue")
132
 
        except queue.Empty:
 
132
        except Queue.Empty:
133
133
            pass
134
134
        # Test a blocking get
135
135
        self.do_blocking_test(q.get, (), q.put, ('empty',))
191
191
 
192
192
 
193
193
class QueueTest(BaseQueueTest):
194
 
    type2test = queue.Queue
 
194
    type2test = Queue.Queue
195
195
 
196
196
class LifoQueueTest(BaseQueueTest):
197
 
    type2test = queue.LifoQueue
 
197
    type2test = Queue.LifoQueue
198
198
 
199
199
class PriorityQueueTest(BaseQueueTest):
200
 
    type2test = queue.PriorityQueue
 
200
    type2test = Queue.PriorityQueue
201
201
 
202
202
 
203
203
 
205
205
class FailingQueueException(Exception):
206
206
    pass
207
207
 
208
 
class FailingQueue(queue.Queue):
 
208
class FailingQueue(Queue.Queue):
209
209
    def __init__(self, *args):
210
210
        self.fail_next_put = False
211
211
        self.fail_next_get = False
212
 
        queue.Queue.__init__(self, *args)
 
212
        Queue.Queue.__init__(self, *args)
213
213
    def _put(self, item):
214
214
        if self.fail_next_put:
215
215
            self.fail_next_put = False
216
216
            raise FailingQueueException, "You Lose"
217
 
        return queue.Queue._put(self, item)
 
217
        return Queue.Queue._put(self, item)
218
218
    def _get(self):
219
219
        if self.fail_next_get:
220
220
            self.fail_next_get = False
221
221
            raise FailingQueueException, "You Lose"
222
 
        return queue.Queue._get(self)
 
222
        return Queue.Queue._get(self)
223
223
 
224
224
class FailingQueueTest(unittest.TestCase, BlockingTestMixin):
225
225