~luks/acoustid-server/server

214.1.1 by adam
acoustid replication sync script
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
from collections import OrderedDict
4
import os
5
6
import urllib2
7
import logging
8
import tempfile
9
import shutil
10
import subprocess
11
import psycopg2.extensions
12
13
from contextlib import closing
14
from bz2 import BZ2File
15
from xml.sax._exceptions import SAXParseException
16
from xml.sax.xmlreader import InputSource
17
18
from acoustid.xml.digester import Digester
19
from acoustid.script import run_script
20
21
22
logger = logging.getLogger(__name__)
23
24
class DataImporter(Digester):
    """Apply one replication packet (an XML document) to the database.

    The packet is parsed with the Digester rules registered in _add_rules();
    every <event> element is turned into one INSERT/UPDATE/DELETE executed on
    a cursor of ``ictx['conn'].connection``.  Nothing is committed until
    close() is called after a successful load(); otherwise close() rolls the
    transaction back.

    :param ictx: import context dict; reads 'conn', 'schema_seq' and
        'replication_seq'.
    :param file: the downloaded archive.  Only ``file.name`` is used to open
        it as a bz2 stream; the object itself is closed by close().
    """

    def __init__(self, ictx, file):
        Digester.__init__(self)
        self._ictx = ictx
        self._file = file
        # Parse the bz2-compressed archive directly, identified by its file name.
        self._input = InputSource(file.name)
        self._input.setByteStream(BZ2File(file.name, 'r'))
        self._conn = ictx['conn'].connection
        self._cursor = self._conn.cursor()
        self.success = self._closed = False
        self._add_rules()

    def _add_rules(self):
        """Register the element handlers that drive the import."""
        self.addOnBegin('packet', self._check_packet)
        self.addOnBeginAndEnd('packet/transaction/event', self._on_event, self._on_event_end)
        self.addOnBody('packet/transaction/event/keys/column', self._on_key_column)
        self.addOnBody('packet/transaction/event/values/column', self._on_value_column)
        self.addOnFinish(self._on_finish)

    def _check_packet(self, tag, attrs):
        """Verify the <packet> sequence numbers match the import context.

        :raises Exception: if schema_seq or replication_seq differs from the
            value expected in ``self._ictx``.
        """
        schema_seq = attrs.getValue('schema_seq')
        if self._ictx['schema_seq'] != int(schema_seq):
            raise Exception('<packet> schema_seq: {0} not matched the expected seq number {1}'.format(
                schema_seq, self._ictx['schema_seq']))

        replication_seq = attrs.getValue('replication_seq')
        if self._ictx['replication_seq'] != int(replication_seq):
            raise Exception('<packet> replication_seq: {0} not matched the expected seq number {1}'.format(
                replication_seq, self._ictx['replication_seq']))

    def _on_key_column(self, tag, attrs, val):
        """Record a key column (used for the WHERE clause) on the current event."""
        event = self.peek()
        event['keys'][attrs.getValue('name')] = val

    def _on_value_column(self, tag, attrs, val):
        """Record a value column on the current event; null="yes" maps to None."""
        event = self.peek()
        is_null = attrs.getValue("null") if attrs.has_key('null') else None
        event['values'][attrs.getValue('name')] = val if is_null != "yes" else None

    def _on_event(self, tag, attrs):
        """Start a new event record and push it for the column handlers."""
        event = {
            'op': attrs.getValue('op'),
            'table': attrs.getValue('table'),
            'keys': OrderedDict(),  # column name -> column value, in document order
            'values': OrderedDict(),  # column name -> column value, in document order
        }
        self.push(event)

    def _on_event_end(self, tag):
        """Turn the completed <event> into one SQL statement and execute it.

        op 'I' -> INSERT of all value columns; 'U' -> UPDATE setting the value
        columns; 'D' -> DELETE.  UPDATE and DELETE are restricted by an AND-ed
        WHERE over the key columns.

        :raises Exception: for any other op code.
        """
        event = self.pop()
        op = event['op']
        table = event['table']
        keys = event['keys']
        values = event['values']

        # NOTE(review): table and column names come straight from the
        # downloaded XML and are interpolated into the SQL text unescaped;
        # only column values are passed as bound parameters.
        if op == 'I':
            sql = 'INSERT INTO %s (%s) VALUES (%s)' % (
                table, ', '.join(values.keys()), ', '.join(['%s'] * len(values)))
            params = list(values.values())
        elif op == 'U':
            sql = 'UPDATE %s SET %s' % (table, ', '.join('%s=%%s' % col for col in values))
            params = list(values.values())
        elif op == 'D':
            sql = 'DELETE FROM %s' % table
            params = []
        else:
            raise Exception('Invalid <event> op: %s' % op)

        if op in ('U', 'D'):
            # A None key is compared with "IS" so it binds as "col IS NULL".
            conditions = ['%s%s%%s' % (col, ' IS ' if keys[col] is None else '=') for col in keys]
            sql += ' WHERE ' + ' AND '.join(conditions)
            params.extend(keys.values())

        self._cursor.execute(sql, params)

    def _on_finish(self):
        pass

    def load(self):
        """Parse the whole packet, executing every event; mark success on completion."""
        logger.warning('Saving dataset....')
        self.parse(self._input)
        self.success = True

    def recover(self):
        """Retry the import after repairing the XML with the external ``tidy`` tool.

        This is a dirty hack to remove weird characters present in some
        replication files.  The packet is decompressed to a temporary file,
        run through ``tidy -xml``, and the importer is re-armed on the
        repaired output.  Everything imported by the failed attempt is rolled
        back first (close() with success still False), then load() runs again.
        Temporary files are removed on exit.
        """
        logger.warning('Trying to recover invalid XML...')
        original_xml = None
        fixed_xml = None
        try:
            original_xml = tempfile.NamedTemporaryFile(suffix='.xml', delete=False)  # decompressed copy
            fixed_xml = tempfile.NamedTemporaryFile(suffix='.xml', delete=False)  # tidy output
            fixed_xml.close()

            # Rewind the bz2 stream and dump the uncompressed data for tidy.
            bzf = self._input.getByteStream()
            bzf.seek(0)
            shutil.copyfileobj(bzf, original_xml)
            original_xml.close()

            cmd = ['tidy', '-xml', '-o', fixed_xml.name, original_xml.name]
            logger.warning('Running: %s', ' '.join(cmd))
            ret = subprocess.call(cmd)
            if ret:
                # A non-zero status was deliberately not treated as fatal; log
                # it so the outcome is visible.  An unusable result will still
                # surface as a parse error from load() below.
                logger.warning('tidy exited with status %s', ret)

            # Roll back the failed attempt and release the compressed input,
            # then re-arm the importer on the repaired file.
            self.close()
            self._file = open(fixed_xml.name, 'rb')
            self._input = InputSource(fixed_xml.name)
            self._input.setByteStream(self._file)
            self._cursor = self._conn.cursor()
            self.success = self._closed = False
            self.reset()
            self._add_rules()
            self.load()
        finally:
            for tmp in (original_xml, fixed_xml):
                if tmp is None:
                    continue
                if not tmp.closed:
                    tmp.close()
                if os.path.exists(tmp.name):
                    os.unlink(tmp.name)

    def close(self):
        """Commit (after a successful load) or roll back, then release resources.

        Safe to call more than once; later calls are no-ops.
        """
        if self._closed:
            return
        try:
            if self.success:
                self._conn.commit()
                logger.warning('Done')
            else:
                logger.warning('Rolling back transaction. Seq number: {0}'.format(self._ictx['replication_seq']))
                self._conn.rollback()
            self._cursor.close()
        finally:
            self._closed = True
            # Close the file even if closing the parser stream fails.
            try:
                self._input.getByteStream().close()
            finally:
                self._file.close()
164
165
166
def download_arch(url):
    """Download the replication archive at *url* into a temporary file.

    :returns: a ``NamedTemporaryFile`` rewound to the start (deleted when
        closed), or None when the resource does not exist (HTTP 404, or a
        URLError whose reason mentions ``[Errno 2]``).
    :raises urllib2.URLError: for any other download failure.
    """
    logger.warning('Downloading: %s', url)
    try:
        data = urllib2.urlopen(url)
    # HTTPError is a subclass of URLError, so it must be handled first.
    except urllib2.HTTPError as e:
        if e.code == 404:
            logger.warning('Resource %s not found', url)
            return None
        raise
    except urllib2.URLError as e:
        # Non-HTTP URLs (presumably file://) report a missing resource as
        # errno 2 (ENOENT) rather than a 404.
        if '[Errno 2]' in str(e.reason):
            logger.warning('Resource %s not found', url)
            return None
        raise

    tmp = tempfile.NamedTemporaryFile(suffix='.xml.bz2')
    try:
        # Always release the network response, even if the copy fails.
        with closing(data):
            shutil.copyfileobj(data, tmp)
        tmp.seek(0)
    except BaseException:
        tmp.close()  # also deletes the partial download
        raise
    logger.debug('Stored in %s', tmp.name)
    return tmp
186
187
188
def sync(script, ictx):
    """Apply consecutive replication packets until the next one is unavailable.

    Starting after ``ictx['replication_seq']``, each packet URL is built from
    the ``[replication] import_acoustid`` template (``{seq}`` placeholder),
    downloaded, and imported in its own transaction.  ``ictx['replication_seq']``
    is updated to the sequence being processed.  A packet that fails XML
    parsing is retried once through DataImporter.recover().

    Exits the process with status 1 if the URL template is not configured.
    """
    template = script.config.replication.import_acoustid
    if template is None:
        err = 'Missing required \'import_acoustid\' configuration parameter in [replication]'
        logger.error(err)
        # exit() is a site-module helper meant for the interactive shell.
        raise SystemExit(1)

    rseq = ictx['replication_seq']
    while True:
        rseq += 1
        ictx['replication_seq'] = rseq
        url = template.format(seq=rseq)
        arch = download_arch(url)
        if arch is None:
            logger.warning('Stopped on seq %d', rseq)
            break
        di = DataImporter(ictx, arch)
        # closing() commits on success, rolls back otherwise.
        with closing(di):
            try:
                di.load()
            except SAXParseException as spe:
                logger.error('XML data parsing error. %s URL: %s', spe, url)
                di.recover()
210
211
212
def main(script, opts, args):
    """Read the current sequence numbers and import all pending replication packets.

    Opens a dedicated database connection with SERIALIZABLE isolation, reads
    current_schema_sequence and current_replication_sequence from
    replication_control, and passes them to sync() in the import context dict.

    :raises Exception: if replication_control has no row.
    """
    conn = script.engine.connect()
    # Detached from the pool, presumably so the session settings changed below
    # are not handed back to other pool users — TODO confirm.
    conn.detach()
    with closing(conn):
        # psycopg2 refuses to change session settings while a transaction is
        # in progress, so end any implicit one first.
        conn.connection.rollback()
        conn.connection.set_session(isolation_level=psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
        cursor = conn.connection.cursor()
        with closing(cursor):
            cursor.execute('''SELECT current_schema_sequence,
                           current_replication_sequence FROM replication_control''')
            row = cursor.fetchone()
            conn.connection.commit()
        if row is None:
            raise Exception('replication_control table is empty; cannot determine the current sequence numbers')
        schema_seq, replication_seq = row

        ictx = {
            'schema_seq': schema_seq,
            'replication_seq': replication_seq,
            'script': script,
            'conn': conn,
        }
        sync(script, ictx)
232
233
234
if __name__ == '__main__':
    # Only when executed as a script: hand main() to acoustid.script.run_script.
    run_script(main)
236