~justin-fathomdb/nova/justinsb-openstack-api-volumes

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/doc/core/examples/stdiodemo.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/env python
 
2
 
 
3
# Copyright (c) 2009 Twisted Matrix Laboratories.
 
4
# See LICENSE for details.
 
5
 
 
6
"""
 
7
Example using stdio, Deferreds, LineReceiver and twisted.web.client.
 
8
 
 
9
Note that the WebCheckerCommandProtocol protocol could easily be used in e.g.
 
10
a telnet server instead; see the comments for details.
 
11
 
 
12
Based on an example by Abe Fettig.
 
13
"""
 
14
 
 
15
from twisted.internet import stdio, reactor
 
16
from twisted.protocols import basic
 
17
from twisted.web import client
 
18
 
 
19
class WebCheckerCommandProtocol(basic.LineReceiver):
 
20
    delimiter = '\n' # unix terminal style newlines. remove this line
 
21
                     # for use with Telnet
 
22
 
 
23
    def connectionMade(self):
 
24
        self.sendLine("Web checker console. Type 'help' for help.")
 
25
 
 
26
    def lineReceived(self, line):
 
27
        # Ignore blank lines
 
28
        if not line: return
 
29
 
 
30
        # Parse the command
 
31
        commandParts = line.split()
 
32
        command = commandParts[0].lower()
 
33
        args = commandParts[1:]
 
34
 
 
35
        # Dispatch the command to the appropriate method.  Note that all you
 
36
        # need to do to implement a new command is add another do_* method.
 
37
        try:
 
38
            method = getattr(self, 'do_' + command)
 
39
        except AttributeError, e:
 
40
            self.sendLine('Error: no such command.')
 
41
        else:
 
42
            try:
 
43
                method(*args)
 
44
            except Exception, e:
 
45
                self.sendLine('Error: ' + str(e))
 
46
 
 
47
    def do_help(self, command=None):
 
48
        """help [command]: List commands, or show help on the given command"""
 
49
        if command:
 
50
            self.sendLine(getattr(self, 'do_' + command).__doc__)
 
51
        else:
 
52
            commands = [cmd[3:] for cmd in dir(self) if cmd.startswith('do_')]
 
53
            self.sendLine("Valid commands: " +" ".join(commands))
 
54
 
 
55
    def do_quit(self):
 
56
        """quit: Quit this session"""
 
57
        self.sendLine('Goodbye.')
 
58
        self.transport.loseConnection()
 
59
        
 
60
    def do_check(self, url):
 
61
        """check <url>: Attempt to download the given web page"""
 
62
        client.getPage(url).addCallback(
 
63
            self.__checkSuccess).addErrback(
 
64
            self.__checkFailure)
 
65
 
 
66
    def __checkSuccess(self, pageData):
 
67
        self.sendLine("Success: got %i bytes." % len(pageData))
 
68
 
 
69
    def __checkFailure(self, failure):
 
70
        self.sendLine("Failure: " + failure.getErrorMessage())
 
71
 
 
72
    def connectionLost(self, reason):
 
73
        # stop the reactor, only because this is meant to be run in Stdio.
 
74
        reactor.stop()
 
75
 
 
76
if __name__ == "__main__":
 
77
    stdio.StandardIO(WebCheckerCommandProtocol())
 
78
    reactor.run()