diff options
-rw-r--r-- | mypy.ini | 3 | ||||
-rwxr-xr-x | pre-commit.sh | 2 | ||||
-rw-r--r-- | pylintrc.toml | 2 | ||||
-rw-r--r-- | requirements.txt | 1 | ||||
-rwxr-xr-x | simpleddns.py | 115 |
5 files changed, 109 insertions, 14 deletions
diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 0000000..37da880 --- /dev/null +++ b/mypy.ini @@ -0,0 +1,3 @@ +[mypy] +strict = true +ignore_missing_imports = True diff --git a/pre-commit.sh b/pre-commit.sh index 941879e..9b323ff 100755 --- a/pre-commit.sh +++ b/pre-commit.sh @@ -1,3 +1,3 @@ #!/bin/sh pylint simpleddns.py || exit 1 -mypy --strict simpleddns.py || exit 1 +mypy simpleddns.py || exit 1 diff --git a/pylintrc.toml b/pylintrc.toml index e111d72..351ed06 100644 --- a/pylintrc.toml +++ b/pylintrc.toml @@ -318,7 +318,7 @@ max-module-lines = 1000 [tool.pylint.imports] # List of modules that can be imported at any level, not just the top level one. -# allow-any-import-level = +allow-any-import-level = ["boto3", "botocore"] # Allow explicit reexports by alias from a package __init__. # allow-reexport-from-package = diff --git a/requirements.txt b/requirements.txt index d166403..315884a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ requests ~= 2.32.5 +boto3 ~= 1.40.21 diff --git a/simpleddns.py b/simpleddns.py index 3cc3b32..cf2862a 100755 --- a/simpleddns.py +++ b/simpleddns.py @@ -61,13 +61,17 @@ def setup_config(config_path: Path) -> None: if any(c.isspace() for c in domain_name): fatal_error('Domain name must not contain any whitespace') print('1. Linode Domains') - # TODO: AWS - domain_type = input('Select a domain type from the list above [1-1]: ').strip() + print('2. AWS Route 53') + domain_type = input('Select a domain type from the list above [1-2]: ').strip() if domain_type == '1': access_token = input('Enter personal access token: ') options = f'''# Personal access token (secret!!) access_token = {repr(access_token)}''' type_name = 'linode' + elif domain_type == '2': + type_name = 'aws_route53' + options = '' + print('(Credentials in ~/.aws/credentials will be used)') else: print('Invalid choice') return @@ -132,7 +136,7 @@ class Domain(ABC): def _error(self, message: str) -> None: '''Print warning and set _had_error field to True.''' self._had_error = True - warn(message) + warn(f'{self.full_domain}: {message}') def _ttl(self) -> int: '''Get TTL value to use for records for this domain''' return getattr(self, 'ttl', 0) or self.settings.ttl @@ -165,12 +169,12 @@ class Domain(ABC): for cmd in self.getip: result = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, check=False) if result.returncode: - warn(f'{self.getip} failed (exit code {result.returncode})') + warn(f'{repr(cmd)} failed (exit code {result.returncode})') return [] ip = result.stdout.decode(errors='replace').strip() # this test isn't perfect, but it should catch most cases of weird stuff if not ip or '..' in ip or any(c not in '0123456789abcdefABCDEF:.' for c in ip): - warn(f'IP address {repr(ip)} is invalid') + warn(f'IP address {repr(ip)} is invalid (from command {repr(cmd)}') return [] ips.append(ip) ips.sort() @@ -309,7 +313,7 @@ class LinodeDomain(Domain): # this is it! self._id = domain['id'] if self._id is None: - warn(f'Domain {self.root_domain} not found in Linode. Are you sure it is set up there?') + self._error(f'Domain {self.root_domain} not found in Linode. Are you sure it is set up there?') return False records = self._paginated_get(f'https://api.linode.com/v4/domains/{self._id}/records') remaining_ips = set(ips) @@ -356,8 +360,8 @@ def parse_config(config_path: Path) -> tuple[Settings, list[Domain]]: with open(config_path, encoding='utf-8') as config: for line in config: line = line.strip() - if line[0] == '#': - # comment + if not line or line[0] == '#': + # blank line/comment continue if line.startswith('Settings'): if isinstance(curr_section, Domain): @@ -371,11 +375,14 @@ def parse_config(config_path: Path) -> tuple[Settings, list[Domain]]: fatal_error('Domain declaration should have exactly ' \ 'two space-separated arguments (type and name)') kind = parts[1] - domain_name = parts[2] - if kind == 'linode': - curr_section = LinodeDomain(settings, domain_name) - else: + domain_name = parts[2].rstrip('.') + domain_class = { + 'linode': LinodeDomain, + 'aws_route53': Route53Domain, + }.get(kind) + if domain_class is None: fatal_error(f'No such domain type: {kind}') + curr_section = domain_class(settings, domain_name) else: if not curr_section: fatal_error('First non-empty line of configuration must be a Domain declaration or Settings') @@ -392,6 +399,90 @@ def parse_config(config_path: Path) -> tuple[Settings, list[Domain]]: fatal_error(f'In domain {domain.full_domain}: {err}') return (settings, domains) +class Route53Domain(Domain): + '''Domain registered with AWS Route 53''' + _id: str + _client: Any + + def _init(self) -> None: + self._id = '' + self._client = None + + def _get_client(self) -> Any: + import boto3 + if not self._client: + self._client = boto3.client('route53') + return self._client + + def _get_hosted_zones(self) -> Optional[list[dict[str, Any]]]: + '''Get list of hosted zones from AWS''' + import botocore + try: + route53 = self._get_client() + results = route53.list_hosted_zones(MaxItems='100') + sleep(self._request_delay()) + zones = [] + while results['IsTruncated']: + zones.extend(results['HostedZones']) + results = route53.list_hosted_zones(Marker=results['NextMarker'], MaxItems='100') + sleep(self._request_delay()) + zones.extend(results['HostedZones']) + return zones + except botocore.exceptions.BotoCoreError as e: + warn(f'Error listing AWS hosted zones: {e}') + return None + + def _list_record_sets(self) -> list[dict[str, Any]]: + '''Get A and AAAA record sets for this domain.''' + route53 = self._get_client() + record_sets: list[dict[str, Any]] = [] + start_record_name = self.full_domain + '.' + start_record_type = 'A' + start_record_identifier = None + while start_record_name == self.full_domain + '.' and start_record_type in ['A', 'AAAA']: + options = { + 'HostedZoneId': self._id, + 'StartRecordName': start_record_name, + 'StartRecordType': start_record_type, + 'MaxItems': '300', + } + if start_record_identifier is not None: + options['StartRecordIdentifier'] = start_record_identifier + results = route53.list_resource_record_sets(**options) + record_sets.extend(filter( + lambda record: record['Name'] == self.full_domain + '.' and \ + record['Type'] in ['A', 'AAAA'], + results['ResourceRecordSets'] + )) + if results['IsTruncated']: + start_record_name = results['NextRecordName'] + start_record_type = results['NextRecordType'] + start_record_identifier = results['NextRecordIdentifier'] + else: + start_record_name = '' + sleep(self._request_delay()) + return record_sets + + def update(self, ips: list[str]) -> bool: + self._had_error = False + if not self._id: + zones = self._get_hosted_zones() + if zones is None: + return False + for zone in zones: + if zone['Name'].rstrip('.') == self.root_domain: + self._id = zone['Id'] + break + if not self._id: + self._error(f'Domain {self.root_domain} not found in Route 53. Are you sure it is set up there?') + return False + print('ID:',self._id) + print(self._list_record_sets()) + return True + def validate_specifics(self) -> str: + # no provided-specific options for Route 53 + return '' + def main() -> None: '''Run simpleddns''' args = parse_args() |