# Source code for reactpy.core.layout

from __future__ import annotations

import abc
from asyncio import (
    FIRST_COMPLETED,
    CancelledError,
    Queue,
    Task,
    create_task,
    get_running_loop,
    wait,
)
from collections import Counter
from collections.abc import Sequence
from contextlib import AsyncExitStack
from logging import getLogger
from typing import (
    Any,
    Callable,
    Generic,
    NamedTuple,
    NewType,
    TypeVar,
    cast,
)
from uuid import uuid4
from weakref import ref as weakref

from anyio import Semaphore
from typing_extensions import TypeAlias

from reactpy.config import (
    REACTPY_ASYNC_RENDERING,
    REACTPY_CHECK_VDOM_SPEC,
    REACTPY_DEBUG_MODE,
)
from reactpy.core._life_cycle_hook import LifeCycleHook
from reactpy.core.types import (
    ComponentType,
    EventHandlerDict,
    Key,
    LayoutEventMessage,
    LayoutUpdateMessage,
    VdomChild,
    VdomDict,
    VdomJson,
)
from reactpy.core.vdom import validate_vdom_json
from reactpy.utils import Ref

logger = getLogger(__name__)


class Layout:
    """Responsible for "rendering" components. That is, turning them into VDOM.

    A layout is an async context manager. Its working attributes are created in
    ``__aenter__`` and removed again in ``__aexit__``, so ``render`` and
    ``deliver`` are only usable while the context is entered.
    """

    __slots__: tuple[str, ...] = (
        "root",
        "_event_handlers",
        "_rendering_queue",
        "_render_tasks",
        "_render_tasks_ready",
        "_root_life_cycle_state_id",
        "_model_states_by_life_cycle_state_id",
    )

    if not hasattr(abc.ABC, "__weakref__"):  # nocov
        __slots__ += ("__weakref__",)

    def __init__(self, root: ComponentType) -> None:
        """Store the root component.

        Raises:
            TypeError: If ``root`` is not a ``ComponentType``.
        """
        super().__init__()
        if not isinstance(root, ComponentType):
            msg = f"Expected a ComponentType, not {type(root)!r}."
            raise TypeError(msg)
        self.root = root

    async def __aenter__(self) -> Layout:
        """Create the layout's state and schedule the first render of the root."""
        # create attributes here to avoid access before entering context manager
        self._event_handlers: EventHandlerDict = {}
        self._render_tasks: set[Task[LayoutUpdateMessage]] = set()
        self._render_tasks_ready: Semaphore = Semaphore(0)
        self._rendering_queue: _ThreadSafeQueue[_LifeCycleStateId] = _ThreadSafeQueue()
        root_model_state = _new_root_model_state(self.root, self._schedule_render_task)

        self._root_life_cycle_state_id = root_id = root_model_state.life_cycle_state.id
        self._model_states_by_life_cycle_state_id = {root_id: root_model_state}
        self._schedule_render_task(root_id)

        return self

    async def __aexit__(self, *exc: Any) -> None:
        """Cancel outstanding render tasks, unmount the tree and drop state."""
        root_csid = self._root_life_cycle_state_id
        root_model_state = self._model_states_by_life_cycle_state_id[root_csid]

        # Drain the set instead of iterating over it. Awaiting a cancelled task
        # yields to the event loop, and a render scheduled during that window
        # adds to ``_render_tasks``. Iterating the live set would then raise
        # "Set changed size during iteration" and leave the new task running.
        while self._render_tasks:
            task = self._render_tasks.pop()
            task.cancel()
            try:
                await task
            except CancelledError:
                pass

        await self._unmount_model_states([root_model_state])

        # delete attributes here to avoid access after exiting context manager
        del self._event_handlers
        del self._rendering_queue
        del self._root_life_cycle_state_id
        del self._model_states_by_life_cycle_state_id

    async def deliver(self, event: LayoutEventMessage) -> None:
        """Dispatch an event to the targeted handler"""
        # It is possible for an element in the frontend to produce an event
        # associated with a backend model that has been deleted. We only handle
        # events if the element and the handler exist in the backend. Otherwise
        # we just ignore the event.
        handler = self._event_handlers.get(event["target"])

        if handler is not None:
            try:
                await handler.function(event["data"])
            except Exception:
                logger.exception(f"Failed to execute event handler {handler}")
        else:
            logger.info(
                f"Ignored event - handler {event['target']!r} "
                "does not exist or its component unmounted"
            )

    async def render(self) -> LayoutUpdateMessage:
        """Await the next layout update.

        Uses the parallel strategy when ``REACTPY_ASYNC_RENDERING`` is enabled
        and the serial strategy otherwise.
        """
        if REACTPY_ASYNC_RENDERING.current:
            return await self._parallel_render()
        else:  # nocov
            return await self._serial_render()

    async def _serial_render(self) -> LayoutUpdateMessage:  # nocov
        """Await the next available render. This will block until a component is updated"""
        while True:
            model_state_id = await self._rendering_queue.get()
            try:
                model_state = self._model_states_by_life_cycle_state_id[model_state_id]
            except KeyError:
                # the component was unmounted after its render was queued
                logger.debug(
                    "Did not render component with model state ID "
                    f"{model_state_id!r} - component already unmounted"
                )
            else:
                return await self._create_layout_update(model_state)

    async def _parallel_render(self) -> LayoutUpdateMessage:
        """Await to fetch the first completed render within our asyncio task group.
        We use the `asyncio.tasks.wait` API in order to return the first completed task.
        """
        # one permit is released per scheduled task, so acquiring one guarantees
        # there is at least one task in the set to wait on
        await self._render_tasks_ready.acquire()
        done, _ = await wait(self._render_tasks, return_when=FIRST_COMPLETED)
        update_task: Task[LayoutUpdateMessage] = done.pop()
        self._render_tasks.remove(update_task)
        return update_task.result()

    async def _create_layout_update(
        self, old_state: _ModelState
    ) -> LayoutUpdateMessage:
        """Re-render the component behind ``old_state`` and build its update message."""
        new_state = _copy_component_model_state(old_state)
        component = new_state.life_cycle_state.component

        # callbacks pushed onto the stack while rendering run once the whole
        # subtree has been rendered
        async with AsyncExitStack() as exit_stack:
            await self._render_component(exit_stack, old_state, new_state, component)

        if REACTPY_CHECK_VDOM_SPEC.current:
            validate_vdom_json(new_state.model.current)

        return {
            "type": "layout-update",
            "path": new_state.patch_path,
            "model": new_state.model.current,
        }

    async def _render_component(
        self,
        exit_stack: AsyncExitStack,
        old_state: _ModelState | None,
        new_state: _ModelState,
        component: ComponentType,
    ) -> None:
        """Render ``component`` into ``new_state`` and attach it to its parent.

        A failing render is logged and replaced by an error placeholder model
        rather than propagated.
        """
        life_cycle_state = new_state.life_cycle_state
        life_cycle_hook = life_cycle_state.hook

        self._model_states_by_life_cycle_state_id[life_cycle_state.id] = new_state

        await life_cycle_hook.affect_component_will_render(component)
        exit_stack.push_async_callback(life_cycle_hook.affect_layout_did_render)
        try:
            raw_model = component.render()
            # wrap the model in a fragment (i.e. tagName="") to ensure components have
            # a separate node in the model state tree. This could be removed if this
            # components are given a node in the tree some other way
            wrapper_model: VdomDict = {"tagName": "", "children": [raw_model]}
            await self._render_model(exit_stack, old_state, new_state, wrapper_model)
        except Exception as error:
            logger.exception(f"Failed to render {component}")
            new_state.model.current = {
                "tagName": "",
                "error": (
                    f"{type(error).__name__}: {error}"
                    if REACTPY_DEBUG_MODE.current
                    else ""
                ),
            }
        finally:
            await life_cycle_hook.affect_component_did_render()

        try:
            parent = new_state.parent
        except AttributeError:
            pass  # only happens for root component
        else:
            key, index = new_state.key, new_state.index
            parent.children_by_key[key] = new_state
            # need to add this model to parent's children without mutating parent model
            old_parent_model = parent.model.current
            old_parent_children = old_parent_model["children"]
            parent.model.current = {
                **old_parent_model,
                "children": [
                    *old_parent_children[:index],
                    new_state.model.current,
                    *old_parent_children[index + 1 :],
                ],
            }

    async def _render_model(
        self,
        exit_stack: AsyncExitStack,
        old_state: _ModelState | None,
        new_state: _ModelState,
        raw_model: Any,
    ) -> None:
        """Render one VDOM element dict (and its subtree) into ``new_state``.

        Raises:
            ValueError: If ``raw_model`` cannot be indexed by ``"tagName"``.
        """
        try:
            new_state.model.current = {"tagName": raw_model["tagName"]}
        except Exception as e:  # nocov
            msg = f"Expected a VDOM element dict, not {raw_model}"
            raise ValueError(msg) from e
        if "key" in raw_model:
            new_state.key = new_state.model.current["key"] = raw_model["key"]
        if "importSource" in raw_model:
            new_state.model.current["importSource"] = raw_model["importSource"]
        self._render_model_attributes(old_state, new_state, raw_model)
        await self._render_model_children(
            exit_stack, old_state, new_state, raw_model.get("children", [])
        )

    def _render_model_attributes(
        self,
        old_state: _ModelState | None,
        new_state: _ModelState,
        raw_model: dict[str, Any],
    ) -> None:
        """Copy attributes into the new model and (re)register event handlers.

        Targets of events that already existed on ``old_state`` are reused.
        Handlers for events that are no longer present are unregistered.
        """
        # extract event handlers from 'eventHandlers' and 'attributes'
        handlers_by_event: EventHandlerDict = raw_model.get("eventHandlers", {})

        if "attributes" in raw_model:
            attrs = raw_model["attributes"].copy()
            new_state.model.current["attributes"] = attrs

        if old_state is None:
            self._render_model_event_handlers_without_old_state(
                new_state, handlers_by_event
            )
            return None

        old_targets = old_state.targets_by_event
        for old_event in set(old_targets).difference(handlers_by_event):
            del self._event_handlers[old_targets[old_event]]

        if not handlers_by_event:
            return None

        model_event_handlers = new_state.model.current["eventHandlers"] = {}
        for event, handler in handlers_by_event.items():
            self._register_event_handler(
                new_state,
                model_event_handlers,
                event,
                handler,
                reuse_target=old_targets.get(event),
            )

        return None

    def _render_model_event_handlers_without_old_state(
        self,
        new_state: _ModelState,
        handlers_by_event: EventHandlerDict,
    ) -> None:
        """Register every handler in ``handlers_by_event`` under a fresh target."""
        if not handlers_by_event:
            return None

        model_event_handlers = new_state.model.current["eventHandlers"] = {}
        for event, handler in handlers_by_event.items():
            self._register_event_handler(
                new_state, model_event_handlers, event, handler
            )

        return None

    def _register_event_handler(
        self,
        new_state: _ModelState,
        model_event_handlers: dict[str, Any],
        event: str,
        handler: Any,
        reuse_target: str | None = None,
    ) -> None:
        """Record ``handler`` on the state, the layout and the outgoing model.

        ``reuse_target`` is the target the event had in the previous render, if
        any. Otherwise the handler's own target is used, or a new random one
        when the handler has none.
        """
        if reuse_target is not None:
            target = reuse_target
        else:
            target = uuid4().hex if handler.target is None else handler.target
        new_state.targets_by_event[event] = target
        self._event_handlers[target] = handler
        model_event_handlers[event] = {
            "target": target,
            "preventDefault": handler.prevent_default,
            "stopPropagation": handler.stop_propagation,
        }

    @staticmethod
    def _duplicate_keys_error(
        children_info: Sequence[_ChildInfo], patch_path: str
    ) -> ValueError:
        """Build the error reported when sibling children share a key."""
        key_counter = Counter(key for _, _, key in children_info)
        duplicate_keys = [key for key, count in key_counter.items() if count > 1]
        msg = f"Duplicate keys {duplicate_keys} at {patch_path or '/'!r}"
        return ValueError(msg)

    async def _render_model_children(
        self,
        exit_stack: AsyncExitStack,
        old_state: _ModelState | None,
        new_state: _ModelState,
        raw_children: Any,
    ) -> None:
        """Render ``raw_children`` into ``new_state``, diffing against ``old_state``.

        Children are matched to their previous state by key. Previous children
        whose key disappeared, or whose kind changed, are unmounted.

        Raises:
            ValueError: If two children resolve to the same key.
        """
        if not isinstance(raw_children, (list, tuple)):
            raw_children = [raw_children]

        if old_state is None:
            if raw_children:
                await self._render_model_children_without_old_state(
                    exit_stack, new_state, raw_children
                )
            return None
        elif not raw_children:
            await self._unmount_model_states(list(old_state.children_by_key.values()))
            return None

        children_info = _get_children_info(raw_children)

        new_keys = {k for _, _, k in children_info}
        if len(new_keys) != len(children_info):
            raise self._duplicate_keys_error(children_info, new_state.patch_path)

        old_keys = set(old_state.children_by_key).difference(new_keys)
        if old_keys:
            await self._unmount_model_states(
                [old_state.children_by_key[key] for key in old_keys]
            )

        new_state.model.current["children"] = []
        for index, (child, child_type, key) in enumerate(children_info):
            # looked up once per child; every branch below works from this value
            old_child_state = old_state.children_by_key.get(key)
            if child_type is _DICT_TYPE:
                if old_child_state is None:
                    new_child_state = _make_element_model_state(
                        new_state,
                        index,
                        key,
                    )
                elif old_child_state.is_component_state:
                    # a component used to live under this key; discard it
                    await self._unmount_model_states([old_child_state])
                    new_child_state = _make_element_model_state(
                        new_state,
                        index,
                        key,
                    )
                    old_child_state = None
                else:
                    new_child_state = _update_element_model_state(
                        old_child_state,
                        new_state,
                        index,
                    )
                await self._render_model(
                    exit_stack, old_child_state, new_child_state, child
                )
                new_state.append_child(new_child_state.model.current)
                new_state.children_by_key[key] = new_child_state
            elif child_type is _COMPONENT_TYPE:
                child = cast(ComponentType, child)
                if old_child_state is None:
                    new_child_state = _make_component_model_state(
                        new_state,
                        index,
                        key,
                        child,
                        self._schedule_render_task,
                    )
                elif old_child_state.is_component_state and (
                    old_child_state.life_cycle_state.component.type != child.type
                ):
                    # a different component type under the same key starts fresh
                    await self._unmount_model_states([old_child_state])
                    old_child_state = None
                    new_child_state = _make_component_model_state(
                        new_state,
                        index,
                        key,
                        child,
                        self._schedule_render_task,
                    )
                else:
                    new_child_state = _update_component_model_state(
                        old_child_state,
                        new_state,
                        index,
                        child,
                        self._schedule_render_task,
                    )
                # _render_component attaches the rendered model to new_state
                await self._render_component(
                    exit_stack, old_child_state, new_child_state, child
                )
            else:
                if old_child_state is not None:
                    await self._unmount_model_states([old_child_state])
                new_state.append_child(child)

    async def _render_model_children_without_old_state(
        self,
        exit_stack: AsyncExitStack,
        new_state: _ModelState,
        raw_children: list[Any],
    ) -> None:
        """Render ``raw_children`` into ``new_state`` when there is nothing to diff.

        Raises:
            ValueError: If two children resolve to the same key.
        """
        children_info = _get_children_info(raw_children)

        new_keys = {k for _, _, k in children_info}
        if len(new_keys) != len(children_info):
            raise self._duplicate_keys_error(children_info, new_state.patch_path)

        new_state.model.current["children"] = []
        for index, (child, child_type, key) in enumerate(children_info):
            if child_type is _DICT_TYPE:
                child_state = _make_element_model_state(new_state, index, key)
                await self._render_model(exit_stack, None, child_state, child)
                new_state.append_child(child_state.model.current)
                new_state.children_by_key[key] = child_state
            elif child_type is _COMPONENT_TYPE:
                child_state = _make_component_model_state(
                    new_state, index, key, child, self._schedule_render_task
                )
                await self._render_component(exit_stack, None, child_state, child)
            else:
                new_state.append_child(child)

    async def _unmount_model_states(self, old_states: list[_ModelState]) -> None:
        """Unregister handlers and run unmount hooks for the states and their subtrees."""
        to_unmount = old_states[::-1]  # unmount in reversed order of rendering
        while to_unmount:
            model_state = to_unmount.pop()

            for target in model_state.targets_by_event.values():
                del self._event_handlers[target]

            if model_state.is_component_state:
                life_cycle_state = model_state.life_cycle_state
                del self._model_states_by_life_cycle_state_id[life_cycle_state.id]
                await life_cycle_state.hook.affect_component_will_unmount()

            to_unmount.extend(model_state.children_by_key.values())

    def _schedule_render_task(self, lcs_id: _LifeCycleStateId) -> None:
        """Request a re-render of the component identified by ``lcs_id``.

        In serial mode the ID is queued. In async mode a render task is created
        immediately, unless the component has already been unmounted.
        """
        if not REACTPY_ASYNC_RENDERING.current:
            self._rendering_queue.put(lcs_id)
            return None
        try:
            model_state = self._model_states_by_life_cycle_state_id[lcs_id]
        except KeyError:
            logger.debug(
                "Did not render component with model state ID "
                f"{lcs_id!r} - component already unmounted"
            )
        else:
            self._render_tasks.add(create_task(self._create_layout_update(model_state)))
            self._render_tasks_ready.release()

    def __repr__(self) -> str:
        return f"{type(self).__name__}({self.root})"
def _new_root_model_state(
    component: ComponentType, schedule_render: Callable[[_LifeCycleStateId], None]
) -> _ModelState:
    """Create the parentless model state for the layout's root component."""
    return _ModelState(
        parent=None,
        index=-1,
        key=None,
        model=Ref(),
        patch_path="",
        children_by_key={},
        targets_by_event={},
        life_cycle_state=_make_life_cycle_state(component, schedule_render),
    )


def _make_component_model_state(
    parent: _ModelState,
    index: int,
    key: Any,
    component: ComponentType,
    schedule_render: Callable[[_LifeCycleStateId], None],
) -> _ModelState:
    """Create the state for a component rendered for the first time."""
    return _ModelState(
        parent=parent,
        index=index,
        key=key,
        model=Ref(),
        patch_path=f"{parent.patch_path}/children/{index}",
        children_by_key={},
        targets_by_event={},
        life_cycle_state=_make_life_cycle_state(component, schedule_render),
    )


def _copy_component_model_state(old_model_state: _ModelState) -> _ModelState:
    """Create a fresh state for re-rendering a component in place.

    Position, key, path and life cycle state are carried over. The model,
    children and event targets start empty.
    """
    # use try/except here because not having a parent is rare (only the root state)
    try:
        parent: _ModelState | None = old_model_state.parent
    except AttributeError:
        parent = None

    return _ModelState(
        parent=parent,
        index=old_model_state.index,
        key=old_model_state.key,
        model=Ref(),  # does not copy the model
        patch_path=old_model_state.patch_path,
        children_by_key={},
        targets_by_event={},
        life_cycle_state=old_model_state.life_cycle_state,
    )


def _update_component_model_state(
    old_model_state: _ModelState,
    new_parent: _ModelState,
    new_index: int,
    new_component: ComponentType,
    schedule_render: Callable[[_LifeCycleStateId], None],
) -> _ModelState:
    """Create the state for a component that replaces ``old_model_state``.

    The existing hook is kept when the old state belonged to a component.
    Otherwise a new life cycle state is created.
    """
    return _ModelState(
        parent=new_parent,
        index=new_index,
        key=old_model_state.key,
        model=Ref(),  # does not copy the model
        patch_path=f"{new_parent.patch_path}/children/{new_index}",
        children_by_key={},
        targets_by_event={},
        life_cycle_state=(
            _update_life_cycle_state(old_model_state.life_cycle_state, new_component)
            if old_model_state.is_component_state
            else _make_life_cycle_state(new_component, schedule_render)
        ),
    )


def _make_element_model_state(
    parent: _ModelState,
    index: int,
    key: Any,
) -> _ModelState:
    """Create the state for a plain element rendered for the first time."""
    return _ModelState(
        parent=parent,
        index=index,
        key=key,
        model=Ref(),
        patch_path=f"{parent.patch_path}/children/{index}",
        children_by_key={},
        targets_by_event={},
    )


def _update_element_model_state(
    old_model_state: _ModelState,
    new_parent: _ModelState,
    new_index: int,
) -> _ModelState:
    """Create the state for a plain element that replaces ``old_model_state``."""
    return _ModelState(
        parent=new_parent,
        index=new_index,
        key=old_model_state.key,
        model=Ref(),  # does not copy the model
        # Derive the path from the new position, as the other state factories
        # do. A keyed element may have moved to another index, and descendant
        # states build their own paths on top of this one.
        patch_path=f"{new_parent.patch_path}/children/{new_index}",
        children_by_key={},
        targets_by_event={},
    )


class _ModelState:
    """State that is bound to a particular element within the layout"""

    __slots__ = (
        "__weakref__",
        "_parent_ref",
        "_render_semaphore",
        "children_by_key",
        "index",
        "key",
        "life_cycle_state",
        "model",
        "patch_path",
        "targets_by_event",
    )

    def __init__(
        self,
        parent: _ModelState | None,
        index: int,
        key: Any,
        model: Ref[VdomJson],
        patch_path: str,
        children_by_key: dict[Key, _ModelState],
        targets_by_event: dict[str, str],
        life_cycle_state: _LifeCycleState | None = None,
    ):
        self.index = index
        """The index of the element amongst its siblings"""

        self.key = key
        """A key that uniquely identifies the element amongst its siblings"""

        self.model = model
        """The actual model of the element"""

        self.patch_path = patch_path
        """A "/" delimited path to the element within the greater layout"""

        self.children_by_key = children_by_key
        """Child model states indexed by their unique keys"""

        self.targets_by_event = targets_by_event
        """The element's event handler target strings indexed by their event name"""

        # === Conditionally Available Attributes ===
        # It's easier to conditionally assign than to force a null check on every usage

        if parent is not None:
            # held weakly so a child does not keep its parent alive
            self._parent_ref = weakref(parent)
            """The parent model state"""

        if life_cycle_state is not None:
            self.life_cycle_state = life_cycle_state
            """The state for the element's component (if it has one)"""

    @property
    def is_component_state(self) -> bool:
        """Whether this state belongs to a component rather than a plain element."""
        return hasattr(self, "life_cycle_state")

    @property
    def parent(self) -> _ModelState:
        """The parent model state.

        Raises:
            AttributeError: If this state was created without a parent.
            RuntimeError: If the parent has already been garbage collected.
        """
        parent = self._parent_ref()
        if parent is None:
            raise RuntimeError("detached model state")  # nocov
        return parent

    def append_child(self, child: Any) -> None:
        """Append ``child`` to the ``"children"`` list of the current model."""
        self.model.current["children"].append(child)

    def __repr__(self) -> str:  # nocov
        return f"ModelState({ {s: getattr(self, s, None) for s in self.__slots__} })"


def _make_life_cycle_state(
    component: ComponentType,
    schedule_render: Callable[[_LifeCycleStateId], None],
) -> _LifeCycleState:
    """Create a life cycle state with a new ID and a hook bound to that ID."""
    life_cycle_state_id = _LifeCycleStateId(uuid4().hex)
    return _LifeCycleState(
        life_cycle_state_id,
        LifeCycleHook(lambda: schedule_render(life_cycle_state_id)),
        component,
    )


def _update_life_cycle_state(
    old_life_cycle_state: _LifeCycleState,
    new_component: ComponentType,
) -> _LifeCycleState:
    """Return a life cycle state with the same ID and hook but a new component."""
    return _LifeCycleState(
        old_life_cycle_state.id,
        # the hook is preserved across renders because it holds the state
        old_life_cycle_state.hook,
        new_component,
    )


_LifeCycleStateId = NewType("_LifeCycleStateId", str)


class _LifeCycleState(NamedTuple):
    """Component state for :class:`_ModelState`"""

    id: _LifeCycleStateId
    """A unique identifier used in the :class:`~reactpy.core.hooks.LifeCycleHook` callback"""

    hook: LifeCycleHook
    """The life cycle hook"""

    component: ComponentType
    """The current component instance"""


_Type = TypeVar("_Type")


class _ThreadSafeQueue(Generic[_Type]):
    """A queue bound to the running loop that skips values already pending."""

    def __init__(self) -> None:
        self._loop = get_running_loop()
        self._queue: Queue[_Type] = Queue()
        self._pending: set[_Type] = set()

    def put(self, value: _Type) -> None:
        """Enqueue ``value`` on the owning loop unless it is already pending."""
        if value not in self._pending:
            self._pending.add(value)
            self._loop.call_soon_threadsafe(self._queue.put_nowait, value)

    async def get(self) -> _Type:
        """Wait for the next value and mark it as no longer pending."""
        value = await self._queue.get()
        self._pending.remove(value)
        return value


def _get_children_info(children: list[VdomChild]) -> Sequence[_ChildInfo]:
    """Classify each child and resolve its key.

    ``None`` children are dropped. Anything that is neither a dict nor a
    component is converted to a string. A child without a key is keyed by its
    position in ``children``.
    """
    infos: list[_ChildInfo] = []
    for index, child in enumerate(children):
        if child is None:
            continue
        elif isinstance(child, dict):
            child_type = _DICT_TYPE
            key = child.get("key")
        elif isinstance(child, ComponentType):
            child_type = _COMPONENT_TYPE
            key = child.key
        else:
            child = f"{child}"
            child_type = _STRING_TYPE
            key = None

        if key is None:
            key = index

        infos.append((child, child_type, key))

    return infos


# (child, kind of child, key) triples as produced by _get_children_info
_ChildInfo: TypeAlias = tuple[Any, "_ElementType", Key]
_ElementType = NewType("_ElementType", int)
_DICT_TYPE = _ElementType(1)
_COMPONENT_TYPE = _ElementType(2)
_STRING_TYPE = _ElementType(3)