diff --git a/src/snakia/core/rx/__init__.py b/src/snakia/core/rx/__init__.py index d411a45..b4bf207 100644 --- a/src/snakia/core/rx/__init__.py +++ b/src/snakia/core/rx/__init__.py @@ -2,7 +2,7 @@ from .async_bindable import AsyncBindable from .base_bindable import BaseBindable, BindableSubscriber, ValueChanged from .bindable import Bindable from .chains import chain -from .combines import combine +from .combines import async_combine, combine from .concats import concat from .conds import cond from .consts import const @@ -16,6 +16,8 @@ __all__ = [ "BaseBindable", "BindableSubscriber", "ValueChanged", + "async_combine", + "async_merge", "chain", "combine", "concat", @@ -24,5 +26,4 @@ __all__ = [ "filter", "map", "merge", - "async_merge", ] diff --git a/src/snakia/core/rx/combines.py b/src/snakia/core/rx/combines.py index 7b519bd..345d0a7 100644 --- a/src/snakia/core/rx/combines.py +++ b/src/snakia/core/rx/combines.py @@ -1,12 +1,11 @@ -import operator -from typing import Any, Callable, TypeVar, overload +from typing import Any, Awaitable, Callable, TypeVar, overload -from snakia.utils import to_async +from snakia.types import Unset +from snakia.utils import caller, to_async from .async_bindable import AsyncBindable from .base_bindable import ValueChanged from .bindable import Bindable -from .concats import concat A = TypeVar("A") B = TypeVar("B") @@ -22,6 +21,7 @@ def combine( /, *, combiner: Callable[[A], R], + default_value: R | Unset = Unset(), ) -> Bindable[R]: ... @@ -32,6 +32,7 @@ def combine( /, *, combiner: Callable[[A, B], R], + default_value: R | Unset = Unset(), ) -> Bindable[R]: ... @@ -43,6 +44,7 @@ def combine( /, *, combiner: Callable[[A, B, C], R], + default_value: R | Unset = Unset(), ) -> Bindable[R]: ... @@ -55,6 +57,7 @@ def combine( /, *, combiner: Callable[[A, B, C, D], R], + default_value: R | Unset = Unset(), ) -> Bindable[R]: ... @@ -67,6 +70,7 @@ def combine( /, *, combiner: Callable[[A, B, C, D], R], + default_value: R | Unset = Unset(), ) -> Bindable[R]: ... @@ -74,28 +78,116 @@ def combine( def combine( *sources: Bindable[Any] | AsyncBindable[Any], combiner: Callable[..., R], + default_value: R | Unset = Unset(), ) -> Bindable[R]: ... def combine( *sources: Bindable[Any] | AsyncBindable[Any], combiner: Callable[..., R], + default_value: R | Unset = Unset(), ) -> Bindable[R]: combined = Bindable[R]() - values = [*map(lambda s: s.value, sources)] + Unset.map( + default_value, + combined.set_silent, + caller(combined.set_silent, sources[0].default_value), + ) - for i, source in enumerate(sources): - - def make_subscriber( - index: int, - ) -> Callable[[ValueChanged[Any]], None]: - return concat( - lambda v: operator.setitem(values, index, v.new_value), - lambda _: combiner(*values), - ) + def subscriber(_: ValueChanged[Any]) -> None: + combined.set(combiner(*[*map(lambda s: s.value, sources)])) + for source in sources: if isinstance(source, Bindable): - source.subscribe(make_subscriber(i)) + source.subscribe(subscriber) else: - source.subscribe(to_async(make_subscriber(i))) + source.subscribe(to_async(subscriber)) + return combined + + +@overload +def async_combine( + source1: AsyncBindable[A], + /, + *, + combiner: Callable[[A], Awaitable[R]], + default_value: R | Unset = Unset(), +) -> AsyncBindable[R]: ... + + +@overload +def async_combine( + source1: AsyncBindable[A], + source2: AsyncBindable[B], + /, + *, + combiner: Callable[[A, B], Awaitable[R]], + default_value: R | Unset = Unset(), +) -> AsyncBindable[R]: ... + + +@overload +def async_combine( + source1: AsyncBindable[A], + source2: AsyncBindable[B], + source3: AsyncBindable[C], + /, + *, + combiner: Callable[[A, B, C], Awaitable[R]], + default_value: R | Unset = Unset(), +) -> AsyncBindable[R]: ... + + +@overload +def async_combine( + source1: AsyncBindable[A], + source2: AsyncBindable[B], + source3: AsyncBindable[C], + source4: AsyncBindable[D], + /, + *, + combiner: Callable[[A, B, C, D], Awaitable[R]], + default_value: R | Unset = Unset(), +) -> AsyncBindable[R]: ... + + +@overload +def async_combine( + source1: AsyncBindable[A], + source2: AsyncBindable[B], + source3: AsyncBindable[C], + source4: AsyncBindable[D], + /, + *, + combiner: Callable[[A, B, C, D], Awaitable[R]], + default_value: R | Unset = Unset(), +) -> AsyncBindable[R]: ... + + +@overload +def async_combine( + *sources: AsyncBindable[Any], + combiner: Callable[..., Awaitable[R]], + default_value: R | Unset = Unset(), +) -> AsyncBindable[R]: ... + + +def async_combine( + *sources: AsyncBindable[Any], + combiner: Callable[..., Awaitable[R]], + default_value: R | Unset = Unset(), +) -> AsyncBindable[R]: + combined = AsyncBindable[R]() + Unset.map( + default_value, + combined.set_silent, + caller(combined.set_silent, sources[0].default_value), + ) + + async def subscriber(_: ValueChanged[Any]) -> None: + result = await combiner(*[*map(lambda s: s.value, sources)]) + await combined.set(result) + + for source in sources: + source.subscribe(subscriber) return combined