summaryrefslogtreecommitdiffstats
path: root/src/aristaproto/templates
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-07-29 09:40:12 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-07-29 09:40:12 +0000
commit14b40ec77a4bf8605789cc3aff0eb87625510a41 (patch)
tree4064d27144d6deaabfcd96df01bd996baa8b51a0 /src/aristaproto/templates
parentInitial commit. (diff)
downloadpython-aristaproto-14b40ec77a4bf8605789cc3aff0eb87625510a41.tar.xz
python-aristaproto-14b40ec77a4bf8605789cc3aff0eb87625510a41.zip
Adding upstream version 1.2+20240521.upstream/1.2+20240521upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/aristaproto/templates')
-rw-r--r--src/aristaproto/templates/template.py.j2257
1 files changed, 257 insertions, 0 deletions
diff --git a/src/aristaproto/templates/template.py.j2 b/src/aristaproto/templates/template.py.j2
new file mode 100644
index 0000000..f2f1425
--- /dev/null
+++ b/src/aristaproto/templates/template.py.j2
@@ -0,0 +1,257 @@
+# Generated by the protocol buffer compiler. DO NOT EDIT!
+# sources: {{ ', '.join(output_file.input_filenames) }}
+# plugin: python-aristaproto
+# This file has been @generated
+{% for i in output_file.python_module_imports|sort %}
+import {{ i }}
+{% endfor %}
+
+{% if output_file.pydantic_dataclasses %}
+from typing import TYPE_CHECKING
+if TYPE_CHECKING:
+ from dataclasses import dataclass
+else:
+ from pydantic.dataclasses import dataclass
+{%- else -%}
+from dataclasses import dataclass
+{% endif %}
+
+{% if output_file.datetime_imports %}
+from datetime import {% for i in output_file.datetime_imports|sort %}{{ i }}{% if not loop.last %}, {% endif %}{% endfor %}
+
+{% endif%}
+{% if output_file.typing_imports %}
+from typing import {% for i in output_file.typing_imports|sort %}{{ i }}{% if not loop.last %}, {% endif %}{% endfor %}
+
+{% endif %}
+
+{% if output_file.pydantic_imports %}
+from pydantic import {% for i in output_file.pydantic_imports|sort %}{{ i }}{% if not loop.last %}, {% endif %}{% endfor %}
+
+{% endif %}
+
+import aristaproto
+{% if output_file.services %}
+from aristaproto.grpc.grpclib_server import ServiceBase
+import grpclib
+{% endif %}
+
+{% for i in output_file.imports|sort %}
+{{ i }}
+{% endfor %}
+
+{% if output_file.imports_type_checking_only %}
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+{% for i in output_file.imports_type_checking_only|sort %} {{ i }}
+{% endfor %}
+{% endif %}
+
+{% if output_file.enums %}{% for enum in output_file.enums %}
+class {{ enum.py_name }}(aristaproto.Enum):
+ {% if enum.comment %}
+{{ enum.comment }}
+
+ {% endif %}
+ {% for entry in enum.entries %}
+ {{ entry.name }} = {{ entry.value }}
+ {% if entry.comment %}
+{{ entry.comment }}
+
+ {% endif %}
+ {% endfor %}
+
+
+{% endfor %}
+{% endif %}
+{% for message in output_file.messages %}
+@dataclass(eq=False, repr=False)
+class {{ message.py_name }}(aristaproto.Message):
+ {% if message.comment %}
+{{ message.comment }}
+
+ {% endif %}
+ {% for field in message.fields %}
+ {{ field.get_field_string() }}
+ {% if field.comment %}
+{{ field.comment }}
+
+ {% endif %}
+ {% endfor %}
+ {% if not message.fields %}
+ pass
+ {% endif %}
+
+ {% if message.deprecated or message.has_deprecated_fields %}
+ def __post_init__(self) -> None:
+ {% if message.deprecated %}
+ warnings.warn("{{ message.py_name }} is deprecated", DeprecationWarning)
+ {% endif %}
+ super().__post_init__()
+ {% for field in message.deprecated_fields %}
+ if self.is_set("{{ field }}"):
+ warnings.warn("{{ message.py_name }}.{{ field }} is deprecated", DeprecationWarning)
+ {% endfor %}
+ {% endif %}
+
+ {% if output_file.pydantic_dataclasses and message.has_oneof_fields %}
+ @root_validator()
+ def check_oneof(cls, values):
+ return cls._validate_field_groups(values)
+ {% endif %}
+
+{% endfor %}
+{% for service in output_file.services %}
+class {{ service.py_name }}Stub(aristaproto.ServiceStub):
+ {% if service.comment %}
+{{ service.comment }}
+
+ {% elif not service.methods %}
+ pass
+ {% endif %}
+ {% for method in service.methods %}
+ async def {{ method.py_name }}(self
+ {%- if not method.client_streaming -%}
+ {%- if method.py_input_message -%}, {{ method.py_input_message_param }}: "{{ method.py_input_message_type }}"{%- endif -%}
+ {%- else -%}
+ {# Client streaming: need a request iterator instead #}
+ , {{ method.py_input_message_param }}_iterator: Union[AsyncIterable["{{ method.py_input_message_type }}"], Iterable["{{ method.py_input_message_type }}"]]
+ {%- endif -%}
+ ,
+ *
+ , timeout: Optional[float] = None
+ , deadline: Optional["Deadline"] = None
+ , metadata: Optional["MetadataLike"] = None
+ ) -> {% if method.server_streaming %}AsyncIterator["{{ method.py_output_message_type }}"]{% else %}"{{ method.py_output_message_type }}"{% endif %}:
+ {% if method.comment %}
+{{ method.comment }}
+
+ {% endif %}
+ {% if method.server_streaming %}
+ {% if method.client_streaming %}
+ async for response in self._stream_stream(
+ "{{ method.route }}",
+ {{ method.py_input_message_param }}_iterator,
+ {{ method.py_input_message_type }},
+ {{ method.py_output_message_type.strip('"') }},
+ timeout=timeout,
+ deadline=deadline,
+ metadata=metadata,
+ ):
+ yield response
+ {% else %}{# i.e. not client streaming #}
+ async for response in self._unary_stream(
+ "{{ method.route }}",
+ {{ method.py_input_message_param }},
+ {{ method.py_output_message_type.strip('"') }},
+ timeout=timeout,
+ deadline=deadline,
+ metadata=metadata,
+ ):
+ yield response
+
+ {% endif %}{# if client streaming #}
+ {% else %}{# i.e. not server streaming #}
+ {% if method.client_streaming %}
+ return await self._stream_unary(
+ "{{ method.route }}",
+ {{ method.py_input_message_param }}_iterator,
+ {{ method.py_input_message_type }},
+ {{ method.py_output_message_type.strip('"') }},
+ timeout=timeout,
+ deadline=deadline,
+ metadata=metadata,
+ )
+ {% else %}{# i.e. not client streaming #}
+ return await self._unary_unary(
+ "{{ method.route }}",
+ {{ method.py_input_message_param }},
+ {{ method.py_output_message_type.strip('"') }},
+ timeout=timeout,
+ deadline=deadline,
+ metadata=metadata,
+ )
+ {% endif %}{# client streaming #}
+ {% endif %}
+
+ {% endfor %}
+{% endfor %}
+
+{% for service in output_file.services %}
+class {{ service.py_name }}Base(ServiceBase):
+ {% if service.comment %}
+{{ service.comment }}
+
+ {% endif %}
+
+ {% for method in service.methods %}
+ async def {{ method.py_name }}(self
+ {%- if not method.client_streaming -%}
+ {%- if method.py_input_message -%}, {{ method.py_input_message_param }}: "{{ method.py_input_message_type }}"{%- endif -%}
+ {%- else -%}
+ {# Client streaming: need a request iterator instead #}
+ , {{ method.py_input_message_param }}_iterator: AsyncIterator["{{ method.py_input_message_type }}"]
+ {%- endif -%}
+ ) -> {% if method.server_streaming %}AsyncIterator["{{ method.py_output_message_type }}"]{% else %}"{{ method.py_output_message_type }}"{% endif %}:
+ {% if method.comment %}
+{{ method.comment }}
+
+ {% endif %}
+ raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+ {% if method.server_streaming %}
+ {# Commented out to avoid unreachable code. #}
+ {# yield {{ method.py_output_message_type }}() #}
+ {% endif %}
+
+ {% endfor %}
+
+ {% for method in service.methods %}
+ async def __rpc_{{ method.py_name }}(self, stream: "grpclib.server.Stream[{{ method.py_input_message_type }}, {{ method.py_output_message_type }}]") -> None:
+ {% if not method.client_streaming %}
+ request = await stream.recv_message()
+ {% else %}
+ request = stream.__aiter__()
+ {% endif %}
+ {% if not method.server_streaming %}
+ response = await self.{{ method.py_name }}(request)
+ await stream.send_message(response)
+ {% else %}
+ await self._call_rpc_handler_server_stream(
+ self.{{ method.py_name }},
+ stream,
+ request,
+ )
+ {% endif %}
+
+ {% endfor %}
+
+ def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
+ return {
+ {% for method in service.methods %}
+ "{{ method.route }}": grpclib.const.Handler(
+ self.__rpc_{{ method.py_name }},
+ {% if not method.client_streaming and not method.server_streaming %}
+ grpclib.const.Cardinality.UNARY_UNARY,
+ {% elif not method.client_streaming and method.server_streaming %}
+ grpclib.const.Cardinality.UNARY_STREAM,
+ {% elif method.client_streaming and not method.server_streaming %}
+ grpclib.const.Cardinality.STREAM_UNARY,
+ {% else %}
+ grpclib.const.Cardinality.STREAM_STREAM,
+ {% endif %}
+ {{ method.py_input_message_type }},
+ {{ method.py_output_message_type }},
+ ),
+ {% endfor %}
+ }
+
+{% endfor %}
+
+{% if output_file.pydantic_dataclasses %}
+{% for message in output_file.messages %}
+{% if message.has_message_field %}
+{{ message.py_name }}.__pydantic_model__.update_forward_refs() # type: ignore
+{% endif %}
+{% endfor %}
+{% endif %}