feat: add async_combine

This commit is contained in:
rus07tam 2025-11-23 10:48:20 +00:00
parent c7cd08b7e0
commit a49fe64be8
2 changed files with 111 additions and 18 deletions

View file

@ -2,7 +2,7 @@ from .async_bindable import AsyncBindable
from .base_bindable import BaseBindable, BindableSubscriber, ValueChanged from .base_bindable import BaseBindable, BindableSubscriber, ValueChanged
from .bindable import Bindable from .bindable import Bindable
from .chains import chain from .chains import chain
from .combines import combine from .combines import async_combine, combine
from .concats import concat from .concats import concat
from .conds import cond from .conds import cond
from .consts import const from .consts import const
@ -16,6 +16,8 @@ __all__ = [
"BaseBindable", "BaseBindable",
"BindableSubscriber", "BindableSubscriber",
"ValueChanged", "ValueChanged",
"async_combine",
"async_merge",
"chain", "chain",
"combine", "combine",
"concat", "concat",
@ -24,5 +26,4 @@ __all__ = [
"filter", "filter",
"map", "map",
"merge", "merge",
"async_merge",
] ]

View file

@ -1,12 +1,11 @@
import operator from typing import Any, Awaitable, Callable, TypeVar, overload
from typing import Any, Callable, TypeVar, overload
from snakia.utils import to_async from snakia.types import Unset
from snakia.utils import caller, to_async
from .async_bindable import AsyncBindable from .async_bindable import AsyncBindable
from .base_bindable import ValueChanged from .base_bindable import ValueChanged
from .bindable import Bindable from .bindable import Bindable
from .concats import concat
A = TypeVar("A") A = TypeVar("A")
B = TypeVar("B") B = TypeVar("B")
@ -22,6 +21,7 @@ def combine(
/, /,
*, *,
combiner: Callable[[A], R], combiner: Callable[[A], R],
default_value: R | Unset = Unset(),
) -> Bindable[R]: ... ) -> Bindable[R]: ...
@ -32,6 +32,7 @@ def combine(
/, /,
*, *,
combiner: Callable[[A, B], R], combiner: Callable[[A, B], R],
default_value: R | Unset = Unset(),
) -> Bindable[R]: ... ) -> Bindable[R]: ...
@ -43,6 +44,7 @@ def combine(
/, /,
*, *,
combiner: Callable[[A, B, C], R], combiner: Callable[[A, B, C], R],
default_value: R | Unset = Unset(),
) -> Bindable[R]: ... ) -> Bindable[R]: ...
@ -55,6 +57,7 @@ def combine(
/, /,
*, *,
combiner: Callable[[A, B, C, D], R], combiner: Callable[[A, B, C, D], R],
default_value: R | Unset = Unset(),
) -> Bindable[R]: ... ) -> Bindable[R]: ...
@ -67,6 +70,7 @@ def combine(
/, /,
*, *,
combiner: Callable[[A, B, C, D], R], combiner: Callable[[A, B, C, D], R],
default_value: R | Unset = Unset(),
) -> Bindable[R]: ... ) -> Bindable[R]: ...
@ -74,28 +78,116 @@ def combine(
def combine( def combine(
*sources: Bindable[Any] | AsyncBindable[Any], *sources: Bindable[Any] | AsyncBindable[Any],
combiner: Callable[..., R], combiner: Callable[..., R],
default_value: R | Unset = Unset(),
) -> Bindable[R]: ... ) -> Bindable[R]: ...
def combine( def combine(
*sources: Bindable[Any] | AsyncBindable[Any], *sources: Bindable[Any] | AsyncBindable[Any],
combiner: Callable[..., R], combiner: Callable[..., R],
default_value: R | Unset = Unset(),
) -> Bindable[R]: ) -> Bindable[R]:
combined = Bindable[R]() combined = Bindable[R]()
values = [*map(lambda s: s.value, sources)] Unset.map(
default_value,
for i, source in enumerate(sources): combined.set_silent,
caller(combined.set_silent, sources[0].default_value),
def make_subscriber(
index: int,
) -> Callable[[ValueChanged[Any]], None]:
return concat(
lambda v: operator.setitem(values, index, v.new_value),
lambda _: combiner(*values),
) )
def subscriber(_: ValueChanged[Any]) -> None:
combined.set(combiner(*[*map(lambda s: s.value, sources)]))
for source in sources:
if isinstance(source, Bindable): if isinstance(source, Bindable):
source.subscribe(make_subscriber(i)) source.subscribe(subscriber)
else: else:
source.subscribe(to_async(make_subscriber(i))) source.subscribe(to_async(subscriber))
return combined
@overload
def async_combine(
source1: AsyncBindable[A],
/,
*,
combiner: Callable[[A], Awaitable[R]],
default_value: R | Unset = Unset(),
) -> AsyncBindable[R]: ...
@overload
def async_combine(
source1: AsyncBindable[A],
source2: AsyncBindable[B],
/,
*,
combiner: Callable[[A, B], Awaitable[R]],
default_value: R | Unset = Unset(),
) -> AsyncBindable[R]: ...
@overload
def async_combine(
source1: AsyncBindable[A],
source2: AsyncBindable[B],
source3: AsyncBindable[C],
/,
*,
combiner: Callable[[A, B, C], Awaitable[R]],
default_value: R | Unset = Unset(),
) -> AsyncBindable[R]: ...
@overload
def async_combine(
source1: AsyncBindable[A],
source2: AsyncBindable[B],
source3: AsyncBindable[C],
source4: AsyncBindable[D],
/,
*,
combiner: Callable[[A, B, C, D], Awaitable[R]],
default_value: R | Unset = Unset(),
) -> AsyncBindable[R]: ...
@overload
def async_combine(
source1: AsyncBindable[A],
source2: AsyncBindable[B],
source3: AsyncBindable[C],
source4: AsyncBindable[D],
/,
*,
combiner: Callable[[A, B, C, D], Awaitable[R]],
default_value: R | Unset = Unset(),
) -> AsyncBindable[R]: ...
@overload
def async_combine(
*sources: AsyncBindable[Any],
combiner: Callable[..., Awaitable[R]],
default_value: R | Unset = Unset(),
) -> AsyncBindable[R]: ...
def async_combine(
*sources: AsyncBindable[Any],
combiner: Callable[..., Awaitable[R]],
default_value: R | Unset = Unset(),
) -> AsyncBindable[R]:
combined = AsyncBindable[R]()
Unset.map(
default_value,
combined.set_silent,
caller(combined.set_silent, sources[0].default_value),
)
async def subscriber(_: ValueChanged[Any]) -> None:
result = await combiner(*[*map(lambda s: s.value, sources)])
await combined.set(result)
for source in sources:
source.subscribe(subscriber)
return combined return combined