27
27
import SocketServer
30
# this class keeps track of the state of a connected peer, including
31
# the socket used to talk to it, its name, and a queue of records waiting
32
# to be written into it.
32
# this class keeps track of the state of a peer, including the socket
33
# used to talk to it, its name, and a queue of records waiting to be
33
35
class PeerTracker():
36
def __init__(self, name):
35
37
self.writequeue = [ ] # list of records to be sent TO peer
37
# The main "threaded web server" class. Has a condition variable and
38
# a list of connected peers.
38
self.connected = False
41
# scrub my state when the peer disconnects
44
self.connected = False
47
# The main "threaded server" class. Has a condition variable and a
48
# dict of connected peers.
39
49
class CheetahServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
40
def init_cheetah(self):
50
def init_cheetah(self, maxpeers):
41
51
self.cond = threading.Condition() # used to coordinate client lists
42
self.peers = [ ] # a list of connected peers
52
self.peers = { } # a dict of peers
53
for i in range(0, maxpeers):
54
self.peers[i] = PeerTracker("Peer " + str(i))
44
56
# A new ThreadedCheetahHandler is instantiated for each incoming connection,
45
57
# and a new thread is dispatched to handle() to service that connection.
58
70
# peer is killed off. if parsing succeeds but only one peer is
59
71
# connected, the record is dropped but the peer stays up.
60
72
def processinput(self, data):
61
# make sure data has 6 fields. if not, drop peer.
73
# make sure data has 7 fields. if not, drop peer.
62
74
components = data.split()
63
if (len(components) != 6):
75
if (len(components) != 7):
66
78
# try to parse the fields. if fail, drop peer.
69
valdict['watts'] = float(components[0])
70
valdict['hr'] = float(components[1])
71
valdict['time'] = int(components[2])
72
valdict['speed'] = float(components[3])
73
valdict['rpm'] = float(components[4])
74
valdict['load'] = float(components[5])
81
valdict['peername'] = components[0]
82
valdict['watts'] = float(components[1])
83
valdict['hr'] = float(components[2])
84
valdict['time'] = int(components[3])
85
valdict['speed'] = float(components[4])
86
valdict['rpm'] = float(components[5])
87
valdict['load'] = float(components[6])
76
89
# parse succeeded, so add to peer's input queue
77
90
print "(" + self.me_peer.peername + ")", \
78
91
"well-formed record arrived..."
79
print " ... watts:%.2f hr:%.2f time:%ld speed:%.2f rpm:%.2f load:%.3f" % \
80
(valdict['watts'], valdict['hr'], valdict['time'], \
92
print " ... name:%s watts:%.2f hr:%.2f time:%ld speed:%.2f rpm:%.2f load:%.3f" % \
93
(valdict['peername'], valdict['watts'], valdict['hr'], valdict['time'], \
81
94
valdict['speed'], valdict['rpm'], valdict['load'])
82
95
self.server.cond.acquire()
83
if (len(self.server.peers) == 2):
84
if (self.server.peers[0] == self.me_peer):
85
otherpeer = self.server.peers[1]
87
otherpeer = self.server.peers[0]
88
otherpeer.writequeue.append(valdict)
97
for i in range(0, len(self.server.peers)):
98
if (not (self.server.peers[i] == self.me_peer)) and (self.server.peers[i].connected):
99
self.server.peers[i].writequeue.append(valdict)
100
print " ...queued for", self.server.peers[i].peername + "\n"
101
numqueued = numqueued + 1
90
103
# notify writing thread that there is work to do
91
104
self.server.cond.notify()
92
print " ...queued for", otherpeer.peername + "\n"
94
print " ...but other peer not connected,", \
106
print " ...but no other peers are connected,", \
95
107
"so dropped it.\n"
96
108
self.server.cond.release()
97
109
except ValueError:
112
124
print "(server) new incoming connection..."
113
125
self.server.cond.acquire()
114
126
# If the server is full, bonk out, else add.
115
if (len(self.server.peers) < 2):
116
# We have room. Create record for new peer.
117
self.me_peer = PeerTracker()
118
self.me_peer.socket = self.request
119
if (len(self.server.peers) == 0):
120
self.me_peer.peername = "Peer A"
121
elif (self.server.peers[0].peername == "Peer A"):
122
self.me_peer.peername = "Peer B"
124
self.me_peer.peername = "Peer A"
125
self.server.peers.append(self.me_peer)
126
print " ...added peer #", str(len(self.server.peers)), \
127
"named \"" + self.me_peer.peername + "\"\n"
128
for i in range(0, len(self.server.peers)):
129
if (not addedpeer) and (self.server.peers[i].connected == False):
130
# We have room. Create record for new peer in slot i
131
self.me_peer = self.server.peers[i]
132
self.me_peer.connected = True
133
self.me_peer.socket = self.request
134
print " ...connected " + self.me_peer.peername + "\n"
137
if addedpeer == False:
130
138
# Server is full, so bonk out.
131
print " ...but server already has 2 peers, so dropping.\n"
139
print " ...but server already has max peers, so dropping.\n"
133
141
self.server.cond.release()
146
155
# record into a peer's socket. if the write fails, we ignore for now,
147
156
# and rely on the read on the socket to fail and clean up later.
148
157
def write_to_peer(item, peer):
149
s = "%.2f %.2f %d %.2f %.2f %.3f\n" % \
150
(item['watts'], item['hr'], item['time'], \
158
s = "%s %.2f %.2f %d %.2f %.2f %.3f\n" % \
159
(item['peername'], item['watts'], item['hr'], item['time'], \
151
160
item['speed'], item['rpm'], item['load'])
153
162
peer.socket.sendall(s)
169
178
server.cond.wait()
170
179
# drain every peer's queue of pending records
171
for peer in server.peers:
180
for peer in server.peers.itervalues():
172
181
while len(peer.writequeue) > 0:
173
182
next = peer.writequeue.pop(0)
174
183
write_to_peer(next, peer)
175
184
server.cond.release()
177
# main() invokes this to spawn the web server and the writer thread.
178
def run_server(port):
179
# initialize the web server
186
# main() invokes this to spawn the server and the writer thread.
187
def run_server(port, maxpeers):
188
# initialize the server
180
189
server = CheetahServer(("", port), ThreadedCheetahHandler)
181
190
server.allow_reuse_address = True
182
191
ip, port = server.server_address
183
192
print "(server) running in thread:", \
184
193
threading.currentThread().getName() + "\n"
185
server.init_cheetah()
194
server.init_cheetah(maxpeers)
187
196
# fire up the writer thread
188
197
writer_thread = threading.Thread(target=thread_write, args=(server,))
198
207
###############################
201
print "usage: ./simpleserver.py listen_port"
210
print "usage: ./simpleserver.py listen_port max_num_peers"
211
print " where 2 <= maxpeers < " + str(MAXPEERS) + "\n"
202
212
sys.exit(1) # failure
205
215
# validate arguments
209
219
port = int(argv[0])
220
maxpeers = int(argv[1])
210
221
except ValueError:
212
if ((port < 1) or (port > 65535)):
223
if ((port < 1) or (port > 65535) or (maxpeers < 2) or (maxpeers > MAXPEERS)):
227
run_server(port, maxpeers)
218
229
if __name__ == "__main__":
219
230
main(sys.argv[1:])