# Copyright (C) 2023 Evgenij Titarenko # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published # by the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import logging from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from fastapi_socketio import SocketManager from cachetools import TTLCache from game import Room, User, load_streets, GameState from objects import ErrCode, UserCheck from threading import Timer import asyncio ROOM_DISCONNECT_TIMEOUT = 60*1 # TODO: Сохранить остаток таймера для каждого пользователя logging.basicConfig(level=logging.DEBUG, format="%(levelname)s:\t%(asctime)s\t%(message)s") load_streets() app = FastAPI() origins = ["*"] app.add_middleware( CORSMiddleware, allow_origins=origins, allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) sio = SocketManager(app=app, mount_location="/ws", socketio_path="game", cors_allowed_origins=[]) sio_sessions = TTLCache(maxsize=10000, ttl=24 * 60 * 60) rooms = dict() users = dict() users_to_disconnect = dict() # TODO: Time-based tokens def check_user(username, password=None, token=None): if not (password or token): return False else: global users if username not in users: if token: logging.debug(f"Invalid token for user {username}") return UserCheck.INVALID_CREDENTIALS logging.debug(f"Invalid user {username}") return UserCheck.USER_DOESNT_EXISTS else: user = users[username] if user.token == token or user.password == password: logging.debug(f"Valid credentials for user {username}") return user else: logging.debug(f"Invalid token or password for user {username}:\n{user.token} --- {token}") return UserCheck.INVALID_CREDENTIALS @sio.on("createRoom") async def sio_create_room(sid, username, token, password): match check_user(username, token=token): case User(username=username) as user: new_room = Room(sio, password) logging.info(f"User {username} created room {new_room.room_id}") rooms[new_room.room_id] = new_room await sio.emit("roomCreated", {"room_id": new_room.room_id, "room_password": new_room.password}, room=sid) case UserCheck.USER_DOESNT_EXISTS | UserCheck.INVALID_CREDENTIALS: await sio_throw_error(sid, ErrCode.INVALID_CREDENTIALS) return False async def sio_send_user_info(sid, user): userdata = user.get_dict() userdata["token"] = user.token if user.in_room: userdata["in_room"] = user.in_room userdata["room_password"] = user.room_password await sio.emit("userInfo", userdata, room=sid) @sio.on("joinRoom") async def sio_join_room(sid, username, token, room_id, room_password): # TODO: Проверить начата ли игра match check_user(username, token=token): case User(username=username) as user: global rooms if room_id not in rooms: logging.debug(f"Room {room_id} doesn't found") await sio_throw_error(sid, ErrCode.ROOM_DOESNT_EXIST) else: room = rooms[room_id] if room.password == room_password: room_data = room.get_dict() if not user.in_room: if room.is_full(): await sio_throw_error(sid, ErrCode.ROOM_IS_FULL) return user.in_room = room.room_id user.room_password = room.password room_data = room.get_dict() await room.add_user(user) sio.enter_room(sid, room.room_id) elif user.in_room != room.room_id: await sio_throw_error(sid, ErrCode.USER_ALREADY_IN_OTHER_ROOM) return await sio_send_user_info(sid, user) await sio.emit("roomInfo", room_data, room=sid) else: await sio_throw_error(sid, ErrCode.INVALID_ROOM_PASSWORD) case _: await sio_throw_error(sid, ErrCode.INVALID_CREDENTIALS) return False @sio.on("connect") async def sio_connect(sid, _, auth): if sid in sio_sessions: return False if not ("username" in auth and ("password" in auth or "token" in auth))\ or not auth["username"].isalnum()\ or len(auth["username"]) > 16: await sio_throw_error(sid, ErrCode.INVALID_CREDENTIALS) return False username = auth["username"] token = auth.get("token", None) password = auth.get("password", None) match check_user(username, password, token): case User(username=username) as user: user.sid = sid sio_sessions[sid] = user if username in users_to_disconnect: timer = users_to_disconnect[username] timer.cancel() del users_to_disconnect[username] await sio_send_user_info(sid, user) case UserCheck.USER_DOESNT_EXISTS: if not password: return False user = User(username, password) users[username] = user user.sid = sid sio_sessions[sid] = user await sio_send_user_info(sid, user) case UserCheck.INVALID_CREDENTIALS: await sio_throw_error(sid, ErrCode.INVALID_CREDENTIALS) return False case _: await sio_throw_error(sid, ErrCode.UNKNOWN_ERROR) return False logging.debug(f"{sid} connected as {username}") async def sio_throw_error(sid, error): await sio.emit("error", error.name, room=sid) @sio.on("disconnect") async def sio_disconnect(sid): user = sio_sessions[sid] logging.debug(f"{sid} disconnected") if user.in_room: logging.info(f"User {user.username} disconnected while in the room {user.in_room}") global users_to_disconnect t = Timer(ROOM_DISCONNECT_TIMEOUT, disconnect_user, args=[user]) users_to_disconnect[user.username] = t t.start() def disconnect_user(user): async def async_disconnect(): global rooms room = rooms[user.in_room] await room.remove_user(user) logging.info(f"User {user.username} disconnected from room {room.room_id}") user.in_room = False user.room_password = "" if room.is_empty(): logging.info(f"Room {room.room_id} is closed") del rooms[room.room_id] del room t = users_to_disconnect[user.username] t.cancel() del t del users_to_disconnect[user.username] asyncio.run(async_disconnect()) @sio.on("rollDices") async def sio_roll_dices(sid, token: str = ""): print("test") @sio.on("getReady") async def sio_get_ready(sid): user = sio_sessions[sid] if user.in_room: room = rooms[user.in_room] if not room.state == GameState.START: await sio_throw_error(sid, ErrCode.GAME_IS_ALREADY_STARTED) else: await room.set_user_ready(user)