commit fd68d9f4633203ed43167ee84815273a65f71714 Author: Daniel Fichtinger Date: Thu Jul 17 22:57:25 2025 -0400 Initial commit diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..1cbda58 --- /dev/null +++ b/LICENSE @@ -0,0 +1,24 @@ +Copyright (c) 2025 Daniel Fichtinger + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: +1. Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. +2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. +3. Neither the name of the author nor the names of its contributors may + be used to endorse or promote products derived from this software + +THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS +OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY +OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF +SUCH DAMAGE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..8143593 --- /dev/null +++ b/README.md @@ -0,0 +1,29 @@ +# niri scripts + +This repository contains scripts and Waybar modules for Niri. + +## picker + +A simple window picker script. Use a `dmenu` compliant picker (`fuzzel` by +default) to pick a window and switch focus to it. + +## recorder + +A script and Waybar module to simplify the process of making and sharing screen +recordings. Features: + +- Select a screen or region for recording. +- Automatically compress and copy recording URI to clipboard for easy pasting + into applications like Discord. +- Cleans up old recordings. +- Waybar module allows starting/stopping recording, displays recording status. + +Check the [README](./recorder/README.md) for usage instructions. + +## windows + +A Waybar module that provides an indicator of open windows in the focused +workspace. Helpful for those that take Niri's scrolling a tad too far, and often +end up with too many windows in a single workspace. + +Check the [README](./windows/README.md) for usage instructions. diff --git a/picker/window-picker.py b/picker/window-picker.py new file mode 100755 index 0000000..122c689 --- /dev/null +++ b/picker/window-picker.py @@ -0,0 +1,64 @@ +#!/bin/env python +from argparse import ArgumentParser +import subprocess +import json + +# JSON object to represent a window in Niri +type WindowJson = dict[str, int | str | bool] + +# Get a list of open windows from Niri +def get_windows(): + command = "niri msg -j windows" + process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + data: list[WindowJson] = json.loads(process.communicate()[0]) + + return data + +# Generate a string representation for each window. +# Map the string to its Niri window ID +def get_string_id_mapping(window_list: list[WindowJson]): + mapping: dict[str, int] = {} + for idx, window in enumerate(window_list): + s = f"{idx}: {window.get("app_id")}: {window.get("title")}" + id = window.get("id") + assert type(id) == int + mapping[s] = id + return mapping + +# Generate the string to be sent to fuzzel +def get_input_string(mapping: dict[str, int]): + m = max([len(s) for s in mapping.keys()]) + return "\n".join(mapping.keys()), m + +def spawn_picker(cmd, input_string, m): + # cmd = "fuzzel --dmenu -I --placeholder=Select a window:" + cmd = f"{cmd} --width {min(m, 120) + 1}" + process = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE) + selection = process.communicate(input=input_string.encode("UTF-8")) + if process.returncode != 2: + return selection[0].decode("UTF-8").strip("\n") + else: + return None + +def switch_window(id: int): + cmd = f"niri msg action focus-window --id {id}" + process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE) + process.communicate() + +if __name__ == "__main__": + parser = ArgumentParser() + parser.add_argument("-p", "--picker", required=False, help="Set a picker command. Must take a newline delimited string on stdin and return the selection on stdout (default: %(default)s)", default="fuzzel --dmenu --placeholder=Select a window:", type=str, metavar="COMMAND") + args = parser.parse_args() + picker_cmd = args.picker + # print("picker:", picker_cmd) + wl = get_windows() + mapping = get_string_id_mapping(wl) + input_string, m= get_input_string(mapping) + selection = spawn_picker(picker_cmd, input_string, m) + if selection is None: + exit(1) + try: + id = mapping[selection] + except KeyError: + exit(1) + switch_window(id) diff --git a/recorder/README.md b/recorder/README.md new file mode 100644 index 0000000..9a4e892 --- /dev/null +++ b/recorder/README.md @@ -0,0 +1,53 @@ +# Niri Recorder + +Niri recorder is a shell script and waybar module to for creating screen +recordings meant to be easily shared online. + +### Dependencies + +- `socat` +- `bash` +- `wf-recorder` +- `niri` +- `jq` +- `ffmpeg` +- `slurp` +- `python3` (optional) +- `waybar` (optional) +- `wl-clipboard` + +## Recorder + +A utility script meant to be triggered via keybinding. It will record your +screen, then automatically compress the recording and copy its URI path to your +clipboard, so you can easily paste it inside applications like Discord and +Matrix. You do not need the Waybar module to use the script. + +**Usage**: + +`recorder.sh` + +1. Run the script once to begin recording. Arguments: + +- `screen` \[default]: record entire screen. Select screen if multi-monitor. +- `region`: Select a region to record. + +2. Run the script again to stop recording. + +- Arguments don't matter this time. +- Compression and copying to clipboard is done automatically. + +## Waybar Module + +There is an included waybar module. This module shows the current recording +state. You can also use it to start/stop the recording with your mouse. Please +see `recorder_config.jsonc` for an example of how to setup the custom module. + +You can also use `test_waybar.sh` to test the module without creating any +recordings. After you've loaded the Waybar module, simply run the script, and +you should see the module responding to socket messages. + +## Acknowledgments + +- Thanks to [Axlefublr](https://axlefublr.github.io/screen-recording/) for the + method to optimize the compression of the video. diff --git a/recorder/module_config.jsonc b/recorder/module_config.jsonc new file mode 100644 index 0000000..b62a00e --- /dev/null +++ b/recorder/module_config.jsonc @@ -0,0 +1,9 @@ +{ + "custom/recorder": { + "exec": "/path/to/recorder.py", + "return-type": "json", + "restart-interval": "never", + "on-click": "/path/to/recorder.sh screen", + "on-click-right": "/path/to/recorder.sh region" + } +} \ No newline at end of file diff --git a/recorder/recorder.py b/recorder/recorder.py new file mode 100755 index 0000000..5ca7136 --- /dev/null +++ b/recorder/recorder.py @@ -0,0 +1,107 @@ +#!/bin/env python + +import os +import socket +import json +import asyncio + +# helper prints dict as json string +def p(obj): + print(json.dumps(obj), flush=True) + +SOCKET = "/tmp/recorder-sock.sock" +# default tooltip +DEF_TT = "Click to record screen. Right-click to record region." +# default text +DEF_TEXT = "rec" + +# Remove socket already exists +try: + os.unlink(SOCKET) +except OSError: + # doesn't exist, we're good + pass + +# track singleton delayed task +delayed_task = None + +# async function to send a message with delay +async def delayed_msg(delay, message): + try: + await asyncio.sleep(delay) + p(message) + except asyncio.CancelledError: + pass + +# function handles message from socket +def handle_message(data: str, loop): + global delayed_task + # always cancel delayed task + # if there's new message + if delayed_task: + delayed_task.cancel() + out = {} + out_s = "" + out_t = "" + # process the message type + if data: + if data == "REC": + out_s = "on" + out_t = "Recording in progress. Click to stop." + elif data == "CMP": + out_s = "compressing" + out_t = "Recording is being compressed." + elif data == "CPD": + out_s = "copied" + out_t = "Recording has been copied to clipboard." + elif data == "STP": + out_s = "done" + out_t = "Recording has been stopped." + elif data == "ERR": + out_s = "error" + out_t = "Recording has encountered an error." + # format the output + out["text"] = f"rec: {out_s}" if out_s != "" else DEF_TEXT + out["tooltip"] = out_t if out_t != "" else DEF_TT + # print to waybar + p(out) + + # check if delayed message should be sent afterwards + if data in ["ERR", "CPD", "STP"]: + # probably redundant but... for good measure lol + if delayed_task: + delayed_task.cancel() + # delayed print of default output + delayed_out = {"text": DEF_TEXT, "tooltip": DEF_TT} + delayed_task = loop.create_task(delayed_msg(5, delayed_out)) + +# start the server +async def server(): + # pointer to this event loop + loop = asyncio.get_running_loop() + # open our socket + with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as server: + server.bind(SOCKET) + # needed for async + server.setblocking(False) + # only one connection at a time + server.listen(1) + # main loop; runs until waybar exits + while True: + # receive data from socket + conn, _ = await loop.sock_accept(server) + with conn: + # parse string + data = (await loop.sock_recv(conn, 1024)).decode().strip() + # handle the data + handle_message(data, loop) + +# main function +async def main(): + # start by outputing default contents + p({"text": DEF_TEXT, "tooltip": DEF_TT}) + # start the server + await server() + +# entry point +asyncio.run(main()) diff --git a/recorder/recorder.sh b/recorder/recorder.sh new file mode 100755 index 0000000..a0d288d --- /dev/null +++ b/recorder/recorder.sh @@ -0,0 +1,69 @@ +#!/bin/env bash + +# depends: wf-recorder, libnotify + +TEMPDIR="/tmp/niri-recorder" +mkdir -p "$TEMPDIR" +LOCK="$TEMPDIR/lock" +RFMT=".mkv" +OFMT=".mp4" +RAW="$TEMPDIR/raw$RFMT" +OUTDIR="$HOME/Videos/niri-recorder" +mkdir -p "$OUTDIR" + +# sends current recording state to socket +function sig { + echo "$1" | socat - UNIX-CONNECT:/tmp/recorder-sock.sock +} + +# function generates unique name for recording +function compname { + now=$(date +"%y-%m-%d-%H:%M:%S") + echo "$OUTDIR/$now$OFMT" +} + +# First we need to check if the recording +# lockfile exists. +if [[ -f "$LOCK" ]]; then + # Stop the recording + sig "STP" + kill "$(cat "$LOCK")" + # remove lockfile + rm "$LOCK" + outpath="$(compname)" + sig "CMP" + # compress the recording + ffmpeg -y -i "$RAW" -c:v libx264 -preset slow -crf 21 -r 30 -b:v 2M -maxrate 3M -bufsize 4M -c:a aac -b:a 96k -movflags +faststart "$outpath" || (sig "ERR"; exit 1) + # copy URI path to clipboard + wl-copy -t 'text/uri-list' <<<"file://$outpath" || (sig "ERR"; exit 1) + sig "CPD" + # delete the raw recording + rm "$RAW" + +else + # count how many monitors are attached + num_mon=$(niri msg --json outputs | jq 'keys | length') + wf_flags="-Dyf" + if [ "$1" = "region" ];then + # select a screen region + sel=$(slurp) || exit 1 + wf-recorder -g "$sel" "$wf_flags" "$RAW" & + elif [ "$1" = "screen" ] || [ "$1" = "" ] && (( num_mon > 1 )); then + # select entire screen + sel=$(slurp -o) || exit 1 + wf-recorder -g "$sel" "$wf_flags" "$RAW" & + else + # this runs when screen is specified and there's only one monitor + # it also runs with no args bc screen is default + wf-recorder "$wf_flags" "$RAW" & + fi + + sig "REC" + # create lockfile + touch "$LOCK" + + # save recorder's process id to lockfile + PID=$! + echo "$PID" > "$LOCK" +fi + diff --git a/recorder/test-waybar.sh b/recorder/test-waybar.sh new file mode 100755 index 0000000..44f681b --- /dev/null +++ b/recorder/test-waybar.sh @@ -0,0 +1,14 @@ +#!/bin/env bash + +echo "REC" | socat - UNIX-CONNECT:/tmp/recorder-sock.sock; \ +sleep 2; \ +echo "ERR" | socat - UNIX-CONNECT:/tmp/recorder-sock.sock; \ +sleep 2; \ +echo "REC" | socat - UNIX-CONNECT:/tmp/recorder-sock.sock; \ +sleep 2; \ +echo "STP" | socat - UNIX-CONNECT:/tmp/recorder-sock.sock; \ +sleep 2; \ +echo "CMP" | socat - UNIX-CONNECT:/tmp/recorder-sock.sock; \ +sleep 2; \ +echo "CPD" | socat - UNIX-CONNECT:/tmp/recorder-sock.sock; \ +sleep 2; diff --git a/windows/README.md b/windows/README.md new file mode 100644 index 0000000..eebe0bb --- /dev/null +++ b/windows/README.md @@ -0,0 +1,23 @@ +# niri-windows + +Waybar module for counting windows open in current workspace. + +## Features + +- Tracks the state of windows per workspace. +- Shows number of windows in the focused workspace. +- Hover to show the applications and window titles. + +## Usage + +Add the following snippet to the module configuration section of your Waybar +config: + +```jsonc +"custom/colcount": { + "exec": "~/path/to/niri-windows.py", + "return-type": "json", + "restart-interval": "never", + "format": "{} ", +}, +``` diff --git a/windows/niri-windows.py b/windows/niri-windows.py new file mode 100755 index 0000000..09f1e4c --- /dev/null +++ b/windows/niri-windows.py @@ -0,0 +1,270 @@ +#!/bin/env python + +import json +import logging +import os +import socket +import sys +from typing import override + + +# helper prints dict as json string +def p(obj): + print(json.dumps(obj), flush=True) + + +log = logging.getLogger() +log.setLevel(logging.DEBUG) +handler = logging.StreamHandler(sys.stdout) +# log_level = os.environ["NIRILOG"] +if "NIRILOG" in os.environ: + handler.setLevel(logging.DEBUG) +else: + handler.setLevel(logging.ERROR) +log.addHandler(handler) +SOCKET = os.environ["NIRI_SOCKET"] + + +class Window: + def __init__(self, id: int, title: str, app_id: str) -> None: + self.id = id + self.title = title + self.app_id = app_id + + @override + def __str__(self): + return f"{self.app_id}: {self.title}" + + +class Workspace: + def __init__(self, id: int, output: str) -> None: + self.id = id + self.windows: dict[int, Window] = {} + # self.windows: set[int] = set() + self.output = output + + @override + def __str__(self) -> str: + return str(list(self.windows)) + + def add(self, id, title: str, app_id: str): + w = Window(id, title, app_id) + self.windows[id] = w + + def remove(self, id): + self.windows.pop(id) + + def query(self, id) -> bool: + return id in self.windows + + def change_output(self, new: str): + self.output = new + + def count(self) -> int: + return len(self.windows) + + def get_windows(self): + return list(self.windows.values()) + + +class State: + def __init__(self): + self.focused_workspace: int = 0 + self.active_workspaces: dict[str, int] = {} + # output name -> workspace: id -> window count + self.workspaces: dict[int, Workspace] = {} + self.first_run = True + + @override + def __str__(self) -> str: + s = "" + s += f"focused: {self.focused_workspace}\n" + for output, id in self.active_workspaces.items(): + s += f"{output}: {id}\n" + for id, ws in self.workspaces.items(): + s += f"id: {id}, win: {ws}\n" + return s + + def set_activated(self, id: int): + output = self.workspaces[id].output + self.active_workspaces[output] = id + + def set_focused(self, id: int): + self.focused_workspace = id + + def update_workspaces(self, arr: list[dict]): + ids = [] + for ws in arr: + id = ws["id"] + output = ws["output"] + if id not in self.workspaces: + self.workspaces[id] = Workspace(id, output) + ids.append(id) + if ws["is_focused"]: + self.focused_workspace = id + if ws["is_active"]: + self.active_workspaces[output] = id + to_pop = [] + for id in self.workspaces.keys(): + if id not in ids: + to_pop.append(id) + for id in to_pop: + self.workspaces.pop(id) + + def add_window( + self, workspace_id: int, window_id: int, title: str, app_id: str + ): + self.workspaces[workspace_id].add(window_id, title, app_id) + + def remove_window(self, window_id: int): + for workspace in self.workspaces.values(): + if workspace.query(window_id): + workspace.remove(window_id) + return + + def update_windows(self, arr: list[dict]): + for win in arr: + window_id: int = win["id"] + workspace_id: int = win["workspace_id"] + title: str = win["title"] + app_id: str = win["app_id"] + self.add_window(workspace_id, window_id, title, app_id) + + def get_count(self, output: str | None = None) -> int: + if output is None: + return self.workspaces[self.focused_workspace].count() + else: + id = self.active_workspaces[output] + return self.workspaces[id].count() + + def get_windows( + self, output: str | None = None, ws_id: int | None = None + ): + if ws_id is None: + if output is None: + ws_id = self.focused_workspace + else: + ws_id = self.active_workspaces[output] + return self.workspaces[ws_id].get_windows() + + +state = State() + + +def display(): + print(generate_message(), flush=True) + + +def generate_message() -> str: + obj = {"text": generate_text(), "tooltip": generate_tooltip()} + return json.dumps(obj) + + +def generate_tooltip(output=None) -> str: + windows = state.get_windows(output) + s = "" + for _, w in enumerate(windows): + s += f"{str(w)}\r\n" + return s + + +def generate_text(icon="", output=None): + count = state.get_count(output) + out = " ".join([icon] * count) + if log.level <= logging.DEBUG: + log.debug(str(state)) + return out + + +# function handles message from socket +def handle_message(event: dict): + log.debug("Handling message.") + log.debug(event) + should_display = False + match next(iter(event)): + case "WorkspacesChanged": + workspaces: list[dict] = event["WorkspacesChanged"][ + "workspaces" + ] + state.update_workspaces(workspaces) + log.info("Updated workspaces.") + should_display = True + case "WindowsChanged": + windows: list[dict] = event["WindowsChanged"]["windows"] + state.update_windows(windows) + log.info("Updated windows.") + should_display = True + # workspaces : list[dict] = event["WorkspacesChanged"]["workspaces"] + # state.process_changed(workspaces) + case "WorkspaceActivated": + ev = event["WorkspaceActivated"] + if ev["focused"]: + state.set_focused(ev["id"]) + log.info("Changed focused workspace.") + state.set_activated(ev["id"]) + should_display = True + case "WindowOpenedOrChanged": + # This event also handles window moved across workspace + window = event["WindowOpenedOrChanged"]["window"] + window_id, workspace_id = window["id"], window["workspace_id"] + state.remove_window(window_id) + title = window["title"] + app_id = window["app_id"] + state.add_window(workspace_id, window_id, title, app_id) + log.info("Updated window.") + should_display = True + case "WindowClosed": + # TODO: update Workspace to track window IDs + ev = event["WindowClosed"] + id: int = ev["id"] + state.remove_window(id) + log.info("Removed window.") + should_display = True + return should_display + + +# start the server +def server(): + # pointer to this event loop + # open our socket + log.info(f"Connecting to Niri socket @ {SOCKET}") + with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client: + client.connect(SOCKET) + # needed for async + client.sendall('"EventStream"'.encode() + b"\n") + client.shutdown(socket.SHUT_WR) + + # only one connection at a time + log.info("Connection successful, starting event loop.") + # main loop; runs until waybar exits + while True: + # receive data from socket + data = client.recv(4096) + if not data: + log.debug("No data!") + for line in data.split(b"\n"): + if line.strip(): + try: + event = json.loads(line) + if handle_message(event): + display() + except json.JSONDecodeError: + print( + "Malformed JSON:", + line.decode(errors="replace"), + ) + + +# main function +def main(): + # start by outputing default contents + # start the server + try: + server() + except Exception as e: + print(e, flush=True) + log.error(e) + + +# entry point +main()