~patrick-hetu/+junk/volume-ephemeral-storage

« back to all changes in this revision

Viewing changes to hooks/hooks.py

  • Committer: Mark Mims
  • Date: 2013-08-26 20:37:00 UTC
  • mfrom: (48.4.4 fix-races)
  • Revision ID: mark.mims@canonical.com-20130826203700-e2ni03twv354i0pu
merging lp:~stub/charms/precise/postgresql/fix-races as per https://code.launchpad.net/~stub/charms/precise/postgresql/fix-races/+merge/181740

Show diffs side-by-side

added added

removed removed

Lines of Context:
20
20
 
21
21
from charmhelpers.core import hookenv, host
22
22
from charmhelpers.core.hookenv import (
23
 
    CRITICAL, ERROR, WARNING, INFO, DEBUG, log,
 
23
    CRITICAL, ERROR, WARNING, INFO, DEBUG,
24
24
    )
25
25
 
26
26
hooks = hookenv.Hooks()
28
28
# jinja2 may not be importable until the install hook has installed the
29
29
# required packages.
30
30
def Template(*args, **kw):
 
31
    """jinja2.Template with deferred jinja2 import"""
31
32
    from jinja2 import Template
32
33
    return Template(*args, **kw)
33
34
 
34
35
 
35
36
def log(msg, lvl=INFO):
36
 
    # Per Bug #1208787, log messages sent via juju-log are being lost.
37
 
    # Spit messages out to a log file to work around the problem.
 
37
    '''Log a message.
 
38
 
 
39
    Per Bug #1208787, log messages sent via juju-log are being lost.
 
40
    Spit messages out to a log file to work around the problem.
 
41
    It is also rather nice to have the log messages we explicitly emit
 
42
    in a separate log file, rather than just mashed up with all the
 
43
    juju noise.
 
44
    '''
38
45
    myname = hookenv.local_unit().replace('/', '-')
39
 
    with open('/tmp/{}-debug.log'.format(myname), 'a') as f:
40
 
        f.write('{}: {}\n'.format(lvl, msg))
 
46
    ts = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime())
 
47
    with open('/var/log/juju/{}-debug.log'.format(myname), 'a') as f:
 
48
        f.write('{} {}: {}\n'.format(ts, lvl, msg))
41
49
    hookenv.log(msg, lvl)
42
50
 
43
51
 
49
57
        self.load()
50
58
 
51
59
    def load(self):
 
60
        '''Load stored state from local disk.'''
52
61
        if os.path.exists(self._state_file):
53
62
            state = pickle.load(open(self._state_file, 'rb'))
54
63
        else:
58
67
        self.update(state)
59
68
 
60
69
    def save(self):
 
70
        '''Store state to local disk.'''
61
71
        state = {}
62
72
        state.update(self)
63
73
        pickle.dump(state, open(self._state_file, 'wb'))
181
191
 
182
192
 
183
193
def postgresql_autostart(enabled):
 
194
    startup_file = os.path.join(postgresql_config_dir, 'start.conf')
184
195
    if enabled:
185
196
        log("Enabling PostgreSQL startup in {}".format(startup_file))
186
197
        mode = 'auto'
187
198
    else:
188
199
        log("Disabling PostgreSQL startup in {}".format(startup_file))
189
200
        mode = 'manual'
190
 
    startup_file = os.path.join(postgresql_config_dir, 'start.conf')
191
201
    contents = Template(open("templates/start_conf.tmpl").read()).render(
192
202
        {'mode': mode})
193
203
    host.write_file(
209
219
 
210
220
 
211
221
def postgresql_is_running():
 
222
    '''Return true if PostgreSQL is running.'''
212
223
    # init script always return true (9.1), add extra check to make it useful
213
224
    status, output = commands.getstatusoutput("invoke-rc.d postgresql status")
214
225
    if status != 0:
219
230
 
220
231
 
221
232
def postgresql_stop():
222
 
    host.service_stop('postgresql')
223
 
    return not postgresql_is_running()
 
233
    '''Shutdown PostgreSQL.'''
 
234
    success = host.service_stop('postgresql')
 
235
    return not (success and postgresql_is_running())
224
236
 
225
237
 
226
238
def postgresql_start():
227
 
    host.service_start('postgresql')
228
 
    return postgresql_is_running()
 
239
    '''Start PostgreSQL if it is not already running.'''
 
240
    success = host.service_start('postgresql')
 
241
    return success and postgresql_is_running()
229
242
 
230
243
 
231
244
def postgresql_restart():
 
245
    '''Restart PostgreSQL, or start it if it is not already running.'''
232
246
    if postgresql_is_running():
233
 
        # If the database is in backup mode, we don't want to restart
234
 
        # PostgreSQL and abort the procedure. This may be another unit being
235
 
        # cloned, or a filesystem level backup is being made. There is no
236
 
        # timeout here, as backups can take hours or days. Instead, keep
237
 
        # logging so admins know wtf is going on.
238
 
        last_warning = time.time()
239
 
        while postgresql_is_in_backup_mode():
240
 
            if time.time() + 120 > last_warning:
241
 
                log("In backup mode. PostgreSQL restart blocked.", WARNING)
242
 
                log(
243
 
                    "Run \"psql -U postgres -c 'SELECT pg_stop_backup()'\""
244
 
                    "to cancel backup mode and forcefully unblock this hook.")
245
 
                last_warning = time.time()
246
 
            time.sleep(5)
247
 
 
248
 
        return host.service_restart('postgresql')
 
247
        with restart_lock(hookenv.local_unit(), True):
 
248
            # 'service postgresql restart' fails; it only does a reload.
 
249
            # success = host.service_restart('postgresql')
 
250
            try:
 
251
                run('pg_ctlcluster -force {version} {cluster_name} '
 
252
                    'restart'.format(**config_data))
 
253
                success = True
 
254
            except subprocess.CalledProcessError as e:
 
255
                success = False
249
256
    else:
250
 
        return host.service_start('postgresql')
 
257
        success = host.service_start('postgresql')
251
258
 
252
259
    # Store a copy of our known live configuration so
253
260
    # postgresql_reload_or_restart() can make good choices.
254
 
    if 'saved_config' in local_state:
 
261
    if success and 'saved_config' in local_state:
255
262
        local_state['live_config'] = local_state['saved_config']
256
263
        local_state.save()
257
264
 
258
 
    return postgresql_is_running()
 
265
    return success and postgresql_is_running()
259
266
 
260
267
 
261
268
def postgresql_reload():
 
269
    '''Make PostgreSQL reload its configuration.'''
262
270
    # reload returns a reliable exit status
263
271
    status, output = commands.getstatusoutput("invoke-rc.d postgresql reload")
264
272
    return (status == 0)
265
273
 
266
274
 
267
 
def postgresql_reload_or_restart():
268
 
    """Reload PostgreSQL configuration, restarting if necessary."""
269
 
    # Pull in current values of settings that can only be changed on
270
 
    # server restart.
 
275
def requires_restart():
 
276
    '''Check for configuration changes requiring a restart to take effect.'''
271
277
    if not postgresql_is_running():
272
 
        return postgresql_restart()
 
278
        return True
273
279
 
274
 
    # Suck in the config last written to postgresql.conf.
275
280
    saved_config = local_state.get('saved_config', None)
276
281
    if not saved_config:
277
282
        # No record of postgresql.conf state, perhaps an upgrade.
278
283
        # Better restart.
279
 
        return postgresql_restart()
 
284
        return True
280
285
 
281
 
    # Suck in our live config from last time we restarted.
282
286
    live_config = local_state.setdefault('live_config', {})
283
287
 
284
288
    # Pull in a list of PostgreSQL settings.
285
289
    cur = db_cursor()
286
290
    cur.execute("SELECT name, context FROM pg_settings")
287
 
    requires_restart = False
 
291
    restart = False
288
292
    for name, context in cur.fetchall():
289
293
        live_value = live_config.get(name, None)
290
294
        new_value = saved_config.get(name, None)
296
300
            if context == 'postmaster':
297
301
                # A setting has changed that requires PostgreSQL to be
298
302
                # restarted before it will take effect.
299
 
                requires_restart = True
300
 
 
301
 
    if requires_restart:
302
 
        # A change has been requested that requires a restart.
303
 
        log(
304
 
            "Configuration change requires PostgreSQL restart. Restarting.",
 
303
                restart = True
 
304
    return restart
 
305
 
 
306
 
 
307
def postgresql_reload_or_restart():
 
308
    """Reload PostgreSQL configuration, restarting if necessary."""
 
309
    if requires_restart():
 
310
        log("Configuration change requires PostgreSQL restart. Restarting.",
305
311
            WARNING)
306
 
        rc = postgresql_restart()
 
312
        success = postgresql_restart()
 
313
        if not success or requires_restart():
 
314
            log("Configuration changes failed to apply", WARNING)
 
315
            success = False
307
316
    else:
308
 
        log("PostgreSQL reload, config changes taking effect.", DEBUG)
309
 
        rc = postgresql_reload()  # No pending need to bounce, just reload.
 
317
        success = host.service_reload('postgresql')
310
318
 
311
 
    if rc == 0 and 'saved_config' in local_state:
312
 
        local_state['live_config'] = local_state['saved_config']
 
319
    if success:
 
320
        local_state['saved_config'] = local_state['live_config']
313
321
        local_state.save()
314
322
 
315
 
    return rc
 
323
    return success
316
324
 
317
325
 
318
326
def get_service_port(postgresql_config):
344
352
            config_data["shared_buffers"] = \
345
353
                "%sMB" % (int(int(total_ram) * 0.15),)
346
354
        # XXX: This is very messy - should probably be a subordinate charm
347
 
        # file overlaps with __builtin__.file ... renaming to conf_file
348
 
        # negronjl
349
355
        conf_file = open("/etc/sysctl.d/50-postgresql.conf", "w")
350
356
        conf_file.write("kernel.sem = 250 32000 100 1024\n")
351
357
        conf_file.write("kernel.shmall = %s\n" %
579
585
 
580
586
 
581
587
def db_cursor(autocommit=False, db='template1', user='postgres',
582
 
              host=None, timeout=120):
 
588
              host=None, timeout=30):
583
589
    import psycopg2
584
590
    if host:
585
591
        conn_str = "dbname={} host={} user={}".format(db, host, user)
855
861
 
856
862
@hooks.hook()
857
863
def start():
858
 
    if not postgresql_restart():
 
864
    if not postgresql_reload_or_restart():
859
865
        raise SystemExit(1)
860
866
 
861
867
 
862
868
@hooks.hook()
863
869
def stop():
864
 
    if not postgresql_stop():
865
 
        raise SystemExit(1)
 
870
    if postgresql_is_running():
 
871
        with restart_lock(hookenv.local_unit(), True):
 
872
            if not postgresql_stop():
 
873
                raise SystemExit(1)
866
874
 
867
875
 
868
876
def quote_identifier(identifier):
1163
1171
def db_relation_broken():
1164
1172
    from psycopg2.extensions import AsIs
1165
1173
 
1166
 
    relid = os.environ['JUJU_RELATION_ID']
 
1174
    relid = hookenv.relation_id()
1167
1175
    if relid not in local_state['relations']['db']:
1168
1176
        # This was to be a hot standby, but it had not yet got as far as
1169
1177
        # receiving and handling credentials from the master.
1174
1182
    # we used from there. Instead, we have to persist this information
1175
1183
    # ourselves.
1176
1184
    relation = local_state['relations']['db'][relid]
1177
 
    unit_relation_data = relation[os.environ['JUJU_UNIT_NAME']]
 
1185
    unit_relation_data = relation[hookenv.local_unit()]
1178
1186
 
1179
1187
    if local_state['state'] in ('master', 'standalone'):
1180
1188
        user = unit_relation_data.get('user', None)
1303
1311
        log("I am already the master", DEBUG)
1304
1312
        return hookenv.local_unit()
1305
1313
 
 
1314
    if local_state['state'] == 'hot standby':
 
1315
        log("I am already following {}".format(
 
1316
            local_state['following']), DEBUG)
 
1317
        return local_state['following']
 
1318
 
 
1319
    replication_relid = hookenv.relation_ids('replication')[0]
 
1320
    replication_units = hookenv.related_units(replication_relid)
 
1321
 
 
1322
    if local_state['state'] == 'standalone':
 
1323
        log("I'm a standalone unit wanting to participate in replication")
 
1324
        existing_replication = False
 
1325
        for unit in replication_units:
 
1326
            # If another peer thinks it is the master, believe it.
 
1327
            remote_state = hookenv.relation_get(
 
1328
                'state', unit, replication_relid)
 
1329
            if remote_state == 'master':
 
1330
                log("{} thinks it is the master, believing it".format(
 
1331
                    unit), DEBUG)
 
1332
                return unit
 
1333
 
 
1334
            # If we find a peer that isn't standalone, we know
 
1335
            # replication has already been setup at some point.
 
1336
            if remote_state != 'standalone':
 
1337
                existing_replication = True
 
1338
 
 
1339
        # If we are joining a peer relation where replication has
 
1340
        # already been setup, but there is currently no master, wait
 
1341
        # until one of the remaining participating units has been
 
1342
        # promoted to master. Only they have the data we need to
 
1343
        # preserve.
 
1344
        if existing_replication:
 
1345
            log("Peers participating in replication need to elect a master",
 
1346
                DEBUG)
 
1347
            return None
 
1348
 
 
1349
        # There are no peers claiming to be master, and there is no
 
1350
        # election in progress, so lowest numbered unit wins.
 
1351
        units = replication_units + [hookenv.local_unit()]
 
1352
        master = unit_sorted(units)[0]
 
1353
        if master == hookenv.local_unit():
 
1354
            log("I'm Master - lowest numbered unit in new peer group")
 
1355
            return master
 
1356
        else:
 
1357
            log("Waiting on {} to declare itself Master".format(master), DEBUG)
 
1358
            return None
 
1359
 
1306
1360
    if local_state['state'] == 'failover':
1307
1361
        former_master = local_state['following']
1308
1362
        log("Failover from {}".format(former_master))
1309
1363
 
1310
1364
        units_not_in_failover = set()
1311
 
        for relid in hookenv.relation_ids('replication'):
1312
 
            for unit in hookenv.related_units(relid):
1313
 
                if unit == former_master:
1314
 
                    log("Found dying master {}".format(unit), DEBUG)
1315
 
                    continue
1316
 
 
1317
 
                relation = hookenv.relation_get(unit=unit, rid=relid)
1318
 
 
1319
 
                if relation['state'] == 'master':
1320
 
                    log(
1321
 
                        "{} says it already won the election".format(unit),
1322
 
                        INFO)
1323
 
                    return unit
1324
 
 
1325
 
                if relation['state'] != 'failover':
1326
 
                    units_not_in_failover.add(unit)
 
1365
        candidates = set()
 
1366
        for unit in replication_units:
 
1367
            if unit == former_master:
 
1368
                log("Found dying master {}".format(unit), DEBUG)
 
1369
                continue
 
1370
 
 
1371
            relation = hookenv.relation_get(unit=unit, rid=replication_relid)
 
1372
 
 
1373
            if relation['state'] == 'master':
 
1374
                log("{} says it already won the election".format(unit),
 
1375
                    INFO)
 
1376
                return unit
 
1377
 
 
1378
            if relation['state'] == 'failover':
 
1379
                candidates.add(unit)
 
1380
 
 
1381
            elif relation['state'] != 'standalone':
 
1382
                units_not_in_failover.add(unit)
1327
1383
 
1328
1384
        if units_not_in_failover:
1329
1385
            log("{} unaware of impending election. Deferring result.".format(
1333
1389
        log("Election in progress")
1334
1390
        winner = None
1335
1391
        winning_offset = -1
1336
 
        for relid in hookenv.relation_ids('replication'):
1337
 
            candidates = set(hookenv.related_units(relid))
1338
 
            candidates.add(hookenv.local_unit())
1339
 
            candidates.discard(former_master)
1340
 
            # Sort the unit lists so we get consistent results in a tie
1341
 
            # and lowest unit number wins.
1342
 
            for unit in unit_sorted(candidates):
1343
 
                relation = hookenv.relation_get(unit=unit, rid=relid)
1344
 
                if int(relation['wal_received_offset']) > winning_offset:
1345
 
                    winner = unit
1346
 
                    winning_offset = int(relation['wal_received_offset'])
 
1392
        candidates.add(hookenv.local_unit())
 
1393
        # Sort the unit lists so we get consistent results in a tie
 
1394
        # and lowest unit number wins.
 
1395
        for unit in unit_sorted(candidates):
 
1396
            relation = hookenv.relation_get(unit=unit, rid=replication_relid)
 
1397
            if int(relation['wal_received_offset']) > winning_offset:
 
1398
                winner = unit
 
1399
                winning_offset = int(relation['wal_received_offset'])
1347
1400
 
1348
1401
        # All remaining hot standbys are in failover mode and have
1349
1402
        # reported their wal_received_offset. We can declare victory.
1350
 
        log("{} won the election as is the new master".format(winner))
1351
 
        return winner
1352
 
 
1353
 
    # Maybe another peer thinks it is the master?
1354
 
    for relid in hookenv.relation_ids('replication'):
1355
 
        for unit in hookenv.related_units(relid):
1356
 
            if hookenv.relation_get('state', unit, relid) == 'master':
1357
 
                return unit
1358
 
 
1359
 
    # New peer group. Lowest numbered unit will be the master.
1360
 
    for relid in hookenv.relation_ids('replication'):
1361
 
        units = hookenv.related_units(relid) + [hookenv.local_unit()]
1362
 
        master = unit_sorted(units)[0]
1363
 
        log("New peer group. {} is elected master".format(master))
1364
 
        return master
 
1403
        if winner == hookenv.local_unit():
 
1404
            log("I won the election, announcing myself winner")
 
1405
            return winner
 
1406
        else:
 
1407
            log("Waiting for {} to announce its victory".format(winner),
 
1408
                DEBUG)
 
1409
            return None
1365
1410
 
1366
1411
 
1367
1412
@hooks.hook('replication-relation-joined', 'replication-relation-changed')
1419
1464
            log("Fresh unit. I will clone {} and become a hot standby".format(
1420
1465
                master))
1421
1466
 
1422
 
            # Before we start destroying anything, ensure that the
1423
 
            # master is contactable.
1424
1467
            master_ip = hookenv.relation_get('private-address', master)
1425
 
            wait_for_db(db='postgres', user='juju_replication', host=master_ip)
1426
1468
 
1427
1469
            clone_database(master, master_ip)
1428
1470
 
1592
1634
        os.chdir(org_dir)
1593
1635
 
1594
1636
 
 
1637
@contextmanager
 
1638
def restart_lock(unit, exclusive):
 
1639
    '''Aquire the database restart lock on the given unit.
 
1640
 
 
1641
    A database needing a restart should grab an exclusive lock before
 
1642
    doing so. To block a remote database from doing a restart, grab a shared
 
1643
    lock.
 
1644
    '''
 
1645
    import psycopg2
 
1646
    key = long(config_data['advisory_lock_restart_key'])
 
1647
    if exclusive:
 
1648
        lock_function = 'pg_advisory_lock'
 
1649
    else:
 
1650
        lock_function = 'pg_advisory_lock_shared'
 
1651
    q = 'SELECT {}({})'.format(lock_function, key)
 
1652
 
 
1653
    # We will get an exception if the database is rebooted while waiting
 
1654
    # for a shared lock. If the connection is killed, we retry a few
 
1655
    # times to cope.
 
1656
    num_retries = 3
 
1657
 
 
1658
    for count in range(0, num_retries):
 
1659
        try:
 
1660
            if unit == hookenv.local_unit():
 
1661
                cur = db_cursor(autocommit=True)
 
1662
            else:
 
1663
                host = hookenv.relation_get('private-address', unit)
 
1664
                cur = db_cursor(
 
1665
                    autocommit=True, db='postgres',
 
1666
                    user='juju_replication', host=host)
 
1667
            cur.execute(q)
 
1668
            break
 
1669
        except psycopg2.Error:
 
1670
            if count == num_retries - 1:
 
1671
                raise
 
1672
 
 
1673
    try:
 
1674
        yield
 
1675
    finally:
 
1676
        # Close our connection, swallowing any exceptions as the database
 
1677
        # may be being rebooted now we have released our lock.
 
1678
        try:
 
1679
            del cur
 
1680
        except psycopg2.Error:
 
1681
            pass
 
1682
 
 
1683
 
1595
1684
def clone_database(master_unit, master_host):
1596
 
    with pgpass():
 
1685
    with restart_lock(master_unit, False):
1597
1686
        postgresql_stop()
1598
1687
        log("Cloning master {}".format(master_unit))
1599
1688
 
1607
1696
            shutil.rmtree(postgresql_cluster_dir)
1608
1697
 
1609
1698
        try:
1610
 
            # Change directory the postgres user can read.
1611
 
            with switch_cwd('/tmp'):
1612
 
                # Run the sudo command.
 
1699
            # Change directory the postgres user can read, and need
 
1700
            # .pgpass too.
 
1701
            with switch_cwd('/tmp'), pgpass():
 
1702
                # Clone the master with pg_basebackup.
1613
1703
                output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
1614
1704
            log(output, DEBUG)
1615
1705
            # Debian by default expects SSL certificates in the datadir.
1626
1716
            # can retry hooks again. Even assuming the charm is
1627
1717
            # functioning correctly, the clone may still fail
1628
1718
            # due to eg. lack of disk space.
1629
 
            log("Clone failed, db cluster destroyed", ERROR)
1630
1719
            log(x.output, ERROR)
 
1720
            log("Clone failed, local db destroyed", ERROR)
1631
1721
            if os.path.exists(postgresql_cluster_dir):
1632
1722
                shutil.rmtree(postgresql_cluster_dir)
1633
1723
            if os.path.exists(postgresql_config_dir):
1652
1742
        os.path.join(postgresql_cluster_dir, 'backup_label'))
1653
1743
 
1654
1744
 
 
1745
def pg_basebackup_is_running():
 
1746
    cur = db_cursor(autocommit=True)
 
1747
    cur.execute("""
 
1748
        SELECT count(*) FROM pg_stat_activity
 
1749
        WHERE usename='juju_replication' AND application_name='pg_basebackup'
 
1750
        """)
 
1751
    return cur.fetchone()[0] > 0
 
1752
 
 
1753
 
1655
1754
def postgresql_wal_received_offset():
1656
1755
    """How much WAL we have.
1657
1756
 
1694
1793
    try:
1695
1794
        nagios_uid = getpwnam('nagios').pw_uid
1696
1795
        nagios_gid = getgrnam('nagios').gr_gid
1697
 
    except:
 
1796
    except Exception:
1698
1797
        hookenv.log("Nagios user not set up.", hookenv.DEBUG)
1699
1798
        return
1700
1799