# -*- coding: utf-8 -*-
# Generated by https://github.com/connectrpc/connect-python.  DO NOT EDIT!
# source: codenames/v1/bot.proto

from collections.abc import AsyncGenerator, AsyncIterator, Iterable, Iterator, Mapping
from typing import Protocol

from connectrpc.client import ConnectClient, ConnectClientSync
from connectrpc.code import Code
from connectrpc.compression import Compression
from connectrpc.errors import ConnectError
from connectrpc.interceptor import Interceptor, InterceptorSync
from connectrpc.method import IdempotencyLevel, MethodInfo
from connectrpc.request import Headers, RequestContext
from connectrpc.server import ConnectASGIApplication, ConnectWSGIApplication, Endpoint, EndpointSync
import codenames.v1.bot_pb2 as codenames_dot_v1_dot_bot__pb2


class BotService(Protocol):
    """Async handler interface for the ``codenames.v1.BotService`` RPCs.

    Each method takes the RPC's request message plus a ``RequestContext`` and
    returns the RPC's response message. The bodies defined here raise
    ``ConnectError`` with ``Code.UNIMPLEMENTED``.
    """

    async def game_started(
        self,
        request: codenames_dot_v1_dot_bot__pb2.GameStartedRequest,
        ctx: RequestContext,
    ) -> codenames_dot_v1_dot_bot__pb2.GameStartedResponse:
        """Handle the GameStarted RPC; raises UNIMPLEMENTED unless overridden."""
        raise ConnectError(Code.UNIMPLEMENTED, "Not implemented")

    async def give_clue(
        self,
        request: codenames_dot_v1_dot_bot__pb2.GiveClueRequest,
        ctx: RequestContext,
    ) -> codenames_dot_v1_dot_bot__pb2.GiveClueResponse:
        """Handle the GiveClue RPC; raises UNIMPLEMENTED unless overridden."""
        raise ConnectError(Code.UNIMPLEMENTED, "Not implemented")

    async def make_guess(
        self,
        request: codenames_dot_v1_dot_bot__pb2.MakeGuessRequest,
        ctx: RequestContext,
    ) -> codenames_dot_v1_dot_bot__pb2.MakeGuessResponse:
        """Handle the MakeGuess RPC; raises UNIMPLEMENTED unless overridden."""
        raise ConnectError(Code.UNIMPLEMENTED, "Not implemented")

    async def game_ended(
        self,
        request: codenames_dot_v1_dot_bot__pb2.GameEndedRequest,
        ctx: RequestContext,
    ) -> codenames_dot_v1_dot_bot__pb2.GameEndedResponse:
        """Handle the GameEnded RPC; raises UNIMPLEMENTED unless overridden."""
        raise ConnectError(Code.UNIMPLEMENTED, "Not implemented")


class BotServiceASGIApplication(ConnectASGIApplication[BotService]):
    """ASGI application that registers the four BotService RPCs as unary endpoints."""

    def __init__(
        self,
        service: BotService | AsyncGenerator[BotService],
        *,
        interceptors: Iterable[Interceptor]=(),
        read_max_bytes: int | None = None,
        compressions: Iterable[Compression] | None = None,
    ) -> None:
        """Wire ``service`` into the base application.

        Args:
            service: A ``BotService`` implementation, or an async generator of
                one (as annotated); it is forwarded untouched to the base class.
            interceptors: Forwarded to the base class.
            read_max_bytes: Forwarded to the base class.
            compressions: Forwarded to the base class.
        """

        def build_endpoints(svc):
            # Maps each RPC's URL path to a unary endpoint bound to the
            # matching handler method on the service instance passed in.
            return {
                "/codenames.v1.BotService/GameStarted": Endpoint.unary(
                    method=MethodInfo(
                        name="GameStarted",
                        service_name="codenames.v1.BotService",
                        input=codenames_dot_v1_dot_bot__pb2.GameStartedRequest,
                        output=codenames_dot_v1_dot_bot__pb2.GameStartedResponse,
                        idempotency_level=IdempotencyLevel.UNKNOWN,
                    ),
                    function=svc.game_started,
                ),
                "/codenames.v1.BotService/GiveClue": Endpoint.unary(
                    method=MethodInfo(
                        name="GiveClue",
                        service_name="codenames.v1.BotService",
                        input=codenames_dot_v1_dot_bot__pb2.GiveClueRequest,
                        output=codenames_dot_v1_dot_bot__pb2.GiveClueResponse,
                        idempotency_level=IdempotencyLevel.UNKNOWN,
                    ),
                    function=svc.give_clue,
                ),
                "/codenames.v1.BotService/MakeGuess": Endpoint.unary(
                    method=MethodInfo(
                        name="MakeGuess",
                        service_name="codenames.v1.BotService",
                        input=codenames_dot_v1_dot_bot__pb2.MakeGuessRequest,
                        output=codenames_dot_v1_dot_bot__pb2.MakeGuessResponse,
                        idempotency_level=IdempotencyLevel.UNKNOWN,
                    ),
                    function=svc.make_guess,
                ),
                "/codenames.v1.BotService/GameEnded": Endpoint.unary(
                    method=MethodInfo(
                        name="GameEnded",
                        service_name="codenames.v1.BotService",
                        input=codenames_dot_v1_dot_bot__pb2.GameEndedRequest,
                        output=codenames_dot_v1_dot_bot__pb2.GameEndedResponse,
                        idempotency_level=IdempotencyLevel.UNKNOWN,
                    ),
                    function=svc.game_ended,
                ),
            }

        super().__init__(
            service=service,
            endpoints=build_endpoints,
            interceptors=interceptors,
            read_max_bytes=read_max_bytes,
            compressions=compressions,
        )

    @property
    def path(self) -> str:
        """Returns the URL path to mount the application to when serving multiple applications."""
        return "/codenames.v1.BotService"


class BotServiceClient(ConnectClient):
    """Async client exposing one method per ``codenames.v1.BotService`` RPC.

    Every method builds the RPC's ``MethodInfo`` and awaits ``execute_unary``.
    """

    async def game_started(
        self,
        request: codenames_dot_v1_dot_bot__pb2.GameStartedRequest,
        *,
        headers: Headers | Mapping[str, str] | None = None,
        timeout_ms: int | None = None,
    ) -> codenames_dot_v1_dot_bot__pb2.GameStartedResponse:
        """Invoke the GameStarted RPC and return its response message."""
        rpc = MethodInfo(
            name="GameStarted",
            service_name="codenames.v1.BotService",
            input=codenames_dot_v1_dot_bot__pb2.GameStartedRequest,
            output=codenames_dot_v1_dot_bot__pb2.GameStartedResponse,
            idempotency_level=IdempotencyLevel.UNKNOWN,
        )
        return await self.execute_unary(
            request=request,
            method=rpc,
            headers=headers,
            timeout_ms=timeout_ms,
        )

    async def give_clue(
        self,
        request: codenames_dot_v1_dot_bot__pb2.GiveClueRequest,
        *,
        headers: Headers | Mapping[str, str] | None = None,
        timeout_ms: int | None = None,
    ) -> codenames_dot_v1_dot_bot__pb2.GiveClueResponse:
        """Invoke the GiveClue RPC and return its response message."""
        rpc = MethodInfo(
            name="GiveClue",
            service_name="codenames.v1.BotService",
            input=codenames_dot_v1_dot_bot__pb2.GiveClueRequest,
            output=codenames_dot_v1_dot_bot__pb2.GiveClueResponse,
            idempotency_level=IdempotencyLevel.UNKNOWN,
        )
        return await self.execute_unary(
            request=request,
            method=rpc,
            headers=headers,
            timeout_ms=timeout_ms,
        )

    async def make_guess(
        self,
        request: codenames_dot_v1_dot_bot__pb2.MakeGuessRequest,
        *,
        headers: Headers | Mapping[str, str] | None = None,
        timeout_ms: int | None = None,
    ) -> codenames_dot_v1_dot_bot__pb2.MakeGuessResponse:
        """Invoke the MakeGuess RPC and return its response message."""
        rpc = MethodInfo(
            name="MakeGuess",
            service_name="codenames.v1.BotService",
            input=codenames_dot_v1_dot_bot__pb2.MakeGuessRequest,
            output=codenames_dot_v1_dot_bot__pb2.MakeGuessResponse,
            idempotency_level=IdempotencyLevel.UNKNOWN,
        )
        return await self.execute_unary(
            request=request,
            method=rpc,
            headers=headers,
            timeout_ms=timeout_ms,
        )

    async def game_ended(
        self,
        request: codenames_dot_v1_dot_bot__pb2.GameEndedRequest,
        *,
        headers: Headers | Mapping[str, str] | None = None,
        timeout_ms: int | None = None,
    ) -> codenames_dot_v1_dot_bot__pb2.GameEndedResponse:
        """Invoke the GameEnded RPC and return its response message."""
        rpc = MethodInfo(
            name="GameEnded",
            service_name="codenames.v1.BotService",
            input=codenames_dot_v1_dot_bot__pb2.GameEndedRequest,
            output=codenames_dot_v1_dot_bot__pb2.GameEndedResponse,
            idempotency_level=IdempotencyLevel.UNKNOWN,
        )
        return await self.execute_unary(
            request=request,
            method=rpc,
            headers=headers,
            timeout_ms=timeout_ms,
        )


class BotServiceSync(Protocol):
    """Synchronous handler interface for the ``codenames.v1.BotService`` RPCs.

    Mirrors ``BotService`` with plain (non-async) methods. The bodies defined
    here raise ``ConnectError`` with ``Code.UNIMPLEMENTED``.
    """

    def game_started(
        self,
        request: codenames_dot_v1_dot_bot__pb2.GameStartedRequest,
        ctx: RequestContext,
    ) -> codenames_dot_v1_dot_bot__pb2.GameStartedResponse:
        """Handle the GameStarted RPC; raises UNIMPLEMENTED unless overridden."""
        raise ConnectError(Code.UNIMPLEMENTED, "Not implemented")

    def give_clue(
        self,
        request: codenames_dot_v1_dot_bot__pb2.GiveClueRequest,
        ctx: RequestContext,
    ) -> codenames_dot_v1_dot_bot__pb2.GiveClueResponse:
        """Handle the GiveClue RPC; raises UNIMPLEMENTED unless overridden."""
        raise ConnectError(Code.UNIMPLEMENTED, "Not implemented")

    def make_guess(
        self,
        request: codenames_dot_v1_dot_bot__pb2.MakeGuessRequest,
        ctx: RequestContext,
    ) -> codenames_dot_v1_dot_bot__pb2.MakeGuessResponse:
        """Handle the MakeGuess RPC; raises UNIMPLEMENTED unless overridden."""
        raise ConnectError(Code.UNIMPLEMENTED, "Not implemented")

    def game_ended(
        self,
        request: codenames_dot_v1_dot_bot__pb2.GameEndedRequest,
        ctx: RequestContext,
    ) -> codenames_dot_v1_dot_bot__pb2.GameEndedResponse:
        """Handle the GameEnded RPC; raises UNIMPLEMENTED unless overridden."""
        raise ConnectError(Code.UNIMPLEMENTED, "Not implemented")


class BotServiceWSGIApplication(ConnectWSGIApplication):
    """WSGI application that registers the four BotService RPCs as unary endpoints."""

    def __init__(
        self,
        service: BotServiceSync,
        interceptors: Iterable[InterceptorSync]=(),
        read_max_bytes: int | None = None,
        compressions: Iterable[Compression] | None = None,
    ) -> None:
        """Bind ``service``'s handler methods to their RPC paths.

        Args:
            service: The synchronous ``BotServiceSync`` implementation whose
                methods serve the endpoints.
            interceptors: Forwarded to the base class.
            read_max_bytes: Forwarded to the base class.
            compressions: Forwarded to the base class.
        """
        # URL path -> unary endpoint, one entry per RPC of the service.
        routes = {
            "/codenames.v1.BotService/GameStarted": EndpointSync.unary(
                method=MethodInfo(
                    name="GameStarted",
                    service_name="codenames.v1.BotService",
                    input=codenames_dot_v1_dot_bot__pb2.GameStartedRequest,
                    output=codenames_dot_v1_dot_bot__pb2.GameStartedResponse,
                    idempotency_level=IdempotencyLevel.UNKNOWN,
                ),
                function=service.game_started,
            ),
            "/codenames.v1.BotService/GiveClue": EndpointSync.unary(
                method=MethodInfo(
                    name="GiveClue",
                    service_name="codenames.v1.BotService",
                    input=codenames_dot_v1_dot_bot__pb2.GiveClueRequest,
                    output=codenames_dot_v1_dot_bot__pb2.GiveClueResponse,
                    idempotency_level=IdempotencyLevel.UNKNOWN,
                ),
                function=service.give_clue,
            ),
            "/codenames.v1.BotService/MakeGuess": EndpointSync.unary(
                method=MethodInfo(
                    name="MakeGuess",
                    service_name="codenames.v1.BotService",
                    input=codenames_dot_v1_dot_bot__pb2.MakeGuessRequest,
                    output=codenames_dot_v1_dot_bot__pb2.MakeGuessResponse,
                    idempotency_level=IdempotencyLevel.UNKNOWN,
                ),
                function=service.make_guess,
            ),
            "/codenames.v1.BotService/GameEnded": EndpointSync.unary(
                method=MethodInfo(
                    name="GameEnded",
                    service_name="codenames.v1.BotService",
                    input=codenames_dot_v1_dot_bot__pb2.GameEndedRequest,
                    output=codenames_dot_v1_dot_bot__pb2.GameEndedResponse,
                    idempotency_level=IdempotencyLevel.UNKNOWN,
                ),
                function=service.game_ended,
            ),
        }
        super().__init__(
            endpoints=routes,
            interceptors=interceptors,
            read_max_bytes=read_max_bytes,
            compressions=compressions,
        )

    @property
    def path(self) -> str:
        """Returns the URL path to mount the application to when serving multiple applications."""
        return "/codenames.v1.BotService"


class BotServiceClientSync(ConnectClientSync):
    """Synchronous client exposing one method per ``codenames.v1.BotService`` RPC.

    Every method builds the RPC's ``MethodInfo`` and calls ``execute_unary``.
    """

    def game_started(
        self,
        request: codenames_dot_v1_dot_bot__pb2.GameStartedRequest,
        *,
        headers: Headers | Mapping[str, str] | None = None,
        timeout_ms: int | None = None,
    ) -> codenames_dot_v1_dot_bot__pb2.GameStartedResponse:
        """Invoke the GameStarted RPC and return its response message."""
        rpc = MethodInfo(
            name="GameStarted",
            service_name="codenames.v1.BotService",
            input=codenames_dot_v1_dot_bot__pb2.GameStartedRequest,
            output=codenames_dot_v1_dot_bot__pb2.GameStartedResponse,
            idempotency_level=IdempotencyLevel.UNKNOWN,
        )
        return self.execute_unary(
            request=request,
            method=rpc,
            headers=headers,
            timeout_ms=timeout_ms,
        )

    def give_clue(
        self,
        request: codenames_dot_v1_dot_bot__pb2.GiveClueRequest,
        *,
        headers: Headers | Mapping[str, str] | None = None,
        timeout_ms: int | None = None,
    ) -> codenames_dot_v1_dot_bot__pb2.GiveClueResponse:
        """Invoke the GiveClue RPC and return its response message."""
        rpc = MethodInfo(
            name="GiveClue",
            service_name="codenames.v1.BotService",
            input=codenames_dot_v1_dot_bot__pb2.GiveClueRequest,
            output=codenames_dot_v1_dot_bot__pb2.GiveClueResponse,
            idempotency_level=IdempotencyLevel.UNKNOWN,
        )
        return self.execute_unary(
            request=request,
            method=rpc,
            headers=headers,
            timeout_ms=timeout_ms,
        )

    def make_guess(
        self,
        request: codenames_dot_v1_dot_bot__pb2.MakeGuessRequest,
        *,
        headers: Headers | Mapping[str, str] | None = None,
        timeout_ms: int | None = None,
    ) -> codenames_dot_v1_dot_bot__pb2.MakeGuessResponse:
        """Invoke the MakeGuess RPC and return its response message."""
        rpc = MethodInfo(
            name="MakeGuess",
            service_name="codenames.v1.BotService",
            input=codenames_dot_v1_dot_bot__pb2.MakeGuessRequest,
            output=codenames_dot_v1_dot_bot__pb2.MakeGuessResponse,
            idempotency_level=IdempotencyLevel.UNKNOWN,
        )
        return self.execute_unary(
            request=request,
            method=rpc,
            headers=headers,
            timeout_ms=timeout_ms,
        )

    def game_ended(
        self,
        request: codenames_dot_v1_dot_bot__pb2.GameEndedRequest,
        *,
        headers: Headers | Mapping[str, str] | None = None,
        timeout_ms: int | None = None,
    ) -> codenames_dot_v1_dot_bot__pb2.GameEndedResponse:
        """Invoke the GameEnded RPC and return its response message."""
        rpc = MethodInfo(
            name="GameEnded",
            service_name="codenames.v1.BotService",
            input=codenames_dot_v1_dot_bot__pb2.GameEndedRequest,
            output=codenames_dot_v1_dot_bot__pb2.GameEndedResponse,
            idempotency_level=IdempotencyLevel.UNKNOWN,
        )
        return self.execute_unary(
            request=request,
            method=rpc,
            headers=headers,
            timeout_ms=timeout_ms,
        )