diff --git a/.config/kak/scripts/lsp-diags.py b/.config/kak/scripts/lsp-diags.py deleted file mode 100755 index e9b30345..00000000 --- a/.config/kak/scripts/lsp-diags.py +++ /dev/null @@ -1,143 +0,0 @@ -#!/usr/bin/env python - -# pyright: basic, reportUnusedCallResult=false - -import atexit -import subprocess -import sys -import os -import tempfile -import signal -import types -import selectors - -Position = tuple[int, int] -SpecList = list[tuple[Position, Position]] - -diagnostics: SpecList = [] - - -def parse_specs(data: str): - parsed: SpecList = [] - for entry in data.strip().split(): - if not entry or len(entry) < 9: - continue - range_part, _ = entry.split("|", 1) - start_str, end_str = range_part.split(",") - sl, sc = map(int, start_str.split(".")) - el, ec = map(int, end_str.split(".")) - parsed.append(((sl, sc), (el, ec))) - return parsed - - -def is_cursor_in_any(cursor: Position, diagnostics: SpecList) -> bool: - cl, cc = cursor - for (sl, sc), (el, ec) in diagnostics: - if cl < sl or cl > el: - continue - if sl == el: - if cl == sl and sc <= cc <= ec: - return True - elif cl == sl: - if cc >= sc: - return True - elif cl == el: - if cc <= ec: - return True - elif sl < cl < el: - return True - return False - - -def cleanup(inp: str, outp: str, dir: str): - # subprocess.run(["notify-send", "cleanup runs"]) - try: - os.remove(inp) - os.remove(outp) - os.rmdir(dir) - except FileNotFoundError: - pass - - -def gen_kakoune_output(inp: str, outp: str) -> str: - return f"declare-option -hidden str diagpipe_in {inp}\ndeclare-option -hidden str diagpipe_out {outp}" - - -def daemonize(inp: str, outp: str, dir: str): - # fork and exit parent - if os.fork() > 0: - sys.exit(0) - # new session - os.setsid() - if os.fork() > 0: - # exit first child - sys.exit(0) - - # redirect IO to /dev/null - with open("/dev/null", "rb", 0) as dn: - os.dup2(dn.fileno(), sys.stdin.fileno()) - with open("/dev/null", "ab", 0) as dn: - os.dup2(dn.fileno(), sys.stdout.fileno()) - os.dup2(dn.fileno(), sys.stderr.fileno()) - _ = atexit.register(lambda: cleanup(inp, outp, dir)) - - def on_exit(*_): - # cleanup(inp, outp, dir) - sys.exit(0) - - signal.signal(signal.SIGTERM, on_exit) - signal.signal(signal.SIGINT, on_exit) - - -def main(): - # subprocess.run(["notify-send", "begin loop"]) - # create unique directory and names - fifo_dir = tempfile.mkdtemp(prefix="diagpipe-") - in_path = os.path.join(fifo_dir, "in") - out_path = os.path.join(fifo_dir, "out") - - # create fifos - os.mkfifo(in_path) - os.mkfifo(out_path) - - output = gen_kakoune_output(in_path, out_path) - print(output) - sys.stdout.flush() - daemonize(in_path, out_path, fifo_dir) - - sel = selectors.DefaultSelector() - infile = open(in_path, "r", buffering=1) - outfile = open(out_path, "w", buffering=1) - sel.register(infile.fileno(), selectors.EVENT_READ, data=infile) - - # with open(in_path, "r") as infile, open(out_path, "w") as outfile: - diagnostics: SpecList = [] - while True: - for key, _ in sel.select(timeout=None): - infile_obj = key.data - line = infile_obj.readline() - # line = infile.readline() - if not line: - continue - line = line.strip() - assert isinstance(line, str) - # # subprocess.run(["notify-send", f"Received command: {line}"]) - if line.startswith("set "): - # subprocess.run(["notify-send", f"Received set: {line}"]) - _, payload = line.split(" ", 1) - diagnostics = parse_specs(payload) - _ = outfile.write("ok\n") - outfile.flush() - elif line.startswith("query "): - _, pos = line.split(" ", 1) - l, c = map(int, pos.strip().split()) - result = is_cursor_in_any((l, c), diagnostics) - _ = outfile.write("true\n" if result else "false\n") - outfile.flush() - elif line.startswith("exit"): - # subprocess.run(["notify-send", "exit received"]) - sys.exit(0) - - -if __name__ == "__main__": - main() diff --git a/.config/kak/scripts/lsp-diags.py b/.config/kak/scripts/lsp-diags.py new file mode 120000 index 00000000..732dc5e5 --- /dev/null +++ b/.config/kak/scripts/lsp-diags.py @@ -0,0 +1 @@ +/home/fic/dev/kak-lsp-diags/lsp-diags.py \ No newline at end of file diff --git a/.config/kak/scripts/lsp-diags_.py b/.config/kak/scripts/lsp-diags_.py new file mode 100755 index 00000000..dd548ed2 --- /dev/null +++ b/.config/kak/scripts/lsp-diags_.py @@ -0,0 +1,132 @@ +#!/usr/bin/env python + +# pyright: basic, reportUnusedCallResult=false + +import atexit +import sys +import os +import tempfile +import signal + +Position = tuple[int, int] +SpecList = list[tuple[Position, Position]] + +diagnostics: SpecList = [] + + +def parse_specs(data: str): + parsed: SpecList = [] + for entry in data.strip().split(): + if not entry or len(entry) < 9: + continue + range_part, _ = entry.split("|", 1) + start_str, end_str = range_part.split(",") + sl, sc = map(int, start_str.split(".")) + el, ec = map(int, end_str.split(".")) + parsed.append(((sl, sc), (el, ec))) + return parsed + + +def is_cursor_in_any(cursor: Position, diagnostics: SpecList) -> bool: + cl, cc = cursor + for (sl, sc), (el, ec) in diagnostics: + if cl < sl or cl > el: + continue + if sl == el: + if cl == sl and sc <= cc <= ec: + return True + elif cl == sl: + if cc >= sc: + return True + elif cl == el: + if cc <= ec: + return True + elif sl < cl < el: + return True + return False + + +def cleanup(inp: str, outp: str, dir: str): + # subprocess.run(["notify-send", "cleanup runs"]) + try: + os.remove(inp) + os.remove(outp) + os.rmdir(dir) + except FileNotFoundError: + pass + + +def gen_kakoune_output(inp: str, outp: str) -> str: + return f"declare-option -hidden str diagpipe_in {inp}\ndeclare-option -hidden str diagpipe_out {outp}" + + +def daemonize(inp: str, outp: str, dir: str): + # fork and exit parent + if os.fork() > 0: + sys.exit(0) + # new session + os.setsid() + if os.fork() > 0: + # exit first child + sys.exit(0) + + # redirect IO to /dev/null + with open("/dev/null", "rb", 0) as dn: + os.dup2(dn.fileno(), sys.stdin.fileno()) + with open("/dev/null", "ab", 0) as dn: + os.dup2(dn.fileno(), sys.stdout.fileno()) + os.dup2(dn.fileno(), sys.stderr.fileno()) + _ = atexit.register(lambda: cleanup(inp, outp, dir)) + + def on_exit(*_): + # cleanup(inp, outp, dir) + sys.exit(0) + + signal.signal(signal.SIGTERM, on_exit) + signal.signal(signal.SIGINT, on_exit) + + +def main(): + # subprocess.run(["notify-send", "begin loop"]) + # create unique directory and names + fifo_dir = tempfile.mkdtemp(prefix="diagpipe-") + in_path = os.path.join(fifo_dir, "in") + out_path = os.path.join(fifo_dir, "out") + + # create fifos + os.mkfifo(in_path) + os.mkfifo(out_path) + + output = gen_kakoune_output(in_path, out_path) + print(output) + sys.stdout.flush() + daemonize(in_path, out_path, fifo_dir) + + with open(in_path, "r") as infile, open(out_path, "w") as outfile: + diagnostics: SpecList = [] + while True: + line = infile.readline() + if not line: + continue + line = line.strip() + assert isinstance(line, str) + # # subprocess.run(["notify-send", f"Received command: {line}"]) + if line.startswith("set "): + # subprocess.run(["notify-send", f"Received set: {line}"]) + _, payload = line.split(" ", 1) + diagnostics = parse_specs(payload) + _ = outfile.write("ok\n") + outfile.flush() + elif line.startswith("query "): + _, pos = line.split(" ", 1) + l, c = map(int, pos.strip().split()) + result = is_cursor_in_any((l, c), diagnostics) + _ = outfile.write("true\n" if result else "false\n") + outfile.flush() + elif line.startswith("exit"): + # subprocess.run(["notify-send", "exit received"]) + sys.exit(0) + + +if __name__ == "__main__": + main()