✘✘ GRAYBYTE WORDPRESS FILE MANAGER ✘✘

​🇳​​🇦​​🇲​​🇪♯➤ server366.web-hosting.com ​🇻​♯➤ 4.18.0-553.50.1.lve.el8.x86_64 #1 SMP 🇾​♯➤ 2025

𝗛𝗢𝗠𝗘 𝗜𝗗 ♯➤ 67.223.118.204 ♯➤ 𝗔𝗗𝗠𝗜𝗡 𝗜𝗗 216.73.216.170
𝗢𝗣𝗧𝗜𝗢𝗡𝗦 ♯ CRL ♯➤ 𝗢𝗞 ┃ WGT ♯➤ 𝗢𝗞 ┃ SDO ♯➤ 𝗢𝗙𝗙 ┃ PKEX ♯➤ 𝗢𝗙𝗙
𝗗𝗘𝗔𝗖𝗧𝗜𝗩𝗔𝗧𝗘𝗗 ♯➤ 𝗔𝗟𝗟 𝗪𝗢𝗥𝗞𝗜𝗡𝗚....

𝗛𝗢𝗠𝗘
𝗖𝗨𝗥𝗥𝗘𝗡𝗧 𝗙𝗜𝗟𝗘 : /opt/hc_python/lib/python3.12/site-packages/sentry_sdk/crons//decorator.py
import copy
from functools import wraps
from inspect import iscoroutinefunction
from typing import TYPE_CHECKING

from sentry_sdk.crons import capture_checkin
from sentry_sdk.crons.consts import MonitorStatus
from sentry_sdk.utils import now

if TYPE_CHECKING:
    from collections.abc import Awaitable, Callable
    from types import TracebackType
    from typing import (
        Any,
        Optional,
        ParamSpec,
        Type,
        TypeVar,
        Union,
        cast,
        overload,
    )

    from sentry_sdk._types import MonitorConfig

    P = ParamSpec("P")
    R = TypeVar("R")


class monitor:  # noqa: N801
    """
    Decorator/context manager to capture checkin events for a monitor.

    Entering the context sends a check-in with status
    ``MonitorStatus.IN_PROGRESS``. Leaving it sends a second check-in that
    reuses the id returned by the first one, carries the elapsed time
    (difference of two ``now()`` readings) and has status ``MonitorStatus.OK``
    when no exception is propagating, ``MonitorStatus.ERROR`` otherwise.
    Exceptions are never suppressed.

    Usage (as decorator):
    ```
    import sentry_sdk

    app = Celery()

    @app.task
    @sentry_sdk.monitor(monitor_slug='my-fancy-slug')
    def test(arg):
        print(arg)
    ```

    This does not have to be used with Celery, but if you do use it with celery,
    put the `@sentry_sdk.monitor` decorator below Celery's `@app.task` decorator.

    Every call of a decorated function runs inside its own shallow copy of the
    decorator instance, so recursive or concurrent calls keep separate
    ``start_timestamp``/``check_in_id`` values.

    Usage (as context manager):
    ```
    import sentry_sdk

    def test(arg):
        with sentry_sdk.monitor(monitor_slug='my-fancy-slug'):
            print(arg)
    ```

    When used directly in a ``with`` statement the in-flight state is stored on
    the instance itself, so do not re-enter the same instance while it is
    still active; create a new ``monitor(...)`` per ``with`` block instead.
    """

    def __init__(
        self,
        monitor_slug: "Optional[str]" = None,
        monitor_config: "Optional[MonitorConfig]" = None,
    ) -> None:
        """
        Store the monitor settings; no check-in is sent at construction time.

        :param monitor_slug: Passed through unchanged to ``capture_checkin``.
        :param monitor_config: Passed through unchanged to ``capture_checkin``.
        """
        self.monitor_slug = monitor_slug
        self.monitor_config = monitor_config

    def __enter__(self) -> None:
        """Record the start time and send the ``IN_PROGRESS`` check-in."""
        self.start_timestamp = now()
        # The returned id is kept so that __exit__ can close the same check-in.
        self.check_in_id = capture_checkin(
            monitor_slug=self.monitor_slug,
            status=MonitorStatus.IN_PROGRESS,
            monitor_config=self.monitor_config,
        )

    def __exit__(
        self,
        exc_type: "Optional[Type[BaseException]]",
        exc_value: "Optional[BaseException]",
        traceback: "Optional[TracebackType]",
    ) -> None:
        """
        Send the closing check-in for the one opened in ``__enter__``.

        Returns ``None``, so an exception raised inside the block keeps
        propagating after the ``ERROR`` check-in has been sent.
        """
        duration_s = now() - self.start_timestamp

        if exc_type is None and exc_value is None and traceback is None:
            status = MonitorStatus.OK
        else:
            status = MonitorStatus.ERROR

        capture_checkin(
            monitor_slug=self.monitor_slug,
            check_in_id=self.check_in_id,
            status=status,
            duration=duration_s,
            monitor_config=self.monitor_config,
        )

    if TYPE_CHECKING:

        @overload
        def __call__(
            self, fn: "Callable[P, Awaitable[Any]]"
        ) -> "Callable[P, Awaitable[Any]]":
            # Unfortunately, mypy does not give us any reliable way to type check the
            # return value of an Awaitable (i.e. async function) for this overload,
            # since calling iscoroutinefunction narrows the type to Callable[P, Awaitable[Any]].
            ...

        @overload
        def __call__(self, fn: "Callable[P, R]") -> "Callable[P, R]": ...

    def __call__(
        self,
        fn: "Union[Callable[P, R], Callable[P, Awaitable[Any]]]",
    ) -> "Union[Callable[P, R], Callable[P, Awaitable[Any]]]":
        """
        Decorate ``fn`` so that every call is reported as a monitor check-in.

        Coroutine functions (per ``inspect.iscoroutinefunction``) get an
        ``async`` wrapper; everything else gets a plain synchronous wrapper.
        """
        if iscoroutinefunction(fn):
            return self._async_wrapper(fn)

        if TYPE_CHECKING:
            # Only for the type checker: `cast` is not imported at runtime.
            fn = cast("Callable[P, R]", fn)
        return self._sync_wrapper(fn)

    def _async_wrapper(
        self, fn: "Callable[P, Awaitable[Any]]"
    ) -> "Callable[P, Awaitable[Any]]":
        """Return an ``async`` wrapper that awaits ``fn`` inside a check-in."""

        @wraps(fn)
        async def inner(*args: "P.args", **kwargs: "P.kwargs") -> "Any":
            # Enter a per-call copy rather than `self`: overlapping awaits of
            # the same decorated coroutine function would otherwise overwrite
            # each other's start_timestamp/check_in_id on the shared instance.
            with copy.copy(self):
                return await fn(*args, **kwargs)

        return inner

    def _sync_wrapper(self, fn: "Callable[P, R]") -> "Callable[P, R]":
        """Return a synchronous wrapper that calls ``fn`` inside a check-in."""

        @wraps(fn)
        def inner(*args: "P.args", **kwargs: "P.kwargs") -> "R":
            # Enter a per-call copy rather than `self`: recursive or threaded
            # calls would otherwise overwrite each other's
            # start_timestamp/check_in_id on the shared instance.
            with copy.copy(self):
                return fn(*args, **kwargs)

        return inner


Current_dir [ 𝗡𝗢𝗧 𝗪𝗥𝗜𝗧𝗘𝗔𝗕𝗟𝗘 ] Document_root [ 𝗪𝗥𝗜𝗧𝗘𝗔𝗕𝗟𝗘 ]


[ Back ]
𝗡𝗔𝗠𝗘
𝗦𝗜𝗭𝗘
𝗟𝗔𝗦𝗧 𝗧𝗢𝗨𝗖𝗛
𝗨𝗦𝗘𝗥
𝗦𝗧𝗔𝗧𝗨𝗦
𝗙𝗨𝗡𝗖𝗧𝗜𝗢𝗡𝗦
..
--
11 Jun 2026 5.00 AM
root / root
0755
__pycache__
--
11 Jun 2026 5.00 AM
root / root
0755
__init__.py
0.215 KB
11 Jun 2026 5.00 AM
root / root
0644
api.py
1.635 KB
11 Jun 2026 5.00 AM
root / root
0644
consts.py
0.085 KB
11 Jun 2026 5.00 AM
root / root
0644
decorator.py
3.744 KB
11 Jun 2026 5.00 AM
root / root
0644

✘✘ GRAYBYTE WORDPRESS FILE MANAGER @ 2026 CONTACT ME ✘✘
Static GIF Static GIF