summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xpre-commit.sh3
-rw-r--r--pylintrc.toml2
-rwxr-xr-xsimpleddns.py256
3 files changed, 195 insertions, 66 deletions
diff --git a/pre-commit.sh b/pre-commit.sh
new file mode 100755
index 0000000..941879e
--- /dev/null
+++ b/pre-commit.sh
@@ -0,0 +1,3 @@
+#!/bin/sh
+pylint simpleddns.py || exit 1
+mypy --strict simpleddns.py || exit 1
diff --git a/pylintrc.toml b/pylintrc.toml
index 39bb057..e111d72 100644
--- a/pylintrc.toml
+++ b/pylintrc.toml
@@ -389,7 +389,7 @@ timeout-methods = ["requests.api.delete", "requests.api.get", "requests.api.head
[tool.pylint.miscellaneous]
# List of note tags to take in consideration, separated by a comma.
-notes = ["FIXME", "XXX", "TODO"]
+notes = []
# Regular expression of note tags to take in consideration.
# notes-rgx =
diff --git a/simpleddns.py b/simpleddns.py
index 45d9e89..45e8308 100755
--- a/simpleddns.py
+++ b/simpleddns.py
@@ -6,10 +6,12 @@ import sys
import subprocess
import argparse
import ast
+import dataclasses
+from time import sleep
from abc import ABC, abstractmethod
-from typing import NoReturn, Optional
+from typing import NoReturn, Optional, Any
from pathlib import Path
-# import requests
+import requests
def parse_args() -> argparse.Namespace:
'''Parse command-line arguments'''
@@ -20,25 +22,16 @@ def parse_args() -> argparse.Namespace:
parser.add_argument('--setup', help='Set up configuration', action='store_true')
return parser.parse_args()
-args = parse_args()
-
-simpleddns_config_dir = os.getenv('SIMPLEDDNS_CONFIG_DIR')
-xdg_config_dir = os.getenv('XDG_CONFIG_DIR')
-if simpleddns_config_dir:
- config_dir = Path(simpleddns_config_dir)
-elif xdg_config_dir:
- config_dir = Path(xdg_config_dir, 'simpleddns')
-else:
- config_dir = Path('~/.config/simpleddns').expanduser()
-os.makedirs(config_dir, exist_ok = True)
-config_path = Path(config_dir, 'config')
-
def fatal_error(message: str) -> NoReturn:
'''Output error message & exit'''
sys.stderr.write(f'Fatal error: {message}\n')
sys.exit(1)
-def setup_config() -> None:
+def warn(message: str) -> None:
+ '''Print warning message'''
+ sys.stderr.write(f'WARNING: {message}\n')
+
+def setup_config(config_path: Path) -> None:
'''Interactively set up configuration file'''
if config_path.exists():
should_delete = input('Configuration already exists. Delete it [y/n]? ')
@@ -60,73 +53,170 @@ def setup_config() -> None:
domain_type = input('Select a domain type from the list above [1-1]: ').strip()
if domain_type == '1':
access_token = input('Enter personal access token: ')
- options = f'access_token = {repr(access_token)}'
+ options = f'''# Personal access token (secret!!)
+ access_token = {repr(access_token)}'''
type_name = 'linode'
else:
print('Invalid choice')
return
fd = os.open(config_path, os.O_CREAT | os.O_WRONLY | os.O_EXCL, 0o600)
with os.fdopen(fd, 'w') as config:
- config.write(f'''Domain {type_name} {domain_name}
- getip = {repr(get_ip)}
+ config.write(f'''Settings
+ # Interval in seconds between checking IP address for changes
+ # (API calls are only made when it changes)
+ interval = 15
+ # Timeout to wait for API responses
+ timeout = 20
+Domain {type_name} {domain_name}
+ # Command(s) for getting IP address(es)
+ getip = [{repr(get_ip)}]
{options}
''')
-if args.setup:
- setup_config()
- sys.exit(0)
+ print(f'Configuration created successfully at {config_path}')
-if not config_path.exists():
- fatal_error("Configuration doesn't exist. Try running with --setup first.")
-if os.name == 'posix':
- config_mode = config_path.stat().st_mode
- if config_mode & 0o4:
- fatal_error(f'''DANGER: configuration file {config_path} allows
-reading by other users (mode {config_mode & 0o777:o}).
-If there are API tokens in there, revoke them immediately!!''')
+@dataclasses.dataclass
+class Settings:
+ '''Global (i.e. not per-domain) settings'''
+ interval: int = 15
+ timeout: int = 20
class Domain(ABC):
'''A domain whose DNS will be updated'''
- domain: str
- getip: str
- def __init__(self, domain: str):
- self.domain = domain
+ full_domain: str
+ root_domain: str
+ subdomain: str
+ getip: list[str]
+ settings: Settings
+ last_ips: list[str]
+ def __init__(self, settings: Settings, domain: str):
+ self.settings = settings
+ self.full_domain = domain
+ last_dot = domain.rfind('.')
+ if last_dot == -1:
+ raise ValueError(f'Domain {domain} has no . in it')
+ second_last_dot = domain.rfind('.', 0, last_dot)
+ if second_last_dot == -1:
+ self.subdomain = ''
+ self.root_domain = domain
+ else:
+ self.subdomain = domain[:second_last_dot]
+ self.root_domain = domain[second_last_dot+1:]
+ self.last_ips = []
+ self._init()
+ @abstractmethod
+ def _init(self) -> None:
+ '''Extra provider-specific initialization'''
@abstractmethod
- def update(self, ip: str) -> None:
- '''Make necessary API request to update domain to use new IP address'''
+ def update(self, ips: list[str]) -> bool:
+ '''Make necessary API request to update domain to use new IP addresses.
+ Returns False on failure.'''
@abstractmethod
def validate_specifics(self) -> str:
- '''Validate details specific to subclass'''
+ '''Validate provider-specific settings for domain'''
def validate(self) -> str:
- '''Validate this object (ensure all required fields are set)'''
+ '''Validate this object (ensure all required settings are set)'''
if not getattr(self, 'getip', ''):
return 'getip not set'
+ if not isinstance(self.getip, list):
+ return 'getip should be a list.'
return self.validate_specifics()
- def get_ip(self) -> Optional[str]:
- '''Get IP address using the registered command.'''
- result = subprocess.run(self.getip, shell=True, stdout=subprocess.PIPE, check=False)
- if result.returncode:
- print(f'WARNING: {self.getip} failed (exit code {result.returncode})')
- return None
- return result.stdout.decode(errors='replace').strip()
+ def get_ips(self) -> list[str]:
+ '''Get IP addresses using the registered commands.'''
+ ips = []
+ for cmd in self.getip:
+ result = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, check=False)
+ if result.returncode:
+ warn(f'{self.getip} failed (exit code {result.returncode})')
+ return []
+ ip = result.stdout.decode(errors='replace').strip()
+ # this test isn't perfect, but it should catch most cases of weird stuff
+ if not ip or '..' in ip or any(c not in '0123456789abcdefABCDEF:.' for c in ip):
+ warn(f'IP address {repr(ip)} is invalid')
+ return []
+ ips.append(ip)
+ ips.sort()
+ return ips
+ def check_for_update(self) -> None:
+ '''Update DNS records if IP has changed'''
+ ips = self.get_ips()
+ if ips == self.last_ips:
+ return
+ if self.update(ips):
+ self.last_ips = ips
class LinodeDomain(Domain):
'''Domain registered with Linode Domains'''
access_token: str
+ _error: bool
+ # Domain ID for Linode API
+ _id: Optional[int]
+ def _init(self) -> None:
+ self._id = None
def __repr__(self) -> str:
- return f'<LinodeDomain domain={self.domain} ' \
+ return f'<LinodeDomain domain={self.full_domain} ' \
'getip={repr(self.getip)} ' \
'access_token={repr(self.access_token)}>'
- def update(self, ip: str) -> None:
- print(f'update IP: {ip}')
+ def _headers(self, method: str = 'GET') -> dict[str, str]:
+ '''Get HTTP headers for making requests'''
+ headers = {'Accept': 'application/json', 'Authorization': f'Bearer {self.access_token}'}
+ if method in ['POST', 'PUT']:
+ headers['Content-Type'] = 'application/json'
+ return headers
+ def _paginated_get(self, url: str) -> list[Any]:
+ '''Get results for a Linode paginated GET API endpoint.'''
+ headers = self._headers()
+ results = []
+ page_size = 500
+ for page in range(1,100):
+ page_url = f'{url}{"&" if "?" in url else "?"}page={page}&page_size={page_size}'
+ try:
+ response = requests.get(page_url, headers=headers, timeout=self.settings.timeout)
+ except requests.RequestException as e:
+ warn(f'Error making request to {url}: {e}')
+ sleep(0.33) # should prevent us from hitting Linode's rate limit
+ try:
+ response_json = response.json()
+ response_data = response_json['data']
+ if not isinstance(response_data, list):
+ raise ValueError('"data" member is not a list')
+ except (requests.JSONDecodeError, KeyError, ValueError) as e:
+ warn(f'Invalid JSON at endpoint {repr(page_url)}: {e}')
+ break
+ results.extend(response_data)
+ if len(response_data) < page_size:
+ # Reached last page, presumably
+ break
+ if page == 99:
+ warn(f'Giving up after 99 pages of responses to API endpoint {repr(url)}')
+ return results
+ def update(self, ips: list[str]) -> bool:
+ self._error = False
+ if self._id is None:
+ domains = self._paginated_get('https://api.linode.com/v4/domains')
+ if self._error:
+ return False
+ for domain in domains:
+ domain_name = domain['domain']
+ if self.root_domain == domain_name:
+ # this is it!
+ self._id = domain['id']
+ if self._id is None:
+ warn(f'Domain {self.root_domain} not found in Linode. Are you sure it is set up there?')
+ return False
+ records = self._paginated_get(f'https://api.linode.com/v4/domains/{self._id}/records')
+ print(records)
+ print(self.subdomain)
+ return not self._error
def validate_specifics(self) -> str:
if not getattr(self, 'access_token', ''):
return 'Access token not set'
# OK
return ''
-def parse_config() -> list[Domain]:
+def parse_config(config_path: Path) -> tuple[Settings, list[Domain]]:
'''Parse configuration file'''
- curr_domain = None
+ curr_section: Settings | Domain | None = None
+ settings = Settings()
domains: list[Domain] = []
with open(config_path, encoding='utf-8') as config:
for line in config:
@@ -134,9 +224,13 @@ def parse_config() -> list[Domain]:
if line[0] == '#':
# comment
continue
- if line.startswith('Domain '):
- if curr_domain:
- domains.append(curr_domain)
+ if line.startswith('Settings'):
+ if isinstance(curr_section, Domain):
+ domains.append(curr_section)
+ curr_section = settings
+ elif line.startswith('Domain '):
+ if isinstance(curr_section, Domain):
+ domains.append(curr_section)
parts = line.split()
if len(parts) != 3:
fatal_error('Domain declaration should have exactly ' \
@@ -144,26 +238,58 @@ def parse_config() -> list[Domain]:
kind = parts[1]
domain_name = parts[2]
if kind == 'linode':
- curr_domain = LinodeDomain(domain_name)
+ curr_section = LinodeDomain(settings, domain_name)
else:
fatal_error(f'No such domain type: {kind}')
else:
- if not curr_domain:
- fatal_error('First non-empty line of configuration must be a Domain declaration')
+ if not curr_section:
+ fatal_error('First non-empty line of configuration must be a Domain declaration or Settings')
parts = [part.strip() for part in line.split('=', maxsplit=1)]
if len(parts) != 2:
fatal_error(f'Invalid syntax (want key = value): {line}')
[key, value] = parts
- setattr(curr_domain, key, ast.literal_eval(value))
- if curr_domain:
- domains.append(curr_domain)
+ setattr(curr_section, key, ast.literal_eval(value))
+ if isinstance(curr_section, Domain):
+ domains.append(curr_section)
for domain in domains:
err = domain.validate()
if err:
- fatal_error(f'In domain {domain.domain}: {err}')
- return domains
+ fatal_error(f'In domain {domain.full_domain}: {err}')
+ return (settings, domains)
+
+def main() -> None:
+ '''Run simpleddns'''
+ args = parse_args()
+ simpleddns_config_dir = os.getenv('SIMPLEDDNS_CONFIG_DIR')
+ xdg_config_dir = os.getenv('XDG_CONFIG_DIR')
+ if simpleddns_config_dir:
+ config_dir = Path(simpleddns_config_dir)
+ elif xdg_config_dir:
+ config_dir = Path(xdg_config_dir, 'simpleddns')
+ else:
+ config_dir = Path('~/.config/simpleddns').expanduser()
+ os.makedirs(config_dir, exist_ok = True)
+ config_path = Path(config_dir, 'config')
+ if args.setup:
+ setup_config(config_path)
+ sys.exit(0)
+ if not config_path.exists():
+ fatal_error("Configuration doesn't exist. Try running with --setup first.")
+ if os.name == 'posix':
+ config_mode = config_path.stat().st_mode
+ if config_mode & 0o4:
+ fatal_error(f'''DANGER: configuration file {config_path} allows
+ reading by other users (mode {config_mode & 0o777:o}).
+ If there are API tokens in there, revoke them immediately!!''')
+
+ settings, domains = parse_config(config_path)
+ if not domains:
+ fatal_error('No domains defined. Try running with --setup?')
+ while True:
+ print('Checking...')
+ for domain in domains:
+ domain.check_for_update()
+ sleep(settings.interval)
-configured_domains = parse_config()
-if not configured_domains:
- fatal_error('No domains defined. Try running with --setup?')
-print(configured_domains[0].get_ip())
+if __name__ == '__main__':
+ main()