#!/bin/env python import os import socket import json import asyncio def p(obj): print(json.dumps(obj), flush=True) SOCKET = "/tmp/recorder-sock.sock" DEF_TT = "Click to record screen. Right-click to record region." DEF_TEXT = "rec" # Remove if already exists try: os.unlink(SOCKET) except OSError: # doesn't exist, we're good pass delayed_task = None async def delayed_msg(delay, message): # print(f"[DEBUG] Starting delayed message task, waiting {delay} seconds...", flush=True) try: await asyncio.sleep(delay) # print(f"[DEBUG] Delayed message task executed! Sending message: {message}", flush=True) p(message) except asyncio.CancelledError: # print("[DEBUG] Delayed message task was cancelled!", flush=True) pass def handle_message(data: str, loop): global delayed_task if delayed_task: delayed_task.cancel() out = {} out_s = "" out_t = "" if data: if data == "REC": out_s = "on" out_t = "Recording in progress. Click to stop." elif data == "CMP": out_s = "compressing" out_t = "Recording is being compressed." elif data == "CPD": out_s = "copied" out_t = "Recording has been copied to clipboard." elif data == "STP": out_s = "done" out_t = "Recording has been stopped." elif data == "ERR": out_s = "error" out_t = "Recording has encountered an error." else: # print("this runs a", flush=True) out_s = "" out_t = "" else: # print("this runs b", flush=True) out_s = "" out_t = "" out["text"] = f"rec: {out_s}" if out_s != "" else DEF_TEXT out["tooltip"] = out_t if out_t != "" else DEF_TT p(out) if data in ["ERR", "CPD", "STP"]: if delayed_task: # print("canceled delayed task", flush=True) delayed_task.cancel() delayed_out = {"text": DEF_TEXT, "tooltip": DEF_TT} # print("debug: creating delayed task", flush=True) delayed_task = loop.create_task(delayed_msg(5, delayed_out)) async def server(): loop = asyncio.get_running_loop() with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as server: server.bind(SOCKET) server.setblocking(False) server.listen(1) while True: conn, _ = await loop.sock_accept(server) with conn: data = (await loop.sock_recv(conn, 1024)).decode().strip() handle_message(data, loop) async def main(): p({"text": DEF_TEXT, "tooltip": DEF_TT}) await server() asyncio.run(main())