Initial implementation of the bot

This commit is contained in:
Tristan Daniël Maat 2023-05-19 19:46:11 +01:00
commit aec9047998
Signed by: tlater
GPG key ID: 49670FD774E43268
8 changed files with 427 additions and 0 deletions

1
.gitignore vendored Normal file
View file

@ -0,0 +1 @@
/src/__pycache__/

27
flake.lock Normal file
View file

@ -0,0 +1,27 @@
{
"nodes": {
"nixpkgs": {
"locked": {
"lastModified": 1659446231,
"narHash": "sha256-hekabNdTdgR/iLsgce5TGWmfIDZ86qjPhxDg/8TlzhE=",
"owner": "nixos",
"repo": "nixpkgs",
"rev": "eabc38219184cc3e04a974fe31857d8e0eac098d",
"type": "github"
},
"original": {
"owner": "nixos",
"ref": "nixos-21.11",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"nixpkgs": "nixpkgs"
}
}
},
"root": "root",
"version": 7
}

58
flake.nix Normal file
View file

@ -0,0 +1,58 @@
{
description = "Script to check for TLS appointments";
inputs = {
nixpkgs.url = "github:nixos/nixpkgs/nixos-21.11";
};
outputs = {self, nixpkgs, ...}: let
system = "x86_64-linux";
pkgs = nixpkgs.legacyPackages.${system};
in {
packages.${system}.matrix-nio = nixpkgs.legacyPackages.${system}.python3Packages.matrix-nio.overridePythonAttrs (old: {
version = "0.20.2";
src = pkgs.fetchFromGitHub {
owner = "poljar";
repo = "matrix-nio";
rev = "0.20.2";
hash = "sha256-opxYQZC6g2BvSk46/3Uf7UC2AMofYqLZ/v15X6YGqeU=";
};
checkInputs = [nixpkgs.legacyPackages.${system}.python3Packages.pytest-flake8];
postPatch = ''
rm -r tests
mkdir -p tests
echo 'def test_thing():' > tests/test_thing.py
echo ' pass' >> tests/test_thing.py
substituteInPlace pyproject.toml \
--replace 'aiofiles = "^23.1.0"' 'aiofiles = "*"' \
--replace 'aiohttp-socks = "^0.7.0"' 'aiohttp-socks = "*"' \
--replace 'h11 = "^0.14.0"' 'h11 = "*"' \
--replace 'jsonschema = "^4.4.0"' 'jsonschema = "*"' \
--replace 'aiohttp = "^3.8.3"' 'aiohttp = "*"' \
--replace 'cachetools = { version = "^4.2.1", optional = true }' 'cachetools = { version = "*", optional = true }'
'';
});
devShells.${system}.default = pkgs.mkShell {
packages = with pkgs; [
geckodriver
(python3.withPackages (ppkgs:
with ppkgs; [
self.packages.${system}.matrix-nio
selenium
python-lsp-server
pyls-isort
pylsp-mypy
python-lsp-black
]))
];
};
};
}

0
src/__init__.py Normal file
View file

57
src/config.py Normal file
View file

@ -0,0 +1,57 @@
"""Configuration helpers."""
import argparse
import datetime
import logging
import os
import sys
from dataclasses import dataclass
from pathlib import Path
logger = logging.getLogger(__name__)
@dataclass
class Config:
"""Dataclass holding CLI/config settings."""
matrix_homeserver: str
matrix_user: str
tls_instance: str
tls_user: str
credentials_dir: Path
latest_appointment: datetime.datetime
@classmethod
def from_args(cls):
"""Create a config instance from CLI arguments."""
parser = argparse.ArgumentParser(
description="Monitor TLSContact for new appointments and notify via matrix."
)
parser.add_argument("matrix_homeserver")
parser.add_argument("matrix_user")
parser.add_argument("tls_instance") # https://nl.tlscontact.com/cn
parser.add_argument("tls_user")
parser.add_argument("--credentials-dir", type=Path, default=None)
parser.add_argument("--latest-appointment", default=None)
args = parser.parse_args()
if args.credentials_dir is None:
if os.getenv("CREDENTIALS_DIRECTORY"):
args.credentials_dir = Path(os.getenv("CREDENTIALS_DIRECTORY"))
else:
logger.critical("missing credentials directory")
sys.exit(1)
if args.latest_appointment is not None:
args.latest_appointment = datetime.datetime.strptime(
args.latest_appointment, "%Y-%m-%d"
)
return cls(**vars(args))

36
src/main.py Normal file
View file

@ -0,0 +1,36 @@
"""Simple bot to watch for tlscontact visa slots."""
import asyncio
import logging
import sys
import traceback
from .config import Config
from .matrixbot import MatrixBot
from .tlsappointmentbot import TLSAppointmentBot
logger = logging.getLogger(__name__)
def main():
"""Run the tlsappointment bot."""
logging.basicConfig(level=logging.INFO)
config = Config.from_args()
matrix_bot = MatrixBot(config)
tls_bot = TLSAppointmentBot(matrix_bot, config)
async def stuff():
await asyncio.gather(matrix_bot.run(), tls_bot.run())
asyncio.run(stuff())
if __name__ == "__main__":
try:
main()
except Exception:
logger.critical(traceback.format_exc().strip())
sys.exit(1)
except KeyboardInterrupt:
sys.exit(0)

107
src/matrixbot.py Normal file
View file

@ -0,0 +1,107 @@
"""Manage sending matrix messages to matrix rooms."""
import datetime
import logging
from typing import List
import nio
from .config import Config
logger = logging.getLogger(__name__)
class MatrixBot:
"""A class for managing the matrix bot that handles messaging users."""
def __init__(self, config: Config):
"""Initialize some variables for the bot."""
# Basic setup
self.client = None
self.initial_sync_done = None
# Matrix configuration
self.homeserver = config.matrix_homeserver
self.bot_id = config.matrix_user
self.bot_password_file = config.credentials_dir / "bot-password"
async def _on_error(self, response):
if self.client:
await self.client.close()
raise Exception(response)
async def _on_sync(self, response):
if not self.initial_sync_done:
self.initial_sync_done = True
for room_id in self.client.rooms:
logger.info(f"joined room {room_id}")
if self.client.rooms[room_id].member_count < 2:
await self.client.room_leave(room_id)
logger.debug(f"left room {room_id} since all other users left")
logger.info("initial sync done, ready for work")
async def _on_invite(self, room, event):
logger.info(f"invited by {event.sender} to {room.room_id}")
if event.sender in ["@tlater:matrix.tlater.net", "@yuanyuan:matrix.tlater.net"]:
await self.client.join(room.room_id)
await self.client.room_send(
room_id=room.room_id,
message_type="m.room.message",
content={
"msgtype": "m.text",
"body": "Hi! I'll inform you if new slots become available.",
},
)
else:
logger.info(f"rejected invite by {event.sender}")
await self.client.room_leave(room.room_id)
async def send_appointments(self, slots: List[datetime.datetime]):
"""Send a message with spotted appointments to all joined rooms."""
assert self.client
for room_id in self.client.rooms:
logger.info(f"notifying room {room_id} about new slots")
message = "Appointment time slots spotted:\n\n" + "\n".join(
slot.strftime("%Y-%m-%d %H:%M") for slot in slots
)
await self.client.room_send(
room_id=room_id,
message_type="m.room.message",
content={
"msgtype": "m.text",
"body": message,
},
)
async def send_warning(self):
"""Send a message telling everyone that we're no longer running."""
assert self.client
for room_id in self.client.rooms:
await self.client.room_send(
room_id=room_id,
message_type="m.room.message",
content={
"msgtype": "m.text",
"body": "Something went wrong, tell Tristan, I might be broken!",
},
)
async def run(self):
"""Start the bot."""
logger.info(f"Connecting to {self.homeserver}")
self.client = nio.AsyncClient(self.homeserver, self.bot_id)
self.client.device_id = "TLSContactAppointmentBot"
self.client.add_response_callback(self._on_error, nio.SyncError)
self.client.add_response_callback(self._on_sync, nio.SyncResponse)
self.client.add_event_callback(self._on_invite, nio.InviteMemberEvent)
logger.info(await self.client.login(self.bot_password_file.read_text()))
await self.client.sync_forever(timeout=30000, loop_sleep_time=200)
await self.client.close()

141
src/tlsappointmentbot.py Normal file
View file

@ -0,0 +1,141 @@
"""The actual appointment checking bot."""
import asyncio
import datetime
import logging
from typing import List
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.support import expected_conditions as ec
from selenium.webdriver.support import wait
from .config import Config
from .matrixbot import MatrixBot
logger = logging.getLogger(__name__)
class TLSAppointmentBot:
"""A bot for periodically checking the tls appointment website."""
def __init__(self, matrix_bot: MatrixBot, config: Config):
"""Set up the TLSAppointmentBot."""
self.matrix_bot = matrix_bot
self.config = config
options = Options()
options.add_argument("--headless")
options.add_argument("--disable-gpu")
self.driver = webdriver.Firefox(options=options)
async def run(self):
"""Run the TLSAppointmentBot."""
# Wait for the matrix bot to be ready
while not self.matrix_bot.initial_sync_done:
await asyncio.sleep(1)
while True:
try:
logger.info("checking for appointment slots")
while not self._is_logged_in():
logger.info("relogging since login expired")
self._login()
appointments = sorted(
filter(
lambda app: self.config.latest_appointment is None
or app < self.config.latest_appointment,
self._get_appointments(),
)
)
if appointments:
logger.info("found appointments, notifying")
await self.matrix_bot.send_appointments(appointments)
except Exception:
await self.matrix_bot.send_warning()
await asyncio.sleep(2 * 60)
def _is_logged_in(self) -> bool:
self.driver.get(f"{self.config.tls_instance}/BJS/index.php")
button = wait.WebDriverWait(self.driver, 10).until(
ec.visibility_of_element_located((By.CSS_SELECTOR, "a.nav-link.login"))
)
return button.text == "LOG OUT"
def _login(self):
self.driver.get(f"{self.config.tls_instance}/BJS/login.php")
[email, pwd, btn] = wait.WebDriverWait(self.driver, 10).until(
ec.visibility_of_all_elements_located(
(
By.XPATH,
"//input[@id='email']|//input[@id='pwd']|//input[@type='button']",
)
)
)
email.send_keys(self.config.tls_user)
pwd.send_keys((self.config.credentials_dir / "tls-password").read_text())
btn.click()
def _get_appointments(self) -> List[datetime.datetime]:
self.driver.get(f"{self.config.tls_instance}/BJS/myapp.php")
# Make sure we're on the page, and that we can see the
# appointment table
wait.WebDriverWait(self.driver, 10).until(
ec.visibility_of_element_located((By.XPATH, "//tr[@class='amend']"))
)
# If the user has already made an appointment, and is just
# trying to reschedule, there will be an "Amend" button we
# need to press first.
amend_button = self.driver.find_elements(
By.XPATH, "//input[@type='button' and @value='Amend']"
)
if amend_button:
wait.WebDriverWait(self.driver, 10).until(
ec.visibility_of_element_located((By.CLASS_NAME, "osano-cm-dialog"))
)
# Make sure the cookie banner doesn't potentially
# interfere with the page
self.driver.execute_script(
"""
let banner = document.getElementsByClassName("osano-cm-dialog")[0];
banner.parentNode.removeChild(banner);
"""
)
amend_button[0].click()
timetable = wait.WebDriverWait(self.driver, 10).until(
ec.visibility_of_element_located((By.ID, "timeTable"))
)
year = timetable.find_element(By.CLASS_NAME, "year-month-title").text.split()[1]
appointments: List[datetime.datetime] = []
for day in timetable.find_elements_by_class_name("inner_timeslot"):
date = day.find_element_by_class_name("appt-table-d")
slots = day.find_elements_by_css_selector(".appt-table-btn.dispo")
[month, num, _] = date.text.split()
logger.debug(f"{len(slots)} appointment slots on {month} {num}, {year}")
appointments.extend(
datetime.datetime.strptime(
f"{year}-{month}-{num}T{slot.text}", "%Y-%B-%dT%H:%M"
)
for slot in slots
)
return appointments
def quit(self):
"""Quit the TLSAppointmentBot, including cleaning up cookies."""
self.driver.delete_all_cookies()
self.driver.quit()