~certify-web-dev/twisted/certify-trunk

« back to all changes in this revision

Viewing changes to twisted/test/test_process.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2007-01-17 14:52:35 UTC
  • mfrom: (1.1.5 upstream) (2.1.2 etch)
  • Revision ID: james.westby@ubuntu.com-20070117145235-btmig6qfmqfen0om
Tags: 2.5.0-0ubuntu1
New upstream version, compatible with python2.5.

Show diffs side-by-side

added added

removed removed

Lines of Context:
14
14
import gzip
15
15
import os
16
16
import popen2
17
 
import time
18
17
import sys
19
18
import signal
20
 
import shutil
 
19
import warnings
 
20
from pprint import pformat
21
21
 
22
22
try:
23
23
    import cStringIO as StringIO
26
26
 
27
27
# Twisted Imports
28
28
from twisted.internet import reactor, protocol, error, interfaces, defer
29
 
from twisted.python import util, runtime, components
 
29
from twisted.internet.protocol import ProcessProtocol
 
30
from twisted.internet.defer import Deferred
 
31
 
 
32
from twisted.python import util, runtime
30
33
from twisted.python import procutils
31
34
 
32
35
class TrivialProcessProtocol(protocol.ProcessProtocol):
173
176
        else:
174
177
            self.deferred.errback(reason)
175
178
 
 
179
 
 
180
 
 
181
class UtilityProcessProtocol(ProcessProtocol):
 
182
    """
 
183
    Helper class for launching a Python process and getting a result from it.
 
184
 
 
185
    @ivar program: A string giving a Python program for the child process to
 
186
    run.
 
187
    """
 
188
    program = None
 
189
 
 
190
    def run(cls, reactor, argv, env):
 
191
        """
 
192
        Run a Python process connected to a new instance of this protocol
 
193
        class.  Return the protocol instance.
 
194
 
 
195
        The Python process is given C{self.program} on the command line to
 
196
        execute, in addition to anything specified by C{argv}.  C{env} is
 
197
        the complete environment.
 
198
        """
 
199
        exe = sys.executable
 
200
        self = cls()
 
201
        reactor.spawnProcess(
 
202
            self, exe, [exe, "-c", self.program] + argv, env=env)
 
203
        return self
 
204
    run = classmethod(run)
 
205
 
 
206
 
 
207
    def __init__(self):
 
208
        self.bytes = []
 
209
        self.requests = []
 
210
 
 
211
 
 
212
    def parseChunks(self, bytes):
 
213
        """
 
214
        Called with all bytes received on stdout when the process exits.
 
215
        """
 
216
        raise NotImplementedError()
 
217
 
 
218
 
 
219
    def getResult(self):
 
220
        """
 
221
        Return a Deferred which will fire with the result of L{parseChunks}
 
222
        when the child process exits.
 
223
        """
 
224
        d = Deferred()
 
225
        self.requests.append(d)
 
226
        return d
 
227
 
 
228
 
 
229
    def _fireResultDeferreds(self, result):
 
230
        """
 
231
        Callback all Deferreds returned up until now by L{getResult}
 
232
        with the given result object.
 
233
        """
 
234
        requests = self.requests
 
235
        self.requests = None
 
236
        for d in requests:
 
237
            d.callback(result)
 
238
 
 
239
 
 
240
    def outReceived(self, bytes):
 
241
        """
 
242
        Accumulate output from the child process in a list.
 
243
        """
 
244
        self.bytes.append(bytes)
 
245
 
 
246
 
 
247
    def processEnded(self, reason):
 
248
        """
 
249
        Handle process termination by parsing all received output and firing
 
250
        any waiting Deferreds.
 
251
        """
 
252
        self._fireResultDeferreds(self.parseChunks(self.bytes))
 
253
 
 
254
 
 
255
 
 
256
 
 
257
class GetArgumentVector(UtilityProcessProtocol):
 
258
    """
 
259
    Protocol which will read a serialized argv from a process and
 
260
    expose it to interested parties.
 
261
    """
 
262
    program = (
 
263
        "from sys import stdout, argv\n"
 
264
        "stdout.write(chr(0).join(argv))\n"
 
265
        "stdout.flush()\n")
 
266
 
 
267
    def parseChunks(self, chunks):
 
268
        """
 
269
        Parse the output from the process to which this protocol was
 
270
        connected, which is a single unterminated line of \\0-separated
 
271
        strings giving the argv of that process.  Return this as a list of
 
272
        str objects.
 
273
        """
 
274
        return ''.join(chunks).split('\0')
 
275
 
 
276
 
 
277
 
 
278
class GetEnvironmentDictionary(UtilityProcessProtocol):
 
279
    """
 
280
    Protocol which will read a serialized environment dict from a process
 
281
    and expose it to interested parties.
 
282
    """
 
283
    program = (
 
284
        "from sys import stdout\n"
 
285
        "from os import environ\n"
 
286
        "items = environ.iteritems()\n"
 
287
        "stdout.write(chr(0).join([k + chr(0) + v for k, v in items]))\n"
 
288
        "stdout.flush()\n")
 
289
 
 
290
    def parseChunks(self, chunks):
 
291
        """
 
292
        Parse the output from the process to which this protocol was
 
293
        connected, which is a single unterminated line of \\0-separated
 
294
        strings giving key value pairs of the environment from that process. 
 
295
        Return this as a dictionary.
 
296
        """
 
297
        environString = ''.join(chunks)
 
298
        if not environString:
 
299
            return {}
 
300
        environ = iter(environString.split('\0'))
 
301
        d = {}
 
302
        while 1:
 
303
            try:
 
304
                k = environ.next()
 
305
            except StopIteration:
 
306
                break
 
307
            else:
 
308
                v = environ.next()
 
309
                d[k] = v
 
310
        return d
 
311
 
 
312
 
 
313
 
176
314
class ProcessTestCase(SignalMixin, unittest.TestCase):
177
315
    """Test running a process."""
178
316
 
289
427
        return d.addCallback(processEnded)
290
428
 
291
429
 
 
430
    def test_wrongArguments(self):
 
431
        """
 
432
        Test invalid arguments to spawnProcess: arguments and environment
 
433
        must only contains string or unicode, and not null bytes.
 
434
        """
 
435
        exe = sys.executable
 
436
        p = protocol.ProcessProtocol()
 
437
 
 
438
        badEnvs = [
 
439
            {"foo": 2},
 
440
            {"foo": "egg\0a"},
 
441
            {3: "bar"},
 
442
            {"bar\0foo": "bar"}]
 
443
 
 
444
        badArgs = [
 
445
            [exe, 2],
 
446
            "spam",
 
447
            [exe, "foo\0bar"]]
 
448
 
 
449
        # Sanity check - this will fail for people who have mucked with
 
450
        # their site configuration in a stupid way, but there's nothing we
 
451
        # can do about that.
 
452
        badUnicode = u'\N{SNOWMAN}'
 
453
        try:
 
454
            badUnicode.encode(sys.getdefaultencoding())
 
455
        except UnicodeEncodeError:
 
456
            # Okay, that unicode doesn't encode, put it in as a bad environment
 
457
            # key.
 
458
            badEnvs.append({badUnicode: 'value for bad unicode key'})
 
459
            badEnvs.append({'key for bad unicode value': badUnicode})
 
460
            badArgs.append([exe, badUnicode])
 
461
        else:
 
462
            # It _did_ encode.  Most likely, Gtk2 is being used and the
 
463
            # default system encoding is UTF-8, which can encode anything. 
 
464
            # In any case, if implicit unicode -> str conversion works for
 
465
            # that string, we can't test that TypeError gets raised instead,
 
466
            # so just leave it off.
 
467
            pass
 
468
 
 
469
        for env in badEnvs:
 
470
            self.assertRaises(
 
471
                TypeError,
 
472
                reactor.spawnProcess, p, exe, [exe, "-c", ""], env=env)
 
473
 
 
474
        for args in badArgs:
 
475
            self.assertRaises(
 
476
                TypeError,
 
477
                reactor.spawnProcess, p, exe, args, env=None)
 
478
 
 
479
 
 
480
    # Use upper-case so that the environment key test uses an upper case
 
481
    # name: some versions of Windows only support upper case environment
 
482
    # variable names, and I think Python (as of 2.5) doesn't use the right
 
483
    # syscall for lowercase or mixed case names to work anyway.
 
484
    okayUnicode = u"UNICODE"
 
485
    encodedValue = "UNICODE"
 
486
 
 
487
    def _deprecatedUnicodeSupportTest(self, processProtocolClass, argv=[], env={}):
 
488
        """
 
489
        Check that a deprecation warning is emitted when passing unicode to
 
490
        spawnProcess for an argv value or an environment key or value. 
 
491
        Check that the warning is of the right type, has the right message,
 
492
        and refers to the correct file.  Unfortunately, don't check that the
 
493
        line number is correct, because that is too hard for me to figure
 
494
        out.
 
495
 
 
496
        @param processProtocolClass: A L{UtilityProcessProtocol} subclass
 
497
        which will be instantiated to communicate with the child process.
 
498
 
 
499
        @param argv: The argv argument to spawnProcess.
 
500
 
 
501
        @param env: The env argument to spawnProcess.
 
502
 
 
503
        @return: A Deferred which fires when the test is complete.
 
504
        """
 
505
        # Sanity to check to make sure we can actually encode this unicode
 
506
        # with the default system encoding.  This may be excessively
 
507
        # paranoid. -exarkun
 
508
        self.assertEqual(
 
509
            self.okayUnicode.encode(sys.getdefaultencoding()),
 
510
            self.encodedValue)
 
511
 
 
512
        warningsShown = []
 
513
        def showwarning(*args):
 
514
            warningsShown.append(args)
 
515
 
 
516
        origshow = warnings.showwarning
 
517
        origregistry = globals().get('__warningregistry__', {})
 
518
        try:
 
519
            warnings.showwarning = showwarning
 
520
            globals()['__warningregistry__'] = {}
 
521
            p = processProtocolClass.run(reactor, argv, env)
 
522
        finally:
 
523
            warnings.showwarning = origshow
 
524
            globals()['__warningregistry__'] = origregistry
 
525
 
 
526
        d = p.getResult()
 
527
        self.assertEqual(len(warningsShown), 1, pformat(warningsShown))
 
528
        message, category, filename, lineno = warningsShown[0]
 
529
        self.assertEqual(
 
530
            message.args,
 
531
            ("Argument strings and environment keys/values passed to "
 
532
             "reactor.spawnProcess should be str, not unicode.",))
 
533
        self.assertIdentical(category, DeprecationWarning)
 
534
 
 
535
        # Use starts with because of .pyc/.pyo issues.
 
536
        self.failUnless(
 
537
            __file__.startswith(filename),
 
538
            'Warning in %r, expected %r' % (filename, __file__))
 
539
 
 
540
        # It would be nice to be able to check the line number as well, but
 
541
        # different configurations actually end up reporting different line
 
542
        # numbers (generally the variation is only 1 line, but that's enough
 
543
        # to fail the test erroneously...).
 
544
        # self.assertEqual(lineno, 202)
 
545
 
 
546
        return d
 
547
 
 
548
    def test_deprecatedUnicodeArgvSupport(self):
 
549
        """
 
550
        Test that a unicode string passed for an argument value is allowed
 
551
        if it can be encoded with the default system encoding, but that a
 
552
        deprecation warning is emitted.
 
553
        """
 
554
        d = self._deprecatedUnicodeSupportTest(GetArgumentVector, argv=[self.okayUnicode])
 
555
        def gotArgVector(argv):
 
556
            self.assertEqual(argv, ['-c', self.encodedValue])
 
557
        d.addCallback(gotArgVector)
 
558
        return d
 
559
 
 
560
 
 
561
    def test_deprecatedUnicodeEnvKeySupport(self):
 
562
        """
 
563
        Test that a unicode string passed for the key of the environment
 
564
        dictionary is allowed if it can be encoded with the default system
 
565
        encoding, but that a deprecation warning is emitted.
 
566
        """
 
567
        d = self._deprecatedUnicodeSupportTest(
 
568
            GetEnvironmentDictionary, env={self.okayUnicode: self.encodedValue})
 
569
        def gotEnvironment(environ):
 
570
            self.assertEqual(environ[self.encodedValue], self.encodedValue)
 
571
        d.addCallback(gotEnvironment)
 
572
        return d
 
573
 
 
574
 
 
575
    def test_deprecatedUnicodeEnvValueSupport(self):
 
576
        """
 
577
        Test that a unicode string passed for the value of the environment
 
578
        dictionary is allowed if it can be encoded with the default system
 
579
        encoding, but that a deprecation warning is emitted.
 
580
        """
 
581
        d = self._deprecatedUnicodeSupportTest(
 
582
            GetEnvironmentDictionary, env={self.encodedValue: self.okayUnicode})
 
583
        def gotEnvironment(environ):
 
584
            # On Windows, the environment contains more things than we
 
585
            # specified, so only make sure that at least the key we wanted
 
586
            # is there, rather than testing the dictionary for exact
 
587
            # equality.
 
588
            self.assertEqual(environ[self.encodedValue], self.encodedValue)
 
589
        d.addCallback(gotEnvironment)
 
590
        return d
 
591
 
 
592
 
 
593
 
292
594
class TwoProcessProtocol(protocol.ProcessProtocol):
293
595
    num = -1
294
596
    finished = 0
701
1003
        self.assertRaises(ValueError, reactor.spawnProcess, p, pyExe, pyArgs, childFDs={1:'r'})
702
1004
 
703
1005
class UtilTestCase(unittest.TestCase):
704
 
    def setUpClass(klass):
705
 
        j = os.path.join
706
 
        foobar = j("foo", "bar")
707
 
        foobaz = j("foo", "baz")
708
 
        bazfoo = j("baz", "foo")
709
 
        barfoo = j("baz", "bar")
710
 
 
711
 
        for d in "foo", foobar, foobaz, "baz", bazfoo, barfoo:
712
 
            if os.path.exists(d):
713
 
                shutil.rmtree(d, True)
714
 
            os.mkdir(d)
715
 
 
716
 
        f = file(j(foobaz, "executable"), "w")
717
 
        f.close()
718
 
        os.chmod(j(foobaz, "executable"), 0700)
719
 
 
720
 
        f = file(j("foo", "executable"), "w")
721
 
        f.close()
722
 
        os.chmod(j("foo", "executable"), 0700)
723
 
 
724
 
        f = file(j(bazfoo, "executable"), "w")
725
 
        f.close()
726
 
        os.chmod(j(bazfoo, "executable"), 0700)
727
 
 
728
 
        f = file(j(bazfoo, "executable.bin"), "w")
729
 
        f.close()
730
 
        os.chmod(j(bazfoo, "executable.bin"), 0700)
731
 
 
732
 
        f = file(j(barfoo, "executable"), "w")
733
 
        f.close()
734
 
 
735
 
        klass.oldPath = os.environ['PATH']
736
 
        os.environ['PATH'] = os.pathsep.join((foobar, foobaz, bazfoo, barfoo))
737
 
 
738
 
    def tearDownClass(klass):
739
 
        j = os.path.join
740
 
        os.environ['PATH'] = klass.oldPath
741
 
        foobar = j("foo", "bar")
742
 
        foobaz = j("foo", "baz")
743
 
        bazfoo = j("baz", "foo")
744
 
        barfoo = j("baz", "bar")
745
 
 
746
 
 
747
 
        os.remove(j(foobaz, "executable"))
748
 
        os.remove(j("foo", "executable"))
749
 
        os.remove(j(bazfoo, "executable"))
750
 
        os.remove(j(bazfoo, "executable.bin"))
751
 
        os.remove(j(barfoo, "executable"))
752
 
 
753
 
        for d in foobar, foobaz, bazfoo, barfoo, "foo", "baz":
754
 
            os.rmdir(d)
 
1006
    """
 
1007
    Tests for process-related helper functions (currently only
 
1008
    L{procutils.which}.
 
1009
    """
 
1010
    def setUp(self):
 
1011
        """
 
1012
        Create several directories and files, some of which are executable
 
1013
        and some of which are not.  Save the current PATH setting.
 
1014
        """
 
1015
        j = os.path.join
 
1016
 
 
1017
        base = self.mktemp()
 
1018
 
 
1019
        self.foo = j(base, "foo")
 
1020
        self.baz = j(base, "baz")
 
1021
        self.foobar = j(self.foo, "bar")
 
1022
        self.foobaz = j(self.foo, "baz")
 
1023
        self.bazfoo = j(self.baz, "foo")
 
1024
        self.bazbar = j(self.baz, "bar")
 
1025
 
 
1026
        for d in self.foobar, self.foobaz, self.bazfoo, self.bazbar:
 
1027
            os.makedirs(d)
 
1028
 
 
1029
        for name, mode in [(j(self.foobaz, "executable"), 0700),
 
1030
                           (j(self.foo, "executable"), 0700),
 
1031
                           (j(self.bazfoo, "executable"), 0700),
 
1032
                           (j(self.bazfoo, "executable.bin"), 0700),
 
1033
                           (j(self.bazbar, "executable"), 0)]:
 
1034
            f = file(name, "w")
 
1035
            f.close()
 
1036
            os.chmod(name, mode)
 
1037
 
 
1038
        self.oldPath = os.environ.get('PATH', None)
 
1039
        os.environ['PATH'] = os.pathsep.join((
 
1040
            self.foobar, self.foobaz, self.bazfoo, self.bazbar))
 
1041
 
 
1042
 
 
1043
    def tearDown(self):
 
1044
        """
 
1045
        Restore the saved PATH setting.
 
1046
        """
 
1047
        if self.oldPath is None:
 
1048
            try:
 
1049
                del os.environ['PATH']
 
1050
            except KeyError:
 
1051
                pass
 
1052
        else:
 
1053
            os.environ['PATH'] = self.oldPath
 
1054
 
 
1055
 
 
1056
    def test_whichWithoutPATH(self):
 
1057
        """
 
1058
        Test that if C{os.environ} does not have a C{'PATH'} key,
 
1059
        L{procutils.which} returns an empty list.
 
1060
        """
 
1061
        del os.environ['PATH']
 
1062
        self.assertEqual(procutils.which("executable"), [])
 
1063
 
755
1064
 
756
1065
    def testWhich(self):
757
1066
        j = os.path.join
758
1067
        paths = procutils.which("executable")
759
 
        expectedPaths = [j("foo", "baz", "executable"),
760
 
                         j("baz", "foo", "executable")]
 
1068
        expectedPaths = [j(self.foobaz, "executable"),
 
1069
                         j(self.bazfoo, "executable")]
761
1070
        if runtime.platform.isWindows():
762
 
            expectedPaths.append(j("baz", "bar", "executable"))
 
1071
            expectedPaths.append(j(self.bazbar, "executable"))
763
1072
        self.assertEquals(paths, expectedPaths)
764
1073
 
 
1074
 
765
1075
    def testWhichPathExt(self):
766
1076
        j = os.path.join
767
1077
        old = os.environ.get('PATHEXT', None)
773
1083
                del os.environ['PATHEXT']
774
1084
            else:
775
1085
                os.environ['PATHEXT'] = old
776
 
        expectedPaths = [j("foo", "baz", "executable"),
777
 
                         j("baz", "foo", "executable"),
778
 
                         j("baz", "foo", "executable.bin")]
 
1086
        expectedPaths = [j(self.foobaz, "executable"),
 
1087
                         j(self.bazfoo, "executable"),
 
1088
                         j(self.bazfoo, "executable.bin")]
779
1089
        if runtime.platform.isWindows():
780
 
            expectedPaths.append(j("baz", "bar", "executable"))
 
1090
            expectedPaths.append(j(self.bazbar, "executable"))
781
1091
        self.assertEquals(paths, expectedPaths)
782
1092
 
 
1093
 
 
1094
 
783
1095
class ClosingPipesProcessProtocol(protocol.ProcessProtocol):
784
1096
    output = ''
785
1097
    errput = ''