|
|
@@ -0,0 +1,803 @@
|
|
|
+#!/usr/bin/env python3
|
|
|
+########################################################################################
|
|
|
+# slack_client.py
|
|
|
+#
|
|
|
+# A tiny, standard-library only tool to read credentials from the Slack desktop app and
|
|
|
+# make requests to the Slack API on behalf of the user. It has access to the user's
|
|
|
+# Google Calendar via the Slack App, and the currently connected WiFi network via
|
|
|
+# `networksetup`. Requires `openssl` and `leveldbutil` to run.
|
|
|
+#
|
|
|
+# Not made to be portable.
|
|
|
+# Also a bit shit.
|
|
|
+########################################################################################
|
|
|
+
|
|
|
+from base64 import b64encode
|
|
|
+from collections.abc import Generator
|
|
|
+from enum import StrEnum
|
|
|
+from hashlib import pbkdf2_hmac
|
|
|
+from json import loads, dumps
|
|
|
+from logging import NOTSET, basicConfig, debug, error
|
|
|
+from os import getenv, listdir, makedirs, remove, stat
|
|
|
+from os.path import join, isfile, splitext, dirname
|
|
|
+from sqlite3 import connect as connect_sqlite
|
|
|
+from string import hexdigits
|
|
|
+from subprocess import check_output, CalledProcessError, DEVNULL, STDOUT
|
|
|
+from sys import exit, argv
|
|
|
+from sys import platform
|
|
|
+from tempfile import TemporaryDirectory
|
|
|
+from time import time
|
|
|
+from typing import cast
|
|
|
+from urllib import request, parse
|
|
|
+from datetime import datetime, timedelta
|
|
|
+from http.client import HTTPResponse
|
|
|
+
|
|
|
+JSONType = str | int | float | None | list["JSONType"] | dict[str, "JSONType"]
|
|
|
+JSONDict = dict[str, JSONType]
|
|
|
+
|
|
|
+
|
|
|
+class WorkTime(StrEnum):
|
|
|
+ WEEKEND = "WEEKEND"
|
|
|
+ OFF_HOURS = "OFF_HOURS"
|
|
|
+ WORK_MORNING = "WORK_MORNING"
|
|
|
+ WORK_AFTERNOON = "WORK_AFTERNOON"
|
|
|
+
|
|
|
+
|
|
|
+class EnvConfigKey(StrEnum):
|
|
|
+ XDG_CACHE_HOME = "XDG_CACHE_HOME"
|
|
|
+ SLACK_CLIENT_CALENDAR_EVENT_NAME_BREAK = "SLACK_CLIENT_CALENDAR_EVENT_NAME_BREAK"
|
|
|
+ SLACK_CLIENT_CALENDAR_EVENT_NAME_HOME = "SLACK_CLIENT_CALENDAR_EVENT_NAME_HOME"
|
|
|
+ SLACK_CLIENT_CALENDAR_EVENT_NAME_OFFICE = "SLACK_CLIENT_CALENDAR_EVENT_NAME_OFFICE"
|
|
|
+ SLACK_CLIENT_CALENDAR_EVENT_NAME_OUT = "SLACK_CLIENT_CALENDAR_EVENT_NAME_OUT"
|
|
|
+ SLACK_CLIENT_CALENDAR_EVENT_NAME_SICK = "SLACK_CLIENT_CALENDAR_EVENT_NAME_SICK"
|
|
|
+ SLACK_CLIENT_NETWORK_NAME_HOME = "SLACK_CLIENT_NETWORK_NAME_HOME"
|
|
|
+ SLACK_CLIENT_NETWORK_NAME_OFFICE = "SLACK_CLIENT_NETWORK_NAME_OFFICE"
|
|
|
+ SLACK_CLIENT_NETWORK_NAME_TRANSIT = "SLACK_CLIENT_NETWORK_NAME_TRANSIT"
|
|
|
+ SLACK_CLIENT_STATUS_HOME = "SLACK_CLIENT_STATUS_HOME"
|
|
|
+ SLACK_CLIENT_STATUS_HOME_LATER_OFFICE = "SLACK_CLIENT_STATUS_HOME_LATER_OFFICE"
|
|
|
+ SLACK_CLIENT_STATUS_HOME_MEETING = "SLACK_CLIENT_STATUS_HOME_MEETING"
|
|
|
+ SLACK_CLIENT_STATUS_OFFICE = "SLACK_CLIENT_STATUS_OFFICE"
|
|
|
+ SLACK_CLIENT_STATUS_OFFICE_MEETING = "SLACK_CLIENT_STATUS_OFFICE_MEETING"
|
|
|
+ SLACK_CLIENT_STATUS_OFF_HOURS = "SLACK_CLIENT_STATUS_OFF_HOURS"
|
|
|
+ SLACK_CLIENT_STATUS_OUT = "SLACK_CLIENT_STATUS_OUT"
|
|
|
+ SLACK_CLIENT_STATUS_SICK = "SLACK_CLIENT_STATUS_SICK"
|
|
|
+ SLACK_CLIENT_STATUS_TRANSIT_FROM_OFFICE = "SLACK_CLIENT_STATUS_TRANSIT_FROM_OFFICE"
|
|
|
+ SLACK_CLIENT_STATUS_TRANSIT_TO_OFFICE = "SLACK_CLIENT_STATUS_TRANSIT_TO_OFFICE"
|
|
|
+ SLACK_CLIENT_STATUS_VACATION = "SLACK_CLIENT_STATUS_VACATION"
|
|
|
+ SLACK_CLIENT_STATUS_WEEKEND = "SLACK_CLIENT_STATUS_WEEKEND"
|
|
|
+
|
|
|
+
|
|
|
+class NetworkLocation(StrEnum):
|
|
|
+ HOME = "HOME"
|
|
|
+ OFFICE = "OFFICE"
|
|
|
+ TRANSIT = "TRANSIT"
|
|
|
+ UNKNOWN = "UNKNOWN"
|
|
|
+
|
|
|
+
|
|
|
+class CalendarWorkLocation(StrEnum):
|
|
|
+ HOME = "HOME"
|
|
|
+ OFFICE = "OFFICE"
|
|
|
+ TRANSIT = "TRANSIT"
|
|
|
+ UNKNOWN = "UNKNOWN"
|
|
|
+ OUT = "OUT"
|
|
|
+ SICK = "SICK"
|
|
|
+ VACATION = "VACATION"
|
|
|
+
|
|
|
+
|
|
|
+class UserState(StrEnum):
|
|
|
+ OFF = "OFF"
|
|
|
+ WEEKEND = "WEEKEND"
|
|
|
+ OUT = "OUT"
|
|
|
+ SICK = "SICK"
|
|
|
+ VACATION = "VACATION"
|
|
|
+ UNKNOWN = "UNKNOWN"
|
|
|
+
|
|
|
+ HOME = "HOME"
|
|
|
+ HOME_LATER_OFFICE = "HOME_LATER_OFFICE"
|
|
|
+ HOME_MEETING = "HOME_MEETING"
|
|
|
+ OFFICE = "OFFICE"
|
|
|
+ OFFICE_MEETING = "OFFICE_MEETING"
|
|
|
+ TRANSIT_FROM_OFFICE = "TRANSIT_FROM_OFFICE"
|
|
|
+ TRANSIT_FROM_OFFICE_MEETING = "TRANSIT_FROM_OFFICE_MEETING"
|
|
|
+ TRANSIT_TO_OFFICE = "TRANSIT_TO_OFFICE"
|
|
|
+ TRANSIT_TO_OFFICE_MEETING = "TRANSIT_TO_OFFICE_MEETING"
|
|
|
+
|
|
|
+
|
|
|
+class Cacheable:
|
|
|
+ _ensured = False
|
|
|
+ _cache_path: str = "cache.json"
|
|
|
+ _cache_expiry = 60 * 60 * 24
|
|
|
+
|
|
|
+ def _ensure_and_clear_cache(self):
|
|
|
+ if self._ensured:
|
|
|
+ return
|
|
|
+ makedirs(dirname(self._cache_path), exist_ok=True)
|
|
|
+ if not isfile(self._cache_path):
|
|
|
+ with open(self._cache_path, "w") as f:
|
|
|
+ _ = f.write(dumps({}))
|
|
|
+ self._ensured = True
|
|
|
+
|
|
|
+ if (
|
|
|
+ datetime.now().timestamp() - stat(self._cache_path).st_birthtime
|
|
|
+ > self._cache_expiry
|
|
|
+ ):
|
|
|
+ debug(f"Reset cache {self._cache_path}")
|
|
|
+ remove(self._cache_path)
|
|
|
+ with open(self._cache_path, "w") as f:
|
|
|
+ _ = f.write(dumps({}))
|
|
|
+
|
|
|
+ def _read_cache(self, key: str) -> JSONType:
|
|
|
+ self._ensure_and_clear_cache()
|
|
|
+ with open(self._cache_path, "r") as f:
|
|
|
+ cache: JSONDict = loads(f.read())
|
|
|
+ if key in cache:
|
|
|
+ debug(f"{self.__class__.__name__} read cached value `{key}`")
|
|
|
+ return cache[key]
|
|
|
+
|
|
|
+ def _write_cache(self, key: str, value: JSONType):
|
|
|
+ self._ensure_and_clear_cache()
|
|
|
+ with open(self._cache_path, "r") as f:
|
|
|
+ cache: JSONDict = loads(f.read())
|
|
|
+ cache[key] = value
|
|
|
+ with open(self._cache_path, "w") as f:
|
|
|
+ _ = f.write(dumps(cache))
|
|
|
+
|
|
|
+
|
|
|
+class TimeParser:
|
|
|
+ def __init__(self, date: datetime) -> None:
|
|
|
+ self._date = date
|
|
|
+
|
|
|
+ def read(
|
|
|
+ self,
|
|
|
+ ) -> tuple[WorkTime, int, int, int, int]:
|
|
|
+ return (
|
|
|
+ self._get_worktime(),
|
|
|
+ self._get_workday_end_timestamp(),
|
|
|
+ self._get_workday_start_timestamp(),
|
|
|
+ self._get_future_round_timestamp(30),
|
|
|
+ self._get_future_round_timestamp(60),
|
|
|
+ )
|
|
|
+
|
|
|
+ def _get_worktime(self) -> WorkTime:
|
|
|
+ weekday = int(self._date.strftime("%w"))
|
|
|
+ is_weekday = bool(weekday % 6)
|
|
|
+ hour = int(self._date.strftime("%H"))
|
|
|
+ if not is_weekday or (weekday == 5 and hour >= 16):
|
|
|
+ return WorkTime.WEEKEND
|
|
|
+ if hour < 8 or hour >= 18:
|
|
|
+ return WorkTime.OFF_HOURS
|
|
|
+ elif hour <= 12:
|
|
|
+ return WorkTime.WORK_MORNING
|
|
|
+ else:
|
|
|
+ return WorkTime.WORK_AFTERNOON
|
|
|
+
|
|
|
+ def _get_workday_end_timestamp(self) -> int:
|
|
|
+ return round(
|
|
|
+ datetime(
|
|
|
+ year=self._date.year,
|
|
|
+ month=self._date.month,
|
|
|
+ day=self._date.day,
|
|
|
+ hour=17,
|
|
|
+ ).timestamp()
|
|
|
+ )
|
|
|
+
|
|
|
+ def _get_workday_start_timestamp(self) -> int:
|
|
|
+ future = datetime(
|
|
|
+ year=self._date.year,
|
|
|
+ month=self._date.month,
|
|
|
+ day=self._date.day,
|
|
|
+ hour=8,
|
|
|
+ minute=0,
|
|
|
+ )
|
|
|
+ if self._date.hour > 17:
|
|
|
+ future += timedelta(days=1)
|
|
|
+ while future.weekday() >= 5:
|
|
|
+ future += timedelta(days=1)
|
|
|
+ return round(future.timestamp())
|
|
|
+
|
|
|
+ def _get_future_round_timestamp(self, minutes: int) -> int:
|
|
|
+ future = self._date + timedelta(minutes=minutes)
|
|
|
+ if future.minute % 15 == 0:
|
|
|
+ future += timedelta(minutes=1)
|
|
|
+ while future.minute % 15 != 0:
|
|
|
+ future += timedelta(minutes=1)
|
|
|
+ return round(future.timestamp())
|
|
|
+
|
|
|
+
|
|
|
+class EnvConfigParser:
|
|
|
+ def __init__(self):
|
|
|
+ self._app_config: dict[EnvConfigKey, str] = {}
|
|
|
+ for key in EnvConfigKey:
|
|
|
+ value = getenv(key)
|
|
|
+ if value is None:
|
|
|
+ raise ValueError(f"Undefined environment variable: {key}")
|
|
|
+ self._app_config[key] = value
|
|
|
+
|
|
|
+ def read(self):
|
|
|
+ return self._app_config
|
|
|
+
|
|
|
+ def __getitem__(self, name: EnvConfigKey) -> str:
|
|
|
+ return self._app_config[name]
|
|
|
+
|
|
|
+
|
|
|
+class NetworkParser:
|
|
|
+ def __init__(self, env_config: EnvConfigParser):
|
|
|
+ self._config = env_config.read()
|
|
|
+
|
|
|
+ def read(self) -> NetworkLocation:
|
|
|
+ connected = next(
|
|
|
+ i.replace(" ", "").replace("\t", "")
|
|
|
+ for i in (
|
|
|
+ check_output(["networksetup", "-listpreferredwirelessnetworks", "en0"])
|
|
|
+ .decode("utf8")
|
|
|
+ .split("\n")
|
|
|
+ )[1:]
|
|
|
+ )
|
|
|
+ if connected == self._config[EnvConfigKey.SLACK_CLIENT_NETWORK_NAME_HOME]:
|
|
|
+ return NetworkLocation.HOME
|
|
|
+ if connected == self._config[EnvConfigKey.SLACK_CLIENT_NETWORK_NAME_OFFICE]:
|
|
|
+ return NetworkLocation.OFFICE
|
|
|
+ if connected == self._config[EnvConfigKey.SLACK_CLIENT_NETWORK_NAME_TRANSIT]:
|
|
|
+ return NetworkLocation.TRANSIT
|
|
|
+ return NetworkLocation.UNKNOWN
|
|
|
+
|
|
|
+
|
|
|
+class CookiesParser(Cacheable):
|
|
|
+ # Read and decrypt Chrome cookies from SQLite database
|
|
|
+ # Useful resources:
|
|
|
+ # * https://gist.github.com/creachadair/937179894a24571ce9860e2475a2d2ec
|
|
|
+ # * https://n8henrie.com/2014/05/decrypt-chrome-cookies-with-python/
|
|
|
+ def __init__(self, env_config: EnvConfigParser, path: str):
|
|
|
+ self._path = join(path, "Cookies")
|
|
|
+ self._env_config = env_config
|
|
|
+ self._key = None
|
|
|
+ self._cookies: list[tuple[str, str]] | None = None
|
|
|
+ self._cache_path = join(
|
|
|
+ self._env_config[EnvConfigKey.XDG_CACHE_HOME],
|
|
|
+ "slack_client",
|
|
|
+ "cookies_cache.json",
|
|
|
+ )
|
|
|
+ self._cache_expiry = 60 * 60 * 24 * 365
|
|
|
+
|
|
|
+ def read(self) -> Generator[tuple[str, str]]:
|
|
|
+ if self._key is None:
|
|
|
+ self._key = self._make_key()
|
|
|
+ self._cookies = []
|
|
|
+ for key, value in self._iterate_values():
|
|
|
+ value = value[3:]
|
|
|
+ decoded = self._decode(self._key, value)
|
|
|
+ self._cookies.append((key, decoded))
|
|
|
+ yield self._cookies[-1]
|
|
|
+
|
|
|
+ def _get_password(self, cache: bool = False) -> str:
|
|
|
+ if cache:
|
|
|
+ return str(self._read_cache("password"))
|
|
|
+ try:
|
|
|
+ security_result = check_output(
|
|
|
+ ["security", "find-generic-password", "-g", "-s", "Slack Safe Storage"],
|
|
|
+ stderr=STDOUT,
|
|
|
+ )
|
|
|
+ except CalledProcessError:
|
|
|
+ return self._get_password(True)
|
|
|
+ password = next(
|
|
|
+ l
|
|
|
+ for l in security_result.decode("utf8").split("\n")
|
|
|
+ if l.startswith("password:")
|
|
|
+ )
|
|
|
+ password = password[11:-1]
|
|
|
+ self._write_cache("password", password)
|
|
|
+ return password
|
|
|
+
|
|
|
+ def _make_key(
|
|
|
+ self,
|
|
|
+ ):
|
|
|
+ return pbkdf2_hmac(
|
|
|
+ "sha1", self._get_password().encode("utf8"), b"saltysalt", 1003
|
|
|
+ )[:16]
|
|
|
+
|
|
|
+ def _decode(self, key: bytes, value: bytes, cache: bool = True) -> str:
|
|
|
+ cache_key = b64encode(key + value).decode("ascii")
|
|
|
+ if (
|
|
|
+ cache
|
|
|
+ and (cached_value := self._read_cache(cache_key))
|
|
|
+ and type(cached_value) == str
|
|
|
+ ):
|
|
|
+ return cached_value
|
|
|
+ with TemporaryDirectory() as directory:
|
|
|
+ keyfile: str = join(directory, "keyfile")
|
|
|
+ infile: str = join(directory, "infile")
|
|
|
+ outfile: str = join(directory, "outfile")
|
|
|
+ with open(keyfile, "wb") as f:
|
|
|
+ _ = f.write(key)
|
|
|
+ with open(infile, "wb") as f:
|
|
|
+ _ = f.write(value)
|
|
|
+ _ = check_output(
|
|
|
+ [
|
|
|
+ "openssl",
|
|
|
+ "enc",
|
|
|
+ "-d",
|
|
|
+ "-aes-128-cbc",
|
|
|
+ "-K",
|
|
|
+ "".join([hex(i)[2:] for i in key]),
|
|
|
+ "-in",
|
|
|
+ infile,
|
|
|
+ "-out",
|
|
|
+ outfile,
|
|
|
+ "-iv",
|
|
|
+ "".join([hex(i)[2:] for i in b" " * 16]),
|
|
|
+ "-p",
|
|
|
+ ]
|
|
|
+ )
|
|
|
+ with open(outfile, "rb") as f:
|
|
|
+ result = f.read()
|
|
|
+ result = result.decode("utf8")
|
|
|
+ self._write_cache(cache_key, result)
|
|
|
+ return result
|
|
|
+
|
|
|
+ def _iterate_values(self) -> list[tuple[str, bytes]]:
|
|
|
+ con = connect_sqlite(self._path)
|
|
|
+ cur = con.cursor()
|
|
|
+ res = cur.execute(
|
|
|
+ 'SELECT name, encrypted_value FROM cookies WHERE host_key=".slack.com"'
|
|
|
+ )
|
|
|
+ all: list[tuple[str, bytes]] = res.fetchall()
|
|
|
+ return list([r for r in all])
|
|
|
+
|
|
|
+
|
|
|
+class LevelDBParser:
|
|
|
+ def __init__(self, path: str):
|
|
|
+ self._path = join(path, "Local Storage/leveldb/")
|
|
|
+ self._app_configs = None
|
|
|
+
|
|
|
+ def read(self):
|
|
|
+ if self._app_configs is None:
|
|
|
+ self._app_configs = [c for c in self._load_configs()]
|
|
|
+ return [c for c in self._app_configs]
|
|
|
+
|
|
|
+ def _read_files(self):
|
|
|
+ for file in listdir(self._path):
|
|
|
+ file = join(self._path, file)
|
|
|
+ if not isfile(file) or splitext(file)[-1] != ".ldb":
|
|
|
+ continue
|
|
|
+ content = (
|
|
|
+ check_output(["leveldbutil", "dump", file]).decode("utf8").split("\n")
|
|
|
+ )
|
|
|
+ for line in content:
|
|
|
+ if len(line) == 0:
|
|
|
+ continue
|
|
|
+ if line[0] != "'":
|
|
|
+ debug("Invalid start character detected in " + file)
|
|
|
+ continue
|
|
|
+ yield line
|
|
|
+
|
|
|
+ def _get_keys_and_values(self):
|
|
|
+ for line in self._read_files():
|
|
|
+ key_end = line.find("'", 1)
|
|
|
+ key = line[1 : key_end - 1]
|
|
|
+ line = line[key_end:]
|
|
|
+ line = line[line.find("@") :]
|
|
|
+ line = line[line.find(":") :]
|
|
|
+ line = line[line.find("'") :]
|
|
|
+ if line[0] != "'" or line[-1] != "'":
|
|
|
+ debug("Invalid start or end character detected")
|
|
|
+ continue
|
|
|
+ value = line[1:-1]
|
|
|
+ yield key, value
|
|
|
+
|
|
|
+ def _get_json_values(self):
|
|
|
+ for key, value in self._get_keys_and_values():
|
|
|
+ while (
|
|
|
+ len(value) >= 4
|
|
|
+ and value[0] == "\\"
|
|
|
+ and value[1] == "x"
|
|
|
+ and value[2] in hexdigits
|
|
|
+ and value[3] in hexdigits
|
|
|
+ ):
|
|
|
+ value = value[4:]
|
|
|
+ try:
|
|
|
+ value: JSONType = loads(value)
|
|
|
+ except:
|
|
|
+ continue
|
|
|
+ if type(value) != dict:
|
|
|
+ continue
|
|
|
+ yield key, value
|
|
|
+
|
|
|
+ def _load_configs(self):
|
|
|
+ def find_config(data: JSONDict) -> JSONDict | None:
|
|
|
+ if (
|
|
|
+ "token" in data
|
|
|
+ and type(data["token"]) == str
|
|
|
+ and "domain" in data
|
|
|
+ and "url" in data
|
|
|
+ and type(data["url"]) == str
|
|
|
+ and "id" in data
|
|
|
+ and data["url"].endswith(".slack.com/")
|
|
|
+ and data["token"].startswith("xoxc")
|
|
|
+ ):
|
|
|
+ return data
|
|
|
+ for _, value in data.items():
|
|
|
+ if type(value) == dict:
|
|
|
+ return find_config(value)
|
|
|
+
|
|
|
+ for _, value in self._get_json_values():
|
|
|
+ config = find_config(value)
|
|
|
+ if config:
|
|
|
+ yield config
|
|
|
+
|
|
|
+
|
|
|
+class CalendarParser:
|
|
|
+ def __init__(self, blocks: list[list[str]], env_config: EnvConfigParser):
|
|
|
+ self._blocks = blocks
|
|
|
+ self._env_config = env_config
|
|
|
+
|
|
|
+ def read(
|
|
|
+ self,
|
|
|
+ ) -> tuple[
|
|
|
+ CalendarWorkLocation, bool, tuple[list[tuple[list[int], str]], list[list[str]]]
|
|
|
+ ]:
|
|
|
+ location: CalendarWorkLocation = CalendarWorkLocation.UNKNOWN
|
|
|
+ sick = False
|
|
|
+ out = False
|
|
|
+ vacation = False
|
|
|
+ meeting = False
|
|
|
+ current: list[list[str]] = []
|
|
|
+ items: list[tuple[list[int], str]] = []
|
|
|
+ for block in self._blocks:
|
|
|
+ if block[-1] == "All-day":
|
|
|
+ if (
|
|
|
+ "|"
|
|
|
+ + self._env_config[
|
|
|
+ EnvConfigKey.SLACK_CLIENT_CALENDAR_EVENT_NAME_OFFICE
|
|
|
+ ]
|
|
|
+ in block[0]
|
|
|
+ ):
|
|
|
+ location = CalendarWorkLocation.OFFICE
|
|
|
+ elif (
|
|
|
+ "|"
|
|
|
+ + self._env_config[
|
|
|
+ EnvConfigKey.SLACK_CLIENT_CALENDAR_EVENT_NAME_HOME
|
|
|
+ ]
|
|
|
+ in block[0]
|
|
|
+ ):
|
|
|
+ location = CalendarWorkLocation.HOME
|
|
|
+ elif (
|
|
|
+ "|"
|
|
|
+ + self._env_config[
|
|
|
+ EnvConfigKey.SLACK_CLIENT_CALENDAR_EVENT_NAME_BREAK
|
|
|
+ ]
|
|
|
+ in block[0]
|
|
|
+ ):
|
|
|
+ vacation = True
|
|
|
+ elif (
|
|
|
+ "|"
|
|
|
+ + self._env_config[
|
|
|
+ EnvConfigKey.SLACK_CLIENT_CALENDAR_EVENT_NAME_SICK
|
|
|
+ ]
|
|
|
+ in block[0]
|
|
|
+ ):
|
|
|
+ sick = True
|
|
|
+ elif (
|
|
|
+ "|"
|
|
|
+ + self._env_config[
|
|
|
+ EnvConfigKey.SLACK_CLIENT_CALENDAR_EVENT_NAME_OUT
|
|
|
+ ]
|
|
|
+ in block[0]
|
|
|
+ ):
|
|
|
+ out = True
|
|
|
+ current.append(block)
|
|
|
+ elif "!date^" in block[-1]:
|
|
|
+ chunks = block[-1].split("!date^")
|
|
|
+ times: list[int] = []
|
|
|
+ for chunk in chunks:
|
|
|
+ subchunks = chunk.split("^")
|
|
|
+ try:
|
|
|
+ times.append(int(subchunks[0]))
|
|
|
+ except:
|
|
|
+ pass
|
|
|
+ if len(times) != 2:
|
|
|
+ continue
|
|
|
+ items.append((times, block[0]))
|
|
|
+ if times[0] < time() and times[1] > time():
|
|
|
+ meeting = True
|
|
|
+ current.append(block)
|
|
|
+ if out:
|
|
|
+ location = CalendarWorkLocation.OUT
|
|
|
+ if sick:
|
|
|
+ location = CalendarWorkLocation.SICK
|
|
|
+ if vacation:
|
|
|
+ location = CalendarWorkLocation.VACATION
|
|
|
+ return location, meeting, (items, current)
|
|
|
+
|
|
|
+
|
|
|
+class SlackClient(Cacheable):
|
|
|
+ def __init__(
|
|
|
+ self,
|
|
|
+ env_config: EnvConfigParser,
|
|
|
+ leveldb_parser: LevelDBParser,
|
|
|
+ cookies: CookiesParser,
|
|
|
+ ):
|
|
|
+ self._env_config = env_config
|
|
|
+ self._cookies = list(cookies.read())
|
|
|
+ # TODO: Add support for multiple Slack workspaces by selecting config by ID
|
|
|
+ self._app_config = leveldb_parser.read().pop()
|
|
|
+
|
|
|
+ self._cache_path = join(
|
|
|
+ self._env_config[EnvConfigKey.XDG_CACHE_HOME],
|
|
|
+ "slack_client",
|
|
|
+ "slack_cache.json",
|
|
|
+ )
|
|
|
+ self._cache_expiry = 60 * 60
|
|
|
+
|
|
|
+ self._token = (
|
|
|
+ self._app_config["token"]
|
|
|
+ if "token" in self._app_config and type(self._app_config["token"]) == str
|
|
|
+ else ""
|
|
|
+ )
|
|
|
+
|
|
|
+ def _call(
|
|
|
+ self,
|
|
|
+ method: str,
|
|
|
+ form_data: None | JSONDict = None,
|
|
|
+ cache: bool = False,
|
|
|
+ ) -> JSONDict:
|
|
|
+ cache_key = dumps([method, form_data])
|
|
|
+ if cache and (cached := self._read_cache(cache_key)) and type(cached) == dict:
|
|
|
+ return cached
|
|
|
+ req = request.Request(
|
|
|
+ f'{self._app_config["url"]}/api/{method}',
|
|
|
+ headers={
|
|
|
+ "Authorization": "Bearer " + self._token,
|
|
|
+ "Cookie": ";".join([f"{k}={v}" for k, v in self._cookies]),
|
|
|
+ },
|
|
|
+ data=parse.urlencode(
|
|
|
+ (form_data if form_data else {})
|
|
|
+ | {"_x_mode": "online", "_x_sonic": "true", "_x_app_name": "client"}
|
|
|
+ ).encode(),
|
|
|
+ )
|
|
|
+ response = cast(HTTPResponse, request.urlopen(req))
|
|
|
+ result: JSONDict = loads(response.read())
|
|
|
+ if result["ok"]:
|
|
|
+ if cache:
|
|
|
+ self._write_cache(cache_key, result)
|
|
|
+ return result
|
|
|
+ raise ValueError("Bad result from Slack API call:", dumps(result))
|
|
|
+
|
|
|
+ def update_status(self, text: str, emoji: str, expiration: int):
|
|
|
+ return self._call(
|
|
|
+ "users.profile.set",
|
|
|
+ {
|
|
|
+ "profile": {
|
|
|
+ "status_emoji": emoji,
|
|
|
+ "status_expiration": expiration,
|
|
|
+ "status_text": text,
|
|
|
+ "status_text_canonical": "",
|
|
|
+ "ooo_message": "",
|
|
|
+ },
|
|
|
+ "_x_reason": "CustomStatusModal:handle_save",
|
|
|
+ },
|
|
|
+ )
|
|
|
+
|
|
|
+ def update_presence(self, presence: bool):
|
|
|
+ return self._call("profile.set", {"presence": "active" if presence else "away"})
|
|
|
+
|
|
|
+ def get_calendar(self):
|
|
|
+ users = self._call("users.list", cache=True)
|
|
|
+ calendar_user = next(
|
|
|
+ i
|
|
|
+ for i in cast(list[JSONType], users["members"])
|
|
|
+ if type(i) == dict
|
|
|
+ and i["is_bot"]
|
|
|
+ and "real_name" in i
|
|
|
+ and i["real_name"] == "Google Calendar"
|
|
|
+ )
|
|
|
+ bot_convo = self._call(
|
|
|
+ "conversations.open", {"users": calendar_user["id"]}, cache=True
|
|
|
+ )
|
|
|
+ convo_id: str = cast(str, cast(JSONDict, bot_convo["channel"])["id"])
|
|
|
+ convo_info = self._call(
|
|
|
+ "conversations.info", {"channel": convo_id, "return_app_home": True}, False
|
|
|
+ )
|
|
|
+ app_view = self._call(
|
|
|
+ "views.get",
|
|
|
+ {"view_id": cast(JSONDict, convo_info["home_view"])["id"]},
|
|
|
+ False,
|
|
|
+ )
|
|
|
+ texts = [
|
|
|
+ cast(str, cast(JSONDict, b["text"])["text"]).split("\n")
|
|
|
+ for b in cast(list[JSONDict], cast(JSONDict, app_view["view"])["blocks"])
|
|
|
+ if "text" in b
|
|
|
+ ]
|
|
|
+ texts = [t for t in texts if len(t) and any(len(b) for b in t) > 0]
|
|
|
+ texts = [loads(t) for t in set(dumps(t) for t in texts)]
|
|
|
+ return texts
|
|
|
+
|
|
|
+
|
|
|
+def userstate_parse(
|
|
|
+ worktime: WorkTime,
|
|
|
+ network_location: NetworkLocation,
|
|
|
+ work_location: CalendarWorkLocation,
|
|
|
+ meeting: bool,
|
|
|
+) -> UserState:
|
|
|
+ if network_location == NetworkLocation.OFFICE:
|
|
|
+ if meeting and (worktime == "afternoon" or worktime == "evening"):
|
|
|
+ return UserState.OFFICE_MEETING
|
|
|
+ return UserState.OFFICE
|
|
|
+ elif work_location == CalendarWorkLocation.VACATION:
|
|
|
+ return UserState.VACATION
|
|
|
+ elif work_location == CalendarWorkLocation.SICK:
|
|
|
+ return UserState.SICK
|
|
|
+ elif work_location == CalendarWorkLocation.OUT:
|
|
|
+ return UserState.OUT
|
|
|
+ elif worktime == WorkTime.WEEKEND:
|
|
|
+ return UserState.WEEKEND
|
|
|
+ elif worktime == WorkTime.OFF_HOURS:
|
|
|
+ return UserState.OFF
|
|
|
+ elif work_location == CalendarWorkLocation.OFFICE:
|
|
|
+ if network_location == NetworkLocation.TRANSIT:
|
|
|
+ if worktime == WorkTime.WORK_MORNING:
|
|
|
+ if meeting:
|
|
|
+ return UserState.TRANSIT_TO_OFFICE_MEETING
|
|
|
+ return UserState.TRANSIT_TO_OFFICE
|
|
|
+ elif worktime == WorkTime.WORK_AFTERNOON:
|
|
|
+ if meeting:
|
|
|
+ return UserState.TRANSIT_FROM_OFFICE_MEETING
|
|
|
+ return UserState.TRANSIT_FROM_OFFICE
|
|
|
+ elif network_location == NetworkLocation.HOME:
|
|
|
+ if worktime == WorkTime.WORK_MORNING:
|
|
|
+ return UserState.HOME_LATER_OFFICE
|
|
|
+ elif worktime == WorkTime.WORK_AFTERNOON:
|
|
|
+ return userstate_parse(
|
|
|
+ worktime, network_location, CalendarWorkLocation.HOME, meeting
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ return userstate_parse(
|
|
|
+ worktime, NetworkLocation.OFFICE, work_location, meeting
|
|
|
+ )
|
|
|
+ elif work_location == CalendarWorkLocation.HOME:
|
|
|
+ if meeting:
|
|
|
+ return UserState.HOME_MEETING
|
|
|
+ return UserState.HOME
|
|
|
+ elif work_location == CalendarWorkLocation.UNKNOWN:
|
|
|
+ return userstate_parse(
|
|
|
+ worktime, network_location, CalendarWorkLocation.HOME, meeting
|
|
|
+ )
|
|
|
+ return UserState.UNKNOWN
|
|
|
+
|
|
|
+
|
|
|
+def envvar_to_slack_status(status: str) -> tuple[str, str]:
|
|
|
+ if "|" not in status:
|
|
|
+ raise ValueError(status)
|
|
|
+ emoji, text = tuple(status.split("|"))[:2]
|
|
|
+ return (":" + emoji + ":") if len(emoji) > 0 else emoji, text
|
|
|
+
|
|
|
+
|
|
|
+def main():
|
|
|
+ base_path = join(getenv("HOME", "~"), "Library/Application Support/Slack")
|
|
|
+ env_config_parser = EnvConfigParser()
|
|
|
+ leveldb_parser = LevelDBParser(base_path)
|
|
|
+ cookies_parser = CookiesParser(env_config_parser, base_path)
|
|
|
+ network_parser = NetworkParser(env_config_parser)
|
|
|
+ time_parser = TimeParser(datetime.now())
|
|
|
+
|
|
|
+ client = SlackClient(env_config_parser, leveldb_parser, cookies_parser)
|
|
|
+
|
|
|
+ calendar_parser = CalendarParser(client.get_calendar(), env_config_parser)
|
|
|
+ worktime, workday_end_expiry, workday_start_expiry, halfhour_expiry, hour_expiry = (
|
|
|
+ time_parser.read()
|
|
|
+ )
|
|
|
+ work_location, meeting, (calendar_items, _) = calendar_parser.read()
|
|
|
+ for calendar_item in calendar_items:
|
|
|
+ debug(f"Calendar item: {calendar_item}")
|
|
|
+ network_location = network_parser.read()
|
|
|
+
|
|
|
+ userstate = userstate_parse(worktime, network_location, work_location, meeting)
|
|
|
+ debug(
|
|
|
+ f"State information: \
|
|
|
+ {userstate=} \
|
|
|
+ {worktime=}, \
|
|
|
+ {network_location=}, \
|
|
|
+ {work_location=}, \
|
|
|
+ {meeting=}"
|
|
|
+ )
|
|
|
+
|
|
|
+ status_emoji: None | str = None
|
|
|
+ status_text: None | str = None
|
|
|
+ status_expiration: None | int = None
|
|
|
+ match userstate:
|
|
|
+ case UserState.OFF:
|
|
|
+ status_emoji, status_text = envvar_to_slack_status(
|
|
|
+ env_config_parser[EnvConfigKey.SLACK_CLIENT_STATUS_OFF_HOURS]
|
|
|
+ )
|
|
|
+ status_expiration = workday_start_expiry
|
|
|
+ case UserState.WEEKEND:
|
|
|
+ status_emoji, status_text = envvar_to_slack_status(
|
|
|
+ env_config_parser[EnvConfigKey.SLACK_CLIENT_STATUS_WEEKEND]
|
|
|
+ )
|
|
|
+ status_expiration = workday_start_expiry
|
|
|
+ case UserState.OUT:
|
|
|
+ status_emoji, status_text = envvar_to_slack_status(
|
|
|
+ env_config_parser[EnvConfigKey.SLACK_CLIENT_STATUS_OUT]
|
|
|
+ )
|
|
|
+ case UserState.SICK:
|
|
|
+ status_emoji, status_text = envvar_to_slack_status(
|
|
|
+ env_config_parser[EnvConfigKey.SLACK_CLIENT_STATUS_SICK]
|
|
|
+ )
|
|
|
+ status_expiration = workday_end_expiry
|
|
|
+ case UserState.VACATION:
|
|
|
+ status_emoji, status_text = envvar_to_slack_status(
|
|
|
+ env_config_parser[EnvConfigKey.SLACK_CLIENT_STATUS_VACATION]
|
|
|
+ )
|
|
|
+
|
|
|
+ case UserState.HOME:
|
|
|
+ status_emoji, status_text = envvar_to_slack_status(
|
|
|
+ env_config_parser[EnvConfigKey.SLACK_CLIENT_STATUS_HOME]
|
|
|
+ )
|
|
|
+ status_expiration = workday_end_expiry
|
|
|
+ case UserState.HOME_LATER_OFFICE:
|
|
|
+ status_emoji, status_text = envvar_to_slack_status(
|
|
|
+ env_config_parser[EnvConfigKey.SLACK_CLIENT_STATUS_HOME_LATER_OFFICE]
|
|
|
+ )
|
|
|
+ status_expiration = hour_expiry
|
|
|
+ case UserState.HOME_MEETING:
|
|
|
+ status_emoji, status_text = envvar_to_slack_status(
|
|
|
+ env_config_parser[EnvConfigKey.SLACK_CLIENT_STATUS_HOME_MEETING]
|
|
|
+ )
|
|
|
+ status_expiration = halfhour_expiry
|
|
|
+ case UserState.OFFICE:
|
|
|
+ status_emoji, status_text = envvar_to_slack_status(
|
|
|
+ env_config_parser[EnvConfigKey.SLACK_CLIENT_STATUS_OFFICE]
|
|
|
+ )
|
|
|
+ status_expiration = workday_end_expiry
|
|
|
+ case UserState.OFFICE_MEETING:
|
|
|
+ status_emoji, status_text = envvar_to_slack_status(
|
|
|
+ env_config_parser[EnvConfigKey.SLACK_CLIENT_STATUS_OFFICE_MEETING]
|
|
|
+ )
|
|
|
+ status_expiration = halfhour_expiry
|
|
|
+ case UserState.TRANSIT_FROM_OFFICE:
|
|
|
+ status_emoji, status_text = envvar_to_slack_status(
|
|
|
+ env_config_parser[EnvConfigKey.SLACK_CLIENT_STATUS_TRANSIT_FROM_OFFICE]
|
|
|
+ )
|
|
|
+ status_expiration = halfhour_expiry
|
|
|
+ case UserState.TRANSIT_FROM_OFFICE_MEETING:
|
|
|
+ status_emoji, status_text = envvar_to_slack_status(
|
|
|
+ env_config_parser[EnvConfigKey.SLACK_CLIENT_STATUS_TRANSIT_FROM_OFFICE]
|
|
|
+ )
|
|
|
+ status_expiration = halfhour_expiry
|
|
|
+ case UserState.TRANSIT_TO_OFFICE:
|
|
|
+ status_emoji, status_text = envvar_to_slack_status(
|
|
|
+ env_config_parser[EnvConfigKey.SLACK_CLIENT_STATUS_TRANSIT_TO_OFFICE]
|
|
|
+ )
|
|
|
+ status_expiration = halfhour_expiry
|
|
|
+ case UserState.TRANSIT_TO_OFFICE_MEETING:
|
|
|
+ status_emoji, status_text = envvar_to_slack_status(
|
|
|
+ env_config_parser[EnvConfigKey.SLACK_CLIENT_STATUS_TRANSIT_TO_OFFICE]
|
|
|
+ )
|
|
|
+ status_expiration = halfhour_expiry
|
|
|
+
|
|
|
+ case UserState.UNKNOWN:
|
|
|
+ pass
|
|
|
+
|
|
|
+ _ = client.update_status(
|
|
|
+ status_text if status_text else "",
|
|
|
+ status_emoji if status_emoji else "",
|
|
|
+ status_expiration if status_expiration else 0,
|
|
|
+ )
|
|
|
+ debug(f"Updated status: {status_text=} {status_emoji=} {status_expiration=}")
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ basicConfig(level=NOTSET)
|
|
|
+ debug(f"Called at {datetime.now().ctime()} with parameters {argv}")
|
|
|
+
|
|
|
+ if platform != "darwin":
|
|
|
+ error(f"This client only runs on macOS")
|
|
|
+ exit(1)
|
|
|
+ for program in ["networksetup", "leveldbutil", "openssl"]:
|
|
|
+ try:
|
|
|
+ _ = check_output([program], stderr=DEVNULL)
|
|
|
+ except CalledProcessError:
|
|
|
+ pass
|
|
|
+ except:
|
|
|
+ error(f"Aborted, as `{program}` doesn't appear to be callable")
|
|
|
+ exit(1)
|
|
|
+
|
|
|
+ main()
|