From 92213c9097c9b26a0aeed86897b12ef54fcba5f3 Mon Sep 17 00:00:00 2001 From: pommicket Date: Fri, 29 Aug 2025 17:41:51 -0400 Subject: Setting up and reading the configruation file --- simpleddns.py | 170 ++++++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 149 insertions(+), 21 deletions(-) (limited to 'simpleddns.py') diff --git a/simpleddns.py b/simpleddns.py index 19196ce..9434be6 100755 --- a/simpleddns.py +++ b/simpleddns.py @@ -1,10 +1,26 @@ #!/usr/bin/env python3 +'''Simple Dynamic DNS updater''' + import os import sys -import requests -import stat import subprocess +import argparse +import ast +from abc import ABC, abstractmethod +from typing import NoReturn, Optional from pathlib import Path +# import requests + +def parse_args() -> argparse.Namespace: + '''Parse command-line arguments''' + parser = argparse.ArgumentParser(prog='simpleddns', + description='Simple dynamic DNS updater', + epilog='''ENVIRONMENT VARIABLES + SIMPLEDDNS_CONFIG_DIR - directory where configburation files are stored''') + parser.add_argument('--setup', help='Set up configuration', action='store_true') + return parser.parse_args() + +args = parse_args() simpleddns_config_dir = os.getenv('SIMPLEDDNS_CONFIG_DIR') xdg_config_dir = os.getenv('XDG_CONFIG_DIR') @@ -15,27 +31,139 @@ elif xdg_config_dir: else: config_dir = Path('~/.config/simpleddns').expanduser() os.makedirs(config_dir, exist_ok = True) +config_path = Path(config_dir, 'config') -def fatal_error(message): +def fatal_error(message: str) -> NoReturn: + '''Output error message & exit''' sys.stderr.write(f'Fatal error: {message}\n') - exit(1) + sys.exit(1) + +def setup_config() -> None: + '''Interactively set up configuration file''' + if config_path.exists(): + should_delete = input('Configuration already exists. Delete it [y/n]? ') + if not should_delete.strip().lower().startswith('y'): + print('Aborting.') + return + config_path.unlink() + default_get_ip = 'curl --no-progress-meter ifconfig.co' + get_ip = input(f'Command for getting IP address [default: {default_get_ip}]? ').strip() + if not get_ip: + get_ip = default_get_ip + domain_name = input('Domain name? ').strip() + if not domain_name: + fatal_error('Domain name must be set') + if any(c.isspace() for c in domain_name): + fatal_error('Domain name must not contain any whitespace') + print('1. Linode Domains') + # TODO: AWS + domain_type = input('Select a domain type from the list above [1-1]: ').strip() + if domain_type == '1': + access_token = input('Enter personal access token: ') + options = f'access_token = {repr(access_token)}' + type_name = 'linode' + else: + print('Invalid choice') + return + fd = os.open(config_path, os.O_CREAT | os.O_WRONLY | os.O_EXCL, 0o600) + with os.fdopen(fd, 'w') as config: + config.write(f'''Domain {type_name} {domain_name} + getip = {repr(get_ip)} + {options} +''') +if args.setup: + setup_config() + sys.exit(0) -get_ip_path = Path(config_dir, 'getip') +if not config_path.exists(): + fatal_error("Configuration doesn't exist. Try running with --setup first.") +if os.name == 'posix': + config_mode = config_path.stat().st_mode + if config_mode & 0o4: + fatal_error(f'''DANGER: configuration file {config_path} allows +reading by other users (mode {config_mode}). +If there are API tokens in there, revoke them immediately!!''') -if not get_ip_path.exists(): - print(f'Warning: {get_ip_path} does not exist. Putting default of "curl ifconfig.co" there') - with open(get_ip_path, 'w') as f: - f.write('curl --no-progress-meter ifconfig.co') +class Domain(ABC): + '''A domain whose DNS will be updated''' + domain: str + getip: str + def __init__(self, domain: str): + self.domain = domain + @abstractmethod + def update(self, ip: str) -> None: + '''Make necessary API request to update domain to use new IP address''' + @abstractmethod + def validate_specifics(self) -> str: + '''Validate details specific to subclass''' + def validate(self) -> str: + '''Validate this object (ensure all required fields are set)''' + if not getattr(self, 'getip', ''): + return 'getip not set' + return self.validate_specifics() + def get_ip(self) -> Optional[str]: + '''Get IP address using the registered command.''' + result = subprocess.run(self.getip, shell=True, stdout=subprocess.PIPE, check=False) + if result.returncode: + print(f'WARNING: {self.getip} failed (exit code {result.returncode})') + return None + return result.stdout.decode(errors='replace').strip() -def get_ip(): - if os.name == 'posix' and (get_ip_path.stat().st_mode & 0o100): - command = str(get_ip_path) - else: - command = open(get_ip_path).read() - result = subprocess.run(command, shell=True, stdout=subprocess.PIPE) - if result.returncode: - print(f'WARNING: {command} failed (exit code {result.returncode})') - return None - return result.stdout.decode(errors='replace').strip() - -print(get_ip()) +class LinodeDomain(Domain): + '''Domain registered with Linode Domains''' + access_token: str + def __repr__(self) -> str: + return f'' + def update(self, ip: str) -> None: + print(f'update IP: {ip}') + def validate_specifics(self) -> str: + if not getattr(self, 'access_token', ''): + return 'Access token not set' + # OK + return '' + +def parse_config() -> list[Domain]: + '''Parse configuration file''' + curr_domain = None + domains: list[Domain] = [] + with open(config_path, encoding='utf-8') as config: + for line in config: + line = line.strip() + if line[0] == '#': + # comment + continue + if line.startswith('Domain '): + if curr_domain: + domains.append(curr_domain) + parts = line.split() + if len(parts) != 3: + fatal_error('Domain declaration should have exactly ' \ + 'two space-separated arguments (type and name)') + kind = parts[1] + domain_name = parts[2] + if kind == 'linode': + curr_domain = LinodeDomain(domain_name) + else: + fatal_error(f'No such domain type: {kind}') + else: + if not curr_domain: + fatal_error('First non-empty line of configuration must be a Domain declaration') + parts = [part.strip() for part in line.split('=', maxsplit=1)] + if len(parts) != 2: + fatal_error(f'Invalid syntax (want key = value): {line}') + [key, value] = parts + setattr(curr_domain, key, ast.literal_eval(value)) + if curr_domain: + domains.append(curr_domain) + for domain in domains: + err = domain.validate() + if err: + fatal_error(f'In domain {domain.domain}: {err}') + return domains + +configured_domains = parse_config() +if not configured_domains: + fatal_error('No domains defined. Try running with --setup?') +print(configured_domains[0].get_ip()) -- cgit v1.2.3