~divmod-dev/divmod.org/trunk

« back to all changes in this revision

Viewing changes to Epsilon/epsilon/test/utils.py

  • Committer: Jean-Paul Calderone
  • Date: 2014-06-29 20:33:04 UTC
  • mfrom: (2749.1.1 remove-epsilon-1325289)
  • Revision ID: exarkun@twistedmatrix.com-20140629203304-gdkmbwl1suei4m97
merge lp:~exarkun/divmod.org/remove-epsilon-1325289

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
 
2
 
from zope.interface import implements
3
 
 
4
 
from twisted.internet import interfaces
5
 
 
6
 
class FileWrapper:
    """A wrapper around a file-like object to make it behave as a Transport.

    This doesn't actually stream the file to the attached protocol,
    and is thus useful mainly as a utility for debugging protocols.
    """

    implements(interfaces.ITransport)

    # Class-level defaults; instances override them by assignment.
    closed = 0
    disconnecting = 0
    disconnected = 0
    producer = None
    streamingProducer = 0

    def __init__(self, file):
        """Remember C{file}, the object that L{write} forwards data to."""
        self.file = file

    def write(self, data):
        """Write C{data} to the wrapped file.

        Errors raised by the underlying C{write} call are not propagated;
        they are routed to L{handleException} instead.
        """
        try:
            self.file.write(data)
        except Exception:
            # Only ordinary errors are treated as best-effort failures.
            # KeyboardInterrupt, SystemExit and GeneratorExit derive from
            # BaseException and must keep propagating to the caller.
            self.handleException()
        # self._checkProducer()

    def _checkProducer(self):
        """Ask the registered producer, if any, to resume producing."""
        # Cheating; this is called at "idle" times to allow producers to be
        # found and dealt with
        if self.producer:
            self.producer.resumeProducing()

    def registerProducer(self, producer, streaming):
        """From abstract.FileDescriptor

        Stores C{producer} and the C{streaming} flag on this object.  A
        producer registered with a false C{streaming} value has its
        C{resumeProducing} method called immediately.
        """
        self.producer = producer
        self.streamingProducer = streaming
        if not streaming:
            producer.resumeProducing()

    def unregisterProducer(self):
        """Forget the currently registered producer."""
        self.producer = None

    def stopConsuming(self):
        """Unregister the producer and then call L{loseConnection}."""
        self.unregisterProducer()
        self.loseConnection()

    def writeSequence(self, iovec):
        """Join the strings in C{iovec} and pass the result to L{write}."""
        self.write("".join(iovec))

    def loseConnection(self):
        """Set the C{disconnecting} flag; the wrapped file is left untouched."""
        self.disconnecting = True

    def getPeer(self):
        """Return the placeholder pair C{('file', 'file')}."""
        # XXX: According to ITransport, this should return an IAddress!
        return 'file', 'file'

    def getHost(self):
        """Return the placeholder string C{'file'}."""
        # XXX: According to ITransport, this should return an IAddress!
        return 'file'

    def handleException(self):
        """Hook called by L{write} when writing fails; does nothing here."""
        pass

    def resumeProducing(self):
        """Do nothing."""
        # Never sends data anyways
        pass

    def pauseProducing(self):
        """Do nothing."""
        # Never sends data anyways
        pass

    def stopProducing(self):
        """Delegate to L{loseConnection}."""
        self.loseConnection()

    # Additional Q2Q Transport requirements
    def getQ2QPeer(self):
        """Return a fixed C{Q2QAddress('file.domain', 'peer.resource')}."""
        # vertex is imported here rather than at module level, so it is only
        # needed when this method is actually called.
        from vertex import q2q
        return q2q.Q2QAddress('file.domain', 'peer.resource')
84