# -*- coding: utf-8 -*-
# Generated by https://github.com/connectrpc/connect-python. DO NOT EDIT!
# source: codenames/v1/ssh_keys.proto

from collections.abc import AsyncGenerator, AsyncIterator, Iterable, Iterator, Mapping
from typing import Protocol

from connectrpc.client import ConnectClient, ConnectClientSync
from connectrpc.code import Code
from connectrpc.compression import Compression
from connectrpc.errors import ConnectError
from connectrpc.interceptor import Interceptor, InterceptorSync
from connectrpc.method import IdempotencyLevel, MethodInfo
from connectrpc.request import Headers, RequestContext
from connectrpc.server import ConnectASGIApplication, ConnectWSGIApplication, Endpoint, EndpointSync
import codenames.v1.ssh_keys_pb2 as codenames_dot_v1_dot_ssh__keys__pb2


class SSHKeyService(Protocol):
    """Async handler interface for ``codenames.v1.SSHKeyService``.

    Each method's default body raises ``ConnectError`` with
    ``Code.UNIMPLEMENTED``.
    """

    async def submit_s_s_h_key(
        self,
        request: codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyRequest,
        ctx: RequestContext,
    ) -> codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyResponse:
        """Handle ``SubmitSSHKey``; the default raises ``Code.UNIMPLEMENTED``."""
        raise ConnectError(Code.UNIMPLEMENTED, "Not implemented")

    async def list_teams_for_key_submission(
        self,
        request: codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionRequest,
        ctx: RequestContext,
    ) -> codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionResponse:
        """Handle ``ListTeamsForKeySubmission``; the default raises ``Code.UNIMPLEMENTED``."""
        raise ConnectError(Code.UNIMPLEMENTED, "Not implemented")


class SSHKeyServiceASGIApplication(ConnectASGIApplication[SSHKeyService]):
    """ASGI application exposing an ``SSHKeyService`` as two unary endpoints."""

    def __init__(
        self,
        service: SSHKeyService | AsyncGenerator[SSHKeyService],
        *,
        interceptors: Iterable[Interceptor] = (),
        read_max_bytes: int | None = None,
        compressions: Iterable[Compression] | None = None,
    ) -> None:
        """Register the service's endpoints with the base ASGI application.

        Args:
            service: The handler implementation, or an async generator of one.
            interceptors: Forwarded unchanged to the base class.
            read_max_bytes: Forwarded unchanged to the base class.
            compressions: Forwarded unchanged to the base class.
        """

        def build_endpoints(svc: SSHKeyService):
            # Maps each request path to the unary endpoint bound to ``svc``.
            submit_endpoint = Endpoint.unary(
                method=MethodInfo(
                    name="SubmitSSHKey",
                    service_name="codenames.v1.SSHKeyService",
                    input=codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyRequest,
                    output=codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyResponse,
                    idempotency_level=IdempotencyLevel.UNKNOWN,
                ),
                function=svc.submit_s_s_h_key,
            )
            list_teams_endpoint = Endpoint.unary(
                method=MethodInfo(
                    name="ListTeamsForKeySubmission",
                    service_name="codenames.v1.SSHKeyService",
                    input=codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionRequest,
                    output=codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionResponse,
                    idempotency_level=IdempotencyLevel.UNKNOWN,
                ),
                function=svc.list_teams_for_key_submission,
            )
            return {
                "/codenames.v1.SSHKeyService/SubmitSSHKey": submit_endpoint,
                "/codenames.v1.SSHKeyService/ListTeamsForKeySubmission": list_teams_endpoint,
            }

        super().__init__(
            service=service,
            endpoints=build_endpoints,
            interceptors=interceptors,
            read_max_bytes=read_max_bytes,
            compressions=compressions,
        )

    @property
    def path(self) -> str:
        """Returns the URL path to mount the application to when serving multiple applications."""
        return "/codenames.v1.SSHKeyService"


class SSHKeyServiceClient(ConnectClient):
    """Async client whose methods delegate to ``self.execute_unary``."""

    async def submit_s_s_h_key(
        self,
        request: codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyRequest,
        *,
        headers: Headers | Mapping[str, str] | None = None,
        timeout_ms: int | None = None,
    ) -> codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyResponse:
        """Call ``SubmitSSHKey`` and return the awaited ``execute_unary`` result.

        Args:
            request: The request message to send.
            headers: Passed through to ``execute_unary``.
            timeout_ms: Passed through to ``execute_unary``.
        """
        method_info = MethodInfo(
            name="SubmitSSHKey",
            service_name="codenames.v1.SSHKeyService",
            input=codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyRequest,
            output=codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyResponse,
            idempotency_level=IdempotencyLevel.UNKNOWN,
        )
        return await self.execute_unary(
            request=request,
            method=method_info,
            headers=headers,
            timeout_ms=timeout_ms,
        )

    async def list_teams_for_key_submission(
        self,
        request: codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionRequest,
        *,
        headers: Headers | Mapping[str, str] | None = None,
        timeout_ms: int | None = None,
    ) -> codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionResponse:
        """Call ``ListTeamsForKeySubmission`` and return the awaited ``execute_unary`` result.

        Args:
            request: The request message to send.
            headers: Passed through to ``execute_unary``.
            timeout_ms: Passed through to ``execute_unary``.
        """
        method_info = MethodInfo(
            name="ListTeamsForKeySubmission",
            service_name="codenames.v1.SSHKeyService",
            input=codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionRequest,
            output=codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionResponse,
            idempotency_level=IdempotencyLevel.UNKNOWN,
        )
        return await self.execute_unary(
            request=request,
            method=method_info,
            headers=headers,
            timeout_ms=timeout_ms,
        )


class SSHKeyServiceSync(Protocol):
    """Synchronous handler interface for ``codenames.v1.SSHKeyService``.

    Each method's default body raises ``ConnectError`` with
    ``Code.UNIMPLEMENTED``.
    """

    def submit_s_s_h_key(
        self,
        request: codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyRequest,
        ctx: RequestContext,
    ) -> codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyResponse:
        """Handle ``SubmitSSHKey``; the default raises ``Code.UNIMPLEMENTED``."""
        raise ConnectError(Code.UNIMPLEMENTED, "Not implemented")

    def list_teams_for_key_submission(
        self,
        request: codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionRequest,
        ctx: RequestContext,
    ) -> codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionResponse:
        """Handle ``ListTeamsForKeySubmission``; the default raises ``Code.UNIMPLEMENTED``."""
        raise ConnectError(Code.UNIMPLEMENTED, "Not implemented")


class SSHKeyServiceWSGIApplication(ConnectWSGIApplication):
    """WSGI application exposing an ``SSHKeyServiceSync`` as two unary endpoints."""

    def __init__(
        self,
        service: SSHKeyServiceSync,
        interceptors: Iterable[InterceptorSync] = (),
        read_max_bytes: int | None = None,
        compressions: Iterable[Compression] | None = None,
    ) -> None:
        """Register the service's endpoints with the base WSGI application.

        Args:
            service: The synchronous handler implementation.
            interceptors: Forwarded unchanged to the base class.
            read_max_bytes: Forwarded unchanged to the base class.
            compressions: Forwarded unchanged to the base class.
        """
        submit_endpoint = EndpointSync.unary(
            method=MethodInfo(
                name="SubmitSSHKey",
                service_name="codenames.v1.SSHKeyService",
                input=codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyRequest,
                output=codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyResponse,
                idempotency_level=IdempotencyLevel.UNKNOWN,
            ),
            function=service.submit_s_s_h_key,
        )
        list_teams_endpoint = EndpointSync.unary(
            method=MethodInfo(
                name="ListTeamsForKeySubmission",
                service_name="codenames.v1.SSHKeyService",
                input=codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionRequest,
                output=codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionResponse,
                idempotency_level=IdempotencyLevel.UNKNOWN,
            ),
            function=service.list_teams_for_key_submission,
        )
        # Keys are the request paths the base application receives as ``endpoints``.
        endpoints = {
            "/codenames.v1.SSHKeyService/SubmitSSHKey": submit_endpoint,
            "/codenames.v1.SSHKeyService/ListTeamsForKeySubmission": list_teams_endpoint,
        }
        super().__init__(
            endpoints=endpoints,
            interceptors=interceptors,
            read_max_bytes=read_max_bytes,
            compressions=compressions,
        )

    @property
    def path(self) -> str:
        """Returns the URL path to mount the application to when serving multiple applications."""
        return "/codenames.v1.SSHKeyService"


class SSHKeyServiceClientSync(ConnectClientSync):
    """Synchronous client whose methods delegate to ``self.execute_unary``."""

    def submit_s_s_h_key(
        self,
        request: codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyRequest,
        *,
        headers: Headers | Mapping[str, str] | None = None,
        timeout_ms: int | None = None,
    ) -> codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyResponse:
        """Call ``SubmitSSHKey`` and return the ``execute_unary`` result.

        Args:
            request: The request message to send.
            headers: Passed through to ``execute_unary``.
            timeout_ms: Passed through to ``execute_unary``.
        """
        method_info = MethodInfo(
            name="SubmitSSHKey",
            service_name="codenames.v1.SSHKeyService",
            input=codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyRequest,
            output=codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyResponse,
            idempotency_level=IdempotencyLevel.UNKNOWN,
        )
        return self.execute_unary(
            request=request,
            method=method_info,
            headers=headers,
            timeout_ms=timeout_ms,
        )

    def list_teams_for_key_submission(
        self,
        request: codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionRequest,
        *,
        headers: Headers | Mapping[str, str] | None = None,
        timeout_ms: int | None = None,
    ) -> codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionResponse:
        """Call ``ListTeamsForKeySubmission`` and return the ``execute_unary`` result.

        Args:
            request: The request message to send.
            headers: Passed through to ``execute_unary``.
            timeout_ms: Passed through to ``execute_unary``.
        """
        method_info = MethodInfo(
            name="ListTeamsForKeySubmission",
            service_name="codenames.v1.SSHKeyService",
            input=codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionRequest,
            output=codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionResponse,
            idempotency_level=IdempotencyLevel.UNKNOWN,
        )
        return self.execute_unary(
            request=request,
            method=method_info,
            headers=headers,
            timeout_ms=timeout_ms,
        )