~jfb-tempo-consulting/unifield-toolbox/ct_psql14

« back to all changes in this revision

Viewing changes to dump_creator3.py

  • Committer: jf
  • Date: 2023-12-28 10:43:19 UTC
  • Revision ID: jfb@tempo-consulting.fr-20231228104319-18318z6b2gvpngkv
Log by thread

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
import importlib
17
17
import heapq
18
18
import threading
 
19
import itertools
19
20
 
20
21
PSQL_DIR = config.psql_dir
21
22
DEST_DIR = config.dest_dir
22
23
SRC_DIR = config.src_dir
23
24
PSQL_CONF = os.path.join(DEST_DIR, 'psql_conf')
24
25
DUMP_DIR = os.path.join(DEST_DIR, 'DUMPS')
25
 
TOUCH_FILE_DUMP = os.path.join(DUMP_DIR, 'touch_dump')
26
 
TOUCH_FILE_LOOP = os.path.join(DUMP_DIR, 'touch_loop')
 
26
# TODO
 
27
TOUCH_FILE_DUMP = os.path.join(DUMP_DIR, 'touch_dump-new')
 
28
TOUCH_FILE_LOOP = os.path.join(DUMP_DIR, 'touch_loop-new')
27
29
LOG_FILE = config.log_file
28
30
day_abr = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
29
31
 
30
 
logger = logging.getLogger()
31
 
logger.setLevel(logging.DEBUG)
32
 
formatter = logging.Formatter('%(asctime)s : %(levelname)s : %(message)s')
33
 
 
34
 
wal_forced = {}
35
 
if LOG_FILE:
36
 
    #handler = logging.handlers.RotatingFileHandler(LOG_FILE, 'a', 1024*1024, 365)
37
 
    handler = logging.handlers.TimedRotatingFileHandler(LOG_FILE, when='midnight')
38
 
else:
39
 
    handler = logging.StreamHandler()
40
 
handler.setLevel(logging.INFO)
41
 
handler.setFormatter(formatter)
42
 
logger.addHandler(handler)
43
 
 
44
 
def log(message):
45
 
    logger.info(message)
46
 
 
47
 
def error(message):
48
 
    logger.error(message)
49
32
 
50
33
def to_win(path):
51
34
    # internal cygwin commands use /cygdrive/c
53
36
    return path.replace('/cygdrive/c', 'C:').replace('/cygdrive/d', 'D:')
54
37
 
55
38
 
56
 
def upload_od(file_path, oc):
57
 
    importlib.reload(config)
58
 
 
59
 
    dav_data = {
60
 
        'host': 'msfintl-my.sharepoint.com',
61
 
        'port': 443,
62
 
        'protocol': 'https',
63
 
        'username': 'UniField.MSF@geneva.msf.org',
64
 
        'password': config.password,
65
 
    }
66
 
 
67
 
    if oc not in config.path:
68
 
        error('%s unknown oc %s' % (file_path, oc))
69
 
    dav_data['path'] = config.path.get(oc, '/personal/unifield_msf_geneva_msf_org/documents/Test')
70
 
    max_retries = 3
71
 
    retries = 0
72
 
    buffer_size = 10 * 1024 * 1014
73
 
    file_name = os.path.basename(file_path)
74
 
    temp_file_name = 'Temp/%s'%file_name
75
 
    fileobj = open(file_path, 'rb')
76
 
    log('Start upload %s to %s '% (file_path, dav_data['path']))
77
 
    upload_ok = False
78
 
    dav_error = False
79
 
    dav_connected = False
80
 
    temp_created = False
81
 
    while True:
82
 
        try:
83
 
            if not dav_connected:
84
 
                fileobj.seek(0)
85
 
                dav = webdav.Client(**dav_data)
86
 
                dav_connected = True
87
 
                log('Dav connected')
88
 
                retries = 0
89
 
 
90
 
            if not temp_created:
91
 
                try:
92
 
                    dav.create_folder('Temp')
93
 
                except:
94
 
                    log('Except Temp')
95
 
                    if retries > max_retries:
96
 
                        raise
97
 
                    retries += 1
98
 
                    time.sleep(2)
99
 
                temp_created = True
100
 
                log('Temp OK')
101
 
                retries = 0
102
 
 
103
 
            if not upload_ok:
104
 
                upload_ok, dav_error = dav.upload(fileobj, temp_file_name, buffer_size=buffer_size)
105
 
 
106
 
            if upload_ok:
107
 
                log('Moving File')
108
 
                try:
109
 
                    dav.delete(file_name)
110
 
                    dav.move(temp_file_name, file_name)
111
 
                except:
112
 
                    log('Except move')
113
 
                    if retries > max_retries:
114
 
                        raise
115
 
                    retries += 1
116
 
                    time.sleep(2)
117
 
                log('File %s uploaded' % (file_path,))
118
 
                return True
119
 
            else:
120
 
                log('Dav 1 retry')
121
 
                if retries > max_retries:
122
 
                    raise Exception(dav_error)
123
 
                retries += 1
124
 
                time.sleep(2)
125
 
                if dav_connected and 'timed out' in dav_error or '2130575252' in dav_error:
126
 
                    log('%s OneDrive: session time out' % (file_path,))
127
 
                    dav.login()
128
 
 
129
 
        except (requests.exceptions.RequestException, webdav.ConnectionFailed):
130
 
            log('Dav 2 retry')
131
 
            if retries > max_retries:
132
 
                raise
133
 
            retries += 1
134
 
            time.sleep(2)
135
 
 
136
 
    fileobj.close()
137
 
    if not upload_ok:
138
 
        if dav_error:
139
 
            raise Exception(dav_error)
140
 
        else:
141
 
            raise Exception('Unknown error')
142
 
    return True
143
 
 
144
 
def un7zip(src_file, dest_dir, delete=False):
145
 
    if not os.path.isdir(dest_dir):
146
 
        raise Exception('un7zip: dest %s not found' % (dest_dir))
147
 
    command = ['/usr/bin/7z', 'e', src_file, '-y', '-bd', '-bb0', '-bso0', '-bsp0', '-o%s'%dest_dir]
148
 
    if delete:
149
 
        command.append('-sdel')
150
 
    log('Uncompress: %s ' % ' '.join(command))
151
 
    subprocess.check_output(command, stderr=subprocess.STDOUT)
152
 
 
153
39
 
154
40
class Queue():
155
41
    lock = threading.RLock()
156
42
    queue = []
157
43
    list_items = {}
 
44
    counter = itertools.count()
158
45
 
159
46
    def add(self, item, prio=1):
160
47
        with self.lock:
161
48
            if item not in self.list_items:
162
49
                self.list_items[item] = prio
163
 
                heapq.heappush(self.queue, [prio, item])
 
50
                heapq.heappush(self.queue, [prio, next(self.counter), item])
164
51
            elif self.list_items[item] <= prio:
165
52
                return
166
53
            else:
167
54
                for t in self.queue:
168
55
                    if t[1] == item:
169
56
                        t[0] = 0 # delete item
170
 
                heapq.heappush(self.queue, [prio, item])
 
57
                self.list_items[item] = prio
 
58
                heapq.heappush(self.queue, [prio, next(self.counter), item])
171
59
 
172
60
    def pop(self):
173
61
        with self.lock:
174
62
            prio = 0
175
63
            while prio == 0:
176
64
                try:
177
 
                    prio, item = heapq.heappop(self.queue)
 
65
                    prio, timest, item = heapq.heappop(self.queue)
178
66
                except IndexError:
179
67
                    # empty queue
180
68
                    return False, False
186
74
 
187
75
    def resfresh(self):
188
76
        with self.lock:
189
 
            forced_path = os.path.join(SRC_DIR, 'forced_instance')
 
77
            forced_path = os.path.join(SRC_DIR, 'forced_instance_new')
190
78
            if os.path.exists(forced_path):
191
79
                with open(forced_path) as forced_path_desc:
192
80
                    instance = forced_path_desc.read()
198
86
                    continue
199
87
                newbase = os.path.isfile(os.path.join(SRC_DIR, instance, 'base', 'base.tar.7z'))
200
88
                if newbase:
201
 
                    log('Put %s base on top' % (instance, ))
202
89
                    self.add(instance, -1)
203
90
                else:
204
91
                    self.add(instance)
205
92
 
206
93
def stopped(delete=False):
    """Check for the stop-service marker file.

    Returns True only when *delete* is truthy and the marker file exists
    (the marker is removed in that case); in every other situation it
    returns False.

    NOTE(review): when the marker exists but delete=False this still
    returns False -- confirm that bare ``stopped()`` callers really are
    meant to ignore an existing marker.
    """
    # TODO
    marker = os.path.join(SRC_DIR, 'stop_service_new')
    if delete and os.path.exists(marker):
        os.remove(marker)
        return True
    return False
214
101
 
215
 
def process_directory(thread, queue):
216
 
    if not os.path.isdir(DUMP_DIR):
217
 
        os.makedirs(DUMP_DIR)
218
 
 
219
 
    psql_port = 5432 + thread
220
 
    while True:
221
 
        queue.resfresh()
222
 
 
223
 
        nb, instance = queue.pop()
224
 
        if not instance or instance == 'INIT':
225
 
            log('thread %s, sleep' % (thread,))
226
 
            time.sleep(60)
227
 
            if instance == 'INIT':
228
 
                with open(TOUCH_FILE_LOOP, 'w') as t_file:
229
 
                    t_file.write(time.strftime('%Y-%m-%d%H%M%S'))
230
 
                queue.add('INIT')
 
102
class Process():
 
103
 
 
104
    def __init__(self, thread, queue):
 
105
        self.thread = thread
 
106
        self.queue = queue
 
107
 
 
108
        self.logger = logging.getLogger()
 
109
        self.logger.setLevel(logging.DEBUG)
 
110
        formatter = logging.Formatter('%(asctime)s : %(levelname)s : %(message)s')
 
111
 
 
112
        if LOG_FILE:
 
113
            handler = logging.handlers.TimedRotatingFileHandler('%s-%s' % (thread, LOG_FILE), when='midnight')
 
114
        else:
 
115
            handler = logging.StreamHandler()
 
116
        handler.setLevel(logging.INFO)
 
117
        handler.setFormatter(formatter)
 
118
        self.logger.addHandler(handler)
 
119
 
 
120
    def log(self, message):
 
121
        self.logger.info(message)
 
122
 
 
123
    def error(self, message):
 
124
        self.logger.error(message)
 
125
 
 
126
    def un7zip(self, src_file, dest_dir, delete=False):
 
127
        if not os.path.isdir(dest_dir):
 
128
            raise Exception('un7zip: dest %s not found' % (dest_dir))
 
129
        command = ['/usr/bin/7z', 'e', src_file, '-y', '-bd', '-bb0', '-bso0', '-bsp0', '-o%s'%dest_dir]
 
130
        if delete:
 
131
            command.append('-sdel')
 
132
        self.log('Uncompress: %s ' % ' '.join(command))
 
133
        subprocess.check_output(command, stderr=subprocess.STDOUT)
 
134
 
 
135
    def upload_od(self, file_path, oc):
 
136
        importlib.reload(config)
 
137
 
 
138
        dav_data = {
 
139
            'host': 'msfintl-my.sharepoint.com',
 
140
            'port': 443,
 
141
            'protocol': 'https',
 
142
            'username': 'UniField.MSF@geneva.msf.org',
 
143
            'password': config.password,
 
144
        }
 
145
 
 
146
        if oc not in config.path:
 
147
            self.error('%s unknown oc %s' % (file_path, oc))
 
148
        dav_data['path'] = config.path.get(oc, '/personal/unifield_msf_geneva_msf_org/documents/Test')
 
149
        max_retries = 3
 
150
        retries = 0
 
151
        buffer_size = 10 * 1024 * 1014
 
152
        file_name = os.path.basename(file_path)
 
153
        temp_file_name = 'Temp/%s'%file_name
 
154
        fileobj = open(file_path, 'rb')
 
155
        self.log('Start upload %s to %s '% (file_path, dav_data['path']))
 
156
        upload_ok = False
 
157
        dav_error = False
 
158
        dav_connected = False
 
159
        temp_created = False
 
160
        while True:
 
161
            try:
 
162
                if not dav_connected:
 
163
                    fileobj.seek(0)
 
164
                    dav = webdav.Client(**dav_data)
 
165
                    dav_connected = True
 
166
                    self.log('Dav connected')
 
167
                    retries = 0
 
168
 
 
169
                if not temp_created:
 
170
                    try:
 
171
                        dav.create_folder('Temp')
 
172
                    except:
 
173
                        self.log('Except Temp')
 
174
                        if retries > max_retries:
 
175
                            raise
 
176
                        retries += 1
 
177
                        time.sleep(2)
 
178
                    temp_created = True
 
179
                    self.log('Temp OK')
 
180
                    retries = 0
 
181
 
 
182
                if not upload_ok:
 
183
                    upload_ok, dav_error = dav.upload(fileobj, temp_file_name, buffer_size=buffer_size)
 
184
 
 
185
                if upload_ok:
 
186
                    self.log('Moving File')
 
187
                    try:
 
188
                        dav.delete(file_name)
 
189
                        dav.move(temp_file_name, file_name)
 
190
                    except:
 
191
                        self.log('Except move')
 
192
                        if retries > max_retries:
 
193
                            raise
 
194
                        retries += 1
 
195
                        time.sleep(2)
 
196
                    self.log('File %s uploaded' % (file_path,))
 
197
                    return True
 
198
                else:
 
199
                    self.log('Dav 1 retry')
 
200
                    if retries > max_retries:
 
201
                        raise Exception(dav_error)
 
202
                    retries += 1
 
203
                    time.sleep(2)
 
204
                    if dav_connected and 'timed out' in dav_error or '2130575252' in dav_error:
 
205
                        self.log('%s OneDrive: session time out' % (file_path,))
 
206
                        dav.login()
 
207
 
 
208
            except (requests.exceptions.RequestException, webdav.ConnectionFailed):
 
209
                self.log('Dav 2 retry')
 
210
                if retries > max_retries:
 
211
                    raise
 
212
                retries += 1
 
213
                time.sleep(2)
 
214
 
 
215
        fileobj.close()
 
216
        if not upload_ok:
 
217
            if dav_error:
 
218
                raise Exception(dav_error)
 
219
            else:
 
220
                raise Exception('Unknown error')
 
221
        return True
 
222
 
 
223
    def process_directory(self):
 
224
        if not os.path.isdir(DUMP_DIR):
 
225
            os.makedirs(DUMP_DIR)
 
226
 
 
227
        psql_port = 5432 + self.thread
 
228
        while True:
 
229
            if stopped():
 
230
                self.log('Stopped')
 
231
                return False
 
232
 
 
233
            self.queue.resfresh()
 
234
 
 
235
            nb, instance = self.queue.pop()
 
236
            if not instance or instance == 'INIT':
 
237
                self.log('sleep')
 
238
                time.sleep(10)
 
239
                if instance == 'INIT':
 
240
                    with open(TOUCH_FILE_LOOP, 'w') as t_file:
 
241
                        t_file.write(time.strftime('%Y-%m-%d%H%M%S'))
 
242
                    self.queue.add('INIT')
 
243
                continue
 
244
 
 
245
 
 
246
            forced_instance = False
 
247
            if nb < 0:
 
248
                self.log('Forced instance %s' % (instance, ))
 
249
                forced_instance = True
 
250
 
 
251
 
 
252
            # TODO
231
253
            continue
232
254
 
233
255
 
234
 
        forced_instance = False
235
 
        if nb < 0:
236
 
            forced_instance = True
237
 
 
238
 
        full_name = os.path.join(SRC_DIR, instance)
239
 
        try:
240
 
            if os.path.isdir(full_name):
241
 
 
242
 
                if stopped():
243
 
                    return False
244
 
 
245
 
                basebackup = os.path.join(full_name, 'base', 'base.tar.7z')
246
 
                dest_dir = os.path.join(DEST_DIR, instance)
247
 
 
248
 
                if not os.path.isdir(dest_dir) and not os.path.isfile(basebackup):
249
 
                    # new instance wait for base.tar
250
 
                    continue
251
 
 
252
 
                for dir_to_create in [os.path.join(dest_dir, 'OLDWAL')]:
253
 
                    if not os.path.isdir(dir_to_create):
254
 
                        log('Create %s'%dir_to_create)
255
 
                        os.makedirs(dir_to_create)
256
 
 
257
 
                dest_basebackup = os.path.join(dest_dir, 'base')
258
 
                pg_xlog = os.path.join(dest_basebackup, 'pg_xlog')
259
 
                pg_wal = os.path.join(dest_basebackup, 'pg_wal')
260
 
                oldwal = os.path.join(dest_dir, 'OLDWAL')
261
 
 
262
 
                if os.path.isdir(dest_basebackup):
263
 
                    # copy postgres and recovery / migration of new WAL destination
264
 
                    for conf in ['recovery.conf', 'postgresql.conf']:
265
 
                        shutil.copy(os.path.join(PSQL_CONF, conf), dest_basebackup)
266
 
 
267
 
                # Copy / extract basbackup
268
 
                basebackup_found = False
269
 
                if os.path.isfile(basebackup):
270
 
                    log('%s Found base backup %s'% (instance, basebackup))
271
 
                    old_base_moved = False
 
256
            full_name = os.path.join(SRC_DIR, instance)
 
257
            try:
 
258
                if os.path.isdir(full_name):
 
259
                    basebackup = os.path.join(full_name, 'base', 'base.tar.7z')
 
260
                    dest_dir = os.path.join(DEST_DIR, instance)
 
261
 
 
262
                    if not os.path.isdir(dest_dir) and not os.path.isfile(basebackup):
 
263
                        # new instance wait for base.tar
 
264
                        continue
 
265
 
 
266
                    for dir_to_create in [os.path.join(dest_dir, 'OLDWAL')]:
 
267
                        if not os.path.isdir(dir_to_create):
 
268
                            self.log('Create %s'%dir_to_create)
 
269
                            os.makedirs(dir_to_create)
 
270
 
 
271
                    dest_basebackup = os.path.join(dest_dir, 'base')
 
272
                    pg_xlog = os.path.join(dest_basebackup, 'pg_xlog')
 
273
                    pg_wal = os.path.join(dest_basebackup, 'pg_wal')
 
274
                    oldwal = os.path.join(dest_dir, 'OLDWAL')
 
275
 
272
276
                    if os.path.isdir(dest_basebackup):
273
 
                        # previous base found, rename it
274
 
                        old_base_moved = os.path.join(dest_dir,'base_%s' % (time.strftime('%Y-%m-%d%H%M')))
275
 
                        shutil.move(dest_basebackup, old_base_moved)
276
 
                        log('Move old base %s'%dest_basebackup)
277
 
 
278
 
                    new_base = os.path.join(dest_dir, 'base.tar.7z')
279
 
                    shutil.move(basebackup, new_base)
280
 
                    un7zip(new_base, dest_dir)
281
 
                    os.makedirs(dest_basebackup)
282
 
                    untar = ['tar', '-xf', os.path.join(dest_dir, 'base.tar'), '-C', dest_basebackup]
283
 
                    log(untar)
284
 
                    subprocess.check_output(untar, stderr=subprocess.STDOUT)
285
 
                    os.remove(os.path.join(dest_dir, 'base.tar'))
286
 
 
287
 
                    for conf in ['recovery.conf', 'postgresql.conf', 'pg_hba.conf']:
288
 
                        shutil.copy(os.path.join(PSQL_CONF, conf), dest_basebackup)
289
 
 
290
 
                    for del_recreate in [pg_wal, pg_xlog, os.path.join(dest_basebackup, 'pg_log')]:
291
 
                        if os.path.isdir(del_recreate):
292
 
                            shutil.rmtree(del_recreate)
293
 
                        os.makedirs(del_recreate)
294
 
 
295
 
                    basebackup_found = True
296
 
 
297
 
                # Move WAL (copy + del to set right owner on target)
298
 
                if not os.path.exists(pg_xlog):
299
 
                    log('Unable to copy WAL, base directory not found %s' % pg_xlog)
300
 
                    continue
301
 
                if not os.path.exists(oldwal):
302
 
                    log('Unable to copy WAL, wal directory not found %s' % oldwal)
303
 
                    continue
304
 
 
305
 
                wal_moved = 0
306
 
                forced_dump = forced_instance
307
 
                for wal in os.listdir(full_name):
308
 
                    full_path_wal = os.path.join(full_name, wal)
309
 
                    if wal.endswith('7z') and not wal.startswith('.'):
310
 
                        try:
311
 
                            un7zip(full_path_wal, oldwal)
312
 
                            os.remove(full_path_wal)
313
 
                            wal_moved += 1
314
 
                        except subprocess.CalledProcessError as e:
315
 
                            # try to extract all WAL: UC when new bb generated to unlock a dump
316
 
                            error(e.output or e.stderr)
317
 
                        except Exception:
318
 
                            logger.exception('ERROR')
319
 
 
320
 
                    elif wal == 'force_dump':
321
 
                        os.remove(full_path_wal)
322
 
                        forced_dump = True
323
 
 
324
 
                wal_not_dumped = os.path.join(dest_dir, 'wal_not_dumped')
325
 
 
326
 
                if wal_moved:
327
 
                    log('%s, %d wal moved to %s' % (full_name, wal_moved, oldwal))
328
 
                elif forced_dump:
329
 
                    log('%s, dump forced' % (full_name, ))
330
 
                elif os.path.exists(wal_not_dumped):
331
 
                    last_wal_date = datetime.datetime.fromtimestamp(os.path.getmtime(wal_not_dumped))
332
 
                    if last_wal_date < datetime.datetime.now() - relativedelta(hours=36):
333
 
                        log('%s, wal_not_dumped forced' % (full_name, ))
334
 
 
335
 
                if forced_dump or wal_moved or basebackup_found:
336
 
                    last_dump_file = os.path.join(dest_dir, 'last_dump.txt')
337
 
                    last_wal_date = False
338
 
                    if not forced_dump and not basebackup_found and os.path.exists(last_dump_file):
339
 
                        last_dump_date = datetime.datetime.fromtimestamp(os.path.getmtime(last_dump_file))
340
 
                        with open(last_dump_file) as last_desc:
341
 
                            last_wal_date = last_desc.read()
342
 
                        # only 1 dump per day
343
 
                        if last_dump_date.strftime('%Y-%m-%d') == time.strftime('%Y-%m-%d'):
344
 
                            log('%s already dumped today' % instance)
345
 
                            open(wal_not_dumped, 'w').close()
346
 
                            continue
347
 
 
348
 
                    try:
349
 
                        # Start psql
350
 
                        PSQL_DIR = config.psql9_dir
351
 
                        VERSION_FILE = os.path.join(dest_basebackup, 'PG_VERSION')
352
 
                        if os.patch.isfile(VERSION_FILE):
353
 
                            with open(VERSION_FILE, 'r') as ve:
354
 
                                version = ve.read()
355
 
                                if version.startswith('14'):
356
 
                                    PSQL_DIR = config.psql14_dir
357
 
                        log('%s, pg_version: %s'% (instance, PSQL_DIR))
358
 
                        psql_start = [os.path.join(PSQL_DIR, 'pg_ctl.exe'),'-o', '-p %s'%psql_port, '-D', to_win(dest_basebackup), '-t', '1200', '-w', 'start']
359
 
                        log(' '.join(psql_start))
360
 
                        subprocess.run(psql_start, check=True)
361
 
                        #subprocess.check_output(psql_start)
362
 
 
363
 
                        db = psycopg2.connect('dbname=template1 host=127.0.0.1 user=openpg port=%s'%psql_port)
364
 
                        cr = db.cursor()
365
 
                        # wait end of wall processing
366
 
                        previous_wall = False
367
 
                        while True:
368
 
                            cr.execute("SELECT pg_last_xlog_replay_location()")
369
 
                            prev = cr.fetchone()[0]
370
 
                            if prev == previous_wall:
371
 
                                break
372
 
                            log('%s wait recovery, previous: %s, current: %s' % (instance, previous_wall, prev))
373
 
                            previous_wall = prev
374
 
                            #time.sleep(10)
375
 
                            time.sleep(60)
376
 
 
377
 
                        if not previous_wall:
378
 
                            error('%s no WAL replayed' % instance)
379
 
 
380
 
                        cr.execute("SELECT pg_last_xact_replay_timestamp()")
381
 
                        restore_date = cr.fetchone()[0]
382
 
                        if not restore_date:
383
 
                            cr.execute("select checkpoint_time from pg_control_checkpoint()")
384
 
                            check_point = cr.fetchone()
385
 
                            if check_point:
386
 
                                restore_date = check_point[0]
387
 
                                log('%s : get restore date from pg_control_checkpoint: %s' % (instance, restore_date.strftime('%Y%m%d-%H%M%S')))
388
 
 
389
 
                        if not restore_date:
390
 
                            error('%s no last replay timestamp' % instance)
391
 
                            label_file = os.path.join(dest_basebackup, 'backup_label.old')
392
 
                            if os.path.exists(label_file):
393
 
                                restore_date = datetime.datetime.fromtimestamp(os.path.getmtime(label_file))
394
 
 
395
 
                        if not restore_date:
396
 
                            restore_date = datetime.datetime.now()
397
 
 
398
 
                        cr.execute('SELECT datname FROM pg_database')
399
 
                        all_dbs = cr.fetchall()
400
 
                        db.close()
401
 
 
402
 
                        if last_wal_date and last_wal_date == restore_date.strftime('%Y%m%d-%H%M%S'):
403
 
                            # same wal as previous dump: do noting (the in-progess wal is the 1st one)
404
 
                            log('%s : same data (%s) as previous backup (%s), do not dump/push to od' % (instance, last_wal_date, restore_date.strftime('%Y%m%d-%H%M%S')))
405
 
                            continue
406
 
 
407
 
                        # dump and zip the dump
408
 
                        for x in all_dbs:
 
277
                        # copy postgres and recovery / migration of new WAL destination
 
278
                        for conf in ['recovery.conf', 'postgresql.conf']:
 
279
                            shutil.copy(os.path.join(PSQL_CONF, conf), dest_basebackup)
 
280
 
 
281
                    # Copy / extract basbackup
 
282
                    basebackup_found = False
 
283
                    if os.path.isfile(basebackup):
 
284
                        self.log('%s Found base backup %s'% (instance, basebackup))
 
285
                        old_base_moved = False
 
286
                        if os.path.isdir(dest_basebackup):
 
287
                            # previous base found, rename it
 
288
                            old_base_moved = os.path.join(dest_dir,'base_%s' % (time.strftime('%Y-%m-%d%H%M')))
 
289
                            shutil.move(dest_basebackup, old_base_moved)
 
290
                            self.log('Move old base %s'%dest_basebackup)
 
291
 
 
292
                        new_base = os.path.join(dest_dir, 'base.tar.7z')
 
293
                        shutil.move(basebackup, new_base)
 
294
                        self.un7zip(new_base, dest_dir)
 
295
                        os.makedirs(dest_basebackup)
 
296
                        untar = ['tar', '-xf', os.path.join(dest_dir, 'base.tar'), '-C', dest_basebackup]
 
297
                        self.log(untar)
 
298
                        subprocess.check_output(untar, stderr=subprocess.STDOUT)
 
299
                        os.remove(os.path.join(dest_dir, 'base.tar'))
 
300
 
 
301
                        for conf in ['recovery.conf', 'postgresql.conf', 'pg_hba.conf']:
 
302
                            shutil.copy(os.path.join(PSQL_CONF, conf), dest_basebackup)
 
303
 
 
304
                        for del_recreate in [pg_wal, pg_xlog, os.path.join(dest_basebackup, 'pg_log')]:
 
305
                            if os.path.isdir(del_recreate):
 
306
                                shutil.rmtree(del_recreate)
 
307
                            os.makedirs(del_recreate)
 
308
 
 
309
                        basebackup_found = True
 
310
 
 
311
                    # Move WAL (copy + del to set right owner on target)
 
312
                    if not os.path.exists(pg_xlog):
 
313
                        self.log('Unable to copy WAL, base directory not found %s' % pg_xlog)
 
314
                        continue
 
315
                    if not os.path.exists(oldwal):
 
316
                        self.log('Unable to copy WAL, wal directory not found %s' % oldwal)
 
317
                        continue
 
318
 
 
319
                    wal_moved = 0
 
320
                    forced_dump = forced_instance
 
321
                    for wal in os.listdir(full_name):
 
322
                        full_path_wal = os.path.join(full_name, wal)
 
323
                        if wal.endswith('7z') and not wal.startswith('.'):
409
324
                            try:
410
 
                                final_zip = False
411
 
                                dump_file = False
412
 
                                if x[0] not in ['template0', 'template1', 'postgres']:
413
 
                                    log('%s db found %s'% (instance, x[0]))
414
 
                                    db = psycopg2.connect('dbname=%s host=127.0.0.1 user=openpg port=%s' % (x[0], psql_port))
415
 
                                    cr = db.cursor()
416
 
                                    cr.execute('SELECT name FROM sync_client_version where date is not null order by date desc limit 1')
417
 
                                    version = cr.fetchone()
418
 
                                    if not version:
419
 
                                        error('%s: version not found' % instance)
420
 
                                        version = 'XX'
421
 
                                    else:
422
 
                                        version = version[0]
423
 
                                    cr.execute('SELECT oc FROM sync_client_entity')
424
 
                                    oc = cr.fetchone()[0]
425
 
                                    if not oc:
426
 
                                        error('%s: OC not found' % instance)
427
 
                                        oc = 'XX'
428
 
                                    db.close()
429
 
 
430
 
                                    dump_file = os.path.join(DUMP_DIR, '%s-%s-C-%s.dump' % (x[0], restore_date.strftime('%Y%m%d-%H%M%S'), version))
431
 
                                    log('Dump %s' % dump_file)
432
 
                                    pg_dump = [os.path.join(PSQL_DIR, 'pg_dump.exe'), '-h', '127.0.0.1', '-p', psql_port, '-U', 'openpg', '-Fc', '--lock-wait-timeout=120000',  '-f', to_win(dump_file), x[0]]
433
 
                                    subprocess.check_output(pg_dump, stderr=subprocess.STDOUT)
434
 
 
435
 
                                    final_zip = os.path.join(DUMP_DIR, '%s-%s.zip' % (x[0], day_abr[datetime.datetime.now().weekday()]))
436
 
                                    if os.path.exists(final_zip):
437
 
                                        os.remove(final_zip)
438
 
 
439
 
                                    zip_c = ['zip', '-j', '-q', final_zip, dump_file]
440
 
                                    log(' '.join(zip_c))
441
 
                                    subprocess.call(zip_c)
442
 
                                    os.remove(dump_file)
443
 
                                    upload_od(final_zip, oc)
444
 
                                    with open(last_dump_file, 'w') as last_desc:
445
 
                                        last_desc.write(restore_date.strftime('%Y%m%d-%H%M%S'))
446
 
                                    if os.path.exists(wal_not_dumped):
447
 
                                        os.remove(wal_not_dumped)
 
325
                                self.un7zip(full_path_wal, oldwal)
 
326
                                os.remove(full_path_wal)
 
327
                                wal_moved += 1
448
328
                            except subprocess.CalledProcessError as e:
449
 
                                error(e.output or e.stderr)
 
329
                                # try to extract all WAL: UC when new bb generated to unlock a dump
 
330
                                self.error(e.output or e.stderr)
450
331
                            except Exception:
451
 
                                logger.exception('ERROR')
452
 
                            finally:
453
 
                                if dump_file and os.path.exists(dump_file):
454
 
                                    os.remove(dump_file)
455
 
                                if final_zip and os.path.exists(final_zip):
456
 
                                    os.remove(final_zip)
457
 
 
458
 
                    finally:
459
 
                        psql_stop = [os.path.join(PSQL_DIR, 'pg_ctl.exe'), '-D', to_win(dest_basebackup), '-t', '1200', '-w', 'stop']
460
 
                        log(' '.join(psql_stop))
461
 
                        subprocess.run(psql_stop)
462
 
 
463
 
            thread_touch = '%s-%s' % (TOUCH_FILE_DUMP, thread)
464
 
            with open(thread_touch, 'w') as t_file:
465
 
                t_file.write(time.strftime('%Y-%m-%d%H%M%S'))
466
 
 
467
 
        except subprocess.CalledProcessError as e:
468
 
            error(e.output or e.stderr)
469
 
        except Exception:
470
 
            logger.exception('ERROR')
 
332
                                self.logger.exception('ERROR')
 
333
 
 
334
                        elif wal == 'force_dump':
 
335
                            os.remove(full_path_wal)
 
336
                            forced_dump = True
 
337
 
 
338
                    wal_not_dumped = os.path.join(dest_dir, 'wal_not_dumped')
 
339
 
 
340
                    if wal_moved:
 
341
                        self.log('%s, %d wal moved to %s' % (full_name, wal_moved, oldwal))
 
342
                    elif forced_dump:
 
343
                        self.log('%s, dump forced' % (full_name, ))
 
344
                    elif os.path.exists(wal_not_dumped):
 
345
                        last_wal_date = datetime.datetime.fromtimestamp(os.path.getmtime(wal_not_dumped))
 
346
                        if last_wal_date < datetime.datetime.now() - relativedelta(hours=36):
 
347
                            self.log('%s, wal_not_dumped forced' % (full_name, ))
 
348
 
 
349
                    if forced_dump or wal_moved or basebackup_found:
 
350
                        last_dump_file = os.path.join(dest_dir, 'last_dump.txt')
 
351
                        last_wal_date = False
 
352
                        if not forced_dump and not basebackup_found and os.path.exists(last_dump_file):
 
353
                            last_dump_date = datetime.datetime.fromtimestamp(os.path.getmtime(last_dump_file))
 
354
                            with open(last_dump_file) as last_desc:
 
355
                                last_wal_date = last_desc.read()
 
356
                            # only 1 dump per day
 
357
                            if last_dump_date.strftime('%Y-%m-%d') == time.strftime('%Y-%m-%d'):
 
358
                                self.log('%s already dumped today' % instance)
 
359
                                open(wal_not_dumped, 'w').close()
 
360
                                continue
 
361
 
 
362
                        try:
 
363
                            # Start psql
 
364
                            PSQL_DIR = config.psql9_dir
 
365
                            VERSION_FILE = os.path.join(dest_basebackup, 'PG_VERSION')
 
366
                            if os.patch.isfile(VERSION_FILE):
 
367
                                with open(VERSION_FILE, 'r') as ve:
 
368
                                    version = ve.read()
 
369
                                    if version.startswith('14'):
 
370
                                        PSQL_DIR = config.psql14_dir
 
371
                            self.log('%s, pg_version: %s'% (instance, PSQL_DIR))
 
372
                            psql_start = [os.path.join(PSQL_DIR, 'pg_ctl.exe'),'-o', '-p %s'%psql_port, '-D', to_win(dest_basebackup), '-t', '1200', '-w', 'start']
 
373
                            self.log(' '.join(psql_start))
 
374
                            subprocess.run(psql_start, check=True)
 
375
                            #subprocess.check_output(psql_start)
 
376
 
 
377
                            db = psycopg2.connect('dbname=template1 host=127.0.0.1 user=openpg port=%s'%psql_port)
 
378
                            cr = db.cursor()
 
379
                            # wait end of wall processing
 
380
                            previous_wall = False
 
381
                            while True:
 
382
                                cr.execute("SELECT pg_last_xlog_replay_location()")
 
383
                                prev = cr.fetchone()[0]
 
384
                                if prev == previous_wall:
 
385
                                    break
 
386
                                self.log('%s wait recovery, previous: %s, current: %s' % (instance, previous_wall, prev))
 
387
                                previous_wall = prev
 
388
                                #time.sleep(10)
 
389
                                time.sleep(60)
 
390
 
 
391
                            if not previous_wall:
 
392
                                self.error('%s no WAL replayed' % instance)
 
393
 
 
394
                            cr.execute("SELECT pg_last_xact_replay_timestamp()")
 
395
                            restore_date = cr.fetchone()[0]
 
396
                            if not restore_date:
 
397
                                cr.execute("select checkpoint_time from pg_control_checkpoint()")
 
398
                                check_point = cr.fetchone()
 
399
                                if check_point:
 
400
                                    restore_date = check_point[0]
 
401
                                    self.log('%s : get restore date from pg_control_checkpoint: %s' % (instance, restore_date.strftime('%Y%m%d-%H%M%S')))
 
402
 
 
403
                            if not restore_date:
 
404
                                self.error('%s no last replay timestamp' % instance)
 
405
                                label_file = os.path.join(dest_basebackup, 'backup_label.old')
 
406
                                if os.path.exists(label_file):
 
407
                                    restore_date = datetime.datetime.fromtimestamp(os.path.getmtime(label_file))
 
408
 
 
409
                            if not restore_date:
 
410
                                restore_date = datetime.datetime.now()
 
411
 
 
412
                            cr.execute('SELECT datname FROM pg_database')
 
413
                            all_dbs = cr.fetchall()
 
414
                            db.close()
 
415
 
 
416
                            if last_wal_date and last_wal_date == restore_date.strftime('%Y%m%d-%H%M%S'):
 
417
                                # same wal as previous dump: do noting (the in-progess wal is the 1st one)
 
418
                                self.log('%s : same data (%s) as previous backup (%s), do not dump/push to od' % (instance, last_wal_date, restore_date.strftime('%Y%m%d-%H%M%S')))
 
419
                                continue
 
420
 
 
421
                            # dump and zip the dump
 
422
                            for x in all_dbs:
 
423
                                try:
 
424
                                    final_zip = False
 
425
                                    dump_file = False
 
426
                                    if x[0] not in ['template0', 'template1', 'postgres']:
 
427
                                        self.log('%s db found %s'% (instance, x[0]))
 
428
                                        db = psycopg2.connect('dbname=%s host=127.0.0.1 user=openpg port=%s' % (x[0], psql_port))
 
429
                                        cr = db.cursor()
 
430
                                        cr.execute('SELECT name FROM sync_client_version where date is not null order by date desc limit 1')
 
431
                                        version = cr.fetchone()
 
432
                                        if not version:
 
433
                                            self.error('%s: version not found' % instance)
 
434
                                            version = 'XX'
 
435
                                        else:
 
436
                                            version = version[0]
 
437
                                        cr.execute('SELECT oc FROM sync_client_entity')
 
438
                                        oc = cr.fetchone()[0]
 
439
                                        if not oc:
 
440
                                            self.error('%s: OC not found' % instance)
 
441
                                            oc = 'XX'
 
442
                                        db.close()
 
443
 
 
444
                                        dump_file = os.path.join(DUMP_DIR, '%s-%s-C-%s.dump' % (x[0], restore_date.strftime('%Y%m%d-%H%M%S'), version))
 
445
                                        self.log('Dump %s' % dump_file)
 
446
                                        pg_dump = [os.path.join(PSQL_DIR, 'pg_dump.exe'), '-h', '127.0.0.1', '-p', psql_port, '-U', 'openpg', '-Fc', '--lock-wait-timeout=120000',  '-f', to_win(dump_file), x[0]]
 
447
                                        subprocess.check_output(pg_dump, stderr=subprocess.STDOUT)
 
448
 
 
449
                                        final_zip = os.path.join(DUMP_DIR, '%s-%s.zip' % (x[0], day_abr[datetime.datetime.now().weekday()]))
 
450
                                        if os.path.exists(final_zip):
 
451
                                            os.remove(final_zip)
 
452
 
 
453
                                        zip_c = ['zip', '-j', '-q', final_zip, dump_file]
 
454
                                        self.log(' '.join(zip_c))
 
455
                                        subprocess.call(zip_c)
 
456
                                        os.remove(dump_file)
 
457
                                        self.upload_od(final_zip, oc)
 
458
                                        with open(last_dump_file, 'w') as last_desc:
 
459
                                            last_desc.write(restore_date.strftime('%Y%m%d-%H%M%S'))
 
460
                                        if os.path.exists(wal_not_dumped):
 
461
                                            os.remove(wal_not_dumped)
 
462
                                except subprocess.CalledProcessError as e:
 
463
                                    self.error(e.output or e.stderr)
 
464
                                except Exception:
 
465
                                    self.logger.exception('ERROR')
 
466
                                finally:
 
467
                                    if dump_file and os.path.exists(dump_file):
 
468
                                        os.remove(dump_file)
 
469
                                    if final_zip and os.path.exists(final_zip):
 
470
                                        os.remove(final_zip)
 
471
 
 
472
                        finally:
 
473
                            psql_stop = [os.path.join(PSQL_DIR, 'pg_ctl.exe'), '-D', to_win(dest_basebackup), '-t', '1200', '-w', 'stop']
 
474
                            self.log(' '.join(psql_stop))
 
475
                            subprocess.run(psql_stop)
 
476
 
 
477
                thread_touch = '%s-%s' % (TOUCH_FILE_DUMP, self.thread)
 
478
                with open(thread_touch, 'w') as t_file:
 
479
                    t_file.write(time.strftime('%Y-%m-%d%H%M%S'))
 
480
 
 
481
            except subprocess.CalledProcessError as e:
 
482
                self.error(e.output or e.stderr)
 
483
            except Exception:
 
484
                self.logger.exception('ERROR')
471
485
 
472
486
if __name__ == '__main__':
473
 
    log('Check directories')
474
 
    nb_threads = 2
 
487
    nb_threads = 3
475
488
    threads = []
476
489
    q = Queue()
477
490
    q.add('INIT')
478
491
    for x in range(0, nb_threads):
479
 
        t = threading.Thread(target=process_directory, args=(x, q))
 
492
        t = threading.Thread(target=Process(x, q).process_directory)
480
493
        threads.append(t)
481
 
        t.run()
 
494
        t.start()
482
495
 
483
496
    for t in threads:
484
497
        t.join()
485
 
        log('Thread ends')
486
498
 
487
499
    stopped(True)
488
 
    log('Process ends')
489
500
    sys.exit(0)
490
501