infra/dehydrated-powerdns-hook.py

103 lines
3.2 KiB
Python

#!/usr/bin/env python3
import requests
import sys
from configparser import ConfigParser
CONFIG_FILE = "/etc/dehydrated/hook.ini"
# contents should look something like this:
# [powerdns]
# endpoint = https://dns.example.lol/api/v1/servers/localhost
# api_key = lololololol
config = ConfigParser()
config.read("/etc/dehydrated/hook.ini")
def deploy_challenge(domain, token_filename, token_value):
endpoint = config["powerdns"]["endpoint"]
headers = {"X-Api-Key": config["powerdns"]["api_key"], "Content-Type": "application/json"}
# list all zones, try to find one that matches
zones = requests.get(f"{endpoint}/zones", headers=headers).json()
zone = None
for z in zones:
if domain.endswith(z['name'][:-1]):
zone = z['name']
break
if zone is None:
raise Exception(f"unable to find zone for {domain}")
records = [{"content": f"\"{token_value}\"", "disabled": False}]
name = f"_acme-challenge.{domain}."
existing_zone = requests.get(f"{endpoint}/zones/{zone}", headers=headers).json()
for rrset in existing_zone['rrsets']:
if rrset["name"] == name and rrset["type"] == "TXT":
records += rrset["records"]
patchdata = {
"rrsets": [
{
"name": name,
"type": "TXT",
"ttl": 30,
"changetype": "REPLACE",
"records": records
}
]
}
resp = requests.patch(f"{endpoint}/zones/{zone}", headers=headers, json=patchdata)
if not resp.ok:
print(resp.content.decode())
resp.raise_for_status()
def clean_challenge(domain, token_filename, token_value):
endpoint = config["powerdns"]["endpoint"]
headers = {"X-Api-Key": config["powerdns"]["api_key"], "Content-Type": "application/json"}
# list all zones, try to find one that matches
zones = requests.get(f"{endpoint}/zones", headers=headers).json()
zone = None
for z in zones:
if domain.endswith(z['name'][:-1]):
zone = z['name']
break
if zone is None:
raise Exception(f"unable to find zone for {domain}")
records = []
name = f"_acme-challenge.{domain}."
existing_zone = requests.get(f"{endpoint}/zones/{zone}", headers=headers).json()
for rrset in existing_zone['rrsets']:
if rrset["name"] == name and rrset["type"] == "TXT":
# preserve all records that don't match the one we are supposed to be cleaning
records += [x for x in rrset["records"] if x["content"] != f"\"{token_value}\""]
patchdata = {
"rrsets": [
{
"name": name,
"type": "TXT",
"ttl": 30,
"changetype": "REPLACE",
"records": records
}
]
}
resp = requests.patch(f"{endpoint}/zones/{zone}", headers=headers, json=patchdata)
if not resp.ok:
print(resp.content.decode())
resp.raise_for_status()
if __name__ == "__main__":
if sys.argv[1] == "deploy_challenge":
deploy_challenge(sys.argv[2], sys.argv[3], sys.argv[4])
elif sys.argv[1] == "clean_challenge":
clean_challenge(sys.argv[2], sys.argv[3], sys.argv[4])