AutoYADM commit: 2025-06-24 19:59:59

This commit is contained in:
Daniel Fichtinger 2025-06-24 19:59:59 -04:00
parent 66d4e2635a
commit a1faa7d872

View file

@ -1,132 +0,0 @@
#!/usr/bin/env python
# pyright: basic, reportUnusedCallResult=false
import atexit
import sys
import os
import tempfile
import signal
Position = tuple[int, int]
SpecList = list[tuple[Position, Position]]
diagnostics: SpecList = []
def parse_specs(data: str):
parsed: SpecList = []
for entry in data.strip().split():
if not entry or len(entry) < 9:
continue
range_part, _ = entry.split("|", 1)
start_str, end_str = range_part.split(",")
sl, sc = map(int, start_str.split("."))
el, ec = map(int, end_str.split("."))
parsed.append(((sl, sc), (el, ec)))
return parsed
def is_cursor_in_any(cursor: Position, diagnostics: SpecList) -> bool:
cl, cc = cursor
for (sl, sc), (el, ec) in diagnostics:
if cl < sl or cl > el:
continue
if sl == el:
if cl == sl and sc <= cc <= ec:
return True
elif cl == sl:
if cc >= sc:
return True
elif cl == el:
if cc <= ec:
return True
elif sl < cl < el:
return True
return False
def cleanup(inp: str, outp: str, dir: str):
# subprocess.run(["notify-send", "cleanup runs"])
try:
os.remove(inp)
os.remove(outp)
os.rmdir(dir)
except FileNotFoundError:
pass
def gen_kakoune_output(inp: str, outp: str) -> str:
return f"declare-option -hidden str diagpipe_in {inp}\ndeclare-option -hidden str diagpipe_out {outp}"
def daemonize(inp: str, outp: str, dir: str):
# fork and exit parent
if os.fork() > 0:
sys.exit(0)
# new session
os.setsid()
if os.fork() > 0:
# exit first child
sys.exit(0)
# redirect IO to /dev/null
with open("/dev/null", "rb", 0) as dn:
os.dup2(dn.fileno(), sys.stdin.fileno())
with open("/dev/null", "ab", 0) as dn:
os.dup2(dn.fileno(), sys.stdout.fileno())
os.dup2(dn.fileno(), sys.stderr.fileno())
_ = atexit.register(lambda: cleanup(inp, outp, dir))
def on_exit(*_):
# cleanup(inp, outp, dir)
sys.exit(0)
signal.signal(signal.SIGTERM, on_exit)
signal.signal(signal.SIGINT, on_exit)
def main():
# subprocess.run(["notify-send", "begin loop"])
# create unique directory and names
fifo_dir = tempfile.mkdtemp(prefix="diagpipe-")
in_path = os.path.join(fifo_dir, "in")
out_path = os.path.join(fifo_dir, "out")
# create fifos
os.mkfifo(in_path)
os.mkfifo(out_path)
output = gen_kakoune_output(in_path, out_path)
print(output)
sys.stdout.flush()
daemonize(in_path, out_path, fifo_dir)
with open(in_path, "r") as infile, open(out_path, "w") as outfile:
diagnostics: SpecList = []
while True:
line = infile.readline()
if not line:
continue
line = line.strip()
assert isinstance(line, str)
# # subprocess.run(["notify-send", f"Received command: {line}"])
if line.startswith("set "):
# subprocess.run(["notify-send", f"Received set: {line}"])
_, payload = line.split(" ", 1)
diagnostics = parse_specs(payload)
_ = outfile.write("ok\n")
outfile.flush()
elif line.startswith("query "):
_, pos = line.split(" ", 1)
l, c = map(int, pos.strip().split())
result = is_cursor_in_any((l, c), diagnostics)
_ = outfile.write("true\n" if result else "false\n")
outfile.flush()
elif line.startswith("exit"):
# subprocess.run(["notify-send", "exit received"])
sys.exit(0)
if __name__ == "__main__":
main()