~nickwinston123/armagetronad/arma_chatbot_config

« back to all changes in this revision

Viewing changes to manage_chatbot.py

  • Committer: hackermans
  • Date: 2025-05-28 18:34:25 UTC
  • Revision ID: nickwinston123@gmail.com-20250528183425-z5cssgt5eeqyqox3

consolidated arma chatbot config programs

- arma_terminal: live console viewer with input passthrough
- game_manager: manages bans, IP rotation, and keeping the game open
- game_updater: syncs updated game files from shared folder
- ollama_chat: ai chatbot 

- launcher: launches all not already opened programs with positioning and window checks

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
import os
2
 
import sys
3
 
import time
4
 
import random
5
 
import subprocess
6
 
import psutil
7
 
import requests
8
 
import ctypes
9
 
import logging
10
 
import configparser
11
 
import re
12
 
 
13
 
# Load the INI settings file (path is relative to the working directory).
# ConfigParser.read() silently skips files it cannot open and returns the
# list of files it actually parsed, so a missing file has to be detected from
# that return value; the exception handler only covers malformed content.
config = configparser.ConfigParser()
try:
    _parsed_config_files = config.read('manage_chatbot_py_config.ini')
except Exception as e:
    logging.error(f"Error reading configuration: {e}")
    sys.exit(1)

if not _parsed_config_files:
    logging.error(
        "Error reading configuration: 'manage_chatbot_py_config.ini' "
        "was not found or could not be opened"
    )
    sys.exit(1)

# Required settings. A missing section or option aborts start-up.
try:
    OPENVPN_PATH = config.get('Paths', 'openvpn_path')
    VPN_LOG = config.get('Paths', 'vpn_log')
    OVPN_DIR = config.get('Paths', 'ovpn_dir')
    BANNED_FILE = config.get('Paths', 'banned_file')
    COMMANDS_FILE = config.get('Paths', 'commands_file')
    BANNED_LOG = config.get('Paths', 'banned_log')
    OUTPUT_LOG = config.get('Paths', 'output_log')
    REAL_IP = config.get('Settings', 'real_ip')
except Exception as e:
    logging.error(f"Error retrieving configuration values: {e}")
    sys.exit(1)
32
 
 
33
 
 
34
 
# Shared format for every handler: time of day, level, message.
log_formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s", datefmt="%H:%M:%S")

# Write the log file as UTF-8 so records containing characters outside the
# platform's default encoding can still be written.
file_handler = logging.FileHandler(OUTPUT_LOG, encoding='utf-8')
file_handler.setFormatter(log_formatter)
38
 
 
39
 
import io
40
 
 
41
 
class StreamToUTF8(io.TextIOWrapper):
    """Text wrapper whose ``write`` retries with sanitised text on encode errors.

    It is used as the console stream for logging so that a message containing
    code points the encoder rejects does not abort the write.
    """

    def write(self, b):
        """Write the text *b* and return the number of characters written.

        Despite its name, *b* is a ``str``.  If the wrapper's encoder raises
        ``UnicodeEncodeError``, the text is round-tripped through UTF-8 with
        ``errors='replace'`` (unencodable code points become ``?``) and the
        sanitised text is written instead.
        """
        try:
            # Propagate the count, as TextIOWrapper.write() does.
            return super().write(b)
        except UnicodeEncodeError:
            sanitised = b.encode('utf-8', errors='replace').decode('utf-8')
            return super().write(sanitised)
47
 
 
48
 
# Console output goes through a UTF-8 wrapper around the raw stdout buffer.
_utf8_stdout = StreamToUTF8(sys.stdout.buffer, encoding='utf-8', errors='replace')
console_handler = logging.StreamHandler(_utf8_stdout)
console_handler.setFormatter(log_formatter)

# Root logger: INFO and above, sent to the log file first and then the console.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
for _handler in (file_handler, console_handler):
    logger.addHandler(_handler)
56
 
 
57
 
 
58
 
# Mutable module-level state shared by the functions below.

# Timing
script_start_time = time.time()      # reference point for the uptime report
rebuild_start_time = 0               # when the forced rebuild was last switched on
last_ban_time = None                 # time of the most recent ban, if any

# Ban bookkeeping
ban_count = active_ban_count = 0
last_rebuild_ban_trigger = -1        # active ban count that last triggered a rebuild
reset_after_ban_applied = False

# Command-mode tracking
current_mode = "Default"
last_applied_mode = None
alt_mode = force_rebuild_active = False
command_write_count = 0              # number of lines appended to the commands file

# Lifecycle
initial_launch = True
72
 
 
73
 
def print_log(message):
    """Echo *message* with ``print`` and then record it at INFO level."""
    for emit in (print, logging.info):
        emit(message)
76
 
 
77
 
 
78
 
def log_stats():
    """Log a snapshot of the script's runtime state at INFO level.

    Reports total and active ban counts, script uptime, the time of (and time
    since) the last ban when one has been recorded, the current mode, the
    forced-rebuild flag with its remaining cooldown, and the alt-mode flag.
    All values are read from module-level globals.  Any exception is logged
    and swallowed so a reporting problem cannot interrupt the caller.
    """
    try:
        now = time.time()
        hrs, rem = divmod(int(now - script_start_time), 3600)
        mins, secs = divmod(rem, 60)
        uptime_str = f"{hrs}h {mins}m {secs}s"

        logging.info("-------- STATS --------")
        logging.info(f"Total Bans: {ban_count}")
        logging.info(f"Active Bans: {active_ban_count}")
        logging.info(f"Script Uptime: {uptime_str}")

        if last_ban_time:
            m, s = divmod(int(now - last_ban_time), 60)
            last_ban_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(last_ban_time))
            logging.info(f"Last Ban Time: {last_ban_str}")
            logging.info(f"Time Since Last Ban: {m}m {s}s")

        logging.info(f"Current Mode: {current_mode}")
        logging.info(f"Force Rebuild Active: {force_rebuild_active}")

        if force_rebuild_active:
            # The flag can still be set after the 300-second window has
            # elapsed (it is cleared elsewhere), so clamp at zero instead of
            # reporting a negative remaining time.
            remaining = max(0, int(300 - (now - rebuild_start_time)))
            m, s = divmod(remaining, 60)
            logging.info(f"Rebuild Cooldown: {m}m {s}s remaining")

        logging.info(f"Alt Mode: {'ON' if alt_mode else 'OFF'}")
        logging.info("-----------------------")
    except Exception as e:
        logging.error(f"Error logging stats: {e}")
104
 
 
105
 
def run_as_admin():
    """Return if the process is already elevated; otherwise relaunch and exit.

    Privilege is checked with ``os.getuid() == 0`` where that function
    exists, and with ``IsUserAnAdmin`` from shell32 otherwise.  When the
    process is not elevated, the script is relaunched through
    ``ShellExecuteW`` with the ``runas`` verb and this process terminates:
    exit status 0 if the relaunch was accepted, 1 if it failed.

    Raises:
        SystemExit: whenever the current process is not elevated.
    """
    try:
        admin = os.getuid() == 0
    except AttributeError:
        # os.getuid is unavailable on this platform; ask the Windows shell.
        try:
            admin = ctypes.windll.shell32.IsUserAnAdmin() != 0
        except Exception as e:
            logging.error(f"Error checking admin privileges: {e}")
            admin = False

    if admin:
        return

    logging.warning("Elevating privileges...")
    try:
        # Quote each argument so paths containing spaces survive the trip
        # through a single command-line string.
        params = subprocess.list2cmdline(sys.argv)
        result = ctypes.windll.shell32.ShellExecuteW(None, "runas", sys.executable, params, None, 1)
    except Exception as e:
        logging.error(f"Error during privilege elevation: {e}")
        sys.exit(1)

    # ShellExecuteW signals success with a value greater than 32.
    if result <= 32:
        logging.error(f"Error during privilege elevation: ShellExecuteW returned {result}")
        sys.exit(1)

    # The elevated copy takes over from here.
    sys.exit(0)
122
 
 
123
 
def kill_existing_vpn():
    """Kill every process whose name contains ``openvpn.exe`` (case-insensitive).

    A failure on one process is logged and does not stop the scan; a failure
    of the scan itself is logged as well.  Nothing is returned or raised.
    """
    try:
        logging.info("Killing any existing OpenVPN processes...")
        for candidate in psutil.process_iter(['name']):
            try:
                proc_name = candidate.info['name']
                if not proc_name or "openvpn.exe" not in proc_name.lower():
                    continue
                logging.info(f"Killing process {candidate.pid} ({proc_name})")
                candidate.kill()
            except Exception as e:
                logging.error(f"Error killing process {candidate.pid}: {e}")
    except Exception as e:
        logging.error(f"Error iterating processes for VPN kill: {e}")
135
 
 
136
 
def parse_ban_duration(reason):
    """Return a ban length in seconds derived from the text *reason*.

    The number following the phrase ``at least`` is multiplied by 60.  When
    the phrase is absent, or *reason* cannot be searched (the error is
    logged), 3600 is returned.
    """
    minutes = None
    try:
        found = re.search(r'at least (\d+)', reason)
        if found:
            minutes = int(found.group(1))
    except Exception as e:
        logging.error(f"Error parsing ban duration from reason '{reason}': {e}")
    return 3600 if minutes is None else minutes * 60
144
 
 
145
 
def add_to_banned_log(ovpn_file, banned_ip, ban_reason):
    """Append one ban record to the file named by ``BANNED_LOG``.

    Records have the form
    ``ip | ovpn file | timestamp | duration (sec) | reason``, one per line,
    preceded by a header line when the file is new or empty.  Reasons that
    contain the word "kicked" (any case) are not recorded.

    Args:
        ovpn_file: VPN configuration file associated with the ban.
        banned_ip: IP address that was banned.
        ban_reason: Free-text reason; also used to derive the ban duration.

    Any exception is logged and swallowed.
    """
    try:
        if "kicked" in ban_reason.lower():
            logging.info("Kick detected; entry not logged: " + ban_reason)
            return

        ban_duration = parse_ban_duration(ban_reason)
        ts = time.time()
        # The log is line-oriented: collapse newlines and runs of whitespace
        # so a multi-line reason cannot split one record across lines.
        reason_one_line = " ".join(ban_reason.split())
        entry = f"{banned_ip} | {ovpn_file} | {ts} | {ban_duration} | {reason_one_line}"

        needs_header = not os.path.exists(BANNED_LOG) or os.path.getsize(BANNED_LOG) == 0

        # errors='replace' keeps a record whose reason contains characters
        # the file encoding cannot represent, instead of losing the entry.
        with open(BANNED_LOG, 'a', errors='replace') as f:
            if needs_header:
                f.write("Banned IP | OVPN File | Timestamp | Duration (sec) | Ban Reason\n")
            f.write(entry + "\n")

        logging.info(f"Added banned log entry: {entry}")
    except Exception as e:
        logging.error(f"Error adding to banned log: {e}")
167
 
 
168
 
def perform_ban_actions():
    """Escalate the player-name mode according to ``active_ban_count``.

    Commands are appended to ``COMMANDS_FILE`` through ``update_commands``:

    * exactly 3 active bans: switch RANDOM_SMART on;
    * exactly 5: RANDOM_SMART off, RANDOM_STEAL on;
    * exactly 7: RANDOM_SMART and RANDOM_STEAL off, PLAYERIDS on;
    * 8 or more: on each multiple of 8 not yet handled, switch
      FORCE_PLAYER_ZREBUILD on and start its timer; additionally alternate
      between PLAYERIDS and RANDOM_STEAL on every call.

    The three fixed steps are skipped when ``last_applied_mode`` already
    names that mode.  Any exception is logged and swallowed.
    """
    global current_mode, last_applied_mode, force_rebuild_active, rebuild_start_time, last_rebuild_ban_trigger, alt_mode
    try:
        if active_ban_count == 3 and last_applied_mode != "RANDOM_SMART":
            update_commands(COMMANDS_FILE, "PLAYER_NAME_RANDOM_SMART 1")
            current_mode = last_applied_mode = "RANDOM_SMART"
        elif active_ban_count == 5 and last_applied_mode != "RANDOM_STEAL":
            update_commands(COMMANDS_FILE, "PLAYER_NAME_RANDOM_SMART 0")
            update_commands(COMMANDS_FILE, "PLAYER_NAME_RANDOM_STEAL 1")
            current_mode = last_applied_mode = "RANDOM_STEAL"
        elif active_ban_count == 7 and last_applied_mode != "PLAYERIDS":
            update_commands(COMMANDS_FILE, "PLAYER_NAME_RANDOM_SMART 0")
            update_commands(COMMANDS_FILE, "PLAYER_NAME_RANDOM_STEAL 0")
            update_commands(COMMANDS_FILE, "PLAYER_NAME_PLAYERIDS 1")
            current_mode = last_applied_mode = "PLAYERIDS"
        elif active_ban_count >= 8:
            if active_ban_count % 8 == 0 and active_ban_count != last_rebuild_ban_trigger:
                update_commands(COMMANDS_FILE, "FORCE_PLAYER_ZREBUILD 1")
                rebuild_start_time = time.time()
                force_rebuild_active = True
                # Remember the count so the same multiple is not handled twice.
                last_rebuild_ban_trigger = active_ban_count
                logging.info("FORCE_PLAYER_ZREBUILD activated.")

            if alt_mode:
                update_commands(COMMANDS_FILE, "PLAYER_NAME_PLAYERIDS 1")
                update_commands(COMMANDS_FILE, "PLAYER_NAME_RANDOM_STEAL 0")
                # Report the mode that was actually switched on.
                current_mode = "PLAYERIDS"
            else:
                update_commands(COMMANDS_FILE, "PLAYER_NAME_PLAYERIDS 0")
                update_commands(COMMANDS_FILE, "PLAYER_NAME_RANDOM_STEAL 1")
                current_mode = "RANDOM_STEAL"
            alt_mode = not alt_mode
    except Exception as e:
        logging.error(f"Error performing ban actions: {e}")
201
 
 
202
 
def load_banned_log(log_path=None):
    """Parse the banned log into a dictionary keyed by IP address.

    Args:
        log_path: File to read.  Defaults to the module-level ``BANNED_LOG``.

    Returns:
        dict mapping ``ip`` to ``(ovpn_file, timestamp, duration, reason)``
        with ``timestamp`` as ``float`` and ``duration`` as ``int``.  When an
        IP occurs more than once the last record wins.  A missing file yields
        an empty dict.

    Header lines, blank lines and rows with fewer than five fields are
    skipped.  A row whose timestamp or duration is not numeric is skipped
    with a warning instead of aborting the whole load.  Any other error is
    logged and whatever was parsed so far is returned.
    """
    banned = {}
    path = BANNED_LOG if log_path is None else log_path
    try:
        if not os.path.exists(path):
            return banned
        with open(path, 'r', errors='replace') as f:
            for raw_line in f:
                record = raw_line.rstrip("\r\n")
                if not record.strip() or record.lstrip().startswith("Banned IP"):
                    continue
                # Split at most four times: the reason is the last field and
                # may itself contain the " | " separator.
                parts = [field.strip() for field in record.split(" | ", 4)]
                if len(parts) != 5:
                    continue
                ip, ovpn_file, ts, duration, reason = parts
                try:
                    banned[ip] = (ovpn_file, float(ts), int(duration), reason)
                except ValueError as e:
                    logging.warning(f"Skipping malformed banned log entry '{record}': {e}")
    except Exception as e:
        logging.error(f"Error loading banned log: {e}")
    return banned
221
 
 
222
 
def is_ip_banned(ip):
    """Return True when *ip* has a banned-log record that has not yet expired.

    A record is considered live while the current time is earlier than its
    timestamp plus its duration.  Errors are logged and reported as False.
    """
    try:
        record = load_banned_log().get(ip)
        if record is not None:
            _ovpn_file, banned_at, duration, _reason = record
            expires_at = banned_at + duration
            if time.time() < expires_at:
                logging.warning(f"IP {ip} is banned until {time.strftime('%H:%M:%S', time.localtime(expires_at))}.")
                return True
    except Exception as e:
        logging.error(f"Error checking if IP {ip} is banned: {e}")
    return False
233
 
 
234
 
def update_commands(commands_file, command_str):
    """Append *command_str* as one line to *commands_file*.

    On success the module-level write counter is incremented and the command
    is logged together with the running total.  Failures are logged and
    swallowed; nothing is returned.
    """
    global command_write_count
    try:
        line = command_str + "\n"
        with open(commands_file, 'a') as out:
            out.write(line)
        command_write_count = command_write_count + 1
        logging.info(f"-> {command_str} (Total Written: {command_write_count})")
    except Exception as e:
        logging.error(f"Error updating commands file: {e}")
243
 
 
244
 
def start_game(exe_path=r"C:\Users\itsne\Desktop\dist\armagetronad.exe"):
    """Start the game executable without waiting for it to exit.

    Args:
        exe_path: Path of the executable to launch.  The default is the
            location this script has always used, so existing calls without
            arguments behave as before.

    Raises:
        OSError: if the executable cannot be started.
    """
    logging.info("Launching Armagetronad...")
    subprocess.Popen([exe_path])
247
 
 
248
 
def connect_vpn_filtered(vpn_path, ovpn_dir, log_path):
    """Start OpenVPN with a randomly chosen, preferably non-banned, config.

    Existing OpenVPN processes are killed and *log_path* is truncated first.
    Configuration files are the ``.ovpn`` files in *ovpn_dir*; those named by
    a still-active record in the banned log are excluded unless that would
    leave nothing to choose from.

    Args:
        vpn_path: Path of the OpenVPN executable.
        ovpn_dir: Directory containing the ``.ovpn`` files.
        log_path: File that receives the OpenVPN process's stdout and stderr.

    Returns:
        ``(process, ovpn_file)`` on success, ``(None, None)`` when no
        configuration file exists or any step fails (the error is logged).
    """
    try:
        kill_existing_vpn()
        try:
            open(log_path, 'w').close()
        except Exception as e:
            logging.error(f"Error clearing VPN log: {e}")

        try:
            all_ovpn_files = [os.path.join(ovpn_dir, f) for f in os.listdir(ovpn_dir)
                              if f.lower().endswith('.ovpn')]
        except Exception as e:
            logging.error(f"Error listing ovpn files: {e}")
            all_ovpn_files = []

        if not all_ovpn_files:
            logging.error("No .ovpn files found!")
            return None, None

        # Config files whose ban (timestamp + duration) has not yet expired.
        now = time.time()
        banned_ovpn_files = {
            entry[0] for entry in load_banned_log().values() if now < entry[1] + entry[2]
        }
        available_ovpn_files = [f for f in all_ovpn_files if f not in banned_ovpn_files]
        banned_count = len(all_ovpn_files) - len(available_ovpn_files)

        logging.info(
            f"OVPN Files - Total: {len(all_ovpn_files)}, "
            f"Available: {len(available_ovpn_files)}, Banned: {banned_count}"
        )

        if not available_ovpn_files:
            logging.warning("All ovpn files are banned; using one anyway.")
            available_ovpn_files = all_ovpn_files

        rand_file = random.choice(available_ovpn_files)
        logging.info(f"Using VPN configuration file: {rand_file}")

        # The child process receives its own copy of the handle, so this
        # process can close its end as soon as the child has been started.
        with open(log_path, 'w') as vpn_log:
            proc = subprocess.Popen(
                [vpn_path, '--config', rand_file, '--dev', 'tun', '--ifconfig', '10.8.0.2', '10.8.0.1'],
                stdout=vpn_log, stderr=subprocess.STDOUT
            )
        return proc, rand_file
    except Exception as e:
        logging.error(f"Error in connect_vpn_filtered: {e}")
        return None, None
292
 
 
293
 
def wait_for_vpn_initialization(log_path, timeout=30, interval=1):
    """Poll the VPN log until it reports success, auth failure, or a timeout.

    Args:
        log_path: Log file to read in full on every poll.
        timeout: Seconds to keep polling.
        interval: Seconds to sleep between polls.

    Returns:
        True once the log contains "Initialization Sequence Completed";
        False when it contains the AUTH_FAILED control message or the timeout
        expires.  Read errors are logged and polling continues.
    """
    # (marker text, logging call, message, result) checked in this order.
    outcomes = (
        ("Initialization Sequence Completed", logging.info, "VPN initialization complete.", True),
        ("AUTH: Received control message: AUTH_FAILED", logging.error, "VPN authentication failed.", False),
    )
    began = time.time()
    while time.time() - began < timeout:
        try:
            with open(log_path, 'r') as log_file:
                log_text = log_file.read()
                for marker, emit, message, verdict in outcomes:
                    if marker in log_text:
                        emit(message)
                        return verdict
        except Exception as e:
            logging.error(f"Error reading VPN log: {e}")
        time.sleep(interval)
    logging.warning("Timeout reached waiting for VPN initialization.")
    return False
310
 
 
311
 
def get_current_ip(real_ip, retry_interval=1, max_retries=30):
    """Return the public IP once it differs from *real_ip*, else None.

    The address is read from the ``origin`` field of ``https://httpbin.org/ip``.
    Up to *max_retries* attempts are made, sleeping *retry_interval* seconds
    after every unsuccessful one; request errors are logged and count as an
    attempt.
    """
    attempts_left = max_retries
    while attempts_left > 0:
        try:
            response = requests.get('https://httpbin.org/ip', timeout=5)
            observed_ip = response.json()['origin'].strip()
            logging.info(f"Fetched IP: {observed_ip}")
            if observed_ip == real_ip:
                logging.warning("Current IP still matches REAL_IP. Waiting for new IP...")
            else:
                return observed_ip
        except Exception as e:
            logging.error(f"Error fetching IP (retrying): {e}")
        time.sleep(retry_interval)
        attempts_left -= 1
    return None
326
 
 
327
 
def connect_until_new_ip(real_ip, banned_ip=None):
    """Keep reconnecting the VPN until an acceptable public IP is obtained.

    An IP is acceptable when it differs from *real_ip*, differs from
    *banned_ip* (if given) and has no live record in the banned log.  The
    loop never gives up; every failed attempt is logged and retried.

    Returns:
        ``(ip, ovpn_file)`` for the accepted connection.
    """
    while True:
        try:
            vpn_proc, ovpn_path = connect_vpn_filtered(OPENVPN_PATH, OVPN_DIR, VPN_LOG)
            if not vpn_proc:
                logging.error("VPN process not started; retrying...")
                time.sleep(3)
            elif not wait_for_vpn_initialization(VPN_LOG):
                # Only a process that is still alive needs to be stopped.
                if vpn_proc.poll() is None:
                    vpn_proc.kill()
                    time.sleep(3)
            else:
                candidate_ip = get_current_ip(real_ip)
                if candidate_ip is not None:
                    if banned_ip and candidate_ip == banned_ip:
                        rejection = "Candidate IP matches banned IP. Killing VPN and retrying..."
                    elif is_ip_banned(candidate_ip):
                        rejection = f"Candidate IP {candidate_ip} is in banned log. Retrying..."
                    else:
                        logging.info(f"VPN IP acquired: {candidate_ip}")
                        return candidate_ip, ovpn_path
                    logging.warning(rejection)
                    kill_existing_vpn()
                    time.sleep(3)
        except Exception as e:
            logging.error(f"Error in connect_until_new_ip: {e}")
            time.sleep(3)
358
 
 
359
 
def check_banned(banned_file):
    """Return True when *banned_file* exists and holds non-whitespace text.

    The text is logged as the ban reason.  A missing or blank file, or any
    error while reading it (which is logged), yields False.
    """
    try:
        if not os.path.exists(banned_file):
            return False
        with open(banned_file, 'r') as handle:
            reason_text = handle.read().strip()
        if not reason_text:
            return False
        logging.warning("Ban detected!")
        logging.info(f"Ban Reason: {reason_text}")
        return True
    except Exception as e:
        logging.error(f"Error reading ban file: {e}")
    return False
371
 
 
372
 
# Commands that switch every name mode and the forced rebuild off.
_DEFAULT_MODE_COMMANDS = (
    "PLAYER_NAME_RANDOM_SMART 0",
    "PLAYER_NAME_RANDOM_STEAL 0",
    "PLAYER_NAME_PLAYERIDS 0",
    "FORCE_PLAYER_ZREBUILD 0",
)


def _reset_command_modes():
    """Append the default-mode commands to the commands file."""
    for cmd in _DEFAULT_MODE_COMMANDS:
        update_commands(COMMANDS_FILE, cmd)


def _is_game_running():
    """Return True if any process name contains 'armagetronad.exe' (any case)."""
    # Requesting 'name' up front makes psutil fill proc.info instead of
    # raising while iterating over processes that vanish or deny access.
    for proc in psutil.process_iter(['name']):
        name = proc.info['name']
        if name and "armagetronad.exe" in name.lower():
            return True
    return False


def _read_ban_reason():
    """Return the stripped text of the ban file, or "Unknown" if unreadable."""
    try:
        with open(BANNED_FILE, 'r') as f:
            return f.read().strip()
    except Exception as e:
        logging.error(f"Error reading banned file: {e}")
        return "Unknown"


def main():
    """Supervise the game: keep it running behind a VPN and react to bans.

    After ensuring elevated privileges and resetting the command modes, the
    function loops forever, polling every 5 seconds:

    * When the game is not running, a ban is checked for (never on the very
      first launch), recorded against the IP and VPN config used by the
      session that just ended, and escalated via ``perform_ban_actions``.
      A fresh VPN connection is then made and the game is started.
    * 30 minutes after the last ban, the command modes and the active ban
      count are reset once.
    * Statistics are logged every tenth pass.
    * The forced rebuild is switched off 300 seconds after it was started.

    Exceptions raised inside one pass are logged and the loop continues.
    """
    global alt_mode, force_rebuild_active, rebuild_start_time, last_rebuild_ban_trigger
    global last_ban_time, reset_after_ban_applied, current_mode, last_applied_mode, initial_launch
    global ban_count, active_ban_count

    try:
        run_as_admin()
    except Exception as e:
        logging.error(f"Error in run_as_admin: {e}")
        sys.exit(1)

    _reset_command_modes()

    counter = 0
    # IP and VPN config of the most recent connection made by this loop,
    # i.e. the ones in use while the game was running.
    session_ip = None
    session_ovpn = None

    while True:
        try:
            if _is_game_running():
                logging.info("Armagetronad is running.")
            else:
                logging.info("Armagetronad is not running. Checking VPN...")

                # NOTE(review): nothing here clears BANNED_FILE, so every
                # later exit is treated as a ban unless something else
                # empties the file - confirm against the game client.
                banned_ip = None
                if not initial_launch and check_banned(BANNED_FILE):
                    ban_count += 1
                    active_ban_count += 1
                    last_ban_time = time.time()
                    reset_after_ban_applied = False
                    ban_reason = _read_ban_reason()
                    logging.warning(f"[Ban {ban_count}] Triggered at {time.strftime('%H:%M:%S')}")
                    log_stats()
                    # Record the address that was in use when the ban
                    # happened, not the one about to be acquired.
                    banned_ip = session_ip
                    add_to_banned_log(session_ovpn, session_ip, ban_reason)

                    perform_ban_actions()

                # connect_until_new_ip tears down any existing tunnel itself
                # and avoids banned_ip when one is given.
                session_ip, session_ovpn = connect_until_new_ip(REAL_IP, banned_ip=banned_ip)

                if force_rebuild_active:
                    update_commands(COMMANDS_FILE, "FORCE_PLAYER_ZREBUILD 1")

                start_game()

                if initial_launch:
                    initial_launch = False

            if last_ban_time and (time.time() - last_ban_time > 1800) and not reset_after_ban_applied:
                logging.info("30 minutes since last ban. Resetting command modes to default and active ban count.")
                _reset_command_modes()
                reset_after_ban_applied = True
                current_mode = last_applied_mode = "Default"
                active_ban_count = 0

            counter += 1
            if counter >= 10:
                log_stats()
                counter = 0

            if force_rebuild_active and time.time() - rebuild_start_time > 300:
                update_commands(COMMANDS_FILE, "FORCE_PLAYER_ZREBUILD 0")
                logging.info("5-minute punishment ended.")
                force_rebuild_active = False

            time.sleep(5)
        except Exception as e:
            logging.error(f"Error in main loop: {e}")
            time.sleep(5)
464
 
 
465
 
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to stop the endless supervision loop.
        logging.info("Interrupted by user; shutting down.")
        sys.exit(0)
    except Exception as e:
        # Include the traceback so a fatal error can be diagnosed from the log.
        logging.critical(f"Fatal error in main: {e}", exc_info=True)
        sys.exit(1)