diff options
Diffstat (limited to 'codenames/v1/bot_connect.py')
| -rw-r--r-- | codenames/v1/bot_connect.py | 318 |
1 files changed, 318 insertions, 0 deletions
diff --git a/codenames/v1/bot_connect.py b/codenames/v1/bot_connect.py new file mode 100644 index 0000000..4ac205d --- /dev/null +++ b/codenames/v1/bot_connect.py @@ -0,0 +1,318 @@ +# -*- coding: utf-8 -*- +# Generated by https://github.com/connectrpc/connect-python. DO NOT EDIT! +# source: codenames/v1/bot.proto + +from collections.abc import AsyncGenerator, AsyncIterator, Iterable, Iterator, Mapping +from typing import Protocol + +from connectrpc.client import ConnectClient, ConnectClientSync +from connectrpc.code import Code +from connectrpc.compression import Compression +from connectrpc.errors import ConnectError +from connectrpc.interceptor import Interceptor, InterceptorSync +from connectrpc.method import IdempotencyLevel, MethodInfo +from connectrpc.request import Headers, RequestContext +from connectrpc.server import ConnectASGIApplication, ConnectWSGIApplication, Endpoint, EndpointSync +import codenames.v1.bot_pb2 as codenames_dot_v1_dot_bot__pb2 + + +class BotService(Protocol): + async def game_started(self, request: codenames_dot_v1_dot_bot__pb2.GameStartedRequest, ctx: RequestContext) -> codenames_dot_v1_dot_bot__pb2.GameStartedResponse: + raise ConnectError(Code.UNIMPLEMENTED, "Not implemented") + + async def give_clue(self, request: codenames_dot_v1_dot_bot__pb2.GiveClueRequest, ctx: RequestContext) -> codenames_dot_v1_dot_bot__pb2.GiveClueResponse: + raise ConnectError(Code.UNIMPLEMENTED, "Not implemented") + + async def make_guess(self, request: codenames_dot_v1_dot_bot__pb2.MakeGuessRequest, ctx: RequestContext) -> codenames_dot_v1_dot_bot__pb2.MakeGuessResponse: + raise ConnectError(Code.UNIMPLEMENTED, "Not implemented") + + async def game_ended(self, request: codenames_dot_v1_dot_bot__pb2.GameEndedRequest, ctx: RequestContext) -> codenames_dot_v1_dot_bot__pb2.GameEndedResponse: + raise ConnectError(Code.UNIMPLEMENTED, "Not implemented") + + +class BotServiceASGIApplication(ConnectASGIApplication[BotService]): + def __init__(self, service: BotService | AsyncGenerator[BotService], *, interceptors: Iterable[Interceptor]=(), read_max_bytes: int | None = None, compressions: Iterable[Compression] | None = None) -> None: + super().__init__( + service=service, + endpoints=lambda svc: { + "/codenames.v1.BotService/GameStarted": Endpoint.unary( + method=MethodInfo( + name="GameStarted", + service_name="codenames.v1.BotService", + input=codenames_dot_v1_dot_bot__pb2.GameStartedRequest, + output=codenames_dot_v1_dot_bot__pb2.GameStartedResponse, + idempotency_level=IdempotencyLevel.UNKNOWN, + ), + function=svc.game_started, + ), + "/codenames.v1.BotService/GiveClue": Endpoint.unary( + method=MethodInfo( + name="GiveClue", + service_name="codenames.v1.BotService", + input=codenames_dot_v1_dot_bot__pb2.GiveClueRequest, + output=codenames_dot_v1_dot_bot__pb2.GiveClueResponse, + idempotency_level=IdempotencyLevel.UNKNOWN, + ), + function=svc.give_clue, + ), + "/codenames.v1.BotService/MakeGuess": Endpoint.unary( + method=MethodInfo( + name="MakeGuess", + service_name="codenames.v1.BotService", + input=codenames_dot_v1_dot_bot__pb2.MakeGuessRequest, + output=codenames_dot_v1_dot_bot__pb2.MakeGuessResponse, + idempotency_level=IdempotencyLevel.UNKNOWN, + ), + function=svc.make_guess, + ), + "/codenames.v1.BotService/GameEnded": Endpoint.unary( + method=MethodInfo( + name="GameEnded", + service_name="codenames.v1.BotService", + input=codenames_dot_v1_dot_bot__pb2.GameEndedRequest, + output=codenames_dot_v1_dot_bot__pb2.GameEndedResponse, + idempotency_level=IdempotencyLevel.UNKNOWN, + ), + function=svc.game_ended, + ), + }, + interceptors=interceptors, + read_max_bytes=read_max_bytes, + compressions=compressions, + ) + + @property + def path(self) -> str: + """Returns the URL path to mount the application to when serving multiple applications.""" + return "/codenames.v1.BotService" + + +class BotServiceClient(ConnectClient): + async def game_started( + self, + request: codenames_dot_v1_dot_bot__pb2.GameStartedRequest, + *, + headers: Headers | Mapping[str, str] | None = None, + timeout_ms: int | None = None, + ) -> codenames_dot_v1_dot_bot__pb2.GameStartedResponse: + return await self.execute_unary( + request=request, + method=MethodInfo( + name="GameStarted", + service_name="codenames.v1.BotService", + input=codenames_dot_v1_dot_bot__pb2.GameStartedRequest, + output=codenames_dot_v1_dot_bot__pb2.GameStartedResponse, + idempotency_level=IdempotencyLevel.UNKNOWN, + ), + headers=headers, + timeout_ms=timeout_ms, + ) + + async def give_clue( + self, + request: codenames_dot_v1_dot_bot__pb2.GiveClueRequest, + *, + headers: Headers | Mapping[str, str] | None = None, + timeout_ms: int | None = None, + ) -> codenames_dot_v1_dot_bot__pb2.GiveClueResponse: + return await self.execute_unary( + request=request, + method=MethodInfo( + name="GiveClue", + service_name="codenames.v1.BotService", + input=codenames_dot_v1_dot_bot__pb2.GiveClueRequest, + output=codenames_dot_v1_dot_bot__pb2.GiveClueResponse, + idempotency_level=IdempotencyLevel.UNKNOWN, + ), + headers=headers, + timeout_ms=timeout_ms, + ) + + async def make_guess( + self, + request: codenames_dot_v1_dot_bot__pb2.MakeGuessRequest, + *, + headers: Headers | Mapping[str, str] | None = None, + timeout_ms: int | None = None, + ) -> codenames_dot_v1_dot_bot__pb2.MakeGuessResponse: + return await self.execute_unary( + request=request, + method=MethodInfo( + name="MakeGuess", + service_name="codenames.v1.BotService", + input=codenames_dot_v1_dot_bot__pb2.MakeGuessRequest, + output=codenames_dot_v1_dot_bot__pb2.MakeGuessResponse, + idempotency_level=IdempotencyLevel.UNKNOWN, + ), + headers=headers, + timeout_ms=timeout_ms, + ) + + async def game_ended( + self, + request: codenames_dot_v1_dot_bot__pb2.GameEndedRequest, + *, + headers: Headers | Mapping[str, str] | None = None, + timeout_ms: int | None = None, + ) -> codenames_dot_v1_dot_bot__pb2.GameEndedResponse: + return await self.execute_unary( + request=request, + method=MethodInfo( + name="GameEnded", + service_name="codenames.v1.BotService", + input=codenames_dot_v1_dot_bot__pb2.GameEndedRequest, + output=codenames_dot_v1_dot_bot__pb2.GameEndedResponse, + idempotency_level=IdempotencyLevel.UNKNOWN, + ), + headers=headers, + timeout_ms=timeout_ms, + ) + + +class BotServiceSync(Protocol): + def game_started(self, request: codenames_dot_v1_dot_bot__pb2.GameStartedRequest, ctx: RequestContext) -> codenames_dot_v1_dot_bot__pb2.GameStartedResponse: + raise ConnectError(Code.UNIMPLEMENTED, "Not implemented") + def give_clue(self, request: codenames_dot_v1_dot_bot__pb2.GiveClueRequest, ctx: RequestContext) -> codenames_dot_v1_dot_bot__pb2.GiveClueResponse: + raise ConnectError(Code.UNIMPLEMENTED, "Not implemented") + def make_guess(self, request: codenames_dot_v1_dot_bot__pb2.MakeGuessRequest, ctx: RequestContext) -> codenames_dot_v1_dot_bot__pb2.MakeGuessResponse: + raise ConnectError(Code.UNIMPLEMENTED, "Not implemented") + def game_ended(self, request: codenames_dot_v1_dot_bot__pb2.GameEndedRequest, ctx: RequestContext) -> codenames_dot_v1_dot_bot__pb2.GameEndedResponse: + raise ConnectError(Code.UNIMPLEMENTED, "Not implemented") + + +class BotServiceWSGIApplication(ConnectWSGIApplication): + def __init__(self, service: BotServiceSync, interceptors: Iterable[InterceptorSync]=(), read_max_bytes: int | None = None, compressions: Iterable[Compression] | None = None) -> None: + super().__init__( + endpoints={ + "/codenames.v1.BotService/GameStarted": EndpointSync.unary( + method=MethodInfo( + name="GameStarted", + service_name="codenames.v1.BotService", + input=codenames_dot_v1_dot_bot__pb2.GameStartedRequest, + output=codenames_dot_v1_dot_bot__pb2.GameStartedResponse, + idempotency_level=IdempotencyLevel.UNKNOWN, + ), + function=service.game_started, + ), + "/codenames.v1.BotService/GiveClue": EndpointSync.unary( + method=MethodInfo( + name="GiveClue", + service_name="codenames.v1.BotService", + input=codenames_dot_v1_dot_bot__pb2.GiveClueRequest, + output=codenames_dot_v1_dot_bot__pb2.GiveClueResponse, + idempotency_level=IdempotencyLevel.UNKNOWN, + ), + function=service.give_clue, + ), + "/codenames.v1.BotService/MakeGuess": EndpointSync.unary( + method=MethodInfo( + name="MakeGuess", + service_name="codenames.v1.BotService", + input=codenames_dot_v1_dot_bot__pb2.MakeGuessRequest, + output=codenames_dot_v1_dot_bot__pb2.MakeGuessResponse, + idempotency_level=IdempotencyLevel.UNKNOWN, + ), + function=service.make_guess, + ), + "/codenames.v1.BotService/GameEnded": EndpointSync.unary( + method=MethodInfo( + name="GameEnded", + service_name="codenames.v1.BotService", + input=codenames_dot_v1_dot_bot__pb2.GameEndedRequest, + output=codenames_dot_v1_dot_bot__pb2.GameEndedResponse, + idempotency_level=IdempotencyLevel.UNKNOWN, + ), + function=service.game_ended, + ), + }, + interceptors=interceptors, + read_max_bytes=read_max_bytes, + compressions=compressions, + ) + + @property + def path(self) -> str: + """Returns the URL path to mount the application to when serving multiple applications.""" + return "/codenames.v1.BotService" + + +class BotServiceClientSync(ConnectClientSync): + def game_started( + self, + request: codenames_dot_v1_dot_bot__pb2.GameStartedRequest, + *, + headers: Headers | Mapping[str, str] | None = None, + timeout_ms: int | None = None, + ) -> codenames_dot_v1_dot_bot__pb2.GameStartedResponse: + return self.execute_unary( + request=request, + method=MethodInfo( + name="GameStarted", + service_name="codenames.v1.BotService", + input=codenames_dot_v1_dot_bot__pb2.GameStartedRequest, + output=codenames_dot_v1_dot_bot__pb2.GameStartedResponse, + idempotency_level=IdempotencyLevel.UNKNOWN, + ), + headers=headers, + timeout_ms=timeout_ms, + ) + + def give_clue( + self, + request: codenames_dot_v1_dot_bot__pb2.GiveClueRequest, + *, + headers: Headers | Mapping[str, str] | None = None, + timeout_ms: int | None = None, + ) -> codenames_dot_v1_dot_bot__pb2.GiveClueResponse: + return self.execute_unary( + request=request, + method=MethodInfo( + name="GiveClue", + service_name="codenames.v1.BotService", + input=codenames_dot_v1_dot_bot__pb2.GiveClueRequest, + output=codenames_dot_v1_dot_bot__pb2.GiveClueResponse, + idempotency_level=IdempotencyLevel.UNKNOWN, + ), + headers=headers, + timeout_ms=timeout_ms, + ) + + def make_guess( + self, + request: codenames_dot_v1_dot_bot__pb2.MakeGuessRequest, + *, + headers: Headers | Mapping[str, str] | None = None, + timeout_ms: int | None = None, + ) -> codenames_dot_v1_dot_bot__pb2.MakeGuessResponse: + return self.execute_unary( + request=request, + method=MethodInfo( + name="MakeGuess", + service_name="codenames.v1.BotService", + input=codenames_dot_v1_dot_bot__pb2.MakeGuessRequest, + output=codenames_dot_v1_dot_bot__pb2.MakeGuessResponse, + idempotency_level=IdempotencyLevel.UNKNOWN, + ), + headers=headers, + timeout_ms=timeout_ms, + ) + + def game_ended( + self, + request: codenames_dot_v1_dot_bot__pb2.GameEndedRequest, + *, + headers: Headers | Mapping[str, str] | None = None, + timeout_ms: int | None = None, + ) -> codenames_dot_v1_dot_bot__pb2.GameEndedResponse: + return self.execute_unary( + request=request, + method=MethodInfo( + name="GameEnded", + service_name="codenames.v1.BotService", + input=codenames_dot_v1_dot_bot__pb2.GameEndedRequest, + output=codenames_dot_v1_dot_bot__pb2.GameEndedResponse, + idempotency_level=IdempotencyLevel.UNKNOWN, + ), + headers=headers, + timeout_ms=timeout_ms, + ) |