# Copyright (C) 2023 Evgenij Titarenko # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published # by the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . from fastapi import FastAPI from fastapi_socketio import SocketManager from cachetools import TTLCache from game import Room, User, load_streets load_streets() app = FastAPI() sio = SocketManager(app=app) sio_sessions = TTLCache(maxsize=10000, ttl=24 * 60 * 60) rooms = [] @sio.on("connect") async def sio_connect(sid): sio_sessions[sid] = { "sid": sid, "user": None, "room": None, } @sio.on("disconnect") async def sio_disconnect(sid): if sid not in sio_sessions[sid]: return room = sio_sessions[sid].get("room", None) user = sio_sessions[sid].get("user", None) if room is not None: room.remove_user(user) if not room.users: del room del user del sio_sessions[sid] @sio.on("joinRoom") async def sio_join_room(sid, username: str = "", password: str = ""): pass @sio.on("createRoom") async def sio_create_room(sid, username: str = "", password: str = ""): if sid not in sio_sessions[sid]: return new_room = Room(password) new_user = User(username) new_room.add_user(new_user) sio_sessions[sid]["room"] = new_room sio_sessions[sid]["user"] = new_user @sio.on("rollDices") async def sio_roll_dices(sid, token: str = ""): pass