#!/usr/bin/env python
# pyright: basic, reportUnusedCallResult=false
"""Background helper that answers cursor-in-diagnostic queries over FIFOs.

On start-up the script creates a private temporary directory holding two
FIFOs, prints Kakoune ``declare-option`` commands naming them, and then
detaches into the background.  Line-oriented commands are read from the
"in" FIFO and answered on the "out" FIFO:

* ``set <specs>``        -- replace the stored ranges, reply ``ok``
* ``query <line> <col>`` -- reply ``true`` or ``false``
* ``exit``               -- terminate (the FIFOs and directory are removed)
"""
import atexit
import subprocess
import sys
import os
import tempfile
import signal
import types

Position = tuple[int, int]
SpecList = list[tuple[Position, Position]]

diagnostics: SpecList = []


def _parse_position(text: str) -> Position:
    """Convert a ``line.col`` token into an ``(line, col)`` integer pair.

    Raises:
        ValueError: if *text* is not exactly two dot-separated integers.
    """
    line, col = map(int, text.split("."))
    return line, col


def parse_specs(data: str) -> SpecList:
    """Parse whitespace-separated ``sl.sc,el.ec|face`` entries into ranges.

    Tokens without a ``|`` separator (such as the leading bare ``1`` in the
    sample data used by ``test``) and entries whose range part is malformed
    are skipped, so one bad token cannot abort the whole parse.

    Args:
        data: the raw payload, e.g. ``"1 10.21,10.30|DiagnosticHint"``.

    Returns:
        A list of ``((start_line, start_col), (end_line, end_col))`` tuples
        in input order.  The text after ``|`` is discarded.
    """
    parsed: SpecList = []
    for entry in data.split():
        range_part, separator, _ = entry.partition("|")
        if not separator:
            # Not a range entry at all (bare token); nothing to parse.
            continue
        try:
            start_str, end_str = range_part.split(",")
            parsed.append((_parse_position(start_str), _parse_position(end_str)))
        except ValueError:
            # Wrong number of fields or a non-integer coordinate.
            continue
    return parsed


def is_cursor_in_any(cursor: Position, diagnostics: SpecList) -> bool:
    """Return True if *cursor* lies inside any range, both ends inclusive.

    Positions are ``(line, col)`` tuples, so lexicographic tuple comparison
    expresses "at or after start and at or before end" directly.  A range
    whose end precedes its start never matches.
    """
    return any(start <= cursor <= end for start, end in diagnostics)


def test() -> None:
    """Manual smoke test: parse a sample payload and print one query result."""
    test_data = (
        "1 10.21,10.30|DiagnosticHint 33.43,33.47|DiagnosticHint "
        "34.7,34.10|DiagnosticHint 57.8,57.17|DiagnosticHint "
        "68.9,68.18|DiagnosticHint 69.14,69.23|DiagnosticHint "
        "72.7,72.11|DiagnosticHint 73.70,73.79|DiagnosticHint "
        "75.7,75.11|DiagnosticHint 77.47,77.56|DiagnosticHint "
        "79.27,79.28|DiagnosticHint 81.61,81.70|DiagnosticHint "
        "84.66,84.75|DiagnosticHint 96.4,96.13|DiagnosticHint "
        "97.3,97.12|DiagnosticHint 101.8,101.20|DiagnosticHint "
        "145.28,145.43|DiagnosticHint 153.13,153.21|DiagnosticHint "
        "155.13,155.21|DiagnosticHint 158.5,158.14|DiagnosticHint "
        "164.56,164.61|DiagnosticHint 203.21,203.21|DiagnosticHint "
        "204.28,204.28|DiagnosticHint 206.16,206.16|DiagnosticHint "
        "209.5,209.5|DiagnosticHint 210.5,210.5|DiagnosticHint "
        "216.25,216.25|DiagnosticHint 219.21,219.21|DiagnosticHint "
        "225.27,225.27|DiagnosticHint 234.59,234.59|DiagnosticHint "
        "263.37,263.39|DiagnosticHint 268.55,268.57|DiagnosticHint "
        "287.46,287.47|DiagnosticHint 326.16,326.25|DiagnosticHint "
        "337.34,337.36|DiagnosticHint 339.33,339.35|DiagnosticHint "
        "364.63,364.71|DiagnosticHint 366.46,366.54|DiagnosticHint"
    )
    out = parse_specs(test_data)
    print(out)
    test_cursor = (10, 24)
    print(is_cursor_in_any(test_cursor, out))


def cleanup(inp: str, outp: str, dir: str) -> None:
    """Remove both FIFOs and then their directory, ignoring missing paths.

    Each removal is attempted independently so that one path that is already
    gone does not prevent the remaining ones from being removed.
    """
    for remove, path in ((os.remove, inp), (os.remove, outp), (os.rmdir, dir)):
        try:
            remove(path)
        except FileNotFoundError:
            pass


def gen_kakoune_output(inp: str, outp: str) -> str:
    """Build the two ``declare-option`` commands that publish the FIFO paths."""
    return (
        f"declare-option -hidden str diagpipe_in {inp}\n"
        f"declare-option -hidden str diagpipe_out {outp}"
    )


def daemonize(inp: str, outp: str, dir: str) -> None:
    """Detach from the caller and arrange for cleanup on exit.

    Forks twice with a ``setsid`` in between (both intermediate processes
    exit with status 0), points stdin/stdout/stderr at ``/dev/null``,
    registers ``cleanup(inp, outp, dir)`` with ``atexit`` and turns SIGTERM
    and SIGINT into a normal ``sys.exit(0)`` so that the cleanup runs.
    Only the final child returns from this function.
    """
    # First fork: the original process exits.
    if os.fork() > 0:
        sys.exit(0)

    # Start a new session, then fork again and let the session leader exit.
    os.setsid()
    if os.fork() > 0:
        sys.exit(0)

    # Redirect standard IO to /dev/null.
    with open("/dev/null", "rb", 0) as dn:
        os.dup2(dn.fileno(), sys.stdin.fileno())
    with open("/dev/null", "ab", 0) as dn:
        os.dup2(dn.fileno(), sys.stdout.fileno())
        os.dup2(dn.fileno(), sys.stderr.fileno())

    _ = atexit.register(lambda: cleanup(inp, outp, dir))

    def on_exit(*_):
        # Exiting through sys.exit lets the atexit handler do the cleanup.
        sys.exit(0)

    signal.signal(signal.SIGTERM, on_exit)
    signal.signal(signal.SIGINT, on_exit)


def _query_reply(payload: str, diagnostics: SpecList) -> str:
    """Return ``"true"``/``"false"`` for a ``<line> <col>`` query payload.

    A payload that is not exactly two integers yields ``"false"`` rather
    than an exception, so a bad request cannot take the daemon down.
    """
    try:
        line_no, col = map(int, payload.split())
    except ValueError:
        return "false"
    return "true" if is_cursor_in_any((line_no, col), diagnostics) else "false"


def main() -> None:
    """Create the FIFOs, announce them on stdout, daemonize and serve."""
    # Unique directory and FIFO names.
    fifo_dir = tempfile.mkdtemp(prefix="diagpipe-")
    in_path = os.path.join(fifo_dir, "in")
    out_path = os.path.join(fifo_dir, "out")

    os.mkfifo(in_path)
    os.mkfifo(out_path)

    # The paths must reach the caller before stdout is redirected.
    print(gen_kakoune_output(in_path, out_path))
    sys.stdout.flush()

    daemonize(in_path, out_path, fifo_dir)

    # Opening the read end blocks until the first client opens the FIFO for
    # writing.  The daemon then opens its own write end and keeps it open:
    # with a writer always present, reads block between client commands
    # instead of returning EOF immediately, which would make the loop spin.
    with open(in_path, "r") as infile, open(in_path, "w") as _keepalive:
        with open(out_path, "w") as outfile:
            diagnostics: SpecList = []
            for line in infile:
                if line.startswith("set "):
                    diagnostics = parse_specs(line.split(" ", 1)[1])
                    reply = "ok"
                elif line.startswith("query "):
                    reply = _query_reply(line.split(" ", 1)[1], diagnostics)
                elif line.startswith("exit"):
                    sys.exit(0)
                else:
                    # Blank lines and unknown commands get no reply.
                    continue
                _ = outfile.write(reply + "\n")
                outfile.flush()


if __name__ == "__main__":
    main()