3
## This Amulet test deploys memcached.
# Visible flow of this section: create an amulet Deployment, relate
# memcached:cache to mediawiki:cache, apply custom memcached settings, run the
# deployment and wait on the sentry, reporting a timeout as SKIP.
# NOTE(review): the bare integers between statements look like line-number
# residue from the tool this text was copied out of, and several numbered
# lines are absent from this view — confirm against the original file.
9
# The number of seconds to wait for the environment to setup.
# NOTE(review): no assignment to `seconds` is visible here although it is used
# below — presumably it sits on a line that was dropped; confirm.
12
d = amulet.Deployment()
13
# Add the memcached charm to the deployment.
# NOTE(review): no statement follows this comment in the visible text.
15
# Add the mediawiki charm to the deployment.
# NOTE(review): no statement follows this comment in the visible text.
17
# Create a relation from memcached to mediawiki
18
d.relate('memcached:cache', 'mediawiki:cache')
20
# Create a configuration dictionary for custom memcached values.
# The same dictionary is read again by the later test sections ('tcp-port',
# 'udp-port', 'size', 'connection-limit', 'factor').
21
configuration = {'size': 512, 'connection-limit': 128, 'factor': 1.10,
22
'tcp-port': 11212, 'udp-port': 11213}
23
d.configure('memcached', configuration)
24
# Expose memcached so it is visible to the tests.
# NOTE(review): no statement follows this comment in the visible text.
28
# Execute the deployer with the current mapping.
# NOTE(review): an `except` clause appears below, but no matching `try:` line
# is visible before this call — presumably dropped; confirm.
29
d.setup(timeout=seconds)
30
# Wait for the relation to finish the translations.
31
d.sentry.wait(seconds)
32
except amulet.helpers.TimeoutError:
33
message = 'The environment did not setup in %d seconds.' % seconds
34
# The SKIP status enables skip or fail the test based on configuration.
35
amulet.raise_status(amulet.SKIP, msg=message)
39
# Get the sentry for memcached.
40
memcached_unit = d.sentry.unit['memcached/0']
42
## Test if the memcached service is running.
44
# Run the command that checks if the memcached server instance is running.
45
command = 'service memcached status'
46
# Execute the command on the deployed service.
# The result is unpacked into two values, `output` and `code`.
47
output, code = memcached_unit.run(command)
# NOTE(review): the failure message plus raise_status(FAIL) below, followed by
# a success message, read like the two arms of a check on `code`, but no
# `if`/`else` lines are visible in this text — confirm against the original
# file before relying on this section.
50
# `%` binds tighter than `+`, so only the ' returned %d.' literal is formatted.
message = 'The ' + command + ' returned %d.' % code
52
amulet.raise_status(amulet.FAIL, msg=message)
54
message = 'The memcached service is running.'
58
## Test memcached using telnet commands.
# Visible flow: connect to the unit's public address on the configured TCP
# port, read the current 'greeting' key, store a new timestamped greeting,
# read it back and search the reply for the stored text.
60
# Get the public address for memcached instance.
61
memcached_address = memcached_unit.info['public-address']
62
# Get the port for memcached instance.
63
memcached_port = configuration['tcp-port']
# NOTE(review): an `except Exception` clause follows this section, but no
# `try:` line is visible here — presumably dropped; confirm.
66
# Connect to memcached via telnet.
67
tn = telnetlib.Telnet(memcached_address, memcached_port)
# NOTE(review): "%F" and "%r" are not in the directive list Python documents
# as portable; they depend on the platform's strftime — confirm the test only
# runs where they are supported.
68
date_time = time.strftime("%F %r")
69
# Write the command that gets the current greeting.
70
tn.write(b'get greeting\r\n')
71
# Read the current greeting.
# This reply is overwritten below without being inspected in the visible lines.
72
response = tn.read_until(b'END', 2)
73
# Create a string with date and time for this test.
74
string = 'memcached test %s' % date_time
# The trailing number is the length of the text sent on the following line.
75
command = 'set greeting 1 0 %d' % len(string)
76
# Write the command that sets the new greeting.
77
tn.write(command.encode() + b'\r\n')
78
tn.write(string.encode() + b'\r\n')
80
# Wait for the 'STORED' reply; the value read is not checked in the visible
# lines before `response` is reassigned.
response = tn.read_until(b'STORED', 2)
81
# Write the command that gets the greeting.
82
tn.write(b'get greeting\r\n')
83
# Get the new greeting in memcached.
84
response = tn.read_until(b'END', 2)
85
response = response.decode()
86
print('get greeting response:')
88
# Look for the string in the response.
89
index = response.find(string)
# NOTE(review): the "Found" print and the "Did not find" failure below look
# like the two outcomes of a test on `index`, but the branching lines are not
# visible in this text — confirm against the original file.
91
print('Found %s in the greeting response.' % string)
94
message = 'Did not find %s in the greeting from memcached.' % string
95
amulet.raise_status(amulet.FAIL, msg=message)
97
except Exception as e:
98
message = 'An error occurred communicating with memcached over telnet ' \
99
'{0}:{1} {3}'.format(memcached_address, memcached_port, str(e))
100
amulet.raise_status(amulet.FAIL, msg=message)
104
## Test if the memcached service is configured properly.

# Get the contents of the memcached configuration file.
config_string = memcached_unit.file_contents('/etc/memcached.conf')
# Start every option as "not found" so that an option missing from the file is
# reported as a test failure below instead of raising NameError.
size = limit = factor = None
# Parse the configuration file for the values sent in the deployment.
# A line is matched on its option flag followed by a space; the value is the
# second whitespace-separated field of that line.
for line in config_string.splitlines():
    if line.startswith('-m '):
        size = line.split()[1]
    elif line.startswith('-c '):
        limit = line.split()[1]
    elif line.startswith('-f '):
        factor = line.split()[1]

# Check for the configured values.  The `is None` tests come first so that
# int()/float() are never called on an option that was not found.
if (size is None or limit is None or factor is None or
        configuration['size'] != int(size) or
        configuration['connection-limit'] != int(limit) or
        configuration['factor'] != float(factor)):
    message = 'The memcached deployment was not configured correctly, size: ' \
        '{0} limit: {1} factor: {2}'.format(size, limit, factor)
    amulet.raise_status(amulet.FAIL, msg=message)
else:
    message = 'The memcached deployment was configured correctly.'
128
## Test if the relation is complete and data was exchanged properly.

# Fetch the memcached sentry, then get the relation from memcached to
# mediawiki.
memcached_unit = d.sentry.unit['memcached/0']
relation = memcached_unit.relation('cache', 'mediawiki:cache')

# The relation must carry the same TCP and UDP ports that were set in the
# configuration dictionary.  The UDP comparison is only evaluated when the
# TCP comparison succeeds.
ports_match = (configuration['tcp-port'] == int(relation['port']) and
               configuration['udp-port'] == int(relation['udp-port']))
if ports_match:
    message = 'The memcached relation was configured correctly.'
else:
    message = 'The memcached relation was not configured correctly, port: ' \
        '{0} udp-port: {1}'.format(relation['port'], relation['udp-port'])
    amulet.raise_status(amulet.FAIL, msg=message)
144
# Print a message indicating the charm passed all the tests.
# NOTE(review): only the assignment is visible here; the statement that
# reports `message` is presumably on a line beyond this text — confirm.
145
message = 'The memcached charm passed the deploy tests!'