130 lines
4.4 KiB
Python
Executable file
130 lines
4.4 KiB
Python
Executable file
#!/usr/bin/env python
|
|
|
|
# pyright: strict, reportUnusedCallResult=false
|
|
|
|
import sys
|
|
import os
|
|
import tempfile
|
|
import atexit
|
|
|
|
Position = tuple[int, int]
|
|
SpecList = list[tuple[Position, Position]]
|
|
|
|
diagnostics: SpecList = []
|
|
|
|
|
|
def parse_specs(data: str):
|
|
parsed: SpecList = []
|
|
for entry in data.strip().split():
|
|
if not entry or len(entry) < 9:
|
|
continue
|
|
range_part, _ = entry.split("|", 1)
|
|
start_str, end_str = range_part.split(",")
|
|
sl, sc = map(int, start_str.split("."))
|
|
el, ec = map(int, end_str.split("."))
|
|
parsed.append(((sl, sc), (el, ec)))
|
|
return parsed
|
|
|
|
|
|
def is_cursor_in_any(cursor: Position, diagnostics: SpecList) -> bool:
|
|
cl, cc = cursor
|
|
for (sl, sc), (el, ec) in diagnostics:
|
|
if cl < sl or cl > el:
|
|
continue
|
|
if sl == el:
|
|
if cl == sl and sc <= cc <= ec:
|
|
return True
|
|
elif cl == sl:
|
|
if cc >= sc:
|
|
return True
|
|
elif cl == el:
|
|
if cc <= ec:
|
|
return True
|
|
elif sl < cl < el:
|
|
return True
|
|
return False
|
|
|
|
|
|
def test():
|
|
test_data = "1 10.21,10.30|DiagnosticHint 33.43,33.47|DiagnosticHint 34.7,34.10|DiagnosticHint 57.8,57.17|DiagnosticHint 68.9,68.18|DiagnosticHint 69.14,69.23|DiagnosticHint 72.7,72.11|DiagnosticHint 73.70,73.79|DiagnosticHint 75.7,75.11|DiagnosticHint 77.47,77.56|DiagnosticHint 79.27,79.28|DiagnosticHint 81.61,81.70|DiagnosticHint 84.66,84.75|DiagnosticHint 96.4,96.13|DiagnosticHint 97.3,97.12|DiagnosticHint 101.8,101.20|DiagnosticHint 145.28,145.43|DiagnosticHint 153.13,153.21|DiagnosticHint 155.13,155.21|DiagnosticHint 158.5,158.14|DiagnosticHint 164.56,164.61|DiagnosticHint 203.21,203.21|DiagnosticHint 204.28,204.28|DiagnosticHint 206.16,206.16|DiagnosticHint 209.5,209.5|DiagnosticHint 210.5,210.5|DiagnosticHint 216.25,216.25|DiagnosticHint 219.21,219.21|DiagnosticHint 225.27,225.27|DiagnosticHint 234.59,234.59|DiagnosticHint 263.37,263.39|DiagnosticHint 268.55,268.57|DiagnosticHint 287.46,287.47|DiagnosticHint 326.16,326.25|DiagnosticHint 337.34,337.36|DiagnosticHint 339.33,339.35|DiagnosticHint 364.63,364.71|DiagnosticHint 366.46,366.54|DiagnosticHint"
|
|
out = parse_specs(test_data)
|
|
print(out)
|
|
test_cursor = (10, 24)
|
|
print(is_cursor_in_any(test_cursor, out))
|
|
|
|
|
|
def cleanup(inp: str, outp: str, dir: str):
|
|
try:
|
|
os.remove(inp)
|
|
os.remove(outp)
|
|
os.rmdir(dir)
|
|
except FileNotFoundError:
|
|
pass
|
|
|
|
|
|
def gen_kakoune_output(inp: str, outp: str) -> str:
|
|
return f"declare_option -hidden str diagpipe_in {inp}\ndeclare_option -hidden str diagpipe_out {outp}"
|
|
|
|
|
|
def daemonize():
|
|
# exit parent
|
|
if os.fork() > 0:
|
|
sys.exit(0)
|
|
# new session
|
|
os.setsid()
|
|
if os.fork() > 0:
|
|
# exit first child
|
|
sys.exit(0)
|
|
_ = sys.stdin.close()
|
|
_ = sys.stdout.flush()
|
|
_ = sys.stderr.flush()
|
|
|
|
# redirect IO to /dev/null
|
|
with open("/dev/null", "rb", 0) as dn:
|
|
os.dup2(dn.fileno(), sys.stdin.fileno())
|
|
with open("/dev/null", "ab", 0) as dn:
|
|
os.dup2(dn.fileno(), sys.stdout.fileno())
|
|
os.dup2(dn.fileno(), sys.stderr.fileno())
|
|
|
|
|
|
def main():
|
|
# create unique directory and names
|
|
fifo_dir = tempfile.mkdtemp(prefix="diagpipe-")
|
|
in_path = os.path.join(fifo_dir, "in")
|
|
out_path = os.path.join(fifo_dir, "out")
|
|
|
|
# create fifos
|
|
os.mkfifo(in_path)
|
|
os.mkfifo(out_path)
|
|
|
|
_ = atexit.register(lambda: cleanup(in_path, out_path, fifo_dir))
|
|
|
|
output = gen_kakoune_output(in_path, out_path)
|
|
print(output)
|
|
sys.stdout.flush()
|
|
daemonize()
|
|
|
|
with open(in_path, "r") as infile, open(out_path, "w") as outfile:
|
|
diagnostics = []
|
|
while True:
|
|
line = infile.readline()
|
|
if not line:
|
|
continue
|
|
assert isinstance(line, str)
|
|
if line.startswith("set "):
|
|
_, payload = line.split(" ", 1)
|
|
diagnostics = parse_specs(payload)
|
|
_ = outfile.write("ok\n")
|
|
outfile.flush()
|
|
elif line.startswith("query "):
|
|
_, pos = line.split(" ", 1)
|
|
l, c = map(int, pos.strip().split())
|
|
result = is_cursor_in_any((l, c), diagnostics)
|
|
_ = outfile.write("true\n" if result else "false\n")
|
|
outfile.flush()
|
|
elif line == "exit":
|
|
break
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|