sqlglot.helper
1from __future__ import annotations 2 3import inspect 4import logging 5import re 6import sys 7import typing as t 8from collections.abc import Collection 9from contextlib import contextmanager 10from copy import copy 11from enum import Enum 12from itertools import count 13 14if t.TYPE_CHECKING: 15 from sqlglot import exp 16 from sqlglot._typing import E, T 17 from sqlglot.expressions import Expression 18 19CAMEL_CASE_PATTERN = re.compile("(?<!^)(?=[A-Z])") 20PYTHON_VERSION = sys.version_info[:2] 21logger = logging.getLogger("sqlglot") 22 23 24class AutoName(Enum): 25 """ 26 This is used for creating Enum classes where `auto()` is the string form 27 of the corresponding enum's identifier (e.g. FOO.value results in "FOO"). 28 29 Reference: https://docs.python.org/3/howto/enum.html#using-automatic-values 30 """ 31 32 def _generate_next_value_(name, _start, _count, _last_values): 33 return name 34 35 36class classproperty(property): 37 """ 38 Similar to a normal property but works for class methods 39 """ 40 41 def __get__(self, obj: t.Any, owner: t.Any = None) -> t.Any: 42 return classmethod(self.fget).__get__(None, owner)() # type: ignore 43 44 45def seq_get(seq: t.Sequence[T], index: int) -> t.Optional[T]: 46 """Returns the value in `seq` at position `index`, or `None` if `index` is out of bounds.""" 47 try: 48 return seq[index] 49 except IndexError: 50 return None 51 52 53@t.overload 54def ensure_list(value: t.Collection[T]) -> t.List[T]: 55 ... 56 57 58@t.overload 59def ensure_list(value: T) -> t.List[T]: 60 ... 61 62 63def ensure_list(value): 64 """ 65 Ensures that a value is a list, otherwise casts or wraps it into one. 66 67 Args: 68 value: The value of interest. 69 70 Returns: 71 The value cast as a list if it's a list or a tuple, or else the value wrapped in a list. 72 """ 73 if value is None: 74 return [] 75 if isinstance(value, (list, tuple)): 76 return list(value) 77 78 return [value] 79 80 81@t.overload 82def ensure_collection(value: t.Collection[T]) -> t.Collection[T]: 83 ... 84 85 86@t.overload 87def ensure_collection(value: T) -> t.Collection[T]: 88 ... 89 90 91def ensure_collection(value): 92 """ 93 Ensures that a value is a collection (excluding `str` and `bytes`), otherwise wraps it into a list. 94 95 Args: 96 value: The value of interest. 97 98 Returns: 99 The value if it's a collection, or else the value wrapped in a list. 100 """ 101 if value is None: 102 return [] 103 return ( 104 value if isinstance(value, Collection) and not isinstance(value, (str, bytes)) else [value] 105 ) 106 107 108def csv(*args: str, sep: str = ", ") -> str: 109 """ 110 Formats any number of string arguments as CSV. 111 112 Args: 113 args: The string arguments to format. 114 sep: The argument separator. 115 116 Returns: 117 The arguments formatted as a CSV string. 118 """ 119 return sep.join(arg for arg in args if arg) 120 121 122def subclasses( 123 module_name: str, 124 classes: t.Type | t.Tuple[t.Type, ...], 125 exclude: t.Type | t.Tuple[t.Type, ...] = (), 126) -> t.List[t.Type]: 127 """ 128 Returns all subclasses for a collection of classes, possibly excluding some of them. 129 130 Args: 131 module_name: The name of the module to search for subclasses in. 132 classes: Class(es) we want to find the subclasses of. 133 exclude: Class(es) we want to exclude from the returned list. 134 135 Returns: 136 The target subclasses. 137 """ 138 return [ 139 obj 140 for _, obj in inspect.getmembers( 141 sys.modules[module_name], 142 lambda obj: inspect.isclass(obj) and issubclass(obj, classes) and obj not in exclude, 143 ) 144 ] 145 146 147def apply_index_offset( 148 this: exp.Expression, 149 expressions: t.List[E], 150 offset: int, 151) -> t.List[E]: 152 """ 153 Applies an offset to a given integer literal expression. 154 155 Args: 156 this: The target of the index. 157 expressions: The expression the offset will be applied to, wrapped in a list. 158 offset: The offset that will be applied. 159 160 Returns: 161 The original expression with the offset applied to it, wrapped in a list. If the provided 162 `expressions` argument contains more than one expression, it's returned unaffected. 163 """ 164 if not offset or len(expressions) != 1: 165 return expressions 166 167 expression = expressions[0] 168 169 from sqlglot import exp 170 from sqlglot.optimizer.annotate_types import annotate_types 171 from sqlglot.optimizer.simplify import simplify 172 173 if not this.type: 174 annotate_types(this) 175 176 if t.cast(exp.DataType, this.type).this not in ( 177 exp.DataType.Type.UNKNOWN, 178 exp.DataType.Type.ARRAY, 179 ): 180 return expressions 181 182 if not expression.type: 183 annotate_types(expression) 184 if t.cast(exp.DataType, expression.type).this in exp.DataType.INTEGER_TYPES: 185 logger.warning("Applying array index offset (%s)", offset) 186 expression = simplify( 187 exp.Add(this=expression.copy(), expression=exp.Literal.number(offset)) 188 ) 189 return [expression] 190 191 return expressions 192 193 194def camel_to_snake_case(name: str) -> str: 195 """Converts `name` from camelCase to snake_case and returns the result.""" 196 return CAMEL_CASE_PATTERN.sub("_", name).upper() 197 198 199def while_changing(expression: Expression, func: t.Callable[[Expression], E]) -> E: 200 """ 201 Applies a transformation to a given expression until a fix point is reached. 202 203 Args: 204 expression: The expression to be transformed. 205 func: The transformation to be applied. 206 207 Returns: 208 The transformed expression. 209 """ 210 while True: 211 for n, *_ in reversed(tuple(expression.walk())): 212 n._hash = hash(n) 213 214 start = hash(expression) 215 expression = func(expression) 216 217 for n, *_ in expression.walk(): 218 n._hash = None 219 if start == hash(expression): 220 break 221 222 return expression 223 224 225def tsort(dag: t.Dict[T, t.Set[T]]) -> t.List[T]: 226 """ 227 Sorts a given directed acyclic graph in topological order. 228 229 Args: 230 dag: The graph to be sorted. 231 232 Returns: 233 A list that contains all of the graph's nodes in topological order. 234 """ 235 result = [] 236 237 for node, deps in tuple(dag.items()): 238 for dep in deps: 239 if not dep in dag: 240 dag[dep] = set() 241 242 while dag: 243 current = {node for node, deps in dag.items() if not deps} 244 245 if not current: 246 raise ValueError("Cycle error") 247 248 for node in current: 249 dag.pop(node) 250 251 for deps in dag.values(): 252 deps -= current 253 254 result.extend(sorted(current)) # type: ignore 255 256 return result 257 258 259def open_file(file_name: str) -> t.TextIO: 260 """Open a file that may be compressed as gzip and return it in universal newline mode.""" 261 with open(file_name, "rb") as f: 262 gzipped = f.read(2) == b"\x1f\x8b" 263 264 if gzipped: 265 import gzip 266 267 return gzip.open(file_name, "rt", newline="") 268 269 return open(file_name, encoding="utf-8", newline="") 270 271 272@contextmanager 273def csv_reader(read_csv: exp.ReadCSV) -> t.Any: 274 """ 275 Returns a csv reader given the expression `READ_CSV(name, ['delimiter', '|', ...])`. 276 277 Args: 278 read_csv: A `ReadCSV` function call. 279 280 Yields: 281 A python csv reader. 282 """ 283 args = read_csv.expressions 284 file = open_file(read_csv.name) 285 286 delimiter = "," 287 args = iter(arg.name for arg in args) 288 for k, v in zip(args, args): 289 if k == "delimiter": 290 delimiter = v 291 292 try: 293 import csv as csv_ 294 295 yield csv_.reader(file, delimiter=delimiter) 296 finally: 297 file.close() 298 299 300def find_new_name(taken: t.Collection[str], base: str) -> str: 301 """ 302 Searches for a new name. 303 304 Args: 305 taken: A collection of taken names. 306 base: Base name to alter. 307 308 Returns: 309 The new, available name. 310 """ 311 if base not in taken: 312 return base 313 314 i = 2 315 new = f"{base}_{i}" 316 while new in taken: 317 i += 1 318 new = f"{base}_{i}" 319 320 return new 321 322 323def name_sequence(prefix: str) -> t.Callable[[], str]: 324 """Returns a name generator given a prefix (e.g. a0, a1, a2, ... if the prefix is "a").""" 325 sequence = count() 326 return lambda: f"{prefix}{next(sequence)}" 327 328 329def object_to_dict(obj: t.Any, **kwargs) -> t.Dict: 330 """Returns a dictionary created from an object's attributes.""" 331 return { 332 **{k: v.copy() if hasattr(v, "copy") else copy(v) for k, v in vars(obj).items()}, 333 **kwargs, 334 } 335 336 337def split_num_words( 338 value: str, sep: str, min_num_words: int, fill_from_start: bool = True 339) -> t.List[t.Optional[str]]: 340 """ 341 Perform a split on a value and return N words as a result with `None` used for words that don't exist. 342 343 Args: 344 value: The value to be split. 345 sep: The value to use to split on. 346 min_num_words: The minimum number of words that are going to be in the result. 347 fill_from_start: Indicates that if `None` values should be inserted at the start or end of the list. 348 349 Examples: 350 >>> split_num_words("db.table", ".", 3) 351 [None, 'db', 'table'] 352 >>> split_num_words("db.table", ".", 3, fill_from_start=False) 353 ['db', 'table', None] 354 >>> split_num_words("db.table", ".", 1) 355 ['db', 'table'] 356 357 Returns: 358 The list of words returned by `split`, possibly augmented by a number of `None` values. 359 """ 360 words = value.split(sep) 361 if fill_from_start: 362 return [None] * (min_num_words - len(words)) + words 363 return words + [None] * (min_num_words - len(words)) 364 365 366def is_iterable(value: t.Any) -> bool: 367 """ 368 Checks if the value is an iterable, excluding the types `str` and `bytes`. 369 370 Examples: 371 >>> is_iterable([1,2]) 372 True 373 >>> is_iterable("test") 374 False 375 376 Args: 377 value: The value to check if it is an iterable. 378 379 Returns: 380 A `bool` value indicating if it is an iterable. 381 """ 382 return hasattr(value, "__iter__") and not isinstance(value, (str, bytes)) 383 384 385def flatten(values: t.Iterable[t.Iterable[t.Any] | t.Any]) -> t.Iterator[t.Any]: 386 """ 387 Flattens an iterable that can contain both iterable and non-iterable elements. Objects of 388 type `str` and `bytes` are not regarded as iterables. 389 390 Examples: 391 >>> list(flatten([[1, 2], 3, {4}, (5, "bla")])) 392 [1, 2, 3, 4, 5, 'bla'] 393 >>> list(flatten([1, 2, 3])) 394 [1, 2, 3] 395 396 Args: 397 values: The value to be flattened. 398 399 Yields: 400 Non-iterable elements in `values`. 401 """ 402 for value in values: 403 if is_iterable(value): 404 yield from flatten(value) 405 else: 406 yield value 407 408 409def dict_depth(d: t.Dict) -> int: 410 """ 411 Get the nesting depth of a dictionary. 412 413 Example: 414 >>> dict_depth(None) 415 0 416 >>> dict_depth({}) 417 1 418 >>> dict_depth({"a": "b"}) 419 1 420 >>> dict_depth({"a": {}}) 421 2 422 >>> dict_depth({"a": {"b": {}}}) 423 3 424 """ 425 try: 426 return 1 + dict_depth(next(iter(d.values()))) 427 except AttributeError: 428 # d doesn't have attribute "values" 429 return 0 430 except StopIteration: 431 # d.values() returns an empty sequence 432 return 1 433 434 435def first(it: t.Iterable[T]) -> T: 436 """Returns the first element from an iterable (useful for sets).""" 437 return next(i for i in it)
25class AutoName(Enum): 26 """ 27 This is used for creating Enum classes where `auto()` is the string form 28 of the corresponding enum's identifier (e.g. FOO.value results in "FOO"). 29 30 Reference: https://docs.python.org/3/howto/enum.html#using-automatic-values 31 """ 32 33 def _generate_next_value_(name, _start, _count, _last_values): 34 return name
This is used for creating Enum classes where auto()
is the string form
of the corresponding enum's identifier (e.g. FOO.value results in "FOO").
Reference: https://docs.python.org/3/howto/enum.html#using-automatic-values
Inherited Members
- enum.Enum
- name
- value
37class classproperty(property): 38 """ 39 Similar to a normal property but works for class methods 40 """ 41 42 def __get__(self, obj: t.Any, owner: t.Any = None) -> t.Any: 43 return classmethod(self.fget).__get__(None, owner)() # type: ignore
Similar to a normal property but works for class methods
Inherited Members
- builtins.property
- property
- getter
- setter
- deleter
- fget
- fset
- fdel
46def seq_get(seq: t.Sequence[T], index: int) -> t.Optional[T]: 47 """Returns the value in `seq` at position `index`, or `None` if `index` is out of bounds.""" 48 try: 49 return seq[index] 50 except IndexError: 51 return None
Returns the value in seq
at position index
, or None
if index
is out of bounds.
64def ensure_list(value): 65 """ 66 Ensures that a value is a list, otherwise casts or wraps it into one. 67 68 Args: 69 value: The value of interest. 70 71 Returns: 72 The value cast as a list if it's a list or a tuple, or else the value wrapped in a list. 73 """ 74 if value is None: 75 return [] 76 if isinstance(value, (list, tuple)): 77 return list(value) 78 79 return [value]
Ensures that a value is a list, otherwise casts or wraps it into one.
Arguments:
- value: The value of interest.
Returns:
The value cast as a list if it's a list or a tuple, or else the value wrapped in a list.
92def ensure_collection(value): 93 """ 94 Ensures that a value is a collection (excluding `str` and `bytes`), otherwise wraps it into a list. 95 96 Args: 97 value: The value of interest. 98 99 Returns: 100 The value if it's a collection, or else the value wrapped in a list. 101 """ 102 if value is None: 103 return [] 104 return ( 105 value if isinstance(value, Collection) and not isinstance(value, (str, bytes)) else [value] 106 )
Ensures that a value is a collection (excluding str
and bytes
), otherwise wraps it into a list.
Arguments:
- value: The value of interest.
Returns:
The value if it's a collection, or else the value wrapped in a list.
109def csv(*args: str, sep: str = ", ") -> str: 110 """ 111 Formats any number of string arguments as CSV. 112 113 Args: 114 args: The string arguments to format. 115 sep: The argument separator. 116 117 Returns: 118 The arguments formatted as a CSV string. 119 """ 120 return sep.join(arg for arg in args if arg)
Formats any number of string arguments as CSV.
Arguments:
- args: The string arguments to format.
- sep: The argument separator.
Returns:
The arguments formatted as a CSV string.
123def subclasses( 124 module_name: str, 125 classes: t.Type | t.Tuple[t.Type, ...], 126 exclude: t.Type | t.Tuple[t.Type, ...] = (), 127) -> t.List[t.Type]: 128 """ 129 Returns all subclasses for a collection of classes, possibly excluding some of them. 130 131 Args: 132 module_name: The name of the module to search for subclasses in. 133 classes: Class(es) we want to find the subclasses of. 134 exclude: Class(es) we want to exclude from the returned list. 135 136 Returns: 137 The target subclasses. 138 """ 139 return [ 140 obj 141 for _, obj in inspect.getmembers( 142 sys.modules[module_name], 143 lambda obj: inspect.isclass(obj) and issubclass(obj, classes) and obj not in exclude, 144 ) 145 ]
Returns all subclasses for a collection of classes, possibly excluding some of them.
Arguments:
- module_name: The name of the module to search for subclasses in.
- classes: Class(es) we want to find the subclasses of.
- exclude: Class(es) we want to exclude from the returned list.
Returns:
The target subclasses.
148def apply_index_offset( 149 this: exp.Expression, 150 expressions: t.List[E], 151 offset: int, 152) -> t.List[E]: 153 """ 154 Applies an offset to a given integer literal expression. 155 156 Args: 157 this: The target of the index. 158 expressions: The expression the offset will be applied to, wrapped in a list. 159 offset: The offset that will be applied. 160 161 Returns: 162 The original expression with the offset applied to it, wrapped in a list. If the provided 163 `expressions` argument contains more than one expression, it's returned unaffected. 164 """ 165 if not offset or len(expressions) != 1: 166 return expressions 167 168 expression = expressions[0] 169 170 from sqlglot import exp 171 from sqlglot.optimizer.annotate_types import annotate_types 172 from sqlglot.optimizer.simplify import simplify 173 174 if not this.type: 175 annotate_types(this) 176 177 if t.cast(exp.DataType, this.type).this not in ( 178 exp.DataType.Type.UNKNOWN, 179 exp.DataType.Type.ARRAY, 180 ): 181 return expressions 182 183 if not expression.type: 184 annotate_types(expression) 185 if t.cast(exp.DataType, expression.type).this in exp.DataType.INTEGER_TYPES: 186 logger.warning("Applying array index offset (%s)", offset) 187 expression = simplify( 188 exp.Add(this=expression.copy(), expression=exp.Literal.number(offset)) 189 ) 190 return [expression] 191 192 return expressions
Applies an offset to a given integer literal expression.
Arguments:
- this: The target of the index.
- expressions: The expression the offset will be applied to, wrapped in a list.
- offset: The offset that will be applied.
Returns:
The original expression with the offset applied to it, wrapped in a list. If the provided
expressions
argument contains more than one expression, it's returned unaffected.
195def camel_to_snake_case(name: str) -> str: 196 """Converts `name` from camelCase to snake_case and returns the result.""" 197 return CAMEL_CASE_PATTERN.sub("_", name).upper()
Converts name
from camelCase to snake_case and returns the result.
200def while_changing(expression: Expression, func: t.Callable[[Expression], E]) -> E: 201 """ 202 Applies a transformation to a given expression until a fix point is reached. 203 204 Args: 205 expression: The expression to be transformed. 206 func: The transformation to be applied. 207 208 Returns: 209 The transformed expression. 210 """ 211 while True: 212 for n, *_ in reversed(tuple(expression.walk())): 213 n._hash = hash(n) 214 215 start = hash(expression) 216 expression = func(expression) 217 218 for n, *_ in expression.walk(): 219 n._hash = None 220 if start == hash(expression): 221 break 222 223 return expression
Applies a transformation to a given expression until a fix point is reached.
Arguments:
- expression: The expression to be transformed.
- func: The transformation to be applied.
Returns:
The transformed expression.
226def tsort(dag: t.Dict[T, t.Set[T]]) -> t.List[T]: 227 """ 228 Sorts a given directed acyclic graph in topological order. 229 230 Args: 231 dag: The graph to be sorted. 232 233 Returns: 234 A list that contains all of the graph's nodes in topological order. 235 """ 236 result = [] 237 238 for node, deps in tuple(dag.items()): 239 for dep in deps: 240 if not dep in dag: 241 dag[dep] = set() 242 243 while dag: 244 current = {node for node, deps in dag.items() if not deps} 245 246 if not current: 247 raise ValueError("Cycle error") 248 249 for node in current: 250 dag.pop(node) 251 252 for deps in dag.values(): 253 deps -= current 254 255 result.extend(sorted(current)) # type: ignore 256 257 return result
Sorts a given directed acyclic graph in topological order.
Arguments:
- dag: The graph to be sorted.
Returns:
A list that contains all of the graph's nodes in topological order.
260def open_file(file_name: str) -> t.TextIO: 261 """Open a file that may be compressed as gzip and return it in universal newline mode.""" 262 with open(file_name, "rb") as f: 263 gzipped = f.read(2) == b"\x1f\x8b" 264 265 if gzipped: 266 import gzip 267 268 return gzip.open(file_name, "rt", newline="") 269 270 return open(file_name, encoding="utf-8", newline="")
Open a file that may be compressed as gzip and return it in universal newline mode.
273@contextmanager 274def csv_reader(read_csv: exp.ReadCSV) -> t.Any: 275 """ 276 Returns a csv reader given the expression `READ_CSV(name, ['delimiter', '|', ...])`. 277 278 Args: 279 read_csv: A `ReadCSV` function call. 280 281 Yields: 282 A python csv reader. 283 """ 284 args = read_csv.expressions 285 file = open_file(read_csv.name) 286 287 delimiter = "," 288 args = iter(arg.name for arg in args) 289 for k, v in zip(args, args): 290 if k == "delimiter": 291 delimiter = v 292 293 try: 294 import csv as csv_ 295 296 yield csv_.reader(file, delimiter=delimiter) 297 finally: 298 file.close()
Returns a csv reader given the expression READ_CSV(name, ['delimiter', '|', ...])
.
Arguments:
- read_csv: A
ReadCSV
function call.
Yields:
A python csv reader.
301def find_new_name(taken: t.Collection[str], base: str) -> str: 302 """ 303 Searches for a new name. 304 305 Args: 306 taken: A collection of taken names. 307 base: Base name to alter. 308 309 Returns: 310 The new, available name. 311 """ 312 if base not in taken: 313 return base 314 315 i = 2 316 new = f"{base}_{i}" 317 while new in taken: 318 i += 1 319 new = f"{base}_{i}" 320 321 return new
Searches for a new name.
Arguments:
- taken: A collection of taken names.
- base: Base name to alter.
Returns:
The new, available name.
324def name_sequence(prefix: str) -> t.Callable[[], str]: 325 """Returns a name generator given a prefix (e.g. a0, a1, a2, ... if the prefix is "a").""" 326 sequence = count() 327 return lambda: f"{prefix}{next(sequence)}"
Returns a name generator given a prefix (e.g. a0, a1, a2, ... if the prefix is "a").
330def object_to_dict(obj: t.Any, **kwargs) -> t.Dict: 331 """Returns a dictionary created from an object's attributes.""" 332 return { 333 **{k: v.copy() if hasattr(v, "copy") else copy(v) for k, v in vars(obj).items()}, 334 **kwargs, 335 }
Returns a dictionary created from an object's attributes.
338def split_num_words( 339 value: str, sep: str, min_num_words: int, fill_from_start: bool = True 340) -> t.List[t.Optional[str]]: 341 """ 342 Perform a split on a value and return N words as a result with `None` used for words that don't exist. 343 344 Args: 345 value: The value to be split. 346 sep: The value to use to split on. 347 min_num_words: The minimum number of words that are going to be in the result. 348 fill_from_start: Indicates that if `None` values should be inserted at the start or end of the list. 349 350 Examples: 351 >>> split_num_words("db.table", ".", 3) 352 [None, 'db', 'table'] 353 >>> split_num_words("db.table", ".", 3, fill_from_start=False) 354 ['db', 'table', None] 355 >>> split_num_words("db.table", ".", 1) 356 ['db', 'table'] 357 358 Returns: 359 The list of words returned by `split`, possibly augmented by a number of `None` values. 360 """ 361 words = value.split(sep) 362 if fill_from_start: 363 return [None] * (min_num_words - len(words)) + words 364 return words + [None] * (min_num_words - len(words))
Perform a split on a value and return N words as a result with None
used for words that don't exist.
Arguments:
- value: The value to be split.
- sep: The value to use to split on.
- min_num_words: The minimum number of words that are going to be in the result.
- fill_from_start: Indicates that if
None
values should be inserted at the start or end of the list.
Examples:
>>> split_num_words("db.table", ".", 3) [None, 'db', 'table'] >>> split_num_words("db.table", ".", 3, fill_from_start=False) ['db', 'table', None] >>> split_num_words("db.table", ".", 1) ['db', 'table']
Returns:
The list of words returned by
split
, possibly augmented by a number ofNone
values.
367def is_iterable(value: t.Any) -> bool: 368 """ 369 Checks if the value is an iterable, excluding the types `str` and `bytes`. 370 371 Examples: 372 >>> is_iterable([1,2]) 373 True 374 >>> is_iterable("test") 375 False 376 377 Args: 378 value: The value to check if it is an iterable. 379 380 Returns: 381 A `bool` value indicating if it is an iterable. 382 """ 383 return hasattr(value, "__iter__") and not isinstance(value, (str, bytes))
Checks if the value is an iterable, excluding the types str
and bytes
.
Examples:
>>> is_iterable([1,2]) True >>> is_iterable("test") False
Arguments:
- value: The value to check if it is an iterable.
Returns:
A
bool
value indicating if it is an iterable.
386def flatten(values: t.Iterable[t.Iterable[t.Any] | t.Any]) -> t.Iterator[t.Any]: 387 """ 388 Flattens an iterable that can contain both iterable and non-iterable elements. Objects of 389 type `str` and `bytes` are not regarded as iterables. 390 391 Examples: 392 >>> list(flatten([[1, 2], 3, {4}, (5, "bla")])) 393 [1, 2, 3, 4, 5, 'bla'] 394 >>> list(flatten([1, 2, 3])) 395 [1, 2, 3] 396 397 Args: 398 values: The value to be flattened. 399 400 Yields: 401 Non-iterable elements in `values`. 402 """ 403 for value in values: 404 if is_iterable(value): 405 yield from flatten(value) 406 else: 407 yield value
Flattens an iterable that can contain both iterable and non-iterable elements. Objects of
type str
and bytes
are not regarded as iterables.
Examples:
>>> list(flatten([[1, 2], 3, {4}, (5, "bla")])) [1, 2, 3, 4, 5, 'bla'] >>> list(flatten([1, 2, 3])) [1, 2, 3]
Arguments:
- values: The value to be flattened.
Yields:
Non-iterable elements in
values
.
410def dict_depth(d: t.Dict) -> int: 411 """ 412 Get the nesting depth of a dictionary. 413 414 Example: 415 >>> dict_depth(None) 416 0 417 >>> dict_depth({}) 418 1 419 >>> dict_depth({"a": "b"}) 420 1 421 >>> dict_depth({"a": {}}) 422 2 423 >>> dict_depth({"a": {"b": {}}}) 424 3 425 """ 426 try: 427 return 1 + dict_depth(next(iter(d.values()))) 428 except AttributeError: 429 # d doesn't have attribute "values" 430 return 0 431 except StopIteration: 432 # d.values() returns an empty sequence 433 return 1
Get the nesting depth of a dictionary.
Example:
>>> dict_depth(None) 0 >>> dict_depth({}) 1 >>> dict_depth({"a": "b"}) 1 >>> dict_depth({"a": {}}) 2 >>> dict_depth({"a": {"b": {}}}) 3
436def first(it: t.Iterable[T]) -> T: 437 """Returns the first element from an iterable (useful for sets).""" 438 return next(i for i in it)
Returns the first element from an iterable (useful for sets).