feat(emulator): add editable training schema panel

Redesign the DearPyGui emulator into a two-column layout with an interactive training schema editor and runtime preview.

Add JSON load/save support for ST, SM, MT, and RSMT schemes, including draft normalization, conversion to core schemes, and explicit Apply validation so intermediate edits do not restart or block the preview.

Update manual mode to clamp negative mileage, remove speed from manual positioning, preserve stopped runtime state when reloading schemas, and keep linear rendering visible with span-based drawing.

Fix the core linear pingpong render plan so reverse travel remains visible near endpoints and prefix/postfix colors stay tied to physical sides instead of reversing with heading.

Add C++ and Python regressions for schema conversion, emulator edit behavior, manual mode, linear endpoint visibility, and fixed prefix/postfix colors.
This commit is contained in:
2026-05-18 17:23:00 +08:00
parent 1005e50be0
commit 7b0f6a523d
5 changed files with 1178 additions and 158 deletions
+579 -131
View File
@@ -2,29 +2,36 @@ from __future__ import annotations
import math
import time
from pathlib import Path
from . import (
AccelerationProfile,
Color,
MTSegment,
RepeatedSMSegment,
SMSegment,
STSegment,
SchemeTrainingRuntime,
TrackConfig,
TrackDrawKind,
TrackInfo,
TrackReport,
TrackState,
make_mileage_time_scheme,
make_repeated_speed_mileage_time_scheme,
make_speed_mileage_scheme,
make_speed_time_scheme,
render_pixels,
)
from .emulator_scheme import (
ACCELERATION_PROFILES,
SCHEME_KINDS,
clone_draft,
default_draft,
draft_to_scheme,
load_draft,
normalize_draft,
save_draft,
)
DRAWLIST_TAG = "track_core_drawlist"
EDITOR_TAG = "scheme_editor"
STATUS_TAG = "schema_status"
FILE_PATH_TAG = "schema_file_path"
LOAD_DIALOG_TAG = "schema_load_dialog"
SAVE_DIALOG_TAG = "schema_save_dialog"
def _load_dearpygui():
@@ -42,44 +49,8 @@ def _rgba(color: Color) -> tuple[int, int, int, int]:
return (color.r, color.g, color.b, 255)
def _sm(speed: float, mileage_m: int) -> SMSegment:
segment = SMSegment()
segment.speed_m_s = speed
segment.mileage_from_start_m = mileage_m
return segment
def _mt(mileage_m: int, time_s: int) -> MTSegment:
segment = MTSegment()
segment.mileage_to_travel_this_segment_m = mileage_m
segment.time_since_start_s = time_s
return segment
def _st(speed: float, time_s: int) -> STSegment:
segment = STSegment()
segment.speed_m_s = speed
segment.time_since_start_s = time_s
return segment
def _rsmt(time_s: int, segments: list[SMSegment]) -> RepeatedSMSegment:
segment = RepeatedSMSegment()
segment.time_since_start_s = time_s
segment.speed_mileage_segments = segments
return segment
def _profile_from_ui(dpg) -> AccelerationProfile:
return (
AccelerationProfile.smooth
if dpg.get_value("accel_profile") == "smooth"
else AccelerationProfile.instant
)
def _color_from_ui(dpg) -> Color:
rgb = dpg.get_value("runner_color")
def _color_from_draft(draft: dict) -> Color:
rgb = draft.get("color", [0, 255, 0])
return Color(int(rgb[0]), int(rgb[1]), int(rgb[2]))
@@ -100,43 +71,73 @@ def _config_from_ui(dpg) -> TrackConfig:
return config
def _manual_state_to_core(dpg) -> tuple[TrackConfig, TrackInfo, TrackReport]:
def _toggle_draw_kind(dpg) -> None:
next_kind = "linear" if dpg.get_value("draw_kind") == "circular" else "circular"
dpg.set_value("draw_kind", next_kind)
def _manual_state_to_core(dpg, draft: dict | None = None) -> tuple[TrackConfig, TrackInfo, TrackReport]:
config = _config_from_ui(dpg)
color = _color_from_ui(dpg)
color = _color_from_draft(draft or default_draft())
info = TrackInfo()
info.color = color
info.id = 1
info.id = int((draft or {}).get("id", 1))
info.is_running = dpg.get_value("manual_running")
info.num_segments = 1
report = TrackReport()
report.id = 1
report.id = info.id
report.state = TrackState.run if info.is_running else TrackState.stop
report.mileage_m = dpg.get_value("manual_mileage")
report.speed_m_s = dpg.get_value("manual_speed")
report.mileage_m = max(0.0, float(dpg.get_value("manual_mileage")))
report.speed_m_s = 0.0
report.time_elapsed_ms = 0
return config, info, report
def _draw_linear(dpg, pixels: list[Color]) -> None:
width = 840
x0 = 30
y0 = 180
width = 560
x0 = 24
y0 = 174
gap = 1 if len(pixels) <= 240 else 0
cell_w = max(1, int((width - gap * max(0, len(pixels) - 1)) / max(1, len(pixels))))
cell_h = 34
pitch = cell_w + gap
for index, color in enumerate(pixels):
x = x0 + index * (cell_w + gap)
dpg.draw_rectangle(
(x0, y0),
(x0 + width, y0 + cell_h),
color=(55, 58, 64, 255),
fill=(16, 18, 22, 255),
parent=DRAWLIST_TAG,
)
index = 0
while index < len(pixels):
color = pixels[index]
if color == Color.black():
index += 1
continue
start = index
while index < len(pixels) and pixels[index] == color:
index += 1
count = index - start
x = x0 + start * pitch
span_width = count * cell_w + max(0, count - 1) * gap
dpg.draw_rectangle(
(x, y0),
(x + cell_w, y0 + cell_h),
color=(55, 58, 64, 255),
(x + span_width, y0 + cell_h),
color=_rgba(color),
fill=_rgba(color),
parent=DRAWLIST_TAG,
)
for index in range(len(pixels) + 1):
x = x0 + index * pitch - (gap if index == len(pixels) and gap else 0)
dpg.draw_line((x, y0), (x, y0 + cell_h), color=(70, 74, 84, 255), parent=DRAWLIST_TAG)
dpg.draw_rectangle(
(x0 - 1, y0 - 1),
(x0 + width + 1, y0 + cell_h + 1),
@@ -147,7 +148,7 @@ def _draw_linear(dpg, pixels: list[Color]) -> None:
def _draw_circular(dpg, pixels: list[Color]) -> None:
center = (440, 158)
center = (304, 158)
radius = 122
led_radius = 5 if len(pixels) <= 160 else 3
@@ -166,56 +167,42 @@ def _draw_circular(dpg, pixels: list[Color]) -> None:
parent=DRAWLIST_TAG,
)
dpg.draw_text((30, 300), f"{len(pixels)} LEDs", color=(190, 195, 205, 255), parent=DRAWLIST_TAG)
dpg.draw_text((24, 300), f"{len(pixels)} LEDs", color=(190, 195, 205, 255), parent=DRAWLIST_TAG)
class TrackRuntimeApp:
def __init__(self) -> None:
self.runtime = SchemeTrainingRuntime()
self.draft = default_draft("ST")
self.file_path: Path | None = None
self.last_frame_s = time.perf_counter()
self.last_valid_draft = clone_draft(self.draft)
self.status = "Ready"
self.has_unapplied_edits = False
def build_scheme(self, dpg):
kind = dpg.get_value("scheme_kind")
color = _color_from_ui(dpg)
profile = _profile_from_ui(dpg)
def load_scheme(self, dpg, run: bool | None = None) -> bool:
if run is None:
run = True if not self.runtime.has_program() else not self.runtime.all_stopped()
if kind == "SM":
return make_speed_mileage_scheme(
1,
color,
profile,
[_sm(1.0, 0), _sm(2.8, 10), _sm(1.2, 24), _sm(2.0, 40)],
)
if kind == "MT":
return make_mileage_time_scheme(
1,
color,
profile,
[_mt(12, 0), _mt(20, 6), _mt(10, 18), _mt(1, 28)],
)
if kind == "RSMT":
return make_repeated_speed_mileage_time_scheme(
1,
color,
profile,
[
_rsmt(0, [_sm(1.0, 0), _sm(3.0, 10), _sm(1.2, 24)]),
_rsmt(18, [_sm(2.2, 0), _sm(4.0, 8), _sm(1.0, 20)]),
_rsmt(42, [_sm(1.4, 0), _sm(2.4, 12), _sm(1.4, 24)]),
],
)
return make_speed_time_scheme(
1,
color,
profile,
[_st(1.0, 0), _st(3.2, 8), _st(1.4, 18), _st(2.4, 30), _st(0.8, 42)],
)
try:
normalized = normalize_draft(self.draft)
scheme = draft_to_scheme(normalized)
except Exception as exc:
self.set_status(dpg, f"Invalid scheme: {exc}", error=True)
return False
def load_scheme(self, dpg) -> None:
self.draft = normalized
self.runtime.clear()
self.runtime.add_scheme(self.build_scheme(dpg))
self.runtime.start()
self.runtime.add_scheme(scheme)
if run:
self.runtime.start()
else:
self.runtime.stop()
self.last_frame_s = time.perf_counter()
self.last_valid_draft = clone_draft(normalized)
self.has_unapplied_edits = False
self.set_status(dpg, "Loaded into runtime" if run else "Loaded into runtime (stopped)")
return True
def start(self) -> None:
self.runtime.start()
@@ -224,6 +211,99 @@ class TrackRuntimeApp:
def stop(self) -> None:
self.runtime.stop()
def set_status(self, dpg, message: str, error: bool = False) -> None:
self.status = message
if dpg.does_item_exist(STATUS_TAG):
dpg.set_value(STATUS_TAG, message)
dpg.configure_item(STATUS_TAG, color=(255, 110, 90) if error else (170, 220, 170))
def refresh_file_path(self, dpg) -> None:
if dpg.does_item_exist(FILE_PATH_TAG):
dpg.set_value(FILE_PATH_TAG, str(self.file_path) if self.file_path else "")
def mark_dirty(self, dpg) -> None:
self.has_unapplied_edits = True
self.set_status(dpg, "Edited; click Apply to validate and reload")
def apply_editor_values(self, dpg) -> None:
self.draft["id"] = int(dpg.get_value("schema_id"))
self.draft["kind"] = dpg.get_value("schema_kind")
self.draft["acceleration_profile"] = dpg.get_value("schema_accel")
color = dpg.get_value("schema_color")
self.draft["color"] = [int(color[0]), int(color[1]), int(color[2])]
def apply_schema(self, dpg) -> bool:
self.apply_editor_values(dpg)
return self.load_scheme(dpg)
def reset_template(self, dpg, kind: str | None = None) -> None:
self.draft = default_draft(kind or self.draft.get("kind", "ST"))
self.file_path = None
self.refresh_file_path(dpg)
rebuild_editor(dpg, self)
self.mark_dirty(dpg)
def save_current(self, dpg) -> None:
if self.file_path is None:
dpg.show_item(SAVE_DIALOG_TAG)
return
try:
self.draft = save_draft(self.file_path, self.draft)
self.set_status(dpg, f"Saved {self.file_path}")
rebuild_editor(dpg, self)
except Exception as exc:
self.set_status(dpg, f"Save failed: {exc}", error=True)
def save_as(self, dpg, path: Path) -> None:
if path.suffix == "":
path = path.with_suffix(".json")
try:
self.draft = save_draft(path, self.draft)
self.file_path = path
self.refresh_file_path(dpg)
self.set_status(dpg, f"Saved {path}")
rebuild_editor(dpg, self)
except Exception as exc:
self.set_status(dpg, f"Save failed: {exc}", error=True)
def load_from_file(self, dpg, path: Path) -> None:
try:
draft = load_draft(path)
draft_to_scheme(draft)
except Exception as exc:
self.set_status(dpg, f"Load failed: {exc}", error=True)
return
self.draft = draft
self.file_path = path
self.refresh_file_path(dpg)
rebuild_editor(dpg, self)
self.mark_dirty(dpg)
self.set_status(dpg, f"Loaded {path}; click Apply to validate and reload")
def _draw_status(self, dpg) -> None:
if dpg.get_value("source_mode") == "manual":
_, info, report = _manual_state_to_core(dpg, self.last_valid_draft)
state = "run" if info.is_running else "stop"
dpg.draw_text(
(24, 18),
f"manual {state} {report.mileage_m:6.2f}m {dpg.get_value('draw_kind')} model",
color=(220, 225, 235, 255),
parent=DRAWLIST_TAG,
)
return
states = self.runtime.state_collection()
if states:
state = states[0]
dpg.draw_text(
(24, 18),
f"id {state.id} {state.time_elapsed_ms / 1000.0:5.1f}s "
f"{state.mileage_m:6.2f}m {state.speed_m_s:4.2f}m/s",
color=(220, 225, 235, 255),
parent=DRAWLIST_TAG,
)
def _runtime_pixels(self, dpg, config: TrackConfig) -> list[Color]:
now_s = time.perf_counter()
delta_s = min(0.1, now_s - self.last_frame_s)
@@ -233,7 +313,7 @@ class TrackRuntimeApp:
self.runtime.tick(config, delta_s * float(dpg.get_value("time_scale")))
return self.runtime.render_pixels(config)
manual_config, info, report = _manual_state_to_core(dpg)
manual_config, info, report = _manual_state_to_core(dpg, self.last_valid_draft)
return render_pixels(manual_config, info, report)
def render(self, dpg) -> None:
@@ -245,16 +325,380 @@ class TrackRuntimeApp:
else:
_draw_linear(dpg, pixels)
states = self.runtime.state_collection()
if states:
state = states[0]
dpg.draw_text(
(30, 18),
f"id {state.id} {state.time_elapsed_ms / 1000.0:5.1f}s "
f"{state.mileage_m:6.2f}m {state.speed_m_s:4.2f}m/s",
color=(220, 225, 235, 255),
parent=DRAWLIST_TAG,
self._draw_status(dpg)
def _segment_input_float(dpg, tag: str, value: float, callback) -> None:
dpg.add_input_float(
tag=tag,
default_value=float(value),
width=92,
min_value=0.0,
min_clamped=True,
step=0.1,
format="%.2f",
callback=callback,
)
def _segment_input_int(dpg, tag: str, value: int, callback) -> None:
dpg.add_input_int(
tag=tag,
default_value=int(value),
width=82,
min_value=0,
min_clamped=True,
step=1,
callback=callback,
)
def _editor_changed(dpg, app: TrackRuntimeApp) -> None:
app.apply_editor_values(dpg)
app.mark_dirty(dpg)
def _set_simple_segment_value(dpg, app: TrackRuntimeApp, index: int, field: str, value) -> None:
app.draft["segments"][index][field] = value
_editor_changed(dpg, app)
def _set_rsmt_time(dpg, app: TrackRuntimeApp, index: int, value: int) -> None:
app.draft["segments"][index]["time_s"] = value
_editor_changed(dpg, app)
def _set_rsmt_sub_value(
dpg,
app: TrackRuntimeApp,
segment_index: int,
sub_index: int,
field: str,
value,
) -> None:
app.draft["segments"][segment_index]["sub_segments"][sub_index][field] = value
_editor_changed(dpg, app)
def _add_simple_segment(dpg, app: TrackRuntimeApp) -> None:
app.apply_editor_values(dpg)
kind = app.draft["kind"]
segments = app.draft["segments"]
if kind == "SM":
last = segments[-1] if segments else {"mileage_m": 0, "speed_m_s": 1.0}
segments.append({"mileage_m": int(last["mileage_m"]) + 5, "speed_m_s": float(last["speed_m_s"])})
elif kind == "MT":
last = segments[-1] if segments else {"time_s": 0, "mileage_m": 5}
segments.append({"time_s": int(last["time_s"]) + 5, "mileage_m": max(1, int(last["mileage_m"]))})
else:
last = segments[-1] if segments else {"time_s": 0, "speed_m_s": 1.0}
segments.append({"time_s": int(last["time_s"]) + 5, "speed_m_s": float(last["speed_m_s"])})
rebuild_editor(dpg, app)
app.mark_dirty(dpg)
def _remove_simple_segment(dpg, app: TrackRuntimeApp, index: int) -> None:
app.apply_editor_values(dpg)
if len(app.draft["segments"]) > 1:
app.draft["segments"].pop(index)
rebuild_editor(dpg, app)
app.mark_dirty(dpg)
def _duplicate_simple_segment(dpg, app: TrackRuntimeApp, index: int) -> None:
app.apply_editor_values(dpg)
source = clone_draft(app.draft["segments"][index])
if app.draft["kind"] == "SM":
source["mileage_m"] = int(source["mileage_m"]) + 5
else:
source["time_s"] = int(source["time_s"]) + 5
app.draft["segments"].insert(index + 1, source)
rebuild_editor(dpg, app)
app.mark_dirty(dpg)
def _add_rsmt_segment(dpg, app: TrackRuntimeApp) -> None:
app.apply_editor_values(dpg)
segments = app.draft["segments"]
last = segments[-1] if segments else {"time_s": 0}
segments.append(
{
"time_s": int(last["time_s"]) + 10,
"sub_segments": [
{"mileage_m": 0, "speed_m_s": 1.0},
{"mileage_m": 8, "speed_m_s": 1.0},
],
}
)
rebuild_editor(dpg, app)
app.mark_dirty(dpg)
def _remove_rsmt_segment(dpg, app: TrackRuntimeApp, index: int) -> None:
app.apply_editor_values(dpg)
if len(app.draft["segments"]) > 1:
app.draft["segments"].pop(index)
rebuild_editor(dpg, app)
app.mark_dirty(dpg)
def _add_rsmt_sub_segment(dpg, app: TrackRuntimeApp, segment_index: int) -> None:
app.apply_editor_values(dpg)
sub_segments = app.draft["segments"][segment_index]["sub_segments"]
last = sub_segments[-1] if sub_segments else {"mileage_m": 0, "speed_m_s": 1.0}
sub_segments.append({"mileage_m": int(last["mileage_m"]) + 5, "speed_m_s": float(last["speed_m_s"])})
rebuild_editor(dpg, app)
app.mark_dirty(dpg)
def _remove_rsmt_sub_segment(dpg, app: TrackRuntimeApp, segment_index: int, sub_index: int) -> None:
app.apply_editor_values(dpg)
sub_segments = app.draft["segments"][segment_index]["sub_segments"]
if len(sub_segments) > 1:
sub_segments.pop(sub_index)
rebuild_editor(dpg, app)
app.mark_dirty(dpg)
def rebuild_editor(dpg, app: TrackRuntimeApp) -> None:
dpg.delete_item(EDITOR_TAG, children_only=True)
draft = app.draft
with dpg.group(parent=EDITOR_TAG):
dpg.add_text("Training Scheme")
dpg.add_input_text(
label="file",
tag=FILE_PATH_TAG,
default_value=str(app.file_path) if app.file_path else "",
readonly=True,
width=410,
)
with dpg.group(horizontal=True):
dpg.add_button(label="Load JSON", callback=lambda *_: dpg.show_item(LOAD_DIALOG_TAG))
dpg.add_button(label="Apply", callback=lambda *_: app.apply_schema(dpg))
dpg.add_button(label="Save", callback=lambda *_: app.save_current(dpg))
dpg.add_button(label="Save As", callback=lambda *_: dpg.show_item(SAVE_DIALOG_TAG))
dpg.add_button(label="Reset", callback=lambda *_: app.reset_template(dpg))
dpg.add_separator()
with dpg.group(horizontal=True):
dpg.add_input_int(
label="id",
tag="schema_id",
default_value=int(draft["id"]),
width=90,
min_value=0,
max_value=255,
min_clamped=True,
max_clamped=True,
callback=lambda *_: _editor_changed(dpg, app),
)
dpg.add_combo(
SCHEME_KINDS,
label="kind",
tag="schema_kind",
default_value=draft["kind"],
width=96,
callback=lambda *_: app.reset_template(dpg, dpg.get_value("schema_kind")),
)
dpg.add_combo(
ACCELERATION_PROFILES,
label="accel",
tag="schema_accel",
default_value=draft["acceleration_profile"],
width=110,
callback=lambda *_: _editor_changed(dpg, app),
)
dpg.add_color_edit(
default_value=(*draft["color"], 255),
label="color",
tag="schema_color",
no_alpha=True,
width=260,
callback=lambda *_: _editor_changed(dpg, app),
)
dpg.add_separator()
if draft["kind"] == "RSMT":
_build_rsmt_editor(dpg, app)
else:
_build_simple_editor(dpg, app)
dpg.add_separator()
dpg.add_text(app.status, tag=STATUS_TAG, color=(170, 220, 170))
def _build_simple_editor(dpg, app: TrackRuntimeApp) -> None:
kind = app.draft["kind"]
dpg.add_button(label="Add Segment", callback=lambda *_: _add_simple_segment(dpg, app))
with dpg.table(header_row=True, borders_innerH=True, borders_outerH=True, borders_innerV=True):
dpg.add_table_column(label="#", width_fixed=True)
if kind == "SM":
dpg.add_table_column(label="mileage m")
dpg.add_table_column(label="speed m/s")
elif kind == "MT":
dpg.add_table_column(label="time s")
dpg.add_table_column(label="mileage m")
else:
dpg.add_table_column(label="time s")
dpg.add_table_column(label="speed m/s")
dpg.add_table_column(label="actions")
for index, segment in enumerate(app.draft["segments"]):
with dpg.table_row():
dpg.add_text(str(index))
if kind == "SM":
_segment_input_int(
dpg,
f"segment_{index}_mileage",
int(segment["mileage_m"]),
lambda sender, value, user_data: _set_simple_segment_value(
dpg, app, user_data, "mileage_m", int(value)
),
)
dpg.set_item_user_data(f"segment_{index}_mileage", index)
_segment_input_float(
dpg,
f"segment_{index}_speed",
float(segment["speed_m_s"]),
lambda sender, value, user_data: _set_simple_segment_value(
dpg, app, user_data, "speed_m_s", float(value)
),
)
dpg.set_item_user_data(f"segment_{index}_speed", index)
elif kind == "MT":
_segment_input_int(
dpg,
f"segment_{index}_time",
int(segment["time_s"]),
lambda sender, value, user_data: _set_simple_segment_value(
dpg, app, user_data, "time_s", int(value)
),
)
dpg.set_item_user_data(f"segment_{index}_time", index)
_segment_input_int(
dpg,
f"segment_{index}_mileage",
int(segment["mileage_m"]),
lambda sender, value, user_data: _set_simple_segment_value(
dpg, app, user_data, "mileage_m", int(value)
),
)
dpg.set_item_user_data(f"segment_{index}_mileage", index)
else:
_segment_input_int(
dpg,
f"segment_{index}_time",
int(segment["time_s"]),
lambda sender, value, user_data: _set_simple_segment_value(
dpg, app, user_data, "time_s", int(value)
),
)
dpg.set_item_user_data(f"segment_{index}_time", index)
_segment_input_float(
dpg,
f"segment_{index}_speed",
float(segment["speed_m_s"]),
lambda sender, value, user_data: _set_simple_segment_value(
dpg, app, user_data, "speed_m_s", float(value)
),
)
dpg.set_item_user_data(f"segment_{index}_speed", index)
with dpg.group(horizontal=True):
dpg.add_button(
label="copy",
callback=lambda _, __, ___=None, i=index: _duplicate_simple_segment(dpg, app, i),
)
dpg.add_button(
label="del",
callback=lambda _, __, ___=None, i=index: _remove_simple_segment(dpg, app, i),
)
def _build_rsmt_editor(dpg, app: TrackRuntimeApp) -> None:
dpg.add_button(label="Add Time Segment", callback=lambda *_: _add_rsmt_segment(dpg, app))
for segment_index, segment in enumerate(app.draft["segments"]):
with dpg.collapsing_header(label=f"time segment {segment_index}", default_open=True):
with dpg.group(horizontal=True):
dpg.add_text("time s")
_segment_input_int(
dpg,
f"rsmt_{segment_index}_time",
int(segment["time_s"]),
lambda sender, value, user_data: _set_rsmt_time(dpg, app, user_data, int(value)),
)
dpg.set_item_user_data(f"rsmt_{segment_index}_time", segment_index)
dpg.add_button(
label="Add Sub-Segment",
callback=lambda _, __, ___=None, i=segment_index: _add_rsmt_sub_segment(dpg, app, i),
)
dpg.add_button(
label="Delete Time Segment",
callback=lambda _, __, ___=None, i=segment_index: _remove_rsmt_segment(dpg, app, i),
)
with dpg.table(header_row=True, borders_innerH=True, borders_outerH=True, borders_innerV=True):
dpg.add_table_column(label="#")
dpg.add_table_column(label="mileage m")
dpg.add_table_column(label="speed m/s")
dpg.add_table_column(label="actions")
for sub_index, sub_segment in enumerate(segment["sub_segments"]):
with dpg.table_row():
dpg.add_text(str(sub_index))
_segment_input_int(
dpg,
f"rsmt_{segment_index}_{sub_index}_mileage",
int(sub_segment["mileage_m"]),
lambda sender, value, user_data: _set_rsmt_sub_value(
dpg, app, user_data[0], user_data[1], "mileage_m", int(value)
),
)
dpg.set_item_user_data(f"rsmt_{segment_index}_{sub_index}_mileage", (segment_index, sub_index))
_segment_input_float(
dpg,
f"rsmt_{segment_index}_{sub_index}_speed",
float(sub_segment["speed_m_s"]),
lambda sender, value, user_data: _set_rsmt_sub_value(
dpg, app, user_data[0], user_data[1], "speed_m_s", float(value)
),
)
dpg.set_item_user_data(f"rsmt_{segment_index}_{sub_index}_speed", (segment_index, sub_index))
dpg.add_button(
label="del",
callback=lambda _, __, ___=None, i=segment_index, j=sub_index: _remove_rsmt_sub_segment(
dpg, app, i, j
),
)
def _dialog_path(app_data) -> Path:
if "file_path_name" in app_data:
return Path(app_data["file_path_name"])
return Path(app_data["current_path"]) / app_data["file_name"]
def _build_file_dialogs(dpg, app: TrackRuntimeApp) -> None:
with dpg.file_dialog(
directory_selector=False,
show=False,
callback=lambda _, app_data, ___=None: app.load_from_file(dpg, _dialog_path(app_data)),
tag=LOAD_DIALOG_TAG,
width=700,
height=420,
):
dpg.add_file_extension(".json", color=(120, 190, 255, 255))
with dpg.file_dialog(
directory_selector=False,
show=False,
callback=lambda _, app_data, ___=None: app.save_as(dpg, _dialog_path(app_data)),
tag=SAVE_DIALOG_TAG,
width=700,
height=420,
):
dpg.add_file_extension(".json", color=(120, 190, 255, 255))
def main() -> None:
@@ -262,33 +706,37 @@ def main() -> None:
app = TrackRuntimeApp()
dpg.create_context()
dpg.create_viewport(title="track-core emulator", width=920, height=560)
dpg.create_viewport(title="track-core emulator", width=1160, height=690)
with dpg.window(label="track-core emulator", tag="main_window", width=900, height=540):
with dpg.window(label="track-core emulator", tag="main_window", width=1140, height=660):
with dpg.group(horizontal=True):
dpg.add_combo(("runtime", "manual"), default_value="runtime", label="source", tag="source_mode")
dpg.add_combo(("ST", "SM", "MT", "RSMT"), default_value="ST", label="scheme", tag="scheme_kind", callback=lambda *_: app.load_scheme(dpg))
dpg.add_combo(("instant", "smooth"), default_value="smooth", label="accel", tag="accel_profile", callback=lambda *_: app.load_scheme(dpg))
dpg.add_combo(("circular", "linear"), default_value="circular", label="draw", tag="draw_kind")
with dpg.child_window(width=500, height=630, border=True):
dpg.add_child_window(tag=EDITOR_TAG, width=480, height=610, border=False)
with dpg.child_window(width=610, height=630, border=True):
dpg.add_text("Runtime Preview")
with dpg.group(horizontal=True):
dpg.add_combo(("runtime", "manual"), default_value="runtime", label="source", tag="source_mode")
dpg.add_combo(("circular", "linear"), default_value="circular", label="track model", tag="draw_kind")
dpg.add_button(label="toggle model", callback=lambda *_: _toggle_draw_kind(dpg))
with dpg.group(horizontal=True):
dpg.add_button(label="load", callback=lambda *_: app.load_scheme(dpg))
dpg.add_button(label="start", callback=lambda *_: app.start())
dpg.add_button(label="stop", callback=lambda *_: app.stop())
dpg.add_color_edit(default_value=(0, 255, 0, 255), label="color", tag="runner_color", no_alpha=True, callback=lambda *_: app.load_scheme(dpg))
with dpg.group(horizontal=True):
dpg.add_button(label="reload schema", callback=lambda *_: app.load_scheme(dpg))
dpg.add_button(label="start", callback=lambda *_: app.start())
dpg.add_button(label="stop", callback=lambda *_: app.stop())
dpg.add_slider_float(label="time scale", tag="time_scale", default_value=1.0, min_value=0.0, max_value=8.0)
dpg.add_slider_float(label="line length m", tag="line_length", default_value=40.0, min_value=5.0, max_value=80.0)
dpg.add_slider_float(label="active length m", tag="active_length", default_value=10.0, min_value=0.1, max_value=40.0)
dpg.add_slider_float(label="head offset m", tag="head_offset", default_value=0.0, min_value=-40.0, max_value=40.0)
dpg.add_slider_int(label="LEDs", tag="led_count", default_value=160, min_value=8, max_value=400)
dpg.add_checkbox(label="manual running", default_value=True, tag="manual_running")
dpg.add_slider_float(label="manual mileage m", tag="manual_mileage", default_value=8.5, min_value=-80.0, max_value=120.0)
dpg.add_slider_float(label="manual speed m/s", tag="manual_speed", default_value=1.0, min_value=0.0, max_value=10.0)
dpg.add_drawlist(width=880, height=330, tag=DRAWLIST_TAG)
dpg.add_slider_float(label="time scale", tag="time_scale", default_value=1.0, min_value=0.0, max_value=8.0)
dpg.add_slider_float(label="line length m", tag="line_length", default_value=40.0, min_value=5.0, max_value=80.0)
dpg.add_slider_float(label="active length m", tag="active_length", default_value=10.0, min_value=0.1, max_value=40.0)
dpg.add_slider_float(label="head offset m", tag="head_offset", default_value=0.0, min_value=-40.0, max_value=40.0)
dpg.add_slider_int(label="LEDs", tag="led_count", default_value=160, min_value=8, max_value=400)
dpg.add_checkbox(label="manual running", default_value=True, tag="manual_running")
dpg.add_slider_float(label="manual position m", tag="manual_mileage", default_value=8.5, min_value=0.0, max_value=120.0)
dpg.add_drawlist(width=590, height=330, tag=DRAWLIST_TAG)
_build_file_dialogs(dpg, app)
rebuild_editor(dpg, app)
app.load_scheme(dpg)
dpg.setup_dearpygui()
dpg.set_primary_window("main_window", True)
dpg.show_viewport()
+357
View File
@@ -0,0 +1,357 @@
from __future__ import annotations
import copy
import json
import math
from pathlib import Path
from typing import Any
from . import (
AccelerationProfile,
Color,
MTSegment,
RepeatedSMSegment,
SMSegment,
STSegment,
make_mileage_time_scheme,
make_repeated_speed_mileage_time_scheme,
make_speed_mileage_scheme,
make_speed_time_scheme,
)
SCHEMA_VERSION = 1
SCHEME_KINDS = ("ST", "SM", "MT", "RSMT")
ACCELERATION_PROFILES = ("instant", "smooth")
def _sm(speed: float, mileage_m: int) -> SMSegment:
segment = SMSegment()
segment.speed_m_s = speed
segment.mileage_from_start_m = mileage_m
return segment
def _mt(mileage_m: int, time_s: int) -> MTSegment:
segment = MTSegment()
segment.mileage_to_travel_this_segment_m = mileage_m
segment.time_since_start_s = time_s
return segment
def _st(speed: float, time_s: int) -> STSegment:
segment = STSegment()
segment.speed_m_s = speed
segment.time_since_start_s = time_s
return segment
def _rsmt(time_s: int, segments: list[SMSegment]) -> RepeatedSMSegment:
segment = RepeatedSMSegment()
segment.time_since_start_s = time_s
segment.speed_mileage_segments = segments
return segment
def default_draft(kind: str = "ST") -> dict[str, Any]:
if kind == "SM":
segments: list[dict[str, Any]] = [
{"mileage_m": 0, "speed_m_s": 1.0},
{"mileage_m": 10, "speed_m_s": 2.8},
{"mileage_m": 24, "speed_m_s": 1.2},
{"mileage_m": 40, "speed_m_s": 2.0},
]
elif kind == "MT":
segments = [
{"time_s": 0, "mileage_m": 12},
{"time_s": 6, "mileage_m": 20},
{"time_s": 18, "mileage_m": 10},
{"time_s": 28, "mileage_m": 1},
]
elif kind == "RSMT":
segments = [
{
"time_s": 0,
"sub_segments": [
{"mileage_m": 0, "speed_m_s": 1.0},
{"mileage_m": 10, "speed_m_s": 3.0},
{"mileage_m": 24, "speed_m_s": 1.2},
],
},
{
"time_s": 18,
"sub_segments": [
{"mileage_m": 0, "speed_m_s": 2.2},
{"mileage_m": 8, "speed_m_s": 4.0},
{"mileage_m": 20, "speed_m_s": 1.0},
],
},
{
"time_s": 42,
"sub_segments": [
{"mileage_m": 0, "speed_m_s": 1.4},
{"mileage_m": 12, "speed_m_s": 2.4},
{"mileage_m": 24, "speed_m_s": 1.4},
],
},
]
else:
kind = "ST"
segments = [
{"time_s": 0, "speed_m_s": 1.0},
{"time_s": 8, "speed_m_s": 3.2},
{"time_s": 18, "speed_m_s": 1.4},
{"time_s": 30, "speed_m_s": 2.4},
{"time_s": 42, "speed_m_s": 0.8},
]
return {
"version": SCHEMA_VERSION,
"id": 1,
"kind": kind,
"acceleration_profile": "smooth",
"color": [0, 255, 0],
"segments": segments,
}
def validation_error(draft: dict[str, Any]) -> str | None:
try:
normalize_draft(draft)
except ValueError as exc:
return str(exc)
return None
def normalize_draft(draft: dict[str, Any]) -> dict[str, Any]:
if not isinstance(draft, dict):
raise ValueError("scheme JSON must be an object")
version = _integer(draft.get("version", SCHEMA_VERSION), "version")
if version != SCHEMA_VERSION:
raise ValueError(f"unsupported scheme version {version}")
kind = str(draft.get("kind", "ST"))
if kind not in SCHEME_KINDS:
raise ValueError(f"unsupported scheme kind {kind!r}")
profile = str(draft.get("acceleration_profile", "smooth"))
if profile not in ACCELERATION_PROFILES:
raise ValueError(f"unsupported acceleration profile {profile!r}")
normalized = {
"version": SCHEMA_VERSION,
"id": _bounded_int(draft.get("id", 1), "id", 0, 255),
"kind": kind,
"acceleration_profile": profile,
"color": _color_list(draft.get("color", [0, 255, 0])),
"segments": _normalize_segments(kind, draft.get("segments", [])),
}
return normalized
def clone_draft(draft: dict[str, Any]) -> dict[str, Any]:
return copy.deepcopy(draft)
def load_draft(path: str | Path) -> dict[str, Any]:
with Path(path).open("r", encoding="utf-8") as stream:
return normalize_draft(json.load(stream))
def save_draft(path: str | Path, draft: dict[str, Any]) -> dict[str, Any]:
normalized = normalize_draft(draft)
with Path(path).open("w", encoding="utf-8") as stream:
json.dump(normalized, stream, indent=2)
stream.write("\n")
return normalized
def draft_to_scheme(draft: dict[str, Any]):
normalized = normalize_draft(draft)
scheme_id = int(normalized["id"])
color = Color(*normalized["color"])
profile = (
AccelerationProfile.smooth
if normalized["acceleration_profile"] == "smooth"
else AccelerationProfile.instant
)
kind = normalized["kind"]
segments = normalized["segments"]
if kind == "SM":
return make_speed_mileage_scheme(
scheme_id,
color,
profile,
[_sm(float(segment["speed_m_s"]), int(segment["mileage_m"])) for segment in segments],
)
if kind == "MT":
return make_mileage_time_scheme(
scheme_id,
color,
profile,
[_mt(int(segment["mileage_m"]), int(segment["time_s"])) for segment in segments],
)
if kind == "RSMT":
return make_repeated_speed_mileage_time_scheme(
scheme_id,
color,
profile,
[
_rsmt(
int(segment["time_s"]),
[
_sm(float(sub_segment["speed_m_s"]), int(sub_segment["mileage_m"]))
for sub_segment in segment["sub_segments"]
],
)
for segment in segments
],
)
return make_speed_time_scheme(
scheme_id,
color,
profile,
[_st(float(segment["speed_m_s"]), int(segment["time_s"])) for segment in segments],
)
def _normalize_segments(kind: str, value: Any) -> list[dict[str, Any]]:
if not isinstance(value, list):
raise ValueError("segments must be a list")
if kind == "ST":
segments = [
{
"time_s": _bounded_int(segment.get("time_s"), "time_s", 0, 65535),
"speed_m_s": _nonnegative_float(segment.get("speed_m_s"), "speed_m_s"),
}
for segment in _objects(value, "segments")
]
_require_nonempty(segments, "ST")
segments.sort(key=lambda segment: segment["time_s"])
_require_first_zero(segments[0]["time_s"], "first ST time_s")
_require_strictly_increasing(segments, "time_s")
return segments
if kind == "SM":
segments = [
{
"mileage_m": _bounded_int(segment.get("mileage_m"), "mileage_m", 0, 65535),
"speed_m_s": _nonnegative_float(segment.get("speed_m_s"), "speed_m_s"),
}
for segment in _objects(value, "segments")
]
_require_nonempty(segments, "SM")
segments.sort(key=lambda segment: segment["mileage_m"])
_require_first_zero(segments[0]["mileage_m"], "first SM mileage_m")
_require_strictly_increasing(segments, "mileage_m")
return segments
if kind == "MT":
segments = [
{
"time_s": _bounded_int(segment.get("time_s"), "time_s", 0, 65535),
"mileage_m": _bounded_int(segment.get("mileage_m"), "mileage_m", 0, 65535),
}
for segment in _objects(value, "segments")
]
if len(segments) < 2:
raise ValueError("MT needs at least two segments")
segments.sort(key=lambda segment: segment["time_s"])
_require_first_zero(segments[0]["time_s"], "first MT time_s")
_require_strictly_increasing(segments, "time_s")
for index, segment in enumerate(segments[:-1]):
if segment["mileage_m"] <= 0:
raise ValueError(f"MT segment {index} mileage_m must be greater than 0")
return segments
segments = []
for index, segment in enumerate(_objects(value, "segments")):
sub_segments = [
{
"mileage_m": _bounded_int(sub_segment.get("mileage_m"), "mileage_m", 0, 65535),
"speed_m_s": _nonnegative_float(sub_segment.get("speed_m_s"), "speed_m_s"),
}
for sub_segment in _objects(segment.get("sub_segments", []), f"RSMT segment {index} sub_segments")
]
_require_nonempty(sub_segments, f"RSMT segment {index}")
sub_segments.sort(key=lambda sub_segment: sub_segment["mileage_m"])
_require_first_zero(sub_segments[0]["mileage_m"], f"first RSMT segment {index} mileage_m")
_require_strictly_increasing(sub_segments, "mileage_m")
segments.append(
{
"time_s": _bounded_int(segment.get("time_s"), "time_s", 0, 65535),
"sub_segments": sub_segments,
}
)
_require_nonempty(segments, "RSMT")
segments.sort(key=lambda segment: segment["time_s"])
_require_first_zero(segments[0]["time_s"], "first RSMT time_s")
_require_strictly_increasing(segments, "time_s")
return segments
def _objects(value: Any, name: str) -> list[dict[str, Any]]:
if not isinstance(value, list):
raise ValueError(f"{name} must be a list")
if not all(isinstance(item, dict) for item in value):
raise ValueError(f"{name} entries must be objects")
return value
def _require_nonempty(segments: list[dict[str, Any]], kind: str) -> None:
if not segments:
raise ValueError(f"{kind} needs at least one segment")
def _require_first_zero(value: int, name: str) -> None:
if value != 0:
raise ValueError(f"{name} must be 0")
def _require_strictly_increasing(segments: list[dict[str, Any]], field: str) -> None:
for index in range(1, len(segments)):
if int(segments[index][field]) <= int(segments[index - 1][field]):
raise ValueError(f"{field} must be strictly increasing")
def _integer(value: Any, name: str) -> int:
if isinstance(value, bool):
raise ValueError(f"{name} must be an integer")
try:
result = int(value)
except (TypeError, ValueError) as exc:
raise ValueError(f"{name} must be an integer") from exc
if result != value and not (isinstance(value, float) and value.is_integer()):
raise ValueError(f"{name} must be an integer")
return result
def _bounded_int(value: Any, name: str, minimum: int, maximum: int) -> int:
result = _integer(value, name)
if result < minimum or result > maximum:
raise ValueError(f"{name} must be between {minimum} and {maximum}")
return result
def _nonnegative_float(value: Any, name: str) -> float:
try:
result = float(value)
except (TypeError, ValueError) as exc:
raise ValueError(f"{name} must be a number") from exc
if not math.isfinite(result) or result < 0.0:
raise ValueError(f"{name} must be a finite non-negative number")
return result
def _color_list(value: Any) -> list[int]:
if not isinstance(value, (list, tuple)) or len(value) < 3:
raise ValueError("color must contain red, green, and blue values")
return [
_bounded_int(value[0], "color red", 0, 255),
_bounded_int(value[1], "color green", 0, 255),
_bounded_int(value[2], "color blue", 0, 255),
]