summaryrefslogtreecommitdiff
path: root/simpleddns.py
diff options
context:
space:
mode:
Diffstat (limited to 'simpleddns.py')
-rwxr-xr-xsimpleddns.py75
1 files changed, 74 insertions, 1 deletions
diff --git a/simpleddns.py b/simpleddns.py
index 1665c56..58551ef 100755
--- a/simpleddns.py
+++ b/simpleddns.py
@@ -11,6 +11,8 @@ from time import sleep
from abc import ABC, abstractmethod
from typing import NoReturn, Optional, Any
from pathlib import Path
+import dns
+import dns.resolver
import requests
VERSION = '0.1.0'
@@ -109,6 +111,10 @@ Settings
# but you may want to set this higher if you have other things
# making frequent API calls.
request_delay = 0.4
+ # Allow Dynamic DNS to proceed without a CAA record blocking ACME http challenges.
+ # Enabling this is almost definitely a bad idea, since it lets people forge
+ # certificates for your domain if they get in the way of the IP address lookup process.
+ allow_no_caa = False
Domain {type_name} {domain_name}
# Command(s) for getting IP address(es)
getip = [{repr(get_ip)}]
@@ -130,11 +136,14 @@ class Domain(ABC):
full_domain: str
root_domain: str
subdomain: str
+ allow_no_caa: bool
getip: list[str]
settings: Settings
last_ips: list[str]
_had_error: bool
+
def __init__(self, settings: Settings, domain: str):
+ self.allow_no_caa = False
self.settings = settings
self.full_domain = domain
last_dot = domain.rfind('.')
@@ -149,39 +158,90 @@ class Domain(ABC):
self.root_domain = domain[second_last_dot+1:]
self.last_ips = []
self._init()
+
def _info(self, message: str) -> None:
'''Print informational message prefixed with domain name'''
print(f'{self.full_domain}: {message}')
+
def _error(self, message: str) -> None:
'''Print warning and set _had_error field to True.'''
self._had_error = True
warn(f'{self.full_domain}: {message}')
+
def _ttl(self) -> int:
'''Get TTL value to use for records for this domain'''
return getattr(self, 'ttl', 0) or self.settings.ttl
+
def _timeout(self) -> int:
'''Get API request timeout'''
return getattr(self, 'timeout', 0) or self.settings.timeout
+
def _request_delay(self) -> float:
'''Get delay between requests (seconds)'''
return getattr(self, 'request_delay', 0) or self.settings.request_delay
+
@abstractmethod
def _init(self) -> None:
'''Extra provider-specific initialization'''
+
@abstractmethod
def update(self, ips: list[str]) -> bool:
'''Make necessary API request to update domain to use new IP addresses.
Returns False on failure.'''
+
@abstractmethod
def validate_specifics(self) -> str:
'''Validate provider-specific settings for domain'''
+
+ def check_caa(self) -> None:
+ '''Ensure that a CAA record exists blocking http ACME challenges,
+ Unless allow_no_caa is set.'''
+ if self.allow_no_caa:
+ return
+ dot = -1
+ # In a case like foo.bar.example.com, the following domains are checked
+ # in order for CAA records:
+ # 1. foo.bar.example.com
+ # 2. bar.example.com
+ # 3. example.com
+ # The first one with a CAA record is taken as authoritative.
+ ok = False
+ while '.' in (subdomain := self.full_domain[dot + 1:]):
+ try:
+ subdomain_records = list(dns.resolver.resolve(subdomain, 'CAA'))
+ except dns.resolver.NoAnswer:
+ dot = self.full_domain.find('.', dot + 1)
+ continue
+ ok = False
+ for record in subdomain_records:
+ # (We have to use getattr here because mypy doesn't
+ # like dnspython's CAA-specific type.)
+ if b'http-' in getattr(record, 'value'):
+ ok = False
+ break
+ if getattr(record, 'tag') != b'issue':
+ continue
+ ok = True
+ if not ok:
+ fatal_error(f'CAA records for {subdomain} allow http ACME:\n' + \
+ ''.join(record.to_text() + '\n' for record in subdomain_records) + \
+ '''You should disable HTTP challenges, otherwise anyone who can hijack your
+get-IP-address command can forge TLS certificates for your domain.''')
+ break
+ if not ok:
+ fatal_error(f'''No CAA record found for {self.full_domain}.
+You should create a CAA record that doesn't allow HTTP challenges;
+otherwise anyone who can hijack your get-IP-address command
+can forge TLS certificates for your domain.''')
+
def validate(self) -> str:
- '''Validate this object (ensure all required settings are set)'''
+ '''Validate this object (ensure all required settings are set, etc.)'''
if not getattr(self, 'getip', ''):
return 'getip not set'
if not isinstance(self.getip, list):
return 'getip should be a list.'
return self.validate_specifics()
+
def get_ips(self) -> list[str]:
'''Get IP addresses using the registered commands.'''
ips = []
@@ -198,6 +258,7 @@ class Domain(ABC):
ips.append(ip)
ips.sort()
return ips
+
def check_for_update(self) -> bool:
'''Update DNS records if IP has changed. Returns False on error.'''
ips = self.get_ips()
@@ -215,18 +276,22 @@ class LinodeDomain(Domain):
access_token: str
# Domain ID for Linode API
_id: Optional[int]
+
def _init(self) -> None:
self._id = None
+
def __repr__(self) -> str:
return f'<LinodeDomain domain={self.full_domain} ' \
'getip={repr(self.getip)} ' \
'access_token={repr(self.access_token)}>'
+
def _headers(self, has_body: bool = False) -> dict[str, str]:
'''Get HTTP headers for making requests'''
headers = {'Accept': 'application/json', 'Authorization': f'Bearer {self.access_token}'}
if has_body:
headers['Content-Type'] = 'application/json'
return headers
+
def _make_request(self, method: str, url: str, body: Any = None) -> Any:
sleep(self._request_delay())
headers = self._headers(has_body = body is not None)
@@ -374,11 +439,14 @@ class LinodeDomain(Domain):
class DigitalOceanDomain(Domain):
'''Domain registered with DigitalOcean Domains'''
access_token: str
+
def _init(self) -> None:
pass
+
def update(self, ips: list[str]) -> bool:
print('TODO',self.access_token)
return True
+
def validate_specifics(self) -> str:
if not getattr(self, 'access_token', ''):
return 'Access token not set'
@@ -513,6 +581,7 @@ class Route53Domain(Domain):
start_record_name = ''
sleep(self._request_delay())
return record_sets
+
def _print_record_set_changes(self, changes: list[dict[str, Any]]) -> None:
for change in changes:
action = change['Action']
@@ -521,6 +590,7 @@ class Route53Domain(Domain):
kind = record_set['Type']
values = [record['Value'] for record in record_set['ResourceRecords']]
self._info(f'{action} {kind} record for {domain}: {values}')
+
def update(self, ips: list[str]) -> bool:
import botocore
self._had_error = False
@@ -629,6 +699,9 @@ def main() -> None:
settings.dry_run = args.dry_run
if not domains:
fatal_error('No domains defined. Try running with --setup?')
+ for domain in domains:
+ domain.check_caa()
+
print('simpleddns started.')
while True:
for domain in domains: