from copy import copy from collections import deque from functools import wraps from itertools import chain from sentry_sdk.utils import logger, capture_internal_exceptions from sentry_sdk._types import MYPY if MYPY: from typing import Any from typing import Dict from typing import Optional from typing import Deque from typing import List from typing import Callable from typing import TypeVar from sentry_sdk._types import ( Breadcrumb, Event, EventProcessor, ErrorProcessor, ExcInfo, Hint, Type, ) from sentry_sdk.tracing import Span from sentry_sdk.sessions import Session F = TypeVar("F", bound=Callable[..., Any]) T = TypeVar("T") global_event_processors = [] # type: List[EventProcessor] def add_global_event_processor(processor): # type: (EventProcessor) -> None global_event_processors.append(processor) def _attr_setter(fn): # type: (Any) -> Any return property(fset=fn, doc=fn.__doc__) def _disable_capture(fn): # type: (F) -> F @wraps(fn) def wrapper(self, *args, **kwargs): # type: (Any, *Dict[str, Any], **Any) -> Any if not self._should_capture: return try: self._should_capture = False return fn(self, *args, **kwargs) finally: self._should_capture = True return wrapper # type: ignore class Scope(object): """The scope holds extra information that should be sent with all events that belong to it. """ # NOTE: Even though it should not happen, the scope needs to not crash when # accessed by multiple threads. It's fine if it's full of races, but those # races should never make the user application crash. # # The same needs to hold for any accesses of the scope the SDK makes. __slots__ = ( "_level", "_name", "_fingerprint", "_transaction", "_user", "_tags", "_contexts", "_extras", "_breadcrumbs", "_event_processors", "_error_processors", "_should_capture", "_span", "_session", "_force_auto_session_tracking", ) def __init__(self): # type: () -> None self._event_processors = [] # type: List[EventProcessor] self._error_processors = [] # type: List[ErrorProcessor] self._name = None # type: Optional[str] self.clear() def clear(self): # type: () -> None """Clears the entire scope.""" self._level = None # type: Optional[str] self._fingerprint = None # type: Optional[List[str]] self._transaction = None # type: Optional[str] self._user = None # type: Optional[Dict[str, Any]] self._tags = {} # type: Dict[str, Any] self._contexts = {} # type: Dict[str, Dict[str, Any]] self._extras = {} # type: Dict[str, Any] self.clear_breadcrumbs() self._should_capture = True self._span = None # type: Optional[Span] self._session = None # type: Optional[Session] self._force_auto_session_tracking = None # type: Optional[bool] @_attr_setter def level(self, value): # type: (Optional[str]) -> None """When set this overrides the level. Deprecated in favor of set_level.""" self._level = value def set_level(self, value): # type: (Optional[str]) -> None """Sets the level for the scope.""" self._level = value @_attr_setter def fingerprint(self, value): # type: (Optional[List[str]]) -> None """When set this overrides the default fingerprint.""" self._fingerprint = value @_attr_setter def transaction(self, value): # type: (Optional[str]) -> None """When set this forces a specific transaction name to be set.""" self._transaction = value span = self._span if span: span.transaction = value @_attr_setter def user(self, value): # type: (Dict[str, Any]) -> None """When set a specific user is bound to the scope. Deprecated in favor of set_user.""" self.set_user(value) def set_user(self, value): # type: (Dict[str, Any]) -> None """Sets a user for the scope.""" self._user = value if self._session is not None: self._session.update(user=value) @property def span(self): # type: () -> Optional[Span] """Get/set current tracing span.""" return self._span @span.setter def span(self, span): # type: (Optional[Span]) -> None self._span = span if span is not None: span_transaction = span.transaction if span_transaction: self._transaction = span_transaction def set_tag( self, key, # type: str value, # type: Any ): # type: (...) -> None """Sets a tag for a key to a specific value.""" self._tags[key] = value def remove_tag( self, key # type: str ): # type: (...) -> None """Removes a specific tag.""" self._tags.pop(key, None) def set_context( self, key, # type: str value, # type: Any ): # type: (...) -> None """Binds a context at a certain key to a specific value.""" self._contexts[key] = value def remove_context( self, key # type: str ): # type: (...) -> None """Removes a context.""" self._contexts.pop(key, None) def set_extra( self, key, # type: str value, # type: Any ): # type: (...) -> None """Sets an extra key to a specific value.""" self._extras[key] = value def remove_extra( self, key # type: str ): # type: (...) -> None """Removes a specific extra key.""" self._extras.pop(key, None) def clear_breadcrumbs(self): # type: () -> None """Clears breadcrumb buffer.""" self._breadcrumbs = deque() # type: Deque[Breadcrumb] def add_event_processor( self, func # type: EventProcessor ): # type: (...) -> None """Register a scope local event processor on the scope. :param func: This function behaves like `before_send.` """ if len(self._event_processors) > 20: logger.warning( "Too many event processors on scope! Clearing list to free up some memory: %r", self._event_processors, ) del self._event_processors[:] self._event_processors.append(func) def add_error_processor( self, func, # type: ErrorProcessor cls=None, # type: Optional[Type[BaseException]] ): # type: (...) -> None """Register a scope local error processor on the scope. :param func: A callback that works similar to an event processor but is invoked with the original exception info triple as second argument. :param cls: Optionally, only process exceptions of this type. """ if cls is not None: cls_ = cls # For mypy. real_func = func def func(event, exc_info): # type: (Event, ExcInfo) -> Optional[Event] try: is_inst = isinstance(exc_info[1], cls_) except Exception: is_inst = False if is_inst: return real_func(event, exc_info) return event self._error_processors.append(func) @_disable_capture def apply_to_event( self, event, # type: Event hint, # type: Hint ): # type: (...) -> Optional[Event] """Applies the information contained on the scope to the given event.""" def _drop(event, cause, ty): # type: (Dict[str, Any], Any, str) -> Optional[Any] logger.info("%s (%s) dropped event (%s)", ty, cause, event) return None if self._level is not None: event["level"] = self._level if event.get("type") != "transaction": event.setdefault("breadcrumbs", []).extend(self._breadcrumbs) if event.get("user") is None and self._user is not None: event["user"] = self._user if event.get("transaction") is None and self._transaction is not None: event["transaction"] = self._transaction if event.get("fingerprint") is None and self._fingerprint is not None: event["fingerprint"] = self._fingerprint if self._extras: event.setdefault("extra", {}).update(self._extras) if self._tags: event.setdefault("tags", {}).update(self._tags) if self._contexts: event.setdefault("contexts", {}).update(self._contexts) if self._span is not None: contexts = event.setdefault("contexts", {}) if not contexts.get("trace"): contexts["trace"] = self._span.get_trace_context() exc_info = hint.get("exc_info") if exc_info is not None: for error_processor in self._error_processors: new_event = error_processor(event, exc_info) if new_event is None: return _drop(event, error_processor, "error processor") event = new_event for event_processor in chain(global_event_processors, self._event_processors): new_event = event with capture_internal_exceptions(): new_event = event_processor(event, hint) if new_event is None: return _drop(event, event_processor, "event processor") event = new_event return event def update_from_scope(self, scope): # type: (Scope) -> None if scope._level is not None: self._level = scope._level if scope._fingerprint is not None: self._fingerprint = scope._fingerprint if scope._transaction is not None: self._transaction = scope._transaction if scope._user is not None: self._user = scope._user if scope._tags: self._tags.update(scope._tags) if scope._contexts: self._contexts.update(scope._contexts) if scope._extras: self._extras.update(scope._extras) if scope._breadcrumbs: self._breadcrumbs.extend(scope._breadcrumbs) if scope._span: self._span = scope._span def update_from_kwargs( self, user=None, # type: Optional[Any] level=None, # type: Optional[str] extras=None, # type: Optional[Dict[str, Any]] contexts=None, # type: Optional[Dict[str, Any]] tags=None, # type: Optional[Dict[str, str]] fingerprint=None, # type: Optional[List[str]] ): # type: (...) -> None if level is not None: self._level = level if user is not None: self._user = user if extras is not None: self._extras.update(extras) if contexts is not None: self._contexts.update(contexts) if tags is not None: self._tags.update(tags) if fingerprint is not None: self._fingerprint = fingerprint def __copy__(self): # type: () -> Scope rv = object.__new__(self.__class__) # type: Scope rv._level = self._level rv._name = self._name rv._fingerprint = self._fingerprint rv._transaction = self._transaction rv._user = self._user rv._tags = dict(self._tags) rv._contexts = dict(self._contexts) rv._extras = dict(self._extras) rv._breadcrumbs = copy(self._breadcrumbs) rv._event_processors = list(self._event_processors) rv._error_processors = list(self._error_processors) rv._should_capture = self._should_capture rv._span = self._span rv._session = self._session rv._force_auto_session_tracking = self._force_auto_session_tracking return rv def __repr__(self): # type: () -> str return "<%s id=%s name=%s>" % ( self.__class__.__name__, hex(id(self)), self._name, )