~nchohan/+junk/mytools

« back to all changes in this revision

Viewing changes to bin/dbtranstest2/clientside/master.py

  • Committer: root
  • Date: 2010-11-03 07:43:57 UTC
  • Revision ID: root@appscale-image0-20101103074357-xea7ja3sor3x93oc
init

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import SOAPpy
 
2
import getopt
 
3
import sys
 
4
import commands
 
5
import threading
 
6
import test_functions
 
7
import time
 
8
tf = test_functions 
 
9
TOTAL_NUM_ACCOUNTS = 100000
 
10
 
 
11
MASTER_BINDPORT = 9999
 
12
MY_IP = commands.getoutput("ifconfig").split("\n")[1].split()[1][5:]
 
13
#REQUEST_AMMOUNT_SET = [10,100,1000,10000]
 
14
REQUEST_AMMOUNT_SET = [10,100,1000,10000]
 
15
 
 
16
# IP and port for each slave
 
17
DEBUG = False
 
18
TRIALS = 5
 
19
RESULTS = {}
 
20
 
 
21
soapserver = SOAPpy.SOAPServer((MY_IP, MASTER_BINDPORT))
 
22
responses = {}
 
23
 
 
24
#def query_results():
 
25
#  global RESULTS
 
26
#  return RESULTS
 
27
 
 
28
def usage():
 
29
  print "--target or -t for targer web server (ip:port)"
 
30
  print "--slaves or -s for slaves which are clients (ip:port)"
 
31
  print "--prime or -p for prime (with --target flag)"
 
32
  print "--xdelete or -x for deleting all accounts (with --target flag)"
 
33
 
 
34
"""
 
35
class SoapThread(threading.Thread):
 
36
  def setup(self):
 
37
    global soapserver
 
38
    soapserver.registerFunction(query_results)
 
39
  def run(self):
 
40
    global soapserver
 
41
    while 1:
 
42
      soapserver.serve_forever()
 
43
"""
 
44
class RequestThread(threading.Thread):
 
45
  def setup(self, serv, totalrequest):
 
46
    self.server = serv
 
47
    self.nreq = totalrequest
 
48
    self.success = 0
 
49
    return
 
50
  def run(self):
 
51
    try:
 
52
      self.success, self.nreq = self.server.run()
 
53
    except e:
 
54
      self.success = 0
 
55
      self.nreq = 0
 
56
    print "thread output: ",self.success, self.nreq
 
57
    return 
 
58
 
 
59
def readFile(slavefile):
 
60
  FILE = open(slavefile, "r")
 
61
  contents = FILE.readlines()
 
62
  host= []
 
63
  for line in contents:
 
64
    if line[0] == '#':
 
65
      continue
 
66
    tokens = line.split(':')
 
67
    ip = tokens[0]
 
68
    port = tokens[1].rstrip()
 
69
    host.append((ip,port))
 
70
  return host
 
71
 
 
72
def runSlaves(slaves, targets, numreq):
 
73
  global DEBUG
 
74
  global MY_IP
 
75
  global MASTER_BINDPORT
 
76
  global RESULTS
 
77
  # Load balance the number of request
 
78
  # Thread for each slave
 
79
  threads = []
 
80
  seed_counter = 0
 
81
  req_left = int(numreq)
 
82
  for ii in slaves:
 
83
    # Each slave can only handle up to 350 requests
 
84
    if req_left < 350:
 
85
      requests = req_left
 
86
    else: 
 
87
      requests = 350 
 
88
    # Soap connection to slave
 
89
    server = SOAPpy.SOAPProxy("http://" + ii[0] + ":" + str(ii[1]))
 
90
    # initialize the run
 
91
    server.setup(targets, requests, seed_counter)
 
92
    req_left -= requests
 
93
    seed_counter += 1
 
94
 
 
95
    rt = RequestThread()
 
96
    rt.setup(server, numreq)
 
97
    threads.append(rt)
 
98
    if req_left <= 0:
 
99
      break
 
100
 
 
101
  # Start all threads
 
102
  for ii in threads:
 
103
    if DEBUG: print "Starting slave..."
 
104
    ii.start()
 
105
     
 
106
  total = 0
 
107
  success = 0
 
108
  for ii in threads:
 
109
    ii.join()    
 
110
    total += int(ii.nreq)
 
111
    success += int(ii.success)
 
112
  if DEBUG: print "success and total:",success,total
 
113
  return [success, total] 
 
114
 
 
115
def printResults():
 
116
  global RESULTS
 
117
  global REQUEST_AMMOUNT_SET
 
118
  global TRIALS 
 
119
  warnings = ""
 
120
  print "# concurrent request, avg successes, all samples, "
 
121
  for ii in REQUEST_AMMOUNT_SET:
 
122
    success = 0
 
123
    total = 0
 
124
    dataset = RESULTS[ii] 
 
125
    for kk in dataset:
 
126
      if ii == kk[1]:
 
127
        success += kk[0]
 
128
        total += kk[1]
 
129
      else:
 
130
        warnings += "Trial with " + str(kk) + " has an incorrect number of reported items.\n "
 
131
    print str(ii)+","+str(success/TRIALS)+","+str(total)
 
132
    if total != ii * TRIALS:
 
133
      warnings += str(ii) + " had " + str(total) + " request reported"
 
134
      warnings += ". It should have been " + str(ii * TRIALS)
 
135
      warnings += "\n\n"
 
136
  if warnings != "":
 
137
    print "*" * 50
 
138
    print warnings
 
139
    print "*" * 50
 
140
 
 
141
def main(argv):
 
142
  global DEBUG
 
143
  global RESULTS
 
144
  global TRIALS
 
145
  global REQUEST_AMMOUNT_SET
 
146
 
 
147
  target_file = ""
 
148
  slave_file = "" 
 
149
  slaves = ""
 
150
  targets = ""   
 
151
  deleteAll = False
 
152
  prime = False
 
153
  try:
 
154
    opts, args = getopt.getopt(argv, "t:s:pdx",
 
155
                          ["target=",
 
156
                           "slaves=",
 
157
                           "xdelete",
 
158
                           "prime",
 
159
                           "debug"]) 
 
160
  except getopt.GetoptError:
 
161
    usage()
 
162
    sys.exit(1)
 
163
 
 
164
  for opt, arg in opts:
 
165
    if opt in ("-p", "--prime"):
 
166
      prime = True
 
167
    if opt in ("--xdelete","-x"):
 
168
      deleteAll = True
 
169
    if opt in ("-t", "--targets"):
 
170
      target_file = str(arg)
 
171
    if opt in ("-s", "--slaves"):
 
172
      slave_file = str(arg)
 
173
    if opt in ("-d"):
 
174
      print "Debugging turned on"
 
175
      DEBUG = True
 
176
 
 
177
  if target_file == "":
 
178
    usage()
 
179
    sys.exit(1)
 
180
 
 
181
  targets = readFile(target_file)
 
182
 
 
183
  if targets == []:
 
184
    print "No targets given"
 
185
    sys.exit(1)
 
186
 
 
187
  if prime:
 
188
    # Use the first ip and port to prime the DB
 
189
    firsttarget = targets[0]
 
190
    ip = firsttarget[0]
 
191
    port = firsttarget[1]
 
192
    for ii in range(0, TOTAL_NUM_ACCOUNTS/1000):
 
193
      # ii is the offset, batches of 1k each
 
194
      ret = tf.BatchCreateAccount(ip, port, 1000,ii * 1000,1000)
 
195
      print ret
 
196
    print "Done creating accounts." 
 
197
    ret = tf.Root(ip, port)
 
198
    print "Priming tables"
 
199
    print ret
 
200
    print "Done."
 
201
    sys.exit(0)
 
202
 
 
203
  if deleteAll:
 
204
    firsttarget = targets[0]
 
205
    ip = firsttarget[0]
 
206
    port = firsttarget[1]
 
207
    for ii in range(0, TOTAL_NUM_ACCOUNTS/100):
 
208
      # ii is the offset, batches of 1k each
 
209
      ret = tf.BatchDeleteAccount(ip, port, 100,ii * 100, 1000)
 
210
      print ret
 
211
    print "Done deleting accounts." 
 
212
    sys.exit(0)
 
213
 
 
214
    
 
215
  if slave_file == "":
 
216
    usage()
 
217
    sys.exit(1)
 
218
 
 
219
  slaves = readFile(slave_file)  
 
220
  if slaves == []:
 
221
    print "No slaves given"
 
222
    sys.exit(1)
 
223
 
 
224
 
 
225
  # Start a thread for accumulating responses
 
226
  #soap_thread = SoapThread()
 
227
  #soap_thread.setup()
 
228
  #soap_thread.start()
 
229
  global REQUEST_AMMOUNT_SET
 
230
  num_runs = 0
 
231
  total_time = 0
 
232
  highest_time = 0
 
233
  shortest_time = 999999
 
234
  if DEBUG: print "Slave:",slaves
 
235
  for index,ii in enumerate(REQUEST_AMMOUNT_SET):
 
236
    if DEBUG: print "Requesting",ii,"concurent threads"
 
237
    RESULTS[ii] = []
 
238
    for kk in range(0,TRIALS):
 
239
      # runslaves command works in parallel with synchronous request
 
240
      if DEBUG:
 
241
        print "Running trial ",kk
 
242
      start = time.time()
 
243
      result = runSlaves(slaves, targets, str(ii))
 
244
      if DEBUG: 
 
245
        print "result of trial:",result
 
246
      end = time.time()
 
247
      num_runs += 1 
 
248
      total_time += end - start
 
249
      if (end - start) > highest_time:
 
250
        highest_time = end - start
 
251
      if (end - start) < shortest_time:
 
252
        shortest_time = end - start
 
253
      prev = RESULTS[ii]
 
254
      prev.append(result)  
 
255
      RESULTS[ii] = prev
 
256
  print "Average time taken:",(total_time/num_runs)
 
257
  print "Longest time for a run:",highest_time
 
258
  print "Shortest time for a run:",shortest_time
 
259
  print RESULTS
 
260
  printResults()
 
261
if __name__ == "__main__":
 
262
  main(sys.argv[1:])
 
263