#!/usr/bin/env python3 """Monitoring plugin (Nagios-compatible) for watching a Gemini server The monitoring plugin API is documented at . """ import socket import re import sys import datetime import getopt import urllib.parse # https://www.pyopenssl.org/ # https://pyopenssl.readthedocs.io/ import OpenSSL # https://framagit.org/bortzmeyer/agunua import Agunua # Do not touch # https://www.monitoring-plugins.org/doc/guidelines.html#AEN78 STATE_OK = 0 STATE_WARNING = 1 STATE_CRITICAL = 2 STATE_UNKNOWN = 3 STATE_DEPENDENT = 4 # Can be changed from the command-line host = None vhostname = None path = "" expect = None expect_statuscode = None # "meta" is defined in Gemini protocol specification expect_meta = None sni = True warn_cert = None crit_cert = None insecure = False disable_tofu = False accept_expired = False socks = None port = Agunua.GEMINI_PORT forceIPv4 = False forceIPv6 = False get_content = False err_message = "" def error(msg=None): if msg is None: msg = "Unknown error" print("%s CRITICAL: %s" % (host, msg)) sys.exit(STATE_CRITICAL) def warning(msg=None): if msg is None: msg = "Unknown warning" print("%s WARNING: %s" % (host, msg)) sys.exit(STATE_WARNING) try: try: optlist, args = getopt.getopt (sys.argv[1:], "hH:p:V:e:c:C:m:P:s:ghi46xaT") for option, value in optlist: if option == "-h": print("Standard Nagios API options") sys.exit(STATE_UNKNOWN) elif option == "-H": host = value elif option == "-V": vhostname = value elif option == "-e": expect = value elif option == "-c": expect_statuscode = value elif option == "-C": if value.find(",") < 0: warn_cert = int(value) else: (warning_c, critical_c) = value.split(",") warn_cert = int(warning_c) crit_cert = int(critical_c) elif option == "-m": expect_meta = value elif option == "-p": path = value elif option == "-P": port = int(value) elif option == "-i": insecure = True elif option == "-T": disable_tofu = True elif option == "-x": sni = False elif option == "-a": accept_expired = True elif option == "-s": match = re.search("^\[?(([\w\.]+)|(([a-fA-F0-9:]+))|([0-9\.]+))\]?:([0-9]+)$", value) if not match: print("Socks must be host:port") sys.exit(STATE_UNKNOWN) socks = (match.group(1), int(match.group(6))) elif option == "-4": forceIPv4 = True elif option == "-6": forceIPv6 = True elif option == '-g': get_content = True else: # Should never occur, it is trapped by getopt print("Unknown option %s" % option) sys.exit(STATE_UNKNOWN) except getopt.error as reason: print("Option parsing problem %s" % reason) sys.exit(STATE_UNKNOWN) if host is None: print("Host (-H) is necessary") sys.exit(STATE_UNKNOWN) if vhostname is None: vhostname = host if socks is not None and vhostname.lower().endswith(".onion"): host = vhostname if forceIPv4 and forceIPv6: print("Force IPv4 or IPv6 but not both") sys.exit(STATE_UNKNOWN) if warn_cert is not None and crit_cert is not None and warn_cert <= crit_cert: print("Warning threshold must be higher than critical threshold") sys.exit(STATE_UNKNOWN) if len(args) > 0: print("Too many arguments (\"%s\")" % args) sys.exit(STATE_UNKNOWN) if port == Agunua.GEMINI_PORT: netloc = vhostname else: netloc = "%s:%i" % (vhostname, port) url = urllib.parse.urlunsplit(("gemini", netloc, path, "", "")) if disable_tofu: tofu_path = "" else: tofu_path = Agunua.TOFU if expect != None: get_content = True start = datetime.datetime.now() result = Agunua.GeminiUri(url, get_content=get_content, connect_to=host, use_socks=socks, insecure=insecure, tofu=tofu_path, accept_expired=accept_expired, send_sni=sni, force_ipv4=forceIPv4, force_ipv6=forceIPv6) if not result.network_success: error(result.error) end = datetime.datetime.now() if get_content: data = result.payload if expect_statuscode is not None: if result.status_code != expect_statuscode: error("Unexpected status code %s (expected was %s)" % \ (result.status_code, expect_statuscode)) elif result.status_code == "20": if expect_meta is not None: f = result.meta.find(expect_meta) if f < 0: error("\"%s\" not found in meta (value is \"%s\")" % (expect_meta, result.meta)) if expect is not None: f = data.find(expect) if f < 0: error("\"%s\" not found in answer" % expect) else: error("Status code %s: %s" % (result.status_code, result.meta)) now = datetime.datetime.now() if crit_cert is not None and (result.cert_not_after <= now + datetime.timedelta(days=crit_cert)): error("Certificate expires in less than %i days" % crit_cert) if warn_cert is not None and (result.cert_not_after <= now + datetime.timedelta(days=warn_cert)): warning("Certificate expires in less than %i days" % warn_cert) latency = end-start elapsed = latency.seconds + latency.microseconds/(10**6) if get_content: size = " - %i bytes" % len(data) else: size = "" print("%s OK - %s%s - %.3f seconds" % (host, "No error", size, elapsed)) sys.exit(STATE_OK) except Exception as e: exc_type, exc_value, exc_traceback = sys.exc_info() print("%s UNKNOWN - Unknown internal error %s: %s" % (host, exc_type, exc_value)) sys.exit(STATE_UNKNOWN) gemini://gemini.bortzmeyer.org/software/manisha/check_gemini.py

-- Leo's gemini proxy

-- Connecting to gemini.bortzmeyer.org:1965...

-- Connected

-- Sending request

-- Meta line: 20 text/plain

-- Response ended

-- Page fetched on Wed May 8 03:36:40 2024