diff options
-rw-r--r-- | requirements.txt | 1 | ||||
-rwxr-xr-x | simpleddns.py | 75 |
2 files changed, 75 insertions, 1 deletions
diff --git a/requirements.txt b/requirements.txt index e571ccb..18781e5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ requests ~= 2.32.5 boto3 ~= 1.40.21 pydo ~= 0.15.0 +dnspython ~= 2.6.1 diff --git a/simpleddns.py b/simpleddns.py index 1665c56..58551ef 100755 --- a/simpleddns.py +++ b/simpleddns.py @@ -11,6 +11,8 @@ from time import sleep from abc import ABC, abstractmethod from typing import NoReturn, Optional, Any from pathlib import Path +import dns +import dns.resolver import requests VERSION = '0.1.0' @@ -109,6 +111,10 @@ Settings # but you may want to set this higher if you have other things # making frequent API calls. request_delay = 0.4 + # Allow Dynamic DNS to proceed without a CAA record blocking ACME http challenges. + # Enabling this is almost definitely a bad idea, since it lets people forge + # certificates for your domain if they get in the way of the IP address lookup process. + allow_no_caa = False Domain {type_name} {domain_name} # Command(s) for getting IP address(es) getip = [{repr(get_ip)}] @@ -130,11 +136,14 @@ class Domain(ABC): full_domain: str root_domain: str subdomain: str + allow_no_caa: bool getip: list[str] settings: Settings last_ips: list[str] _had_error: bool + def __init__(self, settings: Settings, domain: str): + self.allow_no_caa = False self.settings = settings self.full_domain = domain last_dot = domain.rfind('.') @@ -149,39 +158,90 @@ class Domain(ABC): self.root_domain = domain[second_last_dot+1:] self.last_ips = [] self._init() + def _info(self, message: str) -> None: '''Print informational message prefixed with domain name''' print(f'{self.full_domain}: {message}') + def _error(self, message: str) -> None: '''Print warning and set _had_error field to True.''' self._had_error = True warn(f'{self.full_domain}: {message}') + def _ttl(self) -> int: '''Get TTL value to use for records for this domain''' return getattr(self, 'ttl', 0) or self.settings.ttl + def _timeout(self) -> int: '''Get API request timeout''' return getattr(self, 'timeout', 0) or self.settings.timeout + def _request_delay(self) -> float: '''Get delay between requests (seconds)''' return getattr(self, 'request_delay', 0) or self.settings.request_delay + @abstractmethod def _init(self) -> None: '''Extra provider-specific initialization''' + @abstractmethod def update(self, ips: list[str]) -> bool: '''Make necessary API request to update domain to use new IP addresses. Returns False on failure.''' + @abstractmethod def validate_specifics(self) -> str: '''Validate provider-specific settings for domain''' + + def check_caa(self) -> None: + '''Ensure that a CAA record exists blocking http ACME challenges, + Unless allow_no_caa is set.''' + if self.allow_no_caa: + return + dot = -1 + # In a case like foo.bar.example.com, the following domains are checked + # in order for CAA records: + # 1. foo.bar.example.com + # 2. bar.example.com + # 3. example.com + # The first one with a CAA record is taken as authoritative. + ok = False + while '.' in (subdomain := self.full_domain[dot + 1:]): + try: + subdomain_records = list(dns.resolver.resolve(subdomain, 'CAA')) + except dns.resolver.NoAnswer: + dot = self.full_domain.find('.', dot + 1) + continue + ok = False + for record in subdomain_records: + # (We have to use getattr here because mypy doesn't + # like dnspython's CAA-specific type.) + if b'http-' in getattr(record, 'value'): + ok = False + break + if getattr(record, 'tag') != b'issue': + continue + ok = True + if not ok: + fatal_error(f'CAA records for {subdomain} allow http ACME:\n' + \ + ''.join(record.to_text() + '\n' for record in subdomain_records) + \ + '''You should disable HTTP challenges, otherwise anyone who can hijack your +get-IP-address command can forge TLS certificates for your domain.''') + break + if not ok: + fatal_error(f'''No CAA record found for {self.full_domain}. +You should create a CAA record that doesn't allow HTTP challenges; +otherwise anyone who can hijack your get-IP-address command +can forge TLS certificates for your domain.''') + def validate(self) -> str: - '''Validate this object (ensure all required settings are set)''' + '''Validate this object (ensure all required settings are set, etc.)''' if not getattr(self, 'getip', ''): return 'getip not set' if not isinstance(self.getip, list): return 'getip should be a list.' return self.validate_specifics() + def get_ips(self) -> list[str]: '''Get IP addresses using the registered commands.''' ips = [] @@ -198,6 +258,7 @@ class Domain(ABC): ips.append(ip) ips.sort() return ips + def check_for_update(self) -> bool: '''Update DNS records if IP has changed. Returns False on error.''' ips = self.get_ips() @@ -215,18 +276,22 @@ class LinodeDomain(Domain): access_token: str # Domain ID for Linode API _id: Optional[int] + def _init(self) -> None: self._id = None + def __repr__(self) -> str: return f'<LinodeDomain domain={self.full_domain} ' \ 'getip={repr(self.getip)} ' \ 'access_token={repr(self.access_token)}>' + def _headers(self, has_body: bool = False) -> dict[str, str]: '''Get HTTP headers for making requests''' headers = {'Accept': 'application/json', 'Authorization': f'Bearer {self.access_token}'} if has_body: headers['Content-Type'] = 'application/json' return headers + def _make_request(self, method: str, url: str, body: Any = None) -> Any: sleep(self._request_delay()) headers = self._headers(has_body = body is not None) @@ -374,11 +439,14 @@ class LinodeDomain(Domain): class DigitalOceanDomain(Domain): '''Domain registered with DigitalOcean Domains''' access_token: str + def _init(self) -> None: pass + def update(self, ips: list[str]) -> bool: print('TODO',self.access_token) return True + def validate_specifics(self) -> str: if not getattr(self, 'access_token', ''): return 'Access token not set' @@ -513,6 +581,7 @@ class Route53Domain(Domain): start_record_name = '' sleep(self._request_delay()) return record_sets + def _print_record_set_changes(self, changes: list[dict[str, Any]]) -> None: for change in changes: action = change['Action'] @@ -521,6 +590,7 @@ class Route53Domain(Domain): kind = record_set['Type'] values = [record['Value'] for record in record_set['ResourceRecords']] self._info(f'{action} {kind} record for {domain}: {values}') + def update(self, ips: list[str]) -> bool: import botocore self._had_error = False @@ -629,6 +699,9 @@ def main() -> None: settings.dry_run = args.dry_run if not domains: fatal_error('No domains defined. Try running with --setup?') + for domain in domains: + domain.check_caa() + print('simpleddns started.') while True: for domain in domains: |