199
87
newbase = os.path.isfile(os.path.join(SRC_DIR, instance, 'base', 'base.tar.7z'))
201
log('Put %s base on top' % (instance, ))
202
89
self.add(instance, -1)
204
91
self.add(instance)
206
93
def stopped(delete=False):
207
stop_service = os.path.join(SRC_DIR, 'stop_service')
95
stop_service = os.path.join(SRC_DIR, 'stop_service_new')
208
96
if os.path.exists(stop_service):
210
98
os.remove(stop_service)
215
def process_directory(thread, queue):
216
if not os.path.isdir(DUMP_DIR):
217
os.makedirs(DUMP_DIR)
219
psql_port = 5432 + thread
223
nb, instance = queue.pop()
224
if not instance or instance == 'INIT':
225
log('thread %s, sleep' % (thread,))
227
if instance == 'INIT':
228
with open(TOUCH_FILE_LOOP, 'w') as t_file:
229
t_file.write(time.strftime('%Y-%m-%d%H%M%S'))
104
def __init__(self, thread, queue):
108
self.logger = logging.getLogger()
109
self.logger.setLevel(logging.DEBUG)
110
formatter = logging.Formatter('%(asctime)s : %(levelname)s : %(message)s')
113
handler = logging.handlers.TimedRotatingFileHandler('%s-%s' % (thread, LOG_FILE), when='midnight')
115
handler = logging.StreamHandler()
116
handler.setLevel(logging.INFO)
117
handler.setFormatter(formatter)
118
self.logger.addHandler(handler)
120
def log(self, message):
121
self.logger.info(message)
123
def error(self, message):
124
self.logger.error(message)
126
def un7zip(self, src_file, dest_dir, delete=False):
127
if not os.path.isdir(dest_dir):
128
raise Exception('un7zip: dest %s not found' % (dest_dir))
129
command = ['/usr/bin/7z', 'e', src_file, '-y', '-bd', '-bb0', '-bso0', '-bsp0', '-o%s'%dest_dir]
131
command.append('-sdel')
132
self.log('Uncompress: %s ' % ' '.join(command))
133
subprocess.check_output(command, stderr=subprocess.STDOUT)
135
def upload_od(self, file_path, oc):
136
importlib.reload(config)
139
'host': 'msfintl-my.sharepoint.com',
142
'username': 'UniField.MSF@geneva.msf.org',
143
'password': config.password,
146
if oc not in config.path:
147
self.error('%s unknown oc %s' % (file_path, oc))
148
dav_data['path'] = config.path.get(oc, '/personal/unifield_msf_geneva_msf_org/documents/Test')
151
buffer_size = 10 * 1024 * 1014
152
file_name = os.path.basename(file_path)
153
temp_file_name = 'Temp/%s'%file_name
154
fileobj = open(file_path, 'rb')
155
self.log('Start upload %s to %s '% (file_path, dav_data['path']))
158
dav_connected = False
162
if not dav_connected:
164
dav = webdav.Client(**dav_data)
166
self.log('Dav connected')
171
dav.create_folder('Temp')
173
self.log('Except Temp')
174
if retries > max_retries:
183
upload_ok, dav_error = dav.upload(fileobj, temp_file_name, buffer_size=buffer_size)
186
self.log('Moving File')
188
dav.delete(file_name)
189
dav.move(temp_file_name, file_name)
191
self.log('Except move')
192
if retries > max_retries:
196
self.log('File %s uploaded' % (file_path,))
199
self.log('Dav 1 retry')
200
if retries > max_retries:
201
raise Exception(dav_error)
204
if dav_connected and 'timed out' in dav_error or '2130575252' in dav_error:
205
self.log('%s OneDrive: session time out' % (file_path,))
208
except (requests.exceptions.RequestException, webdav.ConnectionFailed):
209
self.log('Dav 2 retry')
210
if retries > max_retries:
218
raise Exception(dav_error)
220
raise Exception('Unknown error')
223
def process_directory(self):
224
if not os.path.isdir(DUMP_DIR):
225
os.makedirs(DUMP_DIR)
227
psql_port = 5432 + self.thread
233
self.queue.resfresh()
235
nb, instance = self.queue.pop()
236
if not instance or instance == 'INIT':
239
if instance == 'INIT':
240
with open(TOUCH_FILE_LOOP, 'w') as t_file:
241
t_file.write(time.strftime('%Y-%m-%d%H%M%S'))
242
self.queue.add('INIT')
246
forced_instance = False
248
self.log('Forced instance %s' % (instance, ))
249
forced_instance = True
234
forced_instance = False
236
forced_instance = True
238
full_name = os.path.join(SRC_DIR, instance)
240
if os.path.isdir(full_name):
245
basebackup = os.path.join(full_name, 'base', 'base.tar.7z')
246
dest_dir = os.path.join(DEST_DIR, instance)
248
if not os.path.isdir(dest_dir) and not os.path.isfile(basebackup):
249
# new instance wait for base.tar
252
for dir_to_create in [os.path.join(dest_dir, 'OLDWAL')]:
253
if not os.path.isdir(dir_to_create):
254
log('Create %s'%dir_to_create)
255
os.makedirs(dir_to_create)
257
dest_basebackup = os.path.join(dest_dir, 'base')
258
pg_xlog = os.path.join(dest_basebackup, 'pg_xlog')
259
pg_wal = os.path.join(dest_basebackup, 'pg_wal')
260
oldwal = os.path.join(dest_dir, 'OLDWAL')
262
if os.path.isdir(dest_basebackup):
263
# copy postgres and recovery / migration of new WAL destination
264
for conf in ['recovery.conf', 'postgresql.conf']:
265
shutil.copy(os.path.join(PSQL_CONF, conf), dest_basebackup)
267
# Copy / extract basbackup
268
basebackup_found = False
269
if os.path.isfile(basebackup):
270
log('%s Found base backup %s'% (instance, basebackup))
271
old_base_moved = False
256
full_name = os.path.join(SRC_DIR, instance)
258
if os.path.isdir(full_name):
259
basebackup = os.path.join(full_name, 'base', 'base.tar.7z')
260
dest_dir = os.path.join(DEST_DIR, instance)
262
if not os.path.isdir(dest_dir) and not os.path.isfile(basebackup):
263
# new instance wait for base.tar
266
for dir_to_create in [os.path.join(dest_dir, 'OLDWAL')]:
267
if not os.path.isdir(dir_to_create):
268
self.log('Create %s'%dir_to_create)
269
os.makedirs(dir_to_create)
271
dest_basebackup = os.path.join(dest_dir, 'base')
272
pg_xlog = os.path.join(dest_basebackup, 'pg_xlog')
273
pg_wal = os.path.join(dest_basebackup, 'pg_wal')
274
oldwal = os.path.join(dest_dir, 'OLDWAL')
272
276
if os.path.isdir(dest_basebackup):
273
# previous base found, rename it
274
old_base_moved = os.path.join(dest_dir,'base_%s' % (time.strftime('%Y-%m-%d%H%M')))
275
shutil.move(dest_basebackup, old_base_moved)
276
log('Move old base %s'%dest_basebackup)
278
new_base = os.path.join(dest_dir, 'base.tar.7z')
279
shutil.move(basebackup, new_base)
280
un7zip(new_base, dest_dir)
281
os.makedirs(dest_basebackup)
282
untar = ['tar', '-xf', os.path.join(dest_dir, 'base.tar'), '-C', dest_basebackup]
284
subprocess.check_output(untar, stderr=subprocess.STDOUT)
285
os.remove(os.path.join(dest_dir, 'base.tar'))
287
for conf in ['recovery.conf', 'postgresql.conf', 'pg_hba.conf']:
288
shutil.copy(os.path.join(PSQL_CONF, conf), dest_basebackup)
290
for del_recreate in [pg_wal, pg_xlog, os.path.join(dest_basebackup, 'pg_log')]:
291
if os.path.isdir(del_recreate):
292
shutil.rmtree(del_recreate)
293
os.makedirs(del_recreate)
295
basebackup_found = True
297
# Move WAL (copy + del to set right owner on target)
298
if not os.path.exists(pg_xlog):
299
log('Unable to copy WAL, base directory not found %s' % pg_xlog)
301
if not os.path.exists(oldwal):
302
log('Unable to copy WAL, wal directory not found %s' % oldwal)
306
forced_dump = forced_instance
307
for wal in os.listdir(full_name):
308
full_path_wal = os.path.join(full_name, wal)
309
if wal.endswith('7z') and not wal.startswith('.'):
311
un7zip(full_path_wal, oldwal)
312
os.remove(full_path_wal)
314
except subprocess.CalledProcessError as e:
315
# try to extract all WAL: UC when new bb generated to unlock a dump
316
error(e.output or e.stderr)
318
logger.exception('ERROR')
320
elif wal == 'force_dump':
321
os.remove(full_path_wal)
324
wal_not_dumped = os.path.join(dest_dir, 'wal_not_dumped')
327
log('%s, %d wal moved to %s' % (full_name, wal_moved, oldwal))
329
log('%s, dump forced' % (full_name, ))
330
elif os.path.exists(wal_not_dumped):
331
last_wal_date = datetime.datetime.fromtimestamp(os.path.getmtime(wal_not_dumped))
332
if last_wal_date < datetime.datetime.now() - relativedelta(hours=36):
333
log('%s, wal_not_dumped forced' % (full_name, ))
335
if forced_dump or wal_moved or basebackup_found:
336
last_dump_file = os.path.join(dest_dir, 'last_dump.txt')
337
last_wal_date = False
338
if not forced_dump and not basebackup_found and os.path.exists(last_dump_file):
339
last_dump_date = datetime.datetime.fromtimestamp(os.path.getmtime(last_dump_file))
340
with open(last_dump_file) as last_desc:
341
last_wal_date = last_desc.read()
342
# only 1 dump per day
343
if last_dump_date.strftime('%Y-%m-%d') == time.strftime('%Y-%m-%d'):
344
log('%s already dumped today' % instance)
345
open(wal_not_dumped, 'w').close()
350
PSQL_DIR = config.psql9_dir
351
VERSION_FILE = os.path.join(dest_basebackup, 'PG_VERSION')
352
if os.patch.isfile(VERSION_FILE):
353
with open(VERSION_FILE, 'r') as ve:
355
if version.startswith('14'):
356
PSQL_DIR = config.psql14_dir
357
log('%s, pg_version: %s'% (instance, PSQL_DIR))
358
psql_start = [os.path.join(PSQL_DIR, 'pg_ctl.exe'),'-o', '-p %s'%psql_port, '-D', to_win(dest_basebackup), '-t', '1200', '-w', 'start']
359
log(' '.join(psql_start))
360
subprocess.run(psql_start, check=True)
361
#subprocess.check_output(psql_start)
363
db = psycopg2.connect('dbname=template1 host=127.0.0.1 user=openpg port=%s'%psql_port)
365
# wait end of wall processing
366
previous_wall = False
368
cr.execute("SELECT pg_last_xlog_replay_location()")
369
prev = cr.fetchone()[0]
370
if prev == previous_wall:
372
log('%s wait recovery, previous: %s, current: %s' % (instance, previous_wall, prev))
377
if not previous_wall:
378
error('%s no WAL replayed' % instance)
380
cr.execute("SELECT pg_last_xact_replay_timestamp()")
381
restore_date = cr.fetchone()[0]
383
cr.execute("select checkpoint_time from pg_control_checkpoint()")
384
check_point = cr.fetchone()
386
restore_date = check_point[0]
387
log('%s : get restore date from pg_control_checkpoint: %s' % (instance, restore_date.strftime('%Y%m%d-%H%M%S')))
390
error('%s no last replay timestamp' % instance)
391
label_file = os.path.join(dest_basebackup, 'backup_label.old')
392
if os.path.exists(label_file):
393
restore_date = datetime.datetime.fromtimestamp(os.path.getmtime(label_file))
396
restore_date = datetime.datetime.now()
398
cr.execute('SELECT datname FROM pg_database')
399
all_dbs = cr.fetchall()
402
if last_wal_date and last_wal_date == restore_date.strftime('%Y%m%d-%H%M%S'):
403
# same wal as previous dump: do noting (the in-progess wal is the 1st one)
404
log('%s : same data (%s) as previous backup (%s), do not dump/push to od' % (instance, last_wal_date, restore_date.strftime('%Y%m%d-%H%M%S')))
407
# dump and zip the dump
277
# copy postgres and recovery / migration of new WAL destination
278
for conf in ['recovery.conf', 'postgresql.conf']:
279
shutil.copy(os.path.join(PSQL_CONF, conf), dest_basebackup)
281
# Copy / extract basbackup
282
basebackup_found = False
283
if os.path.isfile(basebackup):
284
self.log('%s Found base backup %s'% (instance, basebackup))
285
old_base_moved = False
286
if os.path.isdir(dest_basebackup):
287
# previous base found, rename it
288
old_base_moved = os.path.join(dest_dir,'base_%s' % (time.strftime('%Y-%m-%d%H%M')))
289
shutil.move(dest_basebackup, old_base_moved)
290
self.log('Move old base %s'%dest_basebackup)
292
new_base = os.path.join(dest_dir, 'base.tar.7z')
293
shutil.move(basebackup, new_base)
294
self.un7zip(new_base, dest_dir)
295
os.makedirs(dest_basebackup)
296
untar = ['tar', '-xf', os.path.join(dest_dir, 'base.tar'), '-C', dest_basebackup]
298
subprocess.check_output(untar, stderr=subprocess.STDOUT)
299
os.remove(os.path.join(dest_dir, 'base.tar'))
301
for conf in ['recovery.conf', 'postgresql.conf', 'pg_hba.conf']:
302
shutil.copy(os.path.join(PSQL_CONF, conf), dest_basebackup)
304
for del_recreate in [pg_wal, pg_xlog, os.path.join(dest_basebackup, 'pg_log')]:
305
if os.path.isdir(del_recreate):
306
shutil.rmtree(del_recreate)
307
os.makedirs(del_recreate)
309
basebackup_found = True
311
# Move WAL (copy + del to set right owner on target)
312
if not os.path.exists(pg_xlog):
313
self.log('Unable to copy WAL, base directory not found %s' % pg_xlog)
315
if not os.path.exists(oldwal):
316
self.log('Unable to copy WAL, wal directory not found %s' % oldwal)
320
forced_dump = forced_instance
321
for wal in os.listdir(full_name):
322
full_path_wal = os.path.join(full_name, wal)
323
if wal.endswith('7z') and not wal.startswith('.'):
412
if x[0] not in ['template0', 'template1', 'postgres']:
413
log('%s db found %s'% (instance, x[0]))
414
db = psycopg2.connect('dbname=%s host=127.0.0.1 user=openpg port=%s' % (x[0], psql_port))
416
cr.execute('SELECT name FROM sync_client_version where date is not null order by date desc limit 1')
417
version = cr.fetchone()
419
error('%s: version not found' % instance)
423
cr.execute('SELECT oc FROM sync_client_entity')
424
oc = cr.fetchone()[0]
426
error('%s: OC not found' % instance)
430
dump_file = os.path.join(DUMP_DIR, '%s-%s-C-%s.dump' % (x[0], restore_date.strftime('%Y%m%d-%H%M%S'), version))
431
log('Dump %s' % dump_file)
432
pg_dump = [os.path.join(PSQL_DIR, 'pg_dump.exe'), '-h', '127.0.0.1', '-p', psql_port, '-U', 'openpg', '-Fc', '--lock-wait-timeout=120000', '-f', to_win(dump_file), x[0]]
433
subprocess.check_output(pg_dump, stderr=subprocess.STDOUT)
435
final_zip = os.path.join(DUMP_DIR, '%s-%s.zip' % (x[0], day_abr[datetime.datetime.now().weekday()]))
436
if os.path.exists(final_zip):
439
zip_c = ['zip', '-j', '-q', final_zip, dump_file]
441
subprocess.call(zip_c)
443
upload_od(final_zip, oc)
444
with open(last_dump_file, 'w') as last_desc:
445
last_desc.write(restore_date.strftime('%Y%m%d-%H%M%S'))
446
if os.path.exists(wal_not_dumped):
447
os.remove(wal_not_dumped)
325
self.un7zip(full_path_wal, oldwal)
326
os.remove(full_path_wal)
448
328
except subprocess.CalledProcessError as e:
449
error(e.output or e.stderr)
329
# try to extract all WAL: UC when new bb generated to unlock a dump
330
self.error(e.output or e.stderr)
450
331
except Exception:
451
logger.exception('ERROR')
453
if dump_file and os.path.exists(dump_file):
455
if final_zip and os.path.exists(final_zip):
459
psql_stop = [os.path.join(PSQL_DIR, 'pg_ctl.exe'), '-D', to_win(dest_basebackup), '-t', '1200', '-w', 'stop']
460
log(' '.join(psql_stop))
461
subprocess.run(psql_stop)
463
thread_touch = '%s-%s' % (TOUCH_FILE_DUMP, thread)
464
with open(thread_touch, 'w') as t_file:
465
t_file.write(time.strftime('%Y-%m-%d%H%M%S'))
467
except subprocess.CalledProcessError as e:
468
error(e.output or e.stderr)
470
logger.exception('ERROR')
332
self.logger.exception('ERROR')
334
elif wal == 'force_dump':
335
os.remove(full_path_wal)
338
wal_not_dumped = os.path.join(dest_dir, 'wal_not_dumped')
341
self.log('%s, %d wal moved to %s' % (full_name, wal_moved, oldwal))
343
self.log('%s, dump forced' % (full_name, ))
344
elif os.path.exists(wal_not_dumped):
345
last_wal_date = datetime.datetime.fromtimestamp(os.path.getmtime(wal_not_dumped))
346
if last_wal_date < datetime.datetime.now() - relativedelta(hours=36):
347
self.log('%s, wal_not_dumped forced' % (full_name, ))
349
if forced_dump or wal_moved or basebackup_found:
350
last_dump_file = os.path.join(dest_dir, 'last_dump.txt')
351
last_wal_date = False
352
if not forced_dump and not basebackup_found and os.path.exists(last_dump_file):
353
last_dump_date = datetime.datetime.fromtimestamp(os.path.getmtime(last_dump_file))
354
with open(last_dump_file) as last_desc:
355
last_wal_date = last_desc.read()
356
# only 1 dump per day
357
if last_dump_date.strftime('%Y-%m-%d') == time.strftime('%Y-%m-%d'):
358
self.log('%s already dumped today' % instance)
359
open(wal_not_dumped, 'w').close()
364
PSQL_DIR = config.psql9_dir
365
VERSION_FILE = os.path.join(dest_basebackup, 'PG_VERSION')
366
if os.patch.isfile(VERSION_FILE):
367
with open(VERSION_FILE, 'r') as ve:
369
if version.startswith('14'):
370
PSQL_DIR = config.psql14_dir
371
self.log('%s, pg_version: %s'% (instance, PSQL_DIR))
372
psql_start = [os.path.join(PSQL_DIR, 'pg_ctl.exe'),'-o', '-p %s'%psql_port, '-D', to_win(dest_basebackup), '-t', '1200', '-w', 'start']
373
self.log(' '.join(psql_start))
374
subprocess.run(psql_start, check=True)
375
#subprocess.check_output(psql_start)
377
db = psycopg2.connect('dbname=template1 host=127.0.0.1 user=openpg port=%s'%psql_port)
379
# wait end of wall processing
380
previous_wall = False
382
cr.execute("SELECT pg_last_xlog_replay_location()")
383
prev = cr.fetchone()[0]
384
if prev == previous_wall:
386
self.log('%s wait recovery, previous: %s, current: %s' % (instance, previous_wall, prev))
391
if not previous_wall:
392
self.error('%s no WAL replayed' % instance)
394
cr.execute("SELECT pg_last_xact_replay_timestamp()")
395
restore_date = cr.fetchone()[0]
397
cr.execute("select checkpoint_time from pg_control_checkpoint()")
398
check_point = cr.fetchone()
400
restore_date = check_point[0]
401
self.log('%s : get restore date from pg_control_checkpoint: %s' % (instance, restore_date.strftime('%Y%m%d-%H%M%S')))
404
self.error('%s no last replay timestamp' % instance)
405
label_file = os.path.join(dest_basebackup, 'backup_label.old')
406
if os.path.exists(label_file):
407
restore_date = datetime.datetime.fromtimestamp(os.path.getmtime(label_file))
410
restore_date = datetime.datetime.now()
412
cr.execute('SELECT datname FROM pg_database')
413
all_dbs = cr.fetchall()
416
if last_wal_date and last_wal_date == restore_date.strftime('%Y%m%d-%H%M%S'):
417
# same wal as previous dump: do noting (the in-progess wal is the 1st one)
418
self.log('%s : same data (%s) as previous backup (%s), do not dump/push to od' % (instance, last_wal_date, restore_date.strftime('%Y%m%d-%H%M%S')))
421
# dump and zip the dump
426
if x[0] not in ['template0', 'template1', 'postgres']:
427
self.log('%s db found %s'% (instance, x[0]))
428
db = psycopg2.connect('dbname=%s host=127.0.0.1 user=openpg port=%s' % (x[0], psql_port))
430
cr.execute('SELECT name FROM sync_client_version where date is not null order by date desc limit 1')
431
version = cr.fetchone()
433
self.error('%s: version not found' % instance)
437
cr.execute('SELECT oc FROM sync_client_entity')
438
oc = cr.fetchone()[0]
440
self.error('%s: OC not found' % instance)
444
dump_file = os.path.join(DUMP_DIR, '%s-%s-C-%s.dump' % (x[0], restore_date.strftime('%Y%m%d-%H%M%S'), version))
445
self.log('Dump %s' % dump_file)
446
pg_dump = [os.path.join(PSQL_DIR, 'pg_dump.exe'), '-h', '127.0.0.1', '-p', psql_port, '-U', 'openpg', '-Fc', '--lock-wait-timeout=120000', '-f', to_win(dump_file), x[0]]
447
subprocess.check_output(pg_dump, stderr=subprocess.STDOUT)
449
final_zip = os.path.join(DUMP_DIR, '%s-%s.zip' % (x[0], day_abr[datetime.datetime.now().weekday()]))
450
if os.path.exists(final_zip):
453
zip_c = ['zip', '-j', '-q', final_zip, dump_file]
454
self.log(' '.join(zip_c))
455
subprocess.call(zip_c)
457
self.upload_od(final_zip, oc)
458
with open(last_dump_file, 'w') as last_desc:
459
last_desc.write(restore_date.strftime('%Y%m%d-%H%M%S'))
460
if os.path.exists(wal_not_dumped):
461
os.remove(wal_not_dumped)
462
except subprocess.CalledProcessError as e:
463
self.error(e.output or e.stderr)
465
self.logger.exception('ERROR')
467
if dump_file and os.path.exists(dump_file):
469
if final_zip and os.path.exists(final_zip):
473
psql_stop = [os.path.join(PSQL_DIR, 'pg_ctl.exe'), '-D', to_win(dest_basebackup), '-t', '1200', '-w', 'stop']
474
self.log(' '.join(psql_stop))
475
subprocess.run(psql_stop)
477
thread_touch = '%s-%s' % (TOUCH_FILE_DUMP, self.thread)
478
with open(thread_touch, 'w') as t_file:
479
t_file.write(time.strftime('%Y-%m-%d%H%M%S'))
481
except subprocess.CalledProcessError as e:
482
self.error(e.output or e.stderr)
484
self.logger.exception('ERROR')
472
486
if __name__ == '__main__':
473
log('Check directories')
478
491
for x in range(0, nb_threads):
479
t = threading.Thread(target=process_directory, args=(x, q))
492
t = threading.Thread(target=Process(x, q).process_directory)
480
493
threads.append(t)
483
496
for t in threads: