#!/usr/bin/env python3 # A little control script for OBS, using `websocat`. # Managing Python environments is _much_ harder than installing a binary with Homebrew, # hence why this used `websocat` and not the Python `websockets` library. # # Protocol documentation: # https://github.com/obsproject/obs-websocket/blob/master/docs/generated/protocol.md # pyright: basic, reportAny=false, reportExplicitAny=false, reportUnusedCallResult=false from logging import DEBUG, ERROR, basicConfig, debug, error from os import environ from subprocess import ( DEVNULL, CalledProcessError, Popen, PIPE, check_output, ) from sys import argv, stdout from time import sleep from typing import cast from uuid import uuid4 from json import JSONDecodeError, dumps, loads from signal import SIGALRM, signal, alarm OBS_WS_HOST = environ.get("OBS_WS_HOST", "localhost") OBS_WS_PORT = environ.get("OBS_WS_PORT", "4455") class WebsocketWrapper: def __init__(self, host: str, port: str, timeout: int = 60) -> None: self._conn_string = f"ws://{host}:{port}" self._process = Popen( ["websocat", self._conn_string], stdout=PIPE, stderr=PIPE, stdin=PIPE ) if self._process.stdout is not None and self._process.stdin is not None: self._stdout = self._process.stdout self._stdin = self._process.stdin else: raise Exception("Invalid process IO") # Websockets are tricky, and so are process timeouts - this little hack # kills this script if it lives too long (without worrying about threads) def handler(*_): error(f"{self.__class__.__name__} timed out") self._process.kill() exit(1) signal(SIGALRM, handler) alarm(timeout) self._identify() def _receive_message(self): read = self._stdout.readline() decoded = read.decode() content = loads(decoded) debug(f"READ: {decoded}") return content def _send_message(self, message: str | dict): if type(message) == "dict": message = dumps(message) self._stdin.write((cast(str, message) + "\n").encode()) self._stdin.flush() debug(f"WRITE: {message}") return message def _identify(self): self._receive_message() self._send_message( dumps({"op": 1, "d": {"rpcVersion": 1, "eventSubscriptions": 0}}) ) self._receive_message() def request(self, request_type: str, request_data: dict): self._send_message( dumps( { "op": 6, "d": { "requestType": request_type, "requestId": uuid4().__str__(), "requestData": request_data, }, } ) ) return self._receive_message() class OBS: _default_name = "Default" _disabled_name = "Disabled" _tablecast_name = "Tablecast" def __init__(self, host: str, port: str) -> None: self._ensure_profiles = { self._disabled_name: { "General": {"Name": self._disabled_name}, "Output": { "Mode": "Simple", "Reconnect": False, }, "SimpleOutput": { "VBitrate": 200, "ABitrate": 20, "Preset": "veryfast", }, "Video": { "BaseCX": 32, "BaseCY": 32, "OutputCX": 32, "OutputCY": 32, "FPSType": 2, "FPSCommon": 30, "FPSInt": 30, "FPSNum": 1, "FPSDen": 10, "ScaleType": "bicubic", "ColorFormat": "RGB", "ColorSpace": "sRGB", "ColorRange": "Partial", }, }, self._tablecast_name: { "General": { "Name": self._tablecast_name, }, "Output": {"Mode": "Advanced"}, "AdvOut": { "ApplyServiceSettings": "true", "UseRescale": "false", "Encoder": "com.apple.videotoolbox.videoencoder.ave.avc", "AudioEncoder": "ffmpeg_aac", "RecSplitFileType": "Time", "FFFormat": "rtsp", "FFVEncoderId": "27", "FFVEncoder": "h264_videotoolbox", "FFAEncoderId": "86076", "FFAEncoder": "libopus", "FFExtension": "FFURL=rtsp://localhost:8554/mystream", "FFVCustom": "bf=0", "RescaleFilter": "3", }, "Video": { "BaseCX": "4096", "BaseCY": "2304", "OutputCX": "4096", "OutputCY": "2304", "FPSType": "0", "FPSCommon": "24 NTSC", }, }, self._default_name: {}, } self._ensure_collections = { self._disabled_name: {}, self._default_name: {}, self._tablecast_name: {}, } # Start OBS if it doesn't appear to be started already started = False if ( len([o for o in check_output(["ps", "aux"]).split(b"\n") if b"OBS" in o]) == 0 ): Popen( [ "obs", "--minimize-to-tray", "--disable-updater", "--disable-missing-files-check", "--disable-shutdown-check", "--profile", self._disabled_name, "--collection", self._disabled_name, ], start_new_session=True, stdin=DEVNULL, stdout=DEVNULL, stderr=DEVNULL, ) started = True sleep(5) self._websocket = WebsocketWrapper(host, port, 60 if not started else 10 * 60) if started: # Create "ensured" items on startup self._ensure() self.request("SetCurrentProfile", {"profileName": self._disabled_name}) self.request( "SetCurrentSceneCollection", {"sceneCollectionName": self._disabled_name}, ) def _ensure(self): for profile in set(self._ensure_collections.keys()).difference( set(self.request("GetProfileList", {})["profiles"]) ): self.request("CreateProfile", {"profileName": profile}) for category in self._ensure_profiles[profile]: for key in self._ensure_profiles[profile][category]: self.request( "SetProfileParameter", { "parameterCategory": category, "parameterName": key, "parameterValue": str( self._ensure_profiles[profile][category][key] ), }, ) for collection in set(self._ensure_profiles.keys()).difference( set(self.request("GetSceneCollectionList", {})["sceneCollections"]) ): self.request("CreateSceneCollection", {"sceneCollectionName": collection}) def request(self, request_type: str, request_data: dict): result = self._websocket.request(request_type, request_data)["d"] if ( "requestStatus" in result and "code" in result["requestStatus"] and result["requestStatus"]["code"] == 207 ): sleep(0.1) return self.request(request_type, request_data) if request_type == "StartVirtualCam": sleep(3) if "responseData" in result: return result["responseData"] return result def batch(self, requests: list[tuple[str,] | tuple[str, dict]]): for request in requests: if len(request) == 1: yield self.request(request[0], {}) else: yield self.request(request[0], request[1]) def parse(self, args: list[str]): requests: list[tuple[str,] | tuple[str, dict]] = [] current: str | None = None for item in args: if current == "Activate": if ( item in self._ensure_collections.keys() and item in self._ensure_profiles.keys() ): requests.extend( ( ("SetCurrentProfile", {"profileName": item}), ( "SetCurrentSceneCollection", {"sceneCollectionName": item}, ), ) ) else: error(f"Cannot activate '{item}'") current = None elif current is not None: try: requests.append((current, loads(item))) current = None except JSONDecodeError: requests.append((current,)) current = item else: current = item if current is not None: requests.append((current,)) debug(requests) for response in self.batch(requests): yield response if __name__ == "__main__": basicConfig(level=ERROR) debug(f"Called with parameters {argv}") for program in ["websocat"]: try: _ = check_output([program, "--version"], stderr=DEVNULL) except CalledProcessError: pass except: error(f"Aborted, as `{program}` doesn't appear to be callable") for response in OBS(OBS_WS_HOST, OBS_WS_PORT).parse(argv[1:]): stdout.write(dumps(response) + "\n")