import logging import os import uuid from typing import Any from connectrpc.request import RequestContext from codenames.v1.bot_connect import BotService, BotServiceASGIApplication from codenames.v1.game_connect import GameServiceClient from codenames.v1 import bot_pb2, game_pb2 from app.bot import CodenamesBot logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", ) logger = logging.getLogger(__name__) if "PORT" not in os.environ: raise RuntimeError("PORT environment variable must be set") PORT = int(os.environ["PORT"]) LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO") IDENTITY_KEY = os.environ.get("IDENTITY_KEY", f"python-bot-{uuid.uuid4().hex[:8]}") GAME_SERVER_URL = os.environ.get("GAME_SERVER_URL", "https://hackathon.evroc.dev/api") TEAM_TOKEN_PATH = os.environ.get("TEAM_TOKEN_PATH", "/etc/codenames/game_key") logging.getLogger().setLevel(getattr(logging, LOG_LEVEL.upper())) logging.getLogger("app").setLevel(getattr(logging, LOG_LEVEL.upper())) bot = CodenamesBot() logger.info(f"Identity Key: {IDENTITY_KEY}") class BotServiceHandler(BotService): async def game_started( self, request: bot_pb2.GameStartedRequest, ctx: RequestContext ) -> bot_pb2.GameStartedResponse: return bot.handle_game_started(request) async def give_clue( self, request: bot_pb2.GiveClueRequest, ctx: RequestContext ) -> bot_pb2.GiveClueResponse: return bot.handle_give_clue(request) async def make_guess( self, request: bot_pb2.MakeGuessRequest, ctx: RequestContext ) -> bot_pb2.MakeGuessResponse: return bot.handle_make_guess(request) async def game_ended( self, request: bot_pb2.GameEndedRequest, ctx: RequestContext ) -> bot_pb2.GameEndedResponse: return bot.handle_game_ended(request) class LifespanASGIWrapper: def __init__(self, connect_app: Any) -> None: self.connect_app = connect_app self.registered_bot_id: str | None = None async def register_bot(self) -> None: import asyncio start_time = asyncio.get_event_loop().time() max_duration = 30 # seconds base_delay = 1 # seconds while True: try: if not os.path.exists(TEAM_TOKEN_PATH): raise RuntimeError( f"Team token not found at {TEAM_TOKEN_PATH}" ) with open(TEAM_TOKEN_PATH) as f: token = f.read().strip() if not token: raise RuntimeError( f"Team token file at {TEAM_TOKEN_PATH} is empty" ) logger.info(f"Registering bot with identity_key={IDENTITY_KEY}") client = GameServiceClient(GAME_SERVER_URL) request = game_pb2.RegisterBotRequest( identity_key=IDENTITY_KEY, port=PORT, ) response = await client.register_bot( request, headers={"Authorization": f"Bearer {token}"}, ) self.registered_bot_id = response.bot.bot_id logger.info(f"Bot registered successfully: {self.registered_bot_id}") return except Exception as e: elapsed = asyncio.get_event_loop().time() - start_time if elapsed >= max_duration: raise RuntimeError( f"Failed to register bot after {max_duration}s: {e}" ) from e delay = min(base_delay * (2 ** int(elapsed / 5)), 5) # Exponential backoff, max 5s logger.warning( f"Registration failed (retrying in {delay}s): {e}" ) await asyncio.sleep(delay) async def unregister_bot(self) -> None: try: if not self.registered_bot_id: logger.debug("No registered bot ID, skipping unregistration") return if not os.path.exists(TEAM_TOKEN_PATH): logger.warning( f"Team token not found at {TEAM_TOKEN_PATH}, cannot unregister" ) return with open(TEAM_TOKEN_PATH) as f: token = f.read().strip() if not token: logger.warning( f"Team token file at {TEAM_TOKEN_PATH} is empty, cannot unregister" ) return logger.info(f"Unregistering bot {self.registered_bot_id}") client = GameServiceClient(GAME_SERVER_URL) request = game_pb2.UnregisterBotRequest( bot_id=self.registered_bot_id, ) await client.unregister_bot( request, headers={"Authorization": f"Bearer {token}"}, ) logger.info("Bot unregistered successfully") except Exception as e: logger.error(f"Failed to unregister bot: {e}") async def __call__( self, scope: dict[str, Any], receive: Any, send: Any ) -> None: if scope.get("type") == "lifespan": await self._handle_lifespan(scope, receive, send) else: await self.connect_app(scope, receive, send) async def _handle_lifespan( self, scope: dict[str, Any], receive: Any, send: Any ) -> None: while True: message = await receive() if message["type"] == "lifespan.startup": await self.register_bot() await send({"type": "lifespan.startup.complete"}) elif message["type"] == "lifespan.shutdown": await self.unregister_bot() await send({"type": "lifespan.shutdown.complete"}) return connect_app = BotServiceASGIApplication(BotServiceHandler()) app = LifespanASGIWrapper(connect_app)