| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876 |
- #!/usr/bin/env python3
- ########################################################################################
- # slack_client.py
- #
- # A tiny, standard-library only tool to read credentials from the Slack desktop app and
- # make requests to the Slack API on behalf of the user. It has access to the user's
- # Google Calendar via the Slack App, and the currently connected WiFi network via
- # `networksetup`. Requires `openssl` and `leveldbutil` to run.
- #
- # Not made to be portable.
- # Also a bit shit.
- ########################################################################################
- from base64 import b64encode
- from collections.abc import Generator
- from enum import StrEnum
- from hashlib import pbkdf2_hmac
- from json import loads, dumps
- from logging import NOTSET, basicConfig, debug, error
- from os import getenv, listdir, makedirs, remove, stat
- from os.path import join, isfile, splitext, dirname
- from sqlite3 import connect as connect_sqlite
- from string import hexdigits
- from subprocess import check_output, CalledProcessError, DEVNULL, STDOUT
- from sys import exit, argv
- from sys import platform
- from tempfile import TemporaryDirectory
- from time import sleep, time
- from typing import cast
- from urllib import request, parse
- from datetime import datetime, timedelta
- from http.client import HTTPResponse
- JSONType = str | int | float | None | list["JSONType"] | dict[str, "JSONType"]
- JSONDict = dict[str, JSONType]
- class WorkTime(StrEnum):
- WEEKEND = "WEEKEND"
- OFF_HOURS = "OFF_HOURS"
- WORK_MORNING = "WORK_MORNING"
- WORK_AFTERNOON = "WORK_AFTERNOON"
- class EnvConfigKey(StrEnum):
- XDG_CACHE_HOME = "XDG_CACHE_HOME"
- SLACK_CLIENT_CALENDAR_EVENT_NAME_BREAK = "SLACK_CLIENT_CALENDAR_EVENT_NAME_BREAK"
- SLACK_CLIENT_CALENDAR_EVENT_NAME_HOME = "SLACK_CLIENT_CALENDAR_EVENT_NAME_HOME"
- SLACK_CLIENT_CALENDAR_EVENT_NAME_OFFICE = "SLACK_CLIENT_CALENDAR_EVENT_NAME_OFFICE"
- SLACK_CLIENT_CALENDAR_EVENT_NAME_OUT = "SLACK_CLIENT_CALENDAR_EVENT_NAME_OUT"
- SLACK_CLIENT_CALENDAR_EVENT_NAME_SICK = "SLACK_CLIENT_CALENDAR_EVENT_NAME_SICK"
- SLACK_CLIENT_NETWORK_NAME_HOME = "SLACK_CLIENT_NETWORK_NAME_HOME"
- SLACK_CLIENT_NETWORK_NAME_OFFICE = "SLACK_CLIENT_NETWORK_NAME_OFFICE"
- SLACK_CLIENT_NETWORK_NAME_TRANSIT = "SLACK_CLIENT_NETWORK_NAME_TRANSIT"
- SLACK_CLIENT_STATUS_HOME = "SLACK_CLIENT_STATUS_HOME"
- SLACK_CLIENT_STATUS_HOME_LATER_OFFICE = "SLACK_CLIENT_STATUS_HOME_LATER_OFFICE"
- SLACK_CLIENT_STATUS_HOME_MEETING = "SLACK_CLIENT_STATUS_HOME_MEETING"
- SLACK_CLIENT_STATUS_OFFICE = "SLACK_CLIENT_STATUS_OFFICE"
- SLACK_CLIENT_STATUS_OFFICE_MEETING = "SLACK_CLIENT_STATUS_OFFICE_MEETING"
- SLACK_CLIENT_STATUS_OFF_HOURS = "SLACK_CLIENT_STATUS_OFF_HOURS"
- SLACK_CLIENT_STATUS_OUT = "SLACK_CLIENT_STATUS_OUT"
- SLACK_CLIENT_STATUS_SICK = "SLACK_CLIENT_STATUS_SICK"
- SLACK_CLIENT_STATUS_TRANSIT_FROM_OFFICE = "SLACK_CLIENT_STATUS_TRANSIT_FROM_OFFICE"
- SLACK_CLIENT_STATUS_TRANSIT_TO_OFFICE = "SLACK_CLIENT_STATUS_TRANSIT_TO_OFFICE"
- SLACK_CLIENT_STATUS_VACATION = "SLACK_CLIENT_STATUS_VACATION"
- SLACK_CLIENT_STATUS_WEEKEND = "SLACK_CLIENT_STATUS_WEEKEND"
- class NetworkLocation(StrEnum):
- HOME = "HOME"
- OFFICE = "OFFICE"
- TRANSIT = "TRANSIT"
- UNKNOWN = "UNKNOWN"
- class CalendarWorkLocation(StrEnum):
- HOME = "HOME"
- OFFICE = "OFFICE"
- TRANSIT = "TRANSIT"
- UNKNOWN = "UNKNOWN"
- OUT = "OUT"
- SICK = "SICK"
- VACATION = "VACATION"
- class UserState(StrEnum):
- OFF = "OFF"
- WEEKEND = "WEEKEND"
- OUT = "OUT"
- SICK = "SICK"
- VACATION = "VACATION"
- UNKNOWN = "UNKNOWN"
- HOME = "HOME"
- HOME_LATER_OFFICE = "HOME_LATER_OFFICE"
- HOME_MEETING = "HOME_MEETING"
- OFFICE = "OFFICE"
- OFFICE_MEETING = "OFFICE_MEETING"
- TRANSIT_FROM_OFFICE = "TRANSIT_FROM_OFFICE"
- TRANSIT_FROM_OFFICE_MEETING = "TRANSIT_FROM_OFFICE_MEETING"
- TRANSIT_TO_OFFICE = "TRANSIT_TO_OFFICE"
- TRANSIT_TO_OFFICE_MEETING = "TRANSIT_TO_OFFICE_MEETING"
class Cacheable:
    """Mixin that persists small JSON values in a single file on disk.

    Subclasses point ``_cache_path`` at their own file and may override
    ``_cache_expiry``. Expiry applies to the file as a whole: once the file
    is older than ``_cache_expiry`` seconds every key is dropped at once.
    """

    # Becomes True (per instance) once the cache file has been checked.
    _ensured = False
    _cache_path: str = "cache.json"
    # Maximum age of the cache file in seconds (default: one day).
    _cache_expiry = 60 * 60 * 24

    def _reset_cache(self) -> None:
        """Overwrite the cache file with an empty JSON object."""
        with open(self._cache_path, "w") as f:
            _ = f.write(dumps({}))

    def _load_cache(self) -> "JSONDict":
        """Return the cache contents, treating unusable content as empty."""
        with open(self._cache_path, "r") as f:
            try:
                cache = loads(f.read())
            except ValueError:
                # A truncated or hand-edited file must not break every run;
                # the next write replaces it with valid JSON.
                debug(f"Ignoring unreadable cache {self._cache_path}")
                return {}
        return cache if isinstance(cache, dict) else {}

    def _ensure_and_clear_cache(self):
        """Create the cache file if needed and empty it once it has expired.

        The check runs only once per instance.
        """
        if self._ensured:
            return
        directory = dirname(self._cache_path)
        # A bare file name has no directory part, and `makedirs("")` raises.
        if directory:
            makedirs(directory, exist_ok=True)
        if not isfile(self._cache_path):
            self._reset_cache()
        self._ensured = True
        info = stat(self._cache_path)
        # `st_birthtime` is not available on every platform; fall back to the
        # modification time where it is missing.
        created = getattr(info, "st_birthtime", info.st_mtime)
        if datetime.now().timestamp() - created > self._cache_expiry:
            debug(f"Reset cache {self._cache_path}")
            # Remove before recreating, rather than truncating in place, so
            # that the file's creation time starts over as well.
            remove(self._cache_path)
            self._reset_cache()

    def _read_cache(self, key: str) -> "JSONType":
        """Return the cached value stored under `key`, or None if absent."""
        self._ensure_and_clear_cache()
        cache = self._load_cache()
        if key in cache:
            debug(f"{self.__class__.__name__} read cached value `{key}`")
            return cache[key]
        return None

    def _write_cache(self, key: str, value: "JSONType"):
        """Store `value` under `key` and rewrite the whole cache file."""
        self._ensure_and_clear_cache()
        cache = self._load_cache()
        cache[key] = value
        with open(self._cache_path, "w") as f:
            _ = f.write(dumps(cache))
class TimeParser:
    """Derive work-schedule information from a single point in time."""

    def __init__(self, date: datetime) -> None:
        """Remember `date`, the moment all results are computed for."""
        self._date = date

    def read(
        self,
    ) -> "tuple[WorkTime, int, int, int, int]":
        """Return everything this parser can derive, in a fixed order.

        Returns:
            A tuple of: the work-time bucket, the end of today's workday, the
            start of the next workday, a quarter-hour boundary at least 30
            minutes away and one at least 60 minutes away. All but the first
            are Unix timestamps in whole seconds.
        """
        return (
            self._get_worktime(),
            self._get_workday_end_timestamp(),
            self._get_workday_start_timestamp(),
            self._get_future_round_timestamp(30),
            self._get_future_round_timestamp(60),
        )

    def _get_worktime(self) -> "WorkTime":
        """Classify the moment as weekend, off-hours, morning or afternoon."""
        weekday = self._date.weekday()  # Monday == 0 ... Sunday == 6
        hour = self._date.hour
        # The weekend starts on Friday at 16:00.
        if weekday >= 5 or (weekday == 4 and hour >= 16):
            return WorkTime.WEEKEND
        if hour < 7 or hour >= 20:
            return WorkTime.OFF_HOURS
        # The whole 12:xx hour still counts as morning.
        if hour <= 12:
            return WorkTime.WORK_MORNING
        return WorkTime.WORK_AFTERNOON

    def _get_workday_end_timestamp(self) -> int:
        """Return the timestamp of 17:00 on the day of the moment.

        NOTE(review): for a moment after 17:00 this lies in the past.
        """
        return round(
            datetime(
                year=self._date.year,
                month=self._date.month,
                day=self._date.day,
                hour=17,
            ).timestamp()
        )

    def _get_workday_start_timestamp(self) -> int:
        """Return the timestamp of the next 08:00 that falls on a weekday.

        The result is always later than the moment itself. Previously any
        moment between 08:00 and 17:59 produced 08:00 of the same day, which
        is in the past; on a Friday afternoon that became the expiry of the
        weekend status.
        """
        # Work on the wall-clock fields only, so that an aware `date` can be
        # compared with the naive datetime built below.
        now = self._date.replace(tzinfo=None)
        start = datetime(year=now.year, month=now.month, day=now.day, hour=8)
        if start <= now:
            start += timedelta(days=1)
        # Skip Saturday and Sunday.
        while start.weekday() >= 5:
            start += timedelta(days=1)
        return round(start.timestamp())

    def _get_future_round_timestamp(self, minutes: int) -> int:
        """Return a quarter-hour boundary more than `minutes` minutes away.

        The moment is shifted by `minutes` and then moved forward to the next
        minute that is a multiple of 15; a shifted time that already sits on
        such a minute moves a full 15 minutes. Seconds are left untouched.
        """
        future = self._date + timedelta(minutes=minutes)
        future += timedelta(minutes=15 - future.minute % 15)
        return round(future.timestamp())
class EnvConfigParser:
    """Snapshot of every `EnvConfigKey` variable, read at construction."""

    def __init__(self):
        """Read all configuration variables from the environment.

        Raises:
            ValueError: For the first variable that is not set.
        """
        resolved: dict[EnvConfigKey, str] = {}
        for variable in EnvConfigKey:
            setting = getenv(variable)
            if setting is None:
                raise ValueError(f"Undefined environment variable: {variable}")
            resolved[variable] = setting
        self._app_config = resolved

    def read(self):
        """Return the mapping of configuration key to value."""
        return self._app_config

    def __getitem__(self, name: EnvConfigKey) -> str:
        """Return the value of the configuration variable `name`."""
        return self._app_config[name]
class NetworkParser:
    """Work out the current location from the Wi-Fi network name."""

    def __init__(self, env_config: EnvConfigParser):
        """Keep the configuration mapping holding the known network names."""
        self._config = env_config.read()

    def read(self) -> NetworkLocation:
        """Return the location matching the first preferred Wi-Fi network.

        Runs `networksetup -listpreferredwirelessnetworks en0`, skips the
        first output line and treats the next line as the connected network.
        All spaces and tabs are removed from that name before it is compared,
        so the configured names must be written without them.

        Returns:
            The matching location, or `NetworkLocation.UNKNOWN` when the name
            matches no configured network or no network is listed at all.
        """
        output = check_output(
            ["networksetup", "-listpreferredwirelessnetworks", "en0"]
        ).decode("utf8")
        entries = output.split("\n")[1:]
        # The default avoids a StopIteration when the command prints nothing
        # after its first line.
        connected = next(
            (entry.replace(" ", "").replace("\t", "") for entry in entries),
            "",
        )
        # Checked in this order, so HOME wins if two names are configured
        # identically.
        candidates = (
            (EnvConfigKey.SLACK_CLIENT_NETWORK_NAME_HOME, NetworkLocation.HOME),
            (EnvConfigKey.SLACK_CLIENT_NETWORK_NAME_OFFICE, NetworkLocation.OFFICE),
            (EnvConfigKey.SLACK_CLIENT_NETWORK_NAME_TRANSIT, NetworkLocation.TRANSIT),
        )
        for key, location in candidates:
            if connected == self._config[key]:
                return location
        return NetworkLocation.UNKNOWN
class CookiesParser(Cacheable):
    """Read and decrypt the `.slack.com` cookies of the Slack desktop app.

    The cookies live in a Chrome-style SQLite database and are encrypted with
    a key derived from a password kept in the macOS keychain.

    Useful resources:
    * https://gist.github.com/creachadair/937179894a24571ce9860e2475a2d2ec
    * https://n8henrie.com/2014/05/decrypt-chrome-cookies-with-python/

    NOTE(review): the keychain password, and cache keys built from the
    derived key, are written to the cache file in plain text.
    """

    def __init__(self, env_config: EnvConfigParser, path: str):
        """Prepare to read the `Cookies` database inside directory `path`."""
        self._path = join(path, "Cookies")
        self._env_config = env_config
        self._key = None
        self._cookies: list[tuple[str, str]] | None = None
        self._cache_path = join(
            self._env_config[EnvConfigKey.XDG_CACHE_HOME],
            "slack_client",
            "cookies_cache.json",
        )
        self._cache_expiry = 60 * 60 * 24 * 365

    def read(self) -> Generator[tuple[str, str], None, None]:
        """Yield `(name, decrypted value)` for every `.slack.com` cookie."""
        if self._key is None:
            self._key = self._make_key()
        self._cookies = []
        for name, encrypted in self._iterate_values():
            # The first three bytes are a prefix that is not part of the
            # ciphertext (presumably a version tag such as `v10`).
            decoded = self._decode(self._key, encrypted[3:])
            self._cookies.append((name, decoded))
            yield self._cookies[-1]

    def _get_password(self, cache: bool = False) -> str:
        """Return the `Slack Safe Storage` password.

        The password is read from the keychain with `security` and cached. If
        the keychain cannot be read, the cached copy is used instead.

        Args:
            cache: Skip the keychain and return the cached password.

        Raises:
            ValueError: If the keychain is unavailable and nothing is cached.
        """
        if cache:
            cached = self._read_cache("password")
            if not isinstance(cached, str):
                # Previously `str(None)` was returned here, which silently
                # derived a key from the text "None".
                raise ValueError(
                    "Unable to read the Slack Safe Storage password from the "
                    "keychain, and no cached password is available"
                )
            return cached
        try:
            security_result = check_output(
                ["security", "find-generic-password", "-g", "-s", "Slack Safe Storage"],
                stderr=STDOUT,
            )
        except CalledProcessError:
            return self._get_password(True)
        password_line = next(
            (
                line
                for line in security_result.decode("utf8").split("\n")
                if line.startswith("password:")
            ),
            None,
        )
        if password_line is None:
            return self._get_password(True)
        # Strip the leading `password: "` and the closing quote.
        password = password_line[11:-1]
        self._write_cache("password", password)
        return password

    def _make_key(
        self,
    ):
        """Derive the 16-byte AES key from the keychain password."""
        return pbkdf2_hmac(
            "sha1", self._get_password().encode("utf8"), b"saltysalt", 1003
        )[:16]

    def _decode(self, key: bytes, value: bytes, cache: bool = True) -> str:
        """Decrypt one cookie value with `openssl` (AES-128-CBC).

        Args:
            key: The AES key.
            value: The ciphertext without its three-byte prefix.
            cache: Return a previously cached result when one exists.

        Returns:
            The decrypted value as an ASCII string.
        """
        cache_key = b64encode(key + value).decode("ascii")
        if cache:
            cached_value = self._read_cache(cache_key)
            if cached_value and isinstance(cached_value, str):
                return cached_value
        with TemporaryDirectory() as directory:
            infile: str = join(directory, "infile")
            outfile: str = join(directory, "outfile")
            with open(infile, "wb") as f:
                _ = f.write(value)
            _ = check_output(
                [
                    "openssl",
                    "enc",
                    "-d",
                    "-aes-128-cbc",
                    "-K",
                    # `bytes.hex()` emits two digits per byte. Building the
                    # string from `hex(i)[2:]` dropped the leading zero of
                    # every byte below 0x10 and corrupted the key.
                    key.hex(),
                    "-in",
                    infile,
                    "-out",
                    outfile,
                    "-iv",
                    # The IV is sixteen space characters.
                    (b" " * 16).hex(),
                    "-p",
                ]
            )
            with open(outfile, "rb") as f:
                raw = f.read()
        # Rather than spend time understanding the format of the decrypted
        # bytes (which does seem to vary), just throw out the first 8 bytes
        # any time we don't get a valid ASCII string from decoding.
        while True:
            try:
                result = raw.decode("ascii")
                break
            except UnicodeDecodeError:
                raw = raw[8:]
        self._write_cache(cache_key, result)
        return result

    def _iterate_values(self) -> list[tuple[str, bytes]]:
        """Return `(name, encrypted value)` for every `.slack.com` cookie."""
        con = connect_sqlite(self._path)
        try:
            rows: list[tuple[str, bytes]] = con.execute(
                'SELECT name, encrypted_value FROM cookies WHERE host_key=".slack.com"'
            ).fetchall()
        finally:
            # The connection was previously never closed.
            con.close()
        return rows
class LevelDBParser:
    """Extract Slack workspace configurations from the app's LevelDB store.

    Every `.ldb` file is dumped with `leveldbutil`; values that parse as JSON
    objects are searched for a workspace configuration.
    """

    def __init__(self, path: str):
        """Prepare to read `Local Storage/leveldb/` inside directory `path`."""
        self._path = join(path, "Local Storage/leveldb/")
        self._app_configs = None

    def read(self):
        """Return a fresh list of all workspace configurations found.

        The files are parsed on the first call only; later calls return a new
        list holding the same configurations.
        """
        if self._app_configs is None:
            self._app_configs = list(self._load_configs())
        return list(self._app_configs)

    def _read_files(self):
        """Yield each dumped line that starts with a single quote."""
        for file in listdir(self._path):
            file = join(self._path, file)
            if not isfile(file) or splitext(file)[-1] != ".ldb":
                continue
            content = (
                check_output(["leveldbutil", "dump", file]).decode("utf8").split("\n")
            )
            for line in content:
                if len(line) == 0:
                    continue
                if line[0] != "'":
                    debug("Invalid start character detected in " + file)
                    continue
                yield line

    def _get_keys_and_values(self):
        """Yield `(key, value)` text pairs cut out of the dumped lines."""
        for line in self._read_files():
            key_end = line.find("'", 1)
            # NOTE(review): this also drops the last character before the
            # closing quote; the key is not used by `_load_configs`.
            key = line[1 : key_end - 1]
            # Skip ahead to the quoted value: past the key, then past the
            # first "@", the first ":" and up to the next quote.
            line = line[key_end:]
            line = line[line.find("@") :]
            line = line[line.find(":") :]
            line = line[line.find("'") :]
            if line[0] != "'" or line[-1] != "'":
                debug("Invalid start or end character detected")
                continue
            value = line[1:-1]
            yield key, value

    def _get_json_values(self):
        """Yield `(key, value)` for every value that is a JSON object."""
        for key, value in self._get_keys_and_values():
            # Drop any leading `\xNN` escape sequences in front of the JSON.
            while (
                len(value) >= 4
                and value[0] == "\\"
                and value[1] == "x"
                and value[2] in hexdigits
                and value[3] in hexdigits
            ):
                value = value[4:]
            try:
                value: JSONType = loads(value)
            except ValueError:
                # Not JSON (`JSONDecodeError` is a `ValueError`); skip it.
                continue
            if not isinstance(value, dict):
                continue
            yield key, value

    @staticmethod
    def _find_config(data: "JSONDict") -> "JSONDict | None":
        """Return the first workspace configuration in `data`, depth-first.

        A configuration is a dict with a string `token` starting with `xoxc`,
        a string `url` ending in `.slack.com/`, and the keys `domain` and
        `id`. Only nested dicts are searched, not lists.
        """
        token = data.get("token")
        url = data.get("url")
        if (
            isinstance(token, str)
            and isinstance(url, str)
            and "domain" in data
            and "id" in data
            and url.endswith(".slack.com/")
            and token.startswith("xoxc")
        ):
            return data
        for value in data.values():
            if isinstance(value, dict):
                found = LevelDBParser._find_config(value)
                # Keep looking through the siblings. Previously the result of
                # the first nested dict was returned even when it was None.
                if found is not None:
                    return found
        return None

    def _load_configs(self):
        """Yield one workspace configuration per JSON value that holds one."""
        for _, value in self._get_json_values():
            config = self._find_config(value)
            if config:
                yield config
class CalendarParser:
    """Interpret the text blocks of the calendar view.

    Each block is a list of text lines. The first line is searched for the
    configured event names and the last line describes the timing.
    """

    def __init__(self, blocks: list[list[str]], env_config: EnvConfigParser):
        """Keep the blocks to interpret and the configured event names."""
        self._blocks = blocks
        self._env_config = env_config

    def _mentions(self, title: str, key: EnvConfigKey) -> bool:
        """Return whether `title` contains "|" plus the event name of `key`."""
        return "|" + self._env_config[key] in title

    def read(
        self,
    ) -> tuple[
        CalendarWorkLocation, bool, tuple[list[tuple[list[int], str]], list[list[str]]]
    ]:
        """Work out the planned location and whether a meeting is running.

        Returns:
            A tuple of the work location, a flag telling whether a timed
            entry spans the current time, and a pair holding all timed
            entries as `([start, end], first line)` and the blocks that are
            currently relevant.
        """
        location: CalendarWorkLocation = CalendarWorkLocation.UNKNOWN
        sick = False
        out = False
        vacation = False
        meeting = False
        current: list[list[str]] = []
        items: list[tuple[list[int], str]] = []
        # One snapshot, so every entry is judged against the same instant.
        now = time()
        for block in self._blocks:
            if not block:
                continue
            title, timing = block[0], block[-1]
            if timing == "All-day" or timing.count("date_long_pretty") >= 2:
                # An entry naming both locations ends up as HOME.
                if self._mentions(
                    title, EnvConfigKey.SLACK_CLIENT_CALENDAR_EVENT_NAME_OFFICE
                ):
                    location = CalendarWorkLocation.OFFICE
                if self._mentions(
                    title, EnvConfigKey.SLACK_CLIENT_CALENDAR_EVENT_NAME_HOME
                ):
                    location = CalendarWorkLocation.HOME
                if self._mentions(
                    title, EnvConfigKey.SLACK_CLIENT_CALENDAR_EVENT_NAME_BREAK
                ):
                    vacation = True
                if self._mentions(
                    title, EnvConfigKey.SLACK_CLIENT_CALENDAR_EVENT_NAME_SICK
                ):
                    sick = True
                if self._mentions(
                    title, EnvConfigKey.SLACK_CLIENT_CALENDAR_EVENT_NAME_OUT
                ):
                    out = True
                # NOTE(review): assumes every all-day entry counts as
                # current - confirm against the original indentation.
                current.append(block)
            elif "!date^" in timing:
                times: list[int] = []
                for chunk in timing.split("!date^"):
                    try:
                        times.append(int(chunk.split("^")[0]))
                    except ValueError:
                        # Text around the date markers is not a timestamp.
                        pass
                if len(times) != 2:
                    continue
                items.append((times, title))
                if times[0] < now and times[1] > now:
                    meeting = True
                    # NOTE(review): assumes only a running entry counts as
                    # current - confirm against the original indentation.
                    current.append(block)
        # Absences override the location; vacation beats sick beats out.
        if out:
            location = CalendarWorkLocation.OUT
        if sick:
            location = CalendarWorkLocation.SICK
        if vacation:
            location = CalendarWorkLocation.VACATION
        return location, meeting, (items, current)
class SlackClient(Cacheable):
    """Call the Slack web API with the desktop app's own credentials."""

    def __init__(
        self,
        env_config: EnvConfigParser,
        leveldb_parser: LevelDBParser,
        cookies: CookiesParser,
    ):
        """Collect the cookies, workspace configuration and token.

        Raises:
            IndexError: If no workspace configuration was found.
        """
        self._env_config = env_config
        self._cookies = list(cookies.read())
        # TODO: Add support for multiple Slack workspaces by selecting config by ID
        configs = leveldb_parser.read()
        if not configs:
            raise IndexError("No Slack workspace configuration was found")
        self._app_config = configs.pop()
        self._cache_path = join(
            self._env_config[EnvConfigKey.XDG_CACHE_HOME],
            "slack_client",
            "slack_cache.json",
        )
        self._cache_expiry = 60 * 60
        token = self._app_config.get("token")
        self._token = token if isinstance(token, str) else ""

    @staticmethod
    def _encode_form_value(value: JSONType) -> JSONType:
        """Return `value` in the form it must have inside a form body.

        Containers and booleans are serialized as JSON. Left alone,
        `urlencode` would send their Python repr (`{'a': 1}`, `True`), which
        is not JSON.
        """
        if isinstance(value, (dict, list, bool)):
            return dumps(value)
        return value

    def _call(
        self,
        method: str,
        form_data: None | JSONDict = None,
        cache: bool = False,
    ) -> JSONDict:
        """POST `form_data` to API method `method` and return the response.

        Args:
            method: Name of the API method, such as `users.list`.
            form_data: Form fields to send along with the standard ones.
            cache: Reuse a cached response for the same call, and cache a
                successful response.

        Raises:
            ValueError: If the response does not report success.
        """
        cache_key = dumps([method, form_data])
        if cache:
            cached = self._read_cache(cache_key)
            if cached and isinstance(cached, dict):
                return cached
        fields = {
            name: self._encode_form_value(value)
            for name, value in (form_data or {}).items()
        }
        fields |= {"_x_mode": "online", "_x_sonic": "true", "_x_app_name": "client"}
        # The configured URL ends in a slash; avoid "//api/" in the request.
        base_url = str(self._app_config["url"]).rstrip("/")
        req = request.Request(
            f"{base_url}/api/{method}",
            headers={
                "Authorization": "Bearer " + self._token,
                "Cookie": ";".join([f"{k}={v}" for k, v in self._cookies]),
            },
            data=parse.urlencode(fields).encode(),
        )
        # The context manager closes the connection once the body is read.
        with cast(HTTPResponse, request.urlopen(req)) as response:
            result: JSONDict = loads(response.read())
        if result.get("ok"):
            if cache:
                self._write_cache(cache_key, result)
            return result
        raise ValueError("Bad result from Slack API call:", dumps(result))

    def update_status(self, text: str, emoji: str, expiration: int):
        """Set the custom status.

        Args:
            text: Status text; empty to clear it.
            emoji: Status emoji such as `:house:`; empty for none.
            expiration: Unix timestamp at which the status expires; 0 for a
                status that does not expire.
        """
        return self._call(
            "users.profile.set",
            {
                "profile": {
                    "status_emoji": emoji,
                    "status_expiration": expiration,
                    "status_text": text,
                    "status_text_canonical": "",
                    "ooo_message": "",
                },
                "_x_reason": "CustomStatusModal:handle_save",
            },
        )

    def update_presence(self, presence: bool):
        """Set the presence to automatic (True) or away (False).

        Uses `users.setPresence`, which accepts `auto` and `away`. The
        previous call to `profile.set` with `active` matched neither the
        method name nor the accepted values.
        """
        return self._call(
            "users.setPresence", {"presence": "auto" if presence else "away"}
        )

    def get_calendar(self) -> list[list[str]]:
        """Return the text blocks of the Google Calendar app's home view.

        Each block is the list of lines of one text element. Blocks without
        any text are dropped and duplicates removed, keeping the order in
        which they appear in the view.

        Raises:
            ValueError: If the workspace has no Google Calendar app.
        """
        users = self._call("users.list", cache=True)
        calendar_user = next(
            (
                member
                for member in cast(list[JSONType], users["members"])
                if isinstance(member, dict)
                and member.get("is_bot")
                and member.get("real_name") == "Google Calendar"
            ),
            None,
        )
        if calendar_user is None:
            raise ValueError("The Google Calendar app was not found in the workspace")
        bot_id: str = cast(str, cast(JSONDict, calendar_user["profile"])["bot_id"])
        bot_convo = self._call(
            "conversations.open", {"users": calendar_user["id"]}, cache=True
        )
        convo_id: str = cast(str, cast(JSONDict, bot_convo["channel"])["id"])
        convo_info = self._call(
            "conversations.info", {"channel": convo_id, "return_app_home": True}, False
        )
        home_view = cast(JSONDict, convo_info["home_view"])
        # The content of this view might not be properly updated - try some
        # tricks to force it to refresh!
        team_id: str = cast(str, home_view["team_id"])
        _ = self._call(
            "apps.home.dispatchOpenEvent",
            {
                "id": convo_id,
                "type": "home",
                "service_team_id": team_id,
            },
            False,
        )
        _ = self._call("apps.profile.get", {"bot": bot_id}, False)
        view_request: JSONDict = {
            "view_id": home_view["id"],
            "_x_reason": "fetchView",
        }
        app_view = self._call("views.get", view_request, False)
        # Click the "Tomorrow" button, then the "Today" button
        view: JSONDict = cast(JSONDict, app_view["view"])
        view_id: str = cast(str, view["id"])
        blocks: list[JSONDict] = cast(list[JSONDict], view["blocks"])
        view_hash: str = cast(str, view["hash"])

        def click_block_action(value: str):
            """Trigger the first button with `value` in every actions block."""
            for block in blocks:
                if block["type"] != "actions":
                    continue
                for button in cast(list[JSONDict], block["elements"]):
                    if "value" not in button or button["value"] != value:
                        continue
                    button["block_id"] = block["block_id"]
                    _ = self._call(
                        "blocks.actions",
                        {
                            "service_id": bot_id,
                            "service_team_id": team_id,
                            "actions": dumps([button]),
                            "container": dumps({"type": "view", "view_id": view_id}),
                            "client_token": "web-" + view_hash.split(".")[0],
                        },
                        False,
                    )
                    # NOTE(review): assumes the break ends the button loop
                    # only - confirm against the original indentation.
                    break

        for button in ["AGENDA_TOMORROW", "AGENDA_TODAY"]:
            click_block_action(button)
            # Gotta sleep, to ensure that the app view has time to update
            # NOTE(review): assumes one pause per click - confirm against
            # the original indentation.
            sleep(5)
        app_view = self._call("views.get", view_request, False)
        texts = [
            cast(str, cast(JSONDict, b["text"])["text"]).split("\n")
            for b in cast(list[JSONDict], cast(JSONDict, app_view["view"])["blocks"])
            if "text" in b
        ]
        # Keep only blocks that have at least one non-empty line.
        texts = [lines for lines in texts if any(lines)]
        # Deduplicate while keeping the order of the view. Going through a
        # set made the order, and with it the result of interpreting
        # overlapping all-day entries, vary between runs.
        return [list(lines) for lines in dict.fromkeys(tuple(t) for t in texts)]
def userstate_parse(
    worktime: WorkTime,
    network_location: NetworkLocation,
    work_location: CalendarWorkLocation,
    meeting: bool,
) -> UserState:
    """Combine time, network, calendar and meeting info into one state.

    Precedence, highest first: being on the office network, an absence from
    the calendar (vacation, sick, out), weekend and off-hours, and finally
    the location planned in the calendar.

    Args:
        worktime: Where the current time falls in the working week.
        network_location: Location inferred from the Wi-Fi network.
        work_location: Location or absence planned in the calendar.
        meeting: Whether a calendar entry is running right now.

    Returns:
        The resulting state; `UserState.UNKNOWN` if no rule applies.
    """
    working_hours = worktime in (WorkTime.WORK_MORNING, WorkTime.WORK_AFTERNOON)
    if network_location == NetworkLocation.OFFICE:
        # The previous check compared against "afternoon" and "evening",
        # which are not `WorkTime` values, so OFFICE_MEETING was never
        # returned. Any meeting during working hours now counts.
        if meeting and working_hours:
            return UserState.OFFICE_MEETING
        return UserState.OFFICE
    if work_location == CalendarWorkLocation.VACATION:
        return UserState.VACATION
    if work_location == CalendarWorkLocation.SICK:
        return UserState.SICK
    if work_location == CalendarWorkLocation.OUT:
        return UserState.OUT
    if worktime == WorkTime.WEEKEND:
        return UserState.WEEKEND
    if worktime == WorkTime.OFF_HOURS:
        return UserState.OFF
    if work_location == CalendarWorkLocation.OFFICE:
        if network_location == NetworkLocation.TRANSIT:
            # In the morning the trip goes to the office, later back home.
            if worktime == WorkTime.WORK_MORNING:
                if meeting:
                    return UserState.TRANSIT_TO_OFFICE_MEETING
                return UserState.TRANSIT_TO_OFFICE
            if worktime == WorkTime.WORK_AFTERNOON:
                if meeting:
                    return UserState.TRANSIT_FROM_OFFICE_MEETING
                return UserState.TRANSIT_FROM_OFFICE
            return UserState.UNKNOWN
        if network_location == NetworkLocation.HOME:
            if worktime == WorkTime.WORK_MORNING:
                return UserState.HOME_LATER_OFFICE
            if worktime == WorkTime.WORK_AFTERNOON:
                # Still at home in the afternoon: treat it as a home day.
                return userstate_parse(
                    worktime, network_location, CalendarWorkLocation.HOME, meeting
                )
            return UserState.UNKNOWN
        # Unrecognised network on an office day: assume the office.
        return userstate_parse(
            worktime, NetworkLocation.OFFICE, work_location, meeting
        )
    if work_location == CalendarWorkLocation.HOME:
        if meeting:
            return UserState.HOME_MEETING
        return UserState.HOME
    if work_location == CalendarWorkLocation.UNKNOWN:
        # Nothing planned in the calendar: default to a home day.
        return userstate_parse(
            worktime, network_location, CalendarWorkLocation.HOME, meeting
        )
    return UserState.UNKNOWN
def envvar_to_slack_status(status: str) -> tuple[str, str]:
    """Split an "emoji|text" setting into a Slack emoji code and a text.

    A non-empty emoji name is wrapped in colons. Anything after a second "|"
    is ignored.

    Raises:
        ValueError: If `status` contains no "|".
    """
    fields = status.split("|")
    if len(fields) < 2:
        raise ValueError(status)
    emoji, text = fields[0], fields[1]
    if emoji:
        emoji = f":{emoji}:"
    return emoji, text
def main():
    """Work out the user's current state and set the Slack status to match.

    Reads the Slack credentials, the calendar, the current time and the
    Wi-Fi network, combines them into a `UserState` and sends the status
    configured for that state. An unknown state clears the status.
    """
    base_path = join(getenv("HOME", "~"), "Library/Application Support/Slack")
    env_config_parser = EnvConfigParser()
    leveldb_parser = LevelDBParser(base_path)
    cookies_parser = CookiesParser(env_config_parser, base_path)
    network_parser = NetworkParser(env_config_parser)
    time_parser = TimeParser(datetime.now())
    client = SlackClient(env_config_parser, leveldb_parser, cookies_parser)
    calendar_parser = CalendarParser(client.get_calendar(), env_config_parser)
    worktime, workday_end_expiry, workday_start_expiry, halfhour_expiry, hour_expiry = (
        time_parser.read()
    )
    work_location, meeting, (calendar_items, _) = calendar_parser.read()
    for calendar_item in calendar_items:
        debug(f"Calendar item: {calendar_item}")
    network_location = network_parser.read()
    userstate = userstate_parse(worktime, network_location, work_location, meeting)
    # Adjacent literals instead of backslash continuations, which put the
    # indentation of the source file into the message.
    debug(
        f"State information: {userstate=} {worktime=}, "
        f"{network_location=}, {work_location=}, {meeting=}"
    )
    # For every state: the variable holding its status, and when the status
    # expires (None: never). The meeting variants of the transit states
    # share the status of the plain ones. UNKNOWN is deliberately absent.
    plans: dict[UserState, tuple[EnvConfigKey, int | None]] = {
        UserState.OFF: (
            EnvConfigKey.SLACK_CLIENT_STATUS_OFF_HOURS,
            workday_start_expiry,
        ),
        UserState.WEEKEND: (
            EnvConfigKey.SLACK_CLIENT_STATUS_WEEKEND,
            workday_start_expiry,
        ),
        UserState.OUT: (EnvConfigKey.SLACK_CLIENT_STATUS_OUT, None),
        UserState.SICK: (
            EnvConfigKey.SLACK_CLIENT_STATUS_SICK,
            workday_end_expiry,
        ),
        UserState.VACATION: (EnvConfigKey.SLACK_CLIENT_STATUS_VACATION, None),
        UserState.HOME: (
            EnvConfigKey.SLACK_CLIENT_STATUS_HOME,
            workday_end_expiry,
        ),
        UserState.HOME_LATER_OFFICE: (
            EnvConfigKey.SLACK_CLIENT_STATUS_HOME_LATER_OFFICE,
            hour_expiry,
        ),
        UserState.HOME_MEETING: (
            EnvConfigKey.SLACK_CLIENT_STATUS_HOME_MEETING,
            halfhour_expiry,
        ),
        UserState.OFFICE: (
            EnvConfigKey.SLACK_CLIENT_STATUS_OFFICE,
            workday_end_expiry,
        ),
        UserState.OFFICE_MEETING: (
            EnvConfigKey.SLACK_CLIENT_STATUS_OFFICE_MEETING,
            halfhour_expiry,
        ),
        UserState.TRANSIT_FROM_OFFICE: (
            EnvConfigKey.SLACK_CLIENT_STATUS_TRANSIT_FROM_OFFICE,
            halfhour_expiry,
        ),
        UserState.TRANSIT_FROM_OFFICE_MEETING: (
            EnvConfigKey.SLACK_CLIENT_STATUS_TRANSIT_FROM_OFFICE,
            halfhour_expiry,
        ),
        UserState.TRANSIT_TO_OFFICE: (
            EnvConfigKey.SLACK_CLIENT_STATUS_TRANSIT_TO_OFFICE,
            halfhour_expiry,
        ),
        UserState.TRANSIT_TO_OFFICE_MEETING: (
            EnvConfigKey.SLACK_CLIENT_STATUS_TRANSIT_TO_OFFICE,
            halfhour_expiry,
        ),
    }
    status_emoji: None | str = None
    status_text: None | str = None
    status_expiration: None | int = None
    plan = plans.get(userstate)
    if plan is not None:
        status_key, status_expiration = plan
        status_emoji, status_text = envvar_to_slack_status(
            env_config_parser[status_key]
        )
    _ = client.update_status(
        status_text if status_text else "",
        status_emoji if status_emoji else "",
        status_expiration if status_expiration else 0,
    )
    debug(f"Updated status: {status_text=} {status_emoji=} {status_expiration=}")
if __name__ == "__main__":
    # Script entry point: set up logging, check the platform and the required
    # external programs, then run the client.
    basicConfig(level=NOTSET)
    debug(f"Called at {datetime.now().ctime()} with parameters {argv}")
    if platform != "darwin":
        error("This client only runs on macOS")
        exit(1)
    for program in ["networksetup", "leveldbutil", "openssl"]:
        try:
            _ = check_output([program, "--version"], stderr=DEVNULL)
        except CalledProcessError:
            # The program ran but rejected `--version`; it exists, which is
            # all that matters here.
            pass
        except OSError:
            # Missing or not executable. A bare `except` here also caught
            # KeyboardInterrupt and reported it as a missing program.
            error(f"Aborted, as `{program}` doesn't appear to be callable")
            exit(1)
    main()
|