~nchohan/+junk/mytools

« back to all changes in this revision

Viewing changes to bin/notranstest/clientside/tester.py

  • Committer: root
  • Date: 2010-11-03 07:43:57 UTC
  • Revision ID: root@appscale-image0-20101103074357-xea7ja3sor3x93oc
init

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Author Navraj Chohan
 
2
# This program is the client side tester to dbtranstest
 
3
import sys 
 
4
import time
 
5
import math
 
6
import test_functions
 
7
import getopt
 
8
import threading
 
9
import random
 
10
tf = test_functions
 
11
DEFAULT_IP = ["128.111.55.223"]
 
12
DEFAULT_PORT = [8080]
 
13
DEFAULT_RATE = 1 # per second
 
14
DEFAULT_BATCH_SIZE = 1
 
15
DEFAULT_TRIAL_LENGTH = 1
 
16
ID_KEY_LENGTH = 15 
 
17
DEFAULT_TRIALS = 1
 
18
DEFAULT_TESTS = ["batchput"]
 
19
VALID_TESTS = ["put","get","delete","query","increment","allowance","query","batchput","batchget","batchdelete","batchincrement","batchallowance"]
 
20
SINGLE_TESTS = VALID_TESTS[0:6]
 
21
BATCH_TESTS = VALID_TESTS[7:]
 
22
DEBUG = False 
 
23
PUTCOUNTVALUE = "1"
 
24
INCREMENTVALUE = "1"
 
25
VALID_QUERY_KIND = ["Account", "Parent", "Child"]
 
26
DEFAULT_QUERY_KIND = VALID_QUERY_KIND[0]
 
27
DEFAULT_OFFSET = 1
 
28
 
 
29
def padString(string):
 
30
  zero_padded_id = ("0" * (ID_KEY_LENGTH - len(string))) + string 
 
31
  return zero_padded_id
 
32
 
 
33
def generateKeys(numOfKeys, offset = 1):
 
34
  keys = []
 
35
  for ii in range(offset, offset + numOfKeys + 1):
 
36
    keys.append(padString(str(ii)))     
 
37
  return keys
 
38
 
 
39
def getBatchKeyString(keys):
 
40
  return ':'.join(keys)
 
41
 
 
42
def getIPsAndPorts(file):
 
43
  FILE = open(file, "r")
 
44
  contents = FILE.readlines()
 
45
  ips = []
 
46
  ports = []
 
47
  for line in contents:
 
48
    if line[0] == '#':
 
49
      continue 
 
50
    tokens = line.split(':')
 
51
    ip = tokens[0]
 
52
    port = tokens[1].rstrip() 
 
53
    ips.append(ip)
 
54
    ports.append(port)
 
55
  return ips,ports
 
56
 
 
57
""" Latency test:
 
58
  Do X puts for a counter, time it
 
59
  Do X gets for those same counters, time it
 
60
  Do Y number of repeat queries, time each
 
61
  Do X increments on those same counters, time it
 
62
  Do X gets on each counter to verify
 
63
  Do X delets on all counters, time it
 
64
"""
 
65
def usage():
 
66
  print "Usage:"
 
67
  print "-r or --request for number of request per second"
 
68
  print "-t or --trials for the number of trials"
 
69
  print "--test for testing any web server's root page"
 
70
  print "-s or --slave_file for location of slave file" 
 
71
  print "\tFormat of file should be a ip:port for each line"
 
72
  print "-o or --output to specify the output file"
 
73
  print "-d for debug"
 
74
  print "-f or --offset for key offset number"
 
75
  print "\t useful for having multiple clients during testing"
 
76
  print "Examples:"
 
77
  print "TODO"
 
78
def main(argv):
 
79
  debug = DEBUG
 
80
  ips = DEFAULT_IP
 
81
  ports = DEFAULT_PORT
 
82
  req_per_sec = DEFAULT_RATE 
 
83
  batch_size = DEFAULT_BATCH_SIZE
 
84
  num_trials = DEFAULT_TRIALS
 
85
  trial_length = DEFAULT_TRIAL_LENGTH
 
86
  tests = DEFAULT_TESTS
 
87
  query_kind = DEFAULT_QUERY_KIND
 
88
  offset = DEFAULT_OFFSET
 
89
  output_file = ""
 
90
  try:
 
91
    opts, args = getopt.getopt(argv, "k:r:s:b:t:l:y:o:f:ud",
 
92
                            ["kind=",
 
93
                             "request=",
 
94
                             "slave_file=",
 
95
                             "batch_size=",
 
96
                             "trials=",
 
97
                             "length=",
 
98
                             "tests=",
 
99
                             "output=",
 
100
                             "offset=",
 
101
                             "usage",
 
102
                             "debug"])
 
103
  except getopt.GetoptError:
 
104
    usage()
 
105
    sys.exit(1)
 
106
  for opt, arg in opts:
 
107
    if opt in ("-r", "--request"):
 
108
      req_per_sec = int(arg)
 
109
      if debug: print "Number of request per second:",arg
 
110
    if opt in ("-s", "--slave_file"):
 
111
      ips, ports = getIPsAndPorts(arg)
 
112
      if debug: print "IPs:",str(ips)
 
113
      if debug: print "Ports:",str(ports)
 
114
    if opt in ("-b", "--batch_size"):
 
115
      batch_size = int(arg) 
 
116
      if debug: print "Batch size:",batch_size
 
117
    if opt in ("-t", "--trials"):
 
118
      num_trials = int(arg)
 
119
      if debug: print "Number of trials",num_trials
 
120
    if opt in ("-k", "--kind"):
 
121
      query_kind = arg
 
122
      if debug: print "Kind of query:",query_kind,"seconds"
 
123
      if query_kind not in VALID_QUERY_KIND:
 
124
        print "%s is not a valid kind of query type"%query_kind
 
125
        exit(1)
 
126
    if opt in ("-l", "--length"):
 
127
      trial_length = int(arg)
 
128
      if debug: print "Length of test:",trial_length,"seconds"
 
129
    if opt in ("-y", "--tests"):
 
130
      tests = arg
 
131
      tests = tests.split(":")
 
132
      error = False
 
133
      for ii in tests:
 
134
        if ii not in VALID_TESTS:
 
135
          error = True
 
136
          print "Test \"" + ii + "\" is not a valid test"
 
137
      if error:
 
138
          usage()
 
139
          exit(1)
 
140
      if debug: print "Running tests:",tests
 
141
    if opt in ("-o", "--output"):
 
142
      output_file = arg
 
143
      if debug: print "Output file:",output_file
 
144
    if opt in ("-u", "--usage"):
 
145
      usage()
 
146
      exit(0)
 
147
    if opt in ("-d", "--debug"):
 
148
      debug = True
 
149
      if debug: print "Debug mode is on"
 
150
    if opt in ("-f", "--offset"):
 
151
      offset = int(arg)
 
152
      if debug: print "Offset is:",offset
 
153
 
 
154
  error = False
 
155
  # Primes the database if needed (Hbase in particular)
 
156
  if debug:  print "Accessing web page",ips[0],":",ports[0]
 
157
  ret = tf.Root(ips[0], ports[0])
 
158
  if ret != "Hello From DB Trans Tester":
 
159
    error = True
 
160
    print "Error accessing web page",ips[0],":",ports[0]
 
161
  # Check to see that all slaves are running
 
162
  for ii in range(0, len(ips)): 
 
163
    if debug:  print "Accessing web page",ips[ii],":",ports[ii]
 
164
    ret = tf.MainIndex(ips[ii], ports[ii])
 
165
    if ret != "Hello From DB Trans Tester":
 
166
      error = True
 
167
      print "Error accessing web page",ips[ii],":",ports[ii]
 
168
  if error:
 
169
    print "exiting due to errors..."
 
170
    exit(1) 
 
171
 
 
172
  results = {}
 
173
  for ii in tests:  
 
174
    total_data = [] 
 
175
    if ii in SINGLE_TESTS or ii in BATCH_TESTS: 
 
176
      if debug: print "Running Single Test"
 
177
      for kk in range(0, num_trials):
 
178
        output = single_test(ips, 
 
179
                             ports, 
 
180
                             req_per_sec,  
 
181
                             trial_length, 
 
182
                             batch_size, 
 
183
                             ii, 
 
184
                             query_kind, 
 
185
                             offset,
 
186
                             debug)
 
187
        total_data.append(output)
 
188
      if debug: print "Method:",ii,",Output:\n",output 
 
189
    results[ii] = total_data
 
190
 
 
191
  logit(output_file, results)     
 
192
  stats(results)
 
193
 
 
194
def logit(output_file, results):
 
195
  if output_file:
 
196
    for r in results:
 
197
      ofile = open(output_file + "_" + r, "w")
 
198
      for ii in results[r]:
 
199
        ofile.write(ii)
 
200
        #file.write('\n#\n')
 
201
      ofile.close()
 
202
 
 
203
def getTotal(points):
 
204
  total = 0
 
205
  for ii in points:
 
206
    total += float(ii)
 
207
  return total
 
208
 
 
209
def getAverage(points, total = None):
 
210
  if total == None:
 
211
    total = getTotal(points)
 
212
  if len(points) == 0:
 
213
    return 0
 
214
  return total/len(points)
 
215
 
 
216
def getStDev(points, average=None):
 
217
  total = 0;
 
218
  if average == None:
 
219
    average = getAverage(points)
 
220
  for ii in points:
 
221
    total += (float(ii) - average) * (float(ii) - average)
 
222
  if len(points) == 0:
 
223
    return 0
 
224
  return math.sqrt(total/len(points))
 
225
 
 
226
 
 
227
def stats(results):
 
228
  for key in results:
 
229
    total = 0
 
230
    avg = 0
 
231
    min = 9999
 
232
    max = 0
 
233
    stdev = 0 
 
234
    numFailed = 0
 
235
    timings = []
 
236
    print "Key:",key
 
237
    for ii in results[key]:
 
238
      lines = ii.split('\n')
 
239
      isSuccess = False
 
240
      for line in lines:
 
241
        if line.startswith("<html>"):
 
242
          # Timeout error in appscale
 
243
          isSuccess = False
 
244
          numFailed += 1
 
245
        if line.startswith("Success:"): 
 
246
          if line.endswith("True"):
 
247
            isSuccess = True 
 
248
          else:
 
249
            isSuccess = False
 
250
            numFailed += 1
 
251
        if line.startswith("Timings:") and isSuccess:
 
252
          tokens = line.split(':')
 
253
          times = tokens[1]
 
254
          times = times.split(',')
 
255
          for index, tt in enumerate(times): 
 
256
            tt.rstrip('\n')
 
257
            times[index] = float(tt)
 
258
          timings += times
 
259
    for ii in timings:
 
260
      if ii < min:
 
261
        min = ii
 
262
      if ii > max:
 
263
        max = ii
 
264
    total = getTotal(timings)
 
265
    avg = getAverage(timings, total)
 
266
    stdev = getStDev(timings, avg)
 
267
    print "Average:",avg
 
268
    print "Total:",total
 
269
    print "Total Number:",len(timings)
 
270
    print "Stdev:",stdev
 
271
    print "Min:",min
 
272
    print "Max:",max
 
273
    print "Number of failures:",numFailed
 
274
    print "="*80
 
275
 
 
276
class testThread(threading.Thread):
 
277
  def setup(self, keyset1, keyset2, ip, port, command, batch_size, query_kind, debug=False):
 
278
    self.ip_ = ip
 
279
    self.port_ = port
 
280
    self.command_ = command 
 
281
    self.keyset1_ = keyset1
 
282
    self.keyset2_ = keyset2
 
283
    self.batch_size_ = batch_size
 
284
    self.query_kind_ = query_kind
 
285
    self.debug_ = debug
 
286
    self.status_ = "init"
 
287
    self.output_ = ""
 
288
    self.start_ = time.time()
 
289
    self.end_ = 0
 
290
    if self.debug_: print "ip: %s, port: %s, command: %s"\
 
291
                            %(self.ip_,self.port_,self.command_)
 
292
    
 
293
  def run(self):
 
294
    self.status_ = "running"
 
295
    if self.debug_: print "Thread %s run function: %s"\
 
296
              %(str(self),(self.command_ + "_test"))
 
297
    function = getattr(self, self.command_ + "_test")
 
298
    self.output_ = function()
 
299
    if self.debug_: print "Thread %s done"%(str(self))
 
300
    self.status_ = "terminated"
 
301
    self.end_ = time.time()
 
302
 
 
303
  def put_test(self):
 
304
    result = tf.PutCount(self.ip_, self.port_, self.keyset1_[0], PUTCOUNTVALUE)
 
305
    return result
 
306
 
 
307
  def get_test(self):
 
308
    result = tf.GetCount(self.ip_, self.port_, self.keyset1_[0])
 
309
    return result
 
310
 
 
311
  def delete_test(self):
 
312
    result = tf.DeleteCount(self.ip_, self.port_, self.keyset1_[0])
 
313
    return result
 
314
 
 
315
  def increment_test(self):
 
316
    result = tf.Increment(self.ip_, self.port_, self.keyset1_[0], INCREMENTVALUE)
 
317
    return result
 
318
 
 
319
  def allowance_test(self):
 
320
    tf.CreateParentChild(self.ip_, self.port_, self.keyset1_[0], self.keyset2_[0])
 
321
    result = tf.GiveAllowance(self.ip_, self.port_, self.keyset1_[0], self.keyset2_[0],  PUTCOUNTVALUE)
 
322
    return result
 
323
 
 
324
  def query_test(self):
 
325
    return tf.Query(self.ip_, self.port_, self.query_kind_)
 
326
 
 
327
  def batchput_test(self):
 
328
    result = tf.BatchPutCount(self.ip_, self.port_, self.keyset1_, PUTCOUNTVALUE)
 
329
    return result
 
330
 
 
331
  def batchget_test(self):
 
332
    result = tf.BatchGetCount(self.ip_, self.port_, self.keyset1_)
 
333
    return result
 
334
 
 
335
  def batchdelete_test(self):
 
336
    result = tf.BatchDeleteCount(self.ip_, self.port_, self.keyset1_)
 
337
    return result
 
338
 
 
339
  def batchincrement_test(self):
 
340
    result = tf.BatchIncrement(self.ip_, self.port_, self.keyset1_, PUTCOUNTVALUE)
 
341
    return result
 
342
 
 
343
  def batchallowance_test(self):
 
344
    tf.CreateParentChild(self.ip_, self.port_, self.keyset1_[0], self.keyset2_[0])
 
345
    result = tf.BatchGiveAllowance(self.ip_, self.port_, self.keyset1_[0], self.keyset2_[0], PUTCOUNTVALUE, self.batch_size_)
 
346
    return result
 
347
 
 
348
# returns results of a single trial 
 
349
def single_test(ips, ports, rps, trial_length, batch_size, method, query_kind, offset, debug = DEBUG):
 
350
  #log = logger()
 
351
  total_rtt = 0
 
352
  log = ""
 
353
  total_request = rps * trial_length * batch_size * len(ips)
 
354
  keys1 = generateKeys(total_request, offset)
 
355
  keys2 = generateKeys(total_request, 1 + len(keys1))
 
356
  if debug: print "Total keyspace 1:",keys1
 
357
  if debug: print "Total keyspace 2:",keys2
 
358
  if debug: print "ips %s, ports %s, rps %s, trial_length %s, debug %s"                           %(ips, ports, rps, trial_length, debug)
 
359
  if debug: print "Sending a total of %d request"%total_request
 
360
  key_index = 0
 
361
  threadlist = []
 
362
  index = 0
 
363
  for length in range(0, trial_length):
 
364
    for ii in range(0,rps):
 
365
      # send as threaded request
 
366
    #  for index, ip in enumerate(ips):
 
367
      if debug: print "Sending to ",ips[index],"on port",ports[index]
 
368
      thread = testThread()
 
369
      threadlist.append(thread)
 
370
      keysub1 = keys1[key_index:key_index + batch_size]
 
371
      keysub2 = keys2[key_index:key_index + batch_size]
 
372
      if debug: print "key space 1:",keysub1
 
373
      if debug: print "key space 2:",keysub2
 
374
      thread.setup(keysub1, keysub2, ips[index], ports[index], method, batch_size, query_kind, debug)
 
375
      thread.start()
 
376
      key_index += batch_size
 
377
      # round robin
 
378
      #index = (index + 1)%len(ips)
 
379
      # random 
 
380
      index = random.randint(0, len(ips) - 1)
 
381
    time.sleep(1)
 
382
  for thread in threadlist:
 
383
    thread.join()
 
384
    if debug: print "Joining thread %s"%str(thread)
 
385
    if thread.status_ == "terminated":
 
386
      log += "Method:"+thread.command_ + "\n"
 
387
      log += "RTT Start:"+ str(thread.start_) + "\n"
 
388
      log += "RTT End:"+str(thread.end_)+ "\n"
 
389
      log += "RTT Time:"+str(thread.end_ - thread.start_)+ "\n"
 
390
      total_rtt += thread.end_ - thread.start_
 
391
      log += thread.output_ 
 
392
      log += "#\n"
 
393
    else:
 
394
      print "Error with thread after join with status %s"%thread.status_
 
395
  print "Total round trip time of test:",total_rtt
 
396
  return log   
 
397
 
 
398
if __name__ == '__main__':
 
399
  main(sys.argv[1:])