~luks/acoustid-server/server

107.1.20 by Lukáš Lalinský
Updated script to export the database
1
#!/usr/bin/env python
2
3
# Copyright (C) 2011 Lukas Lalinsky
4
# Distributed under the MIT license, see the LICENSE file for details.
5
6
import os
7
import logging
181.2.5 by Lukáš Lalinský
Database exporting
8
import skytools
9
import xml.etree.cElementTree as etree
10
import psycopg2.extensions
11
from contextlib import closing
107.1.20 by Lukáš Lalinský
Updated script to export the database
12
from acoustid.script import run_script
13
from acoustid.data.track import merge_missing_mbids
14
15
logger = logging.getLogger(__name__)
16
17
181.2.5 by Lukáš Lalinský
Database exporting
18
# Tables written by a full export into the 'acoustid-dump' directory.
# Each entry is (table_name, sql): when sql is None the whole table is
# copied, otherwise the given SELECT is exported instead.
CORE_TABLES = [
    ("fingerprint", None),
    ("format", None),
    ("meta", None),
    ("replication_control", None),
    ("stats", None),
    ("track", None),
    ("track_mbid", None),
    ("track_meta", None),
    ("track_puid", None),
]
29
30
# Tables holding per-source/per-account data, in the same (table_name, sql)
# format as the other table lists.
# NOTE(review): the commented-out entries look like anonymising SELECTs for
# account/application data that are currently disabled -- confirm before
# re-enabling them.
PRIVATE_TABLES = [
    #("account", "SELECT id, 'account' || id::text, 'apikey' || id::text, '', anonymous, created, lastlogin, submission_count FROM account"),
    #("account_stats_control", None),
    #("application", "SELECT id, 'app' || id::text, '', 'apikey' || id::text, created, active, account_id FROM application"),
    ("fingerprint_source", None),
    ("source", None),
    ("track_mbid_change", None),
    ("track_mbid_source", None),
    ("track_meta_source", None),
    ("track_puid_source", None),
]
41
181.2.5 by Lukáš Lalinský
Database exporting
42
# Tables written by a full export into the 'acoustid-musicbrainz-dump'
# directory, in the same (table_name, sql) format as CORE_TABLES.
MUSICBRAINZ_TABLES = [
    ("acoustid_mb_replication_control", None),
    ("recording_acoustid", None),
]
46
47
318 by Lukáš Lalinský
Data export changes
48
def export_tables(cursor, name, tables, data_dir):
    """Dump a set of tables into one file per table using COPY.

    A new directory ``<data_dir>/<name>`` is created and every table is
    written to a file named after the table inside it.

    :param cursor: database cursor providing ``copy_expert(sql, fileobj)``
        (psycopg2-style).
    :param name: name of the sub-directory to create below ``data_dir``.
    :param tables: iterable of ``(table, sql)`` pairs; when ``sql`` is None
        the whole table is copied, otherwise the result of ``sql`` is.
    :param data_dir: existing directory in which the dump directory is created.
    :raises OSError: if the dump directory cannot be created, e.g. because it
        already exists (``os.mkdir`` is used on purpose, not ``makedirs``).
    """
    base_path = os.path.join(data_dir, name)
    os.mkdir(base_path)
    for table, sql in tables:
        path = os.path.join(base_path, table)
        logger.info("Exporting %s to %s", table, path)
        with open(path, 'w') as fileobj:
            if sql is None:
                copy_sql = "COPY %s TO STDOUT" % table
            else:
                # A custom query has to be wrapped in parentheses for COPY.
                copy_sql = "COPY (%s) TO STDOUT" % sql
            cursor.copy_expert(copy_sql, fileobj)
60
61
181.2.5 by Lukáš Lalinský
Database exporting
62
def dump_colums(root, root_name, columns):
    """Append a ``<root_name>`` element describing ``columns`` to ``root``.

    Each item of ``columns`` becomes a ``<column name="...">`` child. A value
    of None is marked with ``null="yes"``; any other value becomes the text of
    the element. Nothing is added when ``columns`` is empty or None.

    :param root: parent XML element.
    :param root_name: tag name of the container element to create.
    :param columns: mapping of column name to value (byte string, text or None).
    """
    if not columns:
        return
    node = etree.SubElement(root, root_name)
    for name, value in columns.items():
        column_node = etree.SubElement(node, 'column')
        column_node.attrib['name'] = name
        if value is None:
            column_node.attrib['null'] = 'yes'
        elif isinstance(value, bytes):
            column_node.text = value.decode('UTF-8')
        else:
            # Already text: decoding it again would fail on non-ASCII data.
            column_node.text = value
72
73
339 by Lukáš Lalinský
Revert "Restrict the size of replication packets"
74
# Queue tables whose changes go into the MusicBrainz replication packets;
# changes to every other table go into the main AcoustID packets.
_MUSICBRAINZ_QUEUE_TABLES = "('recording_acoustid', 'acoustid_mb_replication_control')"


def _write_replication_packet(cursor, data_dir, control_table, queue_filter, file_prefix):
    """Build one replication packet from mirror_queue and write it as XML.

    The replication sequence stored in ``control_table`` is incremented, the
    matching rows of ``mirror_queue`` are grouped into ``<transaction>``
    elements (one per distinct consecutive txid) and the resulting packet is
    written to ``<data_dir>/<file_prefix>-<replication_seq>.xml``.

    :param cursor: database cursor (execute/fetchone/iteration).
    :param data_dir: directory in which the packet file is written.
    :param control_table: name of the replication control table to update.
    :param queue_filter: ``"IN"`` or ``"NOT IN"``, applied to ``tblname``
        against the MusicBrainz queue tables.
    :param file_prefix: file name prefix of the packet file.
    """
    cursor.execute("""
        UPDATE %s
        SET current_replication_sequence = current_replication_sequence + 1,
            last_replication_date = now()
        RETURNING current_schema_sequence, current_replication_sequence""" % control_table)
    schema_seq, replication_seq = cursor.fetchone()
    cursor.execute("""
        SELECT * FROM mirror_queue
        WHERE tblname %s %s
        ORDER BY txid, id""" % (queue_filter, _MUSICBRAINZ_QUEUE_TABLES))
    packet_node = etree.Element('packet')
    packet_node.attrib['schema_seq'] = str(schema_seq)
    packet_node.attrib['replication_seq'] = str(replication_seq)
    transaction_node = None
    transaction_id = None
    # NOTE(review): the unpacking assumes mirror_queue has exactly these five
    # columns in this order -- confirm against the schema.
    for seqid, txid, table, operation, data in cursor:
        # Rows are ordered by txid, so a change of txid starts a new transaction.
        if transaction_id is None or transaction_id != txid:
            transaction_node = etree.SubElement(packet_node, 'transaction')
            transaction_node.attrib['id'] = str(txid)
            transaction_id = txid
        event_node = etree.SubElement(transaction_node, 'event')
        event_node.attrib['table'] = table
        event_node.attrib['op'] = operation
        event_node.attrib['id'] = str(seqid)
        keys, values = skytools.parse_logtriga_sql(operation, data.encode('UTF-8'), splitkeys=True)
        dump_colums(event_node, 'keys', keys)
        dump_colums(event_node, 'values', values)
    path = os.path.join(data_dir, '%s-%d.xml' % (file_prefix, replication_seq))
    # etree.tostring() with an explicit encoding yields bytes, hence 'wb'.
    with open(path, 'wb') as fp:
        fp.write(etree.tostring(packet_node, encoding="UTF-8"))
        # Force the packet to disk before returning.
        fp.flush()
        os.fsync(fp.fileno())


def create_musicbrainz_replication_packet(cursor, data_dir):
    """Write the next MusicBrainz replication packet.

    Covers queued changes to 'recording_acoustid' and
    'acoustid_mb_replication_control' and writes them to
    ``acoustid-musicbrainz-update-<seq>.xml`` in ``data_dir``.
    """
    _write_replication_packet(
        cursor, data_dir,
        control_table='acoustid_mb_replication_control',
        queue_filter='IN',
        file_prefix='acoustid-musicbrainz-update')


def create_replication_packet(cursor, data_dir):
    """Write the next main AcoustID replication packet.

    Covers queued changes to every table except the MusicBrainz ones and
    writes them to ``acoustid-update-<seq>.xml`` in ``data_dir``.
    """
    _write_replication_packet(
        cursor, data_dir,
        control_table='replication_control',
        queue_filter='NOT IN',
        file_prefix='acoustid-update')
142
143
318 by Lukáš Lalinský
Data export changes
144
def export_replication(cursor, data_dir):
    """Write both replication packets and then empty the change queue.

    The main and the MusicBrainz packet are generated from ``mirror_queue``
    first; afterwards every row of ``mirror_queue`` is deleted. Committing
    the transaction is left to the caller.

    :param cursor: database cursor used for all statements.
    :param data_dir: directory in which the packet files are written.
    """
    create_replication_packet(cursor, data_dir)
    create_musicbrainz_replication_packet(cursor, data_dir)
    cursor.execute("DELETE FROM mirror_queue")
181.2.5 by Lukáš Lalinský
Database exporting
148
149
107.1.20 by Lukáš Lalinský
Updated script to export the database
150
def main(script, opts, args):
    """Export replication packets and, with ``--full``, complete table dumps.

    Everything runs on one detached connection inside a single serializable
    transaction, which is committed only after all exports have finished.

    :param script: script context exposing the database ``engine``.
    :param opts: parsed options; ``data_dir`` and ``full`` are used.
    :param args: positional arguments (unused).
    """
    conn = script.engine.connect()
    # NOTE(review): presumably detaches the connection from the engine's pool
    # so the session settings below do not leak to other users -- confirm.
    conn.detach()
    with closing(conn):
        # End any transaction already open so the isolation level can be set.
        conn.connection.rollback()
        conn.connection.set_session(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
        cursor = conn.connection.cursor()
        export_replication(cursor, opts.data_dir)
        if opts.full:
            export_tables(cursor, 'acoustid-dump', CORE_TABLES, opts.data_dir)
            export_tables(cursor, 'acoustid-musicbrainz-dump', MUSICBRAINZ_TABLES, opts.data_dir)
        conn.connection.commit()
181.2.5 by Lukáš Lalinský
Database exporting
162
163
 
164
def add_options(parser):
    """Register the command line options of the export script.

    :param parser: optparse-style parser providing ``add_option``.

    Options added:
      -d/--dir   target directory, stored as ``data_dir``
                 (default ``/tmp/acoustid-export``).
      -f/--full  also export full table dumps, stored as ``full``
                 (default False).
    """
    parser.add_option("-d", "--dir", dest="data_dir", default="/tmp/acoustid-export", help="directory")
    parser.add_option("-f", "--full", dest="full", action="store_true",
        default=False, help="full export")
168
169
170
# Script entry point. NOTE(review): run_script presumably parses the command
# line using add_options and then calls main(script, opts, args) -- confirm
# against acoustid.script.
run_script(main, add_options)
107.1.20 by Lukáš Lalinský
Updated script to export the database
171