initial commit

This commit is contained in:
rus07tam 2025-10-26 16:26:46 +00:00
commit 19c9b9537d
115 changed files with 4940 additions and 0 deletions

1
.envrc Normal file
View file

@ -0,0 +1 @@
use flake

50
.github/workflows/docs-publish.yml vendored Normal file
View file

@ -0,0 +1,50 @@
name: Documentation publish
on:
push:
branches:
- main
workflow_dispatch:
permissions:
contents: read
pages: write
id-token: write
concurrency:
group: "pages"
cancel-in-progress: false
jobs:
build-docs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.12"]
steps:
- uses: actions/checkout@v4
- name: Set up uv
uses: astral-sh/setup-uv@v7
with:
python-version: ${{ matrix.python-version }}
activate-environment: "false"
enable-cache: "auto"
cache-dependency-glob: |
**/pyproject.toml
**/uv.lock
restore-cache: "true"
save-cache: "true"
- name: Install dependencies
run: uv sync --all-groups
- name: Install Sphinx
run: uv pip install sphinx sphinx-rtd-theme sphinx-autodoc-typehints
- name: Build documentation
run: uv run sphinx-build -b html docs/source docs/_build/html
- name: Upload artifact
uses: actions/upload-pages-artifact@v3
with:
path: docs/_build/html
- name: Deploy to GitHub Pages
id: deployment
uses: actions/deploy-pages@v4

33
.github/workflows/pylint.yml vendored Normal file
View file

@ -0,0 +1,33 @@
name: Pylint
on: [push, pull_request]
jobs:
lint:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.12"]
steps:
- uses: actions/checkout@v4
- name: Set up uv
uses: astral-sh/setup-uv@v7
with:
python-version: ${{ matrix.python-version }}
activate-environment: "true"
enable-cache: "auto"
cache-dependency-glob: |
**/pyproject.toml
**/uv.lock
restore-cache: "true"
save-cache: "true"
- name: Install project dependencies
run: |
uv sync --all-groups
- name: Install pylint
run: |
uv pip install pylint
- name: Lint code with pylint
run: |
uv run pylint src/snakia

59
.github/workflows/pypi-publish.yml vendored Normal file
View file

@ -0,0 +1,59 @@
name: PyPi publish
on:
release:
types: [published]
permissions:
contents: read
jobs:
release-build:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.12"]
steps:
- uses: actions/checkout@v4
- name: Install uv with all available options
uses: astral-sh/setup-uv@v7
with:
python-version: ${{ matrix.python-version }}
activate-environment: "false"
enable-cache: "auto"
cache-dependency-glob: |
**/pyproject.toml
**/uv.lock
restore-cache: "true"
save-cache: "true"
- run: uv build
- name: Upload distributions
uses: actions/upload-artifact@v4
with:
name: release-dists
path: dist/
pypi-publish:
runs-on: ubuntu-latest
needs:
- release-build
permissions:
id-token: write
environment:
name: pypi
steps:
- name: Retrieve release distributions
uses: actions/download-artifact@v4
with:
name: release-dists
path: dist/
- name: Publish release distributions to PyPI
uses: pypa/gh-action-pypi-publish@release/v1
with:
packages-dir: dist/

17
.gitignore vendored Normal file
View file

@ -0,0 +1,17 @@
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info
# Virtual environments
.venv
# Others
.direnv
.mypy_cache
.python-version
.vscode
_autosummary

121
LICENSE Normal file
View file

@ -0,0 +1,121 @@
Creative Commons Legal Code
CC0 1.0 Universal
CREATIVE COMMONS CORPORATION IS NOT A LAW FIRM AND DOES NOT PROVIDE
LEGAL SERVICES. DISTRIBUTION OF THIS DOCUMENT DOES NOT CREATE AN
ATTORNEY-CLIENT RELATIONSHIP. CREATIVE COMMONS PROVIDES THIS
INFORMATION ON AN "AS-IS" BASIS. CREATIVE COMMONS MAKES NO WARRANTIES
REGARDING THE USE OF THIS DOCUMENT OR THE INFORMATION OR WORKS
PROVIDED HEREUNDER, AND DISCLAIMS LIABILITY FOR DAMAGES RESULTING FROM
THE USE OF THIS DOCUMENT OR THE INFORMATION OR WORKS PROVIDED
HEREUNDER.
Statement of Purpose
The laws of most jurisdictions throughout the world automatically confer
exclusive Copyright and Related Rights (defined below) upon the creator
and subsequent owner(s) (each and all, an "owner") of an original work of
authorship and/or a database (each, a "Work").
Certain owners wish to permanently relinquish those rights to a Work for
the purpose of contributing to a commons of creative, cultural and
scientific works ("Commons") that the public can reliably and without fear
of later claims of infringement build upon, modify, incorporate in other
works, reuse and redistribute as freely as possible in any form whatsoever
and for any purposes, including without limitation commercial purposes.
These owners may contribute to the Commons to promote the ideal of a free
culture and the further production of creative, cultural and scientific
works, or to gain reputation or greater distribution for their Work in
part through the use and efforts of others.
For these and/or other purposes and motivations, and without any
expectation of additional consideration or compensation, the person
associating CC0 with a Work (the "Affirmer"), to the extent that he or she
is an owner of Copyright and Related Rights in the Work, voluntarily
elects to apply CC0 to the Work and publicly distribute the Work under its
terms, with knowledge of his or her Copyright and Related Rights in the
Work and the meaning and intended legal effect of CC0 on those rights.
1. Copyright and Related Rights. A Work made available under CC0 may be
protected by copyright and related or neighboring rights ("Copyright and
Related Rights"). Copyright and Related Rights include, but are not
limited to, the following:
i. the right to reproduce, adapt, distribute, perform, display,
communicate, and translate a Work;
ii. moral rights retained by the original author(s) and/or performer(s);
iii. publicity and privacy rights pertaining to a person's image or
likeness depicted in a Work;
iv. rights protecting against unfair competition in regards to a Work,
subject to the limitations in paragraph 4(a), below;
v. rights protecting the extraction, dissemination, use and reuse of data
in a Work;
vi. database rights (such as those arising under Directive 96/9/EC of the
European Parliament and of the Council of 11 March 1996 on the legal
protection of databases, and under any national implementation
thereof, including any amended or successor version of such
directive); and
vii. other similar, equivalent or corresponding rights throughout the
world based on applicable law or treaty, and any national
implementations thereof.
2. Waiver. To the greatest extent permitted by, but not in contravention
of, applicable law, Affirmer hereby overtly, fully, permanently,
irrevocably and unconditionally waives, abandons, and surrenders all of
Affirmer's Copyright and Related Rights and associated claims and causes
of action, whether now known or unknown (including existing as well as
future claims and causes of action), in the Work (i) in all territories
worldwide, (ii) for the maximum duration provided by applicable law or
treaty (including future time extensions), (iii) in any current or future
medium and for any number of copies, and (iv) for any purpose whatsoever,
including without limitation commercial, advertising or promotional
purposes (the "Waiver"). Affirmer makes the Waiver for the benefit of each
member of the public at large and to the detriment of Affirmer's heirs and
successors, fully intending that such Waiver shall not be subject to
revocation, rescission, cancellation, termination, or any other legal or
equitable action to disrupt the quiet enjoyment of the Work by the public
as contemplated by Affirmer's express Statement of Purpose.
3. Public License Fallback. Should any part of the Waiver for any reason
be judged legally invalid or ineffective under applicable law, then the
Waiver shall be preserved to the maximum extent permitted taking into
account Affirmer's express Statement of Purpose. In addition, to the
extent the Waiver is so judged Affirmer hereby grants to each affected
person a royalty-free, non transferable, non sublicensable, non exclusive,
irrevocable and unconditional license to exercise Affirmer's Copyright and
Related Rights in the Work (i) in all territories worldwide, (ii) for the
maximum duration provided by applicable law or treaty (including future
time extensions), (iii) in any current or future medium and for any number
of copies, and (iv) for any purpose whatsoever, including without
limitation commercial, advertising or promotional purposes (the
"License"). The License shall be deemed effective as of the date CC0 was
applied by Affirmer to the Work. Should any part of the License for any
reason be judged legally invalid or ineffective under applicable law, such
partial invalidity or ineffectiveness shall not invalidate the remainder
of the License, and in such case Affirmer hereby affirms that he or she
will not (i) exercise any of his or her remaining Copyright and Related
Rights in the Work or (ii) assert any associated claims and causes of
action with respect to the Work, in either case contrary to Affirmer's
express Statement of Purpose.
4. Limitations and Disclaimers.
a. No trademark or patent rights held by Affirmer are waived, abandoned,
surrendered, licensed or otherwise affected by this document.
b. Affirmer offers the Work as-is and makes no representations or
warranties of any kind concerning the Work, express, implied,
statutory or otherwise, including without limitation warranties of
title, merchantability, fitness for a particular purpose, non
infringement, or the absence of latent or other defects, accuracy, or
the present or absence of errors, whether or not discoverable, all to
the greatest extent permissible under applicable law.
c. Affirmer disclaims responsibility for clearing rights of other persons
that may apply to the Work or any use thereof, including without
limitation any person's Copyright and Related Rights in the Work.
Further, Affirmer disclaims responsibility for obtaining any necessary
consents, permissions or other rights required for any use of the
Work.
d. Affirmer understands and acknowledges that Creative Commons is not a
party to this document and has no duty or obligation with respect to
this CC0 or use of the Work.

871
README.md Normal file
View file

@ -0,0 +1,871 @@
# 🐍 Snakia Framework
**Snakia** is a modern Python framework for creating applications with Entity-Component-System (ECS) architecture, event system, and reactive programming. Built with performance (maybe) and modularity in mind, Snakia provides a clean API for developing complex applications ranging from games to terminal user interfaces.
## 📋 Table of Contents
- [🎯 Roadmap & TODO](#-roadmap--todo)
- [🚀 Installation](#-installation)
- [🚀 Quick Start](#-quick-start)
- [🏗️ Architecture](#-architecture)
- [⚙️ Core](#-core)
- [🎯 ECS System](#-ecs-system)
- [📡 Event System (ES)](#-event-system-es)
- [🔌 Plugin System](#-plugin-system)
- [🎨 TUI System](#-tui-system)
- [⚡ Reactive Programming (RX)](#-reactive-programming-rx)
- [🛠️ Utilities](#-utilities)
- [🎭 Decorators](#-decorators)
- [🏷️ Properties](#-properties)
- [🌐 Platform Abstraction](#-platform-abstraction)
- [📦 Examples](#-examples)
- [🤝 Contributing](#-contributing)
- [🆘 Support](#-support)
- [📄 License](#-license)
### ✨ Key Features
- 🏗️ **ECS Architecture** - Flexible entity-component-system for scalable game/app logic
- 📡 **Event System** - Asynchronous event handling with filters and priorities
- 🔌 **Plugin System** - Modular plugin architecture for extensibility
- 🎨 **TUI Framework** - Rich terminal user interface with reactive widgets
- ⚡ **Reactive Programming** - Observable data streams and reactive bindings
- 🛠️ **Rich Utilities** - Decorators, properties, platform abstraction, and more
- 🎯 **Type Safety** - Full type hints and Pydantic integration
> ⚠️ **Experimental Framework**
> This framework is currently in **beta/experimental stage**. Not all features are fully implemented, there might be bugs, and the API is subject to change. Use at your own risk! 🚧
## 🚀 Installation
### Prerequisites
- **Python** >= 3.12
- **pip** or **uv** (recommended) package manager
### Install from PyPi (recommended)
```bash
pip install snakia
```
### Install from Source
```bash
# Clone the repository
git clone https://github.com/RuJect/Snakia.git
cd Snakia
# Install with pip
pip install -e .
# Or with uv (recommended)
uv sync
```
## 🎯 Roadmap & TODO
Here's what we're working on to make Snakia even better:
- [ ] Plugin Isolation: restrict plugin access to only events and components statically defined in manifest
- [ ] Async & Multithreading: implement proper async/await support and multithreading capabilities
- [ ] Platform Support: expand platform abstraction to support more operating systems
- [ ] Random Implementations: add various random generations implementations
- [ ] TUI Widgets: create more ready-to-use TUI widgets and components
- [ ] Code Documentation: add comprehensive docstrings and inline comments
- [ ] Documentation: create detailed API documentation and tutorials
## 🚀 Quick Start
```python
from snakia.core.engine import Engine
from snakia.core.loader import Meta, Plugin, PluginProcessor
from snakia.core.ecs import Component
from snakia.types import Version
# Creating a component
class HealthComponent(Component):
value: int = 100
max_value: int = 100
# Creating a processor
class HealthProcessor(PluginProcessor):
def process(self, system):
for entity, (health,) in system.get_components(HealthComponent):
if health.value <= 0:
print(f"Entity {entity} died!")
# Creating a plugin
class HealthPlugin(Plugin, meta=Meta(
name="health",
version=Version.from_args(1, 0, 0),
processors=(HealthProcessor,)
)):
def on_load(self): pass
def on_unload(self): pass
# Starting the engine
engine = Engine()
engine.loader.register(HealthPlugin)
engine.loader.load_all()
engine.start()
```
## 🏗️ Architecture
Snakia is built on a modular architecture with clear separation of concerns:
```plaintext
Snakia/
├── core/ # Framework core
│ ├── engine.py # Main engine
│ ├── ecs/ # Entity-Component-System
│ ├── es/ # Event System
│ ├── loader/ # Plugin loading system
│ ├── rx/ # Reactive programming
│ └── tui/ # Terminal User Interface
├── decorators/ # Decorators
├── property/ # Property system
├── platform/ # Platform abstraction
├── utils/ # Utilities
├── random/ # Random number generation
├── field/ # Typed fields
└── types/ # Special types
```
## ⚙️ Core
### Engine
The central component of the framework that coordinates all systems:
```python
from snakia.core.engine import Engine
engine = Engine()
# Systems:
# - engine.system - ECS system
# - engine.dispatcher - Event system
# - engine.loader - Plugin loader
engine.start() # Start all systems
engine.stop() # Stop all systems
engine.update() # Update systems
```
## 🎯 ECS System
Entity-Component-System architecture for creating flexible and performant applications.
### Component
Base class for all components:
```python
from snakia.core.ecs import Component
from pydantic import Field
class PositionComponent(Component):
x: float = Field(default=0.0)
y: float = Field(default=0.0)
class VelocityComponent(Component):
vx: float = Field(default=0.0)
vy: float = Field(default=0.0)
```
### Processor
Processors handle components in the system:
```python
from snakia.core.ecs import Processor, System
class MovementProcessor(Processor):
def process(self, system: System) -> None:
# Get all entities with Position and Velocity
for entity, (pos, vel) in system.get_components(
PositionComponent, VelocityComponent
):
pos.x += vel.vx
pos.y += vel.vy
```
### System
Entity and component management:
```python
# Creating an entity with components
entity = system.create_entity(
PositionComponent(x=10, y=20),
VelocityComponent(vx=1, vy=0)
)
# Adding a component to an existing entity
system.add_component(entity, HealthComponent(value=100))
# Getting entity components
pos, vel = system.get_components_of_entity(
entity, PositionComponent, VelocityComponent
)
# Checking for components
if system.has_components(entity, PositionComponent, VelocityComponent):
print("Entity has position and velocity")
# Removing a component
system.remove_component(entity, VelocityComponent)
# Deleting an entity
system.delete_entity(entity)
```
## 📡 Event System (ES)
Asynchronous event system with filter and priority support.
### Event
Base class for events:
```python
from snakia.core.es import Event
from pydantic import Field
class PlayerDiedEvent(Event):
player_id: int = Field()
cause: str = Field(default="unknown")
ttl: int = Field(default=10) # Event lifetime
```
### Handler
Event handlers:
```python
from snakia.core.es import Handler, Action
def on_player_died(event: PlayerDiedEvent) -> Action | None:
print(f"Player {event.player_id} died from {event.cause}")
return Action.move(1) # Move to next handler
```
### Filter
Event filters:
```python
from snakia.core.es import Filter
def only_important_deaths(event: PlayerDiedEvent) -> bool:
return event.cause in ["boss", "pvp"]
# Using a filter
@dispatcher.on(PlayerDiedEvent, filter=only_important_deaths)
def handle_important_death(event: PlayerDiedEvent):
print("Important death occurred!")
```
### Dispatcher
Central event dispatcher:
```python
from snakia.core.es import Dispatcher, Subscriber
dispatcher = Dispatcher()
# Subscribing to an event
dispatcher.subscribe(PlayerDiedEvent, Subscriber(
handler=on_player_died,
filter=only_important_deaths,
priority=10
))
# Decorator for subscription
@dispatcher.on(PlayerDiedEvent, priority=5)
def handle_death(event: PlayerDiedEvent):
print("Death handled!")
# Publishing an event
dispatcher.publish(PlayerDiedEvent(player_id=123, cause="boss"))
```
## 🔌 Plugin System
Modular system for loading and managing plugins.
### Plugin
Base class for plugins:
```python
from snakia.core.loader import Meta, Plugin, PluginProcessor
from snakia.types import Version
class MyProcessor(PluginProcessor):
def process(self, system):
# Processor logic
pass
class MyPlugin(Plugin, meta=Meta(
name="my_plugin",
author="developer",
version=Version.from_args(1, 0, 0),
processors=(MyProcessor,),
subscribers=()
)):
def on_load(self):
print("Plugin loaded!")
def on_unload(self):
print("Plugin unloaded!")
```
### Meta
Plugin metadata:
```python
from snakia.core.loader import Meta
from snakia.core.es import Subscriber
meta = Meta(
name="plugin_name",
author="author_name",
version=Version.from_args(1, 0, 0),
processors=(Processor1, Processor2),
subscribers=(
(EventType, Subscriber(handler, filter, priority)),
)
)
```
### Loader
Plugin loader:
```python
from snakia.core.loader import Loader
loader = Loader(engine)
# Registering a plugin
loader.register(MyPlugin)
# Loading all plugins
loader.load_all()
# Unloading all plugins
loader.unload_all()
```
## 🎨 TUI System
System for creating text-based user interfaces.
### Widget
Base class for widgets:
```python
from snakia.core.tui import Widget, Canvas, CanvasChar
from snakia.core.rx import Bindable
class MyWidget(Widget):
def __init__(self):
super().__init__()
self.text = self.state("Hello World")
self.color = self.state(CanvasChar(fg_color="red"))
def on_render(self) -> Canvas:
canvas = Canvas(20, 5)
canvas.draw_text(0, 0, self.text.value, self.color.value)
return canvas
```
### Canvas
Drawing canvas:
```python
from snakia.core.tui import Canvas, CanvasChar
canvas = Canvas(80, 24)
# Drawing text
canvas.draw_text(10, 5, "Hello", CanvasChar(fg_color="blue"))
# Drawing rectangle
canvas.draw_rect(0, 0, 20, 10, CanvasChar("█", fg_color="green"))
# Filling area
canvas.draw_filled_rect(5, 5, 10, 5, CanvasChar(" ", bg_color="red"))
# Lines
canvas.draw_line_h(0, 0, 20, CanvasChar("-"))
canvas.draw_line_v(0, 0, 10, CanvasChar("|"))
```
### CanvasChar
Character with attributes:
```python
from snakia.core.tui import CanvasChar
char = CanvasChar(
char="A",
fg_color="red", # Text color
bg_color="blue", # Background color
bold=True, # Bold
italic=False, # Italic
underline=True # Underline
)
```
### Renderer
Screen rendering:
```python
from snakia.core.tui import RenderContext
from snakia.core.tui.render import ANSIRenderer
import sys
class StdoutTarget:
def write(self, text: str): sys.stdout.write(text)
def flush(self): sys.stdout.flush()
renderer = ANSIRenderer(StdoutTarget())
with RenderContext(renderer) as ctx:
ctx.render(widget.render())
```
### Ready-made Widgets
```python
from snakia.core.tui.widgets import (
TextWidget, BoxWidget,
HorizontalSplitWidget, VerticalSplitWidget
)
# Text widget
text = TextWidget("Hello", CanvasChar(fg_color="red", bold=True))
# Box widget
box = BoxWidget(10, 5, CanvasChar("█", fg_color="yellow"))
# Splitters
h_split = HorizontalSplitWidget([text1, text2], "|")
v_split = VerticalSplitWidget([h_split, box], "-")
```
## ⚡ Reactive Programming (RX)
Reactive programming system for creating responsive interfaces.
### Bindable
Reactive variables:
```python
from snakia.core.rx import Bindable, ValueChanged
# Creating a reactive variable
counter = Bindable(0)
# Subscribing to changes
def on_change(event: ValueChanged[int]):
print(f"Counter changed from {event.old_value} to {event.new_value}")
counter.subscribe(on_change)
# Changing value
counter.set(5) # Will call on_change
counter(10) # Alternative syntax
```
### AsyncBindable
Asynchronous reactive variables:
```python
from snakia.core.rx import AsyncBindable
async_counter = AsyncBindable(0)
async def async_handler(event: ValueChanged[int]):
print(f"Async counter: {event.new_value}")
await async_counter.subscribe(async_handler, run_now=True)
await async_counter.set(42)
```
### Operators
```python
from snakia.core.rx import map, filter, combine, merge
# Transformation
doubled = map(counter, lambda x: x * 2)
# Filtering
even_only = filter(counter, lambda x: x % 2 == 0)
# Combining
combined = combine(counter, doubled, lambda a, b: a + b)
# Merging streams
merged = merge(counter, async_counter)
```
## 🛠️ Utilities
### to_async
Converting synchronous functions to asynchronous:
```python
from snakia.utils import to_async
def sync_function(x):
return x * 2
async_function = to_async(sync_function)
result = await async_function(5)
```
### nolock
Performance optimization:
```python
from snakia.utils import nolock
def busy_loop():
while running:
# Work
nolock() # Release GIL
```
### inherit
Simplified inheritance:
```python
from snakia.utils import inherit
class Base:
def method(self): pass
class Derived(inherit(Base)):
def method(self):
super().method()
# Additional logic
```
### this
Reference to current object:
```python
from snakia.utils import this
def func():
return this() # Returns `<function func at ...>`
```
### throw
Throwing exceptions:
```python
from snakia.utils import throw
def validate(value):
if value < 0:
throw(ValueError("Value must be positive"))
```
### frame
Working with frames:
```python
from snakia.utils import frame
def process_frame():
current_frame = frame()
# Process frame
```
## 🎭 Decorators
### inject_replace
Method replacement:
```python
from snakia.decorators import inject_replace
class Original:
def method(self): return "original"
@inject_replace(Original, "method")
def new_method(self): return "replaced"
```
### inject_before / inject_after
Hooks before and after execution:
```python
from snakia.decorators import inject_before, inject_after
@inject_before(MyClass, "method")
def before_hook(self): print("Before method")
@inject_after(MyClass, "method")
def after_hook(self): print("After method")
```
### singleton
Singleton pattern:
```python
from snakia.decorators import singleton
@singleton
class Database:
def __init__(self):
self.connection = "connected"
```
### pass_exceptions
Exception handling:
```python
from snakia.decorators import pass_exceptions
@pass_exceptions(ValueError, TypeError)
def risky_function():
# Code that might throw exceptions
pass
```
## 🏷️ Properties
### readonly
Read-only property:
```python
from snakia.property import readonly
class Currency:
@readonly
def rate(self) -> int:
return 100
currency = Currency()
currency.rate = 200
print(currency.rate) # Output: 100
```
### initonly
Initialization-only property:
```python
from snakia.property import initonly
class Person:
name = initonly[str]("name")
bob = Person()
bob.name = "Bob"
print(bob.name) # Output: "Bob"
bob.name = "not bob"
print(bob.name) # Output: "Bob"
```
### 🏛️ classproperty
Class property:
```python
from snakia.property import classproperty
class MyClass:
@classproperty
def class_value(cls):
return "class_value"
```
## 🌐 Platform Abstraction
### 🖥️ PlatformOS
Operating system abstraction:
```python
from snakia.platform import PlatformOS, OS
# Detecting current OS
current_os = OS.current()
if current_os == PlatformOS.LINUX:
print("Running on Linux")
elif current_os == PlatformOS.ANDROID:
print("Running on Android")
```
### 🏗️ PlatformLayer
Platform layers:
```python
from snakia.platform import LinuxLayer, AndroidLayer
# Linux layer
linux_layer = LinuxLayer()
# Android layer
android_layer = AndroidLayer()
```
## 📦 Examples
### Health System
```python
from snakia.core.engine import Engine
from snakia.core.ecs import Component
from snakia.core.es import Event
from snakia.core.loader import Meta, Plugin, PluginProcessor
from snakia.types import Version
from pydantic import Field
class HealthComponent(Component):
max_value: int = Field(default=100, ge=0)
value: int = Field(default=100, ge=0)
class DamageComponent(Component):
damage: int = Field(ge=0)
ticks: int = Field(default=1, ge=0)
class DeathEvent(Event):
entity: int = Field()
class HealthProcessor(PluginProcessor):
def process(self, system):
# Processing damage
for entity, (damage, health) in system.get_components(
DamageComponent, HealthComponent
):
health.value -= damage.damage
damage.ticks -= 1
if damage.ticks <= 0:
system.remove_component(entity, DamageComponent)
if health.value <= 0:
system.remove_component(entity, HealthComponent)
self.plugin.dispatcher.publish(DeathEvent(entity=entity))
class HealthPlugin(Plugin, meta=Meta(
name="health",
version=Version.from_args(1, 0, 0),
processors=(HealthProcessor,)
)):
def on_load(self): pass
def on_unload(self): pass
# Usage
engine = Engine()
engine.loader.register(HealthPlugin)
engine.loader.load_all()
# Creating a player
player = engine.system.create_entity(
HealthComponent(value=100, max_value=100)
)
# Dealing damage
engine.system.add_component(player, DamageComponent(damage=25, ticks=1))
engine.start()
```
### TUI Application
```python
from snakia.core.tui import CanvasChar, RenderContext
from snakia.core.tui.render import ANSIRenderer
from snakia.core.tui.widgets import TextWidget, BoxWidget, VerticalSplitWidget
import sys
class StdoutTarget:
def write(self, text: str): sys.stdout.write(text)
def flush(self): sys.stdout.flush()
def main():
# Creating widgets
title = TextWidget("Snakia TUI", CanvasChar(fg_color="cyan", bold=True))
content = TextWidget("Welcome to Snakia!", CanvasChar(fg_color="white"))
box = BoxWidget(20, 5, CanvasChar("█", fg_color="green"))
# Layout
layout = VerticalSplitWidget([title, content, box], "-")
# Rendering
renderer = ANSIRenderer(StdoutTarget())
with RenderContext(renderer) as ctx:
ctx.render(layout.render())
if __name__ == "__main__":
main()
```
## 🤝 Contributing
We welcome contributions to Snakia development! Whether you're fixing bugs, adding features, or improving documentation, your help is appreciated.
### How to Contribute
1. **Fork** the repository
2. **Create** a feature branch (`git checkout -b feature/amazing-feature`)
3. **Make** your changes
4. **Add** tests if applicable
5. **Commit** your changes (`git commit -m 'Add amazing feature'`)
6. **Push** to the branch (`git push origin feature/amazing-feature`)
7. **Open** a Pull Request
### Development Guidelines
- Add type hints to all new code
- Write clear commit messages
- Update documentation for new features
- Test your changes thoroughly
## 🆘 Support
Need help? We're here to assist you!
- 🐛 **Bug Reports** - [GitHub Issues](https://github.com/RuJect/Snakia/issues)
- 💬 **Community Chat** - [RuJect Community Telegram](https://t.me/RuJect_Community)
- 📧 **Direct Contact** - mailto:rus07tam.uwu@gmail.com
## 📄 License
See the `LICENSE` file for details.

20
docs/Makefile Normal file
View file

@ -0,0 +1,20 @@
# Minimal makefile for Sphinx documentation
#
# You can set these variables from the command line, and also
# from the environment for the first two.
SPHINXOPTS ?=
SPHINXBUILD ?= sphinx-build
SOURCEDIR = source
BUILDDIR = build
# Put it first so that "make" without argument is like "make help".
help:
@$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
.PHONY: help Makefile
# Catch-all target: route all unknown targets to Sphinx using the new
# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
%: Makefile
@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)

35
docs/make.bat Normal file
View file

@ -0,0 +1,35 @@
@ECHO OFF
pushd %~dp0
REM Command file for Sphinx documentation
if "%SPHINXBUILD%" == "" (
set SPHINXBUILD=sphinx-build
)
set SOURCEDIR=source
set BUILDDIR=build
%SPHINXBUILD% >NUL 2>NUL
if errorlevel 9009 (
echo.
echo.The 'sphinx-build' command was not found. Make sure you have Sphinx
echo.installed, then set the SPHINXBUILD environment variable to point
echo.to the full path of the 'sphinx-build' executable. Alternatively you
echo.may add the Sphinx directory to PATH.
echo.
echo.If you don't have Sphinx installed, grab it from
echo.https://www.sphinx-doc.org/
exit /b 1
)
if "%1" == "" goto help
%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
goto end
:help
%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
:end
popd

15
docs/source/api.rst Normal file
View file

@ -0,0 +1,15 @@
API Reference
=============
.. autosummary::
:toctree: _autosummary
:recursive:
snakia.core
snakia.decorators
snakia.field
snakia.platform
snakia.property
snakia.random
snakia.types
snakia.utils

34
docs/source/conf.py Normal file
View file

@ -0,0 +1,34 @@
import sys
from pathlib import Path
sys.path.insert(
0, str((Path(__file__).parent.parent.parent / "src").resolve())
)
project = "Snakia"
copyright = "2025, RuJect"
author = "RuJect"
extensions = [
"sphinx.ext.autodoc",
"sphinx.ext.autosummary",
"sphinx.ext.viewcode",
"sphinx_autodoc_typehints",
]
autosummary_generate = True
autosummary_imported_members = True
autodoc_default_options = {
"members": True,
"undoc-members": True,
"private-members": False,
"special-members": "__init__",
"inherited-members": True,
"show-inheritance": True,
}
templates_path = ["_templates"]
html_theme = "sphinx_rtd_theme"
html_static_path = ["_static"]

15
docs/source/index.rst Normal file
View file

@ -0,0 +1,15 @@
Welcome to Snakia!
==================
.. toctree::
:maxdepth: 2
:caption: Contents:
api
Indices and tables
==================
* :ref:`genindex`
* :ref:`modindex`
* :ref:`search`

88
examples/health_plugin.py Normal file
View file

@ -0,0 +1,88 @@
from typing import final
from pydantic import Field
from snakia.core.ecs import Component
from snakia.core.ecs.system import System
from snakia.core.engine import Engine
from snakia.core.es import Event
from snakia.core.loader import Meta, Plugin, PluginProcessor
from snakia.types import Version
class HealthComponent(Component):
max_value: int = Field(default=100, ge=0)
value: int = Field(default=100, ge=0)
class DamageComponent(Component):
damage: int = Field(ge=0)
ticks: int = Field(default=1, ge=0)
class HealComponent(Component):
heal: int = Field(ge=0)
ticks: int = Field(default=1, ge=0)
class DeathEvent(Event):
entity: int = Field()
class HealthProcessor(PluginProcessor):
def process(self, system: System) -> None:
for entity, (heal, health) in system.get_components(
HealComponent, HealthComponent
):
health.value += heal.heal
heal.ticks -= 1
if heal.ticks <= 0:
system.remove_component(entity, HealComponent)
for entity, (damage, health) in system.get_components(
DamageComponent, HealthComponent
):
health.value -= damage.damage
damage.ticks -= 1
if damage.ticks <= 0:
system.remove_component(entity, DamageComponent)
if health.value <= 0:
system.remove_component(entity, HealthComponent)
self.plugin.dispatcher.publish(DeathEvent(entity=entity))
@final
class HealthPlugin(
Plugin,
meta=Meta(
name="health",
author="snakia",
version=Version.from_args(1, 0, 0),
subscribers=(),
processors=(HealthProcessor,),
),
):
def on_load(self) -> None:
pass
def on_unload(self) -> None:
pass
def main() -> None:
engine = Engine()
engine.loader.register(HealthPlugin)
engine.loader.load_all()
@engine.dispatcher.on(DeathEvent)
def on_death(event: DeathEvent) -> None:
print(f"Entity: {event.entity} is death!")
player = engine.system.create_entity()
engine.system.add_component(player, HealthComponent())
engine.system.add_component(player, DamageComponent(damage=10, ticks=10))
engine.start()
if __name__ == "__main__":
main()

35
examples/tui.py Normal file
View file

@ -0,0 +1,35 @@
import sys
from snakia.core.tui import CanvasChar, RenderContext
from snakia.core.tui.render import ANSIRenderer
from snakia.core.tui.widgets import (BoxWidget, HorizontalSplitWidget,
TextWidget, VerticalSplitWidget)
class StdoutTarget:
def write(self, text: str) -> None:
sys.stdout.write(text)
def flush(self) -> None:
sys.stdout.flush()
def main() -> None:
text1 = TextWidget("Hello", CanvasChar(fg_color="red", bold=True))
text2 = TextWidget("World", CanvasChar(fg_color="blue", bold=True))
text3 = TextWidget("Snakia", CanvasChar(fg_color="green", bold=True))
box1 = BoxWidget(10, 3, CanvasChar("", fg_color="yellow"))
box2 = BoxWidget(8, 5, CanvasChar("", fg_color="magenta"))
horizontal_split = HorizontalSplitWidget([text1, text2, text3], "|")
vertical_split = VerticalSplitWidget([horizontal_split, box1, box2], "-")
renderer = ANSIRenderer(StdoutTarget())
with RenderContext(renderer) as ctx:
ctx.render(vertical_split.render())
if __name__ == "__main__":
main()

61
flake.lock generated Normal file
View file

@ -0,0 +1,61 @@
{
"nodes": {
"flake-utils": {
"inputs": {
"systems": "systems"
},
"locked": {
"lastModified": 1731533236,
"narHash": "sha256-l0KFg5HjrsfsO/JpG+r7fRrqm12kzFHyUHqHCVpMMbI=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "11707dc2f618dd54ca8739b309ec4fc024de578b",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1761114652,
"narHash": "sha256-f/QCJM/YhrV/lavyCVz8iU3rlZun6d+dAiC3H+CDle4=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "01f116e4df6a15f4ccdffb1bcd41096869fb385c",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"flake-utils": "flake-utils",
"nixpkgs": "nixpkgs"
}
},
"systems": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default",
"type": "github"
}
}
},
"root": "root",
"version": 7
}

39
flake.nix Normal file
View file

@ -0,0 +1,39 @@
{
description = "Snakia dev shell";
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
flake-utils.url = "github:numtide/flake-utils";
};
outputs =
{
nixpkgs,
flake-utils,
...
}:
flake-utils.lib.eachDefaultSystem (
system:
let
pkgs = import nixpkgs {
inherit system;
};
in
{
devShells.default = pkgs.mkShell {
nativeBuildInputs = with pkgs; [
python312
];
buildInputs = with pkgs; [
black
nixfmt
uv
isort
mypy
python312Packages.pylint
];
};
}
);
}

40
pyproject.toml Normal file
View file

@ -0,0 +1,40 @@
[project]
name = "snakia"
version = "0.4.0"
description = "Modern python framework"
readme = "README.md"
authors = [
{ name = "rus07tam", email = "rus07tam@gmail.com" }
]
keywords = ["python3", "event system", "ecs", "reactive programming"]
classifiers = [
"Development Status :: 3 - Alpha",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Programming Language :: Python :: 3.14",
"Programming Language :: Python :: Free Threading",
]
requires-python = ">=3.12"
dependencies = [
"networkx>=3.4.2",
"pydantic>=2.12.3",
]
license = "CC0-1.0"
license-files = ["LICENSE"]
[project.urls]
Homepage = "https://github.com/ruject/snakia"
Repository = "https://github.com/ruject/snakia"
"Issue Tracker" = "https://github.com/ruject/snakia/issues"
[build-system]
requires = ["uv_build>=0.8.14,<0.9.0"]
build-backend = "uv_build"
[tool.pylint.'master']
init-hook = "import sys; sys.path.append('.venv/lib/python3.12/site-packages')"
disable = ["C0114", "C0115", "C0116", "R0801"]
max-args = 8
max-positional-arguments = 7
min-public-methods = 1

17
requirements.txt Normal file
View file

@ -0,0 +1,17 @@
# This file was autogenerated by uv via the following command:
# uv pip compile pyproject.toml -o requirements.txt
annotated-types==0.7.0
# via pydantic
networkx==3.5
# via snakia (pyproject.toml)
pydantic==2.12.3
# via snakia (pyproject.toml)
pydantic-core==2.41.4
# via pydantic
typing-extensions==4.15.0
# via
# pydantic
# pydantic-core
# typing-inspection
typing-inspection==0.4.2
# via pydantic

3
src/snakia/__init__.py Normal file
View file

@ -0,0 +1,3 @@
"""
Snakia framework
"""

View file

View file

@ -0,0 +1,5 @@
from .component import Component
from .processor import Processor
from .system import System
__all__ = ["Processor", "Component", "System"]

View file

@ -0,0 +1,7 @@
from abc import ABC
from pydantic import BaseModel
class Component(ABC, BaseModel):
pass

View file

@ -0,0 +1,25 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, ClassVar
if TYPE_CHECKING:
from .system import System
class Processor(ABC):
"""
A processor is a class that processes the system.
"""
before: ClassVar[tuple[type[Processor], ...]] = ()
after: ClassVar[tuple[type[Processor], ...]] = ()
@abstractmethod
def process(self, system: System) -> None:
"""
Processes the system. Called once per update.
Args:
system (System): The system to process.
"""

View file

@ -0,0 +1,309 @@
from __future__ import annotations
from collections import defaultdict
from collections.abc import Iterable
from itertools import count
from typing import Any, cast, overload
import networkx as nx # type: ignore
from snakia.utils import nolock
from .component import Component
from .processor import Processor
class System:
"""
A system is a collection of entities and components that can be processed by processors.
"""
__processors: list[Processor]
__components: dict[type[Component], set[int]]
__entitites: dict[int, dict[type[Component], Component]]
__entity_counter: count[int]
__dead_entities: set[int]
__is_running: bool
def __init__(self) -> None:
self.__processors = []
self.__components = defaultdict(set)
self.__entitites = defaultdict(dict)
self.__entity_counter = count(start=1)
self.__dead_entities = set()
self.__is_running = False
@property
def is_running(self) -> bool:
"""Returns True if the system is running."""
return self.__is_running
def full_reset(self) -> None:
"""Resets the system to its initial state."""
self.__processors = []
self.__components = defaultdict(set)
self.__entitites = defaultdict(dict)
self.__entity_counter = count(start=1)
self.__dead_entities = set()
def get_processor[P: Processor](
self, processor_type: type[P], /
) -> P | None:
"""Returns the first processor of the given type."""
for processor in self.__processors:
if isinstance(processor, processor_type):
return processor
return None
def add_processor(self, proccessor: Processor) -> None:
"""Adds a processor to the system."""
self.__processors.append(proccessor)
self._sort_processors()
def remove_processor(self, processor_type: type[Processor]) -> None:
"""Removes a processor from the system."""
for processor in self.__processors:
if isinstance(processor, processor_type):
self.__processors.remove(processor)
@overload
def get_components[A: Component](
self, __c1: type[A], /
) -> Iterable[tuple[int, tuple[A]]]: ...
@overload
def get_components[A: Component, B: Component](
self, __c1: type[A], __c2: type[B], /
) -> Iterable[tuple[int, tuple[A, B]]]: ...
@overload
def get_components[A: Component, B: Component, C: Component](
self, __c1: type[A], __c2: type[B], __c3: type[C], /
) -> Iterable[tuple[int, tuple[A, B, C]]]: ...
@overload
def get_components[A: Component, B: Component, C: Component, D: Component](
self, __c1: type[A], __c2: type[B], __c3: type[C], __c4: type[D], /
) -> Iterable[tuple[int, tuple[A, B, C, D]]]: ...
@overload
def get_components[
A: Component,
B: Component,
C: Component,
D: Component,
E: Component,
](
self,
__c1: type[A],
__c2: type[B],
__c3: type[C],
__c4: type[D],
__c5: type[E],
/,
) -> Iterable[tuple[int, tuple[A, B, C, D]]]: ...
def get_components(
self, *component_types: type[Component]
) -> Iterable[tuple[int, tuple[Component, ...]]]:
"""Returns all entities with the given components."""
entity_set = set.intersection(
*(
self.__components[component_type]
for component_type in component_types
)
)
for entity in entity_set:
yield (
entity,
tuple(
self.__entitites[entity][component_type]
for component_type in component_types
),
)
@overload
def get_components_of_entity[A: Component](
self, entity: int, __c1: type[A], /
) -> tuple[A | None]: ...
@overload
def get_components_of_entity[A: Component, B: Component](
self, entity: int, __c1: type[A], __c2: type[B], /
) -> tuple[A | None, B | None]: ...
@overload
def get_components_of_entity[A: Component, B: Component, C: Component](
self, entity: int, __c1: type[A], __c2: type[B], __c3: type[C], /
) -> tuple[A | None, B | None, C | None]: ...
@overload
def get_components_of_entity[
A: Component,
B: Component,
C: Component,
D: Component,
](
self,
entity: int,
__c1: type[A],
__c2: type[B],
__c3: type[C],
__c4: type[D],
/,
) -> tuple[A | None, B | None, C | None, D | None]: ...
@overload
def get_components_of_entity[
A: Component,
B: Component,
C: Component,
D: Component,
E: Component,
](
self,
entity: int,
__c1: type[A],
__c2: type[B],
__c3: type[C],
__c4: type[D],
__c5: type[E],
/,
) -> tuple[A | None, B | None, C | None, D | None, E | None]: ...
def get_components_of_entity(
self, entity: int, /, *component_types: type[Component]
) -> tuple[Any, ...]:
"""Returns the components of the given entity."""
entity_dict = self.__entitites[entity]
return (
*(
entity_dict.get(component_type, None)
for component_type in component_types
),
)
def get_component[C: Component](
self, component_type: type[C], /
) -> Iterable[tuple[int, C]]:
"""Returns all entities with the given component."""
for entity in self.__components[component_type].copy():
yield entity, cast(C, self.__entitites[entity][component_type])
@overload
def get_component_of_entity[C: Component](
self, entity: int, component_type: type[C], /
) -> C | None: ...
@overload
def get_component_of_entity[C: Component, D: Any](
self, entity: int, component_type: type[C], /, default: D
) -> C | D: ...
def get_component_of_entity(
self,
entity: int,
component_type: type[Component],
/,
default: Any = None,
) -> Any:
"""Returns the component of the given entity."""
return self.__entitites[entity].get(component_type, default)
def add_component(self, entity: int, component: Component) -> None:
"""Adds a component to an entity."""
component_type = type(component)
self.__components[component_type].add(entity)
self.__entitites[entity][component_type] = component
def has_component(
self, entity: int, component_type: type[Component]
) -> bool:
"""Returns True if the entity has the given component."""
return component_type in self.__entitites[entity]
def has_components(
self, entity: int, *component_types: type[Component]
) -> bool:
"""Returns True if the entity has all the given components."""
components_dict = self.__entitites[entity]
return all(
comp_type in components_dict for comp_type in component_types
)
def remove_component[C: Component](
self, entity: int, component_type: type[C]
) -> C | None:
"""Removes a component from an entity."""
self.__components[component_type].discard(entity)
if not self.__components[component_type]:
del self.__components[component_type]
return self.__entitites[entity].pop(component_type) # type: ignore
def create_entity(self, *components: Component) -> int:
"""Creates an entity with the given components."""
entity = next(self.__entity_counter)
if entity not in self.__entitites:
self.__entitites[entity] = {}
for component in components:
component_type = type(component)
self.__components[component_type].add(entity)
if component_type not in self.__entitites[entity]:
self.__entitites[entity][component_type] = component
return entity
def delete_entity(self, entity: int, immediate: bool = False) -> None:
"""Deletes an entity."""
if immediate:
for component_type in self.__entitites[entity]:
self.__components[component_type].discard(entity)
if not self.__components[component_type]:
del self.__components[component_type]
del self.__entitites[entity]
else:
self.__dead_entities.add(entity)
def entity_exists(self, entity: int) -> bool:
"""Returns True if the entity exists."""
return (
entity in self.__entitites and entity not in self.__dead_entities
)
def start(self) -> None:
"""Starts the system."""
self.__is_running = True
while self.__is_running:
self.update()
nolock()
def stop(self) -> None:
"""Stops the system."""
self.__is_running = False
def update(self) -> None:
"""Updates the system."""
self._clear_dead_entities()
for processor in self.__processors:
processor.process(self)
def _clear_dead_entities(self) -> None:
for entity in self.__dead_entities:
self.delete_entity(entity, immediate=True)
self.__dead_entities = set()
def _sort_processors(self) -> None:
processors = self.__processors
graph: nx.DiGraph[Processor] = nx.DiGraph()
for p in processors:
graph.add_node(p)
for p in processors:
for after_cls in p.after:
for q in processors:
if isinstance(q, after_cls):
graph.add_edge(q, p)
for before_cls in p.before:
for q in processors:
if isinstance(q, before_cls):
graph.add_edge(p, q)
sorted_processors = list(nx.topological_sort(graph))
self.__processors = sorted_processors

37
src/snakia/core/engine.py Normal file
View file

@ -0,0 +1,37 @@
import threading
from typing import Final
from .ecs import System
from .es import Dispatcher
from .loader.loader import Loader
class Engine:
def __init__(self) -> None:
self.system: Final = System()
self.dispatcher: Final = Dispatcher()
self.loader: Final = Loader(self)
self.__system_thread: threading.Thread | None = None
self.__dispatcher_thread: threading.Thread | None = None
def start(self) -> None:
self.__system_thread = threading.Thread(
target=self.system.start, daemon=False
)
self.__dispatcher_thread = threading.Thread(
target=self.dispatcher.start, daemon=False
)
self.__system_thread.start()
self.__dispatcher_thread.start()
def stop(self) -> None:
if self.__system_thread is not None:
self.system.stop()
self.__system_thread.join()
if self.__dispatcher_thread is not None:
self.dispatcher.stop()
self.__dispatcher_thread.join()
def update(self) -> None:
self.system.update()
self.dispatcher.update()

View file

@ -0,0 +1,16 @@
from .action import Action
from .dispatcher import Dispatcher
from .event import Event
from .filter import Filter
from .handler import Handler
from .subscriber import Subscriber
__all__ = (
"Event",
"Action",
"Filter",
"Handler",
"Handler",
"Subscriber",
"Dispatcher",
)

View file

@ -0,0 +1,38 @@
from __future__ import annotations
from typing import Self
from pydantic import BaseModel, Field
class Action(BaseModel):
move: int = Field(default=1)
@classmethod
def stop(cls) -> Self:
"""Skip all handlers."""
return cls(move=2**8)
@classmethod
def go_start(cls) -> Self:
"""Go to the first handler."""
return cls(move=-(2**8))
@classmethod
def next(cls, count: int = 1) -> Self:
"""Skip one handler."""
return cls(move=count)
@classmethod
def prev(cls, count: int = 1) -> Self:
"""Go back one handler."""
return cls(move=-count)
@classmethod
def skip(cls, count: int = 1) -> Self:
"""Skip n handlers.
Args:
count (int): The number of handlers to skip.
"""
return cls(move=count + 1)

View file

@ -0,0 +1,108 @@
from __future__ import annotations
import queue
from collections import defaultdict
from typing import Callable, Final
from snakia.utils import nolock
from .event import Event
from .filter import Filter
from .handler import Handler
from .subscriber import Subscriber
class Dispatcher:
"""
Event dispatcher
"""
__running: bool
def __init__(self) -> None:
self.__queue: Final = queue.Queue[Event]()
self.__subscribers: Final[
dict[type[Event], list[Subscriber[Event]]]
] = defaultdict(list)
self.__running = False
@property
def is_running(self) -> bool:
"""Returns True if the dispatcher is running."""
return self.__running
def subscribe[T: Event](
self, event_type: type[T], subscriber: Subscriber[T]
) -> None:
"""Subscribe to an event type."""
self.__subscribers[event_type].append(subscriber) # type: ignore
def unsubscribe[T: Event](
self, event_type: type[T], subscriber: Subscriber[T]
) -> None:
"""Unsubscribe from an event type."""
for sub in self.__subscribers[event_type].copy():
if sub.handler != subscriber.handler:
continue
if sub.priority != subscriber.priority:
continue
self.__subscribers[event_type].remove(sub)
def on[T: Event](
self,
event: type[T],
filter: Filter[T] | None = None, # noqa: W0622 # pylint: disable=W0622
priority: int = -1,
) -> Callable[[Handler[T]], Handler[T]]:
"""Decorator to subscribe to an event."""
def wrapper(handler: Handler[T]) -> Handler[T]:
self.subscribe(event, Subscriber(handler, filter, priority))
return handler
return wrapper
def publish(self, event: Event) -> None:
"""Add an event to the queue."""
self.__queue.put(event)
def start(self) -> None:
"""Start the update loop."""
self.__running = True
while self.__running:
self.update()
nolock()
def stop(self) -> None:
"""Stop the update loop."""
self.__running = False
def update(self, block: bool = False) -> None:
"""Update the dispatcher."""
try:
event = self.__queue.get(block=block, timeout=None)
for base in event.__class__.mro():
if not issubclass(base, Event):
continue
self.__handle_event(base, event)
self.__queue.task_done()
except queue.Empty:
pass
def __handle_event(self, event_type: type[Event], event: Event) -> None:
subscribers = self.__subscribers[event_type]
subscribers.sort(key=lambda s: s.priority, reverse=True)
i = 0
while i < len(subscribers):
subscriber = subscribers[i]
if subscriber.filters is not None and not subscriber.filters(
event
):
continue
action = subscriber.handler(event)
if action is not None:
i = max(0, min(len(subscribers), i + action.move))
else:
i += 1
event.reduce_ttl()

View file

@ -0,0 +1,13 @@
from __future__ import annotations
from abc import ABC
from pydantic import BaseModel, Field
class Event(ABC, BaseModel):
ttl: int = Field(default=2**6, kw_only=True, ge=0)
def reduce_ttl(self) -> None:
"""Reduce the TTL of the event by 1."""
self.ttl -= 1

View file

@ -0,0 +1,11 @@
from __future__ import annotations
from typing import Protocol
from .event import Event
class Filter[T: Event](Protocol):
"""Filter for an event."""
def __call__(self, event: T) -> bool: ...

View file

@ -0,0 +1,12 @@
from __future__ import annotations
from typing import Optional, Protocol
from .action import Action
from .event import Event
class Handler[T: Event](Protocol):
"""Handler for an event."""
def __call__(self, event: T) -> Optional[Action]: ...

View file

@ -0,0 +1,16 @@
from __future__ import annotations
from typing import NamedTuple
from .event import Event
from .filter import Filter
from .handler import Handler
class Subscriber[T: Event](NamedTuple):
"""
Subscriber for an event."""
handler: Handler[T]
filters: Filter[T] | None
priority: int

View file

@ -0,0 +1,61 @@
import sys
from types import TracebackType
from typing import Any, Callable, Protocol, final
class ExceptionHook[T: BaseException](Protocol):
def __call__(
self, exception: T, frame: TracebackType | None, /
) -> bool | None: ...
@final
class _ExceptionManager:
def __init__(self) -> None:
self.__hooks: list[tuple[type[BaseException], ExceptionHook[Any]]] = []
sys.excepthook = self._excepthook
def hook_exception[T: BaseException](
self, exception_type: type[T], func: ExceptionHook[T]
) -> ExceptionHook[T]:
self.__hooks.append((exception_type, func))
return func
def on_exception[T: BaseException](
self, exception_type: type[T]
) -> Callable[[ExceptionHook[T]], ExceptionHook[T]]:
def inner(func: ExceptionHook[T]) -> ExceptionHook[T]:
self.hook_exception(exception_type, func)
return func
return inner
def _on_except(
self,
type_: type[BaseException],
exception: BaseException,
frame: TracebackType | None,
) -> None:
for hook_type, hook_func in self.__hooks:
if hook_type == type_ or issubclass(hook_type, type_):
result = hook_func(exception, frame)
if result:
break
def _excepthook(
self,
type_: type[BaseException],
exception: BaseException,
frame: TracebackType | None,
) -> None:
while True:
try:
self._on_except(type_, exception, frame)
break
except BaseException as e: # noqa: W0718 # pylint: disable=W0718
if e is exception:
return
type_, exception = type(e), e
ExceptionManager = _ExceptionManager()

View file

@ -0,0 +1,6 @@
from .loadable import Loadable
from .meta import Meta
from .plugin import Plugin
from .plugin_processor import PluginProcessor
__all__ = ["Loadable", "Meta", "Plugin", "PluginProcessor"]

View file

@ -0,0 +1,18 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from snakia.core.engine import Engine
class Loadable(ABC):
@abstractmethod
def __init__(self, engine: Engine) -> None: ...
@abstractmethod
def load(self) -> None: ...
@abstractmethod
def unload(self) -> None: ...

View file

@ -0,0 +1,24 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Callable, Final
if TYPE_CHECKING:
from snakia.core.engine import Engine
from snakia.core.loader import Loadable
class Loader:
def __init__(self, engine: Engine) -> None:
self.__engine: Final = engine
self.__loadables: Final[list[Loadable]] = []
def register(self, loadable: Callable[[Engine], Loadable]) -> None:
self.__loadables.append(loadable(self.__engine))
def load_all(self) -> None:
for loadable in self.__loadables:
loadable.load()
def unload_all(self) -> None:
for loadable in reversed(self.__loadables):
loadable.unload()

View file

@ -0,0 +1,39 @@
from __future__ import annotations
from pydantic import BaseModel, ConfigDict, Field
from snakia.core.es import Event, Subscriber
from snakia.types import Version
from .plugin_processor import PluginProcessor
class Meta(BaseModel):
model_config = ConfigDict(frozen=True, arbitrary_types_allowed=True)
name: str = Field(
default="unknown",
min_length=4,
max_length=32,
pattern="^[a-z0-9_]{4,32}$",
)
author: str = Field(
default="unknown",
min_length=4,
max_length=32,
pattern="^[a-z0-9_]{4,32}$",
)
version: Version = Field(
default_factory=lambda: Version(major=1, minor=0, patch=0)
)
subscribers: tuple[tuple[type[Event], Subscriber[Event]], ...] = Field(
default_factory=tuple
)
processors: tuple[type[PluginProcessor], ...] = Field(
default_factory=tuple
)
@property
def id(self) -> str:
return f"{self.author}.{self.name}"

View file

@ -0,0 +1,72 @@
from __future__ import annotations
from abc import abstractmethod
from typing import TYPE_CHECKING, ClassVar, Final, final
from snakia.core.ecs import System
from snakia.core.es import Dispatcher
from .loadable import Loadable
from .meta import Meta
if TYPE_CHECKING:
from snakia.core.engine import Engine
class Plugin(Loadable):
__meta: ClassVar[Meta]
@final
def __init__(self, engine: Engine) -> None:
self.__engine: Final = engine
@final
@property
def meta(self) -> Meta:
"""The plugin's metadata."""
return self.__meta
@final
@property
def dispatcher(self) -> Dispatcher:
return self.__engine.dispatcher
@final
@property
def system(self) -> System:
return self.__engine.system
@final
def load(self) -> None:
for processor in self.meta.processors:
self.__engine.system.add_processor(processor(self))
for event_type, subscriber in self.meta.subscribers:
self.__engine.dispatcher.subscribe(event_type, subscriber)
self.on_load()
@final
def unload(self) -> None:
for processor in self.meta.processors:
self.__engine.system.remove_processor(processor)
for event_type, subscriber in self.meta.subscribers:
self.__engine.dispatcher.unsubscribe(event_type, subscriber)
self.on_unload()
@abstractmethod
def on_load(self) -> None:
pass
@abstractmethod
def on_unload(self) -> None:
pass
if TYPE_CHECKING:
@final
def __init_subclass__(cls, meta: Meta) -> None:
pass
else:
def __init_subclass__(cls, meta: Meta) -> None:
cls.meta = meta

View file

@ -0,0 +1,14 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Final, final
from snakia.core.ecs import Processor
if TYPE_CHECKING:
from .plugin import Plugin
class PluginProcessor(Processor):
@final
def __init__(self, plugin: Plugin) -> None:
self.plugin: Final = plugin

View file

@ -0,0 +1,26 @@
from .async_bindable import AsyncBindable
from .base_bindable import BaseBindable, BindableSubscriber, ValueChanged
from .bindable import Bindable
from .chain import chain
from .combine import combine
from .concat import concat
from .const import const
from .filter import filter # noqa: W0622 # pylint: disable=W0622
from .map import map # noqa: W0622 # pylint: disable=W0622
from .merge import async_merge, merge
__all__ = [
"Bindable",
"AsyncBindable",
"BaseBindable",
"BindableSubscriber",
"ValueChanged",
"chain",
"combine",
"concat",
"const",
"filter",
"map",
"merge",
"async_merge",
]

View file

@ -0,0 +1,105 @@
from typing import Any, Awaitable, Callable, Literal, overload
from .base_bindable import BaseBindable, BindableSubscriber, ValueChanged
class AsyncBindable[T: Any](BaseBindable[T]):
"""
An asynchronous bindable.
"""
def __init__(self, default_value: T | None = None) -> None:
super().__init__(default_value)
self.__subscribers: list[BindableSubscriber[T, Awaitable[Any]]] = []
@property
def value(self) -> T:
return self.__value
@property
def subscribers(
self,
) -> tuple[BindableSubscriber[T, Awaitable[Any]], ...]:
"""Get the subscribers."""
return (*self.__subscribers,)
async def set(self, value: T) -> None:
"""Set the value."""
e = ValueChanged(self.__value, value)
self.__value = value
for subscriber in self.__subscribers:
await subscriber(e)
@overload
def subscribe(
self,
subscriber: BindableSubscriber[T, Awaitable[Any]],
/,
run_now: Literal[True],
) -> Awaitable[None]: ...
@overload
def subscribe(
self,
subscriber: BindableSubscriber[T, Awaitable[Any]],
/,
run_now: Literal[False] = False,
) -> None: ...
def subscribe(
self,
subscriber: BindableSubscriber[T, Awaitable[Any]],
/,
run_now: bool = False,
) -> None | Awaitable[None]:
"""Subscribe to an value."""
self.__subscribers.append(subscriber)
if run_now:
async def _run() -> None:
await subscriber(
ValueChanged(self.__default_value, self.__value)
)
return _run()
return None
def unsubscribe(
self, subscriber: BindableSubscriber[T, Awaitable[Any]]
) -> None:
"""Unsubscribe from an value."""
self.__subscribers.remove(subscriber)
@overload
def on(
self, run_now: Literal[True]
) -> Callable[
[BindableSubscriber[T, Awaitable[Any]]], Awaitable[None]
]: ...
@overload
def on(
self, run_now: Literal[False] = False
) -> Callable[[BindableSubscriber[T, Awaitable[Any]]], None]: ...
def on(self, run_now: bool = False) -> Callable[
[BindableSubscriber[T, Awaitable[Any]]],
None | Awaitable[None],
]:
"""Decorator to subscribe to an value."""
def wrapper(
subscriber: BindableSubscriber[T, Awaitable[Any]],
) -> None | Awaitable[None]:
self.__subscribers.append(subscriber)
if run_now:
async def _run() -> None:
await subscriber(
ValueChanged(self.__default_value, self.__value)
)
return _run()
return None
return wrapper

View file

@ -0,0 +1,28 @@
from typing import Any, NamedTuple, Protocol
class ValueChanged[T](NamedTuple):
old_value: T
new_value: T
class BindableSubscriber[T: Any, R: Any](Protocol):
def __call__(self, value: ValueChanged[T], /) -> R: ...
class BaseBindable[T: Any]:
def __init__(self, default_value: T | None = None) -> None:
if default_value is not None:
self.__default_value: T = default_value
self.__value: T = default_value
@property
def default_value(self) -> T:
return self.__default_value
@property
def value(self) -> T:
return self.__value
def set_silent(self, value: T) -> None:
self.__value = value

View file

@ -0,0 +1,47 @@
from typing import Any, Callable
from .base_bindable import BaseBindable, BindableSubscriber, ValueChanged
class Bindable[T: Any](BaseBindable[T]):
"""
A bindable value.
"""
def __init__(self, default_value: T | None = None) -> None:
super().__init__(default_value)
self.__subscribers: list[BindableSubscriber[T, Any]] = []
@property
def subscribers(self) -> tuple[BindableSubscriber[T, Any], ...]:
"""Get the subscribers."""
return (*self.__subscribers,)
def set(self, value: T) -> None:
"""Set the value."""
e = ValueChanged(self.__value, value)
self.set_silent(value)
for subscriber in self.__subscribers:
subscriber(e)
def subscribe(
self, subscriber: BindableSubscriber[T, Any], /, run_now: bool = False
) -> None:
"""Subscribe to an value."""
self.__subscribers.append(subscriber)
if run_now:
subscriber(ValueChanged(self.default_value, self.value))
def unsubscribe(self, subscriber: BindableSubscriber[T, Any]) -> None:
"""Unsubscribe from an value."""
self.__subscribers.remove(subscriber)
def on(
self, run_now: bool = False
) -> Callable[[BindableSubscriber[T, Any]], None]:
"""Decorator to subscribe to an value."""
def wrapper(subscriber: BindableSubscriber[T, Any]) -> None:
self.subscribe(subscriber, run_now)
return wrapper

View file

@ -0,0 +1,53 @@
from typing import Any, Callable, overload
@overload
def chain[**P, A](func1: Callable[P, A], /) -> Callable[P, A]: ...
@overload
def chain[**P, A, B](
func1: Callable[P, A], func2: Callable[[A], B], /
) -> Callable[P, B]: ...
@overload
def chain[**P, A, B, C](
func1: Callable[P, A], func2: Callable[[A], B], func3: Callable[[B], C], /
) -> Callable[P, C]: ...
@overload
def chain[**P, A, B, C, D](
func1: Callable[P, A],
func2: Callable[[A], B],
func3: Callable[[B], C],
func4: Callable[[C], D],
/,
) -> Callable[P, D]: ...
@overload
def chain[**P, A, B, C, D, E](
func1: Callable[P, A],
func2: Callable[[A], B],
func3: Callable[[B], C],
func4: Callable[[C], D],
func5: Callable[[D], E],
/,
) -> Callable[P, E]: ...
@overload
def chain[**P](
func1: Callable[P, Any], /, *funcs: Callable[[Any], Any]
) -> Callable[P, Any]: ...
def chain[**P](
func1: Callable[P, Any], /, *funcs: Callable[[Any], Any]
) -> Callable[P, Any]:
def inner(*args: P.args, **kwargs: P.kwargs) -> Any:
v = func1(*args, **kwargs)
for f in funcs:
v = f(v)
return v
return inner

View file

@ -0,0 +1,94 @@
import operator
from typing import Any, Callable, overload
from snakia.utils import to_async
from .async_bindable import AsyncBindable
from .base_bindable import ValueChanged
from .bindable import Bindable
from .concat import concat
@overload
def combine[A, R](
source1: Bindable[A] | AsyncBindable[A],
/,
*,
combiner: Callable[[A], R],
) -> Bindable[R]: ...
@overload
def combine[A, B, R](
source1: Bindable[A] | AsyncBindable[A],
source2: Bindable[B] | AsyncBindable[B],
/,
*,
combiner: Callable[[A, B], R],
) -> Bindable[R]: ...
@overload
def combine[A, B, C, R](
source1: Bindable[A] | AsyncBindable[A],
source2: Bindable[B] | AsyncBindable[B],
source3: Bindable[C] | AsyncBindable[C],
/,
*,
combiner: Callable[[A, B, C], R],
) -> Bindable[R]: ...
@overload
def combine[A, B, C, D, R](
source1: Bindable[A] | AsyncBindable[A],
source2: Bindable[B] | AsyncBindable[B],
source3: Bindable[C] | AsyncBindable[C],
source4: Bindable[D] | AsyncBindable[D],
/,
*,
combiner: Callable[[A, B, C, D], R],
) -> Bindable[R]: ...
@overload
def combine[A, B, C, D, R](
source1: Bindable[A] | AsyncBindable[A],
source2: Bindable[B] | AsyncBindable[B],
source3: Bindable[C] | AsyncBindable[C],
source4: Bindable[D] | AsyncBindable[D],
/,
*,
combiner: Callable[[A, B, C, D], R],
) -> Bindable[R]: ...
@overload
def combine[R](
*sources: Bindable[Any] | AsyncBindable[Any],
combiner: Callable[..., R],
) -> Bindable[R]: ...
def combine[R](
*sources: Bindable[Any] | AsyncBindable[Any],
combiner: Callable[..., R],
) -> Bindable[R]:
combined = Bindable[R]()
values = [*map(lambda s: s.value, sources)]
for i, source in enumerate(sources):
def make_subscriber(
index: int,
) -> Callable[[ValueChanged[Any]], None]:
return concat(
lambda v: operator.setitem(values, index, v.new_value),
lambda _: combiner(*values),
)
if isinstance(source, Bindable):
source.subscribe(make_subscriber(i))
else:
source.subscribe(to_async(make_subscriber(i)))
return combined

View file

@ -0,0 +1,9 @@
from typing import Any, Callable
def concat[**P](*funcs: Callable[P, Any]) -> Callable[P, None]:
def inner(*args: P.args, **kwargs: P.kwargs) -> None:
for f in funcs:
f(*args, **kwargs)
return inner

View file

@ -0,0 +1,5 @@
from typing import Callable
def const[T](value: T) -> Callable[[], T]:
return lambda: value

View file

@ -0,0 +1,9 @@
import builtins
from typing import Callable, Iterable, TypeGuard
# noqa: W0622 # pylint: disable=W0622
def filter[S, T](
f: Callable[[S], TypeGuard[T]],
) -> Callable[[Iterable[S]], Iterable[T]]:
return lambda iterable: builtins.filter(f, iterable)

View file

@ -0,0 +1,9 @@
import builtins
from typing import Any, Callable, Iterable
# noqa: W0622 # pylint: disable=W0622
def map[T: Any, U](
func: Callable[[T], U], /
) -> Callable[[Iterable[T]], Iterable[U]]:
return lambda iterable: builtins.map(func, iterable)

View file

@ -0,0 +1,20 @@
from .async_bindable import AsyncBindable
from .bindable import Bindable
def merge[T](
*sources: Bindable[T],
) -> Bindable[T]:
merged = Bindable[T]()
for source in sources:
source.subscribe(lambda v: merged.set(v.new_value), run_now=True)
return merged
async def async_merge[T](
*sources: AsyncBindable[T],
) -> AsyncBindable[T]:
merged = AsyncBindable[T]()
for source in sources:
await source.subscribe(lambda v: merged.set(v.new_value), run_now=True)
return merged

View file

@ -0,0 +1,13 @@
from .canvas import Canvas
from .char import CanvasChar
from .renderer import RenderContext, Renderer, RenderTarget
from .widget import Widget
__all__ = [
"Canvas",
"CanvasChar",
"Renderer",
"RenderContext",
"RenderTarget",
"Widget",
]

View file

@ -0,0 +1,173 @@
from __future__ import annotations
from typing import Final, Iterable
from .char import CanvasChar
class Canvas:
"""
A canvas is a 2D array of characters.
"""
__slots__ = "__buffer", "__default", "width", "height"
def __init__(
self, width: int, height: int, default_value: CanvasChar = CanvasChar()
) -> None:
width = max(width, 0)
height = max(height, 0)
self.width: Final[int] = width
self.height: Final[int] = height
self.__default: Final[CanvasChar] = default_value
self.__buffer: list[CanvasChar] = [default_value] * self.total
@property
def total(self) -> int:
return self.width * self.height
def get(self, x: int, y: int, /) -> CanvasChar:
"""Get the character at the given position."""
return self.__buffer[self._get_index(x, y)]
def get_row(self, y: int, /) -> Iterable[CanvasChar]:
"""Get the row at the given position."""
start_index = self._get_index(0, y)
end_index = start_index + self.width
return self.__buffer[start_index:end_index]
def get_column(self, x: int, /) -> Iterable[CanvasChar]:
"""Get the column at the given position."""
return (
self.__buffer[self._get_index(x, y)] for y in range(self.height)
)
def set(self, x: int, y: int, value: CanvasChar, /) -> None:
"""Set the character at the given position."""
self.__buffer[self._get_index(x, y)] = value
def set_row(self, y: int, value: CanvasChar, /) -> None:
"""Set the row at the given position."""
start_index = self._get_index(0, y)
end_index = start_index + self.width
self.__buffer[start_index:end_index] = [value] * self.width
def set_column(self, x: int, value: CanvasChar, /) -> None:
"""Set the column at the given position."""
for y in range(self.height):
self.set(x, y, value)
def set_area(
self,
x: int,
y: int,
width: int,
height: int,
value: CanvasChar,
) -> None:
"""Set the area at the given position."""
for i in range(
self._get_index(x, y), self._get_index(x + width, y + height)
):
self.__buffer[i] = value
def clear(self) -> None:
"""Clear the canvas."""
self.fill(self.__default)
def fill(self, value: CanvasChar, /) -> None:
"""Fill the canvas with the given value."""
self.__buffer = [value] * self.total
def draw_line_h(
self,
x: int,
y: int,
length: int,
char: CanvasChar,
) -> None:
"""Draw a horizontal line."""
for i in range(length):
self.set(x + i, y, char)
def draw_line_v(
self,
x: int,
y: int,
length: int,
char: CanvasChar,
) -> None:
"""Draw a vertical line."""
for i in range(length):
self.set(x, y + i, char)
def draw_rect(
self,
x: int,
y: int,
width: int,
height: int,
char: CanvasChar,
) -> None:
"""Draw a rectangle."""
# Bottom and top
self.draw_line_h(x, y, width, char)
self.draw_line_h(x, y + height - 1, width, char)
# Left and right
self.draw_line_v(x, y, height, char)
self.draw_line_v(x + width - 1, y, height, char)
def copy_from(self, other: Canvas, x: int = 0, y: int = 0) -> None:
"""Copy the given canvas to the current canvas."""
for dy in range(min(other.height, self.height - y)):
for dx in range(min(other.width, self.width - x)):
self.set(x + dx, y + dy, other.get(dx, dy))
def draw_text(self, x: int, y: int, text: str, char: CanvasChar) -> None:
"""Draw text on the canvas."""
for i, c in enumerate(text):
if x + i < self.width:
self.set(
x + i,
y,
CanvasChar(
char=c,
fg_color=char.fg_color,
bg_color=char.bg_color,
bold=char.bold,
italic=char.italic,
underline=char.underline,
),
)
def draw_filled_rect(
self,
x: int,
y: int,
width: int,
height: int,
char: CanvasChar,
) -> None:
"""Draw a filled rectangle."""
for dy in range(height):
for dx in range(width):
self.set(x + dx, y + dy, char)
def is_valid_position(self, x: int, y: int) -> bool:
"""Check if the given position is valid."""
return 0 <= x < self.width and 0 <= y < self.height
def get_subcanvas(self, x: int, y: int, width: int, height: int) -> Canvas:
"""Get a subcanvas from the current canvas."""
subcanvas = Canvas(width, height, self.__default)
for dy in range(height):
for dx in range(width):
if self.is_valid_position(x + dx, y + dy):
subcanvas.set(dx, dy, self.get(x + dx, y + dy))
return subcanvas
def _get_index(self, x: int, y: int) -> int:
return self.width * y + x

View file

@ -0,0 +1,30 @@
from dataclasses import dataclass
@dataclass(frozen=True, slots=True)
class CanvasChar:
char: str = " "
fg_color: str | None = None
bg_color: str | None = None
bold: bool = False
italic: bool = False
underline: bool = False
def __str__(self) -> str:
return self.char
def __repr__(self) -> str:
attrs = []
if self.fg_color:
attrs.append(f"fg={self.fg_color}")
if self.bg_color:
attrs.append(f"bg={self.bg_color}")
if self.bold:
attrs.append("bold")
if self.italic:
attrs.append("italic")
if self.underline:
attrs.append("underline")
attr_str = f"[{', '.join(attrs)}]" if attrs else ""
return f"CanvasChar('{self.char}'{attr_str})"

View file

@ -0,0 +1,4 @@
from .ansi import ANSIRenderer
from .plain_text import PlainTextRenderer
__all__ = ["ANSIRenderer", "PlainTextRenderer"]

View file

@ -0,0 +1,75 @@
from snakia.core.tui import Canvas, CanvasChar, Renderer, RenderTarget
class ANSIRenderer(Renderer):
def __init__(self, target: RenderTarget) -> None:
super().__init__(target)
self._current_char = CanvasChar()
def render(self, canvas: Canvas) -> None:
for y in range(canvas.height):
for x in range(canvas.width):
char = canvas.get(x, y)
self._render_char(char)
self.target.write("\n")
def _render_char(self, char: CanvasChar) -> None:
if char != self._current_char:
self._reset_attributes()
self._apply_attributes(char)
self._current_char = char
self.target.write(char.char)
def _reset_attributes(self) -> None:
self.target.write("\033[0m")
def _apply_attributes(self, char: CanvasChar) -> None:
codes = []
if char.bold:
codes.append("1")
if char.italic:
codes.append("3")
if char.underline:
codes.append("4")
if char.fg_color:
codes.append(f"38;5;{self._color_to_ansi(char.fg_color)}")
if char.bg_color:
codes.append(f"48;5;{self._color_to_ansi(char.bg_color)}")
if codes:
self.target.write(f"\033[{';'.join(codes)}m")
def _color_to_ansi(self, color: str) -> int:
color_map = {
"black": 0,
"red": 1,
"green": 2,
"yellow": 3,
"blue": 4,
"magenta": 5,
"cyan": 6,
"white": 7,
"bright_black": 8,
"bright_red": 9,
"bright_green": 10,
"bright_yellow": 11,
"bright_blue": 12,
"bright_magenta": 13,
"bright_cyan": 14,
"bright_white": 15,
}
return color_map.get(color.lower(), 7)
def clear_screen(self) -> None:
self.target.write("\033[2J")
def hide_cursor(self) -> None:
self.target.write("\033[?25l")
def show_cursor(self) -> None:
self.target.write("\033[?25h")
def set_cursor_position(self, x: int, y: int) -> None:
self.target.write(f"\033[{y + 1};{x + 1}H")

View file

@ -0,0 +1,22 @@
from snakia.core.tui import Canvas, Renderer
class PlainTextRenderer(Renderer):
def render(self, canvas: Canvas) -> None:
for y in range(canvas.height):
for x in range(canvas.width):
char = canvas.get(x, y)
self.target.write(char.char)
self.target.write("\n")
def clear_screen(self) -> None:
pass
def hide_cursor(self) -> None:
pass
def show_cursor(self) -> None:
pass
def set_cursor_position(self, x: int, y: int) -> None:
pass

View file

@ -0,0 +1,59 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Any, Protocol
from .canvas import Canvas
class RenderTarget(Protocol):
def write(self, text: str) -> None: ...
def flush(self) -> None: ...
class Renderer(ABC):
def __init__(self, target: RenderTarget) -> None:
self.target = target
@abstractmethod
def render(self, canvas: Canvas) -> None:
pass
@abstractmethod
def clear_screen(self) -> None:
pass
@abstractmethod
def hide_cursor(self) -> None:
pass
@abstractmethod
def show_cursor(self) -> None:
pass
@abstractmethod
def set_cursor_position(self, x: int, y: int) -> None:
pass
class RenderContext:
def __init__(self, renderer: Renderer) -> None:
self.renderer = renderer
self._cursor_visible = True
def __enter__(self) -> RenderContext:
self.renderer.hide_cursor()
self.renderer.clear_screen()
self._cursor_visible = False
return self
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
if not self._cursor_visible:
self.renderer.show_cursor()
self.renderer.target.flush()
def render(self, canvas: Canvas) -> None:
self.renderer.set_cursor_position(0, 0)
self.renderer.render(canvas)
self.renderer.target.flush()

View file

@ -0,0 +1,36 @@
from abc import ABC, abstractmethod
from typing import Final, final
from snakia.core.rx import AsyncBindable, Bindable
from snakia.utils import to_async
from .canvas import Canvas
class Widget(ABC):
def __init__(self) -> None:
self.dirty: Final = Bindable(True)
self.__cache: Canvas = Canvas(0, 0)
@abstractmethod
def on_render(self) -> Canvas: ...
@final
def render(self) -> Canvas:
if self.dirty.value:
result = self.on_render()
self.__cache = result
self.dirty.set(False)
return self.__cache
@final
def state[T](self, default_value: T) -> Bindable[T]:
field = Bindable(default_value)
field.subscribe(lambda _: self.dirty.set(True))
return field
@final
def async_state[T](self, default_value: T) -> AsyncBindable[T]:
field = AsyncBindable(default_value)
field.subscribe(to_async(lambda _: self.dirty.set(True)))
return field

View file

@ -0,0 +1,13 @@
from .box import BoxWidget
from .container import ContainerWidget
from .horizontal_split import HorizontalSplitWidget
from .text import TextWidget
from .vertical_split import VerticalSplitWidget
__all__ = [
"ContainerWidget",
"TextWidget",
"BoxWidget",
"HorizontalSplitWidget",
"VerticalSplitWidget",
]

View file

@ -0,0 +1,21 @@
from snakia.core.tui import Widget
from snakia.core.tui.canvas import Canvas
from snakia.core.tui.char import CanvasChar
class BoxWidget(Widget):
def __init__(
self, width: int, height: int, char: CanvasChar = CanvasChar("")
) -> None:
super().__init__()
self.width = self.state(width)
self.height = self.state(height)
self.char = self.state(char)
def on_render(self) -> Canvas:
width = self.width.value
height = self.height.value
char = self.char.value
canvas = Canvas(width, height, CanvasChar())
canvas.draw_filled_rect(0, 0, width, height, char)
return canvas

View file

@ -0,0 +1,9 @@
from typing import Final, Iterable
from snakia.core.tui import Widget
class ContainerWidget(Widget):
def __init__(self, children: Iterable[Widget]) -> None:
super().__init__()
self.children: Final = self.state([*children])

View file

@ -0,0 +1,43 @@
from typing import Iterable
from snakia.core.tui import Widget
from snakia.core.tui.canvas import Canvas
from snakia.core.tui.char import CanvasChar
from .container import ContainerWidget
class HorizontalSplitWidget(ContainerWidget):
def __init__(
self, children: Iterable[Widget], splitter_char: str = "|"
) -> None:
super().__init__(children)
self.splitter_char = splitter_char
def on_render(self) -> Canvas:
children_list = self.children.value
if not children_list:
return Canvas(0, 0, CanvasChar())
child_canvases = [child.render() for child in children_list]
total_width = (
sum(canvas.width for canvas in child_canvases)
+ len(child_canvases)
- 1
)
max_height = max(canvas.height for canvas in child_canvases)
result = Canvas(total_width, max_height, CanvasChar())
x_offset = 0
for i, canvas in enumerate(child_canvases):
result.copy_from(canvas, x_offset, 0)
x_offset += canvas.width
if i < len(child_canvases) - 1:
splitter_char = CanvasChar(self.splitter_char)
for y in range(max_height):
result.set(x_offset, y, splitter_char)
x_offset += 1
return result

View file

@ -0,0 +1,17 @@
from snakia.core.tui import Widget
from snakia.core.tui.canvas import Canvas
from snakia.core.tui.char import CanvasChar
class TextWidget(Widget):
def __init__(self, text: str, char: CanvasChar = CanvasChar()) -> None:
super().__init__()
self.text = self.state(text)
self.char = self.state(char)
def on_render(self) -> Canvas:
text = self.text.value
char = self.char.value
canvas = Canvas(len(text), 1, CanvasChar())
canvas.draw_text(0, 0, text, char)
return canvas

View file

@ -0,0 +1,43 @@
from typing import Iterable
from snakia.core.tui import Widget
from snakia.core.tui.canvas import Canvas
from snakia.core.tui.char import CanvasChar
from .container import ContainerWidget
class VerticalSplitWidget(ContainerWidget):
def __init__(
self, children: Iterable[Widget], splitter_char: str = "-"
) -> None:
super().__init__(children)
self.splitter_char = splitter_char
def on_render(self) -> Canvas:
children_list = self.children.value
if not children_list:
return Canvas(0, 0, CanvasChar())
child_canvases = [child.render() for child in children_list]
max_width = max(canvas.width for canvas in child_canvases)
total_height = (
sum(canvas.height for canvas in child_canvases)
+ len(child_canvases)
- 1
)
result = Canvas(max_width, total_height, CanvasChar())
y_offset = 0
for i, canvas in enumerate(child_canvases):
result.copy_from(canvas, 0, y_offset)
y_offset += canvas.height
if i < len(child_canvases) - 1:
splitter_char = CanvasChar(self.splitter_char)
for x in range(max_width):
result.set(x, y_offset, splitter_char)
y_offset += 1
return result

View file

@ -0,0 +1,18 @@
from .inject_after import after_hook, inject_after
from .inject_before import before_hook, inject_before
from .inject_const import inject_const
from .inject_replace import inject_replace, replace_hook
from .pass_exceptions import pass_exceptions
from .singleton import singleton
__all__ = [
"inject_replace",
"replace_hook",
"inject_after",
"after_hook",
"inject_before",
"before_hook",
"inject_const",
"pass_exceptions",
"singleton",
]

View file

@ -0,0 +1,22 @@
from typing import Callable
from .inject_replace import inject_replace
def inject_after[T: object, **P, R](
obj: T, target: Callable[P, R], hook: Callable[[R], R]
) -> T:
def inner(*args: P.args, **kwargs: P.kwargs) -> R:
return hook(target(*args, **kwargs))
return inject_replace(obj, target, inner)
def after_hook[**P, R](
obj: object, target: Callable[P, R]
) -> Callable[[Callable[[R], R]], Callable[[R], R]]:
def hook(new: Callable[[R], R]) -> Callable[[R], R]:
inject_after(obj, target, new)
return new
return hook

View file

@ -0,0 +1,24 @@
from typing import Any, Callable
from .inject_replace import inject_replace
def inject_before[T: object, **P, R](
obj: T, target: Callable[P, R], hook: Callable[P, Any]
) -> T:
def inner(*args: P.args, **kwargs: P.kwargs) -> R:
hook(*args, **kwargs)
return target(*args, **kwargs)
return inject_replace(obj, target, inner)
def before_hook[**P, R](
obj: object, target: Callable[P, R]
) -> Callable[[Callable[P, Any]], Callable[P, Any]]:
def hook(new: Callable[P, Any]) -> Callable[P, Any]:
inject_before(obj, target, new)
return new
return hook

View file

@ -0,0 +1,47 @@
import sys
from types import FunctionType
from typing import Any, Callable, cast
if sys.version_info >= (3, 13):
def inject_const[T: Callable[..., Any]](**consts: Any) -> Callable[[T], T]:
def inner(func: T) -> T:
values = [*func.__code__.co_consts]
for i, name in enumerate(func.__code__.co_varnames):
if name in consts:
values[i + 1] = consts[name]
return cast(
T,
FunctionType(
code=func.__code__.replace(co_consts=(*values,)),
globals=func.__globals__,
name=func.__name__,
argdefs=func.__defaults__,
closure=func.__closure__,
kwdefaults=func.__kwdefaults__,
),
)
return inner
else:
def inject_const[T: Callable[..., Any]](**consts: Any) -> Callable[[T], T]:
def inner(func: T) -> T:
values = [*func.__code__.co_consts]
for i, name in enumerate(func.__code__.co_varnames):
if name in consts:
values[i + 1] = consts[name]
return cast(
T,
FunctionType(
code=func.__code__.replace(co_consts=(*values,)),
globals=func.__globals__,
name=func.__name__,
argdefs=func.__defaults__,
closure=func.__closure__,
# kwdefaults=func.__kwdefaults__,
),
)
return inner

View file

@ -0,0 +1,20 @@
from typing import Callable
def inject_replace[T: object, **P, R](
obj: T, old: Callable[P, R], new: Callable[P, R]
) -> T:
for k, v in obj.__dict__.items():
if v is old:
setattr(obj, k, new)
return obj
def replace_hook[**P, R](
obj: object, old: Callable[P, R]
) -> Callable[[Callable[P, R]], Callable[P, R]]:
def hook(new: Callable[P, R]) -> Callable[P, R]:
inject_replace(obj, old, new)
return new
return hook

View file

@ -0,0 +1,32 @@
from __future__ import annotations
from typing import Any, Callable, overload
@overload
def pass_exceptions[**P](
*errors: type[Exception],
) -> Callable[[Callable[P, Any | None]], Callable[P, Any | None]]: ...
@overload
def pass_exceptions[**P, R](
*errors: type[Exception],
default: R,
) -> Callable[[Callable[P, R]], Callable[P, R]]: ...
def pass_exceptions(
*errors: type[Exception],
default: Any = None,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
def wrapper(*args: Any, **kwargs: Any) -> Any:
try:
return func(*args, **kwargs)
except Exception as e: # noqa: W0718 # pylint: disable=W0718
if type(e) not in errors:
raise e from Exception()
return default
return wrapper
return decorator

View file

@ -0,0 +1,2 @@
def singleton[T](cls: type[T]) -> T:
return cls()

View file

@ -0,0 +1,15 @@
from .auto import AutoField
from .bool import BoolField
from .field import Field
from .float import FloatField
from .int import IntField
from .str import StrField
__all__ = [
"Field",
"AutoField",
"BoolField",
"FloatField",
"IntField",
"StrField",
]

25
src/snakia/field/auto.py Normal file
View file

@ -0,0 +1,25 @@
import pickle
from typing import Final, override
from .field import Field
class AutoField[T](Field[T]):
__slots__ = ("__target_type",)
def __init__(
self, default_value: T, *, target_type: type[T] | None = None
) -> None:
super().__init__(default_value)
self.__target_type: Final = target_type
@override
def serialize(self, value: T, /) -> bytes:
return pickle.dumps(value)
@override
def deserialize(self, serialized: bytes, /) -> T:
value = pickle.loads(serialized)
if not isinstance(value, self.__target_type or object):
return self.default_value
return value # type: ignore

13
src/snakia/field/bool.py Normal file
View file

@ -0,0 +1,13 @@
from typing import override
from .field import Field
class BoolField(Field[bool]):
@override
def serialize(self, value: bool, /) -> bytes:
return b"\x01" if value else b"\x00"
@override
def deserialize(self, serialized: bytes, /) -> bool:
return serialized == b"\x01"

49
src/snakia/field/field.py Normal file
View file

@ -0,0 +1,49 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, Callable, Final, final
from snakia.property.priv_property import PrivProperty
from snakia.utils import inherit
class Field[T: Any](ABC, PrivProperty[T]):
def __init__(self, default_value: T) -> None:
self.default_value: Final[T] = default_value
super().__init__(default_value)
@abstractmethod
def serialize(self, value: T, /) -> bytes:
"""Serialize a value
:param value: value to serialize
:type value: T
:return: serialized value
:rtype: bytes
"""
@abstractmethod
def deserialize(self, serialized: bytes, /) -> T:
"""Deserialize a value
:param serialized: serialized value
:type serialized: bytes
:return: deserialized value
:rtype: T
"""
@final
@classmethod
def custom[R](
cls: type[Field[Any]],
serialize: Callable[[R], str],
deserialize: Callable[[str], R],
) -> type[Field[R]]:
return inherit(
cls, {"serialize": serialize, "deserialize": deserialize}
)
if TYPE_CHECKING:
@classmethod
def type(cls) -> type[T]: ...

14
src/snakia/field/float.py Normal file
View file

@ -0,0 +1,14 @@
import struct
from typing import override
from .field import Field
class FloatField(Field[float]):
@override
def serialize(self, value: float, /) -> bytes:
return struct.pack(">f", value)
@override
def deserialize(self, serialized: bytes, /) -> float:
return struct.unpack(">f", serialized)[0] # type: ignore

14
src/snakia/field/int.py Normal file
View file

@ -0,0 +1,14 @@
from typing import override
from .field import Field
class IntField(Field[int]):
@override
def serialize(self, value: int, /) -> bytes:
length = (value.bit_length() + 7) // 8
return value.to_bytes(length, "little")
@override
def deserialize(self, serialized: bytes, /) -> int:
return int.from_bytes(serialized, "little")

17
src/snakia/field/str.py Normal file
View file

@ -0,0 +1,17 @@
from typing import Final, override
from .field import Field
class StrField(Field[str]):
def __init__(self, default_value: str, *, encoding: str = "utf-8") -> None:
super().__init__(default_value)
self.encoding: Final = encoding
@override
def serialize(self, value: str, /) -> bytes:
return value.encode(self.encoding)
@override
def deserialize(self, serialized: bytes, /) -> str:
return serialized.decode(self.encoding)

16
src/snakia/field/t.py Normal file
View file

@ -0,0 +1,16 @@
# noqa: W0622 # pylint: disable=W0622
from .auto import AutoField as auto
from .bool import BoolField as bool
from .field import Field as field
from .float import FloatField as float
from .int import IntField as int
from .str import StrField as str
__all__ = [
"auto",
"bool",
"field",
"float",
"int",
"str",
]

View file

@ -0,0 +1,20 @@
from .android import AndroidLayer
from .freebsd import FreebsdLayer
from .ios import IosLayer
from .layer import PlatformLayer
from .linux import LinuxLayer
from .macos import MacosLayer
from .os import OS, PlatformOS
from .windows import WindowsLayer
__all__ = (
"PlatformOS",
"OS",
"PlatformLayer",
"AndroidLayer",
"FreebsdLayer",
"IosLayer",
"LinuxLayer",
"MacosLayer",
"WindowsLayer",
)

View file

@ -0,0 +1,53 @@
from __future__ import annotations
from ctypes import CDLL, Array, c_char, c_char_p, create_string_buffer
from typing import Any, Final, Literal, cast, overload
from .layer import PlatformLayer
from .os import PlatformOS
class AndroidLayer(PlatformLayer[Literal[PlatformOS.ANDROID]]):
target = PlatformOS.ANDROID
PROP_VALUE_MAX: Final = 92
@overload
def get_prop(self, name: str) -> str | None: ...
@overload
def get_prop[T](self, name: str, default: T) -> str | T: ...
def get_prop(self, name: str, default: Any = None) -> Any:
buffer = create_string_buffer(self.PROP_VALUE_MAX)
length = self.system_property_get(name.encode("UTF-8"), buffer)
if length == 0:
return default
return buffer.value.decode("UTF-8", "backslashreplace")
def system_property_get(self, name: bytes, default: Array[c_char]) -> int:
func = getattr(CDLL("libc.so"), "__system_property_get")
func.argtypes = (c_char_p, c_char_p)
result = cast(int, func(name, default))
return result
def release(self, default: str = "") -> str:
return self.get_prop("ro.build.version.release", default)
def api_level(self, default: int) -> int:
return int(self.get_prop("ro.build.version.sdk", default))
def manufacturer(self, default: str = "") -> str:
return self.get_prop("ro.product.manufacturer", default)
def model(self, default: str = "") -> str:
return self.get_prop("ro.product.model", default)
def device(self, default: str = "") -> str:
return self.get_prop("ro.product.device", default)
def is_emulator(self, default: bool) -> bool:
prop = self.get_prop("ro.kernel.qemu", None)
if prop is None:
return default
return prop == "1"

View file

@ -0,0 +1,11 @@
from __future__ import annotations
from typing import Literal
from .layer import PlatformLayer
from .os import PlatformOS
# TODO: create a freebds layer
class FreebsdLayer(PlatformLayer[Literal[PlatformOS.FREEBSD]]):
target = PlatformOS.FREEBSD

View file

@ -0,0 +1,11 @@
from __future__ import annotations
from typing import Literal
from .layer import PlatformLayer
from .os import PlatformOS
# TODO: create a ios layer
class IosLayer(PlatformLayer[Literal[PlatformOS.IOS]]):
target = PlatformOS.IOS

View file

@ -0,0 +1,28 @@
from __future__ import annotations
from typing import ClassVar, Self, final, overload
from .os import PlatformOS
class PlatformLayer[T: PlatformOS]:
target: ClassVar[PlatformOS] = PlatformOS.UNKNOWN
@final
def __init__(self, platform: PlatformOS) -> None:
if platform != self.target:
raise NotImplementedError(
f"{self.__class__.__name__} is not implemented for {platform._name_}"
)
@overload
@classmethod
def try_get(cls, platform: T, /) -> Self: ...
@overload
@classmethod
def try_get(cls, platform: PlatformOS, /) -> Self | None: ...
@classmethod
def try_get(cls, platform: PlatformOS, /) -> Self | None:
if platform == cls.target:
return cls(platform)
return None

View file

@ -0,0 +1,57 @@
from __future__ import annotations
import re
from typing import Literal
from .layer import PlatformLayer
from .os import PlatformOS
class LinuxLayer(PlatformLayer[Literal[PlatformOS.LINUX]]):
target = PlatformOS.LINUX
def os_release_raw(self) -> str:
"""Read /etc/os-release or /usr/lib/os-release"""
try:
return open("/etc/os-release", encoding="utf-8").read()
except FileNotFoundError:
return open("/usr/lib/os-release", encoding="utf-8").read()
def os_release(self) -> dict[str, str]:
"""Parse `os_release_raw` and return a dict"""
raw = self.os_release_raw()
info = {
"ID": "linux",
}
os_release_line = re.compile(
"^(?P<name>[a-zA-Z0-9_]+)=(?P<quote>[\"']?)(?P<value>.*)(?P=quote)$"
)
os_release_unescape = re.compile(r"\\([\\\$\"\'`])")
for line in raw.split("\n"):
mo = os_release_line.match(line)
if mo is not None:
info[mo.group("name")] = os_release_unescape.sub(
r"\1", mo.group("value")
)
return info
def distro_name(self) -> str:
"""Return the distro name."""
return self.os_release().get("name", "linux")
def distro_pretty_name(self) -> str:
"""Return the distro pretty name."""
return self.os_release().get("PRETTY_NAME", "Linux")
def distro_id(self) -> str:
"""Return the distro id."""
return self.os_release().get("ID", "linux")
def version(self) -> str:
"""Return the distro version."""
return self.os_release().get("VERSION_ID", "0")
def codename(self) -> str:
"""Return the distro codename."""
return self.os_release().get("VERSION_CODENAME", "unknown")

View file

@ -0,0 +1,11 @@
from __future__ import annotations
from typing import Literal
from .layer import PlatformLayer
from .os import PlatformOS
# TODO: create a macos layer
class MacosLayer(PlatformLayer[Literal[PlatformOS.MACOS]]):
target = PlatformOS.MACOS

52
src/snakia/platform/os.py Normal file
View file

@ -0,0 +1,52 @@
from __future__ import annotations
import sys
from enum import IntEnum
from typing import Final
class PlatformOS(IntEnum):
UNKNOWN = 0
ANDROID = 1
FREEBSD = 2
IOS = 3
LINUX = 4
MACOS = 5
WINDOWS = 6
@property
def is_apple(self) -> bool:
"""MacOS, iOS"""
return self in [PlatformOS.MACOS, PlatformOS.IOS]
@property
def is_linux(self) -> bool:
"""Linux, Android"""
return self in [PlatformOS.LINUX, PlatformOS.ANDROID]
@classmethod
def resolve(cls) -> PlatformOS:
"""Get the current platform."""
platform = sys.platform
result = PlatformOS.UNKNOWN
if platform in ["win32", "win16", "dos", "cygwin", "msys"]:
result = PlatformOS.WINDOWS
if platform.startswith("linux"):
result = PlatformOS.LINUX
if platform.startswith("freebsd"):
result = PlatformOS.FREEBSD
if platform == "darwin":
result = PlatformOS.MACOS
if platform == "ios":
result = PlatformOS.IOS
if platform == "android":
result = PlatformOS.ANDROID
if platform.startswith("java"):
result = PlatformOS.UNKNOWN
return result
OS: Final[PlatformOS] = PlatformOS.resolve()
"""The current platform."""

View file

@ -0,0 +1,11 @@
from __future__ import annotations
from typing import Literal
from .layer import PlatformLayer
from .os import PlatformOS
# TODO: create a windows layer
class WindowsLayer(PlatformLayer[Literal[PlatformOS.WINDOWS]]):
target = PlatformOS.WINDOWS

View file

@ -0,0 +1,19 @@
from .cell_property import CellProperty
from .classproperty import ClassProperty
from .hook_property import HookProperty
from .initonly import Initonly, initonly
from .priv_property import PrivProperty
from .property import Property
from .readonly import Readonly, readonly
__all__ = [
"CellProperty",
"ClassProperty",
"HookProperty",
"Initonly",
"initonly",
"PrivProperty",
"Property",
"Readonly",
"readonly",
]

View file

@ -0,0 +1,67 @@
from typing import Any, Callable, Self
from snakia.types import empty
type _Cell[T] = T | None
type _Getter[T] = Callable[[Any, _Cell[T]], T]
type _Setter[T] = Callable[[Any, _Cell[T], T], _Cell[T]]
type _Deleter[T] = Callable[[Any, _Cell[T]], _Cell[T]]
class CellProperty[T]:
"""
A property that uses a cell to store its value.
"""
__slots__ = ("__name", "__fget", "__fset", "__fdel")
def __init__(
self,
fget: _Getter[T],
fset: _Setter[T] = empty.func,
fdel: _Deleter[T] = empty.func,
) -> None:
self.__fget: _Getter[T] = fget
self.__fset: _Setter[T] = fset
self.__fdel: _Deleter[T] = fdel
self.__name = ""
def __set_name__(self, owner: type, name: str) -> None:
self.__name = f"_{owner.__name__}__{name}"
def __get__(self, instance: Any, owner: type | None = None, /) -> T:
cell = self.__fget(instance, self.__get_cell(instance))
self.__set_cell(instance, cell)
return cell
def __set__(self, instance: Any, value: T, /) -> None:
cell = self.__fset(instance, self.__get_cell(instance), value)
self.__set_cell(instance, cell)
def __delete__(self, instance: Any, /) -> None:
cell = self.__fdel(instance, self.__get_cell(instance))
self.__set_cell(instance, cell)
def getter(self, fget: _Getter[T], /) -> Self:
"""Descriptor getter."""
self.__fget = fget
return self
def setter(self, fset: _Setter[T], /) -> Self:
"""Descriptor setter."""
self.__fset = fset
return self
def deleter(self, fdel: _Deleter[T], /) -> Self:
"""Descriptor deleter."""
self.__fdel = fdel
return self
def __get_cell(self, instance: Any) -> T | None:
return getattr(instance, self.__name, None)
def __set_cell(self, instance: Any, value: T | None) -> None:
if value is None:
delattr(instance, self.__name)
else:
setattr(instance, self.__name, value)

View file

@ -0,0 +1,64 @@
from typing import Any, Callable, Self
from snakia.types import empty
class ClassProperty[T]:
"""
Class property
"""
__slots__ = ("__fget", "__fset", "__fdel")
def __init__(
self,
fget: Callable[[Any], T],
fset: Callable[[Any, T], None] = empty.func,
fdel: Callable[[Any], None] = empty.func,
) -> None:
self.__fget = fget
self.__fset = fset
self.__fdel = fdel
def __get__(self, _: Any, owner: type | None = None, /) -> T:
return self.__fget(owner)
def __set__(self, instance: Any | None, value: T, /) -> None:
owner = type(instance) if instance else instance
return self.__fset(owner, value)
def __delete__(self, instance: Any | None, /) -> None:
owner = type(instance) if instance else instance
return self.__fdel(owner)
def getter(self, fget: Callable[[Any], T], /) -> Self:
"""Descriptor getter."""
self.__fget = fget
return self
def setter(self, fset: Callable[[Any, T], None], /) -> Self:
"""Descriptor setter."""
self.__fset = fset
return self
def deleter(self, fdel: Callable[[Any], None], /) -> Self:
"""Descriptor deleter."""
self.__fdel = fdel
return self
def classproperty[T](
fget: Callable[[Any], T] = empty.func,
fset: Callable[[Any, T], None] = empty.func,
fdel: Callable[[Any], None] = empty.func,
) -> ClassProperty[T]:
"""Create a class property.
Args:
fget (Callable[[Any], T], optional): The getter function. Defaults to empty.func.
fset (Callable[[Any, T], None], optional): The setter function. Defaults to empty.func.
fdel (Callable[[Any], None], optional): The deleter function. Defaults to empty.func.
Returns:
ClassProperty[T]: The class property.
"""
return ClassProperty(fget, fset, fdel)

View file

@ -0,0 +1,53 @@
from typing import Any, Callable, Self
from snakia.types import empty
from .priv_property import PrivProperty
class HookProperty[T](PrivProperty[T]):
"""
A property that calls a function when the property is set, get, or deleted.
"""
__slots__ = ("__on_set", "__on_get", "__on_del")
def __init__(
self,
on_get: Callable[[T], None],
on_set: Callable[[T], None] = empty.func,
on_del: Callable[[T], None] = empty.func,
) -> None:
super().__init__()
self.__on_set: Callable[[T], None] = on_set
self.__on_get: Callable[[T], None] = on_get
self.__on_del: Callable[[T], None] = on_del
def __get__(self, instance: Any, owner: type | None = None, /) -> T:
value = super().__get__(instance, owner)
self.__on_get(value)
return value
def __set__(self, instance: Any, value: T, /) -> None:
self.__on_set(value)
return super().__set__(instance, value)
def __delete__(self, instance: Any, /) -> None:
value = super().__get__(instance)
self.__on_del(value)
return super().__delete__(instance)
def getter(self, on_get: Callable[[T], None], /) -> Self:
"""Descriptor getter."""
self.__on_get = on_get
return self
def setter(self, on_set: Callable[[T], None], /) -> Self:
"""Descriptor setter."""
self.__on_set = on_set
return self
def deleter(self, on_del: Callable[[T], None], /) -> Self:
"""Descriptor deleter."""
self.__on_del = on_del
return self

View file

@ -0,0 +1,17 @@
from typing import Any
from .priv_property import PrivProperty
class Initonly[T](PrivProperty[T]):
"""Property that can only be set once."""
def __set__(self, instance: Any, value: T, /) -> None:
if hasattr(instance, self.name):
return
super().__set__(instance, value)
def initonly() -> Initonly[Any]:
"""Factory for `Initonly`."""
return Initonly()

View file

@ -0,0 +1,29 @@
from typing import Any
class PrivProperty[T]:
__slots__ = "__name", "__default_value"
__name: str
def __init__(self, default_value: T | None = None) -> None:
self.__default_value: T | None = default_value
def __set_name__(self, owner: type, name: str) -> None:
self.__name = f"_{owner.__name__}__{name}"
def __get__(self, instance: Any, owner: type | None = None, /) -> T:
if self.__default_value:
return getattr(instance, self.__name, self.__default_value)
return getattr(instance, self.__name)
def __set__(self, instance: Any, value: T, /) -> None:
setattr(instance, self.__name, value)
def __delete__(self, instance: Any, /) -> None:
delattr(instance, self.__name)
@property
def name(self) -> str:
"""Return the name of the variable associated with the property."""
return self.__name

View file

@ -0,0 +1,42 @@
from typing import Any, Callable, Self
from snakia.types import empty
class Property[T]:
"""
A property that can be set, get, and deleted."""
def __init__(
self,
fget: Callable[[Any], T] = empty.func,
fset: Callable[[Any, T], None] = empty.func,
fdel: Callable[[Any], None] = empty.func,
) -> None:
self.__fget = fget
self.__fset = fset
self.__fdel = fdel
def __get__(self, instance: Any, owner: type | None = None, /) -> T:
return self.__fget(instance)
def __set__(self, instance: Any, value: T, /) -> None:
return self.__fset(instance, value)
def __delete__(self, instance: Any, /) -> None:
return self.__fdel(instance)
def getter(self, fget: Callable[[Any], T], /) -> Self:
"""Descriptor getter."""
self.__fget = fget
return self
def setter(self, fset: Callable[[Any, T], None], /) -> Self:
"""Descriptor setter."""
self.__fset = fset
return self
def deleter(self, fdel: Callable[[Any], None], /) -> Self:
"""Descriptor deleter."""
self.__fdel = fdel
return self

View file

@ -0,0 +1,33 @@
from typing import Any, Callable
class Readonly[T]:
"""
Readonly property.
"""
__slots__ = ("__fget",)
def __init__(
self,
fget: Callable[[Any], T],
) -> None:
self.__fget = fget
def __get__(self, instance: Any, owner: type | None = None, /) -> T:
return self.__fget(instance)
def __set__(self, instance: Any, value: T, /) -> None:
pass
def readonly[T](value: T) -> Readonly[T]:
"""Create a readonly property with the given value.
Args:
value (T): The value to set the readonly property to.
Returns:
Readonly[T]: The readonly property.
"""
return Readonly(lambda _: value)

View file

@ -0,0 +1,5 @@
from .os import OSRandom
from .python import PythonRandom
from .random import Random
__all__ = ["OSRandom", "PythonRandom", "Random"]

18
src/snakia/random/os.py Normal file
View file

@ -0,0 +1,18 @@
import os
from .random import Random
class OSRandom(Random[None]):
"""
A random number generator that uses the OS (cryptographically secure) to generate random bytes.
"""
def bits(self, k: int) -> int:
return int.from_bytes(os.urandom((k + 7) // 8)) & ((1 << k) - 1)
def get_state(self) -> None:
return None
def set_state(self, value: None) -> None:
pass

View file

@ -0,0 +1,16 @@
import random
from .random import Random
type _State = tuple[int, tuple[int, ...], int | float | None]
class PythonRandom(Random[_State]):
def bits(self, k: int) -> int:
return random.getrandbits(k)
def get_state(self) -> _State:
return random.getstate()
def set_state(self, value: _State) -> None:
random.setstate(value)

View file

@ -0,0 +1,58 @@
import builtins
from abc import ABC, abstractmethod
from typing import Any, MutableSequence, Sequence, final
class Random[S](ABC):
"""
A random number generator.
"""
@abstractmethod
def bits(self, k: builtins.int) -> builtins.int:
"""Return k random bits."""
@abstractmethod
def set_state(self, value: S) -> None:
"""Set the state of the random number generator."""
@abstractmethod
def get_state(self) -> S:
"""Get the state of the random number generator."""
@final
def bytes(self, n: builtins.int) -> bytes:
"""Return n random bytes."""
return self.bits(n * 8).to_bytes(n, "little")
@final
def below(self, n: builtins.int) -> builtins.int:
"""Return a random int in the range [0,n). Defined for n > 0."""
k = n.bit_length()
while True:
x = self.bits(k)
if x < n:
return x
@final
def int(self, start: builtins.int, end: builtins.int) -> builtins.int:
"""Return a random int in the range [start, end]."""
return self.below(end + 1 - start) + start
@final
def float(self) -> float:
"""Return a random float in the range [0.0, 1.0)."""
return self.bits(32) / (1 << 32)
@final
def choice[T](self, seq: Sequence[T]) -> T:
"""Return a random element from a non-empty sequence."""
return seq[self.below(len(seq))]
@final
def shuffle[T: MutableSequence[Any]](self, seq: T) -> T:
"""Shuffle a sequence in place."""
for i in range(len(seq) - 1, 0, -1):
j = self.below(i + 1)
seq[i], seq[j] = seq[j], seq[i]
return seq

Some files were not shown because too many files have changed in this diff Show more