104 lines
3.2 KiB
Python
104 lines
3.2 KiB
Python
|
#!/usr/bin/env python3
|
||
|
import requests
|
||
|
import sys
|
||
|
from configparser import ConfigParser
|
||
|
|
||
|
# Location of the hook's settings file.
CONFIG_FILE = "/etc/dehydrated/hook.ini"
# contents should look something like this:
# [powerdns]
# endpoint = https://dns.example.lol/api/v1/servers/localhost
# api_key = lololololol

# Parsed once at import time; the hook functions read the [powerdns] section.
# ConfigParser.read() silently skips files it cannot open, so a missing or
# unreadable file only surfaces later as a KeyError on config["powerdns"].
config = ConfigParser()
config.read(CONFIG_FILE)
|
||
|
|
||
|
def deploy_challenge(domain, token_filename, token_value):
    """Add the ``_acme-challenge`` TXT record for ``domain`` through the PowerDNS API.

    The zone that holds ``domain`` is looked up in the server's zone list, then
    the TXT rrset ``_acme-challenge.<domain>.`` is replaced with the new token
    plus any TXT records that were already present under that name.

    Args:
        domain: Domain name being validated (without a trailing dot).
        token_filename: Unused; accepted so the hook argument order is kept.
        token_value: Challenge value, stored as a quoted TXT record.

    Raises:
        Exception: No zone on the server contains ``domain``.
        requests.HTTPError: The API answered any request with an error status.
    """
    endpoint = config["powerdns"]["endpoint"]
    headers = {"X-Api-Key": config["powerdns"]["api_key"], "Content-Type": "application/json"}
    # requests waits forever by default; bound every call so the hook cannot hang.
    timeout = 30

    # list all zones, then pick the most specific one that contains the domain
    resp = requests.get(f"{endpoint}/zones", headers=headers, timeout=timeout)
    # Fail here on an API error instead of iterating an error payload below.
    resp.raise_for_status()
    zones = resp.json()

    fqdn = domain.rstrip(".").lower() + "."
    zone = None
    for z in zones:
        zone_name = z["name"].lower()
        # Match on a label boundary so "notexample.com" does not hit "example.com.".
        covers = zone_name == "." or fqdn == zone_name or fqdn.endswith("." + zone_name)
        # The longest matching name wins, so a delegated child zone beats its parent.
        if covers and (zone is None or len(z["name"]) > len(zone)):
            zone = z["name"]

    if zone is None:
        raise Exception(f"unable to find zone for {domain}")

    content = f"\"{token_value}\""
    records = [{"content": content, "disabled": False}]

    name = f"_acme-challenge.{domain}."

    resp = requests.get(f"{endpoint}/zones/{zone}", headers=headers, timeout=timeout)
    resp.raise_for_status()
    existing_zone = resp.json()
    for rrset in existing_zone['rrsets']:
        if rrset["name"] == name and rrset["type"] == "TXT":
            # Keep the other TXT records under this name, but skip an identical
            # one so running the hook twice does not publish a duplicate.
            records += [r for r in rrset["records"] if r["content"] != content]

    patchdata = {
        "rrsets": [
            {
                "name": name,
                "type": "TXT",
                "ttl": 30,
                "changetype": "REPLACE",
                "records": records
            }
        ]
    }

    resp = requests.patch(f"{endpoint}/zones/{zone}", headers=headers, json=patchdata, timeout=timeout)
    if not resp.ok:
        # Show the API's error body before raising; the exception text alone omits it.
        print(resp.content.decode())
        resp.raise_for_status()
|
||
|
|
||
|
def clean_challenge(domain, token_filename, token_value):
    """Remove the ``_acme-challenge`` TXT record for ``domain`` through the PowerDNS API.

    The zone that holds ``domain`` is looked up in the server's zone list, then
    the TXT rrset ``_acme-challenge.<domain>.`` is replaced with whatever
    records remain once the one carrying ``token_value`` is filtered out. The
    replacement list is empty when no other record exists.

    Args:
        domain: Domain name that was validated (without a trailing dot).
        token_filename: Unused; accepted so the hook argument order is kept.
        token_value: Challenge value whose quoted TXT record is removed.

    Raises:
        Exception: No zone on the server contains ``domain``.
        requests.HTTPError: The API answered any request with an error status.
    """
    endpoint = config["powerdns"]["endpoint"]
    headers = {"X-Api-Key": config["powerdns"]["api_key"], "Content-Type": "application/json"}
    # requests waits forever by default; bound every call so the hook cannot hang.
    timeout = 30

    # list all zones, then pick the most specific one that contains the domain
    resp = requests.get(f"{endpoint}/zones", headers=headers, timeout=timeout)
    # Fail here on an API error instead of iterating an error payload below.
    resp.raise_for_status()
    zones = resp.json()

    fqdn = domain.rstrip(".").lower() + "."
    zone = None
    for z in zones:
        zone_name = z["name"].lower()
        # Match on a label boundary so "notexample.com" does not hit "example.com.".
        covers = zone_name == "." or fqdn == zone_name or fqdn.endswith("." + zone_name)
        # The longest matching name wins, so a delegated child zone beats its parent.
        if covers and (zone is None or len(z["name"]) > len(zone)):
            zone = z["name"]

    if zone is None:
        raise Exception(f"unable to find zone for {domain}")

    content = f"\"{token_value}\""
    records = []
    name = f"_acme-challenge.{domain}."

    resp = requests.get(f"{endpoint}/zones/{zone}", headers=headers, timeout=timeout)
    resp.raise_for_status()
    existing_zone = resp.json()
    for rrset in existing_zone['rrsets']:
        if rrset["name"] == name and rrset["type"] == "TXT":
            # preserve all records that don't match the one we are supposed to be cleaning
            records += [x for x in rrset["records"] if x["content"] != content]

    patchdata = {
        "rrsets": [
            {
                "name": name,
                "type": "TXT",
                "ttl": 30,
                "changetype": "REPLACE",
                "records": records
            }
        ]
    }

    resp = requests.patch(f"{endpoint}/zones/{zone}", headers=headers, json=patchdata, timeout=timeout)
    if not resp.ok:
        # Show the API's error body before raising; the exception text alone omits it.
        print(resp.content.decode())
        resp.raise_for_status()
|
||
|
|
||
|
if __name__ == "__main__":
    # Invocation: <operation> followed by one or more
    # (domain, token_filename, token_value) argument triples.
    if len(sys.argv) < 2:
        sys.exit(f"usage: {sys.argv[0]} <operation> [domain token_filename token_value ...]")

    operation = sys.argv[1]
    handlers = {
        "deploy_challenge": deploy_challenge,
        "clean_challenge": clean_challenge,
    }
    # Any other operation is ignored, so unrelated hook calls still exit cleanly.
    handler = handlers.get(operation)
    if handler is not None:
        hook_args = sys.argv[2:]
        if len(hook_args) < 3:
            sys.exit(f"{operation}: expected domain, token_filename and token_value arguments")
        # With HOOK_CHAIN=yes dehydrated passes several triples in one call;
        # handle each complete triple in order.
        for start in range(0, len(hook_args) - 2, 3):
            handler(*hook_args[start:start + 3])
|