148
150
log('Uncompress: %s ' % ' '.join(command))
149
151
subprocess.check_output(command, stderr=subprocess.STDOUT)
151
def list_instances(src_dir, seen):
154
for instance in sorted(os.listdir(SRC_DIR)):
155
if instance.startswith('.'):
157
newbase = os.path.isfile(os.path.join(SRC_DIR, instance, 'base', 'base.tar.7z'))
158
if newbase and (not seen.get(instance) or seen.get(instance) < datetime.datetime.now() + relativedelta(minutes=-30)):
159
log('Put %s base on top' % (instance, ))
161
elif seen.get(instance):
164
to_analyze.append(instance)
165
return top + to_analyze
167
def process_directory():
155
lock = threading.RLock()
159
def add(self, item, prio=1):
161
if item not in self.list_items:
162
self.list_items[item] = prio
163
heapq.heappush(self.queue, [prio, item])
164
elif self.list_items[item] <= prio:
169
t[0] = 0 # delete item
170
heapq.heappush(self.queue, [prio, item])
177
prio, item = heapq.heappop(self.queue)
182
del self.list_items[item]
189
forced_path = os.path.join(SRC_DIR, 'forced_instance')
190
if os.path.exists(forced_path):
191
with open(forced_path) as forced_path_desc:
192
instance = forced_path_desc.read()
193
self.add(instance, -2)
194
os.remove(forced_path)
196
for instance in sorted(os.listdir(SRC_DIR)):
197
if instance.startswith('.'):
199
newbase = os.path.isfile(os.path.join(SRC_DIR, instance, 'base', 'base.tar.7z'))
201
log('Put %s base on top' % (instance, ))
202
self.add(instance, -1)
206
def stopped(delete=False):
207
stop_service = os.path.join(SRC_DIR, 'stop_service')
208
if os.path.exists(stop_service):
210
os.remove(stop_service)
215
def process_directory(thread, queue):
168
216
if not os.path.isdir(DUMP_DIR):
169
217
os.makedirs(DUMP_DIR)
171
to_analyze = list_instances(SRC_DIR, instances_seen)
174
instance = to_analyze.pop(0)
219
psql_port = 5432 + thread
223
nb, instance = queue.pop()
224
if not instance or instance == 'INIT':
225
log('thread %s, sleep' % (thread,))
227
if instance == 'INIT':
228
with open(TOUCH_FILE_LOOP, 'w') as t_file:
229
t_file.write(time.strftime('%Y-%m-%d%H%M%S'))
175
234
forced_instance = False
176
forced_path = os.path.join(SRC_DIR, 'forced_instance')
177
if os.path.exists(forced_path):
178
with open(forced_path) as forced_path_desc:
179
instance = forced_path_desc.read()
180
236
forced_instance = True
181
os.remove(forced_path)
183
instances_seen[instance] = datetime.datetime.now()
185
238
full_name = os.path.join(SRC_DIR, instance)
187
240
if os.path.isdir(full_name):
188
#log('##### Instance %s'%full_name)
190
stop_service = os.path.join(SRC_DIR, 'stop_service')
191
if os.path.exists(stop_service):
192
os.remove(stop_service)
196
245
basebackup = os.path.join(full_name, 'base', 'base.tar.7z')
197
246
dest_dir = os.path.join(DEST_DIR, instance)
199
248
if not os.path.isdir(dest_dir) and not os.path.isfile(basebackup):
200
249
# new instance wait for base.tar
201
instances_seen[instance] = False
204
to_analyze = list_instances(SRC_DIR, instances_seen)
206
252
for dir_to_create in [os.path.join(dest_dir, 'OLDWAL')]:
207
253
if not os.path.isdir(dir_to_create):
208
254
log('Create %s'%dir_to_create)
240
287
for conf in ['recovery.conf', 'postgresql.conf', 'pg_hba.conf']:
241
288
shutil.copy(os.path.join(PSQL_CONF, conf), dest_basebackup)
243
for del_recreate in [pg_xlog, os.path.join(dest_basebackup, 'pg_log')]:
290
for del_recreate in [pg_wal, pg_xlog, os.path.join(dest_basebackup, 'pg_log')]:
244
291
if os.path.isdir(del_recreate):
245
292
shutil.rmtree(del_recreate)
246
293
os.makedirs(del_recreate)
249
# # old base moved, copy previous WAL in the new basebackup
250
# # case of WAL created during this base backup was already moved to the old WAL
251
# shutil.rmtree(pg_xlog)
252
# shutil.copytree(os.path.join(old_base_moved, 'pg_xlog'), pg_xlog)
253
295
basebackup_found = True
255
# is there an in-progress rsync ?
256
rsync_temp = os.path.join(full_name, '.rsync-partial')
257
if os.path.exists(rsync_temp):
258
partial_modification_date = datetime.datetime.fromtimestamp(os.path.getctime(rsync_temp))
259
if partial_modification_date > datetime.datetime.now() + relativedelta(minutes=-10):
260
log('%s, rsync in progess %s' % (full_name, partial_modification_date.strftime('%Y-%m-%d %H:%M')))
263
297
# Move WAL (copy + del to set right owner on target)
264
298
if not os.path.exists(pg_xlog):
265
299
log('Unable to copy WAL, base directory not found %s' % pg_xlog)
301
if not os.path.exists(oldwal):
302
log('Unable to copy WAL, wal directory not found %s' % oldwal)
270
306
forced_dump = forced_instance
274
for_next_loop = False
275
for wal in os.listdir(full_name):
276
full_path_wal = os.path.join(full_name, wal)
277
if wal.endswith('7z') and not wal.startswith('.'):
279
un7zip(full_path_wal, oldwal)
280
os.remove(full_path_wal)
282
except subprocess.CalledProcessError as e:
283
# try to extract all WAL: UC when new bb generated to unlock a dump
284
error(e.output or e.stderr)
286
logger.exception('ERROR')
288
elif wal == 'force_dump':
307
for wal in os.listdir(full_name):
308
full_path_wal = os.path.join(full_name, wal)
309
if wal.endswith('7z') and not wal.startswith('.'):
311
un7zip(full_path_wal, oldwal)
289
312
os.remove(full_path_wal)
291
elif wal.startswith('.') and '.7z.' in wal:
292
# there is an inprogress rsync
294
partial_modification_date = datetime.datetime.fromtimestamp(os.path.getctime(full_path_wal))
295
if partial_modification_date > datetime.datetime.now() + relativedelta(minutes=-45):
296
# rsync inprogress is less than X min, go to the next instance
297
log('%s found %s, next_loop (%s)' % (full_name, wal, partial_modification_date.strftime('%Y-%m-%d %H:%M')))
301
# inprogress is more than X min: too slow, generate backup
302
log('%s found %s, too old, force wal (%s)' % (full_name, wal, partial_modification_date.strftime('%Y-%m-%d %H:%M')))
303
if not wal_forced.get(full_name, {}).get(full_path_wal):
304
wal_forced[full_name] = {full_path_wal: True}
306
except FileNotFoundError:
307
# between listdir and getctime the temp file has been removed, retry listdir
308
log('%s file not found %s, retry' % (full_name, wal))
314
except subprocess.CalledProcessError as e:
315
# try to extract all WAL: UC when new bb generated to unlock a dump
316
error(e.output or e.stderr)
318
logger.exception('ERROR')
320
elif wal == 'force_dump':
321
os.remove(full_path_wal)
316
324
wal_not_dumped = os.path.join(dest_dir, 'wal_not_dumped')
319
327
log('%s, %d wal moved to %s' % (full_name, wal_moved, oldwal))
321
log('%s, wal forced' % (full_name, ))
322
328
elif forced_dump:
323
329
log('%s, dump forced' % (full_name, ))
324
330
elif os.path.exists(wal_not_dumped):
325
331
last_wal_date = datetime.datetime.fromtimestamp(os.path.getmtime(wal_not_dumped))
326
332
if last_wal_date < datetime.datetime.now() - relativedelta(hours=36):
327
333
log('%s, wal_not_dumped forced' % (full_name, ))
330
if forced_wal or forced_dump or wal_moved or basebackup_found:
335
if forced_dump or wal_moved or basebackup_found:
331
336
last_dump_file = os.path.join(dest_dir, 'last_dump.txt')
332
337
last_wal_date = False
333
338
if not forced_dump and not basebackup_found and os.path.exists(last_dump_file):
350
355
if version.startswith('14'):
351
356
PSQL_DIR = config.psql14_dir
352
357
log('%s, pg_version: %s'% (instance, PSQL_DIR))
353
psql_start = [os.path.join(PSQL_DIR, 'pg_ctl.exe'), '-D', to_win(dest_basebackup), '-t', '1200', '-w', 'start']
358
psql_start = [os.path.join(PSQL_DIR, 'pg_ctl.exe'),'-o', '-p %s'%psql_port, '-D', to_win(dest_basebackup), '-t', '1200', '-w', 'start']
354
359
log(' '.join(psql_start))
355
360
subprocess.run(psql_start, check=True)
356
361
#subprocess.check_output(psql_start)
358
db = psycopg2.connect('dbname=template1 host=127.0.0.1 user=openpg')
363
db = psycopg2.connect('dbname=template1 host=127.0.0.1 user=openpg port=%s'%psql_port)
360
365
# wait end of wall processing
361
366
previous_wall = False
425
430
dump_file = os.path.join(DUMP_DIR, '%s-%s-C-%s.dump' % (x[0], restore_date.strftime('%Y%m%d-%H%M%S'), version))
426
431
log('Dump %s' % dump_file)
427
pg_dump = [os.path.join(PSQL_DIR, 'pg_dump.exe'), '-h', '127.0.0.1', '-U', 'openpg', '-Fc', '--lock-wait-timeout=120000', '-f', to_win(dump_file), x[0]]
432
pg_dump = [os.path.join(PSQL_DIR, 'pg_dump.exe'), '-h', '127.0.0.1', '-p', psql_port, '-U', 'openpg', '-Fc', '--lock-wait-timeout=120000', '-f', to_win(dump_file), x[0]]
428
433
subprocess.check_output(pg_dump, stderr=subprocess.STDOUT)
430
435
final_zip = os.path.join(DUMP_DIR, '%s-%s.zip' % (x[0], day_abr[datetime.datetime.now().weekday()]))