# Copyright (C) 2023 Evgenij Titarenko # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published # by the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import logging from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from fastapi_socketio import SocketManager from game import Room, User, load_streets from objects import ErrCode, UserCheck logging.basicConfig(level=logging.DEBUG, format="%(levelname)s:\t%(asctime)s\t%(message)s") load_streets() app = FastAPI() origins = ["*"] app.add_middleware( CORSMiddleware, allow_origins=origins, allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) sio = SocketManager(app=app, mount_location="/ws", socketio_path="game", cors_allowed_origins=[]) # sio_sessions = TTLCache(maxsize=10000, ttl=24 * 60 * 60) rooms = [] users = [] # TODO: implement as dict # TODO: Time-based tokens def check_user(username, password=None, token=None): if not (password or token): return False else: global users selected_users = tuple(filter(lambda usr: usr.username == username, users)) if len(selected_users) == 0: if token: logging.debug(f"Invalid token for user {username}") return UserCheck.INVALID_CREDENTIALS logging.debug(f"Invalid user {username}") return UserCheck.USER_DOESNT_EXISTS else: user = selected_users[0] if user.token == token or user.password == password: logging.debug(f"Valid credentials for user {username}") return user else: logging.debug(f"Invalid token or password for user {username}:\n{user.token} --- {token}") return UserCheck.INVALID_CREDENTIALS @sio.on("createRoom") async def sio_create_room(sid, username, token, password): match check_user(username, token=token): case User(username=username) as user: new_room = Room(password) logging.info(f"User {username} created room {new_room.room_id}") new_room.add_user(user) rooms.append(new_room) user.in_room = new_room.room_id # TODO: not add user on creation user.room_password = new_room.password await sio_send_user_info(sid, user) case _: return False # TODO: some errors async def sio_send_user_info(sid, user): userdata = user.get_dict() userdata["token"] = user.token if user.in_room: userdata["in_room"] = user.in_room userdata["password"] = user.password await sio.emit("userInfo", userdata, room=sid) @sio.on("joinRoom") async def sio_join_room(sid, room_id, room_password): # TODO: Check if user already in room global rooms selected_room = tuple(filter(lambda room: room.room_id == room_id, rooms)) if len(selected_room) == 0: await sio_throw_error(sid, ErrCode.ROOM_DOESNT_EXIST) else: room = selected_room[0] room_data = room.get_dict() sio.enter_room(sid, room.room_id) if room.password == room_password: await sio.emit("roomInfo", room_data, room=sid) @sio.on("connect") async def sio_connect(sid, _, auth): if not ("username" in auth and ("password" in auth or "token" in auth))\ or not auth["username"].isalnum()\ or len(auth["username"]) > 16: await sio_throw_error(sid, ErrCode.INVALID_CREDENTIALS) return False username = auth["username"] token = auth.get("token", None) password = auth.get("password", None) match check_user(username, password, token): case User(username=username) as user: await sio_send_user_info(sid, user) case UserCheck.USER_DOESNT_EXISTS: if not password: return False user = User(username, password) users.append(user) await sio_send_user_info(sid, user) case UserCheck.INVALID_CREDENTIALS: await sio_throw_error(sid, ErrCode.INVALID_CREDENTIALS) return False case _: await sio_throw_error(sid, ErrCode.UNKNOWN_ERROR) return False logging.debug(f"{sid} connected as {username}") async def sio_throw_error(sid, error): await sio.emit("error", error.name, room=sid) @sio.on("disconnect") async def sio_disconnect(sid): logging.debug(f"{sid} disconnected") @sio.on("rollDices") async def sio_roll_dices(sid, token: str = ""): print("test")