#!/usr/bin/env python3 ######################################################################################## # slack_client.py # # A tiny, standard-library only tool to read credentials from the Slack desktop app and # make requests to the Slack API on behalf of the user. It has access to the user's # Google Calendar via the Slack App, and the currently connected WiFi network via # `networksetup`. Requires `openssl` and `leveldbutil` to run. # # Not made to be portable. # Also a bit shit. ######################################################################################## from base64 import b64encode from collections.abc import Generator from enum import StrEnum from hashlib import pbkdf2_hmac from json import loads, dumps from logging import NOTSET, basicConfig, debug, error from os import getenv, listdir, makedirs, remove, stat from os.path import join, isfile, splitext, dirname from sqlite3 import connect as connect_sqlite from string import hexdigits from subprocess import check_output, CalledProcessError, DEVNULL, STDOUT from sys import exit, argv from sys import platform from tempfile import TemporaryDirectory from time import sleep, time from typing import cast from urllib import request, parse from datetime import datetime, timedelta from http.client import HTTPResponse JSONType = str | int | float | None | list["JSONType"] | dict[str, "JSONType"] JSONDict = dict[str, JSONType] class WorkTime(StrEnum): WEEKEND = "WEEKEND" OFF_HOURS = "OFF_HOURS" WORK_MORNING = "WORK_MORNING" WORK_AFTERNOON = "WORK_AFTERNOON" class EnvConfigKey(StrEnum): XDG_CACHE_HOME = "XDG_CACHE_HOME" SLACK_CLIENT_CALENDAR_EVENT_NAME_BREAK = "SLACK_CLIENT_CALENDAR_EVENT_NAME_BREAK" SLACK_CLIENT_CALENDAR_EVENT_NAME_HOME = "SLACK_CLIENT_CALENDAR_EVENT_NAME_HOME" SLACK_CLIENT_CALENDAR_EVENT_NAME_OFFICE = "SLACK_CLIENT_CALENDAR_EVENT_NAME_OFFICE" SLACK_CLIENT_CALENDAR_EVENT_NAME_OUT = "SLACK_CLIENT_CALENDAR_EVENT_NAME_OUT" SLACK_CLIENT_CALENDAR_EVENT_NAME_SICK = 
"SLACK_CLIENT_CALENDAR_EVENT_NAME_SICK" SLACK_CLIENT_NETWORK_NAME_HOME = "SLACK_CLIENT_NETWORK_NAME_HOME" SLACK_CLIENT_NETWORK_NAME_OFFICE = "SLACK_CLIENT_NETWORK_NAME_OFFICE" SLACK_CLIENT_NETWORK_NAME_TRANSIT = "SLACK_CLIENT_NETWORK_NAME_TRANSIT" SLACK_CLIENT_STATUS_HOME = "SLACK_CLIENT_STATUS_HOME" SLACK_CLIENT_STATUS_HOME_LATER_OFFICE = "SLACK_CLIENT_STATUS_HOME_LATER_OFFICE" SLACK_CLIENT_STATUS_HOME_MEETING = "SLACK_CLIENT_STATUS_HOME_MEETING" SLACK_CLIENT_STATUS_OFFICE = "SLACK_CLIENT_STATUS_OFFICE" SLACK_CLIENT_STATUS_OFFICE_MEETING = "SLACK_CLIENT_STATUS_OFFICE_MEETING" SLACK_CLIENT_STATUS_OFF_HOURS = "SLACK_CLIENT_STATUS_OFF_HOURS" SLACK_CLIENT_STATUS_OUT = "SLACK_CLIENT_STATUS_OUT" SLACK_CLIENT_STATUS_SICK = "SLACK_CLIENT_STATUS_SICK" SLACK_CLIENT_STATUS_TRANSIT_FROM_OFFICE = "SLACK_CLIENT_STATUS_TRANSIT_FROM_OFFICE" SLACK_CLIENT_STATUS_TRANSIT_TO_OFFICE = "SLACK_CLIENT_STATUS_TRANSIT_TO_OFFICE" SLACK_CLIENT_STATUS_VACATION = "SLACK_CLIENT_STATUS_VACATION" SLACK_CLIENT_STATUS_WEEKEND = "SLACK_CLIENT_STATUS_WEEKEND" class NetworkLocation(StrEnum): HOME = "HOME" OFFICE = "OFFICE" TRANSIT = "TRANSIT" UNKNOWN = "UNKNOWN" class CalendarWorkLocation(StrEnum): HOME = "HOME" OFFICE = "OFFICE" TRANSIT = "TRANSIT" UNKNOWN = "UNKNOWN" OUT = "OUT" SICK = "SICK" VACATION = "VACATION" class UserState(StrEnum): OFF = "OFF" WEEKEND = "WEEKEND" OUT = "OUT" SICK = "SICK" VACATION = "VACATION" UNKNOWN = "UNKNOWN" HOME = "HOME" HOME_LATER_OFFICE = "HOME_LATER_OFFICE" HOME_MEETING = "HOME_MEETING" OFFICE = "OFFICE" OFFICE_MEETING = "OFFICE_MEETING" TRANSIT_FROM_OFFICE = "TRANSIT_FROM_OFFICE" TRANSIT_FROM_OFFICE_MEETING = "TRANSIT_FROM_OFFICE_MEETING" TRANSIT_TO_OFFICE = "TRANSIT_TO_OFFICE" TRANSIT_TO_OFFICE_MEETING = "TRANSIT_TO_OFFICE_MEETING" class Cacheable: _ensured = False _cache_path: str = "cache.json" _cache_expiry = 60 * 60 * 24 def _ensure_and_clear_cache(self): if self._ensured: return makedirs(dirname(self._cache_path), exist_ok=True) if not 
isfile(self._cache_path): with open(self._cache_path, "w") as f: _ = f.write(dumps({})) self._ensured = True if ( datetime.now().timestamp() - stat(self._cache_path).st_birthtime > self._cache_expiry ): debug(f"Reset cache {self._cache_path}") remove(self._cache_path) with open(self._cache_path, "w") as f: _ = f.write(dumps({})) def _read_cache(self, key: str) -> JSONType: self._ensure_and_clear_cache() with open(self._cache_path, "r") as f: cache: JSONDict = loads(f.read()) if key in cache: debug(f"{self.__class__.__name__} read cached value `{key}`") return cache[key] def _write_cache(self, key: str, value: JSONType): self._ensure_and_clear_cache() with open(self._cache_path, "r") as f: cache: JSONDict = loads(f.read()) cache[key] = value with open(self._cache_path, "w") as f: _ = f.write(dumps(cache)) class TimeParser: def __init__(self, date: datetime) -> None: self._date = date def read( self, ) -> tuple[WorkTime, int, int, int, int]: return ( self._get_worktime(), self._get_workday_end_timestamp(), self._get_workday_start_timestamp(), self._get_future_round_timestamp(30), self._get_future_round_timestamp(60), ) def _get_worktime(self) -> WorkTime: weekday = int(self._date.strftime("%w")) is_weekday = bool(weekday % 6) hour = int(self._date.strftime("%H")) if not is_weekday or (weekday == 5 and hour >= 16): return WorkTime.WEEKEND if hour < 7 or hour >= 20: return WorkTime.OFF_HOURS elif hour <= 12: return WorkTime.WORK_MORNING else: return WorkTime.WORK_AFTERNOON def _get_workday_end_timestamp(self) -> int: return round( datetime( year=self._date.year, month=self._date.month, day=self._date.day, hour=17, ).timestamp() ) def _get_workday_start_timestamp(self) -> int: future = datetime( year=self._date.year, month=self._date.month, day=self._date.day, hour=8, minute=0, ) if self._date.hour > 17: future += timedelta(days=1) while future.weekday() >= 5: future += timedelta(days=1) return round(future.timestamp()) def _get_future_round_timestamp(self, minutes: 
int) -> int: future = self._date + timedelta(minutes=minutes) if future.minute % 15 == 0: future += timedelta(minutes=1) while future.minute % 15 != 0: future += timedelta(minutes=1) return round(future.timestamp()) class EnvConfigParser: def __init__(self): self._app_config: dict[EnvConfigKey, str] = {} for key in EnvConfigKey: value = getenv(key) if value is None: raise ValueError(f"Undefined environment variable: {key}") self._app_config[key] = value def read(self): return self._app_config def __getitem__(self, name: EnvConfigKey) -> str: return self._app_config[name] class NetworkParser: def __init__(self, env_config: EnvConfigParser): self._config = env_config.read() def read(self) -> NetworkLocation: connected = next( i.replace(" ", "").replace("\t", "") for i in ( check_output(["networksetup", "-listpreferredwirelessnetworks", "en0"]) .decode("utf8") .split("\n") )[1:] ) if connected == self._config[EnvConfigKey.SLACK_CLIENT_NETWORK_NAME_HOME]: return NetworkLocation.HOME if connected == self._config[EnvConfigKey.SLACK_CLIENT_NETWORK_NAME_OFFICE]: return NetworkLocation.OFFICE if connected == self._config[EnvConfigKey.SLACK_CLIENT_NETWORK_NAME_TRANSIT]: return NetworkLocation.TRANSIT return NetworkLocation.UNKNOWN class CookiesParser(Cacheable): # Read and decrypt Chrome cookies from SQLite database # Useful resources: # * https://gist.github.com/creachadair/937179894a24571ce9860e2475a2d2ec # * https://n8henrie.com/2014/05/decrypt-chrome-cookies-with-python/ def __init__(self, env_config: EnvConfigParser, path: str): self._path = join(path, "Cookies") self._env_config = env_config self._key = None self._cookies: list[tuple[str, str]] | None = None self._cache_path = join( self._env_config[EnvConfigKey.XDG_CACHE_HOME], "slack_client", "cookies_cache.json", ) self._cache_expiry = 60 * 60 * 24 * 365 def read(self) -> Generator[tuple[str, str]]: if self._key is None: self._key = self._make_key() self._cookies = [] for key, value in self._iterate_values(): value 
= value[3:] decoded = self._decode(self._key, value) self._cookies.append((key, decoded)) yield self._cookies[-1] def _get_password(self, cache: bool = False) -> str: if cache: return str(self._read_cache("password")) try: security_result = check_output( ["security", "find-generic-password", "-g", "-s", "Slack Safe Storage"], stderr=STDOUT, ) except CalledProcessError: return self._get_password(True) password = next( l for l in security_result.decode("utf8").split("\n") if l.startswith("password:") ) password = password[11:-1] self._write_cache("password", password) return password def _make_key( self, ): return pbkdf2_hmac( "sha1", self._get_password().encode("utf8"), b"saltysalt", 1003 )[:16] def _decode(self, key: bytes, value: bytes, cache: bool = True) -> str: cache_key = b64encode(key + value).decode("ascii") if ( cache and (cached_value := self._read_cache(cache_key)) and type(cached_value) == str ): return cached_value with TemporaryDirectory() as directory: keyfile: str = join(directory, "keyfile") infile: str = join(directory, "infile") outfile: str = join(directory, "outfile") with open(keyfile, "wb") as f: _ = f.write(key) with open(infile, "wb") as f: _ = f.write(value) _ = check_output( [ "openssl", "enc", "-d", "-aes-128-cbc", "-K", "".join([hex(i)[2:] for i in key]), "-in", infile, "-out", outfile, "-iv", "".join([hex(i)[2:] for i in b" " * 16]), "-p", ] ) with open(outfile, "rb") as f: result = f.read() # Rather then spend time understanding the format of the decrypted bytes # (which does seem to vary), just throw out the first 8 bytes any time # we don't get a valid ASCII string from decoding. 
while True: try: result = result.decode("ascii") break except UnicodeDecodeError: result = result[8:] self._write_cache(cache_key, result) return result def _iterate_values(self) -> list[tuple[str, bytes]]: con = connect_sqlite(self._path) cur = con.cursor() res = cur.execute( 'SELECT name, encrypted_value FROM cookies WHERE host_key=".slack.com"' ) all: list[tuple[str, bytes]] = res.fetchall() return list([r for r in all]) class LevelDBParser: def __init__(self, path: str): self._path = join(path, "Local Storage/leveldb/") self._app_configs = None def read(self): if self._app_configs is None: self._app_configs = [c for c in self._load_configs()] return [c for c in self._app_configs] def _read_files(self): for file in listdir(self._path): file = join(self._path, file) if not isfile(file) or splitext(file)[-1] != ".ldb": continue content = ( check_output(["leveldbutil", "dump", file]).decode("utf8").split("\n") ) for line in content: if len(line) == 0: continue if line[0] != "'": debug("Invalid start character detected in " + file) continue yield line def _get_keys_and_values(self): for line in self._read_files(): key_end = line.find("'", 1) key = line[1 : key_end - 1] line = line[key_end:] line = line[line.find("@") :] line = line[line.find(":") :] line = line[line.find("'") :] if line[0] != "'" or line[-1] != "'": debug("Invalid start or end character detected") continue value = line[1:-1] yield key, value def _get_json_values(self): for key, value in self._get_keys_and_values(): while ( len(value) >= 4 and value[0] == "\\" and value[1] == "x" and value[2] in hexdigits and value[3] in hexdigits ): value = value[4:] try: value: JSONType = loads(value) except: continue if type(value) != dict: continue yield key, value def _load_configs(self): def find_config(data: JSONDict) -> JSONDict | None: if ( "token" in data and type(data["token"]) == str and "domain" in data and "url" in data and type(data["url"]) == str and "id" in data and 
data["url"].endswith(".slack.com/") and data["token"].startswith("xoxc") ): return data for _, value in data.items(): if type(value) == dict: return find_config(value) for _, value in self._get_json_values(): config = find_config(value) if config: yield config class CalendarParser: def __init__(self, blocks: list[list[str]], env_config: EnvConfigParser): self._blocks = blocks self._env_config = env_config def read( self, ) -> tuple[ CalendarWorkLocation, bool, tuple[list[tuple[list[int], str]], list[list[str]]] ]: location: CalendarWorkLocation = CalendarWorkLocation.UNKNOWN sick = False out = False vacation = False meeting = False current: list[list[str]] = [] items: list[tuple[list[int], str]] = [] for block in self._blocks: if block[-1] == "All-day" or block[-1].count("date_long_pretty") >= 2: if ( "|" + self._env_config[ EnvConfigKey.SLACK_CLIENT_CALENDAR_EVENT_NAME_OFFICE ] in block[0] ): location = CalendarWorkLocation.OFFICE if ( "|" + self._env_config[ EnvConfigKey.SLACK_CLIENT_CALENDAR_EVENT_NAME_HOME ] in block[0] ): location = CalendarWorkLocation.HOME if ( "|" + self._env_config[ EnvConfigKey.SLACK_CLIENT_CALENDAR_EVENT_NAME_BREAK ] in block[0] ): vacation = True if ( "|" + self._env_config[ EnvConfigKey.SLACK_CLIENT_CALENDAR_EVENT_NAME_SICK ] in block[0] ): sick = True if ( "|" + self._env_config[ EnvConfigKey.SLACK_CLIENT_CALENDAR_EVENT_NAME_OUT ] in block[0] ): out = True current.append(block) elif "!date^" in block[-1]: chunks = block[-1].split("!date^") times: list[int] = [] for chunk in chunks: subchunks = chunk.split("^") try: times.append(int(subchunks[0])) except: pass if len(times) != 2: continue items.append((times, block[0])) if times[0] < time() and times[1] > time(): meeting = True current.append(block) if out: location = CalendarWorkLocation.OUT if sick: location = CalendarWorkLocation.SICK if vacation: location = CalendarWorkLocation.VACATION return location, meeting, (items, current) class SlackClient(Cacheable): def __init__( self, 
env_config: EnvConfigParser, leveldb_parser: LevelDBParser, cookies: CookiesParser, ): self._env_config = env_config self._cookies = list(cookies.read()) # TODO: Add support for multiple Slack workspaces by selecting config by ID self._app_config = leveldb_parser.read().pop() self._cache_path = join( self._env_config[EnvConfigKey.XDG_CACHE_HOME], "slack_client", "slack_cache.json", ) self._cache_expiry = 60 * 60 self._token = ( self._app_config["token"] if "token" in self._app_config and type(self._app_config["token"]) == str else "" ) def _call( self, method: str, form_data: None | JSONDict = None, cache: bool = False, ) -> JSONDict: cache_key = dumps([method, form_data]) if cache and (cached := self._read_cache(cache_key)) and type(cached) == dict: return cached req = request.Request( f"{self._app_config['url']}/api/{method}", headers={ "Authorization": "Bearer " + self._token, "Cookie": ";".join([f"{k}={v}" for k, v in self._cookies]), }, data=parse.urlencode( (form_data if form_data else {}) | {"_x_mode": "online", "_x_sonic": "true", "_x_app_name": "client"} ).encode(), ) response = cast(HTTPResponse, request.urlopen(req)) result: JSONDict = loads(response.read()) if result["ok"]: if cache: self._write_cache(cache_key, result) return result raise ValueError("Bad result from Slack API call:", dumps(result)) def update_status(self, text: str, emoji: str, expiration: int): return self._call( "users.profile.set", { "profile": { "status_emoji": emoji, "status_expiration": expiration, "status_text": text, "status_text_canonical": "", "ooo_message": "", }, "_x_reason": "CustomStatusModal:handle_save", }, ) def update_presence(self, presence: bool): return self._call("profile.set", {"presence": "active" if presence else "away"}) def get_calendar(self): users = self._call("users.list", cache=True) calendar_user = next( i for i in cast(list[JSONType], users["members"]) if type(i) == dict and i["is_bot"] and "real_name" in i and i["real_name"] == "Google Calendar" ) 
bot_id: str = cast(str, cast(JSONDict, calendar_user["profile"])["bot_id"]) bot_convo = self._call( "conversations.open", {"users": calendar_user["id"]}, cache=True ) convo_id: str = cast(str, cast(JSONDict, bot_convo["channel"])["id"]) convo_info = self._call( "conversations.info", {"channel": convo_id, "return_app_home": True}, False ) # The content of this view might not be properly updated - try some tricks to # force it to refresh! team_id: str = cast( str, cast( JSONDict, convo_info["home_view"], )["team_id"], ) _ = self._call( "apps.home.dispatchOpenEvent", { "id": convo_id, "type": "home", "service_team_id": team_id, }, False, ) _ = self._call("apps.profile.get", {"bot": bot_id}, False) app_view = self._call( "views.get", { "view_id": cast(JSONDict, convo_info["home_view"])["id"], "_x_reason": "fetchView", }, False, ) # Click the "Tomorrow" button, then the "Today" button view: JSONDict = cast(JSONDict, app_view["view"]) view_id: str = cast(str, view["id"]) blocks: list[JSONDict] = cast(list[JSONDict], view["blocks"]) hash: str = cast(str, view["hash"]) def click_block_action(value: str): for block in blocks: if block["type"] != "actions": continue for button in cast(list[JSONDict], block["elements"]): if "value" not in button or button["value"] != value: continue button["block_id"] = block["block_id"] _ = self._call( "blocks.actions", { "service_id": bot_id, "service_team_id": team_id, "actions": dumps([button]), "container": dumps({"type": "view", "view_id": view_id}), "client_token": "web-" + hash.split(".")[0], }, False, ) break for button in ["AGENDA_TOMORROW", "AGENDA_TODAY"]: click_block_action(button) # Gotta sleep, to ensure that the app view has time to update sleep(5) app_view = self._call( "views.get", { "view_id": cast(JSONDict, convo_info["home_view"])["id"], "_x_reason": "fetchView", }, False, ) texts = [ cast(str, cast(JSONDict, b["text"])["text"]).split("\n") for b in cast(list[JSONDict], cast(JSONDict, app_view["view"])["blocks"]) if 
"text" in b ] texts = [t for t in texts if len(t) and any(len(b) for b in t) > 0] texts = [loads(t) for t in set(dumps(t) for t in texts)] return texts def userstate_parse( worktime: WorkTime, network_location: NetworkLocation, work_location: CalendarWorkLocation, meeting: bool, ) -> UserState: if network_location == NetworkLocation.OFFICE: if meeting and (worktime == "afternoon" or worktime == "evening"): return UserState.OFFICE_MEETING return UserState.OFFICE elif work_location == CalendarWorkLocation.VACATION: return UserState.VACATION elif work_location == CalendarWorkLocation.SICK: return UserState.SICK elif work_location == CalendarWorkLocation.OUT: return UserState.OUT elif worktime == WorkTime.WEEKEND: return UserState.WEEKEND elif worktime == WorkTime.OFF_HOURS: return UserState.OFF elif work_location == CalendarWorkLocation.OFFICE: if network_location == NetworkLocation.TRANSIT: if worktime == WorkTime.WORK_MORNING: if meeting: return UserState.TRANSIT_TO_OFFICE_MEETING return UserState.TRANSIT_TO_OFFICE elif worktime == WorkTime.WORK_AFTERNOON: if meeting: return UserState.TRANSIT_FROM_OFFICE_MEETING return UserState.TRANSIT_FROM_OFFICE elif network_location == NetworkLocation.HOME: if worktime == WorkTime.WORK_MORNING: return UserState.HOME_LATER_OFFICE elif worktime == WorkTime.WORK_AFTERNOON: return userstate_parse( worktime, network_location, CalendarWorkLocation.HOME, meeting ) else: return userstate_parse( worktime, NetworkLocation.OFFICE, work_location, meeting ) elif work_location == CalendarWorkLocation.HOME: if meeting: return UserState.HOME_MEETING return UserState.HOME elif work_location == CalendarWorkLocation.UNKNOWN: return userstate_parse( worktime, network_location, CalendarWorkLocation.HOME, meeting ) return UserState.UNKNOWN def envvar_to_slack_status(status: str) -> tuple[str, str]: if "|" not in status: raise ValueError(status) emoji, text = tuple(status.split("|"))[:2] return (":" + emoji + ":") if len(emoji) > 0 else emoji, text 
def main():
    """Work out the user's current state and publish it as the Slack status.

    Combines the current time, the WiFi network and the Google Calendar agenda
    into a `UserState`, then sets the status text, emoji and expiry configured
    for that state. An unknown state clears the status.
    """
    base_path = join(getenv("HOME", "~"), "Library/Application Support/Slack")
    env_config_parser = EnvConfigParser()
    leveldb_parser = LevelDBParser(base_path)
    cookies_parser = CookiesParser(env_config_parser, base_path)
    network_parser = NetworkParser(env_config_parser)
    time_parser = TimeParser(datetime.now())
    client = SlackClient(env_config_parser, leveldb_parser, cookies_parser)
    calendar_parser = CalendarParser(client.get_calendar(), env_config_parser)

    worktime, workday_end_expiry, workday_start_expiry, halfhour_expiry, hour_expiry = (
        time_parser.read()
    )
    work_location, meeting, (calendar_items, _) = calendar_parser.read()
    for calendar_item in calendar_items:
        debug(f"Calendar item: {calendar_item}")
    network_location = network_parser.read()
    userstate = userstate_parse(worktime, network_location, work_location, meeting)
    debug(
        "State information: "
        f"{userstate=} "
        f"{worktime=}, "
        f"{network_location=}, "
        f"{work_location=}, "
        f"{meeting=}"
    )

    # State -> (environment key holding "emoji|text", expiry timestamp or None).
    # The *_MEETING transit states reuse the status of their plain counterpart.
    # UserState.UNKNOWN is deliberately absent: it clears the status.
    status_table: dict[UserState, tuple[EnvConfigKey, int | None]] = {
        UserState.OFF: (
            EnvConfigKey.SLACK_CLIENT_STATUS_OFF_HOURS,
            workday_start_expiry,
        ),
        UserState.WEEKEND: (
            EnvConfigKey.SLACK_CLIENT_STATUS_WEEKEND,
            workday_start_expiry,
        ),
        UserState.OUT: (EnvConfigKey.SLACK_CLIENT_STATUS_OUT, None),
        UserState.SICK: (EnvConfigKey.SLACK_CLIENT_STATUS_SICK, workday_end_expiry),
        UserState.VACATION: (EnvConfigKey.SLACK_CLIENT_STATUS_VACATION, None),
        UserState.HOME: (EnvConfigKey.SLACK_CLIENT_STATUS_HOME, workday_end_expiry),
        UserState.HOME_LATER_OFFICE: (
            EnvConfigKey.SLACK_CLIENT_STATUS_HOME_LATER_OFFICE,
            hour_expiry,
        ),
        UserState.HOME_MEETING: (
            EnvConfigKey.SLACK_CLIENT_STATUS_HOME_MEETING,
            halfhour_expiry,
        ),
        UserState.OFFICE: (
            EnvConfigKey.SLACK_CLIENT_STATUS_OFFICE,
            workday_end_expiry,
        ),
        UserState.OFFICE_MEETING: (
            EnvConfigKey.SLACK_CLIENT_STATUS_OFFICE_MEETING,
            halfhour_expiry,
        ),
        UserState.TRANSIT_FROM_OFFICE: (
            EnvConfigKey.SLACK_CLIENT_STATUS_TRANSIT_FROM_OFFICE,
            halfhour_expiry,
        ),
        UserState.TRANSIT_FROM_OFFICE_MEETING: (
            EnvConfigKey.SLACK_CLIENT_STATUS_TRANSIT_FROM_OFFICE,
            halfhour_expiry,
        ),
        UserState.TRANSIT_TO_OFFICE: (
            EnvConfigKey.SLACK_CLIENT_STATUS_TRANSIT_TO_OFFICE,
            halfhour_expiry,
        ),
        UserState.TRANSIT_TO_OFFICE_MEETING: (
            EnvConfigKey.SLACK_CLIENT_STATUS_TRANSIT_TO_OFFICE,
            halfhour_expiry,
        ),
    }

    status_emoji: None | str = None
    status_text: None | str = None
    status_expiration: None | int = None
    if userstate in status_table:
        status_key, status_expiration = status_table[userstate]
        status_emoji, status_text = envvar_to_slack_status(
            env_config_parser[status_key]
        )

    _ = client.update_status(
        status_text if status_text else "",
        status_emoji if status_emoji else "",
        status_expiration if status_expiration else 0,
    )
    debug(f"Updated status: {status_text=} {status_emoji=} {status_expiration=}")


if __name__ == "__main__":
    basicConfig(level=NOTSET)
    debug(f"Called at {datetime.now().ctime()} with parameters {argv}")
    if platform != "darwin":
        error("This client only runs on macOS")
        exit(1)
    for program in ["networksetup", "leveldbutil", "openssl"]:
        try:
            _ = check_output([program, "--version"], stderr=DEVNULL)
        except CalledProcessError:
            # The program ran but rejected `--version`; it is callable, which is
            # all this probe needs to know.
            pass
        except OSError:
            # Missing or non-executable binary (e.g. FileNotFoundError).
            error(f"Aborted, as `{program}` doesn't appear to be callable")
            exit(1)
    main()