summaryrefslogtreecommitdiff
path: root/codenames/v1/ssh_keys_connect.py
diff options
context:
space:
mode:
Diffstat (limited to 'codenames/v1/ssh_keys_connect.py')
-rw-r--r--codenames/v1/ssh_keys_connect.py188
1 files changed, 188 insertions, 0 deletions
diff --git a/codenames/v1/ssh_keys_connect.py b/codenames/v1/ssh_keys_connect.py
new file mode 100644
index 0000000..adbaa6b
--- /dev/null
+++ b/codenames/v1/ssh_keys_connect.py
@@ -0,0 +1,188 @@
+# -*- coding: utf-8 -*-
+# Generated by https://github.com/connectrpc/connect-python. DO NOT EDIT!
+# source: codenames/v1/ssh_keys.proto
+
+from collections.abc import AsyncGenerator, AsyncIterator, Iterable, Iterator, Mapping
+from typing import Protocol
+
+from connectrpc.client import ConnectClient, ConnectClientSync
+from connectrpc.code import Code
+from connectrpc.compression import Compression
+from connectrpc.errors import ConnectError
+from connectrpc.interceptor import Interceptor, InterceptorSync
+from connectrpc.method import IdempotencyLevel, MethodInfo
+from connectrpc.request import Headers, RequestContext
+from connectrpc.server import ConnectASGIApplication, ConnectWSGIApplication, Endpoint, EndpointSync
+import codenames.v1.ssh_keys_pb2 as codenames_dot_v1_dot_ssh__keys__pb2
+
+
+class SSHKeyService(Protocol):
+ async def submit_s_s_h_key(self, request: codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyRequest, ctx: RequestContext) -> codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyResponse:
+ raise ConnectError(Code.UNIMPLEMENTED, "Not implemented")
+
+ async def list_teams_for_key_submission(self, request: codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionRequest, ctx: RequestContext) -> codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionResponse:
+ raise ConnectError(Code.UNIMPLEMENTED, "Not implemented")
+
+
+class SSHKeyServiceASGIApplication(ConnectASGIApplication[SSHKeyService]):
+ def __init__(self, service: SSHKeyService | AsyncGenerator[SSHKeyService], *, interceptors: Iterable[Interceptor]=(), read_max_bytes: int | None = None, compressions: Iterable[Compression] | None = None) -> None:
+ super().__init__(
+ service=service,
+ endpoints=lambda svc: {
+ "/codenames.v1.SSHKeyService/SubmitSSHKey": Endpoint.unary(
+ method=MethodInfo(
+ name="SubmitSSHKey",
+ service_name="codenames.v1.SSHKeyService",
+ input=codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyRequest,
+ output=codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyResponse,
+ idempotency_level=IdempotencyLevel.UNKNOWN,
+ ),
+ function=svc.submit_s_s_h_key,
+ ),
+ "/codenames.v1.SSHKeyService/ListTeamsForKeySubmission": Endpoint.unary(
+ method=MethodInfo(
+ name="ListTeamsForKeySubmission",
+ service_name="codenames.v1.SSHKeyService",
+ input=codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionRequest,
+ output=codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionResponse,
+ idempotency_level=IdempotencyLevel.UNKNOWN,
+ ),
+ function=svc.list_teams_for_key_submission,
+ ),
+ },
+ interceptors=interceptors,
+ read_max_bytes=read_max_bytes,
+ compressions=compressions,
+ )
+
+ @property
+ def path(self) -> str:
+ """Returns the URL path to mount the application to when serving multiple applications."""
+ return "/codenames.v1.SSHKeyService"
+
+
+class SSHKeyServiceClient(ConnectClient):
+ async def submit_s_s_h_key(
+ self,
+ request: codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyRequest,
+ *,
+ headers: Headers | Mapping[str, str] | None = None,
+ timeout_ms: int | None = None,
+ ) -> codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyResponse:
+ return await self.execute_unary(
+ request=request,
+ method=MethodInfo(
+ name="SubmitSSHKey",
+ service_name="codenames.v1.SSHKeyService",
+ input=codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyRequest,
+ output=codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyResponse,
+ idempotency_level=IdempotencyLevel.UNKNOWN,
+ ),
+ headers=headers,
+ timeout_ms=timeout_ms,
+ )
+
+ async def list_teams_for_key_submission(
+ self,
+ request: codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionRequest,
+ *,
+ headers: Headers | Mapping[str, str] | None = None,
+ timeout_ms: int | None = None,
+ ) -> codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionResponse:
+ return await self.execute_unary(
+ request=request,
+ method=MethodInfo(
+ name="ListTeamsForKeySubmission",
+ service_name="codenames.v1.SSHKeyService",
+ input=codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionRequest,
+ output=codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionResponse,
+ idempotency_level=IdempotencyLevel.UNKNOWN,
+ ),
+ headers=headers,
+ timeout_ms=timeout_ms,
+ )
+
+
+class SSHKeyServiceSync(Protocol):
+ def submit_s_s_h_key(self, request: codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyRequest, ctx: RequestContext) -> codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyResponse:
+ raise ConnectError(Code.UNIMPLEMENTED, "Not implemented")
+ def list_teams_for_key_submission(self, request: codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionRequest, ctx: RequestContext) -> codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionResponse:
+ raise ConnectError(Code.UNIMPLEMENTED, "Not implemented")
+
+
+class SSHKeyServiceWSGIApplication(ConnectWSGIApplication):
+ def __init__(self, service: SSHKeyServiceSync, interceptors: Iterable[InterceptorSync]=(), read_max_bytes: int | None = None, compressions: Iterable[Compression] | None = None) -> None:
+ super().__init__(
+ endpoints={
+ "/codenames.v1.SSHKeyService/SubmitSSHKey": EndpointSync.unary(
+ method=MethodInfo(
+ name="SubmitSSHKey",
+ service_name="codenames.v1.SSHKeyService",
+ input=codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyRequest,
+ output=codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyResponse,
+ idempotency_level=IdempotencyLevel.UNKNOWN,
+ ),
+ function=service.submit_s_s_h_key,
+ ),
+ "/codenames.v1.SSHKeyService/ListTeamsForKeySubmission": EndpointSync.unary(
+ method=MethodInfo(
+ name="ListTeamsForKeySubmission",
+ service_name="codenames.v1.SSHKeyService",
+ input=codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionRequest,
+ output=codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionResponse,
+ idempotency_level=IdempotencyLevel.UNKNOWN,
+ ),
+ function=service.list_teams_for_key_submission,
+ ),
+ },
+ interceptors=interceptors,
+ read_max_bytes=read_max_bytes,
+ compressions=compressions,
+ )
+
+ @property
+ def path(self) -> str:
+ """Returns the URL path to mount the application to when serving multiple applications."""
+ return "/codenames.v1.SSHKeyService"
+
+
+class SSHKeyServiceClientSync(ConnectClientSync):
+ def submit_s_s_h_key(
+ self,
+ request: codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyRequest,
+ *,
+ headers: Headers | Mapping[str, str] | None = None,
+ timeout_ms: int | None = None,
+ ) -> codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyResponse:
+ return self.execute_unary(
+ request=request,
+ method=MethodInfo(
+ name="SubmitSSHKey",
+ service_name="codenames.v1.SSHKeyService",
+ input=codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyRequest,
+ output=codenames_dot_v1_dot_ssh__keys__pb2.SubmitSSHKeyResponse,
+ idempotency_level=IdempotencyLevel.UNKNOWN,
+ ),
+ headers=headers,
+ timeout_ms=timeout_ms,
+ )
+
+ def list_teams_for_key_submission(
+ self,
+ request: codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionRequest,
+ *,
+ headers: Headers | Mapping[str, str] | None = None,
+ timeout_ms: int | None = None,
+ ) -> codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionResponse:
+ return self.execute_unary(
+ request=request,
+ method=MethodInfo(
+ name="ListTeamsForKeySubmission",
+ service_name="codenames.v1.SSHKeyService",
+ input=codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionRequest,
+ output=codenames_dot_v1_dot_ssh__keys__pb2.ListTeamsForKeySubmissionResponse,
+ idempotency_level=IdempotencyLevel.UNKNOWN,
+ ),
+ headers=headers,
+ timeout_ms=timeout_ms,
+ )