1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
|
#!/usr/bin/env python3
'''Simple Dynamic DNS updater'''
import os
import sys
import subprocess
import argparse
import ast
from abc import ABC, abstractmethod
from typing import NoReturn, Optional
from pathlib import Path
# import requests
def parse_args(argv: Optional[list[str]] = None) -> argparse.Namespace:
    '''Parse command-line arguments.

    argv is the list of arguments to parse, without the program name.
    When it is None (the default) argparse reads sys.argv[1:], so callers
    that pass nothing behave exactly as before.

    Returns the populated namespace; its `setup` attribute is True when
    --setup was given. argparse exits the process itself on --help or on
    invalid arguments.
    '''
    parser = argparse.ArgumentParser(
        prog='simpleddns',
        description='Simple dynamic DNS updater',
        # The default help formatter re-wraps this text, so the line break
        # below is only for source readability.
        epilog='''ENVIRONMENT VARIABLES
SIMPLEDDNS_CONFIG_DIR - directory where configuration files are stored''')
    parser.add_argument('--setup', help='Set up configuration', action='store_true')
    return parser.parse_args(argv)
# Parse the command line first: --help or bad arguments make argparse exit
# before any directory is created below.
args = parse_args()

# Locate the configuration directory, in order of precedence:
#   1. $SIMPLEDDNS_CONFIG_DIR, used as-is
#   2. the XDG configuration directory, with a 'simpleddns' subdirectory
#   3. ~/.config/simpleddns
simpleddns_config_dir = os.getenv('SIMPLEDDNS_CONFIG_DIR')
# The XDG Base Directory specification calls this variable XDG_CONFIG_HOME.
# XDG_CONFIG_DIR (the name this script used to read) is kept as a fallback
# so that setups relying on it keep working.
xdg_config_dir = os.getenv('XDG_CONFIG_HOME') or os.getenv('XDG_CONFIG_DIR')
if simpleddns_config_dir:
    config_dir = Path(simpleddns_config_dir)
elif xdg_config_dir:
    config_dir = Path(xdg_config_dir, 'simpleddns')
else:
    config_dir = Path('~/.config/simpleddns').expanduser()
# Make sure the directory exists so the config file can be created in it.
os.makedirs(config_dir, exist_ok=True)
config_path = Path(config_dir, 'config')
def fatal_error(message: str) -> NoReturn:
    '''Report an unrecoverable error on stderr, then terminate with exit status 1'''
    print(f'Fatal error: {message}', file=sys.stderr)
    raise SystemExit(1)
def setup_config() -> None:
    '''Interactively set up configuration file.

    Prompts for the command used to obtain the IP address, the domain name,
    the domain type and the options of that type, then writes them to
    config_path (created with mode 0o600).

    If a configuration file already exists the user must confirm replacing
    it. The old file is only removed once every answer has been collected
    and validated, so aborting, making an invalid choice or hitting a fatal
    error leaves the existing configuration in place.

    Exits through fatal_error() when the domain name is empty or contains
    whitespace. Returns without writing anything when the user declines to
    replace the existing file or selects an unknown domain type.
    '''
    replacing = config_path.exists()
    if replacing:
        should_delete = input('Configuration already exists. Delete it [y/n]? ')
        if not should_delete.strip().lower().startswith('y'):
            print('Aborting.')
            return

    default_get_ip = 'curl --no-progress-meter ifconfig.co'
    get_ip = input(f'Command for getting IP address [default: {default_get_ip}]? ').strip()
    if not get_ip:
        get_ip = default_get_ip

    domain_name = input('Domain name? ').strip()
    if not domain_name:
        fatal_error('Domain name must be set')
    # The name is written on a whitespace-separated 'Domain <type> <name>'
    # line, so it cannot contain whitespace itself.
    if any(c.isspace() for c in domain_name):
        fatal_error('Domain name must not contain any whitespace')

    print('1. Linode Domains')
    # TODO: AWS
    domain_type = input('Select a domain type from the list above [1-1]: ').strip()
    if domain_type == '1':
        # Strip stray whitespace picked up when pasting the token.
        access_token = input('Enter personal access token: ').strip()
        options = f'access_token = {access_token!r}'
        type_name = 'linode'
    else:
        print('Invalid choice')
        return

    # Everything has been collected: only now is the old file discarded.
    if replacing:
        config_path.unlink()
    # O_EXCL plus mode 0o600 guarantees a freshly created file that only the
    # owner can read, since it will hold an API token.
    fd = os.open(config_path, os.O_CREAT | os.O_WRONLY | os.O_EXCL, 0o600)
    # Written as UTF-8 because that is the encoding the file is read back with.
    with os.fdopen(fd, 'w', encoding='utf-8') as config:
        # Values are stored as Python literals (repr) so they can be read
        # back with a literal parser.
        config.write(f'''Domain {type_name} {domain_name}
getip = {get_ip!r}
{options}
''')
# --setup only (re)creates the configuration file; nothing is updated.
if args.setup:
    setup_config()
    sys.exit(0)

if not config_path.exists():
    fatal_error("Configuration doesn't exist. Try running with --setup first.")

# Refuse to run with a configuration file that others can read, since it
# may contain API tokens. Permission bits are only checked on POSIX systems.
if os.name == 'posix':
    config_mode = config_path.stat().st_mode
    # NOTE(review): only the "other" read bit is tested; a group-readable
    # file (0o040) is accepted. Confirm whether that is intended.
    if config_mode & 0o4:
        # st_mode also carries the file-type bits, so mask them off and show
        # the permissions in octal (e.g. 0644) rather than a raw decimal.
        fatal_error(f'''DANGER: configuration file {config_path} allows
reading by other users (mode {config_mode & 0o7777:04o}).
If there are API tokens in there, revoke them immediately!!''')
class Domain(ABC):
    '''A domain whose DNS will be updated.

    Subclasses implement update() and validate_specifics(). Attributes other
    than `domain` are not set by the constructor; they are expected to be
    assigned afterwards and checked with validate().
    '''
    # Domain name, as passed to the constructor
    domain: str
    # Shell command whose standard output is taken as the IP address
    getip: str

    def __init__(self, domain: str):
        self.domain = domain

    @abstractmethod
    def update(self, ip: str) -> None:
        '''Make necessary API request to update domain to use new IP address'''

    @abstractmethod
    def validate_specifics(self) -> str:
        '''Validate details specific to subclass.

        Returns an error message, or the empty string when everything is OK.
        '''

    def validate(self) -> str:
        '''Validate this object (ensure all required fields are set).

        Returns an error message, or the empty string when everything is OK.
        '''
        # getattr: getip is only annotated on the class, so it may be absent.
        if not getattr(self, 'getip', ''):
            return 'getip not set'
        return self.validate_specifics()

    def get_ip(self) -> Optional[str]:
        '''Get IP address using the registered command.

        Runs `getip` through the shell and returns its standard output with
        surrounding whitespace removed. Returns None, after printing a
        warning, when the command exits with a non-zero status or prints
        nothing.
        '''
        # shell=True is deliberate: getip is a full command line taken from
        # the user's own configuration file.
        result = subprocess.run(self.getip, shell=True, stdout=subprocess.PIPE, check=False)
        if result.returncode:
            print(f'WARNING: {self.getip} failed (exit code {result.returncode})')
            return None
        ip = result.stdout.decode(errors='replace').strip()
        if not ip:
            # A successful command with empty output gave us no address;
            # report it as a failure instead of returning ''.
            print(f'WARNING: {self.getip} produced no output')
            return None
        return ip
class LinodeDomain(Domain):
    '''Domain registered with Linode Domains'''
    # Personal access token; not set by the constructor
    access_token: str

    def __repr__(self) -> str:
        '''Return a debugging representation listing every field'''
        # getattr with a default keeps repr() usable on an object whose
        # getip / access_token have not been assigned yet.
        getip = getattr(self, 'getip', None)
        access_token = getattr(self, 'access_token', None)
        # NOTE(review): this exposes the access token in clear text; avoid
        # sending this repr to logs, or mask the token here.
        # Every piece needs its own f prefix: adjacent literals are joined,
        # but only f-prefixed ones have their placeholders substituted.
        return (f'<LinodeDomain domain={self.domain} '
                f'getip={getip!r} '
                f'access_token={access_token!r}>')

    def update(self, ip: str) -> None:
        '''Placeholder: only prints the IP address, no API request is made'''
        print(f'update IP: {ip}')

    def validate_specifics(self) -> str:
        '''Check that an access token is set.

        Returns an error message, or the empty string when everything is OK.
        '''
        if not getattr(self, 'access_token', ''):
            return 'Access token not set'
        # OK
        return ''
def parse_config() -> list[Domain]:
    '''Parse configuration file.

    The file at config_path is read line by line:
      - blank lines and lines starting with '#' are ignored;
      - 'Domain <type> <name>' starts a new domain section;
      - any other line must be 'key = value', where value is a Python
        literal; it is stored as attribute `key` of the current domain.

    Returns the list of domains in file order, each of which has passed
    validate(). Exits through fatal_error() on any syntax error, unknown
    domain type, unparsable value or failed validation.
    '''
    curr_domain = None
    domains: list[Domain] = []
    with open(config_path, encoding='utf-8') as config:
        for line in config:
            line = line.strip()
            # Skip blank lines (indexing line[0] on them would raise) and comments.
            if not line or line.startswith('#'):
                continue
            if line.startswith('Domain '):
                # A new section begins: the previous one is complete.
                if curr_domain:
                    domains.append(curr_domain)
                parts = line.split()
                if len(parts) != 3:
                    fatal_error('Domain declaration should have exactly '
                                'two space-separated arguments (type and name)')
                kind = parts[1]
                domain_name = parts[2]
                if kind == 'linode':
                    curr_domain = LinodeDomain(domain_name)
                else:
                    fatal_error(f'No such domain type: {kind}')
            else:
                if not curr_domain:
                    fatal_error('First non-empty line of configuration must be a Domain declaration')
                parts = [part.strip() for part in line.split('=', maxsplit=1)]
                if len(parts) != 2:
                    fatal_error(f'Invalid syntax (want key = value): {line}')
                key, value = parts
                try:
                    parsed_value = ast.literal_eval(value)
                except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
                    # Only the key is reported: the value may be a secret.
                    fatal_error(f'Invalid value for {key} '
                                '(want a Python literal such as a quoted string)')
                setattr(curr_domain, key, parsed_value)
    # The last section is not followed by another Domain line.
    if curr_domain:
        domains.append(curr_domain)
    for domain in domains:
        err = domain.validate()
        if err:
            fatal_error(f'In domain {domain.domain}: {err}')
    return domains
# Load the configured domains; a configuration without any is unusable.
configured_domains = parse_config()
if configured_domains:
    # Only the first configured domain is queried; the result of its
    # IP lookup is printed as-is.
    print(configured_domains[0].get_ip())
else:
    fatal_error('No domains defined. Try running with --setup?')
|