diff options
Diffstat (limited to 'codenames/v1/ssh_keys_connect.py')
| -rw-r--r-- | codenames/v1/ssh_keys_connect.py | 188 |
1 files changed, 188 insertions, 0 deletions
diff --git a/codenames/v1/ssh_keys_connect.py b/codenames/v1/ssh_keys_connect.py new file mode 100644 index 0000000..adbaa6b --- /dev/null +++ b/codenames/v1/ssh_keys_connect.py @@ -0,0 +1,188 @@ +# -*- coding: utf-8 -*- +# Generated by https://github.com/connectrpc/connect-python. DO NOT EDIT! +# source: codenames/v1/ssh_keys.proto + +from collections.abc import AsyncGenerator, AsyncIterator, Iterable, Iterator, Mapping +from typing import Protocol + +from connectrpc.client import ConnectClient, ConnectClientSync +from connectrpc.code import Code +from connectrpc.compression import Compression +from connectrpc.errors import ConnectError +from connectrpc.interceptor import Interceptor, InterceptorSync +from connectrpc.method import IdempotencyLevel, MethodInfo +from connectrpc.request import Headers, RequestContext +from connectrpc.server import ConnectASGIApplication, ConnectWSGIApplication, Endpoint, EndpointSync +import codenames.v1.ssh_keys_pb2 as codenames_dot_v1_dot_ssh__keys__pb2 + + +class SSHKeyService(Protocol): + async def submit_s_s_h_key(self, request: codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyRequest, ctx: RequestContext) -> codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyResponse: + raise ConnectError(Code.UNIMPLEMENTED, "Not implemented") + + async def list_teams_for_key_submission(self, request: codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionRequest, ctx: RequestContext) -> codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionResponse: + raise ConnectError(Code.UNIMPLEMENTED, "Not implemented") + + +class SSHKeyServiceASGIApplication(ConnectASGIApplication[SSHKeyService]): + def __init__(self, service: SSHKeyService | AsyncGenerator[SSHKeyService], *, interceptors: Iterable[Interceptor]=(), read_max_bytes: int | None = None, compressions: Iterable[Compression] | None = None) -> None: + super().__init__( + service=service, + endpoints=lambda svc: { + "/codenames.v1.SSHKeyService/SubmitSSHKey": Endpoint.unary( + method=MethodInfo( + name="SubmitSSHKey", + service_name="codenames.v1.SSHKeyService", + input=codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyRequest, + output=codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyResponse, + idempotency_level=IdempotencyLevel.UNKNOWN, + ), + function=svc.submit_s_s_h_key, + ), + "/codenames.v1.SSHKeyService/ListTeamsForKeySubmission": Endpoint.unary( + method=MethodInfo( + name="ListTeamsForKeySubmission", + service_name="codenames.v1.SSHKeyService", + input=codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionRequest, + output=codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionResponse, + idempotency_level=IdempotencyLevel.UNKNOWN, + ), + function=svc.list_teams_for_key_submission, + ), + }, + interceptors=interceptors, + read_max_bytes=read_max_bytes, + compressions=compressions, + ) + + @property + def path(self) -> str: + """Returns the URL path to mount the application to when serving multiple applications.""" + return "/codenames.v1.SSHKeyService" + + +class SSHKeyServiceClient(ConnectClient): + async def submit_s_s_h_key( + self, + request: codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyRequest, + *, + headers: Headers | Mapping[str, str] | None = None, + timeout_ms: int | None = None, + ) -> codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyResponse: + return await self.execute_unary( + request=request, + method=MethodInfo( + name="SubmitSSHKey", + service_name="codenames.v1.SSHKeyService", + input=codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyRequest, + output=codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyResponse, + idempotency_level=IdempotencyLevel.UNKNOWN, + ), + headers=headers, + timeout_ms=timeout_ms, + ) + + async def list_teams_for_key_submission( + self, + request: codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionRequest, + *, + headers: Headers | Mapping[str, str] | None = None, + timeout_ms: int | None = None, + ) -> codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionResponse: + return await self.execute_unary( + request=request, + method=MethodInfo( + name="ListTeamsForKeySubmission", + service_name="codenames.v1.SSHKeyService", + input=codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionRequest, + output=codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionResponse, + idempotency_level=IdempotencyLevel.UNKNOWN, + ), + headers=headers, + timeout_ms=timeout_ms, + ) + + +class SSHKeyServiceSync(Protocol): + def submit_s_s_h_key(self, request: codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyRequest, ctx: RequestContext) -> codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyResponse: + raise ConnectError(Code.UNIMPLEMENTED, "Not implemented") + def list_teams_for_key_submission(self, request: codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionRequest, ctx: RequestContext) -> codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionResponse: + raise ConnectError(Code.UNIMPLEMENTED, "Not implemented") + + +class SSHKeyServiceWSGIApplication(ConnectWSGIApplication): + def __init__(self, service: SSHKeyServiceSync, interceptors: Iterable[InterceptorSync]=(), read_max_bytes: int | None = None, compressions: Iterable[Compression] | None = None) -> None: + super().__init__( + endpoints={ + "/codenames.v1.SSHKeyService/SubmitSSHKey": EndpointSync.unary( + method=MethodInfo( + name="SubmitSSHKey", + service_name="codenames.v1.SSHKeyService", + input=codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyRequest, + output=codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyResponse, + idempotency_level=IdempotencyLevel.UNKNOWN, + ), + function=service.submit_s_s_h_key, + ), + "/codenames.v1.SSHKeyService/ListTeamsForKeySubmission": EndpointSync.unary( + method=MethodInfo( + name="ListTeamsForKeySubmission", + service_name="codenames.v1.SSHKeyService", + input=codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionRequest, + output=codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionResponse, + idempotency_level=IdempotencyLevel.UNKNOWN, + ), + function=service.list_teams_for_key_submission, + ), + }, + interceptors=interceptors, + read_max_bytes=read_max_bytes, + compressions=compressions, + ) + + @property + def path(self) -> str: + """Returns the URL path to mount the application to when serving multiple applications.""" + return "/codenames.v1.SSHKeyService" + + +class SSHKeyServiceClientSync(ConnectClientSync): + def submit_s_s_h_key( + self, + request: codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyRequest, + *, + headers: Headers | Mapping[str, str] | None = None, + timeout_ms: int | None = None, + ) -> codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyResponse: + return self.execute_unary( + request=request, + method=MethodInfo( + name="SubmitSSHKey", + service_name="codenames.v1.SSHKeyService", + input=codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyRequest, + output=codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyResponse, + idempotency_level=IdempotencyLevel.UNKNOWN, + ), + headers=headers, + timeout_ms=timeout_ms, + ) + + def list_teams_for_key_submission( + self, + request: codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionRequest, + *, + headers: Headers | Mapping[str, str] | None = None, + timeout_ms: int | None = None, + ) -> codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionResponse: + return self.execute_unary( + request=request, + method=MethodInfo( + name="ListTeamsForKeySubmission", + service_name="codenames.v1.SSHKeyService", + input=codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionRequest, + output=codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionResponse, + idempotency_level=IdempotencyLevel.UNKNOWN, + ), + headers=headers, + timeout_ms=timeout_ms, + ) |