AutoYADM commit: 2025-06-10 15:43:47
This commit is contained in:
parent
f096e44679
commit
5a1ce73215
2 changed files with 133 additions and 143 deletions
|
@ -1,143 +0,0 @@
|
||||||
#!/usr/bin/env python
|
|
||||||
|
|
||||||
# pyright: basic, reportUnusedCallResult=false
|
|
||||||
|
|
||||||
import atexit
|
|
||||||
import subprocess
|
|
||||||
import sys
|
|
||||||
import os
|
|
||||||
import tempfile
|
|
||||||
import signal
|
|
||||||
import types
|
|
||||||
import selectors
|
|
||||||
|
|
||||||
Position = tuple[int, int]
|
|
||||||
SpecList = list[tuple[Position, Position]]
|
|
||||||
|
|
||||||
diagnostics: SpecList = []
|
|
||||||
|
|
||||||
|
|
||||||
def parse_specs(data: str):
|
|
||||||
parsed: SpecList = []
|
|
||||||
for entry in data.strip().split():
|
|
||||||
if not entry or len(entry) < 9:
|
|
||||||
continue
|
|
||||||
range_part, _ = entry.split("|", 1)
|
|
||||||
start_str, end_str = range_part.split(",")
|
|
||||||
sl, sc = map(int, start_str.split("."))
|
|
||||||
el, ec = map(int, end_str.split("."))
|
|
||||||
parsed.append(((sl, sc), (el, ec)))
|
|
||||||
return parsed
|
|
||||||
|
|
||||||
|
|
||||||
def is_cursor_in_any(cursor: Position, diagnostics: SpecList) -> bool:
|
|
||||||
cl, cc = cursor
|
|
||||||
for (sl, sc), (el, ec) in diagnostics:
|
|
||||||
if cl < sl or cl > el:
|
|
||||||
continue
|
|
||||||
if sl == el:
|
|
||||||
if cl == sl and sc <= cc <= ec:
|
|
||||||
return True
|
|
||||||
elif cl == sl:
|
|
||||||
if cc >= sc:
|
|
||||||
return True
|
|
||||||
elif cl == el:
|
|
||||||
if cc <= ec:
|
|
||||||
return True
|
|
||||||
elif sl < cl < el:
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
def cleanup(inp: str, outp: str, dir: str):
|
|
||||||
# subprocess.run(["notify-send", "cleanup runs"])
|
|
||||||
try:
|
|
||||||
os.remove(inp)
|
|
||||||
os.remove(outp)
|
|
||||||
os.rmdir(dir)
|
|
||||||
except FileNotFoundError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
def gen_kakoune_output(inp: str, outp: str) -> str:
|
|
||||||
return f"declare-option -hidden str diagpipe_in {inp}\ndeclare-option -hidden str diagpipe_out {outp}"
|
|
||||||
|
|
||||||
|
|
||||||
def daemonize(inp: str, outp: str, dir: str):
|
|
||||||
# fork and exit parent
|
|
||||||
if os.fork() > 0:
|
|
||||||
sys.exit(0)
|
|
||||||
# new session
|
|
||||||
os.setsid()
|
|
||||||
if os.fork() > 0:
|
|
||||||
# exit first child
|
|
||||||
sys.exit(0)
|
|
||||||
|
|
||||||
# redirect IO to /dev/null
|
|
||||||
with open("/dev/null", "rb", 0) as dn:
|
|
||||||
os.dup2(dn.fileno(), sys.stdin.fileno())
|
|
||||||
with open("/dev/null", "ab", 0) as dn:
|
|
||||||
os.dup2(dn.fileno(), sys.stdout.fileno())
|
|
||||||
os.dup2(dn.fileno(), sys.stderr.fileno())
|
|
||||||
_ = atexit.register(lambda: cleanup(inp, outp, dir))
|
|
||||||
|
|
||||||
def on_exit(*_):
|
|
||||||
# cleanup(inp, outp, dir)
|
|
||||||
sys.exit(0)
|
|
||||||
|
|
||||||
signal.signal(signal.SIGTERM, on_exit)
|
|
||||||
signal.signal(signal.SIGINT, on_exit)
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
|
||||||
# subprocess.run(["notify-send", "begin loop"])
|
|
||||||
# create unique directory and names
|
|
||||||
fifo_dir = tempfile.mkdtemp(prefix="diagpipe-")
|
|
||||||
in_path = os.path.join(fifo_dir, "in")
|
|
||||||
out_path = os.path.join(fifo_dir, "out")
|
|
||||||
|
|
||||||
# create fifos
|
|
||||||
os.mkfifo(in_path)
|
|
||||||
os.mkfifo(out_path)
|
|
||||||
|
|
||||||
output = gen_kakoune_output(in_path, out_path)
|
|
||||||
print(output)
|
|
||||||
sys.stdout.flush()
|
|
||||||
daemonize(in_path, out_path, fifo_dir)
|
|
||||||
|
|
||||||
sel = selectors.DefaultSelector()
|
|
||||||
infile = open(in_path, "r", buffering=1)
|
|
||||||
outfile = open(out_path, "w", buffering=1)
|
|
||||||
sel.register(infile.fileno(), selectors.EVENT_READ, data=infile)
|
|
||||||
|
|
||||||
# with open(in_path, "r") as infile, open(out_path, "w") as outfile:
|
|
||||||
diagnostics: SpecList = []
|
|
||||||
while True:
|
|
||||||
for key, _ in sel.select(timeout=None):
|
|
||||||
infile_obj = key.data
|
|
||||||
line = infile_obj.readline()
|
|
||||||
# line = infile.readline()
|
|
||||||
if not line:
|
|
||||||
continue
|
|
||||||
line = line.strip()
|
|
||||||
assert isinstance(line, str)
|
|
||||||
# # subprocess.run(["notify-send", f"Received command: {line}"])
|
|
||||||
if line.startswith("set "):
|
|
||||||
# subprocess.run(["notify-send", f"Received set: {line}"])
|
|
||||||
_, payload = line.split(" ", 1)
|
|
||||||
diagnostics = parse_specs(payload)
|
|
||||||
_ = outfile.write("ok\n")
|
|
||||||
outfile.flush()
|
|
||||||
elif line.startswith("query "):
|
|
||||||
_, pos = line.split(" ", 1)
|
|
||||||
l, c = map(int, pos.strip().split())
|
|
||||||
result = is_cursor_in_any((l, c), diagnostics)
|
|
||||||
_ = outfile.write("true\n" if result else "false\n")
|
|
||||||
outfile.flush()
|
|
||||||
elif line.startswith("exit"):
|
|
||||||
# subprocess.run(["notify-send", "exit received"])
|
|
||||||
sys.exit(0)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
1
.config/kak/scripts/lsp-diags.py
Symbolic link
1
.config/kak/scripts/lsp-diags.py
Symbolic link
|
@ -0,0 +1 @@
|
||||||
|
/home/fic/dev/kak-lsp-diags/lsp-diags.py
|
132
.config/kak/scripts/lsp-diags_.py
Executable file
132
.config/kak/scripts/lsp-diags_.py
Executable file
|
@ -0,0 +1,132 @@
|
||||||
|
#!/usr/bin/env python
|
||||||
|
|
||||||
|
# pyright: basic, reportUnusedCallResult=false
|
||||||
|
|
||||||
|
import atexit
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
import signal
|
||||||
|
|
||||||
|
Position = tuple[int, int]
|
||||||
|
SpecList = list[tuple[Position, Position]]
|
||||||
|
|
||||||
|
diagnostics: SpecList = []
|
||||||
|
|
||||||
|
|
||||||
|
def parse_specs(data: str):
|
||||||
|
parsed: SpecList = []
|
||||||
|
for entry in data.strip().split():
|
||||||
|
if not entry or len(entry) < 9:
|
||||||
|
continue
|
||||||
|
range_part, _ = entry.split("|", 1)
|
||||||
|
start_str, end_str = range_part.split(",")
|
||||||
|
sl, sc = map(int, start_str.split("."))
|
||||||
|
el, ec = map(int, end_str.split("."))
|
||||||
|
parsed.append(((sl, sc), (el, ec)))
|
||||||
|
return parsed
|
||||||
|
|
||||||
|
|
||||||
|
def is_cursor_in_any(cursor: Position, diagnostics: SpecList) -> bool:
|
||||||
|
cl, cc = cursor
|
||||||
|
for (sl, sc), (el, ec) in diagnostics:
|
||||||
|
if cl < sl or cl > el:
|
||||||
|
continue
|
||||||
|
if sl == el:
|
||||||
|
if cl == sl and sc <= cc <= ec:
|
||||||
|
return True
|
||||||
|
elif cl == sl:
|
||||||
|
if cc >= sc:
|
||||||
|
return True
|
||||||
|
elif cl == el:
|
||||||
|
if cc <= ec:
|
||||||
|
return True
|
||||||
|
elif sl < cl < el:
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def cleanup(inp: str, outp: str, dir: str):
|
||||||
|
# subprocess.run(["notify-send", "cleanup runs"])
|
||||||
|
try:
|
||||||
|
os.remove(inp)
|
||||||
|
os.remove(outp)
|
||||||
|
os.rmdir(dir)
|
||||||
|
except FileNotFoundError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def gen_kakoune_output(inp: str, outp: str) -> str:
|
||||||
|
return f"declare-option -hidden str diagpipe_in {inp}\ndeclare-option -hidden str diagpipe_out {outp}"
|
||||||
|
|
||||||
|
|
||||||
|
def daemonize(inp: str, outp: str, dir: str):
|
||||||
|
# fork and exit parent
|
||||||
|
if os.fork() > 0:
|
||||||
|
sys.exit(0)
|
||||||
|
# new session
|
||||||
|
os.setsid()
|
||||||
|
if os.fork() > 0:
|
||||||
|
# exit first child
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
|
# redirect IO to /dev/null
|
||||||
|
with open("/dev/null", "rb", 0) as dn:
|
||||||
|
os.dup2(dn.fileno(), sys.stdin.fileno())
|
||||||
|
with open("/dev/null", "ab", 0) as dn:
|
||||||
|
os.dup2(dn.fileno(), sys.stdout.fileno())
|
||||||
|
os.dup2(dn.fileno(), sys.stderr.fileno())
|
||||||
|
_ = atexit.register(lambda: cleanup(inp, outp, dir))
|
||||||
|
|
||||||
|
def on_exit(*_):
|
||||||
|
# cleanup(inp, outp, dir)
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
|
signal.signal(signal.SIGTERM, on_exit)
|
||||||
|
signal.signal(signal.SIGINT, on_exit)
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
# subprocess.run(["notify-send", "begin loop"])
|
||||||
|
# create unique directory and names
|
||||||
|
fifo_dir = tempfile.mkdtemp(prefix="diagpipe-")
|
||||||
|
in_path = os.path.join(fifo_dir, "in")
|
||||||
|
out_path = os.path.join(fifo_dir, "out")
|
||||||
|
|
||||||
|
# create fifos
|
||||||
|
os.mkfifo(in_path)
|
||||||
|
os.mkfifo(out_path)
|
||||||
|
|
||||||
|
output = gen_kakoune_output(in_path, out_path)
|
||||||
|
print(output)
|
||||||
|
sys.stdout.flush()
|
||||||
|
daemonize(in_path, out_path, fifo_dir)
|
||||||
|
|
||||||
|
with open(in_path, "r") as infile, open(out_path, "w") as outfile:
|
||||||
|
diagnostics: SpecList = []
|
||||||
|
while True:
|
||||||
|
line = infile.readline()
|
||||||
|
if not line:
|
||||||
|
continue
|
||||||
|
line = line.strip()
|
||||||
|
assert isinstance(line, str)
|
||||||
|
# # subprocess.run(["notify-send", f"Received command: {line}"])
|
||||||
|
if line.startswith("set "):
|
||||||
|
# subprocess.run(["notify-send", f"Received set: {line}"])
|
||||||
|
_, payload = line.split(" ", 1)
|
||||||
|
diagnostics = parse_specs(payload)
|
||||||
|
_ = outfile.write("ok\n")
|
||||||
|
outfile.flush()
|
||||||
|
elif line.startswith("query "):
|
||||||
|
_, pos = line.split(" ", 1)
|
||||||
|
l, c = map(int, pos.strip().split())
|
||||||
|
result = is_cursor_in_any((l, c), diagnostics)
|
||||||
|
_ = outfile.write("true\n" if result else "false\n")
|
||||||
|
outfile.flush()
|
||||||
|
elif line.startswith("exit"):
|
||||||
|
# subprocess.run(["notify-send", "exit received"])
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
Loading…
Add table
Add a link
Reference in a new issue