✘✘ GRAYBYTE WORDPRESS FILE MANAGER ✘✘

​🇳​​🇦​​🇲​​🇪♯➤ server366.web-hosting.com ​🇻​♯➤ 4.18.0-553.50.1.lve.el8.x86_64 #1 SMP 🇾​♯➤ 2025

𝗛𝗢𝗠𝗘 𝗜𝗗 ♯➤ 67.223.118.204 ♯➤ 𝗔𝗗𝗠𝗜𝗡 𝗜𝗗 216.73.217.86
𝗢𝗣𝗧𝗜𝗢𝗡𝗦 ♯ CRL ♯➤ 𝗢𝗞 ┃ WGT ♯➤ 𝗢𝗞 ┃ SDO ♯➤ 𝗢𝗙𝗙 ┃ PKEX ♯➤ 𝗢𝗙𝗙
𝗗𝗘𝗔𝗖𝗧𝗜𝗩𝗔𝗧𝗘𝗗 ♯➤ 𝗔𝗟𝗟 𝗪𝗢𝗥𝗞𝗜𝗡𝗚....

𝗛𝗢𝗠𝗘
𝗖𝗨𝗥𝗥𝗘𝗡𝗧 𝗙𝗜𝗟𝗘 : /opt/hc_python/lib/python3.12/site-packages/sentry_sdk//sessions.py
import os
import warnings
from contextlib import contextmanager
from threading import Event, Lock, Thread
from typing import TYPE_CHECKING

import sentry_sdk
from sentry_sdk.envelope import Envelope
from sentry_sdk.session import Session
from sentry_sdk.utils import format_timestamp

if TYPE_CHECKING:
    from typing import Any, Callable, Dict, Generator, List, Optional, Union


def is_auto_session_tracking_enabled(
    hub: "Optional[sentry_sdk.Hub]" = None,
) -> "Union[Any, bool, None]":
    """DEPRECATED: Report whether automatic session tracking is enabled.

    Emits a ``DeprecationWarning`` on every call. Internal callers should
    use the private ``_is_auto_session_tracking_enabled`` instead.

    :param hub: Hub to inspect; ``sentry_sdk.Hub.current`` is used when
        ``None``.
    :return: The scope's ``_force_auto_session_tracking`` value when it is
        not ``None``; otherwise the ``auto_session_tracking`` option of the
        hub's client (``False`` if there is no client or the option is
        missing).
    """
    deprecation_notice = (
        "This function is deprecated and will be removed in the next major release. "
        "There is no public API replacement."
    )
    warnings.warn(deprecation_notice, DeprecationWarning, stacklevel=2)

    if hub is None:
        hub = sentry_sdk.Hub.current

    # An explicit override stored on the scope wins over the client options.
    forced = hub.scope._force_auto_session_tracking
    if forced is not None:
        return forced

    if hub.client:
        options = hub.client.options
    else:
        options = {}
    return options.get("auto_session_tracking", False)


@contextmanager
def auto_session_tracking(
    hub: "Optional[sentry_sdk.Hub]" = None, session_mode: str = "application"
) -> "Generator[None, None, None]":
    """DEPRECATED: Use track_session instead
    Starts and stops a session automatically around a block.

    Emits a ``DeprecationWarning`` when the context is entered.

    :param hub: Hub whose session is managed; ``sentry_sdk.Hub.current`` is
        used when ``None``.
    :param session_mode: Passed through to ``hub.start_session``.
    :return: A context manager. If session tracking is enabled for the hub,
        a session is started on entry and ended on exit (also when the block
        raises); otherwise the block simply runs.
    """
    # This body executes inside the generator that contextlib drives from its
    # own __enter__, so frame 2 is contextlib.py. stacklevel=3 attributes the
    # warning to the caller's ``with`` statement instead.
    warnings.warn(
        "This function is deprecated and will be removed in the next major release. "
        "Use track_session instead.",
        DeprecationWarning,
        stacklevel=3,
    )

    if hub is None:
        hub = sentry_sdk.Hub.current
    # The helper is deprecated too; one warning per call to this function is
    # enough, so its own warning is silenced here.
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", DeprecationWarning)
        should_track = is_auto_session_tracking_enabled(hub)
    if should_track:
        hub.start_session(session_mode=session_mode)
    try:
        yield
    finally:
        if should_track:
            hub.end_session()


def is_auto_session_tracking_enabled_scope(scope: "sentry_sdk.Scope") -> bool:
    """
    DEPRECATED: Report whether session tracking is enabled for ``scope``.

    Emits a ``DeprecationWarning`` and then delegates to the private
    ``_is_auto_session_tracking_enabled``, which internal callers should use
    directly.
    """
    deprecation_notice = (
        "This function is deprecated and will be removed in the next major release. "
        "There is no public API replacement."
    )
    warnings.warn(deprecation_notice, DeprecationWarning, stacklevel=2)

    return _is_auto_session_tracking_enabled(scope)


def _is_auto_session_tracking_enabled(scope: "sentry_sdk.Scope") -> bool:
    """
    Report whether session tracking is enabled for ``scope``.

    The scope's ``_force_auto_session_tracking`` value is returned when it is
    not ``None``. Otherwise the ``auto_session_tracking`` option of the
    client returned by ``sentry_sdk.get_client()`` decides, defaulting to
    ``False`` when the option is absent.
    """
    forced = scope._force_auto_session_tracking
    if forced is not None:
        return forced

    options = sentry_sdk.get_client().options
    return options.get("auto_session_tracking", False)


@contextmanager
def auto_session_tracking_scope(
    scope: "sentry_sdk.Scope", session_mode: str = "application"
) -> "Generator[None, None, None]":
    """DEPRECATED: This function is a deprecated alias for track_session.
    Starts and stops a session automatically around a block.

    Emits a ``DeprecationWarning`` when the context is entered and then
    delegates to ``track_session`` with the same arguments.

    :param scope: Scope in which the session is tracked.
    :param session_mode: Passed through to ``track_session``.
    """

    # This body executes inside the generator that contextlib drives from its
    # own __enter__, so frame 2 is contextlib.py. stacklevel=3 attributes the
    # warning to the caller's ``with`` statement instead.
    warnings.warn(
        "This function is a deprecated alias for track_session and will be removed in the next major release.",
        DeprecationWarning,
        stacklevel=3,
    )

    with track_session(scope, session_mode=session_mode):
        yield


@contextmanager
def track_session(
    scope: "sentry_sdk.Scope", session_mode: str = "application"
) -> "Generator[None, None, None]":
    """
    Run a block inside a session on ``scope`` when session tracking is enabled.

    When tracking is enabled, ``scope.start_session`` is called with
    ``session_mode`` on entry and ``scope.end_session`` on exit, including
    when the block raises. When tracking is disabled this context manager
    does nothing.
    """
    if not _is_auto_session_tracking_enabled(scope):
        # Tracking is off: just run the wrapped block.
        yield
        return

    scope.start_session(session_mode=session_mode)
    try:
        yield
    finally:
        scope.end_session()


# Session status strings grouped as "terminal".
# NOTE(review): not referenced in the visible part of this module; presumably
# consumed by other modules to detect finished sessions -- confirm.
TERMINAL_SESSION_STATES = ("exited", "abnormal", "crashed")
# Number of items at which SessionFlusher.flush() sends the current envelope
# and starts a new one.
MAX_ENVELOPE_ITEMS = 100


def make_aggregate_envelope(aggregate_states: "Any", attrs: "Any") -> "Any":
    """
    Build the payload dict for one group of aggregated sessions.

    :param aggregate_states: Mapping whose values are the per-bucket counters.
    :param attrs: Anything ``dict()`` accepts (e.g. an iterable of key/value
        pairs) describing the shared session attributes.
    :return: ``{"attrs": <dict of attrs>, "aggregates": <list of the values>}``.
    """
    shared_attrs = dict(attrs)
    buckets = list(aggregate_states.values())
    return {"attrs": shared_attrs, "aggregates": buckets}


class SessionFlusher:
    """
    Collects sessions and periodically hands them to ``capture_func``.

    Sessions in ``"request"`` mode are aggregated into counters; all other
    sessions are queued individually as JSON. A daemon thread, started lazily
    on the first ``add_session`` call, flushes the queues every
    ``flush_interval`` seconds until ``kill`` is called.
    """

    def __init__(
        self,
        capture_func: "Callable[[Envelope], None]",
        flush_interval: int = 60,
    ) -> None:
        """
        :param capture_func: Called with each envelope produced by ``flush``.
        :param flush_interval: Seconds the background thread waits between
            flushes.
        """
        self.capture_func = capture_func
        self.flush_interval = flush_interval
        # Individually queued sessions (already converted with ``to_json``).
        self.pending_sessions: "List[Any]" = []
        # attrs key -> {truncated start -> counters}; guarded by _aggregate_lock.
        self.pending_aggregates: "Dict[Any, Any]" = {}
        self._thread: "Optional[Thread]" = None
        self._thread_lock = Lock()
        self._aggregate_lock = Lock()
        # PID of the process that started ``_thread``.
        self._thread_for_pid: "Optional[int]" = None
        self.__shutdown_requested = Event()

    def flush(self) -> None:
        """
        Send everything queued so far through ``capture_func``.

        The queues are swapped for empty ones first; their contents are then
        packed into envelopes, and a new envelope is started whenever the
        current one holds ``MAX_ENVELOPE_ITEMS`` items.
        """
        queued_sessions, self.pending_sessions = self.pending_sessions, []

        with self._aggregate_lock:
            queued_aggregates, self.pending_aggregates = self.pending_aggregates, {}

        def _roll_over_if_full(current: "Envelope") -> "Envelope":
            # Ship a full envelope and hand back a fresh one to fill.
            if len(current.items) != MAX_ENVELOPE_ITEMS:
                return current
            self.capture_func(current)
            return Envelope()

        envelope = Envelope()
        for session_json in queued_sessions:
            envelope = _roll_over_if_full(envelope)
            envelope.add_session(session_json)

        for attrs, states in queued_aggregates.items():
            envelope = _roll_over_if_full(envelope)
            envelope.add_sessions(make_aggregate_envelope(states, attrs))

        # Do not emit a trailing empty envelope.
        if len(envelope.items):
            self.capture_func(envelope)

    def _ensure_running(self) -> None:
        """
        Check that we have an active thread to run in, or create one if not.

        Note that this might fail (e.g. in Python 3.12 it's not possible to
        spawn new threads at interpreter shutdown). In that case the shutdown
        event is set and no thread is recorded on the instance.
        """

        def _has_thread_for_this_process() -> bool:
            return self._thread_for_pid == os.getpid() and self._thread is not None

        # Cheap check without the lock, repeated under the lock before
        # actually starting a thread.
        if _has_thread_for_this_process():
            return None

        with self._thread_lock:
            if _has_thread_for_this_process():
                return None

            def _thread() -> None:
                # Flush after every wait, including the final one that was
                # interrupted by the shutdown request.
                while True:
                    stop = self.__shutdown_requested.wait(self.flush_interval)
                    self.flush()
                    if stop:
                        break

            worker = Thread(target=_thread)
            worker.daemon = True
            try:
                worker.start()
            except RuntimeError:
                # Unfortunately at this point the interpreter is in a state
                # that no longer allows us to spawn a thread and we have to
                # bail.
                self.__shutdown_requested.set()
                return None

            self._thread = worker
            self._thread_for_pid = os.getpid()

        return None

    def add_aggregate_session(
        self,
        session: "Session",
    ) -> None:
        """
        Count ``session`` in the aggregate bucket for its attributes and its
        truncated start time.

        Exactly one counter is incremented per session: ``crashed`` or
        ``abnormal`` by status, else ``errored`` when ``session.errors > 0``,
        else ``exited``.
        """
        # NOTE on `session.did`:
        # the protocol can deal with buckets that have a distinct-id, however
        # in practice we expect the python SDK to have an extremely high
        # cardinality here, effectively making aggregation useless, therefore
        # we do not aggregate per-did.
        with self._aggregate_lock:
            json_attrs = session.get_json_attrs(with_user_info=False)
            group_key = tuple(sorted(json_attrs.items()))
            bucket_key = session.truncated_started
            group = self.pending_aggregates.setdefault(group_key, {})
            bucket = group.setdefault(bucket_key, {})

            if "started" not in bucket:
                bucket["started"] = format_timestamp(session.truncated_started)

            if session.status == "crashed":
                counter = "crashed"
            elif session.status == "abnormal":
                counter = "abnormal"
            elif session.errors > 0:
                counter = "errored"
            else:
                counter = "exited"
            bucket[counter] = bucket.get(counter, 0) + 1

    def add_session(
        self,
        session: "Session",
    ) -> None:
        """
        Queue ``session`` for the next flush and make sure the flusher thread
        is running.

        ``"request"``-mode sessions are aggregated; any other mode is queued
        individually as ``session.to_json()``.
        """
        if session.session_mode != "request":
            self.pending_sessions.append(session.to_json())
        else:
            self.add_aggregate_session(session)
        self._ensure_running()

    def kill(self) -> None:
        """Ask the background thread to stop; it flushes once more before exiting."""
        self.__shutdown_requested.set()


Current_dir [ 𝗡𝗢𝗧 𝗪𝗥𝗜𝗧𝗘𝗔𝗕𝗟𝗘 ] Document_root [ 𝗪𝗥𝗜𝗧𝗘𝗔𝗕𝗟𝗘 ]


[ Back ]
𝗡𝗔𝗠𝗘
𝗦𝗜𝗭𝗘
𝗟𝗔𝗦𝗧 𝗧𝗢𝗨𝗖𝗛
𝗨𝗦𝗘𝗥
𝗦𝗧𝗔𝗧𝗨𝗦
𝗙𝗨𝗡𝗖𝗧𝗜𝗢𝗡𝗦
..
--
11 Jun 2026 5.00 AM
root / root
0755
__pycache__
--
11 Jun 2026 5.00 AM
root / root
0755
ai
--
11 Jun 2026 5.00 AM
root / root
0755
crons
--
11 Jun 2026 5.00 AM
root / root
0755
integrations
--
11 Jun 2026 5.00 AM
root / root
0755
profiler
--
11 Jun 2026 5.00 AM
root / root
0755
__init__.py
1.462 KB
11 Jun 2026 5.00 AM
root / root
0644
_batcher.py
5.702 KB
11 Jun 2026 5.00 AM
root / root
0644
_compat.py
3 KB
11 Jun 2026 5.00 AM
root / root
0644
_init_implementation.py
2.432 KB
11 Jun 2026 5.00 AM
root / root
0644
_log_batcher.py
1.876 KB
11 Jun 2026 5.00 AM
root / root
0644
_lru_cache.py
1.14 KB
11 Jun 2026 5.00 AM
root / root
0644
_metrics_batcher.py
1.208 KB
11 Jun 2026 5.00 AM
root / root
0644
_queue.py
10.979 KB
11 Jun 2026 5.00 AM
root / root
0644
_span_batcher.py
8.122 KB
11 Jun 2026 5.00 AM
root / root
0644
_types.py
13.161 KB
11 Jun 2026 5.00 AM
root / root
0644
_werkzeug.py
3.852 KB
11 Jun 2026 5.00 AM
root / root
0644
api.py
15.588 KB
11 Jun 2026 5.00 AM
root / root
0644
attachments.py
2.951 KB
11 Jun 2026 5.00 AM
root / root
0644
client.py
49.95 KB
11 Jun 2026 5.00 AM
root / root
0644
consts.py
61.951 KB
11 Jun 2026 5.00 AM
root / root
0644
debug.py
0.937 KB
11 Jun 2026 5.00 AM
root / root
0644
envelope.py
9.369 KB
11 Jun 2026 5.00 AM
root / root
0644
feature_flags.py
2.503 KB
11 Jun 2026 5.00 AM
root / root
0644
hub.py
24.542 KB
11 Jun 2026 5.00 AM
root / root
0644
logger.py
2.604 KB
11 Jun 2026 5.00 AM
root / root
0644
metrics.py
1.418 KB
11 Jun 2026 5.00 AM
root / root
0644
monitor.py
4.469 KB
11 Jun 2026 5.00 AM
root / root
0644
py.typed
0 KB
11 Jun 2026 5.00 AM
root / root
0644
scope.py
74.089 KB
11 Jun 2026 5.00 AM
root / root
0644
scrubber.py
5.991 KB
11 Jun 2026 5.00 AM
root / root
0644
serializer.py
12.818 KB
11 Jun 2026 5.00 AM
root / root
0644
session.py
5.085 KB
11 Jun 2026 5.00 AM
root / root
0644
sessions.py
8.593 KB
11 Jun 2026 5.00 AM
root / root
0644
spotlight.py
11.85 KB
11 Jun 2026 5.00 AM
root / root
0644
traces.py
25.079 KB
11 Jun 2026 5.00 AM
root / root
0644
tracing.py
50.335 KB
11 Jun 2026 5.00 AM
root / root
0644
tracing_utils.py
54.361 KB
11 Jun 2026 5.00 AM
root / root
0644
transport.py
44.414 KB
11 Jun 2026 5.00 AM
root / root
0644
types.py
1.239 KB
11 Jun 2026 5.00 AM
root / root
0644
utils.py
65.96 KB
11 Jun 2026 5.00 AM
root / root
0644
worker.py
10.905 KB
11 Jun 2026 5.00 AM
root / root
0644

✘✘ GRAYBYTE WORDPRESS FILE MANAGER @ 2026 CONTACT ME ✘✘
Static GIF Static GIF