0
0

obs_client.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. #!/usr/bin/env python3
  2. # A little control script for OBS, using `websocat`.
  3. # Managing Python environments is _much_ harder than installing a binary with Homebrew,
  4. # which is why this uses `websocat` rather than the Python `websockets` library.
  5. #
  6. # Protocol documentation:
  7. # https://github.com/obsproject/obs-websocket/blob/master/docs/generated/protocol.md
  8. # pyright: basic, reportAny=false, reportExplicitAny=false, reportUnusedCallResult=false
  9. from logging import DEBUG, ERROR, basicConfig, debug, error
  10. from os import environ
  11. from subprocess import (
  12. DEVNULL,
  13. CalledProcessError,
  14. Popen,
  15. PIPE,
  16. check_output,
  17. )
  18. from sys import argv, stdout
  19. from time import sleep
  20. from typing import cast
  21. from uuid import uuid4
  22. from json import JSONDecodeError, dumps, loads
  23. from signal import SIGALRM, signal, alarm
  24. OBS_WS_HOST = environ.get("OBS_WS_HOST", "localhost")
  25. OBS_WS_PORT = environ.get("OBS_WS_PORT", "4455")
  26. class WebsocketWrapper:
  27. def __init__(self, host: str, port: str, timeout: int = 60) -> None:
  28. self._conn_string = f"ws://{host}:{port}"
  29. self._process = Popen(
  30. ["websocat", self._conn_string], stdout=PIPE, stderr=PIPE, stdin=PIPE
  31. )
  32. if self._process.stdout is not None and self._process.stdin is not None:
  33. self._stdout = self._process.stdout
  34. self._stdin = self._process.stdin
  35. else:
  36. raise Exception("Invalid process IO")
  37. # Websockets are tricky, and so are process timeouts - this little hack
  38. # kills this script if it lives too long (without worrying about threads)
  39. def handler(*_):
  40. error(f"{self.__class__.__name__} timed out")
  41. self._process.kill()
  42. exit(1)
  43. signal(SIGALRM, handler)
  44. alarm(timeout)
  45. self._identify()
  46. def _receive_message(self):
  47. read = self._stdout.readline()
  48. decoded = read.decode()
  49. content = loads(decoded)
  50. debug(f"READ: {decoded}")
  51. return content
  52. def _send_message(self, message: str | dict):
  53. if type(message) == "dict":
  54. message = dumps(message)
  55. self._stdin.write((cast(str, message) + "\n").encode())
  56. self._stdin.flush()
  57. debug(f"WRITE: {message}")
  58. return message
  59. def _identify(self):
  60. self._receive_message()
  61. self._send_message(
  62. dumps({"op": 1, "d": {"rpcVersion": 1, "eventSubscriptions": 0}})
  63. )
  64. self._receive_message()
  65. def request(self, request_type: str, request_data: dict):
  66. self._send_message(
  67. dumps(
  68. {
  69. "op": 6,
  70. "d": {
  71. "requestType": request_type,
  72. "requestId": uuid4().__str__(),
  73. "requestData": request_data,
  74. },
  75. }
  76. )
  77. )
  78. return self._receive_message()
  79. class OBS:
  80. _default_name = "Default"
  81. _disabled_name = "Disabled"
  82. _tablecast_name = "Tablecast"
  83. def __init__(self, host: str, port: str) -> None:
  84. self._ensure_profiles = {
  85. self._disabled_name: {
  86. "General": {"Name": self._disabled_name},
  87. "Output": {
  88. "Mode": "Simple",
  89. "Reconnect": False,
  90. },
  91. "SimpleOutput": {
  92. "VBitrate": 200,
  93. "ABitrate": 20,
  94. "Preset": "veryfast",
  95. },
  96. "Video": {
  97. "BaseCX": 32,
  98. "BaseCY": 32,
  99. "OutputCX": 32,
  100. "OutputCY": 32,
  101. "FPSType": 2,
  102. "FPSCommon": 30,
  103. "FPSInt": 30,
  104. "FPSNum": 1,
  105. "FPSDen": 10,
  106. "ScaleType": "bicubic",
  107. "ColorFormat": "RGB",
  108. "ColorSpace": "sRGB",
  109. "ColorRange": "Partial",
  110. },
  111. },
  112. self._tablecast_name: {
  113. "General": {
  114. "Name": self._tablecast_name,
  115. },
  116. "Output": {"Mode": "Advanced"},
  117. "AdvOut": {
  118. "ApplyServiceSettings": "true",
  119. "UseRescale": "false",
  120. "Encoder": "com.apple.videotoolbox.videoencoder.ave.avc",
  121. "AudioEncoder": "ffmpeg_aac",
  122. "RecSplitFileType": "Time",
  123. "FFFormat": "rtsp",
  124. "FFVEncoderId": "27",
  125. "FFVEncoder": "h264_videotoolbox",
  126. "FFAEncoderId": "86076",
  127. "FFAEncoder": "libopus",
  128. "FFExtension": "FFURL=rtsp://localhost:8554/mystream",
  129. "FFVCustom": "bf=0",
  130. "RescaleFilter": "3",
  131. },
  132. "Video": {
  133. "BaseCX": "4096",
  134. "BaseCY": "2304",
  135. "OutputCX": "4096",
  136. "OutputCY": "2304",
  137. "FPSType": "0",
  138. "FPSCommon": "24 NTSC",
  139. },
  140. },
  141. self._default_name: {},
  142. }
  143. self._ensure_collections = {
  144. self._disabled_name: {},
  145. self._default_name: {},
  146. self._tablecast_name: {},
  147. }
  148. # Start OBS if it doesn't appear to be started already
  149. started = False
  150. if (
  151. len([o for o in check_output(["ps", "aux"]).split(b"\n") if b"OBS" in o])
  152. == 0
  153. ):
  154. Popen(
  155. [
  156. "obs",
  157. "--minimize-to-tray",
  158. "--disable-updater",
  159. "--disable-missing-files-check",
  160. "--disable-shutdown-check",
  161. "--profile",
  162. self._disabled_name,
  163. "--collection",
  164. self._disabled_name,
  165. ],
  166. start_new_session=True,
  167. stdin=DEVNULL,
  168. stdout=DEVNULL,
  169. stderr=DEVNULL,
  170. )
  171. started = True
  172. sleep(5)
  173. self._websocket = WebsocketWrapper(host, port, 60 if not started else 10 * 60)
  174. if started:
  175. # Create "ensured" items on startup
  176. self._ensure()
  177. self.request("SetCurrentProfile", {"profileName": self._disabled_name})
  178. self.request(
  179. "SetCurrentSceneCollection",
  180. {"sceneCollectionName": self._disabled_name},
  181. )
  182. def _ensure(self):
  183. for profile in set(self._ensure_collections.keys()).difference(
  184. set(self.request("GetProfileList", {})["profiles"])
  185. ):
  186. self.request("CreateProfile", {"profileName": profile})
  187. for category in self._ensure_profiles[profile]:
  188. for key in self._ensure_profiles[profile][category]:
  189. self.request(
  190. "SetProfileParameter",
  191. {
  192. "parameterCategory": category,
  193. "parameterName": key,
  194. "parameterValue": str(
  195. self._ensure_profiles[profile][category][key]
  196. ),
  197. },
  198. )
  199. for collection in set(self._ensure_profiles.keys()).difference(
  200. set(self.request("GetSceneCollectionList", {})["sceneCollections"])
  201. ):
  202. self.request("CreateSceneCollection", {"sceneCollectionName": collection})
  203. def request(self, request_type: str, request_data: dict):
  204. result = self._websocket.request(request_type, request_data)["d"]
  205. if (
  206. "requestStatus" in result
  207. and "code" in result["requestStatus"]
  208. and result["requestStatus"]["code"] == 207
  209. ):
  210. sleep(0.1)
  211. return self.request(request_type, request_data)
  212. if request_type == "StartVirtualCam":
  213. sleep(3)
  214. if "responseData" in result:
  215. return result["responseData"]
  216. return result
  217. def batch(self, requests: list[tuple[str,] | tuple[str, dict]]):
  218. for request in requests:
  219. if len(request) == 1:
  220. yield self.request(request[0], {})
  221. else:
  222. yield self.request(request[0], request[1])
  223. def parse(self, args: list[str]):
  224. requests: list[tuple[str,] | tuple[str, dict]] = []
  225. current: str | None = None
  226. for item in args:
  227. if current == "Activate":
  228. if (
  229. item in self._ensure_collections.keys()
  230. and item in self._ensure_profiles.keys()
  231. ):
  232. requests.extend(
  233. (
  234. ("SetCurrentProfile", {"profileName": item}),
  235. (
  236. "SetCurrentSceneCollection",
  237. {"sceneCollectionName": item},
  238. ),
  239. )
  240. )
  241. else:
  242. error(f"Cannot activate '{item}'")
  243. current = None
  244. elif current is not None:
  245. try:
  246. requests.append((current, loads(item)))
  247. current = None
  248. except JSONDecodeError:
  249. requests.append((current,))
  250. current = item
  251. else:
  252. current = item
  253. if current is not None:
  254. requests.append((current,))
  255. debug(requests)
  256. for response in self.batch(requests):
  257. yield response
  258. if __name__ == "__main__":
  259. basicConfig(level=ERROR)
  260. debug(f"Called with parameters {argv}")
  261. for program in ["websocat"]:
  262. try:
  263. _ = check_output([program, "--version"], stderr=DEVNULL)
  264. except CalledProcessError:
  265. pass
  266. except:
  267. error(f"Aborted, as `{program}` doesn't appear to be callable")
  268. for response in OBS(OBS_WS_HOST, OBS_WS_PORT).parse(argv[1:]):
  269. stdout.write(dumps(response) + "\n")