diff options
author | pommicket <pommicket@gmail.com> | 2025-08-30 22:58:44 -0400 |
---|---|---|
committer | pommicket <pommicket@gmail.com> | 2025-08-30 22:58:44 -0400 |
commit | 555af5932e99b7b5369e8c6c9f80bd4ea5b48760 (patch) | |
tree | 3b4f32427f57e33b2df58959c02239d56cfb12fd /simpleddns.py | |
parent | d60c2eeed60e264ad30361ac99048f99d4975cfb (diff) |
Start linode API stuff
Diffstat (limited to 'simpleddns.py')
-rwxr-xr-x | simpleddns.py | 256 |
1 files changed, 191 insertions, 65 deletions
diff --git a/simpleddns.py b/simpleddns.py index 45d9e89..45e8308 100755 --- a/simpleddns.py +++ b/simpleddns.py @@ -6,10 +6,12 @@ import sys import subprocess import argparse import ast +import dataclasses +from time import sleep from abc import ABC, abstractmethod -from typing import NoReturn, Optional +from typing import NoReturn, Optional, Any from pathlib import Path -# import requests +import requests def parse_args() -> argparse.Namespace: '''Parse command-line arguments''' @@ -20,25 +22,16 @@ def parse_args() -> argparse.Namespace: parser.add_argument('--setup', help='Set up configuration', action='store_true') return parser.parse_args() -args = parse_args() - -simpleddns_config_dir = os.getenv('SIMPLEDDNS_CONFIG_DIR') -xdg_config_dir = os.getenv('XDG_CONFIG_DIR') -if simpleddns_config_dir: - config_dir = Path(simpleddns_config_dir) -elif xdg_config_dir: - config_dir = Path(xdg_config_dir, 'simpleddns') -else: - config_dir = Path('~/.config/simpleddns').expanduser() -os.makedirs(config_dir, exist_ok = True) -config_path = Path(config_dir, 'config') - def fatal_error(message: str) -> NoReturn: '''Output error message & exit''' sys.stderr.write(f'Fatal error: {message}\n') sys.exit(1) -def setup_config() -> None: +def warn(message: str) -> None: + '''Print warning message''' + sys.stderr.write(f'WARNING: {message}\n') + +def setup_config(config_path: Path) -> None: '''Interactively set up configuration file''' if config_path.exists(): should_delete = input('Configuration already exists. Delete it [y/n]? ') @@ -60,73 +53,170 @@ def setup_config() -> None: domain_type = input('Select a domain type from the list above [1-1]: ').strip() if domain_type == '1': access_token = input('Enter personal access token: ') - options = f'access_token = {repr(access_token)}' + options = f'''# Personal access token (secret!!) + access_token = {repr(access_token)}''' type_name = 'linode' else: print('Invalid choice') return fd = os.open(config_path, os.O_CREAT | os.O_WRONLY | os.O_EXCL, 0o600) with os.fdopen(fd, 'w') as config: - config.write(f'''Domain {type_name} {domain_name} - getip = {repr(get_ip)} + config.write(f'''Settings + # Interval in seconds between checking IP address for changes + # (API calls are only made when it changes) + interval = 15 + # Timeout to wait for API responses + timeout = 20 +Domain {type_name} {domain_name} + # Command(s) for getting IP address(es) + getip = [{repr(get_ip)}] {options} ''') -if args.setup: - setup_config() - sys.exit(0) + print(f'Configuration created successfully at {config_path}') -if not config_path.exists(): - fatal_error("Configuration doesn't exist. Try running with --setup first.") -if os.name == 'posix': - config_mode = config_path.stat().st_mode - if config_mode & 0o4: - fatal_error(f'''DANGER: configuration file {config_path} allows -reading by other users (mode {config_mode & 0o777:o}). -If there are API tokens in there, revoke them immediately!!''') +@dataclasses.dataclass +class Settings: + '''Global (i.e. not per-domain) settings''' + interval: int = 15 + timeout: int = 20 class Domain(ABC): '''A domain whose DNS will be updated''' - domain: str - getip: str - def __init__(self, domain: str): - self.domain = domain + full_domain: str + root_domain: str + subdomain: str + getip: list[str] + settings: Settings + last_ips: list[str] + def __init__(self, settings: Settings, domain: str): + self.settings = settings + self.full_domain = domain + last_dot = domain.rfind('.') + if last_dot == -1: + raise ValueError(f'Domain {domain} has no . in it') + second_last_dot = domain.rfind('.', 0, last_dot) + if second_last_dot == -1: + self.subdomain = '' + self.root_domain = domain + else: + self.subdomain = domain[:second_last_dot] + self.root_domain = domain[second_last_dot+1:] + self.last_ips = [] + self._init() + @abstractmethod + def _init(self) -> None: + '''Extra provider-specific initialization''' @abstractmethod - def update(self, ip: str) -> None: - '''Make necessary API request to update domain to use new IP address''' + def update(self, ips: list[str]) -> bool: + '''Make necessary API request to update domain to use new IP addresses. + Returns False on failure.''' @abstractmethod def validate_specifics(self) -> str: - '''Validate details specific to subclass''' + '''Validate provider-specific settings for domain''' def validate(self) -> str: - '''Validate this object (ensure all required fields are set)''' + '''Validate this object (ensure all required settings are set)''' if not getattr(self, 'getip', ''): return 'getip not set' + if not isinstance(self.getip, list): + return 'getip should be a list.' return self.validate_specifics() - def get_ip(self) -> Optional[str]: - '''Get IP address using the registered command.''' - result = subprocess.run(self.getip, shell=True, stdout=subprocess.PIPE, check=False) - if result.returncode: - print(f'WARNING: {self.getip} failed (exit code {result.returncode})') - return None - return result.stdout.decode(errors='replace').strip() + def get_ips(self) -> list[str]: + '''Get IP addresses using the registered commands.''' + ips = [] + for cmd in self.getip: + result = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, check=False) + if result.returncode: + warn(f'{self.getip} failed (exit code {result.returncode})') + return [] + ip = result.stdout.decode(errors='replace').strip() + # this test isn't perfect, but it should catch most cases of weird stuff + if not ip or '..' in ip or any(c not in '0123456789abcdefABCDEF:.' for c in ip): + warn(f'IP address {repr(ip)} is invalid') + return [] + ips.append(ip) + ips.sort() + return ips + def check_for_update(self) -> None: + '''Update DNS records if IP has changed''' + ips = self.get_ips() + if ips == self.last_ips: + return + if self.update(ips): + self.last_ips = ips class LinodeDomain(Domain): '''Domain registered with Linode Domains''' access_token: str + _error: bool + # Domain ID for Linode API + _id: Optional[int] + def _init(self) -> None: + self._id = None def __repr__(self) -> str: - return f'<LinodeDomain domain={self.domain} ' \ + return f'<LinodeDomain domain={self.full_domain} ' \ 'getip={repr(self.getip)} ' \ 'access_token={repr(self.access_token)}>' - def update(self, ip: str) -> None: - print(f'update IP: {ip}') + def _headers(self, method: str = 'GET') -> dict[str, str]: + '''Get HTTP headers for making requests''' + headers = {'Accept': 'application/json', 'Authorization': f'Bearer {self.access_token}'} + if method in ['POST', 'PUT']: + headers['Content-Type'] = 'application/json' + return headers + def _paginated_get(self, url: str) -> list[Any]: + '''Get results for a Linode paginated GET API endpoint.''' + headers = self._headers() + results = [] + page_size = 500 + for page in range(1,100): + page_url = f'{url}{"&" if "?" in url else "?"}page={page}&page_size={page_size}' + try: + response = requests.get(page_url, headers=headers, timeout=self.settings.timeout) + except requests.RequestException as e: + warn(f'Error making request to {url}: {e}') + sleep(0.33) # should prevent us from hitting Linode's rate limit + try: + response_json = response.json() + response_data = response_json['data'] + if not isinstance(response_data, list): + raise ValueError('"data" member is not a list') + except (requests.JSONDecodeError, KeyError, ValueError) as e: + warn(f'Invalid JSON at endpoint {repr(page_url)}: {e}') + break + results.extend(response_data) + if len(response_data) < page_size: + # Reached last page, presumably + break + if page == 99: + warn(f'Giving up after 99 pages of responses to API endpoint {repr(url)}') + return results + def update(self, ips: list[str]) -> bool: + self._error = False + if self._id is None: + domains = self._paginated_get('https://api.linode.com/v4/domains') + if self._error: + return False + for domain in domains: + domain_name = domain['domain'] + if self.root_domain == domain_name: + # this is it! + self._id = domain['id'] + if self._id is None: + warn(f'Domain {self.root_domain} not found in Linode. Are you sure it is set up there?') + return False + records = self._paginated_get(f'https://api.linode.com/v4/domains/{self._id}/records') + print(records) + print(self.subdomain) + return not self._error def validate_specifics(self) -> str: if not getattr(self, 'access_token', ''): return 'Access token not set' # OK return '' -def parse_config() -> list[Domain]: +def parse_config(config_path: Path) -> tuple[Settings, list[Domain]]: '''Parse configuration file''' - curr_domain = None + curr_section: Settings | Domain | None = None + settings = Settings() domains: list[Domain] = [] with open(config_path, encoding='utf-8') as config: for line in config: @@ -134,9 +224,13 @@ def parse_config() -> list[Domain]: if line[0] == '#': # comment continue - if line.startswith('Domain '): - if curr_domain: - domains.append(curr_domain) + if line.startswith('Settings'): + if isinstance(curr_section, Domain): + domains.append(curr_section) + curr_section = settings + elif line.startswith('Domain '): + if isinstance(curr_section, Domain): + domains.append(curr_section) parts = line.split() if len(parts) != 3: fatal_error('Domain declaration should have exactly ' \ @@ -144,26 +238,58 @@ def parse_config() -> list[Domain]: kind = parts[1] domain_name = parts[2] if kind == 'linode': - curr_domain = LinodeDomain(domain_name) + curr_section = LinodeDomain(settings, domain_name) else: fatal_error(f'No such domain type: {kind}') else: - if not curr_domain: - fatal_error('First non-empty line of configuration must be a Domain declaration') + if not curr_section: + fatal_error('First non-empty line of configuration must be a Domain declaration or Settings') parts = [part.strip() for part in line.split('=', maxsplit=1)] if len(parts) != 2: fatal_error(f'Invalid syntax (want key = value): {line}') [key, value] = parts - setattr(curr_domain, key, ast.literal_eval(value)) - if curr_domain: - domains.append(curr_domain) + setattr(curr_section, key, ast.literal_eval(value)) + if isinstance(curr_section, Domain): + domains.append(curr_section) for domain in domains: err = domain.validate() if err: - fatal_error(f'In domain {domain.domain}: {err}') - return domains + fatal_error(f'In domain {domain.full_domain}: {err}') + return (settings, domains) + +def main() -> None: + '''Run simpleddns''' + args = parse_args() + simpleddns_config_dir = os.getenv('SIMPLEDDNS_CONFIG_DIR') + xdg_config_dir = os.getenv('XDG_CONFIG_DIR') + if simpleddns_config_dir: + config_dir = Path(simpleddns_config_dir) + elif xdg_config_dir: + config_dir = Path(xdg_config_dir, 'simpleddns') + else: + config_dir = Path('~/.config/simpleddns').expanduser() + os.makedirs(config_dir, exist_ok = True) + config_path = Path(config_dir, 'config') + if args.setup: + setup_config(config_path) + sys.exit(0) + if not config_path.exists(): + fatal_error("Configuration doesn't exist. Try running with --setup first.") + if os.name == 'posix': + config_mode = config_path.stat().st_mode + if config_mode & 0o4: + fatal_error(f'''DANGER: configuration file {config_path} allows + reading by other users (mode {config_mode & 0o777:o}). + If there are API tokens in there, revoke them immediately!!''') + + settings, domains = parse_config(config_path) + if not domains: + fatal_error('No domains defined. Try running with --setup?') + while True: + print('Checking...') + for domain in domains: + domain.check_for_update() + sleep(settings.interval) -configured_domains = parse_config() -if not configured_domains: - fatal_error('No domains defined. Try running with --setup?') -print(configured_domains[0].get_ip()) +if __name__ == '__main__': + main() |