feat(track-core): add portable training runtimes
Move scheme and PID training runtime behavior into the pure track_core layer and expose render sinks for injected strip application. Add ESP compatibility adapters, Python bindings/test scaffolding, and in-memory render support so app_track_bt can consume core render and runtime logic without duplicating it. Cover circular/linear rendering boundaries, all scheme runtime types, scheme render_to parity, PID sample de-duplication, speed suppression, and live tuning in track-core tests.
This commit is contained in:
@@ -0,0 +1,71 @@
|
||||
from ._core import (
|
||||
AccelerationProfile,
|
||||
Color,
|
||||
DecodedScheme,
|
||||
MemoryStrip,
|
||||
MTSegment,
|
||||
RepeatedSMSegment,
|
||||
SMSegment,
|
||||
SchemeKind,
|
||||
SchemeTrackRuntime,
|
||||
SchemeTrackState,
|
||||
SchemeTrainingRuntime,
|
||||
STSegment,
|
||||
TrackConfig,
|
||||
TrackDrawKind,
|
||||
TrackError,
|
||||
TrackInfo,
|
||||
TrackRenderPlan,
|
||||
TrackRenderSpan,
|
||||
TrackReport,
|
||||
TrackSchemeStatus,
|
||||
TrackState,
|
||||
make_mileage_time_scheme,
|
||||
make_render_plan,
|
||||
make_repeated_speed_mileage_time_scheme,
|
||||
make_scheme_track_runtime,
|
||||
make_speed_mileage_scheme,
|
||||
make_speed_time_scheme,
|
||||
render_pixels,
|
||||
scheme_track_info,
|
||||
scheme_track_report,
|
||||
start_scheme_track,
|
||||
stop_scheme_track,
|
||||
tick_scheme_track,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"AccelerationProfile",
|
||||
"Color",
|
||||
"DecodedScheme",
|
||||
"MemoryStrip",
|
||||
"MTSegment",
|
||||
"RepeatedSMSegment",
|
||||
"SMSegment",
|
||||
"SchemeKind",
|
||||
"SchemeTrackRuntime",
|
||||
"SchemeTrackState",
|
||||
"SchemeTrainingRuntime",
|
||||
"STSegment",
|
||||
"TrackConfig",
|
||||
"TrackDrawKind",
|
||||
"TrackError",
|
||||
"TrackInfo",
|
||||
"TrackRenderPlan",
|
||||
"TrackRenderSpan",
|
||||
"TrackReport",
|
||||
"TrackSchemeStatus",
|
||||
"TrackState",
|
||||
"make_mileage_time_scheme",
|
||||
"make_render_plan",
|
||||
"make_repeated_speed_mileage_time_scheme",
|
||||
"make_scheme_track_runtime",
|
||||
"make_speed_mileage_scheme",
|
||||
"make_speed_time_scheme",
|
||||
"render_pixels",
|
||||
"scheme_track_info",
|
||||
"scheme_track_report",
|
||||
"start_scheme_track",
|
||||
"stop_scheme_track",
|
||||
"tick_scheme_track",
|
||||
]
|
||||
@@ -0,0 +1,304 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
import time
|
||||
|
||||
from . import (
|
||||
AccelerationProfile,
|
||||
Color,
|
||||
MTSegment,
|
||||
RepeatedSMSegment,
|
||||
SMSegment,
|
||||
STSegment,
|
||||
SchemeTrainingRuntime,
|
||||
TrackConfig,
|
||||
TrackDrawKind,
|
||||
TrackInfo,
|
||||
TrackReport,
|
||||
TrackState,
|
||||
make_mileage_time_scheme,
|
||||
make_repeated_speed_mileage_time_scheme,
|
||||
make_speed_mileage_scheme,
|
||||
make_speed_time_scheme,
|
||||
render_pixels,
|
||||
)
|
||||
|
||||
|
||||
DRAWLIST_TAG = "track_core_drawlist"
|
||||
|
||||
|
||||
def _load_dearpygui():
|
||||
try:
|
||||
import dearpygui.dearpygui as dpg
|
||||
except ImportError as exc:
|
||||
raise SystemExit(
|
||||
"DearPyGui is not installed. Install the emulator extra with "
|
||||
'`python -m pip install -e ".[emulator]"`.'
|
||||
) from exc
|
||||
return dpg
|
||||
|
||||
|
||||
def _rgba(color: Color) -> tuple[int, int, int, int]:
|
||||
return (color.r, color.g, color.b, 255)
|
||||
|
||||
|
||||
def _sm(speed: float, mileage_m: int) -> SMSegment:
|
||||
segment = SMSegment()
|
||||
segment.speed_m_s = speed
|
||||
segment.mileage_from_start_m = mileage_m
|
||||
return segment
|
||||
|
||||
|
||||
def _mt(mileage_m: int, time_s: int) -> MTSegment:
|
||||
segment = MTSegment()
|
||||
segment.mileage_to_travel_this_segment_m = mileage_m
|
||||
segment.time_since_start_s = time_s
|
||||
return segment
|
||||
|
||||
|
||||
def _st(speed: float, time_s: int) -> STSegment:
|
||||
segment = STSegment()
|
||||
segment.speed_m_s = speed
|
||||
segment.time_since_start_s = time_s
|
||||
return segment
|
||||
|
||||
|
||||
def _rsmt(time_s: int, segments: list[SMSegment]) -> RepeatedSMSegment:
|
||||
segment = RepeatedSMSegment()
|
||||
segment.time_since_start_s = time_s
|
||||
segment.speed_mileage_segments = segments
|
||||
return segment
|
||||
|
||||
|
||||
def _profile_from_ui(dpg) -> AccelerationProfile:
|
||||
return (
|
||||
AccelerationProfile.smooth
|
||||
if dpg.get_value("accel_profile") == "smooth"
|
||||
else AccelerationProfile.instant
|
||||
)
|
||||
|
||||
|
||||
def _color_from_ui(dpg) -> Color:
|
||||
rgb = dpg.get_value("runner_color")
|
||||
return Color(int(rgb[0]), int(rgb[1]), int(rgb[2]))
|
||||
|
||||
|
||||
def _config_from_ui(dpg) -> TrackConfig:
|
||||
line_length = float(dpg.get_value("line_length"))
|
||||
active_length = min(float(dpg.get_value("active_length")), max(0.1, line_length - 0.001))
|
||||
|
||||
config = TrackConfig.default_config()
|
||||
config.draw_kind = (
|
||||
TrackDrawKind.circular
|
||||
if dpg.get_value("draw_kind") == "circular"
|
||||
else TrackDrawKind.linear
|
||||
)
|
||||
config.line_length_m = line_length
|
||||
config.active_line_length_m = active_length
|
||||
config.head_offset_m = float(dpg.get_value("head_offset"))
|
||||
config.line_leds_num = int(dpg.get_value("led_count"))
|
||||
return config
|
||||
|
||||
|
||||
def _manual_state_to_core(dpg) -> tuple[TrackConfig, TrackInfo, TrackReport]:
|
||||
config = _config_from_ui(dpg)
|
||||
color = _color_from_ui(dpg)
|
||||
|
||||
info = TrackInfo()
|
||||
info.color = color
|
||||
info.id = 1
|
||||
info.is_running = dpg.get_value("manual_running")
|
||||
info.num_segments = 1
|
||||
|
||||
report = TrackReport()
|
||||
report.id = 1
|
||||
report.state = TrackState.run if info.is_running else TrackState.stop
|
||||
report.mileage_m = dpg.get_value("manual_mileage")
|
||||
report.speed_m_s = dpg.get_value("manual_speed")
|
||||
report.time_elapsed_ms = 0
|
||||
return config, info, report
|
||||
|
||||
|
||||
def _draw_linear(dpg, pixels: list[Color]) -> None:
|
||||
width = 840
|
||||
x0 = 30
|
||||
y0 = 180
|
||||
gap = 1 if len(pixels) <= 240 else 0
|
||||
cell_w = max(1, int((width - gap * max(0, len(pixels) - 1)) / max(1, len(pixels))))
|
||||
cell_h = 34
|
||||
|
||||
for index, color in enumerate(pixels):
|
||||
x = x0 + index * (cell_w + gap)
|
||||
dpg.draw_rectangle(
|
||||
(x, y0),
|
||||
(x + cell_w, y0 + cell_h),
|
||||
color=(55, 58, 64, 255),
|
||||
fill=_rgba(color),
|
||||
parent=DRAWLIST_TAG,
|
||||
)
|
||||
|
||||
dpg.draw_rectangle(
|
||||
(x0 - 1, y0 - 1),
|
||||
(x0 + width + 1, y0 + cell_h + 1),
|
||||
color=(90, 96, 108, 255),
|
||||
parent=DRAWLIST_TAG,
|
||||
)
|
||||
dpg.draw_text((x0, y0 + 52), f"{len(pixels)} LEDs", color=(190, 195, 205, 255), parent=DRAWLIST_TAG)
|
||||
|
||||
|
||||
def _draw_circular(dpg, pixels: list[Color]) -> None:
|
||||
center = (440, 158)
|
||||
radius = 122
|
||||
led_radius = 5 if len(pixels) <= 160 else 3
|
||||
|
||||
dpg.draw_circle(center, radius, color=(80, 86, 98, 255), thickness=1, parent=DRAWLIST_TAG)
|
||||
for index, color in enumerate(pixels):
|
||||
angle = (2.0 * math.pi * index / max(1, len(pixels))) - (math.pi / 2.0)
|
||||
pos = (
|
||||
center[0] + math.cos(angle) * radius,
|
||||
center[1] + math.sin(angle) * radius,
|
||||
)
|
||||
dpg.draw_circle(
|
||||
pos,
|
||||
led_radius,
|
||||
color=(42, 45, 52, 255),
|
||||
fill=_rgba(color),
|
||||
parent=DRAWLIST_TAG,
|
||||
)
|
||||
|
||||
dpg.draw_text((30, 300), f"{len(pixels)} LEDs", color=(190, 195, 205, 255), parent=DRAWLIST_TAG)
|
||||
|
||||
|
||||
class TrackRuntimeApp:
|
||||
def __init__(self) -> None:
|
||||
self.runtime = SchemeTrainingRuntime()
|
||||
self.last_frame_s = time.perf_counter()
|
||||
|
||||
def build_scheme(self, dpg):
|
||||
kind = dpg.get_value("scheme_kind")
|
||||
color = _color_from_ui(dpg)
|
||||
profile = _profile_from_ui(dpg)
|
||||
|
||||
if kind == "SM":
|
||||
return make_speed_mileage_scheme(
|
||||
1,
|
||||
color,
|
||||
profile,
|
||||
[_sm(1.0, 0), _sm(2.8, 10), _sm(1.2, 24), _sm(2.0, 40)],
|
||||
)
|
||||
if kind == "MT":
|
||||
return make_mileage_time_scheme(
|
||||
1,
|
||||
color,
|
||||
profile,
|
||||
[_mt(12, 0), _mt(20, 6), _mt(10, 18), _mt(1, 28)],
|
||||
)
|
||||
if kind == "RSMT":
|
||||
return make_repeated_speed_mileage_time_scheme(
|
||||
1,
|
||||
color,
|
||||
profile,
|
||||
[
|
||||
_rsmt(0, [_sm(1.0, 0), _sm(3.0, 10), _sm(1.2, 24)]),
|
||||
_rsmt(18, [_sm(2.2, 0), _sm(4.0, 8), _sm(1.0, 20)]),
|
||||
_rsmt(42, [_sm(1.4, 0), _sm(2.4, 12), _sm(1.4, 24)]),
|
||||
],
|
||||
)
|
||||
return make_speed_time_scheme(
|
||||
1,
|
||||
color,
|
||||
profile,
|
||||
[_st(1.0, 0), _st(3.2, 8), _st(1.4, 18), _st(2.4, 30), _st(0.8, 42)],
|
||||
)
|
||||
|
||||
def load_scheme(self, dpg) -> None:
|
||||
self.runtime.clear()
|
||||
self.runtime.add_scheme(self.build_scheme(dpg))
|
||||
self.runtime.start()
|
||||
self.last_frame_s = time.perf_counter()
|
||||
|
||||
def start(self) -> None:
|
||||
self.runtime.start()
|
||||
self.last_frame_s = time.perf_counter()
|
||||
|
||||
def stop(self) -> None:
|
||||
self.runtime.stop()
|
||||
|
||||
def _runtime_pixels(self, dpg, config: TrackConfig) -> list[Color]:
|
||||
now_s = time.perf_counter()
|
||||
delta_s = min(0.1, now_s - self.last_frame_s)
|
||||
self.last_frame_s = now_s
|
||||
|
||||
if dpg.get_value("source_mode") == "runtime":
|
||||
self.runtime.tick(config, delta_s * float(dpg.get_value("time_scale")))
|
||||
return self.runtime.render_pixels(config)
|
||||
|
||||
manual_config, info, report = _manual_state_to_core(dpg)
|
||||
return render_pixels(manual_config, info, report)
|
||||
|
||||
def render(self, dpg) -> None:
|
||||
dpg.delete_item(DRAWLIST_TAG, children_only=True)
|
||||
config = _config_from_ui(dpg)
|
||||
pixels = self._runtime_pixels(dpg, config)
|
||||
if config.draw_kind == TrackDrawKind.circular:
|
||||
_draw_circular(dpg, pixels)
|
||||
else:
|
||||
_draw_linear(dpg, pixels)
|
||||
|
||||
states = self.runtime.state_collection()
|
||||
if states:
|
||||
state = states[0]
|
||||
dpg.draw_text(
|
||||
(30, 18),
|
||||
f"id {state.id} {state.time_elapsed_ms / 1000.0:5.1f}s "
|
||||
f"{state.mileage_m:6.2f}m {state.speed_m_s:4.2f}m/s",
|
||||
color=(220, 225, 235, 255),
|
||||
parent=DRAWLIST_TAG,
|
||||
)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
dpg = _load_dearpygui()
|
||||
app = TrackRuntimeApp()
|
||||
|
||||
dpg.create_context()
|
||||
dpg.create_viewport(title="track-core emulator", width=920, height=560)
|
||||
|
||||
with dpg.window(label="track-core emulator", tag="main_window", width=900, height=540):
|
||||
with dpg.group(horizontal=True):
|
||||
dpg.add_combo(("runtime", "manual"), default_value="runtime", label="source", tag="source_mode")
|
||||
dpg.add_combo(("ST", "SM", "MT", "RSMT"), default_value="ST", label="scheme", tag="scheme_kind", callback=lambda *_: app.load_scheme(dpg))
|
||||
dpg.add_combo(("instant", "smooth"), default_value="smooth", label="accel", tag="accel_profile", callback=lambda *_: app.load_scheme(dpg))
|
||||
dpg.add_combo(("circular", "linear"), default_value="circular", label="draw", tag="draw_kind")
|
||||
|
||||
with dpg.group(horizontal=True):
|
||||
dpg.add_button(label="load", callback=lambda *_: app.load_scheme(dpg))
|
||||
dpg.add_button(label="start", callback=lambda *_: app.start())
|
||||
dpg.add_button(label="stop", callback=lambda *_: app.stop())
|
||||
dpg.add_color_edit(default_value=(0, 255, 0, 255), label="color", tag="runner_color", no_alpha=True, callback=lambda *_: app.load_scheme(dpg))
|
||||
|
||||
dpg.add_slider_float(label="time scale", tag="time_scale", default_value=1.0, min_value=0.0, max_value=8.0)
|
||||
dpg.add_slider_float(label="line length m", tag="line_length", default_value=40.0, min_value=5.0, max_value=80.0)
|
||||
dpg.add_slider_float(label="active length m", tag="active_length", default_value=10.0, min_value=0.1, max_value=40.0)
|
||||
dpg.add_slider_float(label="head offset m", tag="head_offset", default_value=0.0, min_value=-40.0, max_value=40.0)
|
||||
dpg.add_slider_int(label="LEDs", tag="led_count", default_value=160, min_value=8, max_value=400)
|
||||
dpg.add_checkbox(label="manual running", default_value=True, tag="manual_running")
|
||||
dpg.add_slider_float(label="manual mileage m", tag="manual_mileage", default_value=8.5, min_value=-80.0, max_value=120.0)
|
||||
dpg.add_slider_float(label="manual speed m/s", tag="manual_speed", default_value=1.0, min_value=0.0, max_value=10.0)
|
||||
|
||||
dpg.add_drawlist(width=880, height=330, tag=DRAWLIST_TAG)
|
||||
|
||||
app.load_scheme(dpg)
|
||||
dpg.setup_dearpygui()
|
||||
dpg.set_primary_window("main_window", True)
|
||||
dpg.show_viewport()
|
||||
|
||||
while dpg.is_dearpygui_running():
|
||||
app.render(dpg)
|
||||
dpg.render_dearpygui_frame()
|
||||
|
||||
dpg.destroy_context()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user