Source code for reactpy.core.serve

from __future__ import annotations

from import Awaitable
from logging import getLogger
from typing import Callable

from anyio import create_task_group
from import TaskGroup

from reactpy.config import REACTPY_DEBUG_MODE
from reactpy.core.types import LayoutEventMessage, LayoutType, LayoutUpdateMessage

logger = getLogger(__name__)

SendCoroutine = Callable[[LayoutUpdateMessage], Awaitable[None]]
"""Send model patches given by a dispatcher"""

RecvCoroutine = Callable[[], Awaitable[LayoutEventMessage]]
"""Called by a dispatcher to return a :class:`reactpy.core.layout.LayoutEventMessage`

The event will then trigger an :class:`reactpy.core.proto.EventHandlerType` in a layout.

[docs]class Stop(BaseException): """Stop serving changes and events Raising this error will tell dispatchers to gracefully exit. Typically this is called by code running inside a layout to tell it to stop rendering. """
[docs]async def serve_layout( layout: LayoutType[LayoutUpdateMessage, LayoutEventMessage], send: SendCoroutine, recv: RecvCoroutine, ) -> None: """Run a dispatch loop for a single view instance""" async with layout: try: async with create_task_group() as task_group: task_group.start_soon(_single_outgoing_loop, layout, send) task_group.start_soon(_single_incoming_loop, task_group, layout, recv) except Stop:"Stopped serving {layout}")
async def _single_outgoing_loop( layout: LayoutType[LayoutUpdateMessage, LayoutEventMessage], send: SendCoroutine ) -> None: while True: update = await layout.render() try: await send(update) except Exception: # nocov if not REACTPY_DEBUG_MODE.current: msg = ( "Failed to send update. More info may be available " "if you enabling debug mode by setting " "`reactpy.config.REACTPY_DEBUG_MODE.current = True`." ) logger.error(msg) raise async def _single_incoming_loop( task_group: TaskGroup, layout: LayoutType[LayoutUpdateMessage, LayoutEventMessage], recv: RecvCoroutine, ) -> None: while True: # We need to fire and forget here so that we avoid waiting on the completion # of this event handler before receiving and running the next one. task_group.start_soon(layout.deliver, await recv())