1
# -*- test-case-name: twisted.test.test_stdio -*-
4
Windows-specific implementation of the L{twisted.internet.stdio} interface.
10
from zope.interface import implements
12
from twisted.internet.interfaces import IHalfCloseableProtocol, ITransport, IAddress
13
from twisted.internet.interfaces import IConsumer, IPushProducer
15
from twisted.internet import _pollingfile, main
16
from twisted.python.failure import Failure
19
class Win32PipeAddress(object):
24
class StandardIO(_pollingfile._PollingTimer):
26
implements(ITransport,
33
def __init__(self, proto):
35
Start talking to standard IO with the given protocol.
37
Also, put it stdin/stdout/stderr into binary mode.
39
from twisted.internet import reactor
41
for stdfd in range(0, 1, 2):
42
msvcrt.setmode(stdfd, os.O_BINARY)
44
_pollingfile._PollingTimer.__init__(self, reactor)
47
hstdin = win32api.GetStdHandle(win32api.STD_INPUT_HANDLE)
48
hstdout = win32api.GetStdHandle(win32api.STD_OUTPUT_HANDLE)
50
self.stdin = _pollingfile._PollableReadPipe(
51
hstdin, self.dataReceived, self.readConnectionLost)
53
self.stdout = _pollingfile._PollableWritePipe(
54
hstdout, self.writeConnectionLost)
56
self._addPollableResource(self.stdin)
57
self._addPollableResource(self.stdout)
59
self.proto.makeConnection(self)
61
def dataReceived(self, data):
62
self.proto.dataReceived(data)
64
def readConnectionLost(self):
65
if IHalfCloseableProtocol.providedBy(self.proto):
66
self.proto.readConnectionLost()
69
def writeConnectionLost(self):
70
if IHalfCloseableProtocol.providedBy(self.proto):
71
self.proto.writeConnectionLost()
76
def checkConnLost(self):
78
if self.connsLost >= 2:
79
self.disconnecting = True
80
self.disconnected = True
81
self.proto.connectionLost(Failure(main.CONNECTION_DONE))
85
def write(self, data):
86
self.stdout.write(data)
88
def writeSequence(self, seq):
89
self.stdout.write(''.join(seq))
91
def loseConnection(self):
92
self.disconnecting = True
97
return Win32PipeAddress()
100
return Win32PipeAddress()
104
def registerProducer(self, producer, streaming):
105
return self.stdout.registerProducer(producer, streaming)
107
def unregisterProducer(self):
108
return self.stdout.unregisterProducer()
114
def stopProducing(self):
115
self.stdin.stopProducing()
119
def pauseProducing(self):
120
self.stdin.pauseProducing()
122
def resumeProducing(self):
123
self.stdin.resumeProducing()