~ubuntu-branches/ubuntu/trusty/drizzle/trusty

« back to all changes in this revision

Viewing changes to tests/kewpie/lib/server_mgmt/server_management.py

  • Committer: Package Import Robot
  • Author(s): Dmitrijs Ledkovs
  • Date: 2013-10-29 15:43:40 UTC
  • mfrom: (1.2.12) (2.1.19 trusty-proposed)
  • Revision ID: package-import@ubuntu.com-20131029154340-2gp39el6cv8bwf2o
Tags: 1:7.2.3-2ubuntu1
* Merge from debian, remaining changes:
  - Link against boost_system because of boost_thread.
  - Add required libs to message/include.am
  - Add upstart job and adjust init script to be upstart compatible.
  - Disable -floop-parallelize-all due to gcc-4.8/4.9 compiler ICE
    http://gcc.gnu.org/bugzilla/show_bug.cgi?id=57732

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
#! /usr/bin/env python
2
 
# -*- mode: python; indent-tabs-mode: nil; -*-
3
 
# vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
4
 
#
5
 
# Copyright (C) 2010, 2011 Patrick Crews
6
 
#
7
 
# This program is free software; you can redistribute it and/or modify
8
 
# it under the terms of the GNU General Public License as published by
9
 
# the Free Software Foundation; either version 2 of the License, or
10
 
# (at your option) any later version.
11
 
#
12
 
# This program is distributed in the hope that it will be useful,
13
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
14
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15
 
# GNU General Public License for more details.
16
 
#
17
 
# You should have received a copy of the GNU General Public License
18
 
# along with this program; if not, write to the Free Software
19
 
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
20
 
 
21
 
"""server_management.py
22
 
   code for dealing with apportioning servers
23
 
   to suit the needs of the tests and executors
24
 
 
25
 
"""
26
 
# imports
27
 
import time
28
 
import os
29
 
import shutil
30
 
import subprocess
31
 
from ConfigParser import RawConfigParser
32
 
 
33
 
class serverManager:
34
 
    """ code that handles the server objects
35
 
        We use this to track, do mass actions, etc
36
 
        Single point of contact for this business
37
 
 
38
 
    """
39
 
 
40
 
    def __init__(self, system_manager, variables):
41
 
        self.skip_keys = [ 'system_manager'
42
 
                         , 'env_manager'
43
 
                         , 'code_manager'
44
 
                         , 'logging'
45
 
                         , 'gdb'
46
 
                         ]
47
 
        self.debug = variables['debug']
48
 
        self.verbose = variables['verbose']
49
 
        self.initial_run = 1
50
 
        # we try this to shorten things - will see how this works
51
 
        self.server_base_name = 's'
52
 
        self.no_secure_file_priv = variables['nosecurefilepriv']
53
 
        self.system_manager = system_manager
54
 
        self.code_manager = system_manager.code_manager
55
 
        self.env_manager = system_manager.env_manager
56
 
        self.logging = system_manager.logging
57
 
        self.gdb  = self.system_manager.gdb
58
 
        self.default_storage_engine = variables['defaultengine']
59
 
        self.default_server_type = variables['defaultservertype']
60
 
        self.user_server_opts = variables['drizzledoptions']
61
 
        self.servers = {}
62
 
 
63
 
        self.libeatmydata = variables['libeatmydata']
64
 
        self.libeatmydata_path = variables['libeatmydatapath']
65
 
 
66
 
        self.logging.info("Using default-storage-engine: %s" %(self.default_storage_engine))
67
 
        test_server = self.allocate_server( 'test_bot' 
68
 
                                          , None 
69
 
                                          , []
70
 
                                          , self.system_manager.workdir
71
 
                                          )
72
 
        self.logging.info("Testing for Innodb / Xtradb version...")
73
 
        test_server.start(working_environ=os.environ)
74
 
        try:
75
 
            innodb_ver, xtradb_ver = test_server.get_engine_info()
76
 
            self.logging.info("Innodb version: %s" %innodb_ver)
77
 
            self.logging.info("Xtradb version: %s" %xtradb_ver)
78
 
        except Exception, e:
79
 
            self.logging.error("Problem detecting innodb/xtradb version:")
80
 
            self.logging.error(Exception)
81
 
            self.logging.error(e)
82
 
            self.logging.error("Dumping server error.log...")
83
 
            test_server.dump_errlog()
84
 
        test_server.stop()
85
 
        test_server.cleanup()
86
 
        shutil.rmtree(test_server.workdir)
87
 
        del(test_server)
88
 
 
89
 
        self.logging.debug_class(self)
90
 
 
91
 
    def request_servers( self
92
 
                       , requester 
93
 
                       , workdir
94
 
                       , cnf_path
95
 
                       , server_requests
96
 
                       , server_requirements
97
 
                       , test_executor
98
 
                       , expect_fail = 0):
99
 
        """ We produce the server objects / start the server processes
100
 
            as requested.  We report errors and whatnot if we can't
101
 
            That is, unless we expect the server to not start, then
102
 
            we just return a value / message.
103
 
 
104
 
            server_requirements is a list of lists.  Each list
105
 
            is a set of server options - we create one server
106
 
            for each set of options requested
107
 
    
108
 
        """
109
 
 
110
 
        # Make sure our server is in a decent state, if the last test
111
 
        # failed, then we reset the server
112
 
        self.check_server_status(requester)
113
 
        
114
 
        # Make sure we have the proper number of servers for this requester
115
 
        self.process_server_count( requester
116
 
                                 , test_executor
117
 
                                 , len(server_requirements)
118
 
                                 , workdir
119
 
                                 , server_requirements)
120
 
 
121
 
        # Make sure we are running with the correct options 
122
 
        self.evaluate_existing_servers( requester
123
 
                                      , cnf_path
124
 
                                      , server_requests
125
 
                                      , server_requirements)
126
 
 
127
 
        # Fire our servers up
128
 
        bad_start = self.start_servers( requester
129
 
                                      , expect_fail)
130
 
 
131
 
        # Return them to the requester
132
 
        return (self.get_server_list(requester), bad_start)        
133
 
 
134
 
 
135
 
    
136
 
    def allocate_server( self
137
 
                       , requester
138
 
                       , test_executor
139
 
                       , server_options
140
 
                       , workdir
141
 
                       , server_type=None
142
 
                       , server_version=None):
143
 
        """ Intialize an appropriate server object.
144
 
            Start up occurs elsewhere
145
 
 
146
 
        """
147
 
        # use default server type unless specifically requested to do otherwise
148
 
        if not server_type:
149
 
            server_type = self.default_server_type
150
 
 
151
 
        # Get a name for our server
152
 
        server_name = self.get_server_name(requester)
153
 
 
154
 
        # initialize our new server_object
155
 
        # get the right codeTree type from the code manager
156
 
        code_tree = self.code_manager.get_tree(server_type, server_version)
157
 
 
158
 
        # import the correct server type object
159
 
        if server_type == 'drizzle':
160
 
            from lib.server_mgmt.drizzled import drizzleServer as server_type
161
 
        elif server_type == 'mysql':
162
 
            from lib.server_mgmt.mysqld import mysqlServer as server_type
163
 
        elif server_type == 'galera':
164
 
            from lib.server_mgmt.galera import mysqlServer as server_type
165
 
 
166
 
        new_server = server_type( server_name
167
 
                                , self
168
 
                                , code_tree
169
 
                                , self.default_storage_engine
170
 
                                , server_options
171
 
                                , requester
172
 
                                , test_executor
173
 
                                , workdir )
174
 
        return new_server
175
 
 
176
 
    def start_servers(self, requester, expect_fail):
177
 
        """ Start all servers for the requester """
178
 
        bad_start = 0
179
 
 
180
 
        for server in self.get_server_list(requester):
181
 
            if server.status == 0:
182
 
                bad_start = bad_start + server.start()
183
 
            else:
184
 
                self.logging.debug("Server %s already running" %(server.name))
185
 
        return bad_start
186
 
 
187
 
    def stop_servers(self, requester):
188
 
        """ Stop all servers running for the requester """
189
 
        for server in self.get_server_list(requester):
190
 
            server.stop()
191
 
 
192
 
    def stop_server_list(self, server_list, free_ports=False):
193
 
        """ Stop the servers in an arbitrary list of them """
194
 
        for server in server_list:
195
 
            server.stop()
196
 
        if free_ports:
197
 
            server.cleanup()
198
 
 
199
 
    def stop_all_servers(self):
200
 
        """ Stop all running servers """
201
 
 
202
 
        self.logging.info("Stopping all running servers...")
203
 
        for server_list in self.servers.values():
204
 
            for server in server_list:
205
 
                server.stop()
206
 
 
207
 
    def cleanup_all_servers(self):
208
 
        """Mainly for freeing server ports for now """
209
 
        for server_list in self.servers.values():
210
 
            for server in server_list:
211
 
                server.cleanup()
212
 
 
213
 
    def cleanup(self):
214
 
        """Stop all servers and free their ports and whatnot """
215
 
        self.stop_all_servers()
216
 
        self.cleanup_all_servers()
217
 
 
218
 
    def get_server_name(self, requester):
219
 
        """ We name our servers requester.server_basename.count
220
 
            where count is on a per-requester basis
221
 
            We see how many servers this requester has and name things 
222
 
            appropriately
223
 
 
224
 
        """
225
 
        self.has_servers(requester) # if requester isn't there, we create a blank entry
226
 
        server_count = self.server_count(requester)
227
 
        return "%s%d" %(self.server_base_name, server_count)
228
 
 
229
 
    def has_servers(self, requester):
230
 
        """ Check if the given requester has any servers """
231
 
        if requester not in self.servers: # new requester
232
 
           self.log_requester(requester) 
233
 
        return self.server_count(requester)
234
 
 
235
 
    def log_requester(self, requester):
236
 
        """ We create a log entry for the new requester """
237
 
 
238
 
        self.servers[requester] = []
239
 
 
240
 
    def log_server(self, new_server, requester):
241
 
        self.servers[requester].append(new_server)
242
 
 
243
 
    def evaluate_existing_servers( self, requester, cnf_path
244
 
                                 , server_requests, server_requirements):
245
 
        """ See if the requester has any servers and if they
246
 
            are suitable for the current test
247
 
 
248
 
            We should have the proper number of servers at this point
249
 
 
250
 
        """
251
 
 
252
 
        # A dictionary that holds various tricks
253
 
        # we can do with our test servers
254
 
        special_processing_reqs = {}
255
 
        if server_requests:
256
 
            # we have a direct dictionary in the testcase
257
 
            # that asks for what we want and we use it
258
 
            special_processing_reqs = server_requests
259
 
 
260
 
        current_servers = self.servers[requester]
261
 
 
262
 
        for index,server in enumerate(current_servers):
263
 
            # We handle a reset in case we need it:
264
 
            if server.need_reset:
265
 
                self.reset_server(server)
266
 
                server.need_reset = False
267
 
 
268
 
            desired_server_options = server_requirements[index]
269
 
            
270
 
            # do any special config processing - this can alter
271
 
            # how we view our servers
272
 
            if cnf_path:
273
 
                self.handle_server_config_file( cnf_path
274
 
                                              , server
275
 
                                              , special_processing_reqs
276
 
                                              , desired_server_options
277
 
                                              )
278
 
 
279
 
            if self.compare_options( server.server_options
280
 
                                   , desired_server_options):
281
 
                pass 
282
 
            else:
283
 
                # We need to reset what is running and change the server
284
 
                # options
285
 
                desired_server_options = self.filter_server_options(desired_server_options)
286
 
                self.reset_server(server)
287
 
                self.update_server_options(server, desired_server_options)
288
 
        self.handle_special_server_requests(special_processing_reqs, current_servers)
289
 
 
290
 
    def handle_server_config_file( self
291
 
                                 , cnf_path
292
 
                                 , server
293
 
                                 , special_processing_reqs
294
 
                                 , desired_server_options
295
 
                                 ):
296
 
        # We have a config reader so we can do
297
 
        # special per-server magic for setting up more
298
 
        # complex scenario-based testing (eg we use a certain datadir)
299
 
        config_reader = RawConfigParser()
300
 
        config_reader.read(cnf_path)
301
 
 
302
 
        # Do our checking for config-specific madness we need to do
303
 
        if config_reader and config_reader.has_section(server.name):
304
 
            # mark server for restart in case it hasn't yet
305
 
            # this method is a bit hackish - need better method later
306
 
            if '--restart' not in desired_server_options:
307
 
                desired_server_options.append('--restart')
308
 
            # We handle various scenarios
309
 
            server_config_data = config_reader.items(server.name)
310
 
            for cnf_option, data in server_config_data:
311
 
                if cnf_option == 'load-datadir':
312
 
                    datadir_path = data
313
 
                    request_key = 'datadir_requests'
314
 
                if request_key not in special_processing_reqs:
315
 
                    special_processing_reqs[request_key] = []
316
 
                special_processing_reqs[request_key].append((datadir_path,server))
317
 
 
318
 
    def handle_special_server_requests(self, request_dictionary, current_servers):
319
 
        """ We run through our set of special requests and do 
320
 
            the appropriate voodoo
321
 
 
322
 
        """
323
 
        for key, item in request_dictionary.items():
324
 
            if key == 'datadir_requests':
325
 
                self.load_datadirs(item, current_servers)
326
 
            if key == 'join_cluster':
327
 
                self.join_clusters(item, current_servers)
328
 
 
329
 
    def filter_server_options(self, server_options):
330
 
        """ Remove a list of options we don't want passed to the server
331
 
            these are test-case specific options.
332
 
 
333
 
            NOTE: It is a bad hack to allow test-runner commands
334
 
            to mix with server options willy-nilly in master-opt files
335
 
            as we do.  We need to kill this at some point : (
336
 
 
337
 
        """
338
 
        remove_options = [ '--restart'
339
 
                         , '--skip-stack-trace'
340
 
                         , '--skip-core-file'
341
 
                         , '--'
342
 
                         ]
343
 
        for remove_option in remove_options:
344
 
            if remove_option in server_options:
345
 
                server_options.remove(remove_option)
346
 
        return server_options
347
 
            
348
 
    
349
 
    def compare_options(self, optlist1, optlist2):
350
 
        """ Compare two sets of server options and see if they match """
351
 
        return sorted(optlist1) == sorted(optlist2)
352
 
 
353
 
    def reset_server(self, server):
354
 
        server.stop()
355
 
        server.restore_snapshot()
356
 
        server.reset()
357
 
 
358
 
    def reset_servers(self, requester):
359
 
        for server in self.servers[requester]:
360
 
            self.reset_server(server)
361
 
 
362
 
    def load_datadirs(self, datadir_requests, current_servers):
363
 
        """ We load source_dir to the server's datadir """
364
 
        for source_dir, server in datadir_requests:
365
 
            self.load_datadir(source_dir, server, current_servers)
366
 
 
367
 
    def load_datadir(self, source_dir, server, current_servers):
368
 
        """ We load source_dir to the server's datadir """
369
 
 
370
 
        if type(server) == int:
371
 
            # we have just an index (as we use in unittest files)
372
 
            # and we get the server from current_servers[idx]
373
 
            server = current_servers[server]
374
 
        source_dir_path = os.path.join(server.vardir,'std_data_ln',source_dir)
375
 
        self.system_manager.remove_dir(server.datadir)
376
 
        self.system_manager.copy_dir(source_dir_path, server.datadir)
377
 
        # We need to signal that the server will need to be reset as we're
378
 
        # using a non-standard datadir
379
 
        server.need_reset = True
380
 
 
381
 
    def join_clusters(self, cluster_requests, current_servers):
382
 
        """ We get a list of master, slave tuples and join
383
 
            them as needed
384
 
 
385
 
        """
386
 
        for cluster_set in cluster_requests:
387
 
            self.join_node_to_cluster(cluster_set, current_servers)
388
 
        
389
 
 
390
 
    def join_node_to_cluster(self, node_set, current_servers):
391
 
        """ We join node_set[1] to node_set[0].
392
 
            The server object is responsible for 
393
 
            implementing the voodoo required to 
394
 
            make this happen
395
 
 
396
 
        """
397
 
            
398
 
        master = current_servers[node_set[0]]
399
 
        slave = current_servers[node_set[1]]
400
 
        slave.set_master(master)
401
 
        # Assuming we'll reset master and slave for now...
402
 
        master.need_reset = True
403
 
        slave.need_reset = True
404
 
 
405
 
    def process_server_count( self
406
 
                            , requester
407
 
                            , test_executor
408
 
                            , desired_count
409
 
                            , workdir
410
 
                            , server_reqs):
411
 
        """ We see how many servers we have.  We shrink / grow
412
 
            the requesters set of servers as needed.
413
 
 
414
 
            If we shrink, we shutdown / reset the discarded servers
415
 
            (naturally)
416
 
 
417
 
        """
418
 
        if desired_count < 0:  desired_count = 1
419
 
 
420
 
        current_count = self.has_servers(requester)
421
 
        if desired_count > current_count:
422
 
            for i in range(desired_count - current_count):
423
 
                # We pass an empty options list when allocating
424
 
                # We'll update the options to what is needed elsewhere
425
 
                self.allocate_server(requester, test_executor, [], workdir)
426
 
        elif desired_count < current_count:
427
 
            good_servers = self.get_server_list(requester)[:desired_count]
428
 
            retired_servers = self.get_server_list(requester)[desired_count - current_count:]
429
 
            self.stop_server_list(retired_servers, free_ports=True)
430
 
            self.set_server_list(requester, good_servers)
431
 
            
432
 
         
433
 
 
434
 
    def server_count(self, requester):
435
 
        """ Return how many servers the the requester has """
436
 
        return len(self.servers[requester])
437
 
 
438
 
    def get_server_list(self, requester):
439
 
        """ Return the list of servers assigned to the requester """
440
 
        self.has_servers(requester) # initialize, hacky : (
441
 
        return self.servers[requester]
442
 
 
443
 
    def set_server_list(self, requester, server_list):
444
 
        """ Set the requesters list of servers to server_list """
445
 
 
446
 
        self.servers[requester] = server_list
447
 
 
448
 
    def add_server(self, requester, new_server):
449
 
       """ Add new_server to the requester's set of servers """
450
 
       self.servers[requester].append(new_server)
451
 
 
452
 
    def update_server_options(self, server, server_options):
453
 
        """ Change the option_list a server has to use on startup """
454
 
        self.logging.debug("Updating server: %s options" %(server.name))
455
 
        self.logging.debug("FROM: %s" %(server.server_options))
456
 
        self.logging.debug("TO: %s" %(server_options))
457
 
        server.set_server_options(server_options)
458
 
 
459
 
    def get_server_count(self):
460
 
        """ Find out how many servers we have out """
461
 
        server_count = 0
462
 
        for server_list in self.servers.values():
463
 
            for server in server_list:
464
 
                server_count = server_count + 1
465
 
        return server_count
466
 
 
467
 
    def check_server_status(self, requester):
468
 
        """ Make sure our servers are good,
469
 
            reset the otherwise.
470
 
 
471
 
        """
472
 
        for server in self.get_server_list(requester):
473
 
            if server.failed_test:
474
 
                self.reset_server(server)
475
 
 
476
 
    def handle_environment_reqs(self, server, working_environ):
477
 
        """ We update the working_environ as we need to
478
 
            before starting the server.
479
 
 
480
 
            This includes things like libeatmydata, ld_preloads, etc
481
 
 
482
 
        """
483
 
        environment_reqs = {}
484
 
 
485
 
        if self.libeatmydata:
486
 
            # We want to use libeatmydata to disable fsyncs
487
 
            # this speeds up test execution, but we only want
488
 
            # it to happen for the servers' environments
489
 
 
490
 
            environment_reqs.update({'LD_PRELOAD':self.libeatmydata_path})
491
 
 
492
 
        # handle ld_preloads
493
 
        ld_lib_paths = self.env_manager.join_env_var_values(server.code_tree.ld_lib_paths)
494
 
        environment_reqs.update({'LD_LIBRARY_PATH' : self.env_manager.append_env_var( 'LD_LIBRARY_PATH'
495
 
                                                                                    , ld_lib_paths
496
 
                                                                                    , suffix = 0
497
 
                                                                                    , quiet = 1
498
 
                                                                                    )
499
 
                                , 'DYLD_LIBRARY_PATH' : self.env_manager.append_env_var( 'DYLD_LIBRARY_PATH'
500
 
                                                                                       , ld_lib_paths
501
 
                                                                                       , suffix = 0
502
 
                                                                                       , quiet = 1
503
 
                                                                                       )
504
 
 
505
 
                                 })
506
 
        self.env_manager.update_environment_vars(environment_reqs)
507