~jfb-tempo-consulting/unifield-toolbox/ct_psql14

« back to all changes in this revision

Viewing changes to dump_creator3.py

  • Committer: jf
  • Date: 2023-12-28 09:10:29 UTC
  • Revision ID: jfb@tempo-consulting.fr-20231228091029-wn2db25umh24wzhm
Multi thread

Show diffs side-by-side

added added

removed removed

Lines of Context:
14
14
import sys
15
15
import config
16
16
import importlib
 
17
import heapq
 
18
import threading
17
19
 
18
20
PSQL_DIR = config.psql_dir
19
21
DEST_DIR = config.dest_dir
148
150
    log('Uncompress: %s ' % ' '.join(command))
149
151
    subprocess.check_output(command, stderr=subprocess.STDOUT)
150
152
 
151
 
def list_instances(src_dir, seen):
152
 
    to_analyze = []
153
 
    top = []
154
 
    for instance in sorted(os.listdir(SRC_DIR)):
155
 
        if instance.startswith('.'):
156
 
            continue
157
 
        newbase = os.path.isfile(os.path.join(SRC_DIR, instance, 'base', 'base.tar.7z'))
158
 
        if newbase and (not seen.get(instance) or seen.get(instance) < datetime.datetime.now() + relativedelta(minutes=-30)):
159
 
            log('Put %s base on top' % (instance, ))
160
 
            top.append(instance)
161
 
        elif seen.get(instance):
162
 
            continue
163
 
        else:
164
 
            to_analyze.append(instance)
165
 
    return top + to_analyze
166
 
 
167
 
def process_directory():
 
153
 
 
154
class Queue():
 
155
    lock = threading.RLock()
 
156
    queue = []
 
157
    list_items = {}
 
158
 
 
159
    def add(self, item, prio=1):
 
160
        with self.lock:
 
161
            if item not in self.list_items:
 
162
                self.list_items[item] = prio
 
163
                heapq.heappush(self.queue, [prio, item])
 
164
            elif self.list_items[item] <= prio:
 
165
                return
 
166
            else:
 
167
                for t in self.queue:
 
168
                    if t[1] == item:
 
169
                        t[0] = 0 # delete item
 
170
                heapq.heappush(self.queue, [prio, item])
 
171
 
 
172
    def pop(self):
 
173
        with self.lock:
 
174
            prio = 0
 
175
            while prio == 0:
 
176
                try:
 
177
                    prio, item = heapq.heappop(self.queue)
 
178
                except IndexError:
 
179
                    # empty queue
 
180
                    return False, False
 
181
                try:
 
182
                    del self.list_items[item]
 
183
                except KeyError:
 
184
                    pass
 
185
            return prio, item
 
186
 
 
187
    def resfresh(self):
 
188
        with self.lock:
 
189
            forced_path = os.path.join(SRC_DIR, 'forced_instance')
 
190
            if os.path.exists(forced_path):
 
191
                with open(forced_path) as forced_path_desc:
 
192
                    instance = forced_path_desc.read()
 
193
                self.add(instance, -2)
 
194
                os.remove(forced_path)
 
195
 
 
196
            for instance in sorted(os.listdir(SRC_DIR)):
 
197
                if instance.startswith('.'):
 
198
                    continue
 
199
                newbase = os.path.isfile(os.path.join(SRC_DIR, instance, 'base', 'base.tar.7z'))
 
200
                if newbase:
 
201
                    log('Put %s base on top' % (instance, ))
 
202
                    self.add(instance, -1)
 
203
                else:
 
204
                    self.add(instance)
 
205
 
 
206
def stopped(delete=False):
 
207
    stop_service = os.path.join(SRC_DIR, 'stop_service')
 
208
    if os.path.exists(stop_service):
 
209
        if delete:
 
210
            os.remove(stop_service)
 
211
            log('Stopped')
 
212
            return True
 
213
    return False
 
214
 
 
215
def process_directory(thread, queue):
168
216
    if not os.path.isdir(DUMP_DIR):
169
217
        os.makedirs(DUMP_DIR)
170
 
    instances_seen = {}
171
 
    to_analyze = list_instances(SRC_DIR, instances_seen)
172
 
    while to_analyze:
173
 
 
174
 
        instance = to_analyze.pop(0)
 
218
 
 
219
    psql_port = 5432 + thread
 
220
    while True:
 
221
        queue.resfresh()
 
222
 
 
223
        nb, instance = queue.pop()
 
224
        if not instance or instance == 'INIT':
 
225
            log('thread %s, sleep' % (thread,))
 
226
            time.sleep(60)
 
227
            if instance == 'INIT':
 
228
                with open(TOUCH_FILE_LOOP, 'w') as t_file:
 
229
                    t_file.write(time.strftime('%Y-%m-%d%H%M%S'))
 
230
                queue.add('INIT')
 
231
            continue
 
232
 
 
233
 
175
234
        forced_instance = False
176
 
        forced_path = os.path.join(SRC_DIR, 'forced_instance')
177
 
        if os.path.exists(forced_path):
178
 
            with open(forced_path) as forced_path_desc:
179
 
                instance = forced_path_desc.read()
 
235
        if nb < 0:
180
236
            forced_instance = True
181
 
            os.remove(forced_path)
182
 
 
183
 
        instances_seen[instance] = datetime.datetime.now()
184
237
 
185
238
        full_name = os.path.join(SRC_DIR, instance)
186
239
        try:
187
240
            if os.path.isdir(full_name):
188
 
                #log('##### Instance %s'%full_name)
189
241
 
190
 
                stop_service = os.path.join(SRC_DIR, 'stop_service')
191
 
                if os.path.exists(stop_service):
192
 
                    os.remove(stop_service)
193
 
                    log('Stopped')
194
 
                    sys.exit(0)
 
242
                if stopped():
 
243
                    return False
195
244
 
196
245
                basebackup = os.path.join(full_name, 'base', 'base.tar.7z')
197
246
                dest_dir = os.path.join(DEST_DIR, instance)
198
247
 
199
248
                if not os.path.isdir(dest_dir) and not os.path.isfile(basebackup):
200
249
                    # new instance wait for base.tar
201
 
                    instances_seen[instance] = False
202
250
                    continue
203
251
 
204
 
                to_analyze = list_instances(SRC_DIR, instances_seen)
205
 
 
206
252
                for dir_to_create in [os.path.join(dest_dir, 'OLDWAL')]:
207
253
                    if not os.path.isdir(dir_to_create):
208
254
                        log('Create %s'%dir_to_create)
210
256
 
211
257
                dest_basebackup = os.path.join(dest_dir, 'base')
212
258
                pg_xlog = os.path.join(dest_basebackup, 'pg_xlog')
 
259
                pg_wal = os.path.join(dest_basebackup, 'pg_wal')
213
260
                oldwal = os.path.join(dest_dir, 'OLDWAL')
214
261
 
215
262
                if os.path.isdir(dest_basebackup):
240
287
                    for conf in ['recovery.conf', 'postgresql.conf', 'pg_hba.conf']:
241
288
                        shutil.copy(os.path.join(PSQL_CONF, conf), dest_basebackup)
242
289
 
243
 
                    for del_recreate in [pg_xlog, os.path.join(dest_basebackup, 'pg_log')]:
 
290
                    for del_recreate in [pg_wal, pg_xlog, os.path.join(dest_basebackup, 'pg_log')]:
244
291
                        if os.path.isdir(del_recreate):
245
292
                            shutil.rmtree(del_recreate)
246
293
                        os.makedirs(del_recreate)
247
294
 
248
 
                    #if old_base_moved:
249
 
                    #    # old base moved, copy previous WAL in the new basebackup
250
 
                    #    # case of WAL created during this base backup was already moved to the old WAL
251
 
                    #    shutil.rmtree(pg_xlog)
252
 
                    #    shutil.copytree(os.path.join(old_base_moved, 'pg_xlog'), pg_xlog)
253
295
                    basebackup_found = True
254
296
 
255
 
                # is there an in-progress rsync ?
256
 
                rsync_temp = os.path.join(full_name, '.rsync-partial')
257
 
                if os.path.exists(rsync_temp):
258
 
                    partial_modification_date = datetime.datetime.fromtimestamp(os.path.getctime(rsync_temp))
259
 
                    if partial_modification_date > datetime.datetime.now() + relativedelta(minutes=-10):
260
 
                        log('%s, rsync in progess %s' % (full_name, partial_modification_date.strftime('%Y-%m-%d %H:%M')))
261
 
                        continue
262
 
 
263
297
                # Move WAL (copy + del to set right owner on target)
264
298
                if not os.path.exists(pg_xlog):
265
299
                    log('Unable to copy WAL, base directory not found %s' % pg_xlog)
266
300
                    continue
 
301
                if not os.path.exists(oldwal):
 
302
                    log('Unable to copy WAL, wal directory not found %s' % oldwal)
 
303
                    continue
267
304
 
268
305
                wal_moved = 0
269
 
                forced_wal = False
270
306
                forced_dump = forced_instance
271
 
                retry = True
272
 
                while retry:
273
 
                    retry_wal = False
274
 
                    for_next_loop = False
275
 
                    for wal in os.listdir(full_name):
276
 
                        full_path_wal = os.path.join(full_name, wal)
277
 
                        if wal.endswith('7z') and not wal.startswith('.'):
278
 
                            try:
279
 
                                un7zip(full_path_wal, oldwal)
280
 
                                os.remove(full_path_wal)
281
 
                                wal_moved += 1
282
 
                            except subprocess.CalledProcessError as e:
283
 
                                # try to extract all WAL: UC when new bb generated to unlock a dump
284
 
                                error(e.output or e.stderr)
285
 
                            except Exception:
286
 
                                logger.exception('ERROR')
287
 
 
288
 
                        elif wal == 'force_dump':
 
307
                for wal in os.listdir(full_name):
 
308
                    full_path_wal = os.path.join(full_name, wal)
 
309
                    if wal.endswith('7z') and not wal.startswith('.'):
 
310
                        try:
 
311
                            un7zip(full_path_wal, oldwal)
289
312
                            os.remove(full_path_wal)
290
 
                            forced_dump = True
291
 
                        elif wal.startswith('.') and '.7z.' in wal:
292
 
                            # there is an in-progress rsync
293
 
                            try:
294
 
                                partial_modification_date = datetime.datetime.fromtimestamp(os.path.getctime(full_path_wal))
295
 
                                if partial_modification_date > datetime.datetime.now() + relativedelta(minutes=-45):
296
 
                                    # rsync in progress is less than X min, go to the next instance
297
 
                                    log('%s found %s, next_loop (%s)' % (full_name, wal, partial_modification_date.strftime('%Y-%m-%d %H:%M')))
298
 
                                    for_next_loop = True
299
 
                                    break
300
 
                                else:
301
 
                                    # in progress is more than X min: too slow, generate backup
302
 
                                    log('%s found %s, too old, force wal (%s)' % (full_name, wal, partial_modification_date.strftime('%Y-%m-%d %H:%M')))
303
 
                                    if not wal_forced.get(full_name, {}).get(full_path_wal):
304
 
                                        wal_forced[full_name] = {full_path_wal: True}
305
 
                                        forced_wal = True
306
 
                            except FileNotFoundError:
307
 
                                # between listdir and getctime the temp file has been removed, retry listdir
308
 
                                log('%s file not found %s, retry' % (full_name, wal))
309
 
                                retry_wal = True
310
 
                                break
311
 
                    retry = retry_wal
 
313
                            wal_moved += 1
 
314
                        except subprocess.CalledProcessError as e:
 
315
                            # try to extract all WAL: UC when new bb generated to unlock a dump
 
316
                            error(e.output or e.stderr)
 
317
                        except Exception:
 
318
                            logger.exception('ERROR')
312
319
 
313
 
                if for_next_loop:
314
 
                    continue
 
320
                    elif wal == 'force_dump':
 
321
                        os.remove(full_path_wal)
 
322
                        forced_dump = True
315
323
 
316
324
                wal_not_dumped = os.path.join(dest_dir, 'wal_not_dumped')
317
325
 
318
326
                if wal_moved:
319
327
                    log('%s, %d wal moved to %s' % (full_name, wal_moved, oldwal))
320
 
                elif forced_wal:
321
 
                    log('%s, wal forced' % (full_name, ))
322
328
                elif forced_dump:
323
329
                    log('%s, dump forced' % (full_name, ))
324
330
                elif os.path.exists(wal_not_dumped):
325
331
                    last_wal_date = datetime.datetime.fromtimestamp(os.path.getmtime(wal_not_dumped))
326
332
                    if last_wal_date < datetime.datetime.now() - relativedelta(hours=36):
327
333
                        log('%s, wal_not_dumped forced' % (full_name, ))
328
 
                        forced_wal = True
329
334
 
330
 
                if forced_wal or forced_dump or wal_moved or basebackup_found:
 
335
                if forced_dump or wal_moved or basebackup_found:
331
336
                    last_dump_file = os.path.join(dest_dir, 'last_dump.txt')
332
337
                    last_wal_date = False
333
338
                    if not forced_dump and not basebackup_found and os.path.exists(last_dump_file):
350
355
                                if version.startswith('14'):
351
356
                                    PSQL_DIR = config.psql14_dir
352
357
                        log('%s, pg_version: %s'% (instance, PSQL_DIR))
353
 
                        psql_start = [os.path.join(PSQL_DIR, 'pg_ctl.exe'), '-D', to_win(dest_basebackup), '-t', '1200', '-w', 'start']
 
358
                        psql_start = [os.path.join(PSQL_DIR, 'pg_ctl.exe'),'-o', '-p %s'%psql_port, '-D', to_win(dest_basebackup), '-t', '1200', '-w', 'start']
354
359
                        log(' '.join(psql_start))
355
360
                        subprocess.run(psql_start, check=True)
356
361
                        #subprocess.check_output(psql_start)
357
362
 
358
 
                        db = psycopg2.connect('dbname=template1 host=127.0.0.1 user=openpg')
 
363
                        db = psycopg2.connect('dbname=template1 host=127.0.0.1 user=openpg port=%s'%psql_port)
359
364
                        cr = db.cursor()
360
365
                        # wait end of wall processing
361
366
                        previous_wall = False
394
399
                        all_dbs = cr.fetchall()
395
400
                        db.close()
396
401
 
397
 
                        if forced_wal and last_wal_date and last_wal_date == restore_date.strftime('%Y%m%d-%H%M%S'):
 
402
                        if last_wal_date and last_wal_date == restore_date.strftime('%Y%m%d-%H%M%S'):
398
403
                            # same wal as previous dump: do nothing (the in-progress wal is the 1st one)
399
404
                            log('%s : same data (%s) as previous backup (%s), do not dump/push to od' % (instance, last_wal_date, restore_date.strftime('%Y%m%d-%H%M%S')))
400
405
                            continue
406
411
                                dump_file = False
407
412
                                if x[0] not in ['template0', 'template1', 'postgres']:
408
413
                                    log('%s db found %s'% (instance, x[0]))
409
 
                                    db = psycopg2.connect('dbname=%s host=127.0.0.1 user=openpg' % (x[0], ))
 
414
                                    db = psycopg2.connect('dbname=%s host=127.0.0.1 user=openpg port=%s' % (x[0], psql_port))
410
415
                                    cr = db.cursor()
411
416
                                    cr.execute('SELECT name FROM sync_client_version where date is not null order by date desc limit 1')
412
417
                                    version = cr.fetchone()
424
429
 
425
430
                                    dump_file = os.path.join(DUMP_DIR, '%s-%s-C-%s.dump' % (x[0], restore_date.strftime('%Y%m%d-%H%M%S'), version))
426
431
                                    log('Dump %s' % dump_file)
427
 
                                    pg_dump = [os.path.join(PSQL_DIR, 'pg_dump.exe'), '-h', '127.0.0.1', '-U', 'openpg', '-Fc', '--lock-wait-timeout=120000',  '-f', to_win(dump_file), x[0]]
 
432
                                    pg_dump = [os.path.join(PSQL_DIR, 'pg_dump.exe'), '-h', '127.0.0.1', '-p', psql_port, '-U', 'openpg', '-Fc', '--lock-wait-timeout=120000',  '-f', to_win(dump_file), x[0]]
428
433
                                    subprocess.check_output(pg_dump, stderr=subprocess.STDOUT)
429
434
 
430
435
                                    final_zip = os.path.join(DUMP_DIR, '%s-%s.zip' % (x[0], day_abr[datetime.datetime.now().weekday()]))
455
460
                        log(' '.join(psql_stop))
456
461
                        subprocess.run(psql_stop)
457
462
 
458
 
            with open(TOUCH_FILE_DUMP, 'w') as t_file:
 
463
            thread_touch = '%s-%s' % (TOUCH_FILE_DUMP, thread)
 
464
            with open(thread_touch, 'w') as t_file:
459
465
                t_file.write(time.strftime('%Y-%m-%d%H%M%S'))
460
466
 
461
467
        except subprocess.CalledProcessError as e:
464
470
            logger.exception('ERROR')
465
471
 
466
472
if __name__ == '__main__':
467
 
    while True:
468
 
        log('Check directories')
469
 
        process_directory()
470
 
        if sys.argv and len(sys.argv) > 1 and sys.argv[1] == '-1':
471
 
            log('Process ends')
472
 
            sys.exit(0)
473
 
 
474
 
        log('sleep')
475
 
        with open(TOUCH_FILE_LOOP, 'w') as t_file:
476
 
            t_file.write(time.strftime('%Y-%m-%d%H%M%S'))
477
 
        time.sleep(120)
 
473
    log('Check directories')
 
474
    nb_threads = 2
 
475
    threads = []
 
476
    q = Queue()
 
477
    q.add('INIT')
 
478
    for x in range(0, nb_threads):
 
479
        t = threading.Thread(target=process_directory, args=(x, q))
 
480
        threads.append(t)
 
481
        t.run()
 
482
 
 
483
    for t in threads:
 
484
        t.join()
 
485
        log('Thread ends')
 
486
 
 
487
    stopped(True)
 
488
    log('Process ends')
 
489
    sys.exit(0)
 
490