2
# This program is the client side tester to dbtranstest
11
DEFAULT_IP = ["128.111.55.223"]
13
DEFAULT_RATE = 1 # per second
14
DEFAULT_BATCH_SIZE = 1
15
DEFAULT_TRIAL_LENGTH = 1
18
DEFAULT_TESTS = ["batchput"]
19
VALID_TESTS = ["put","get","delete","query","increment","allowance","query","batchput","batchget","batchdelete","batchincrement","batchallowance"]
20
SINGLE_TESTS = VALID_TESTS[0:6]
21
BATCH_TESTS = VALID_TESTS[7:]
25
VALID_QUERY_KIND = ["Account", "Parent", "Child"]
26
DEFAULT_QUERY_KIND = VALID_QUERY_KIND[0]
29
def padString(string):
30
zero_padded_id = ("0" * (ID_KEY_LENGTH - len(string))) + string
33
def generateKeys(numOfKeys, offset = 1):
35
for ii in range(offset, offset + numOfKeys + 1):
36
keys.append(padString(str(ii)))
39
def getBatchKeyString(keys):
42
def getIPsAndPorts(file):
43
FILE = open(file, "r")
44
contents = FILE.readlines()
50
tokens = line.split(':')
52
port = tokens[1].rstrip()
58
Do X puts for a counter, time it
59
Do X gets for those same counters, time it
60
Do Y number of repeat queries, time each
61
Do X increments on those same counters, time it
62
Do X gets on each counter to verify
63
Do X delets on all counters, time it
67
print "-r or --request for number of request per second"
68
print "-t or --trials for the number of trials"
69
print "--test for testing any web server's root page"
70
print "-s or --slave_file for location of slave file"
71
print "\tFormat of file should be a ip:port for each line"
72
print "-o or --output to specify the output file"
74
print "-f or --offset for key offset number"
75
print "\t useful for having multiple clients during testing"
82
req_per_sec = DEFAULT_RATE
83
batch_size = DEFAULT_BATCH_SIZE
84
num_trials = DEFAULT_TRIALS
85
trial_length = DEFAULT_TRIAL_LENGTH
87
query_kind = DEFAULT_QUERY_KIND
88
offset = DEFAULT_OFFSET
91
opts, args = getopt.getopt(argv, "k:r:s:b:t:l:y:o:f:ud",
103
except getopt.GetoptError:
106
for opt, arg in opts:
107
if opt in ("-r", "--request"):
108
req_per_sec = int(arg)
109
if debug: print "Number of request per second:",arg
110
if opt in ("-s", "--slave_file"):
111
ips, ports = getIPsAndPorts(arg)
112
if debug: print "IPs:",str(ips)
113
if debug: print "Ports:",str(ports)
114
if opt in ("-b", "--batch_size"):
115
batch_size = int(arg)
116
if debug: print "Batch size:",batch_size
117
if opt in ("-t", "--trials"):
118
num_trials = int(arg)
119
if debug: print "Number of trials",num_trials
120
if opt in ("-k", "--kind"):
122
if debug: print "Kind of query:",query_kind,"seconds"
123
if query_kind not in VALID_QUERY_KIND:
124
print "%s is not a valid kind of query type"%query_kind
126
if opt in ("-l", "--length"):
127
trial_length = int(arg)
128
if debug: print "Length of test:",trial_length,"seconds"
129
if opt in ("-y", "--tests"):
131
tests = tests.split(":")
134
if ii not in VALID_TESTS:
136
print "Test \"" + ii + "\" is not a valid test"
140
if debug: print "Running tests:",tests
141
if opt in ("-o", "--output"):
143
if debug: print "Output file:",output_file
144
if opt in ("-u", "--usage"):
147
if opt in ("-d", "--debug"):
149
if debug: print "Debug mode is on"
150
if opt in ("-f", "--offset"):
152
if debug: print "Offset is:",offset
155
# Primes the database if needed (Hbase in particular)
156
if debug: print "Accessing web page",ips[0],":",ports[0]
157
ret = tf.Root(ips[0], ports[0])
158
if ret != "Hello From DB Trans Tester":
160
print "Error accessing web page",ips[0],":",ports[0]
161
# Check to see that all slaves are running
162
for ii in range(0, len(ips)):
163
if debug: print "Accessing web page",ips[ii],":",ports[ii]
164
ret = tf.MainIndex(ips[ii], ports[ii])
165
if ret != "Hello From DB Trans Tester":
167
print "Error accessing web page",ips[ii],":",ports[ii]
169
print "exiting due to errors..."
175
if ii in SINGLE_TESTS or ii in BATCH_TESTS:
176
if debug: print "Running Single Test"
177
for kk in range(0, num_trials):
178
output = single_test(ips,
187
total_data.append(output)
188
if debug: print "Method:",ii,",Output:\n",output
189
results[ii] = total_data
191
logit(output_file, results)
194
def logit(output_file, results):
197
ofile = open(output_file + "_" + r, "w")
198
for ii in results[r]:
203
def getTotal(points):
209
def getAverage(points, total = None):
211
total = getTotal(points)
214
return total/len(points)
216
def getStDev(points, average=None):
219
average = getAverage(points)
221
total += (float(ii) - average) * (float(ii) - average)
224
return math.sqrt(total/len(points))
237
for ii in results[key]:
238
lines = ii.split('\n')
241
if line.startswith("<html>"):
242
# Timeout error in appscale
245
if line.startswith("Success:"):
246
if line.endswith("True"):
251
if line.startswith("Timings:") and isSuccess:
252
tokens = line.split(':')
254
times = times.split(',')
255
for index, tt in enumerate(times):
257
times[index] = float(tt)
264
total = getTotal(timings)
265
avg = getAverage(timings, total)
266
stdev = getStDev(timings, avg)
269
print "Total Number:",len(timings)
273
print "Number of failures:",numFailed
276
class testThread(threading.Thread):
277
def setup(self, keyset1, keyset2, ip, port, command, batch_size, query_kind, debug=False):
280
self.command_ = command
281
self.keyset1_ = keyset1
282
self.keyset2_ = keyset2
283
self.batch_size_ = batch_size
284
self.query_kind_ = query_kind
286
self.status_ = "init"
288
self.start_ = time.time()
290
if self.debug_: print "ip: %s, port: %s, command: %s"\
291
%(self.ip_,self.port_,self.command_)
294
self.status_ = "running"
295
if self.debug_: print "Thread %s run function: %s"\
296
%(str(self),(self.command_ + "_test"))
297
function = getattr(self, self.command_ + "_test")
298
self.output_ = function()
299
if self.debug_: print "Thread %s done"%(str(self))
300
self.status_ = "terminated"
301
self.end_ = time.time()
304
result = tf.PutCount(self.ip_, self.port_, self.keyset1_[0], PUTCOUNTVALUE)
308
result = tf.GetCount(self.ip_, self.port_, self.keyset1_[0])
311
def delete_test(self):
312
result = tf.DeleteCount(self.ip_, self.port_, self.keyset1_[0])
315
def increment_test(self):
316
result = tf.Increment(self.ip_, self.port_, self.keyset1_[0], INCREMENTVALUE)
319
def allowance_test(self):
320
tf.CreateParentChild(self.ip_, self.port_, self.keyset1_[0], self.keyset2_[0])
321
result = tf.GiveAllowance(self.ip_, self.port_, self.keyset1_[0], self.keyset2_[0], PUTCOUNTVALUE)
324
def query_test(self):
325
return tf.Query(self.ip_, self.port_, self.query_kind_)
327
def batchput_test(self):
328
result = tf.BatchPutCount(self.ip_, self.port_, self.keyset1_, PUTCOUNTVALUE)
331
def batchget_test(self):
332
result = tf.BatchGetCount(self.ip_, self.port_, self.keyset1_)
335
def batchdelete_test(self):
336
result = tf.BatchDeleteCount(self.ip_, self.port_, self.keyset1_)
339
def batchincrement_test(self):
340
result = tf.BatchIncrement(self.ip_, self.port_, self.keyset1_, PUTCOUNTVALUE)
343
def batchallowance_test(self):
344
tf.CreateParentChild(self.ip_, self.port_, self.keyset1_[0], self.keyset2_[0])
345
result = tf.BatchGiveAllowance(self.ip_, self.port_, self.keyset1_[0], self.keyset2_[0], PUTCOUNTVALUE, self.batch_size_)
348
# returns results of a single trial
349
def single_test(ips, ports, rps, trial_length, batch_size, method, query_kind, offset, debug = DEBUG):
353
total_request = rps * trial_length * batch_size * len(ips)
354
keys1 = generateKeys(total_request, offset)
355
keys2 = generateKeys(total_request, 1 + len(keys1))
356
if debug: print "Total keyspace 1:",keys1
357
if debug: print "Total keyspace 2:",keys2
358
if debug: print "ips %s, ports %s, rps %s, trial_length %s, debug %s" %(ips, ports, rps, trial_length, debug)
359
if debug: print "Sending a total of %d request"%total_request
363
for length in range(0, trial_length):
364
for ii in range(0,rps):
365
# send as threaded request
366
# for index, ip in enumerate(ips):
367
if debug: print "Sending to ",ips[index],"on port",ports[index]
368
thread = testThread()
369
threadlist.append(thread)
370
keysub1 = keys1[key_index:key_index + batch_size]
371
keysub2 = keys2[key_index:key_index + batch_size]
372
if debug: print "key space 1:",keysub1
373
if debug: print "key space 2:",keysub2
374
thread.setup(keysub1, keysub2, ips[index], ports[index], method, batch_size, query_kind, debug)
376
key_index += batch_size
378
#index = (index + 1)%len(ips)
380
index = random.randint(0, len(ips) - 1)
382
for thread in threadlist:
384
if debug: print "Joining thread %s"%str(thread)
385
if thread.status_ == "terminated":
386
log += "Method:"+thread.command_ + "\n"
387
log += "RTT Start:"+ str(thread.start_) + "\n"
388
log += "RTT End:"+str(thread.end_)+ "\n"
389
log += "RTT Time:"+str(thread.end_ - thread.start_)+ "\n"
390
total_rtt += thread.end_ - thread.start_
391
log += thread.output_
394
print "Error with thread after join with status %s"%thread.status_
395
print "Total round trip time of test:",total_rtt
398
if __name__ == '__main__':