~jasonodoom/dmedia/docs

« back to all changes in this revision

Viewing changes to dmedia/tests/test_workers.py

  • Committer: Jason Gerard DeRose
  • Date: 2011-04-30 11:29:16 UTC
  • mfrom: (187.1.32 transfer-manager)
  • Revision ID: jderose@novacut.com-20110430112916-5h4k7b5wadfpio80
Add TransferManager, TransferWorker(s)

Show diffs side-by-side

Legend: added lines

Legend: removed lines

Lines of Context:
320
320
        self.assertTrue(isinstance(inst._q, multiprocessing.queues.Queue))
321
321
        self.assertTrue(inst._thread is None)
322
322
 
 
323
    def test_start_signal_thread(self):
 
324
        env = {'foo': 'bar'}
 
325
        inst = self.klass(env)
 
326
 
 
327
        inst._workers['foo'] = None
 
328
        self.assertIsNone(inst._start_signal_thread())
 
329
        self.assertIs(inst._running, True)
 
330
        self.assertIsInstance(inst._thread, threading.Thread)
 
331
        self.assertIs(inst._thread.daemon, True)
 
332
        self.assertIs(inst._thread.is_alive(), True)
 
333
 
 
334
        # Shutdown thread:
 
335
        inst._running = False
 
336
        inst._thread.join()
 
337
 
 
338
    def test_kill_signal_thread(self):
 
339
        env = {'foo': 'bar'}
 
340
        inst = self.klass(env)
 
341
 
 
342
        inst._running = True
 
343
        inst._thread = threading.Thread(target=inst._signal_thread)
 
344
        inst._thread.daemon = True
 
345
        inst._thread.start()
 
346
 
 
347
        self.assertIsNone(inst._kill_signal_thread())
 
348
        self.assertIs(inst._running, False)
 
349
        inst._thread.join()
 
350
        self.assertIs(inst._thread.is_alive(), False)
 
351
 
323
352
    def test_process_message(self):
324
353
        class Example(self.klass):
325
354
            _call = None
340
369
    def test_on_terminate(self):
341
370
        env = {'foo': 'bar'}
342
371
        inst = self.klass(env)
 
372
 
343
373
        e = raises(KeyError, inst.on_terminate, 'foo')
 
374
 
344
375
        p = multiprocessing.Process(target=time.sleep, args=(1,))
345
376
        p.daemon = True
346
377
        inst._workers['foo'] = p
347
378
        p.start()
 
379
        inst._start_signal_thread()
 
380
 
348
381
        self.assertTrue(p.is_alive() is True)
349
382
        inst.on_terminate('foo')
350
383
        self.assertTrue(p.is_alive() is False)
351
384
        self.assertEqual(inst._workers, {})
352
385
 
353
 
    def test_start(self):
354
 
        env = {'foo': 'bar'}
355
 
        inst = self.klass(env)
356
 
 
357
 
        # Test that start() returns False when already running:
358
 
        inst._running = True
359
 
        self.assertTrue(inst.start() is False)
360
 
        self.assertTrue(inst._thread is None)
361
 
 
362
 
        # Start the Manager:
363
 
        inst._running = False
364
 
        self.assertTrue(inst.start() is True)
365
 
        self.assertTrue(inst._running is True)
366
 
        self.assertTrue(isinstance(inst._thread, threading.Thread))
367
 
        self.assertTrue(inst._thread.daemon is True)
368
 
        self.assertTrue(inst._thread.is_alive() is True)
369
 
 
370
 
        # Shutdown thread:
371
 
        inst._running = False
372
 
        inst._thread.join()
373
 
 
374
386
    def test_kill(self):
375
387
        env = {'foo': 'bar'}
376
388
        inst = self.klass(env)
383
395
        bar = infinite_process()
384
396
        baz = infinite_process()
385
397
        inst._workers.update(dict(foo=foo, bar=bar, baz=baz))
386
 
        self.assertTrue(inst.start() is True)
 
398
        self.assertIsNone(inst._start_signal_thread())
387
399
        self.assertTrue(inst._thread.is_alive())
388
400
 
389
401
        self.assertTrue(inst.kill() is True)