270 lines
7.8 KiB
Python
Executable file
270 lines
7.8 KiB
Python
Executable file
#!/bin/env python
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import socket
|
|
import sys
|
|
from typing import override
|
|
|
|
|
|
# helper prints dict as json string
|
|
def p(obj):
|
|
print(json.dumps(obj), flush=True)
|
|
|
|
|
|
log = logging.getLogger()
|
|
log.setLevel(logging.DEBUG)
|
|
handler = logging.StreamHandler(sys.stdout)
|
|
# log_level = os.environ["NIRILOG"]
|
|
if "NIRILOG" in os.environ:
|
|
handler.setLevel(logging.DEBUG)
|
|
else:
|
|
handler.setLevel(logging.ERROR)
|
|
log.addHandler(handler)
|
|
SOCKET = os.environ["NIRI_SOCKET"]
|
|
|
|
|
|
class Window:
|
|
def __init__(self, id: int, title: str, app_id: str) -> None:
|
|
self.id = id
|
|
self.title = title
|
|
self.app_id = app_id
|
|
|
|
@override
|
|
def __str__(self):
|
|
return f"{self.app_id}: {self.title}"
|
|
|
|
|
|
class Workspace:
|
|
def __init__(self, id: int, output: str) -> None:
|
|
self.id = id
|
|
self.windows: dict[int, Window] = {}
|
|
# self.windows: set[int] = set()
|
|
self.output = output
|
|
|
|
@override
|
|
def __str__(self) -> str:
|
|
return str(list(self.windows))
|
|
|
|
def add(self, id, title: str, app_id: str):
|
|
w = Window(id, title, app_id)
|
|
self.windows[id] = w
|
|
|
|
def remove(self, id):
|
|
self.windows.pop(id)
|
|
|
|
def query(self, id) -> bool:
|
|
return id in self.windows
|
|
|
|
def change_output(self, new: str):
|
|
self.output = new
|
|
|
|
def count(self) -> int:
|
|
return len(self.windows)
|
|
|
|
def get_windows(self):
|
|
return list(self.windows.values())
|
|
|
|
|
|
class State:
|
|
def __init__(self):
|
|
self.focused_workspace: int = 0
|
|
self.active_workspaces: dict[str, int] = {}
|
|
# output name -> workspace: id -> window count
|
|
self.workspaces: dict[int, Workspace] = {}
|
|
self.first_run = True
|
|
|
|
@override
|
|
def __str__(self) -> str:
|
|
s = ""
|
|
s += f"focused: {self.focused_workspace}\n"
|
|
for output, id in self.active_workspaces.items():
|
|
s += f"{output}: {id}\n"
|
|
for id, ws in self.workspaces.items():
|
|
s += f"id: {id}, win: {ws}\n"
|
|
return s
|
|
|
|
def set_activated(self, id: int):
|
|
output = self.workspaces[id].output
|
|
self.active_workspaces[output] = id
|
|
|
|
def set_focused(self, id: int):
|
|
self.focused_workspace = id
|
|
|
|
def update_workspaces(self, arr: list[dict]):
|
|
ids = []
|
|
for ws in arr:
|
|
id = ws["id"]
|
|
output = ws["output"]
|
|
if id not in self.workspaces:
|
|
self.workspaces[id] = Workspace(id, output)
|
|
ids.append(id)
|
|
if ws["is_focused"]:
|
|
self.focused_workspace = id
|
|
if ws["is_active"]:
|
|
self.active_workspaces[output] = id
|
|
to_pop = []
|
|
for id in self.workspaces.keys():
|
|
if id not in ids:
|
|
to_pop.append(id)
|
|
for id in to_pop:
|
|
self.workspaces.pop(id)
|
|
|
|
def add_window(
|
|
self, workspace_id: int, window_id: int, title: str, app_id: str
|
|
):
|
|
self.workspaces[workspace_id].add(window_id, title, app_id)
|
|
|
|
def remove_window(self, window_id: int):
|
|
for workspace in self.workspaces.values():
|
|
if workspace.query(window_id):
|
|
workspace.remove(window_id)
|
|
return
|
|
|
|
def update_windows(self, arr: list[dict]):
|
|
for win in arr:
|
|
window_id: int = win["id"]
|
|
workspace_id: int = win["workspace_id"]
|
|
title: str = win["title"]
|
|
app_id: str = win["app_id"]
|
|
self.add_window(workspace_id, window_id, title, app_id)
|
|
|
|
def get_count(self, output: str | None = None) -> int:
|
|
if output is None:
|
|
return self.workspaces[self.focused_workspace].count()
|
|
else:
|
|
id = self.active_workspaces[output]
|
|
return self.workspaces[id].count()
|
|
|
|
def get_windows(
|
|
self, output: str | None = None, ws_id: int | None = None
|
|
):
|
|
if ws_id is None:
|
|
if output is None:
|
|
ws_id = self.focused_workspace
|
|
else:
|
|
ws_id = self.active_workspaces[output]
|
|
return self.workspaces[ws_id].get_windows()
|
|
|
|
|
|
state = State()
|
|
|
|
|
|
def display():
|
|
print(generate_message(), flush=True)
|
|
|
|
|
|
def generate_message() -> str:
|
|
obj = {"text": generate_text(), "tooltip": generate_tooltip()}
|
|
return json.dumps(obj)
|
|
|
|
|
|
def generate_tooltip(output=None) -> str:
|
|
windows = state.get_windows(output)
|
|
s = ""
|
|
for _, w in enumerate(windows):
|
|
s += f"{str(w)}\r\n"
|
|
return s
|
|
|
|
|
|
def generate_text(icon="", output=None):
|
|
count = state.get_count(output)
|
|
out = " ".join([icon] * count)
|
|
if log.level <= logging.DEBUG:
|
|
log.debug(str(state))
|
|
return out
|
|
|
|
|
|
# function handles message from socket
|
|
def handle_message(event: dict):
|
|
log.debug("Handling message.")
|
|
log.debug(event)
|
|
should_display = False
|
|
match next(iter(event)):
|
|
case "WorkspacesChanged":
|
|
workspaces: list[dict] = event["WorkspacesChanged"][
|
|
"workspaces"
|
|
]
|
|
state.update_workspaces(workspaces)
|
|
log.info("Updated workspaces.")
|
|
should_display = True
|
|
case "WindowsChanged":
|
|
windows: list[dict] = event["WindowsChanged"]["windows"]
|
|
state.update_windows(windows)
|
|
log.info("Updated windows.")
|
|
should_display = True
|
|
# workspaces : list[dict] = event["WorkspacesChanged"]["workspaces"]
|
|
# state.process_changed(workspaces)
|
|
case "WorkspaceActivated":
|
|
ev = event["WorkspaceActivated"]
|
|
if ev["focused"]:
|
|
state.set_focused(ev["id"])
|
|
log.info("Changed focused workspace.")
|
|
state.set_activated(ev["id"])
|
|
should_display = True
|
|
case "WindowOpenedOrChanged":
|
|
# This event also handles window moved across workspace
|
|
window = event["WindowOpenedOrChanged"]["window"]
|
|
window_id, workspace_id = window["id"], window["workspace_id"]
|
|
state.remove_window(window_id)
|
|
title = window["title"]
|
|
app_id = window["app_id"]
|
|
state.add_window(workspace_id, window_id, title, app_id)
|
|
log.info("Updated window.")
|
|
should_display = True
|
|
case "WindowClosed":
|
|
# TODO: update Workspace to track window IDs
|
|
ev = event["WindowClosed"]
|
|
id: int = ev["id"]
|
|
state.remove_window(id)
|
|
log.info("Removed window.")
|
|
should_display = True
|
|
return should_display
|
|
|
|
|
|
# start the server
|
|
def server():
|
|
# pointer to this event loop
|
|
# open our socket
|
|
log.info(f"Connecting to Niri socket @ {SOCKET}")
|
|
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
|
|
client.connect(SOCKET)
|
|
# needed for async
|
|
client.sendall('"EventStream"'.encode() + b"\n")
|
|
client.shutdown(socket.SHUT_WR)
|
|
|
|
# only one connection at a time
|
|
log.info("Connection successful, starting event loop.")
|
|
# main loop; runs until waybar exits
|
|
while True:
|
|
# receive data from socket
|
|
data = client.recv(4096)
|
|
if not data:
|
|
log.debug("No data!")
|
|
for line in data.split(b"\n"):
|
|
if line.strip():
|
|
try:
|
|
event = json.loads(line)
|
|
if handle_message(event):
|
|
display()
|
|
except json.JSONDecodeError:
|
|
print(
|
|
"Malformed JSON:",
|
|
line.decode(errors="replace"),
|
|
)
|
|
|
|
|
|
# main function
|
|
def main():
|
|
# start by outputing default contents
|
|
# start the server
|
|
try:
|
|
server()
|
|
except Exception as e:
|
|
print(e, flush=True)
|
|
log.error(e)
|
|
|
|
|
|
# entry point
|
|
main()
|