summaryrefslogtreecommitdiffstats
path: root/src/bin/psql/crosstabview.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/bin/psql/crosstabview.c')
-rw-r--r--src/bin/psql/crosstabview.c713
1 files changed, 713 insertions, 0 deletions
diff --git a/src/bin/psql/crosstabview.c b/src/bin/psql/crosstabview.c
new file mode 100644
index 0000000..f06cb06
--- /dev/null
+++ b/src/bin/psql/crosstabview.c
@@ -0,0 +1,713 @@
+/*
+ * psql - the PostgreSQL interactive terminal
+ *
+ * Copyright (c) 2000-2020, PostgreSQL Global Development Group
+ *
+ * src/bin/psql/crosstabview.c
+ */
+#include "postgres_fe.h"
+
+#include "common.h"
+#include "common/logging.h"
+#include "crosstabview.h"
+#include "pqexpbuffer.h"
+#include "psqlscanslash.h"
+#include "settings.h"
+
+/*
+ * Value/position from the resultset that goes into the horizontal or vertical
+ * crosstabview header.
+ */
+typedef struct _pivot_field
+{
+ /*
+ * Pointer obtained from PQgetvalue() for colV or colH. Each distinct
+ * value becomes an entry in the vertical header (colV), or horizontal
+ * header (colH). A Null value is represented by a NULL pointer.
+ */
+ char *name;
+
+ /*
+ * When a sort is requested on an alternative column, this holds
+ * PQgetvalue() for the sort column corresponding to <name>. If <name>
+ * appear multiple times, it's the first value in the order of the results
+ * that is kept. A Null value is represented by a NULL pointer.
+ */
+ char *sort_value;
+
+ /*
+ * Rank of this value, starting at 0. Initially, it's the relative
+ * position of the first appearance of <name> in the resultset. For
+ * example, if successive rows contain B,A,C,A,D then it's B:0,A:1,C:2,D:3
+ * When a sort column is specified, ranks get updated in a final pass to
+ * reflect the desired order.
+ */
+ int rank;
+} pivot_field;
+
+/* Node in avl_tree */
+typedef struct _avl_node
+{
+ /* Node contents */
+ pivot_field field;
+
+ /*
+ * Height of this node in the tree (number of nodes on the longest path to
+ * a leaf).
+ */
+ int height;
+
+ /*
+ * Child nodes. [0] points to left subtree, [1] to right subtree. Never
+ * NULL, points to the empty node avl_tree.end when no left or right
+ * value.
+ */
+ struct _avl_node *children[2];
+} avl_node;
+
+/*
+ * Control structure for the AVL tree (binary search tree kept
+ * balanced with the AVL algorithm)
+ */
+typedef struct _avl_tree
+{
+ int count; /* Total number of nodes */
+ avl_node *root; /* root of the tree */
+ avl_node *end; /* Immutable dereferenceable empty tree */
+} avl_tree;
+
+
+static bool printCrosstab(const PGresult *results,
+ int num_columns, pivot_field *piv_columns, int field_for_columns,
+ int num_rows, pivot_field *piv_rows, int field_for_rows,
+ int field_for_data);
+static void avlInit(avl_tree *tree);
+static void avlMergeValue(avl_tree *tree, char *name, char *sort_value);
+static int avlCollectFields(avl_tree *tree, avl_node *node,
+ pivot_field *fields, int idx);
+static void avlFree(avl_tree *tree, avl_node *node);
+static void rankSort(int num_columns, pivot_field *piv_columns);
+static int indexOfColumn(char *arg, const PGresult *res);
+static int pivotFieldCompare(const void *a, const void *b);
+static int rankCompare(const void *a, const void *b);
+
+
+/*
+ * Main entry point to this module.
+ *
+ * Process the data from *res according to the options in pset (global),
+ * to generate the horizontal and vertical headers contents,
+ * then call printCrosstab() for the actual output.
+ */
+bool
+PrintResultsInCrosstab(const PGresult *res)
+{
+ bool retval = false;
+ avl_tree piv_columns;
+ avl_tree piv_rows;
+ pivot_field *array_columns = NULL;
+ pivot_field *array_rows = NULL;
+ int num_columns = 0;
+ int num_rows = 0;
+ int field_for_rows;
+ int field_for_columns;
+ int field_for_data;
+ int sort_field_for_columns;
+ int rn;
+
+ avlInit(&piv_rows);
+ avlInit(&piv_columns);
+
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ pg_log_error("\\crosstabview: statement did not return a result set");
+ goto error_return;
+ }
+
+ if (PQnfields(res) < 3)
+ {
+ pg_log_error("\\crosstabview: query must return at least three columns");
+ goto error_return;
+ }
+
+ /* Process first optional arg (vertical header column) */
+ if (pset.ctv_args[0] == NULL)
+ field_for_rows = 0;
+ else
+ {
+ field_for_rows = indexOfColumn(pset.ctv_args[0], res);
+ if (field_for_rows < 0)
+ goto error_return;
+ }
+
+ /* Process second optional arg (horizontal header column) */
+ if (pset.ctv_args[1] == NULL)
+ field_for_columns = 1;
+ else
+ {
+ field_for_columns = indexOfColumn(pset.ctv_args[1], res);
+ if (field_for_columns < 0)
+ goto error_return;
+ }
+
+ /* Insist that header columns be distinct */
+ if (field_for_columns == field_for_rows)
+ {
+ pg_log_error("\\crosstabview: vertical and horizontal headers must be different columns");
+ goto error_return;
+ }
+
+ /* Process third optional arg (data column) */
+ if (pset.ctv_args[2] == NULL)
+ {
+ int i;
+
+ /*
+ * If the data column was not specified, we search for the one not
+ * used as either vertical or horizontal headers. Must be exactly
+ * three columns, or this won't be unique.
+ */
+ if (PQnfields(res) != 3)
+ {
+ pg_log_error("\\crosstabview: data column must be specified when query returns more than three columns");
+ goto error_return;
+ }
+
+ field_for_data = -1;
+ for (i = 0; i < PQnfields(res); i++)
+ {
+ if (i != field_for_rows && i != field_for_columns)
+ {
+ field_for_data = i;
+ break;
+ }
+ }
+ Assert(field_for_data >= 0);
+ }
+ else
+ {
+ field_for_data = indexOfColumn(pset.ctv_args[2], res);
+ if (field_for_data < 0)
+ goto error_return;
+ }
+
+ /* Process fourth optional arg (horizontal header sort column) */
+ if (pset.ctv_args[3] == NULL)
+ sort_field_for_columns = -1; /* no sort column */
+ else
+ {
+ sort_field_for_columns = indexOfColumn(pset.ctv_args[3], res);
+ if (sort_field_for_columns < 0)
+ goto error_return;
+ }
+
+ /*
+ * First part: accumulate the names that go into the vertical and
+ * horizontal headers, each into an AVL binary tree to build the set of
+ * DISTINCT values.
+ */
+
+ for (rn = 0; rn < PQntuples(res); rn++)
+ {
+ char *val;
+ char *val1;
+
+ /* horizontal */
+ val = PQgetisnull(res, rn, field_for_columns) ? NULL :
+ PQgetvalue(res, rn, field_for_columns);
+ val1 = NULL;
+
+ if (sort_field_for_columns >= 0 &&
+ !PQgetisnull(res, rn, sort_field_for_columns))
+ val1 = PQgetvalue(res, rn, sort_field_for_columns);
+
+ avlMergeValue(&piv_columns, val, val1);
+
+ if (piv_columns.count > CROSSTABVIEW_MAX_COLUMNS)
+ {
+ pg_log_error("\\crosstabview: maximum number of columns (%d) exceeded",
+ CROSSTABVIEW_MAX_COLUMNS);
+ goto error_return;
+ }
+
+ /* vertical */
+ val = PQgetisnull(res, rn, field_for_rows) ? NULL :
+ PQgetvalue(res, rn, field_for_rows);
+
+ avlMergeValue(&piv_rows, val, NULL);
+ }
+
+ /*
+ * Second part: Generate sorted arrays from the AVL trees.
+ */
+
+ num_columns = piv_columns.count;
+ num_rows = piv_rows.count;
+
+ array_columns = (pivot_field *)
+ pg_malloc(sizeof(pivot_field) * num_columns);
+
+ array_rows = (pivot_field *)
+ pg_malloc(sizeof(pivot_field) * num_rows);
+
+ avlCollectFields(&piv_columns, piv_columns.root, array_columns, 0);
+ avlCollectFields(&piv_rows, piv_rows.root, array_rows, 0);
+
+ /*
+ * Third part: optionally, process the ranking data for the horizontal
+ * header
+ */
+ if (sort_field_for_columns >= 0)
+ rankSort(num_columns, array_columns);
+
+ /*
+ * Fourth part: print the crosstab'ed results.
+ */
+ retval = printCrosstab(res,
+ num_columns, array_columns, field_for_columns,
+ num_rows, array_rows, field_for_rows,
+ field_for_data);
+
+error_return:
+ avlFree(&piv_columns, piv_columns.root);
+ avlFree(&piv_rows, piv_rows.root);
+ pg_free(array_columns);
+ pg_free(array_rows);
+
+ return retval;
+}
+
+/*
+ * Output the pivoted resultset with the printTable* functions. Return true
+ * if successful, false otherwise.
+ */
+static bool
+printCrosstab(const PGresult *results,
+ int num_columns, pivot_field *piv_columns, int field_for_columns,
+ int num_rows, pivot_field *piv_rows, int field_for_rows,
+ int field_for_data)
+{
+ printQueryOpt popt = pset.popt;
+ printTableContent cont;
+ int i,
+ rn;
+ char col_align;
+ int *horiz_map;
+ bool retval = false;
+
+ printTableInit(&cont, &popt.topt, popt.title, num_columns + 1, num_rows);
+
+ /* Step 1: set target column names (horizontal header) */
+
+ /* The name of the first column is kept unchanged by the pivoting */
+ printTableAddHeader(&cont,
+ PQfname(results, field_for_rows),
+ false,
+ column_type_alignment(PQftype(results,
+ field_for_rows)));
+
+ /*
+ * To iterate over piv_columns[] by piv_columns[].rank, create a reverse
+ * map associating each piv_columns[].rank to its index in piv_columns.
+ * This avoids an O(N^2) loop later.
+ */
+ horiz_map = (int *) pg_malloc(sizeof(int) * num_columns);
+ for (i = 0; i < num_columns; i++)
+ horiz_map[piv_columns[i].rank] = i;
+
+ /*
+ * The display alignment depends on its PQftype().
+ */
+ col_align = column_type_alignment(PQftype(results, field_for_data));
+
+ for (i = 0; i < num_columns; i++)
+ {
+ char *colname;
+
+ colname = piv_columns[horiz_map[i]].name ?
+ piv_columns[horiz_map[i]].name :
+ (popt.nullPrint ? popt.nullPrint : "");
+
+ printTableAddHeader(&cont, colname, false, col_align);
+ }
+ pg_free(horiz_map);
+
+ /* Step 2: set row names in the first output column (vertical header) */
+ for (i = 0; i < num_rows; i++)
+ {
+ int k = piv_rows[i].rank;
+
+ cont.cells[k * (num_columns + 1)] = piv_rows[i].name ?
+ piv_rows[i].name :
+ (popt.nullPrint ? popt.nullPrint : "");
+ }
+ cont.cellsadded = num_rows * (num_columns + 1);
+
+ /*
+ * Step 3: fill in the content cells.
+ */
+ for (rn = 0; rn < PQntuples(results); rn++)
+ {
+ int row_number;
+ int col_number;
+ pivot_field *rp,
+ *cp;
+ pivot_field elt;
+
+ /* Find target row */
+ if (!PQgetisnull(results, rn, field_for_rows))
+ elt.name = PQgetvalue(results, rn, field_for_rows);
+ else
+ elt.name = NULL;
+ rp = (pivot_field *) bsearch(&elt,
+ piv_rows,
+ num_rows,
+ sizeof(pivot_field),
+ pivotFieldCompare);
+ Assert(rp != NULL);
+ row_number = rp->rank;
+
+ /* Find target column */
+ if (!PQgetisnull(results, rn, field_for_columns))
+ elt.name = PQgetvalue(results, rn, field_for_columns);
+ else
+ elt.name = NULL;
+
+ cp = (pivot_field *) bsearch(&elt,
+ piv_columns,
+ num_columns,
+ sizeof(pivot_field),
+ pivotFieldCompare);
+ Assert(cp != NULL);
+ col_number = cp->rank;
+
+ /* Place value into cell */
+ if (col_number >= 0 && row_number >= 0)
+ {
+ int idx;
+
+ /* index into the cont.cells array */
+ idx = 1 + col_number + row_number * (num_columns + 1);
+
+ /*
+ * If the cell already contains a value, raise an error.
+ */
+ if (cont.cells[idx] != NULL)
+ {
+ pg_log_error("\\crosstabview: query result contains multiple data values for row \"%s\", column \"%s\"",
+ rp->name ? rp->name :
+ (popt.nullPrint ? popt.nullPrint : "(null)"),
+ cp->name ? cp->name :
+ (popt.nullPrint ? popt.nullPrint : "(null)"));
+ goto error;
+ }
+
+ cont.cells[idx] = !PQgetisnull(results, rn, field_for_data) ?
+ PQgetvalue(results, rn, field_for_data) :
+ (popt.nullPrint ? popt.nullPrint : "");
+ }
+ }
+
+ /*
+ * The non-initialized cells must be set to an empty string for the print
+ * functions
+ */
+ for (i = 0; i < cont.cellsadded; i++)
+ {
+ if (cont.cells[i] == NULL)
+ cont.cells[i] = "";
+ }
+
+ printTable(&cont, pset.queryFout, false, pset.logfile);
+ retval = true;
+
+error:
+ printTableCleanup(&cont);
+
+ return retval;
+}
+
+/*
+ * The avl* functions below provide a minimalistic implementation of AVL binary
+ * trees, to efficiently collect the distinct values that will form the horizontal
+ * and vertical headers. It only supports adding new values, no removal or even
+ * search.
+ */
+static void
+avlInit(avl_tree *tree)
+{
+ tree->end = (avl_node *) pg_malloc0(sizeof(avl_node));
+ tree->end->children[0] = tree->end->children[1] = tree->end;
+ tree->count = 0;
+ tree->root = tree->end;
+}
+
+/* Deallocate recursively an AVL tree, starting from node */
+static void
+avlFree(avl_tree *tree, avl_node *node)
+{
+ if (node->children[0] != tree->end)
+ {
+ avlFree(tree, node->children[0]);
+ pg_free(node->children[0]);
+ }
+ if (node->children[1] != tree->end)
+ {
+ avlFree(tree, node->children[1]);
+ pg_free(node->children[1]);
+ }
+ if (node == tree->root)
+ {
+ /* free the root separately as it's not child of anything */
+ if (node != tree->end)
+ pg_free(node);
+ /* free the tree->end struct only once and when all else is freed */
+ pg_free(tree->end);
+ }
+}
+
+/* Set the height to 1 plus the greatest of left and right heights */
+static void
+avlUpdateHeight(avl_node *n)
+{
+ n->height = 1 + (n->children[0]->height > n->children[1]->height ?
+ n->children[0]->height :
+ n->children[1]->height);
+}
+
+/* Rotate a subtree left (dir=0) or right (dir=1). Not recursive */
+static avl_node *
+avlRotate(avl_node **current, int dir)
+{
+ avl_node *before = *current;
+ avl_node *after = (*current)->children[dir];
+
+ *current = after;
+ before->children[dir] = after->children[!dir];
+ avlUpdateHeight(before);
+ after->children[!dir] = before;
+
+ return after;
+}
+
+static int
+avlBalance(avl_node *n)
+{
+ return n->children[0]->height - n->children[1]->height;
+}
+
+/*
+ * After an insertion, possibly rebalance the tree so that the left and right
+ * node heights don't differ by more than 1.
+ * May update *node.
+ */
+static void
+avlAdjustBalance(avl_tree *tree, avl_node **node)
+{
+ avl_node *current = *node;
+ int b = avlBalance(current) / 2;
+
+ if (b != 0)
+ {
+ int dir = (1 - b) / 2;
+
+ if (avlBalance(current->children[dir]) == -b)
+ avlRotate(&current->children[dir], !dir);
+ current = avlRotate(node, dir);
+ }
+ if (current != tree->end)
+ avlUpdateHeight(current);
+}
+
+/*
+ * Insert a new value/field, starting from *node, reaching the correct position
+ * in the tree by recursion. Possibly rebalance the tree and possibly update
+ * *node. Do nothing if the value is already present in the tree.
+ */
+static void
+avlInsertNode(avl_tree *tree, avl_node **node, pivot_field field)
+{
+ avl_node *current = *node;
+
+ if (current == tree->end)
+ {
+ avl_node *new_node = (avl_node *)
+ pg_malloc(sizeof(avl_node));
+
+ new_node->height = 1;
+ new_node->field = field;
+ new_node->children[0] = new_node->children[1] = tree->end;
+ tree->count++;
+ *node = new_node;
+ }
+ else
+ {
+ int cmp = pivotFieldCompare(&field, &current->field);
+
+ if (cmp != 0)
+ {
+ avlInsertNode(tree,
+ cmp > 0 ? &current->children[1] : &current->children[0],
+ field);
+ avlAdjustBalance(tree, node);
+ }
+ }
+}
+
+/* Insert the value into the AVL tree, if it does not preexist */
+static void
+avlMergeValue(avl_tree *tree, char *name, char *sort_value)
+{
+ pivot_field field;
+
+ field.name = name;
+ field.rank = tree->count;
+ field.sort_value = sort_value;
+ avlInsertNode(tree, &tree->root, field);
+}
+
+/*
+ * Recursively extract node values into the names array, in sorted order with a
+ * left-to-right tree traversal.
+ * Return the next candidate offset to write into the names array.
+ * fields[] must be preallocated to hold tree->count entries
+ */
+static int
+avlCollectFields(avl_tree *tree, avl_node *node, pivot_field *fields, int idx)
+{
+ if (node == tree->end)
+ return idx;
+
+ idx = avlCollectFields(tree, node->children[0], fields, idx);
+ fields[idx] = node->field;
+ return avlCollectFields(tree, node->children[1], fields, idx + 1);
+}
+
+static void
+rankSort(int num_columns, pivot_field *piv_columns)
+{
+ int *hmap; /* [[offset in piv_columns, rank], ...for
+ * every header entry] */
+ int i;
+
+ hmap = (int *) pg_malloc(sizeof(int) * num_columns * 2);
+ for (i = 0; i < num_columns; i++)
+ {
+ char *val = piv_columns[i].sort_value;
+
+ /* ranking information is valid if non null and matches /^-?\d+$/ */
+ if (val &&
+ ((*val == '-' &&
+ strspn(val + 1, "0123456789") == strlen(val + 1)) ||
+ strspn(val, "0123456789") == strlen(val)))
+ {
+ hmap[i * 2] = atoi(val);
+ hmap[i * 2 + 1] = i;
+ }
+ else
+ {
+ /* invalid rank information ignored (equivalent to rank 0) */
+ hmap[i * 2] = 0;
+ hmap[i * 2 + 1] = i;
+ }
+ }
+
+ qsort(hmap, num_columns, sizeof(int) * 2, rankCompare);
+
+ for (i = 0; i < num_columns; i++)
+ {
+ piv_columns[hmap[i * 2 + 1]].rank = i;
+ }
+
+ pg_free(hmap);
+}
+
+/*
+ * Look up a column reference, which can be either:
+ * - a number from 1 to PQnfields(res)
+ * - a column name matching one of PQfname(res,...)
+ *
+ * Returns zero-based column number, or -1 if not found or ambiguous.
+ *
+ * Note: may modify contents of "arg" string.
+ */
+static int
+indexOfColumn(char *arg, const PGresult *res)
+{
+ int idx;
+
+ if (arg[0] && strspn(arg, "0123456789") == strlen(arg))
+ {
+ /* if arg contains only digits, it's a column number */
+ idx = atoi(arg) - 1;
+ if (idx < 0 || idx >= PQnfields(res))
+ {
+ pg_log_error("\\crosstabview: column number %d is out of range 1..%d",
+ idx + 1, PQnfields(res));
+ return -1;
+ }
+ }
+ else
+ {
+ int i;
+
+ /*
+ * Dequote and downcase the column name. By checking for all-digits
+ * before doing this, we can ensure that a quoted name is treated as a
+ * name even if it's all digits.
+ */
+ dequote_downcase_identifier(arg, true, pset.encoding);
+
+ /* Now look for match(es) among res' column names */
+ idx = -1;
+ for (i = 0; i < PQnfields(res); i++)
+ {
+ if (strcmp(arg, PQfname(res, i)) == 0)
+ {
+ if (idx >= 0)
+ {
+ /* another idx was already found for the same name */
+ pg_log_error("\\crosstabview: ambiguous column name: \"%s\"", arg);
+ return -1;
+ }
+ idx = i;
+ }
+ }
+ if (idx == -1)
+ {
+ pg_log_error("\\crosstabview: column name not found: \"%s\"", arg);
+ return -1;
+ }
+ }
+
+ return idx;
+}
+
+/*
+ * Value comparator for vertical and horizontal headers
+ * used for deduplication only.
+ * - null values are considered equal
+ * - non-null < null
+ * - non-null values are compared with strcmp()
+ */
+static int
+pivotFieldCompare(const void *a, const void *b)
+{
+ const pivot_field *pa = (const pivot_field *) a;
+ const pivot_field *pb = (const pivot_field *) b;
+
+ /* test null values */
+ if (!pb->name)
+ return pa->name ? -1 : 0;
+ else if (!pa->name)
+ return 1;
+
+ /* non-null values */
+ return strcmp(pa->name, pb->name);
+}
+
+static int
+rankCompare(const void *a, const void *b)
+{
+ return *((const int *) a) - *((const int *) b);
+}