summaryrefslogtreecommitdiffstats
path: root/sqlglot/dataframe/sql/group.py
diff options
context:
space:
mode:
Diffstat (limited to 'sqlglot/dataframe/sql/group.py')
-rw-r--r--sqlglot/dataframe/sql/group.py57
1 files changed, 57 insertions, 0 deletions
diff --git a/sqlglot/dataframe/sql/group.py b/sqlglot/dataframe/sql/group.py
new file mode 100644
index 0000000..947aace
--- /dev/null
+++ b/sqlglot/dataframe/sql/group.py
@@ -0,0 +1,57 @@
+from __future__ import annotations
+
+import typing as t
+
+from sqlglot.dataframe.sql import functions as F
+from sqlglot.dataframe.sql.column import Column
+from sqlglot.dataframe.sql.operations import Operation, operation
+
+if t.TYPE_CHECKING:
+ from sqlglot.dataframe.sql.dataframe import DataFrame
+
+
+class GroupedData:
+ def __init__(self, df: DataFrame, group_by_cols: t.List[Column], last_op: Operation):
+ self._df = df.copy()
+ self.spark = df.spark
+ self.last_op = last_op
+ self.group_by_cols = group_by_cols
+
+ def _get_function_applied_columns(self, func_name: str, cols: t.Tuple[str, ...]) -> t.List[Column]:
+ func_name = func_name.lower()
+ return [getattr(F, func_name)(name).alias(f"{func_name}({name})") for name in cols]
+
+ @operation(Operation.SELECT)
+ def agg(self, *exprs: t.Union[Column, t.Dict[str, str]]) -> DataFrame:
+ columns = (
+ [Column(f"{agg_func}({column_name})") for column_name, agg_func in exprs[0].items()]
+ if isinstance(exprs[0], dict)
+ else exprs
+ )
+ cols = self._df._ensure_and_normalize_cols(columns)
+
+ expression = self._df.expression.group_by(*[x.expression for x in self.group_by_cols]).select(
+ *[x.expression for x in self.group_by_cols + cols], append=False
+ )
+ return self._df.copy(expression=expression)
+
+ def count(self) -> DataFrame:
+ return self.agg(F.count("*").alias("count"))
+
+ def mean(self, *cols: str) -> DataFrame:
+ return self.avg(*cols)
+
+ def avg(self, *cols: str) -> DataFrame:
+ return self.agg(*self._get_function_applied_columns("avg", cols))
+
+ def max(self, *cols: str) -> DataFrame:
+ return self.agg(*self._get_function_applied_columns("max", cols))
+
+ def min(self, *cols: str) -> DataFrame:
+ return self.agg(*self._get_function_applied_columns("min", cols))
+
+ def sum(self, *cols: str) -> DataFrame:
+ return self.agg(*self._get_function_applied_columns("sum", cols))
+
+ def pivot(self, *cols: str) -> DataFrame:
+ raise NotImplementedError("Sum distinct is not currently implemented")