summaryrefslogtreecommitdiff
path: root/simpleddns.py
diff options
context:
space:
mode:
authorpommicket <pommicket@gmail.com>2025-08-29 17:41:51 -0400
committerpommicket <pommicket@gmail.com>2025-08-29 17:44:05 -0400
commit92213c9097c9b26a0aeed86897b12ef54fcba5f3 (patch)
treef23f078fe7f98b3625a2c6c79f25ff4310c28740 /simpleddns.py
parent1b20f518a2fe7ec139cf2107b9b0c949153a1e32 (diff)
Setting up and reading the configruation file
Diffstat (limited to 'simpleddns.py')
-rwxr-xr-xsimpleddns.py170
1 files changed, 149 insertions, 21 deletions
diff --git a/simpleddns.py b/simpleddns.py
index 19196ce..9434be6 100755
--- a/simpleddns.py
+++ b/simpleddns.py
@@ -1,10 +1,26 @@
#!/usr/bin/env python3
+'''Simple Dynamic DNS updater'''
+
import os
import sys
-import requests
-import stat
import subprocess
+import argparse
+import ast
+from abc import ABC, abstractmethod
+from typing import NoReturn, Optional
from pathlib import Path
+# import requests
+
+def parse_args() -> argparse.Namespace:
+ '''Parse command-line arguments'''
+ parser = argparse.ArgumentParser(prog='simpleddns',
+ description='Simple dynamic DNS updater',
+ epilog='''ENVIRONMENT VARIABLES
+ SIMPLEDDNS_CONFIG_DIR - directory where configburation files are stored''')
+ parser.add_argument('--setup', help='Set up configuration', action='store_true')
+ return parser.parse_args()
+
+args = parse_args()
simpleddns_config_dir = os.getenv('SIMPLEDDNS_CONFIG_DIR')
xdg_config_dir = os.getenv('XDG_CONFIG_DIR')
@@ -15,27 +31,139 @@ elif xdg_config_dir:
else:
config_dir = Path('~/.config/simpleddns').expanduser()
os.makedirs(config_dir, exist_ok = True)
+config_path = Path(config_dir, 'config')
-def fatal_error(message):
+def fatal_error(message: str) -> NoReturn:
+ '''Output error message & exit'''
sys.stderr.write(f'Fatal error: {message}\n')
- exit(1)
+ sys.exit(1)
+
+def setup_config() -> None:
+ '''Interactively set up configuration file'''
+ if config_path.exists():
+ should_delete = input('Configuration already exists. Delete it [y/n]? ')
+ if not should_delete.strip().lower().startswith('y'):
+ print('Aborting.')
+ return
+ config_path.unlink()
+ default_get_ip = 'curl --no-progress-meter ifconfig.co'
+ get_ip = input(f'Command for getting IP address [default: {default_get_ip}]? ').strip()
+ if not get_ip:
+ get_ip = default_get_ip
+ domain_name = input('Domain name? ').strip()
+ if not domain_name:
+ fatal_error('Domain name must be set')
+ if any(c.isspace() for c in domain_name):
+ fatal_error('Domain name must not contain any whitespace')
+ print('1. Linode Domains')
+ # TODO: AWS
+ domain_type = input('Select a domain type from the list above [1-1]: ').strip()
+ if domain_type == '1':
+ access_token = input('Enter personal access token: ')
+ options = f'access_token = {repr(access_token)}'
+ type_name = 'linode'
+ else:
+ print('Invalid choice')
+ return
+ fd = os.open(config_path, os.O_CREAT | os.O_WRONLY | os.O_EXCL, 0o600)
+ with os.fdopen(fd, 'w') as config:
+ config.write(f'''Domain {type_name} {domain_name}
+ getip = {repr(get_ip)}
+ {options}
+''')
+if args.setup:
+ setup_config()
+ sys.exit(0)
-get_ip_path = Path(config_dir, 'getip')
+if not config_path.exists():
+ fatal_error("Configuration doesn't exist. Try running with --setup first.")
+if os.name == 'posix':
+ config_mode = config_path.stat().st_mode
+ if config_mode & 0o4:
+ fatal_error(f'''DANGER: configuration file {config_path} allows
+reading by other users (mode {config_mode}).
+If there are API tokens in there, revoke them immediately!!''')
-if not get_ip_path.exists():
- print(f'Warning: {get_ip_path} does not exist. Putting default of "curl ifconfig.co" there')
- with open(get_ip_path, 'w') as f:
- f.write('curl --no-progress-meter ifconfig.co')
+class Domain(ABC):
+ '''A domain whose DNS will be updated'''
+ domain: str
+ getip: str
+ def __init__(self, domain: str):
+ self.domain = domain
+ @abstractmethod
+ def update(self, ip: str) -> None:
+ '''Make necessary API request to update domain to use new IP address'''
+ @abstractmethod
+ def validate_specifics(self) -> str:
+ '''Validate details specific to subclass'''
+ def validate(self) -> str:
+ '''Validate this object (ensure all required fields are set)'''
+ if not getattr(self, 'getip', ''):
+ return 'getip not set'
+ return self.validate_specifics()
+ def get_ip(self) -> Optional[str]:
+ '''Get IP address using the registered command.'''
+ result = subprocess.run(self.getip, shell=True, stdout=subprocess.PIPE, check=False)
+ if result.returncode:
+ print(f'WARNING: {self.getip} failed (exit code {result.returncode})')
+ return None
+ return result.stdout.decode(errors='replace').strip()
-def get_ip():
- if os.name == 'posix' and (get_ip_path.stat().st_mode & 0o100):
- command = str(get_ip_path)
- else:
- command = open(get_ip_path).read()
- result = subprocess.run(command, shell=True, stdout=subprocess.PIPE)
- if result.returncode:
- print(f'WARNING: {command} failed (exit code {result.returncode})')
- return None
- return result.stdout.decode(errors='replace').strip()
-
-print(get_ip())
+class LinodeDomain(Domain):
+ '''Domain registered with Linode Domains'''
+ access_token: str
+ def __repr__(self) -> str:
+ return f'<LinodeDomain domain={self.domain} ' \
+ 'getip={repr(self.getip)} ' \
+ 'access_token={repr(self.access_token)}>'
+ def update(self, ip: str) -> None:
+ print(f'update IP: {ip}')
+ def validate_specifics(self) -> str:
+ if not getattr(self, 'access_token', ''):
+ return 'Access token not set'
+ # OK
+ return ''
+
+def parse_config() -> list[Domain]:
+ '''Parse configuration file'''
+ curr_domain = None
+ domains: list[Domain] = []
+ with open(config_path, encoding='utf-8') as config:
+ for line in config:
+ line = line.strip()
+ if line[0] == '#':
+ # comment
+ continue
+ if line.startswith('Domain '):
+ if curr_domain:
+ domains.append(curr_domain)
+ parts = line.split()
+ if len(parts) != 3:
+ fatal_error('Domain declaration should have exactly ' \
+ 'two space-separated arguments (type and name)')
+ kind = parts[1]
+ domain_name = parts[2]
+ if kind == 'linode':
+ curr_domain = LinodeDomain(domain_name)
+ else:
+ fatal_error(f'No such domain type: {kind}')
+ else:
+ if not curr_domain:
+ fatal_error('First non-empty line of configuration must be a Domain declaration')
+ parts = [part.strip() for part in line.split('=', maxsplit=1)]
+ if len(parts) != 2:
+ fatal_error(f'Invalid syntax (want key = value): {line}')
+ [key, value] = parts
+ setattr(curr_domain, key, ast.literal_eval(value))
+ if curr_domain:
+ domains.append(curr_domain)
+ for domain in domains:
+ err = domain.validate()
+ if err:
+ fatal_error(f'In domain {domain.domain}: {err}')
+ return domains
+
+configured_domains = parse_config()
+if not configured_domains:
+ fatal_error('No domains defined. Try running with --setup?')
+print(configured_domains[0].get_ip())