"""Support module for picodi package.May be useful for writing your own scopes or other customizations."""from__future__importannotationsimportasyncioimportinspectfromcontextlibimportAsyncExitStackfromcontextlibimportExitStackasSyncExitStackfromtypingimportTYPE_CHECKING,Any,AsyncContextManager,ContextManagerifTYPE_CHECKING:fromcollections.abcimportAwaitable,GeneratorfromtypesimportTracebackType
[docs]classNullAwaitable:"""Dummy awaitable that does nothing."""def__await__(self)->Generator[None]:yieldreturnNone
[docs]classExitStack:""" A context manager that combines multiple context managers - both sync and async. Under the hood, it uses :class:`python:contextlib.ExitStack` for sync context managers and :class:`python:contextlib.AsyncExitStack` for async """def__init__(self)->None:self._sync_stack=SyncExitStack()self._async_stack=AsyncExitStack()def__exit__(self,exc_type:type[BaseException]|None,exc:BaseException|None,traceback:TracebackType|None,)->bool|None:returnself._sync_stack.__exit__(exc_type,exc,traceback)asyncdef__aexit__(self,exc_type:type[BaseException]|None,exc:BaseException|None,traceback:TracebackType|None,)->bool|None:res_sync=self._sync_stack.__exit__(exc_type,exc,traceback)res_async=awaitself._async_stack.__aexit__(exc_type,exc,traceback)returnres_syncandres_async
[docs]defenter_context(self,cm:AsyncContextManager|ContextManager)->Any:""" Enters a new context manager and adds its ``__[a]exit__()`` method to the callback stack. The return value is the result of the context manager’s own ``__[a]enter__()`` method. :param cm: context manager to enter. :return: Result of the context manager's ``__[a]enter__`` method. """ifisinstance(cm,ContextManager):returnself._sync_stack.enter_context(cm)elifisinstance(cm,AsyncContextManager):returnself._async_stack.enter_async_context(cm)raiseTypeError(f"Unsupported context manager: {cm}")# pragma: no cover
[docs]defclose(self,exc:BaseException|None=None)->Awaitable:""" Immediately unwinds the callback stack, invoking callbacks in the reverse order of registration. :param exc: exception to be passed to the ``__[a]exit__`` method. """exc_type=type(exc)ifexcisnotNoneelseNoneself.__exit__(exc_type,exc,exc.__traceback__ifexcelseNone)if(is_async_environment()# This is a workaround for the issue for RuntimeWarning# "coroutine was never awaited". If we in sync function in async context -# don't need to await for async exit if there are no async context managers.andself._async_stack._exit_callbacks# type: ignore # noqa: SLF001):returnself.__aexit__(exc_type,exc,None)returnNullAwaitable()
[docs]defis_async_environment()->bool:""" Check if we are in async environment. :return: True if we are in async environment, False otherwise. """try:asyncio.get_running_loop()exceptRuntimeError:returnFalsereturnTrue
[docs]defis_async_function(fn:Any)->bool:""" Check if the function is async. :param fn: function to check. :return: True if the function is async, False otherwise. """ifasyncio.iscoroutinefunction(fn)orinspect.isasyncgenfunction(fn):returnTrueifwrpd:=getattr(fn,"__wrapped__",None):returnis_async_function(wrpd)returnFalse
[docs]defcall_cm_sync(cm:ContextManager)->Any:""" Call a sync context manager and return its result. :param cm: context manager to call. :return: Result of the context manager's ``__enter__`` method. """withcmasres:returnres
[docs]asyncdefcall_cm_async(cm:AsyncContextManager)->Any:""" Call an async context manager and return its result. :param cm: context manager to call. :return: Result of the context manager's ``__aenter__`` method. """asyncwithcmasres:returnres