| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296 |
- #!/usr/bin/env python3
- # A little control script for OBS, using `websocat`.
- # Managing Python environments is _much_ harder than installing a binary with Homebrew,
- # hence why this used `websocat` and not the Python `websockets` library.
- #
- # Protocol documentation:
- # https://github.com/obsproject/obs-websocket/blob/master/docs/generated/protocol.md
- # pyright: basic, reportAny=false, reportExplicitAny=false, reportUnusedCallResult=false
- from logging import DEBUG, ERROR, basicConfig, debug, error
- from os import environ
- from subprocess import (
- DEVNULL,
- CalledProcessError,
- Popen,
- PIPE,
- check_output,
- )
- from sys import argv, stdout
- from time import sleep
- from typing import cast
- from uuid import uuid4
- from json import JSONDecodeError, dumps, loads
- from signal import SIGALRM, signal, alarm
- OBS_WS_HOST = environ.get("OBS_WS_HOST", "localhost")
- OBS_WS_PORT = environ.get("OBS_WS_PORT", "4455")
- class WebsocketWrapper:
- def __init__(self, host: str, port: str, timeout: int = 60) -> None:
- self._conn_string = f"ws://{host}:{port}"
- self._process = Popen(
- ["websocat", self._conn_string], stdout=PIPE, stderr=PIPE, stdin=PIPE
- )
- if self._process.stdout is not None and self._process.stdin is not None:
- self._stdout = self._process.stdout
- self._stdin = self._process.stdin
- else:
- raise Exception("Invalid process IO")
- # Websockets are tricky, and so are process timeouts - this little hack
- # kills this script if it lives too long (without worrying about threads)
- def handler(*_):
- error(f"{self.__class__.__name__} timed out")
- self._process.kill()
- exit(1)
- signal(SIGALRM, handler)
- alarm(timeout)
- self._identify()
- def _receive_message(self):
- read = self._stdout.readline()
- decoded = read.decode()
- content = loads(decoded)
- debug(f"READ: {decoded}")
- return content
- def _send_message(self, message: str | dict):
- if type(message) == "dict":
- message = dumps(message)
- self._stdin.write((cast(str, message) + "\n").encode())
- self._stdin.flush()
- debug(f"WRITE: {message}")
- return message
- def _identify(self):
- self._receive_message()
- self._send_message(
- dumps({"op": 1, "d": {"rpcVersion": 1, "eventSubscriptions": 0}})
- )
- self._receive_message()
- def request(self, request_type: str, request_data: dict):
- self._send_message(
- dumps(
- {
- "op": 6,
- "d": {
- "requestType": request_type,
- "requestId": uuid4().__str__(),
- "requestData": request_data,
- },
- }
- )
- )
- return self._receive_message()
- class OBS:
- _default_name = "Default"
- _disabled_name = "Disabled"
- _tablecast_name = "Tablecast"
- def __init__(self, host: str, port: str) -> None:
- self._ensure_profiles = {
- self._disabled_name: {
- "General": {"Name": self._disabled_name},
- "Output": {
- "Mode": "Simple",
- "Reconnect": False,
- },
- "SimpleOutput": {
- "VBitrate": 200,
- "ABitrate": 20,
- "Preset": "veryfast",
- },
- "Video": {
- "BaseCX": 32,
- "BaseCY": 32,
- "OutputCX": 32,
- "OutputCY": 32,
- "FPSType": 2,
- "FPSCommon": 30,
- "FPSInt": 30,
- "FPSNum": 1,
- "FPSDen": 10,
- "ScaleType": "bicubic",
- "ColorFormat": "RGB",
- "ColorSpace": "sRGB",
- "ColorRange": "Partial",
- },
- },
- self._tablecast_name: {
- "General": {
- "Name": self._tablecast_name,
- },
- "Output": {"Mode": "Advanced"},
- "AdvOut": {
- "ApplyServiceSettings": "true",
- "UseRescale": "false",
- "Encoder": "com.apple.videotoolbox.videoencoder.ave.avc",
- "AudioEncoder": "ffmpeg_aac",
- "RecSplitFileType": "Time",
- "FFFormat": "rtsp",
- "FFVEncoderId": "27",
- "FFVEncoder": "h264_videotoolbox",
- "FFAEncoderId": "86076",
- "FFAEncoder": "libopus",
- "FFExtension": "FFURL=rtsp://localhost:8554/mystream",
- "FFVCustom": "bf=0",
- "RescaleFilter": "3",
- },
- "Video": {
- "BaseCX": "4096",
- "BaseCY": "2304",
- "OutputCX": "4096",
- "OutputCY": "2304",
- "FPSType": "0",
- "FPSCommon": "24 NTSC",
- },
- },
- self._default_name: {},
- }
- self._ensure_collections = {
- self._disabled_name: {},
- self._default_name: {},
- self._tablecast_name: {},
- }
- # Start OBS if it doesn't appear to be started already
- started = False
- if (
- len([o for o in check_output(["ps", "aux"]).split(b"\n") if b"OBS" in o])
- == 0
- ):
- Popen(
- [
- "obs",
- "--minimize-to-tray",
- "--disable-updater",
- "--disable-missing-files-check",
- "--disable-shutdown-check",
- "--profile",
- self._disabled_name,
- "--collection",
- self._disabled_name,
- ],
- start_new_session=True,
- stdin=DEVNULL,
- stdout=DEVNULL,
- stderr=DEVNULL,
- )
- started = True
- sleep(5)
- self._websocket = WebsocketWrapper(host, port, 60 if not started else 10 * 60)
- if started:
- # Create "ensured" items on startup
- self._ensure()
- self.request("SetCurrentProfile", {"profileName": self._disabled_name})
- self.request(
- "SetCurrentSceneCollection",
- {"sceneCollectionName": self._disabled_name},
- )
- def _ensure(self):
- for profile in set(self._ensure_collections.keys()).difference(
- set(self.request("GetProfileList", {})["profiles"])
- ):
- self.request("CreateProfile", {"profileName": profile})
- for category in self._ensure_profiles[profile]:
- for key in self._ensure_profiles[profile][category]:
- self.request(
- "SetProfileParameter",
- {
- "parameterCategory": category,
- "parameterName": key,
- "parameterValue": str(
- self._ensure_profiles[profile][category][key]
- ),
- },
- )
- for collection in set(self._ensure_profiles.keys()).difference(
- set(self.request("GetSceneCollectionList", {})["sceneCollections"])
- ):
- self.request("CreateSceneCollection", {"sceneCollectionName": collection})
- def request(self, request_type: str, request_data: dict):
- result = self._websocket.request(request_type, request_data)["d"]
- if (
- "requestStatus" in result
- and "code" in result["requestStatus"]
- and result["requestStatus"]["code"] == 207
- ):
- sleep(0.1)
- return self.request(request_type, request_data)
- if request_type == "StartVirtualCam":
- sleep(3)
- if "responseData" in result:
- return result["responseData"]
- return result
- def batch(self, requests: list[tuple[str,] | tuple[str, dict]]):
- for request in requests:
- if len(request) == 1:
- yield self.request(request[0], {})
- else:
- yield self.request(request[0], request[1])
- def parse(self, args: list[str]):
- requests: list[tuple[str,] | tuple[str, dict]] = []
- current: str | None = None
- for item in args:
- if current == "Activate":
- if (
- item in self._ensure_collections.keys()
- and item in self._ensure_profiles.keys()
- ):
- requests.extend(
- (
- ("SetCurrentProfile", {"profileName": item}),
- (
- "SetCurrentSceneCollection",
- {"sceneCollectionName": item},
- ),
- )
- )
- else:
- error(f"Cannot activate '{item}'")
- current = None
- elif current is not None:
- try:
- requests.append((current, loads(item)))
- current = None
- except JSONDecodeError:
- requests.append((current,))
- current = item
- else:
- current = item
- if current is not None:
- requests.append((current,))
- debug(requests)
- for response in self.batch(requests):
- yield response
- if __name__ == "__main__":
- basicConfig(level=ERROR)
- debug(f"Called with parameters {argv}")
- for program in ["websocat"]:
- try:
- _ = check_output([program, "--version"], stderr=DEVNULL)
- except CalledProcessError:
- pass
- except:
- error(f"Aborted, as `{program}` doesn't appear to be callable")
- for response in OBS(OBS_WS_HOST, OBS_WS_PORT).parse(argv[1:]):
- stdout.write(dumps(response) + "\n")
|