summaryrefslogtreecommitdiff
path: root/simpleddns.py
diff options
context:
space:
mode:
Diffstat (limited to 'simpleddns.py')
-rwxr-xr-xsimpleddns.py115
1 files changed, 103 insertions, 12 deletions
diff --git a/simpleddns.py b/simpleddns.py
index 3cc3b32..cf2862a 100755
--- a/simpleddns.py
+++ b/simpleddns.py
@@ -61,13 +61,17 @@ def setup_config(config_path: Path) -> None:
if any(c.isspace() for c in domain_name):
fatal_error('Domain name must not contain any whitespace')
print('1. Linode Domains')
- # TODO: AWS
- domain_type = input('Select a domain type from the list above [1-1]: ').strip()
+ print('2. AWS Route 53')
+ domain_type = input('Select a domain type from the list above [1-2]: ').strip()
if domain_type == '1':
access_token = input('Enter personal access token: ')
options = f'''# Personal access token (secret!!)
access_token = {repr(access_token)}'''
type_name = 'linode'
+ elif domain_type == '2':
+ type_name = 'aws_route53'
+ options = ''
+ print('(Credentials in ~/.aws/credentials will be used)')
else:
print('Invalid choice')
return
@@ -132,7 +136,7 @@ class Domain(ABC):
def _error(self, message: str) -> None:
'''Print warning and set _had_error field to True.'''
self._had_error = True
- warn(message)
+ warn(f'{self.full_domain}: {message}')
def _ttl(self) -> int:
'''Get TTL value to use for records for this domain'''
return getattr(self, 'ttl', 0) or self.settings.ttl
@@ -165,12 +169,12 @@ class Domain(ABC):
for cmd in self.getip:
result = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, check=False)
if result.returncode:
- warn(f'{self.getip} failed (exit code {result.returncode})')
+ warn(f'{repr(cmd)} failed (exit code {result.returncode})')
return []
ip = result.stdout.decode(errors='replace').strip()
# this test isn't perfect, but it should catch most cases of weird stuff
if not ip or '..' in ip or any(c not in '0123456789abcdefABCDEF:.' for c in ip):
- warn(f'IP address {repr(ip)} is invalid')
+ warn(f'IP address {repr(ip)} is invalid (from command {repr(cmd)}')
return []
ips.append(ip)
ips.sort()
@@ -309,7 +313,7 @@ class LinodeDomain(Domain):
# this is it!
self._id = domain['id']
if self._id is None:
- warn(f'Domain {self.root_domain} not found in Linode. Are you sure it is set up there?')
+ self._error(f'Domain {self.root_domain} not found in Linode. Are you sure it is set up there?')
return False
records = self._paginated_get(f'https://api.linode.com/v4/domains/{self._id}/records')
remaining_ips = set(ips)
@@ -356,8 +360,8 @@ def parse_config(config_path: Path) -> tuple[Settings, list[Domain]]:
with open(config_path, encoding='utf-8') as config:
for line in config:
line = line.strip()
- if line[0] == '#':
- # comment
+ if not line or line[0] == '#':
+ # blank line/comment
continue
if line.startswith('Settings'):
if isinstance(curr_section, Domain):
@@ -371,11 +375,14 @@ def parse_config(config_path: Path) -> tuple[Settings, list[Domain]]:
fatal_error('Domain declaration should have exactly ' \
'two space-separated arguments (type and name)')
kind = parts[1]
- domain_name = parts[2]
- if kind == 'linode':
- curr_section = LinodeDomain(settings, domain_name)
- else:
+ domain_name = parts[2].rstrip('.')
+ domain_class = {
+ 'linode': LinodeDomain,
+ 'aws_route53': Route53Domain,
+ }.get(kind)
+ if domain_class is None:
fatal_error(f'No such domain type: {kind}')
+ curr_section = domain_class(settings, domain_name)
else:
if not curr_section:
fatal_error('First non-empty line of configuration must be a Domain declaration or Settings')
@@ -392,6 +399,90 @@ def parse_config(config_path: Path) -> tuple[Settings, list[Domain]]:
fatal_error(f'In domain {domain.full_domain}: {err}')
return (settings, domains)
+class Route53Domain(Domain):
+ '''Domain registered with AWS Route 53'''
+ _id: str
+ _client: Any
+
+ def _init(self) -> None:
+ self._id = ''
+ self._client = None
+
+ def _get_client(self) -> Any:
+ import boto3
+ if not self._client:
+ self._client = boto3.client('route53')
+ return self._client
+
+ def _get_hosted_zones(self) -> Optional[list[dict[str, Any]]]:
+ '''Get list of hosted zones from AWS'''
+ import botocore
+ try:
+ route53 = self._get_client()
+ results = route53.list_hosted_zones(MaxItems='100')
+ sleep(self._request_delay())
+ zones = []
+ while results['IsTruncated']:
+ zones.extend(results['HostedZones'])
+ results = route53.list_hosted_zones(Marker=results['NextMarker'], MaxItems='100')
+ sleep(self._request_delay())
+ zones.extend(results['HostedZones'])
+ return zones
+ except botocore.exceptions.BotoCoreError as e:
+ warn(f'Error listing AWS hosted zones: {e}')
+ return None
+
+ def _list_record_sets(self) -> list[dict[str, Any]]:
+ '''Get A and AAAA record sets for this domain.'''
+ route53 = self._get_client()
+ record_sets: list[dict[str, Any]] = []
+ start_record_name = self.full_domain + '.'
+ start_record_type = 'A'
+ start_record_identifier = None
+ while start_record_name == self.full_domain + '.' and start_record_type in ['A', 'AAAA']:
+ options = {
+ 'HostedZoneId': self._id,
+ 'StartRecordName': start_record_name,
+ 'StartRecordType': start_record_type,
+ 'MaxItems': '300',
+ }
+ if start_record_identifier is not None:
+ options['StartRecordIdentifier'] = start_record_identifier
+ results = route53.list_resource_record_sets(**options)
+ record_sets.extend(filter(
+ lambda record: record['Name'] == self.full_domain + '.' and \
+ record['Type'] in ['A', 'AAAA'],
+ results['ResourceRecordSets']
+ ))
+ if results['IsTruncated']:
+ start_record_name = results['NextRecordName']
+ start_record_type = results['NextRecordType']
+ start_record_identifier = results['NextRecordIdentifier']
+ else:
+ start_record_name = ''
+ sleep(self._request_delay())
+ return record_sets
+
+ def update(self, ips: list[str]) -> bool:
+ self._had_error = False
+ if not self._id:
+ zones = self._get_hosted_zones()
+ if zones is None:
+ return False
+ for zone in zones:
+ if zone['Name'].rstrip('.') == self.root_domain:
+ self._id = zone['Id']
+ break
+ if not self._id:
+ self._error(f'Domain {self.root_domain} not found in Route 53. Are you sure it is set up there?')
+ return False
+ print('ID:',self._id)
+ print(self._list_record_sets())
+ return True
+ def validate_specifics(self) -> str:
+ # no provided-specific options for Route 53
+ return ''
+
def main() -> None:
'''Run simpleddns'''
args = parse_args()