Source code for reactpy.core.serve

from __future__ import annotations

from collections.abc import Awaitable
from logging import getLogger
from typing import Callable
from warnings import warn

from anyio import create_task_group
from anyio.abc import TaskGroup

from reactpy.config import REACTPY_DEBUG_MODE
from reactpy.core.types import LayoutEventMessage, LayoutType, LayoutUpdateMessage

# Module-level logger, named after this module's import path.
logger = getLogger(__name__)


# Type of the async callable used to push one layout update outward: it takes a
# single ``LayoutUpdateMessage`` and its awaited result is ``None``.
SendCoroutine = Callable[[LayoutUpdateMessage], Awaitable[None]]
"""Send model patches given by a dispatcher"""

# Type of the async callable used to obtain the next incoming event: it takes no
# arguments and its awaited result is a ``LayoutEventMessage``.
# NOTE(review): the docstring below cross-references
# ``reactpy.core.layout.LayoutEventMessage`` while this module imports the name
# from ``reactpy.core.types`` - confirm which target the docs should point to.
RecvCoroutine = Callable[[], Awaitable[LayoutEventMessage]]
"""Called by a dispatcher to return a :class:`reactpy.core.layout.LayoutEventMessage`

The event will then trigger an :class:`reactpy.core.proto.EventHandlerType` in a layout.
"""


class Stop(BaseException):
    """Deprecated

    Stop serving changes and events

    Raising this error will tell dispatchers to gracefully exit. Typically this is
    raised by code running inside a layout to tell it to stop rendering.

    This derives from :class:`BaseException` rather than :class:`Exception`, so a
    plain ``except Exception`` handler does not intercept it.
    """


async def serve_layout(
    layout: LayoutType[LayoutUpdateMessage, LayoutEventMessage],
    send: SendCoroutine,
    recv: RecvCoroutine,
) -> None:
    """Run a dispatch loop for a single view instance

    The layout is entered as an async context manager and two long-running tasks
    are started in one task group: one forwards rendered updates to ``send`` and
    the other feeds events obtained from ``recv`` back into the layout.

    Parameters:
        layout: The layout to serve; it is entered for the duration of the loop.
        send: Coroutine function called with each update the layout renders.
        recv: Coroutine function awaited to obtain each incoming event.
    """
    async with layout:
        try:
            async with create_task_group() as task_group:
                task_group.start_soon(_single_outgoing_loop, layout, send)
                # The incoming loop receives the task group itself so it can spawn
                # one task per delivered event.
                task_group.start_soon(_single_incoming_loop, task_group, layout, recv)
        # NOTE(review): whether this handler still matches a ``Stop`` raised by a
        # child task depends on how the installed anyio version propagates
        # task-group errors - confirm.
        except Stop:  # nocov
            warn(
                "The Stop exception is deprecated and will be removed in a future version",
                UserWarning,
                stacklevel=1,
            )
            # Lazy %-formatting: the layout is only rendered to text when the
            # INFO level is actually enabled.
            logger.info("Stopped serving %s", layout)


async def _single_outgoing_loop(
    layout: LayoutType[LayoutUpdateMessage, LayoutEventMessage], send: SendCoroutine
) -> None:
    """Forward every update rendered by ``layout`` to ``send``, forever.

    The loop has no exit condition of its own; it ends only when an awaited call
    raises. Any ``Exception`` raised by ``send`` is re-raised after (when debug
    mode is off) logging a hint about enabling debug mode.
    """
    while True:
        update = await layout.render()
        try:
            await send(update)
        except Exception:  # nocov
            if not REACTPY_DEBUG_MODE.current:
                msg = (
                    "Failed to send update. More info may be available "
                    "if you enable debug mode by setting "
                    "`reactpy.config.REACTPY_DEBUG_MODE.current = True`."
                )
                logger.error(msg)
            # Always propagate: the log line is only a hint, not error handling.
            raise


async def _single_incoming_loop(
    task_group: TaskGroup,
    layout: LayoutType[LayoutUpdateMessage, LayoutEventMessage],
    recv: RecvCoroutine,
) -> None:
    """Receive events from ``recv`` forever, delivering each in its own task.

    Each received event is handed to ``layout.deliver`` via
    ``task_group.start_soon`` rather than awaited inline.
    """
    while True:
        # We need to fire and forget here so that we avoid waiting on the completion
        # of this event handler before receiving and running the next one.
        task_group.start_soon(layout.deliver, await recv())