# Generated by the protocol buffer compiler. DO NOT EDIT! # sources: {{ ', '.join(output_file.input_filenames) }} # plugin: python-aristaproto # This file has been @generated {% for i in output_file.python_module_imports|sort %} import {{ i }} {% endfor %} {% if output_file.pydantic_dataclasses %} from typing import TYPE_CHECKING if TYPE_CHECKING: from dataclasses import dataclass else: from pydantic.dataclasses import dataclass {%- else -%} from dataclasses import dataclass {% endif %} {% if output_file.datetime_imports %} from datetime import {% for i in output_file.datetime_imports|sort %}{{ i }}{% if not loop.last %}, {% endif %}{% endfor %} {% endif%} {% if output_file.typing_imports %} from typing import {% for i in output_file.typing_imports|sort %}{{ i }}{% if not loop.last %}, {% endif %}{% endfor %} {% endif %} {% if output_file.pydantic_imports %} from pydantic import {% for i in output_file.pydantic_imports|sort %}{{ i }}{% if not loop.last %}, {% endif %}{% endfor %} {% endif %} import aristaproto {% if output_file.services %} from aristaproto.grpc.grpclib_server import ServiceBase import grpclib {% endif %} {% for i in output_file.imports|sort %} {{ i }} {% endfor %} {% if output_file.imports_type_checking_only %} from typing import TYPE_CHECKING if TYPE_CHECKING: {% for i in output_file.imports_type_checking_only|sort %} {{ i }} {% endfor %} {% endif %} {% if output_file.enums %}{% for enum in output_file.enums %} class {{ enum.py_name }}(aristaproto.Enum): {% if enum.comment %} {{ enum.comment }} {% endif %} {% for entry in enum.entries %} {{ entry.name }} = {{ entry.value }} {% if entry.comment %} {{ entry.comment }} {% endif %} {% endfor %} {% endfor %} {% endif %} {% for message in output_file.messages %} @dataclass(eq=False, repr=False) class {{ message.py_name }}(aristaproto.Message): {% if message.comment %} {{ message.comment }} {% endif %} {% for field in message.fields %} {{ field.get_field_string() }} {% if field.comment %} {{ field.comment }} {% endif %} {% endfor %} {% if not message.fields %} pass {% endif %} {% if message.deprecated or message.has_deprecated_fields %} def __post_init__(self) -> None: {% if message.deprecated %} warnings.warn("{{ message.py_name }} is deprecated", DeprecationWarning) {% endif %} super().__post_init__() {% for field in message.deprecated_fields %} if self.is_set("{{ field }}"): warnings.warn("{{ message.py_name }}.{{ field }} is deprecated", DeprecationWarning) {% endfor %} {% endif %} {% if output_file.pydantic_dataclasses and message.has_oneof_fields %} @root_validator() def check_oneof(cls, values): return cls._validate_field_groups(values) {% endif %} {% endfor %} {% for service in output_file.services %} class {{ service.py_name }}Stub(aristaproto.ServiceStub): {% if service.comment %} {{ service.comment }} {% elif not service.methods %} pass {% endif %} {% for method in service.methods %} async def {{ method.py_name }}(self {%- if not method.client_streaming -%} {%- if method.py_input_message -%}, {{ method.py_input_message_param }}: "{{ method.py_input_message_type }}"{%- endif -%} {%- else -%} {# Client streaming: need a request iterator instead #} , {{ method.py_input_message_param }}_iterator: Union[AsyncIterable["{{ method.py_input_message_type }}"], Iterable["{{ method.py_input_message_type }}"]] {%- endif -%} , * , timeout: Optional[float] = None , deadline: Optional["Deadline"] = None , metadata: Optional["MetadataLike"] = None ) -> {% if method.server_streaming %}AsyncIterator["{{ method.py_output_message_type }}"]{% else %}"{{ method.py_output_message_type }}"{% endif %}: {% if method.comment %} {{ method.comment }} {% endif %} {% if method.server_streaming %} {% if method.client_streaming %} async for response in self._stream_stream( "{{ method.route }}", {{ method.py_input_message_param }}_iterator, {{ method.py_input_message_type }}, {{ method.py_output_message_type.strip('"') }}, timeout=timeout, deadline=deadline, metadata=metadata, ): yield response {% else %}{# i.e. not client streaming #} async for response in self._unary_stream( "{{ method.route }}", {{ method.py_input_message_param }}, {{ method.py_output_message_type.strip('"') }}, timeout=timeout, deadline=deadline, metadata=metadata, ): yield response {% endif %}{# if client streaming #} {% else %}{# i.e. not server streaming #} {% if method.client_streaming %} return await self._stream_unary( "{{ method.route }}", {{ method.py_input_message_param }}_iterator, {{ method.py_input_message_type }}, {{ method.py_output_message_type.strip('"') }}, timeout=timeout, deadline=deadline, metadata=metadata, ) {% else %}{# i.e. not client streaming #} return await self._unary_unary( "{{ method.route }}", {{ method.py_input_message_param }}, {{ method.py_output_message_type.strip('"') }}, timeout=timeout, deadline=deadline, metadata=metadata, ) {% endif %}{# client streaming #} {% endif %} {% endfor %} {% endfor %} {% for service in output_file.services %} class {{ service.py_name }}Base(ServiceBase): {% if service.comment %} {{ service.comment }} {% endif %} {% for method in service.methods %} async def {{ method.py_name }}(self {%- if not method.client_streaming -%} {%- if method.py_input_message -%}, {{ method.py_input_message_param }}: "{{ method.py_input_message_type }}"{%- endif -%} {%- else -%} {# Client streaming: need a request iterator instead #} , {{ method.py_input_message_param }}_iterator: AsyncIterator["{{ method.py_input_message_type }}"] {%- endif -%} ) -> {% if method.server_streaming %}AsyncIterator["{{ method.py_output_message_type }}"]{% else %}"{{ method.py_output_message_type }}"{% endif %}: {% if method.comment %} {{ method.comment }} {% endif %} raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) {% if method.server_streaming %} {# Commented out to avoid unreachable code. #} {# yield {{ method.py_output_message_type }}() #} {% endif %} {% endfor %} {% for method in service.methods %} async def __rpc_{{ method.py_name }}(self, stream: "grpclib.server.Stream[{{ method.py_input_message_type }}, {{ method.py_output_message_type }}]") -> None: {% if not method.client_streaming %} request = await stream.recv_message() {% else %} request = stream.__aiter__() {% endif %} {% if not method.server_streaming %} response = await self.{{ method.py_name }}(request) await stream.send_message(response) {% else %} await self._call_rpc_handler_server_stream( self.{{ method.py_name }}, stream, request, ) {% endif %} {% endfor %} def __mapping__(self) -> Dict[str, grpclib.const.Handler]: return { {% for method in service.methods %} "{{ method.route }}": grpclib.const.Handler( self.__rpc_{{ method.py_name }}, {% if not method.client_streaming and not method.server_streaming %} grpclib.const.Cardinality.UNARY_UNARY, {% elif not method.client_streaming and method.server_streaming %} grpclib.const.Cardinality.UNARY_STREAM, {% elif method.client_streaming and not method.server_streaming %} grpclib.const.Cardinality.STREAM_UNARY, {% else %} grpclib.const.Cardinality.STREAM_STREAM, {% endif %} {{ method.py_input_message_type }}, {{ method.py_output_message_type }}, ), {% endfor %} } {% endfor %} {% if output_file.pydantic_dataclasses %} {% for message in output_file.messages %} {% if message.has_message_field %} {{ message.py_name }}.__pydantic_model__.update_forward_refs() # type: ignore {% endif %} {% endfor %} {% endif %}