132 lines
3.8 KiB
Python
Executable file
132 lines
3.8 KiB
Python
Executable file
#!/usr/bin/env python
|
|
|
|
# pyright: basic, reportUnusedCallResult=false
|
|
|
|
import atexit
|
|
import sys
|
|
import os
|
|
import tempfile
|
|
import signal
|
|
|
|
# A pair of integers; parse_specs() fills it from "a.b" text and main() builds
# the query cursor from two integers (presumably line and column -- the locals
# are named sl/sc, el/ec and l/c).
Position = tuple[int, int]
# A list of (start, end) Position pairs, one per parsed range.
SpecList = list[tuple[Position, Position]]

# Module-level list of ranges. NOTE(review): main() assigns to a local of the
# same name and is_cursor_in_any() takes one as a parameter, so nothing visible
# in this file reads this global -- confirm before relying on it.
diagnostics: SpecList = []
|
|
|
|
|
|
def parse_specs(data: str) -> "SpecList":
    """Parse whitespace-separated range entries into (start, end) pairs.

    Each entry is expected to look like ``sl.sc,el.ec|rest`` where the four
    fields before the ``|`` are integers; everything after the first ``|`` is
    ignored.

    Args:
        data: Text containing zero or more whitespace-separated entries.

    Returns:
        One ``((sl, sc), (el, ec))`` tuple per well-formed entry, in input
        order. Entries that are too short or do not match the expected shape
        are skipped instead of raising, so one stray token cannot take down
        the caller.
    """
    parsed: SpecList = []
    for entry in data.split():
        # The shortest entry accepted is 9 characters (e.g. "1.1,1.1|x");
        # shorter tokens are dropped without being inspected further.
        if len(entry) < 9:
            continue
        range_part, separator, _ = entry.partition("|")
        if not separator:
            # Long token without a "|" (for example a large bare number).
            continue
        try:
            start_str, end_str = range_part.split(",")
            sl, sc = map(int, start_str.split("."))
            el, ec = map(int, end_str.split("."))
        except ValueError:
            # Wrong number of fields or a non-integer component.
            continue
        parsed.append(((sl, sc), (el, ec)))
    return parsed
|
|
|
|
|
|
def is_cursor_in_any(cursor: Position, diagnostics: SpecList) -> bool:
    """Report whether the cursor falls inside at least one of the ranges.

    Both ends of a range are inclusive. Positions are compared first by their
    first component and then by their second, so a range spanning several
    lines covers the tail of its first line, every line in between, and the
    head of its last line.

    Args:
        cursor: The position to test.
        diagnostics: The (start, end) ranges to test against.

    Returns:
        True if any range contains the cursor, otherwise False.
    """
    row, col = cursor
    point = (row, col)
    # Tuple comparison is lexicographic, which is exactly the ordering of
    # positions, so containment is a single chained comparison per range.
    return any(
        (first_row, first_col) <= point <= (last_row, last_col)
        for (first_row, first_col), (last_row, last_col) in diagnostics
    )
|
|
|
|
|
|
def cleanup(inp: str, outp: str, dir: str) -> None:
    """Delete both pipe files and then the directory that held them.

    Each removal is attempted independently, so a path that is already gone
    does not prevent the remaining ones from being removed.

    Args:
        inp: Path of the input pipe file.
        outp: Path of the output pipe file.
        dir: Directory to remove once both files are gone.

    Raises:
        OSError: For failures other than a missing path (for example, the
            directory is not empty or permissions are insufficient).
    """
    for path in (inp, outp):
        try:
            os.remove(path)
        except FileNotFoundError:
            # Already removed; nothing left to do for this path.
            pass
    try:
        os.rmdir(dir)
    except FileNotFoundError:
        pass
|
|
|
|
|
|
def gen_kakoune_output(inp: str, outp: str) -> str:
    """Build the two ``declare-option`` commands that publish the pipe paths.

    Args:
        inp: Path stored in the hidden ``diagpipe_in`` option.
        outp: Path stored in the hidden ``diagpipe_out`` option.

    Returns:
        Both commands joined by a single newline, without a trailing newline.
    """
    declarations = (
        f"declare-option -hidden str diagpipe_in {inp}",
        f"declare-option -hidden str diagpipe_out {outp}",
    )
    return "\n".join(declarations)
|
|
|
|
|
|
def daemonize(inp: str, outp: str, dir: str):
    """Detach the calling process into the background and arrange cleanup.

    The process forks, starts a new session, and forks again; the parent of
    each fork exits with status 0, so only the final child returns from this
    function. That child has stdin, stdout and stderr pointed at /dev/null,
    runs ``cleanup(inp, outp, dir)`` at interpreter exit, and turns SIGTERM
    and SIGINT into a normal ``sys.exit(0)`` so the exit hook still runs.

    Args:
        inp: Input pipe path handed to ``cleanup`` on exit.
        outp: Output pipe path handed to ``cleanup`` on exit.
        dir: Directory handed to ``cleanup`` on exit.
    """
    # First fork: the original process ends here.
    first_pid = os.fork()
    if first_pid > 0:
        sys.exit(0)

    # Become leader of a new session before forking a second time.
    os.setsid()

    second_pid = os.fork()
    if second_pid > 0:
        # The intermediate child ends here.
        sys.exit(0)

    # Point the three standard streams at /dev/null.
    with open("/dev/null", "rb", 0) as null_source:
        os.dup2(null_source.fileno(), sys.stdin.fileno())
    with open("/dev/null", "ab", 0) as null_sink:
        for stream in (sys.stdout, sys.stderr):
            os.dup2(null_sink.fileno(), stream.fileno())

    def remove_pipes():
        cleanup(inp, outp, dir)

    atexit.register(remove_pipes)

    def terminate(*_):
        # Exiting through sys.exit lets the atexit hook above do the cleanup.
        sys.exit(0)

    for signum in (signal.SIGTERM, signal.SIGINT):
        signal.signal(signum, terminate)
|
|
|
|
|
|
def main():
    """Create the pipe pair, announce it on stdout, then serve commands.

    A fresh temporary directory holding two FIFOs ("in" and "out") is created
    and the matching ``declare-option`` commands are printed. The process then
    daemonizes and answers one command per line read from the input FIFO:

    * ``set <specs>``: replace the stored ranges with ``parse_specs(<specs>)``
      and reply ``ok``.
    * ``query <a> <b>``: reply ``true`` or ``false`` depending on whether the
      position ``(a, b)`` lies inside any stored range.
    * a line starting with ``exit``: terminate with status 0.

    Any other line is ignored without a reply.
    """
    # Create a unique directory and the two FIFOs inside it.
    fifo_dir = tempfile.mkdtemp(prefix="diagpipe-")
    in_path = os.path.join(fifo_dir, "in")
    out_path = os.path.join(fifo_dir, "out")
    os.mkfifo(in_path)
    os.mkfifo(out_path)

    print(gen_kakoune_output(in_path, out_path))
    # Flush before forking so the buffered text is not inherited and emitted
    # a second time by a forked child when it exits.
    sys.stdout.flush()
    daemonize(in_path, out_path, fifo_dir)

    with open(in_path, "r") as infile, open(out_path, "w") as outfile:
        # Keep a write end of the input FIFO open for as long as we serve.
        # Without it, every read returns end-of-file immediately whenever no
        # client has the FIFO open, and the loop would spin at full CPU
        # between commands. With it, reads block until a client writes.
        # Opening it cannot block because we already hold the read end.
        with open(in_path, "w"):
            diagnostics: SpecList = []
            for raw_line in infile:
                line = raw_line.strip()
                if line.startswith("set "):
                    _, payload = line.split(" ", 1)
                    diagnostics = parse_specs(payload)
                    outfile.write("ok\n")
                    outfile.flush()
                elif line.startswith("query "):
                    _, pos = line.split(" ", 1)
                    row, col = map(int, pos.strip().split())
                    inside = is_cursor_in_any((row, col), diagnostics)
                    outfile.write("true\n" if inside else "false\n")
                    outfile.flush()
                elif line.startswith("exit"):
                    sys.exit(0)
|
|
|
|
|
|
# Run main() only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|