diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-07-29 09:40:12 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-07-29 09:40:12 +0000 |
commit | 14b40ec77a4bf8605789cc3aff0eb87625510a41 (patch) | |
tree | 4064d27144d6deaabfcd96df01bd996baa8b51a0 /src/aristaproto/templates | |
parent | Initial commit. (diff) | |
download | python-aristaproto-14b40ec77a4bf8605789cc3aff0eb87625510a41.tar.xz python-aristaproto-14b40ec77a4bf8605789cc3aff0eb87625510a41.zip |
Adding upstream version 1.2+20240521.upstream/1.2+20240521upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/aristaproto/templates')
-rw-r--r-- | src/aristaproto/templates/template.py.j2 | 257 |
1 files changed, 257 insertions, 0 deletions
diff --git a/src/aristaproto/templates/template.py.j2 b/src/aristaproto/templates/template.py.j2 new file mode 100644 index 0000000..f2f1425 --- /dev/null +++ b/src/aristaproto/templates/template.py.j2 @@ -0,0 +1,257 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# sources: {{ ', '.join(output_file.input_filenames) }} +# plugin: python-aristaproto +# This file has been @generated +{% for i in output_file.python_module_imports|sort %} +import {{ i }} +{% endfor %} + +{% if output_file.pydantic_dataclasses %} +from typing import TYPE_CHECKING +if TYPE_CHECKING: + from dataclasses import dataclass +else: + from pydantic.dataclasses import dataclass +{%- else -%} +from dataclasses import dataclass +{% endif %} + +{% if output_file.datetime_imports %} +from datetime import {% for i in output_file.datetime_imports|sort %}{{ i }}{% if not loop.last %}, {% endif %}{% endfor %} + +{% endif%} +{% if output_file.typing_imports %} +from typing import {% for i in output_file.typing_imports|sort %}{{ i }}{% if not loop.last %}, {% endif %}{% endfor %} + +{% endif %} + +{% if output_file.pydantic_imports %} +from pydantic import {% for i in output_file.pydantic_imports|sort %}{{ i }}{% if not loop.last %}, {% endif %}{% endfor %} + +{% endif %} + +import aristaproto +{% if output_file.services %} +from aristaproto.grpc.grpclib_server import ServiceBase +import grpclib +{% endif %} + +{% for i in output_file.imports|sort %} +{{ i }} +{% endfor %} + +{% if output_file.imports_type_checking_only %} +from typing import TYPE_CHECKING + +if TYPE_CHECKING: +{% for i in output_file.imports_type_checking_only|sort %} {{ i }} +{% endfor %} +{% endif %} + +{% if output_file.enums %}{% for enum in output_file.enums %} +class {{ enum.py_name }}(aristaproto.Enum): + {% if enum.comment %} +{{ enum.comment }} + + {% endif %} + {% for entry in enum.entries %} + {{ entry.name }} = {{ entry.value }} + {% if entry.comment %} +{{ entry.comment }} + + {% endif %} + {% endfor %} + + +{% endfor %} +{% endif %} +{% for message in output_file.messages %} +@dataclass(eq=False, repr=False) +class {{ message.py_name }}(aristaproto.Message): + {% if message.comment %} +{{ message.comment }} + + {% endif %} + {% for field in message.fields %} + {{ field.get_field_string() }} + {% if field.comment %} +{{ field.comment }} + + {% endif %} + {% endfor %} + {% if not message.fields %} + pass + {% endif %} + + {% if message.deprecated or message.has_deprecated_fields %} + def __post_init__(self) -> None: + {% if message.deprecated %} + warnings.warn("{{ message.py_name }} is deprecated", DeprecationWarning) + {% endif %} + super().__post_init__() + {% for field in message.deprecated_fields %} + if self.is_set("{{ field }}"): + warnings.warn("{{ message.py_name }}.{{ field }} is deprecated", DeprecationWarning) + {% endfor %} + {% endif %} + + {% if output_file.pydantic_dataclasses and message.has_oneof_fields %} + @root_validator() + def check_oneof(cls, values): + return cls._validate_field_groups(values) + {% endif %} + +{% endfor %} +{% for service in output_file.services %} +class {{ service.py_name }}Stub(aristaproto.ServiceStub): + {% if service.comment %} +{{ service.comment }} + + {% elif not service.methods %} + pass + {% endif %} + {% for method in service.methods %} + async def {{ method.py_name }}(self + {%- if not method.client_streaming -%} + {%- if method.py_input_message -%}, {{ method.py_input_message_param }}: "{{ method.py_input_message_type }}"{%- endif -%} + {%- else -%} + {# Client streaming: need a request iterator instead #} + , {{ method.py_input_message_param }}_iterator: Union[AsyncIterable["{{ method.py_input_message_type }}"], Iterable["{{ method.py_input_message_type }}"]] + {%- endif -%} + , + * + , timeout: Optional[float] = None + , deadline: Optional["Deadline"] = None + , metadata: Optional["MetadataLike"] = None + ) -> {% if method.server_streaming %}AsyncIterator["{{ method.py_output_message_type }}"]{% else %}"{{ method.py_output_message_type }}"{% endif %}: + {% if method.comment %} +{{ method.comment }} + + {% endif %} + {% if method.server_streaming %} + {% if method.client_streaming %} + async for response in self._stream_stream( + "{{ method.route }}", + {{ method.py_input_message_param }}_iterator, + {{ method.py_input_message_type }}, + {{ method.py_output_message_type.strip('"') }}, + timeout=timeout, + deadline=deadline, + metadata=metadata, + ): + yield response + {% else %}{# i.e. not client streaming #} + async for response in self._unary_stream( + "{{ method.route }}", + {{ method.py_input_message_param }}, + {{ method.py_output_message_type.strip('"') }}, + timeout=timeout, + deadline=deadline, + metadata=metadata, + ): + yield response + + {% endif %}{# if client streaming #} + {% else %}{# i.e. not server streaming #} + {% if method.client_streaming %} + return await self._stream_unary( + "{{ method.route }}", + {{ method.py_input_message_param }}_iterator, + {{ method.py_input_message_type }}, + {{ method.py_output_message_type.strip('"') }}, + timeout=timeout, + deadline=deadline, + metadata=metadata, + ) + {% else %}{# i.e. not client streaming #} + return await self._unary_unary( + "{{ method.route }}", + {{ method.py_input_message_param }}, + {{ method.py_output_message_type.strip('"') }}, + timeout=timeout, + deadline=deadline, + metadata=metadata, + ) + {% endif %}{# client streaming #} + {% endif %} + + {% endfor %} +{% endfor %} + +{% for service in output_file.services %} +class {{ service.py_name }}Base(ServiceBase): + {% if service.comment %} +{{ service.comment }} + + {% endif %} + + {% for method in service.methods %} + async def {{ method.py_name }}(self + {%- if not method.client_streaming -%} + {%- if method.py_input_message -%}, {{ method.py_input_message_param }}: "{{ method.py_input_message_type }}"{%- endif -%} + {%- else -%} + {# Client streaming: need a request iterator instead #} + , {{ method.py_input_message_param }}_iterator: AsyncIterator["{{ method.py_input_message_type }}"] + {%- endif -%} + ) -> {% if method.server_streaming %}AsyncIterator["{{ method.py_output_message_type }}"]{% else %}"{{ method.py_output_message_type }}"{% endif %}: + {% if method.comment %} +{{ method.comment }} + + {% endif %} + raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) + {% if method.server_streaming %} + {# Commented out to avoid unreachable code. #} + {# yield {{ method.py_output_message_type }}() #} + {% endif %} + + {% endfor %} + + {% for method in service.methods %} + async def __rpc_{{ method.py_name }}(self, stream: "grpclib.server.Stream[{{ method.py_input_message_type }}, {{ method.py_output_message_type }}]") -> None: + {% if not method.client_streaming %} + request = await stream.recv_message() + {% else %} + request = stream.__aiter__() + {% endif %} + {% if not method.server_streaming %} + response = await self.{{ method.py_name }}(request) + await stream.send_message(response) + {% else %} + await self._call_rpc_handler_server_stream( + self.{{ method.py_name }}, + stream, + request, + ) + {% endif %} + + {% endfor %} + + def __mapping__(self) -> Dict[str, grpclib.const.Handler]: + return { + {% for method in service.methods %} + "{{ method.route }}": grpclib.const.Handler( + self.__rpc_{{ method.py_name }}, + {% if not method.client_streaming and not method.server_streaming %} + grpclib.const.Cardinality.UNARY_UNARY, + {% elif not method.client_streaming and method.server_streaming %} + grpclib.const.Cardinality.UNARY_STREAM, + {% elif method.client_streaming and not method.server_streaming %} + grpclib.const.Cardinality.STREAM_UNARY, + {% else %} + grpclib.const.Cardinality.STREAM_STREAM, + {% endif %} + {{ method.py_input_message_type }}, + {{ method.py_output_message_type }}, + ), + {% endfor %} + } + +{% endfor %} + +{% if output_file.pydantic_dataclasses %} +{% for message in output_file.messages %} +{% if message.has_message_field %} +{{ message.py_name }}.__pydantic_model__.update_forward_refs() # type: ignore +{% endif %} +{% endfor %} +{% endif %} |