~luks/acoustid-server/server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
#!/usr/bin/env python

# Copyright (C) 2011 Lukas Lalinsky
# Distributed under the MIT license, see the LICENSE file for details.

import os
import logging
import skytools
import xml.etree.cElementTree as etree
import psycopg2.extensions
from contextlib import closing
from acoustid.script import run_script
from acoustid.data.track import merge_missing_mbids

logger = logging.getLogger(__name__)


CORE_TABLES = [
    ("fingerprint", None),
    ("format", None),
    ("meta", None),
    ("replication_control", None),
    ("stats", None),
    ("track", None),
    ("track_mbid", None),
    ("track_meta", None),
    ("track_puid", None),
]

PRIVATE_TABLES = [
    #("account", "SELECT id, 'account' || id::text, 'apikey' || id::text, '', anonymous, created, lastlogin, submission_count FROM account"),
    #("account_stats_control", None),
    #("application", "SELECT id, 'app' || id::text, '', 'apikey' || id::text, created, active, account_id FROM application"),
    ("fingerprint_source", None),
    ("source", None),
    ("track_mbid_change", None),
    ("track_mbid_source", None),
    ("track_meta_source", None),
    ("track_puid_source", None),
]

MUSICBRAINZ_TABLES = [
    ("acoustid_mb_replication_control", None),
    ("recording_acoustid", None),
]


def export_tables(cursor, name, tables, data_dir):
    base_path = os.path.join(data_dir, name)
    os.mkdir(base_path)
    for table, sql in tables:
        path = os.path.join(base_path, table)
        logger.info("Exporting %s to %s", table, path)
        with open(path, 'w') as fileobj:
            if sql is None:
                copy_sql = "COPY %s TO STDOUT" % table
            else:
                copy_sql = "COPY (%s) TO STDOUT" % sql
            cursor.copy_expert(copy_sql, fileobj)


def dump_colums(root, root_name, columns):
    if columns:
        node = etree.SubElement(root, root_name)
        for name, value in columns.iteritems():
            column_node = etree.SubElement(node, 'column')
            column_node.attrib['name'] = name
            if value is None:
                column_node.attrib['null'] = 'yes'
            else:
                column_node.text = value.decode('UTF-8')


def create_musicbrainz_replication_packet(cursor, data_dir):
    cursor.execute("""
        UPDATE acoustid_mb_replication_control
        SET current_replication_sequence = current_replication_sequence + 1,
            last_replication_date = now()
        RETURNING current_schema_sequence, current_replication_sequence""")
    schema_seq, replication_seq = cursor.fetchone()
    cursor.execute("""
        SELECT * FROM mirror_queue
        WHERE tblname IN ('recording_acoustid', 'acoustid_mb_replication_control')
        ORDER BY txid, id""")
    packet_node = etree.Element('packet')
    packet_node.attrib['schema_seq'] = str(schema_seq)
    packet_node.attrib['replication_seq'] = str(replication_seq)
    transaction_node = None
    transaction_id = None
    for seqid, txid, table, operation, data in cursor:
        if transaction_id is None or transaction_id != txid:
            transaction_node = etree.SubElement(packet_node, 'transaction')
            transaction_node.attrib['id'] = str(txid)
            transaction_id = txid
        event_node = etree.SubElement(transaction_node, 'event')
        event_node.attrib['table'] = table
        event_node.attrib['op'] = operation
        event_node.attrib['id'] = str(seqid)
        keys, values = skytools.parse_logtriga_sql(operation, data.encode('UTF-8'), splitkeys=True)
	dump_colums(event_node, 'keys', keys)
	dump_colums(event_node, 'values', values)
    fp = open(os.path.join(data_dir, 'acoustid-musicbrainz-update-%d.xml' % replication_seq), 'w')
    fp.write(etree.tostring(packet_node, encoding="UTF-8"))
    fp.flush()
    os.fsync(fp.fileno())
    fp.close()


def create_replication_packet(cursor, data_dir):
    cursor.execute("""
        UPDATE replication_control
        SET current_replication_sequence = current_replication_sequence + 1,
            last_replication_date = now()
        RETURNING current_schema_sequence, current_replication_sequence""")
    schema_seq, replication_seq = cursor.fetchone()
    cursor.execute("""
        SELECT * FROM mirror_queue
        WHERE tblname NOT IN ('recording_acoustid', 'acoustid_mb_replication_control')
        ORDER BY txid, id""")
    packet_node = etree.Element('packet')
    packet_node.attrib['schema_seq'] = str(schema_seq)
    packet_node.attrib['replication_seq'] = str(replication_seq)
    transaction_node = None
    transaction_id = None
    for seqid, txid, table, operation, data in cursor:
        if transaction_id is None or transaction_id != txid:
            transaction_node = etree.SubElement(packet_node, 'transaction')
            transaction_node.attrib['id'] = str(txid)
            transaction_id = txid
        event_node = etree.SubElement(transaction_node, 'event')
        event_node.attrib['table'] = table
        event_node.attrib['op'] = operation
        event_node.attrib['id'] = str(seqid)
        keys, values = skytools.parse_logtriga_sql(operation, data.encode('UTF-8'), splitkeys=True)
	dump_colums(event_node, 'keys', keys)
	dump_colums(event_node, 'values', values)
    fp = open(os.path.join(data_dir, 'acoustid-update-%d.xml' % replication_seq), 'w')
    fp.write(etree.tostring(packet_node, encoding="UTF-8"))
    fp.flush()
    os.fsync(fp.fileno())
    fp.close()


def export_replication(cursor, data_dir):
    create_replication_packet(cursor, data_dir)
    create_musicbrainz_replication_packet(cursor, data_dir)
    cursor.execute("DELETE FROM mirror_queue")


def main(script, opts, args):
    conn = script.engine.connect()
    conn.detach()
    with closing(conn):
        conn.connection.rollback()
        conn.connection.set_session(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
        cursor = conn.connection.cursor()
        export_replication(cursor, opts.data_dir)
        if opts.full:
            export_tables(cursor, 'acoustid-dump', CORE_TABLES, opts.data_dir)
            export_tables(cursor, 'acoustid-musicbrainz-dump', MUSICBRAINZ_TABLES, opts.data_dir)
        conn.connection.commit()

 
def add_options(parser):
    parser.add_option("-d", "--dir", dest="data_dir", default="/tmp/acoustid-export", help="directory")
    parser.add_option("-f", "--full", dest="full", action="store_true",
        default=False, help="full export")


run_script(main, add_options)