acoustid replication sync script
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import logging
import os
import shutil
import subprocess
import sys
import tempfile
import urllib2
from bz2 import BZ2File
from collections import OrderedDict
from contextlib import closing
from xml.sax._exceptions import SAXParseException
from xml.sax.xmlreader import InputSource

import psycopg2.extensions

from acoustid.script import run_script
from acoustid.xml.digester import Digester
21 |
# Module-level logger named after this module, per logging convention.
logger = logging.getLogger(__name__)
24 |
class DataImporter(Digester):
    """SAX-based importer for one bzip2-compressed replication packet.

    Parses ``<packet><transaction><event>`` XML and applies every event as
    an INSERT/UPDATE/DELETE against the target database.  All events run in
    one transaction that is committed (on success) or rolled back in
    close().
    """

    def __init__(self, ictx, file):
        """
        :param ictx: import context dict with 'conn' (SQLAlchemy connection
            wrapping a raw psycopg2 connection), 'schema_seq' and
            'replication_seq' keys -- see main().
        :param file: open file object holding the downloaded .xml.bz2
            packet; only its .name is used to re-open it as a BZ2 stream.
        """
        Digester.__init__(self)
        self._ictx = ictx
        self._file = file
        self._input = InputSource(file.name)
        self._input.setByteStream(BZ2File(file.name, 'r'))
        self._conn = ictx['conn'].connection
        self._cursor = self._conn.cursor()
        self.success = self._closed = False
        self._add_rules()

    def _add_rules(self):
        # Register SAX callbacks.  Kept in its own method because recover()
        # must re-register them after reset().
        self.addOnBegin('packet', self._check_packet)
        self.addOnBeginAndEnd('packet/transaction/event', self._on_event, self._on_event_end)
        self.addOnBody('packet/transaction/event/keys/column', self._on_key_column)
        self.addOnBody('packet/transaction/event/values/column', self._on_value_column)
        self.addOnFinish(self._on_finish)

    def _check_packet(self, tag, attrs):
        # Refuse packets whose schema/replication sequence numbers do not
        # match what we expect.
        # BUG FIX: the original passed the format arguments as extra
        # Exception args so the message was never interpolated, and the
        # schema message compared against replication_seq instead of
        # schema_seq.
        if self._ictx['schema_seq'] != int(attrs.getValue('schema_seq')):
            raise Exception('<packet> schema_seq: {0} not matched the expected seq number {1}'.format(
                attrs.getValue('schema_seq'), self._ictx['schema_seq']))

        if self._ictx['replication_seq'] != int(attrs.getValue('replication_seq')):
            raise Exception('<packet> replication_seq: {0} not matched the expected seq number {1}'.format(
                attrs.getValue('replication_seq'), self._ictx['replication_seq']))

    def _on_key_column(self, tag, attrs, val):
        # <keys><column name="...">value</column> -- a primary-key column of
        # the event currently on top of the digester stack.
        event = self.peek()
        event['keys'][attrs.getValue('name')] = val

    def _on_value_column(self, tag, attrs, val):
        # <values><column name="..." [null="yes"]>value</column>
        # A null="yes" attribute marks an SQL NULL regardless of body text.
        event = self.peek()
        is_null = attrs.getValue('null') if attrs.has_key('null') else None
        event['values'][attrs.getValue('name')] = None if is_null == 'yes' else val

    def _on_event(self, tag, attrs):
        # Start of one replication event; columns are accumulated by the
        # body callbacks and applied in _on_event_end.  OrderedDict keeps
        # column order aligned with the parameter list built later.
        self.push({
            'op': attrs.getValue('op'),
            'table': attrs.getValue('table'),
            'keys': OrderedDict(),    # ordered: column name -> key value
            'values': OrderedDict(),  # ordered: column name -> new value
        })

    def _on_event_end(self, tag):
        # Translate one completed event into a single SQL statement.
        event = self.pop()
        op = event['op']  # renamed from 'type' to avoid shadowing the builtin
        table = event['table']
        keys = event['keys']
        values = event['values']
        params = []
        # NOTE(review): table/column names are interpolated directly into
        # the SQL text; acceptable only because the replication feed is a
        # trusted source -- values themselves go through bind parameters.
        if op == 'I':
            sql = 'INSERT INTO %s (%s) VALUES (%s)' % (
                table, ', '.join(values.keys()), ', '.join(['%s'] * len(values)))
            params = values.values()
        elif op == 'U':
            sql = 'UPDATE %s SET %s' % (table, ', '.join('%s=%%s' % i for i in values))
            params = values.values()
        elif op == 'D':
            sql = 'DELETE FROM %s' % table
        else:
            raise Exception('Invalid <event> op: %s' % op)

        if op in ('D', 'U'):
            # NULL key columns need "col IS NULL"; psycopg2 renders a None
            # bind parameter as NULL, so 'col IS %s' works for that case.
            sql += ' WHERE ' + ' AND '.join(
                '%s%s%%s' % (i, ' IS ' if keys[i] is None else '=') for i in keys)
            params.extend(keys.values())

        self._cursor.execute(sql, params)

    def _on_finish(self):
        # Nothing to do per-packet; the commit happens in close().
        pass

    def load(self):
        """Parse the packet and apply all events; flags success at the end."""
        logger.warning('Saving dataset....')
        self.parse(self._input)
        self.success = True

    def recover(self):
        """Dirty hack to remove weird characters present in some replication
        files, using the external ``tidy`` tool, then re-parse from scratch.
        """
        logger.warning('Trying to recover invalid XML...')
        original_xml = None
        fixed_xml = None
        try:
            # delete=False so the external tool can open them by name;
            # cleaned up unconditionally in the finally block below.
            original_xml = tempfile.NamedTemporaryFile(suffix='.xml', delete=False)  # bunzipped tmp
            fixed_xml = tempfile.NamedTemporaryFile(suffix='.xml', delete=False)     # fixed tmp
            fixed_xml.close()

            # Decompress the original packet data so tidy can read it.
            bzf = self._input.getByteStream()
            bzf.seek(0)
            shutil.copyfileobj(bzf, original_xml)
            original_xml.close()

            cmd = ['tidy', '-xml', '-o', fixed_xml.name, original_xml.name]
            logger.warning('Running: %s', ' '.join(cmd))
            ret = subprocess.call(cmd)
            if ret:
                # Deliberate best-effort: tidy exits non-zero even for mere
                # warnings, and the re-parse below will fail anyway if the
                # output is unusable.
                pass

            # close() rolls back the failed attempt (success is False here);
            # then rebuild the parser/cursor state and re-run the import
            # against the tidied file.
            self.close()
            self._file = file(fixed_xml.name, 'r')
            self._input = InputSource(fixed_xml.name)
            self._input.setByteStream(self._file)
            self._cursor = self._conn.cursor()
            self.success = self._closed = False
            self.reset()
            self._add_rules()
            self.load()
        finally:
            # Remove both temp files in all cases (they were created with
            # delete=False).
            for f in (original_xml, fixed_xml):
                if f and not f.closed:
                    f.close()
                if f and os.path.exists(f.name):
                    os.unlink(f.name)

    def close(self):
        """Commit on success, roll back otherwise, and release resources.

        Idempotent: a second call is a no-op.
        """
        if self._closed:
            return
        try:
            if self.success:
                self._conn.commit()
                logger.warning('Done')
            else:
                logger.warning('Rolling back transaction. Seq number: {0}'.format(self._ictx['replication_seq']))
                self._conn.rollback()
            self._cursor.close()
        finally:
            self._closed = True
            self._input.getByteStream().close()
            self._file.close()
164 |
||
165 |
||
166 |
def download_arch(url): |
|
167 |
logger.warning('Downloading: %s', url) |
|
168 |
try: |
|
169 |
data = urllib2.urlopen(url) |
|
170 |
except urllib2.HTTPError, e: |
|
171 |
if e.code == 404: |
|
172 |
logger.warning('Resource %s not found', url) |
|
173 |
return None |
|
174 |
raise
|
|
175 |
except urllib2.URLError, e: |
|
176 |
if '[Errno 2]' in str(e.reason): |
|
177 |
logger.warning('Resource %s not found', url) |
|
178 |
return None |
|
179 |
raise
|
|
180 |
tmp = tempfile.NamedTemporaryFile(suffix='.xml.bz2') |
|
181 |
shutil.copyfileobj(data, tmp) |
|
182 |
data.close() |
|
183 |
tmp.seek(0) |
|
184 |
logger.debug('Stored in %s', tmp.name) |
|
185 |
return tmp |
|
186 |
||
187 |
||
188 |
def sync(script, ictx): |
|
189 |
if script.config.replication.import_acoustid is None: |
|
190 |
err = 'Missing required \'import_acoustid\' configuration parameter in [replication]' |
|
191 |
logger.error(err) |
|
192 |
exit(1) |
|
193 |
||
194 |
rseq = ictx['replication_seq'] |
|
195 |
while True: |
|
196 |
rseq += 1 |
|
197 |
ictx['replication_seq'] = rseq |
|
198 |
url = script.config.replication.import_acoustid.format(seq=rseq) |
|
199 |
arch = download_arch(url) |
|
200 |
if arch is None: |
|
201 |
logger.warning('Stopped on seq %d', rseq) |
|
202 |
break
|
|
203 |
di = DataImporter(ictx, arch) |
|
204 |
with closing(di): |
|
205 |
try: |
|
206 |
di.load() |
|
207 |
except SAXParseException, spe: |
|
208 |
logger.error('XML data parsing error. %s URL: %s', spe, url) |
|
209 |
di.recover() |
|
210 |
||
211 |
||
212 |
def main(script, opts, args):
    """Entry point: read the current replication state and run the sync loop.

    Detaches a raw psycopg2 connection from the pool so the whole import can
    run under SERIALIZABLE isolation, reads the current schema/replication
    sequence numbers, then hands control to sync().
    """
    connection = script.engine.connect()
    connection.detach()
    with closing(connection):
        raw = connection.connection
        # Start from a clean transaction before changing session settings.
        raw.rollback()
        raw.set_session(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
        cur = raw.cursor()
        with closing(cur):
            cur.execute('''SELECT current_schema_sequence,
                current_replication_sequence FROM replication_control''')
            schema_seq, replication_seq = cur.fetchone()
        raw.commit()

        sync(script, {
            'schema_seq': schema_seq,
            'replication_seq': replication_seq,
            'script': script,
            'conn': connection,
        })
|
232 |
||
233 |
||
234 |
# Script entry point: run_script handles option parsing and passes the
# configured script object into main().
if __name__ == '__main__':
    run_script(main)
|
236 |