sqlglot.helper
from __future__ import annotations

import inspect
import logging
import re
import sys
import typing as t
from collections.abc import Collection
from contextlib import contextmanager
from copy import copy
from enum import Enum
from itertools import count

if t.TYPE_CHECKING:
    from sqlglot import exp
    from sqlglot._typing import A, E, T
    from sqlglot.expressions import Expression


# Zero-width match before every capital letter except one at the very start of the string.
CAMEL_CASE_PATTERN = re.compile("(?<!^)(?=[A-Z])")
PYTHON_VERSION = sys.version_info[:2]
logger = logging.getLogger("sqlglot")


class AutoName(Enum):
    """
    Base class for enums whose `auto()` members take their own name as their value
    (e.g. `FOO = auto()` results in `FOO.value == "FOO"`).

    Reference: https://docs.python.org/3/howto/enum.html#using-automatic-values
    """

    def _generate_next_value_(name, _start, _count, _last_values):
        # Hook consulted by `auto()`; returning the member name makes it the value.
        return name


class classproperty(property):
    """
    A property-like descriptor whose getter is called with the owning class
    rather than with an instance.
    """

    def __get__(self, obj: t.Any, owner: t.Any = None) -> t.Any:
        # Bind the getter to `owner` the way a classmethod would, then call it; `obj` is ignored.
        bound_getter = classmethod(self.fget).__get__(None, owner)  # type: ignore
        return bound_getter()


def seq_get(seq: t.Sequence[T], index: int) -> t.Optional[T]:
    """Returns the value in `seq` at position `index`, or `None` if `index` is out of bounds."""
    try:
        item = seq[index]
    except IndexError:
        return None
    return item


@t.overload
def ensure_list(value: t.Collection[T]) -> t.List[T]:
    ...


@t.overload
def ensure_list(value: T) -> t.List[T]:
    ...


def ensure_list(value):
    """
    Coerces a value into a list.

    Args:
        value: The value of interest.

    Returns:
        An empty list if `value` is `None`, a new list with the same items if `value` is a
        list or a tuple, or else a single-element list containing `value`.
    """
    if isinstance(value, (list, tuple)):
        return [*value]

    return [] if value is None else [value]


@t.overload
def ensure_collection(value: t.Collection[T]) -> t.Collection[T]:
    ...


@t.overload
def ensure_collection(value: T) -> t.Collection[T]:
    ...


def ensure_collection(value):
    """
    Ensures that a value is a collection (excluding `str` and `bytes`), otherwise wraps it into a list.

    Args:
        value: The value of interest.

    Returns:
        An empty list if `value` is `None`, `value` itself if it's a collection other than
        `str` or `bytes`, or else `value` wrapped in a list.
    """
    if value is None:
        return []

    if isinstance(value, (str, bytes)) or not isinstance(value, Collection):
        return [value]

    return value


def csv(*args: str, sep: str = ", ") -> str:
    """
    Formats any number of string arguments as CSV, skipping falsy (e.g. empty) arguments.

    Args:
        args: The string arguments to format.
        sep: The argument separator.

    Returns:
        The arguments formatted as a CSV string.
    """
    return sep.join(filter(None, args))


def subclasses(
    module_name: str,
    classes: t.Type | t.Tuple[t.Type, ...],
    exclude: t.Type | t.Tuple[t.Type, ...] = (),
) -> t.List[t.Type]:
    """
    Returns all subclasses for a collection of classes, possibly excluding some of them.

    Args:
        module_name: The name of the module to search for subclasses in. It must already be
            present in `sys.modules`.
        classes: Class(es) we want to find the subclasses of.
        exclude: Class(es) we want to exclude from the returned list.

    Returns:
        The target subclasses, in the name-sorted order produced by `inspect.getmembers`.

    Raises:
        KeyError: If `module_name` is not in `sys.modules`.
    """
    # A lone class is not a container, so `obj not in exclude` would raise; wrap it in a tuple.
    excluded = (exclude,) if isinstance(exclude, type) else exclude

    def is_target(obj: t.Any) -> bool:
        return inspect.isclass(obj) and issubclass(obj, classes) and obj not in excluded

    return [obj for _, obj in inspect.getmembers(sys.modules[module_name], is_target)]


def apply_index_offset(
    this: exp.Expression,
    expressions: t.List[E],
    offset: int,
) -> t.List[E]:
    """
    Applies an offset to a given integer-typed index expression.

    Args:
        this: The target of the index.
        expressions: The expression the offset will be applied to, wrapped in a list.
        offset: The offset that will be applied.

    Returns:
        The original expression with the offset applied to it, wrapped in a list. The
        `expressions` argument is returned unaffected if `offset` is falsy, if it doesn't
        contain exactly one expression, if the type of `this` is neither unknown nor an
        array, or if the expression isn't integer-typed.
    """
    if not offset or len(expressions) != 1:
        return expressions

    # Imported lazily, after the cheap early exit above.
    from sqlglot import exp
    from sqlglot.optimizer.annotate_types import annotate_types
    from sqlglot.optimizer.simplify import simplify

    if not this.type:
        annotate_types(this)

    target_type = t.cast(exp.DataType, this.type).this
    if target_type not in (
        exp.DataType.Type.UNKNOWN,
        exp.DataType.Type.ARRAY,
    ):
        return expressions

    index = expressions[0]
    if not index.type:
        annotate_types(index)

    if t.cast(exp.DataType, index.type).this not in exp.DataType.INTEGER_TYPES:
        return expressions

    logger.warning("Applying array index offset (%s)", offset)
    shifted = simplify(exp.Add(this=index.copy(), expression=exp.Literal.number(offset)))
    return [shifted]


def camel_to_snake_case(name: str) -> str:
    """
    Converts `name` from camelCase to upper-case snake case (e.g. "camelCase" becomes
    "CAMEL_CASE") and returns the result.
    """
    separated = CAMEL_CASE_PATTERN.sub("_", name)
    return separated.upper()


def while_changing(expression: Expression, func: t.Callable[[Expression], E]) -> E:
    """
    Applies a transformation to a given expression until a fix point is reached.

    Args:
        expression: The expression to be transformed.
        func: The transformation to be applied.

    Returns:
        The transformed expression.
    """
    while True:
        # Store each node's hash on the node, visiting the walk order in reverse.
        for node, *_ in reversed(tuple(expression.walk())):
            node._hash = hash(node)

        before = hash(expression)
        expression = func(expression)

        # Drop the stored hashes so the comparison below recomputes the hash.
        for node, *_ in expression.walk():
            node._hash = None

        if before == hash(expression):
            return expression


def tsort(dag: t.Dict[T, t.Set[T]]) -> t.List[T]:
    """
    Sorts a given directed acyclic graph in topological order.

    Args:
        dag: The graph to be sorted, mapping each node to the set of nodes it depends on.
            Dependencies that are missing as keys are treated as nodes without dependencies.
            The mapping and its sets are left unmodified.

    Returns:
        A list that contains all of the graph's nodes in topological order (dependencies
        first). Nodes that become available in the same round are sorted among themselves.

    Raises:
        ValueError: If the graph contains a cycle.
    """
    # Work on a private copy so that neither the caller's dict nor its sets are consumed.
    remaining = {node: set(deps) for node, deps in dag.items()}

    for deps in tuple(remaining.values()):
        for dep in deps:
            remaining.setdefault(dep, set())

    result = []

    while remaining:
        ready = {node for node, deps in remaining.items() if not deps}

        if not ready:
            raise ValueError("Cycle error")

        for node in ready:
            del remaining[node]

        for deps in remaining.values():
            deps -= ready

        result.extend(sorted(ready))  # type: ignore

    return result


def open_file(file_name: str) -> t.TextIO:
    """
    Opens a file that may be compressed as gzip and returns it as a UTF-8 text stream.

    The stream is opened with `newline=""`, so line endings are returned untranslated.
    The caller is responsible for closing the returned file.
    """
    # Sniff the two-byte gzip magic number instead of relying on the file name.
    with open(file_name, "rb") as f:
        gzipped = f.read(2) == b"\x1f\x8b"

    if gzipped:
        import gzip

        # Decode as UTF-8 explicitly so both branches behave the same regardless of locale.
        return gzip.open(file_name, "rt", encoding="utf-8", newline="")  # type: ignore

    return open(file_name, encoding="utf-8", newline="")


@contextmanager
def csv_reader(read_csv: exp.ReadCSV) -> t.Any:
    """
    Returns a csv reader given the expression `READ_CSV(name, ['delimiter', '|', ...])`.

    Args:
        read_csv: A `ReadCSV` function call.

    Yields:
        A python csv reader. The underlying file is closed when the context exits.
    """
    import csv as csv_

    # The extra arguments form a flat key, value, key, value, ... sequence.
    delimiter = ","
    arg_names = (arg.name for arg in read_csv.expressions)
    for key, value in zip(arg_names, arg_names):
        if key == "delimiter":
            delimiter = value

    # Open the file only once the arguments are parsed, so a failure above can't leak it.
    with open_file(read_csv.name) as file:
        yield csv_.reader(file, delimiter=delimiter)


def find_new_name(taken: t.Collection[str], base: str) -> str:
    """
    Searches for a new name.

    Args:
        taken: A collection of taken names.
        base: Base name to alter.

    Returns:
        `base` if it's not taken, or else the first available name of the form
        `<base>_<i>`, with `i` starting at 2.
    """
    if base not in taken:
        return base

    suffix = 2
    while f"{base}_{suffix}" in taken:
        suffix += 1

    return f"{base}_{suffix}"


def name_sequence(prefix: str) -> t.Callable[[], str]:
    """Returns a name generator given a prefix (e.g. a0, a1, a2, ... if the prefix is "a")."""
    counter = count()

    def next_name() -> str:
        return f"{prefix}{next(counter)}"

    return next_name


def object_to_dict(obj: t.Any, **kwargs) -> t.Dict:
    """
    Returns a dictionary created from an object's attributes.

    Each attribute value is copied with its own `copy` method when it has one, or with
    `copy.copy` otherwise. Entries in `kwargs` are added last and override attributes
    of the same name.
    """
    result: t.Dict = {}
    for attr, value in vars(obj).items():
        result[attr] = value.copy() if hasattr(value, "copy") else copy(value)

    result.update(kwargs)
    return result


def split_num_words(
    value: str, sep: str, min_num_words: int, fill_from_start: bool = True
) -> t.List[t.Optional[str]]:
    """
    Perform a split on a value and return N words as a result with `None` used for words that don't exist.

    Args:
        value: The value to be split.
        sep: The value to use to split on.
        min_num_words: The minimum number of words that are going to be in the result.
        fill_from_start: Indicates whether `None` values should be inserted at the start or
            at the end of the list.

    Examples:
        >>> split_num_words("db.table", ".", 3)
        [None, 'db', 'table']
        >>> split_num_words("db.table", ".", 3, fill_from_start=False)
        ['db', 'table', None]
        >>> split_num_words("db.table", ".", 1)
        ['db', 'table']

    Returns:
        The list of words returned by `split`, possibly augmented by a number of `None` values.
    """
    words = value.split(sep)
    # A non-positive multiplier yields an empty list, so no padding is added in that case.
    padding: t.List[t.Optional[str]] = [None] * (min_num_words - len(words))
    return padding + words if fill_from_start else words + padding


def is_iterable(value: t.Any) -> bool:
    """
    Checks if the value is an iterable, excluding the types `str`, `bytes` and `Expression`.

    Examples:
        >>> is_iterable([1,2])
        True
        >>> is_iterable("test")
        False

    Args:
        value: The value to check if it is an iterable.

    Returns:
        A `bool` value indicating if it is an iterable.
    """
    from sqlglot import Expression

    if not hasattr(value, "__iter__"):
        return False

    return not isinstance(value, (str, bytes, Expression))


def flatten(values: t.Iterable[t.Iterable[t.Any] | t.Any]) -> t.Iterator[t.Any]:
    """
    Flattens an iterable that can contain both iterable and non-iterable elements. Whether
    an element is descended into is decided by `is_iterable`, so objects of type `str` and
    `bytes` are not regarded as iterables.

    Examples:
        >>> list(flatten([[1, 2], 3, {4}, (5, "bla")]))
        [1, 2, 3, 4, 5, 'bla']
        >>> list(flatten([1, 2, 3]))
        [1, 2, 3]

    Args:
        values: The value to be flattened.

    Yields:
        Non-iterable elements in `values`.
    """
    for item in values:
        if not is_iterable(item):
            yield item
            continue

        yield from flatten(item)


def dict_depth(d: t.Dict) -> int:
    """
    Get the nesting depth of a dictionary, following only the first value at each level.

    Example:
        >>> dict_depth(None)
        0
        >>> dict_depth({})
        1
        >>> dict_depth({"a": "b"})
        1
        >>> dict_depth({"a": {}})
        2
        >>> dict_depth({"a": {"b": {}}})
        3
    """
    try:
        values = iter(d.values())
    except AttributeError:
        # d doesn't have attribute "values"
        return 0

    try:
        first_value = next(values)
    except StopIteration:
        # d.values() returns an empty sequence
        return 1

    return 1 + dict_depth(first_value)


def first(it: t.Iterable[T]) -> T:
    """
    Returns the first element from an iterable (useful for sets).

    Raises:
        StopIteration: If the iterable is empty.
    """
    return next(iter(it))


def merge_ranges(ranges: t.List[t.Tuple[A, A]]) -> t.List[t.Tuple[A, A]]:
    """
    Merges a list of `(start, end)` ranges that overlap or touch.

    Example:
        >>> merge_ranges([(1, 3), (2, 6), (8, 10)])
        [(1, 6), (8, 10)]

    Args:
        ranges: The ranges to merge, in any order.

    Returns:
        The merged ranges, sorted in ascending order.
    """
    if not ranges:
        return []

    ordered = sorted(ranges)
    merged = ordered[:1]

    for start, end in ordered[1:]:
        prev_start, prev_end = merged[-1]

        if start > prev_end:
            merged.append((start, end))
            continue

        # The range begins inside (or right at the end of) the previous one: extend it.
        merged[-1] = (prev_start, max(prev_end, end))

    return merged
class AutoName(Enum):
    """
    Base class for enums whose `auto()` members take their own name as their value
    (e.g. `FOO = auto()` results in `FOO.value == "FOO"`).

    Reference: https://docs.python.org/3/howto/enum.html#using-automatic-values
    """

    def _generate_next_value_(name, _start, _count, _last_values):
        # Hook consulted by `auto()`; returning the member name makes it the value.
        member_value = name
        return member_value
This is used for creating Enum classes where `auto()` is the string form of the corresponding enum's identifier (e.g. `FOO.value` results in `"FOO"`).
Reference: https://docs.python.org/3/howto/enum.html#using-automatic-values
Inherited Members
- enum.Enum
- name
- value
class classproperty(property):
    """
    A property-like descriptor whose getter is called with the owning class
    rather than with an instance.
    """

    def __get__(self, obj: t.Any, owner: t.Any = None) -> t.Any:
        # Bind the getter to `owner` the way a classmethod would, then call it; `obj` is ignored.
        bound_getter = classmethod(self.fget).__get__(None, owner)  # type: ignore
        return bound_getter()
Similar to a normal property but works for class methods
Inherited Members
- builtins.property
- property
- getter
- setter
- deleter
- fget
- fset
- fdel
def seq_get(seq: t.Sequence[T], index: int) -> t.Optional[T]:
    """Returns the value in `seq` at position `index`, or `None` if `index` is out of bounds."""
    try:
        item = seq[index]
    except IndexError:
        return None
    return item
Returns the value in `seq` at position `index`, or `None` if `index` is out of bounds.
def ensure_list(value):
    """
    Coerces a value into a list.

    Args:
        value: The value of interest.

    Returns:
        An empty list if `value` is `None`, a new list with the same items if `value` is a
        list or a tuple, or else a single-element list containing `value`.
    """
    if isinstance(value, (list, tuple)):
        return [*value]

    return [] if value is None else [value]
Ensures that a value is a list, otherwise casts or wraps it into one.
Arguments:
- value: The value of interest.
Returns:
The value cast as a list if it's a list or a tuple, or else the value wrapped in a list.
def ensure_collection(value):
    """
    Ensures that a value is a collection (excluding `str` and `bytes`), otherwise wraps it into a list.

    Args:
        value: The value of interest.

    Returns:
        An empty list if `value` is `None`, `value` itself if it's a collection other than
        `str` or `bytes`, or else `value` wrapped in a list.
    """
    if value is None:
        return []

    if isinstance(value, (str, bytes)) or not isinstance(value, Collection):
        return [value]

    return value
Ensures that a value is a collection (excluding `str` and `bytes`), otherwise wraps it into a list.
Arguments:
- value: The value of interest.
Returns:
The value if it's a collection, or else the value wrapped in a list.
def csv(*args: str, sep: str = ", ") -> str:
    """
    Formats any number of string arguments as CSV, skipping falsy (e.g. empty) arguments.

    Args:
        args: The string arguments to format.
        sep: The argument separator.

    Returns:
        The arguments formatted as a CSV string.
    """
    return sep.join(filter(None, args))
Formats any number of string arguments as CSV.
Arguments:
- args: The string arguments to format.
- sep: The argument separator.
Returns:
The arguments formatted as a CSV string.
def subclasses(
    module_name: str,
    classes: t.Type | t.Tuple[t.Type, ...],
    exclude: t.Type | t.Tuple[t.Type, ...] = (),
) -> t.List[t.Type]:
    """
    Returns all subclasses for a collection of classes, possibly excluding some of them.

    Args:
        module_name: The name of the module to search for subclasses in. It must already be
            present in `sys.modules`.
        classes: Class(es) we want to find the subclasses of.
        exclude: Class(es) we want to exclude from the returned list.

    Returns:
        The target subclasses, in the name-sorted order produced by `inspect.getmembers`.

    Raises:
        KeyError: If `module_name` is not in `sys.modules`.
    """
    # A lone class is not a container, so `obj not in exclude` would raise; wrap it in a tuple.
    excluded = (exclude,) if isinstance(exclude, type) else exclude

    def is_target(obj: t.Any) -> bool:
        return inspect.isclass(obj) and issubclass(obj, classes) and obj not in excluded

    return [obj for _, obj in inspect.getmembers(sys.modules[module_name], is_target)]
Returns all subclasses for a collection of classes, possibly excluding some of them.
Arguments:
- module_name: The name of the module to search for subclasses in.
- classes: Class(es) we want to find the subclasses of.
- exclude: Class(es) we want to exclude from the returned list.
Returns:
The target subclasses.
def apply_index_offset(
    this: exp.Expression,
    expressions: t.List[E],
    offset: int,
) -> t.List[E]:
    """
    Applies an offset to a given integer-typed index expression.

    Args:
        this: The target of the index.
        expressions: The expression the offset will be applied to, wrapped in a list.
        offset: The offset that will be applied.

    Returns:
        The original expression with the offset applied to it, wrapped in a list. The
        `expressions` argument is returned unaffected if `offset` is falsy, if it doesn't
        contain exactly one expression, if the type of `this` is neither unknown nor an
        array, or if the expression isn't integer-typed.
    """
    if not offset or len(expressions) != 1:
        return expressions

    # Imported lazily, after the cheap early exit above.
    from sqlglot import exp
    from sqlglot.optimizer.annotate_types import annotate_types
    from sqlglot.optimizer.simplify import simplify

    if not this.type:
        annotate_types(this)

    target_type = t.cast(exp.DataType, this.type).this
    if target_type not in (
        exp.DataType.Type.UNKNOWN,
        exp.DataType.Type.ARRAY,
    ):
        return expressions

    index = expressions[0]
    if not index.type:
        annotate_types(index)

    if t.cast(exp.DataType, index.type).this not in exp.DataType.INTEGER_TYPES:
        return expressions

    logger.warning("Applying array index offset (%s)", offset)
    shifted = simplify(exp.Add(this=index.copy(), expression=exp.Literal.number(offset)))
    return [shifted]
Applies an offset to a given integer literal expression.
Arguments:
- this: The target of the index.
- expressions: The expression the offset will be applied to, wrapped in a list.
- offset: The offset that will be applied.
Returns:
The original expression with the offset applied to it, wrapped in a list. If the provided `expressions` argument contains more than one expression, it's returned unaffected.
def camel_to_snake_case(name: str) -> str:
    """
    Converts `name` from camelCase to upper-case snake case (e.g. "camelCase" becomes
    "CAMEL_CASE") and returns the result.
    """
    separated = CAMEL_CASE_PATTERN.sub("_", name)
    return separated.upper()
Converts `name` from camelCase to upper-case snake case (e.g. `camelCase` becomes `CAMEL_CASE`) and returns the result.
def while_changing(expression: Expression, func: t.Callable[[Expression], E]) -> E:
    """
    Applies a transformation to a given expression until a fix point is reached.

    Args:
        expression: The expression to be transformed.
        func: The transformation to be applied.

    Returns:
        The transformed expression.
    """
    while True:
        # Store each node's hash on the node, visiting the walk order in reverse.
        for node, *_ in reversed(tuple(expression.walk())):
            node._hash = hash(node)

        before = hash(expression)
        expression = func(expression)

        # Drop the stored hashes so the comparison below recomputes the hash.
        for node, *_ in expression.walk():
            node._hash = None

        if before == hash(expression):
            return expression
Applies a transformation to a given expression until a fix point is reached.
Arguments:
- expression: The expression to be transformed.
- func: The transformation to be applied.
Returns:
The transformed expression.
def tsort(dag: t.Dict[T, t.Set[T]]) -> t.List[T]:
    """
    Sorts a given directed acyclic graph in topological order.

    Args:
        dag: The graph to be sorted, mapping each node to the set of nodes it depends on.
            Dependencies that are missing as keys are treated as nodes without dependencies.
            The mapping and its sets are left unmodified.

    Returns:
        A list that contains all of the graph's nodes in topological order (dependencies
        first). Nodes that become available in the same round are sorted among themselves.

    Raises:
        ValueError: If the graph contains a cycle.
    """
    # Work on a private copy so that neither the caller's dict nor its sets are consumed.
    remaining = {node: set(deps) for node, deps in dag.items()}

    for deps in tuple(remaining.values()):
        for dep in deps:
            remaining.setdefault(dep, set())

    result = []

    while remaining:
        ready = {node for node, deps in remaining.items() if not deps}

        if not ready:
            raise ValueError("Cycle error")

        for node in ready:
            del remaining[node]

        for deps in remaining.values():
            deps -= ready

        result.extend(sorted(ready))  # type: ignore

    return result
Sorts a given directed acyclic graph in topological order.
Arguments:
- dag: The graph to be sorted.
Returns:
A list that contains all of the graph's nodes in topological order.
def open_file(file_name: str) -> t.TextIO:
    """
    Opens a file that may be compressed as gzip and returns it as a UTF-8 text stream.

    The stream is opened with `newline=""`, so line endings are returned untranslated.
    The caller is responsible for closing the returned file.
    """
    # Sniff the two-byte gzip magic number instead of relying on the file name.
    with open(file_name, "rb") as f:
        gzipped = f.read(2) == b"\x1f\x8b"

    if gzipped:
        import gzip

        # Decode as UTF-8 explicitly so both branches behave the same regardless of locale.
        return gzip.open(file_name, "rt", encoding="utf-8", newline="")  # type: ignore

    return open(file_name, encoding="utf-8", newline="")
Open a file that may be compressed as gzip and return it in universal newline mode.
@contextmanager
def csv_reader(read_csv: exp.ReadCSV) -> t.Any:
    """
    Returns a csv reader given the expression `READ_CSV(name, ['delimiter', '|', ...])`.

    Args:
        read_csv: A `ReadCSV` function call.

    Yields:
        A python csv reader. The underlying file is closed when the context exits.
    """
    import csv as csv_

    # The extra arguments form a flat key, value, key, value, ... sequence.
    delimiter = ","
    arg_names = (arg.name for arg in read_csv.expressions)
    for key, value in zip(arg_names, arg_names):
        if key == "delimiter":
            delimiter = value

    # Open the file only once the arguments are parsed, so a failure above can't leak it.
    with open_file(read_csv.name) as file:
        yield csv_.reader(file, delimiter=delimiter)
Returns a csv reader given the expression `READ_CSV(name, ['delimiter', '|', ...])`.
Arguments:
- read_csv: A `ReadCSV` function call.
Yields:
A python csv reader.
def find_new_name(taken: t.Collection[str], base: str) -> str:
    """
    Searches for a new name.

    Args:
        taken: A collection of taken names.
        base: Base name to alter.

    Returns:
        `base` if it's not taken, or else the first available name of the form
        `<base>_<i>`, with `i` starting at 2.
    """
    if base not in taken:
        return base

    suffix = 2
    while f"{base}_{suffix}" in taken:
        suffix += 1

    return f"{base}_{suffix}"
Searches for a new name.
Arguments:
- taken: A collection of taken names.
- base: Base name to alter.
Returns:
The new, available name.
def name_sequence(prefix: str) -> t.Callable[[], str]:
    """Returns a name generator given a prefix (e.g. a0, a1, a2, ... if the prefix is "a")."""
    counter = count()

    def next_name() -> str:
        return f"{prefix}{next(counter)}"

    return next_name
Returns a name generator given a prefix (e.g. a0, a1, a2, ... if the prefix is "a").
def object_to_dict(obj: t.Any, **kwargs) -> t.Dict:
    """
    Returns a dictionary created from an object's attributes.

    Each attribute value is copied with its own `copy` method when it has one, or with
    `copy.copy` otherwise. Entries in `kwargs` are added last and override attributes
    of the same name.
    """
    result: t.Dict = {}
    for attr, value in vars(obj).items():
        result[attr] = value.copy() if hasattr(value, "copy") else copy(value)

    result.update(kwargs)
    return result
Returns a dictionary created from an object's attributes.
def split_num_words(
    value: str, sep: str, min_num_words: int, fill_from_start: bool = True
) -> t.List[t.Optional[str]]:
    """
    Perform a split on a value and return N words as a result with `None` used for words that don't exist.

    Args:
        value: The value to be split.
        sep: The value to use to split on.
        min_num_words: The minimum number of words that are going to be in the result.
        fill_from_start: Indicates whether `None` values should be inserted at the start or
            at the end of the list.

    Examples:
        >>> split_num_words("db.table", ".", 3)
        [None, 'db', 'table']
        >>> split_num_words("db.table", ".", 3, fill_from_start=False)
        ['db', 'table', None]
        >>> split_num_words("db.table", ".", 1)
        ['db', 'table']

    Returns:
        The list of words returned by `split`, possibly augmented by a number of `None` values.
    """
    words = value.split(sep)
    # A non-positive multiplier yields an empty list, so no padding is added in that case.
    padding: t.List[t.Optional[str]] = [None] * (min_num_words - len(words))
    return padding + words if fill_from_start else words + padding
Perform a split on a value and return N words as a result with `None` used for words that don't exist.
Arguments:
- value: The value to be split.
- sep: The value to use to split on.
- min_num_words: The minimum number of words that are going to be in the result.
- fill_from_start: Indicates whether `None` values should be inserted at the start or at the end of the list.
Examples:
>>> split_num_words("db.table", ".", 3)
[None, 'db', 'table']
>>> split_num_words("db.table", ".", 3, fill_from_start=False)
['db', 'table', None]
>>> split_num_words("db.table", ".", 1)
['db', 'table']
Returns:
The list of words returned by `split`, possibly augmented by a number of `None` values.
def is_iterable(value: t.Any) -> bool:
    """
    Checks if the value is an iterable, excluding the types `str`, `bytes` and `Expression`.

    Examples:
        >>> is_iterable([1,2])
        True
        >>> is_iterable("test")
        False

    Args:
        value: The value to check if it is an iterable.

    Returns:
        A `bool` value indicating if it is an iterable.
    """
    from sqlglot import Expression

    if not hasattr(value, "__iter__"):
        return False

    return not isinstance(value, (str, bytes, Expression))
Checks if the value is an iterable, excluding the types `str` and `bytes`.
Examples:
>>> is_iterable([1,2])
True
>>> is_iterable("test")
False
Arguments:
- value: The value to check if it is an iterable.
Returns:
A `bool` value indicating if it is an iterable.
def flatten(values: t.Iterable[t.Iterable[t.Any] | t.Any]) -> t.Iterator[t.Any]:
    """
    Flattens an iterable that can contain both iterable and non-iterable elements. Whether
    an element is descended into is decided by `is_iterable`, so objects of type `str` and
    `bytes` are not regarded as iterables.

    Examples:
        >>> list(flatten([[1, 2], 3, {4}, (5, "bla")]))
        [1, 2, 3, 4, 5, 'bla']
        >>> list(flatten([1, 2, 3]))
        [1, 2, 3]

    Args:
        values: The value to be flattened.

    Yields:
        Non-iterable elements in `values`.
    """
    for item in values:
        if not is_iterable(item):
            yield item
            continue

        yield from flatten(item)
Flattens an iterable that can contain both iterable and non-iterable elements. Objects of type `str` and `bytes` are not regarded as iterables.
Examples:
>>> list(flatten([[1, 2], 3, {4}, (5, "bla")]))
[1, 2, 3, 4, 5, 'bla']
>>> list(flatten([1, 2, 3]))
[1, 2, 3]
Arguments:
- values: The value to be flattened.
Yields:
Non-iterable elements in `values`.
def dict_depth(d: t.Dict) -> int:
    """
    Get the nesting depth of a dictionary, following only the first value at each level.

    Example:
        >>> dict_depth(None)
        0
        >>> dict_depth({})
        1
        >>> dict_depth({"a": "b"})
        1
        >>> dict_depth({"a": {}})
        2
        >>> dict_depth({"a": {"b": {}}})
        3
    """
    try:
        values = iter(d.values())
    except AttributeError:
        # d doesn't have attribute "values"
        return 0

    try:
        first_value = next(values)
    except StopIteration:
        # d.values() returns an empty sequence
        return 1

    return 1 + dict_depth(first_value)
Get the nesting depth of a dictionary.
Example:
>>> dict_depth(None)
0
>>> dict_depth({})
1
>>> dict_depth({"a": "b"})
1
>>> dict_depth({"a": {}})
2
>>> dict_depth({"a": {"b": {}}})
3
def first(it: t.Iterable[T]) -> T:
    """
    Returns the first element from an iterable (useful for sets).

    Raises:
        StopIteration: If the iterable is empty.
    """
    return next(iter(it))
Returns the first element from an iterable (useful for sets).
def merge_ranges(ranges: t.List[t.Tuple[A, A]]) -> t.List[t.Tuple[A, A]]:
    """
    Merges a list of `(start, end)` ranges that overlap or touch.

    Example:
        >>> merge_ranges([(1, 3), (2, 6), (8, 10)])
        [(1, 6), (8, 10)]

    Args:
        ranges: The ranges to merge, in any order.

    Returns:
        The merged ranges, sorted in ascending order.
    """
    if not ranges:
        return []

    ordered = sorted(ranges)
    merged = ordered[:1]

    for start, end in ordered[1:]:
        prev_start, prev_end = merged[-1]

        if start > prev_end:
            merged.append((start, end))
            continue

        # The range begins inside (or right at the end of) the previous one: extend it.
        merged[-1] = (prev_start, max(prev_end, end))

    return merged