First commit

This commit is contained in:
2025-11-07 11:39:23 +00:00
commit 4fb3471833
281 changed files with 6610 additions and 0 deletions

View File

@@ -0,0 +1,263 @@
"""."""
from __future__ import annotations
import sys
import time
from contextlib import contextmanager
from pathlib import Path
from typing import Iterator
from controller import Supervisor
# Robot constructor lacks a return type annotation in R2023b
sys.path.insert(0, Supervisor().getProjectPath()) # type: ignore[no-untyped-call]
# from lighting_control import LightingControl
import environment # configure path to include modules
from robot_logging import get_match_identifier, prefix_and_tee_streams
from robot_utils import get_game_mode, get_match_data, get_robot_file
# Get the robot object that was created when setting up the environment
_robot = Supervisor.created
assert _robot is not None, "Robot object not created"
supervisor: Supervisor = _robot # type: ignore[assignment]
class RobotData:
"""Data about a robot in the arena."""
def __init__(self, zone: int):
self.registered_ready = False
self.zone = zone
self.robot = supervisor.getFromDef(f'ROBOT{zone}')
if self.robot is None:
raise ValueError(f"Failed to get Webots node for zone {zone}")
def zone_occupied(self) -> bool:
"""Check if this zone has a robot.py file associated with it."""
try:
_ = get_robot_file(self.zone)
except FileNotFoundError:
return False
return True
def remove_robot(self) -> None:
"""Delete the robot proto from the world."""
self.robot.remove() # type: ignore[attr-defined]
def preset_robot(self) -> None:
"""Arm the robot so that it waits for the start signal."""
self.robot.getField('customData').setSFString('prestart') # type: ignore[attr-defined]
def robot_ready(self) -> bool:
"""Check if robot has set its pre-start flag."""
return bool(self.robot.getField('customData').getSFString() == 'ready') # type: ignore[attr-defined]
def start_robot(self) -> None:
"""Signal to the robot that the start button has been pressed."""
self.robot.getField('customData').setSFString('start') # type: ignore[attr-defined]
class Robots:
"""A collection of robots in the arena."""
def __init__(self) -> None:
self.robots: dict[int, RobotData] = {}
for zone in range(0, environment.NUM_ZONES):
try:
robot_data = RobotData(zone)
except ValueError as e:
print(e)
else:
self.robots[zone] = robot_data
def remove_unoccupied_robots(self) -> None:
"""Remove all robots that don't have usercode."""
for robot in list(self.robots.values()):
if not robot.zone_occupied():
robot.remove_robot()
_ = self.robots.pop(robot.zone)
def preset_robots(self) -> None:
"""Arm all robots so that they wait for the start signal."""
for robot in self.robots.values():
robot.preset_robot()
def wait_for_ready(self, timeout: float) -> None:
"""Wait for all robots to set their pre-start flags."""
end_time = supervisor.getTime() + timeout
while supervisor.getTime() < end_time:
all_ready = True
# Sleep in individual timesteps to allow the robots to update
supervisor.step()
for zone, robot in self.robots.items():
if not robot.registered_ready:
if robot.robot_ready():
print(f"Robot in zone {zone} is ready.")
# Log only once per robot when ready
robot.registered_ready = True
else:
all_ready = False
if all_ready:
break
else:
pending_robots = ', '.join([
str(zone)
for zone, robot in self.robots.items()
if not robot.robot_ready()
])
raise TimeoutError(
f"Robots in zones {pending_robots} failed to initialise. "
f"Failed to reach wait_start() within {timeout} seconds."
)
def start_robots(self) -> None:
"""Signal to all robots that their start buttons have been pressed."""
for robot in self.robots.values():
robot.start_robot()
def is_dev_mode() -> bool:
"""Load the mode file and check if we are in dev mode."""
return (get_game_mode() == 'dev')
@contextmanager
def record_animation(filename: Path) -> Iterator[None]:
"""Record an animation for the duration of the manager."""
filename.parent.mkdir(parents=True, exist_ok=True)
print(f"Saving animation to {filename}")
supervisor.animationStartRecording(str(filename))
yield
supervisor.animationStopRecording() # type: ignore[no-untyped-call]
@contextmanager
def record_video(
filename: Path,
resolution: tuple[int, int],
skip: bool = False
) -> Iterator[None]:
"""Record a video for the duration of the manager."""
filename.parent.mkdir(parents=True, exist_ok=True)
if skip:
print('Not recording movie')
yield
return
else:
print(f"Saving video to {filename}")
supervisor.movieStartRecording(
str(filename),
width=resolution[0],
height=resolution[1],
quality=100,
codec=0,
acceleration=1,
caption=False,
)
yield
supervisor.movieStopRecording() # type: ignore[no-untyped-call]
while not supervisor.movieIsReady(): # type: ignore[no-untyped-call]
time.sleep(0.1)
if supervisor.movieFailed(): # type: ignore[no-untyped-call]
print("Movie failed to record")
def save_image(filename: Path) -> None:
"""Capture an image of the arena."""
filename.parent.mkdir(parents=True, exist_ok=True)
print(f"Saving image to {filename}")
supervisor.exportImage(str(filename), 100)
def run_match(
match_duration: int,
media_path_stem: Path,
video_resolution: tuple[int, int],
skip_video: bool,
) -> None:
"""Run a match in the arena."""
robots = Robots()
robots.remove_unoccupied_robots()
time_step = int(supervisor.getBasicTimeStep())
match_timesteps = (match_duration * 1000) // time_step
# lighting_control = LightingControl(supervisor, match_timesteps)
robots.preset_robots()
robots.wait_for_ready(5)
with record_animation(media_path_stem.with_suffix('.html')):
# Animations don't support lighting changes so start the animation before
# setting the lighting. Step the simulation to allow the animation to start.
supervisor.step()
# Set initial lighting
# lighting_control.service_lighting(0)
with record_video(media_path_stem.with_suffix('.mp4'), video_resolution, skip_video):
print("===========")
print("Match start")
print("===========")
# We are ready to start the match now. "Press" the start button on the robots
robots.start_robots()
supervisor.simulationSetMode(Supervisor.SIMULATION_MODE_FAST) # type: ignore[attr-defined]
# for current_step in range(match_timesteps + 1):
# lighting_control.service_lighting(current_step)
# supervisor.step(time_step)
supervisor.step(match_timesteps)
print("==================")
print("Game over, pausing")
print("==================")
supervisor.simulationSetMode(Supervisor.SIMULATION_MODE_PAUSE) # type: ignore[attr-defined]
# To allow for a clear image of the final state, we have reset the
# lighting after the final frame of the video.
save_image(media_path_stem.with_suffix('.jpg'))
# TODO score match
def main() -> None:
"""Run the competition supervisor."""
if is_dev_mode():
robots = Robots()
robots.remove_unoccupied_robots()
exit()
match_data = get_match_data()
match_id = get_match_identifier()
prefix_and_tee_streams(
environment.ARENA_ROOT / f'supervisor-log-{match_id}.txt',
prefix=lambda: f'[{supervisor.getTime():0.3f}] ',
)
try:
# TODO check for required libraries?
run_match(
match_data.match_duration,
environment.ARENA_ROOT / 'recordings' / match_id,
video_resolution=match_data.video_resolution,
skip_video=(not match_data.video_enabled),
)
# Set the overall Webots exit code to follow the supervisor's exit code
except Exception as e:
# Print and step so error is printed to console
print(f"Error: {e}")
supervisor.step()
supervisor.simulationQuit(1)
raise
else:
supervisor.simulationQuit(0)
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,287 @@
"""
The controller for altering arena lighting provided by a DirectionalLight and a Background.
Currently doesn't support:
- Timed pre-match lighting changes
"""
from __future__ import annotations
from typing import NamedTuple
from controller import Node, Supervisor
MATCH_LIGHTING_INTENSITY = 1.5
DEFAULT_LUMINOSITY = 1
class FromEnd(NamedTuple):
"""
Represents a time relative to the end of the match.
Negative values are times before the end of the match. 0 is the last frame
of the video. All positive values will only appear in the post-match image.
"""
time: float
class ArenaLighting(NamedTuple):
"""Represents a lighting configuration for the arena."""
light_def: str
intensity: float
colour: tuple[float, float, float] = (1, 1, 1)
class LightingEffect(NamedTuple):
"""Represents a lighting effect to be applied to the arena."""
start_time: float | FromEnd
fade_time: float | None = None
lighting: ArenaLighting = ArenaLighting('SUN', intensity=MATCH_LIGHTING_INTENSITY)
luminosity: float = DEFAULT_LUMINOSITY
name: str = ""
def __repr__(self) -> str:
light = self.lighting
lights_info = [
f"({light.light_def}, int={light.intensity}, col={light.colour})"
]
return (
f"<LightingEffect: {self.name!r}, "
f"start={self.start_time}, fade={self.fade_time}, "
f"lum={self.luminosity}, "
f"{', '.join(lights_info)}"
">"
)
class LightingStep(NamedTuple):
"""Represents a step in a lighting fade."""
timestep: int
light_node: Node
intensity: float | None
colour: tuple[float, float, float] | None
luminosity: float | None
name: str | None = None
CUE_STACK = [
LightingEffect(
0,
lighting=ArenaLighting('SUN', intensity=0.2),
luminosity=0.05,
name="Pre-set",
),
LightingEffect(
0,
fade_time=1.5,
lighting=ArenaLighting('SUN', intensity=MATCH_LIGHTING_INTENSITY),
luminosity=1,
name="Fade-up",
),
LightingEffect(
FromEnd(0), # This time runs this cue as the last frame of the video
lighting=ArenaLighting('SUN', intensity=1, colour=(0.8, 0.1, 0.1)),
luminosity=0.1,
name="End of match",
),
LightingEffect(
FromEnd(1),
lighting=ArenaLighting('SUN', intensity=MATCH_LIGHTING_INTENSITY),
luminosity=DEFAULT_LUMINOSITY,
name="Post-match image",
),
]
class LightingControl:
"""Controller for managing lighting effects in the arena."""
def __init__(self, supervisor: Supervisor, duration: int) -> None:
self._robot = supervisor
self._final_timestep = duration
self.timestep = self._robot.getBasicTimeStep()
self.ambient_node = supervisor.getFromDef('AMBIENT')
# fetch all nodes used in effects, any missing nodes will be flagged here
light_names = set(effect.lighting.light_def for effect in CUE_STACK)
self.lights = {
name: supervisor.getFromDef(name)
for name in light_names
}
missing_lights = [name for name, light in self.lights.items() if light is None]
if missing_lights:
raise ValueError(f"Missing light nodes: {missing_lights}")
# Convert FromEnd times to absolute times
cue_stack = self.convert_from_end_times(CUE_STACK)
self.lighting_steps = self.generate_lighting_steps(cue_stack)
def convert_from_end_times(self, cue_stack: list[LightingEffect]) -> list[LightingEffect]:
"""Convert FromEnd times to absolute times."""
new_cue_stack = []
end_time = (self._final_timestep * self.timestep) / 1000
# @ 25 fps the last 5 timesteps are not included in the video
start_of_frame_offset = self.timestep * 6 / 1000
for cue in cue_stack:
if isinstance(cue.start_time, FromEnd):
abs_time = end_time + cue.start_time.time - start_of_frame_offset
new_cue_stack.append(cue._replace(start_time=abs_time))
else:
new_cue_stack.append(cue)
return new_cue_stack
def generate_lighting_steps(self, cue_stack: list[LightingEffect]) -> list[LightingStep]:
"""Expand the cue stack into a list of lighting steps."""
steps: list[LightingStep] = []
# Generate current values for all lights
current_values = {
name: LightingStep(
0,
light,
light.getField('intensity').getSFFloat(), # type: ignore[attr-defined]
light.getField('color').getSFColor(), # type: ignore[attr-defined]
0,
)
for name, light in self.lights.items()
}
current_luminosity = self.ambient_node.getField('luminosity').getSFFloat() # type: ignore[attr-defined]
for cue in cue_stack:
# Get the current state of the light with the current luminosity
current_state = current_values[cue.lighting.light_def]
current_state = current_state._replace(luminosity=current_luminosity)
expanded_cue = self.expand_lighting_fade(cue, current_state)
# Update current values from the last step of the cue
current_values[cue.lighting.light_def] = expanded_cue[-1]
current_luminosity = expanded_cue[-1].luminosity
steps.extend(expanded_cue)
steps.sort(key=lambda x: x.timestep)
# TODO optimise steps to remove duplicate steps
return steps
def expand_lighting_fade(
self,
cue: LightingEffect,
current_state: LightingStep,
) -> list[LightingStep]:
"""Expand a fade effect into a list of steps."""
fades = []
assert isinstance(cue.start_time, (float, int)), \
"FromEnd times should be converted to absolute times"
cue_start = int((cue.start_time * 1000) / self.timestep)
if cue.fade_time is None:
# no fade, just set values
return [LightingStep(
cue_start,
self.lights[cue.lighting.light_def],
cue.lighting.intensity,
cue.lighting.colour,
cue.luminosity,
cue.name
)]
assert current_state.intensity is not None, "Current intensity should be set"
assert current_state.colour is not None, "Current colour should be set"
assert current_state.luminosity is not None, "Current luminosity should be set"
fade_steps = int((cue.fade_time * 1000) / self.timestep)
if fade_steps == 0:
fade_steps = 1
intensity_step = (cue.lighting.intensity - current_state.intensity) / fade_steps
colour_step = [
(cue.lighting.colour[0] - current_state.colour[0]) / fade_steps,
(cue.lighting.colour[1] - current_state.colour[1]) / fade_steps,
(cue.lighting.colour[2] - current_state.colour[2]) / fade_steps,
]
luminosity_step = (cue.luminosity - current_state.luminosity) / fade_steps
for step in range(fade_steps):
fades.append(
LightingStep(
cue_start + step,
self.lights[cue.lighting.light_def],
current_state.intensity + intensity_step * step,
(
current_state.colour[0] + (colour_step[0] * step),
current_state.colour[1] + (colour_step[1] * step),
current_state.colour[2] + (colour_step[2] * step),
),
current_state.luminosity + luminosity_step * step,
cue.name if step == 0 else None,
)
)
# Replace the last step with the final values
fades.pop()
fades.append(LightingStep(
cue_start + fade_steps,
self.lights[cue.lighting.light_def],
cue.lighting.intensity,
cue.lighting.colour,
cue.luminosity,
))
return fades
def set_luminosity(self, luminosity: float) -> None:
"""Set the luminosity of the ambient node."""
self.ambient_node.getField('luminosity').setSFFloat(float(luminosity)) # type: ignore[attr-defined]
def set_node_intensity(self, node: Node, intensity: float) -> None:
"""Set the intensity of a node."""
node.getField('intensity').setSFFloat(float(intensity)) # type: ignore[attr-defined]
def set_node_colour(self, node: Node, colour: tuple[float, float, float]) -> None:
"""Set the colour of a node."""
node.getField('color').setSFColor(list(colour)) # type: ignore[attr-defined]
def service_lighting(self, current_timestep: int) -> int:
"""Service the lighting effects for the current timestep."""
index = 0
if current_timestep >= self._final_timestep and self.lighting_steps:
# Run all remaining steps
current_timestep = self.lighting_steps[-1].timestep
while (
len(self.lighting_steps) > index
and self.lighting_steps[index].timestep == current_timestep
):
lighting_step = self.lighting_steps[index]
if lighting_step.name is not None:
print(
f"Running lighting effect: {lighting_step.name} @ "
f"{current_timestep * self.timestep / 1000}"
)
if lighting_step.intensity is not None:
self.set_node_intensity(lighting_step.light_node, lighting_step.intensity)
if lighting_step.colour is not None:
self.set_node_colour(lighting_step.light_node, lighting_step.colour)
if lighting_step.luminosity is not None:
self.set_luminosity(lighting_step.luminosity)
index += 1
# Remove all steps that have been processed
self.lighting_steps = self.lighting_steps[index:]
if self.lighting_steps:
return self.lighting_steps[0].timestep - current_timestep
else:
return -1

View File

@@ -0,0 +1,3 @@
[python]
COMMAND = /home/syed/tmp/sbot-simulator-2026.0.1/venv/bin/python

View File

@@ -0,0 +1,3 @@
[python]
COMMAND = /home/syed/tmp/sbot-simulator-2026.0.1/venv/bin/python

View File

@@ -0,0 +1,150 @@
"""
The entry point for all robot controllers.
This script is responsible for setting up the environment, starting the devices,
and running the usercode.
The board simulators are run in a separate thread to allow the usercode to run
in the main thread. This provides the interface between the sr-robot3 module and Webots.
"""
import atexit
import json
import logging
import os
import runpy
import sys
import threading
from pathlib import Path
from tempfile import TemporaryDirectory
from controller import Robot
# Robot constructor lacks a return type annotation in R2023b
sys.path.insert(0, Robot().getProjectPath()) # type: ignore[no-untyped-call]
import environment # configure path to include modules
from robot_logging import get_match_identifier, prefix_and_tee_streams
from robot_utils import get_game_mode, get_robot_file, print_simulation_version
from sbot_interface.setup import setup_devices
from sbot_interface.socket_server import SocketServer
# Get the robot object that was created when setting up the environment
_robot = Robot.created
assert _robot is not None, "Robot object not created"
robot = _robot
LOGGER = logging.getLogger('usercode_runner')
def start_devices() -> SocketServer:
"""
Create the board simulators and return the SocketServer object.
Using the links or links_formatted method of the SocketServer object, the
devices' socket addresses can be accessed and passed to the usercode.
The WEBOTS_DEVICE_LOGGING environment variable, overrides the log level used.
Default is WARNING.
"""
if log_level := os.environ.get('WEBOTS_DEVICE_LOGGING'):
return setup_devices(log_level)
else:
return setup_devices()
def run_usercode(robot_file: Path, robot_zone: int, game_mode: str) -> None:
"""
Run the user's code from the given file.
Metadata is created in a temporary directory and passed to the usercode.
The system path is modified to avoid the controller modules being imported
in the usercode.
:param robot_file: The path to the robot file
:param robot_zone: The zone number
:param game_mode: The game mode string ('dev' or 'comp')
:raises Exception: If the usercode raises an exception
"""
# Remove this folder from the path
sys.path.remove(str(Path.cwd()))
# Remove our custom modules from the path
sys.path.remove(str(environment.MODULES_ROOT))
# Add the usercode folder to the path
sys.path.insert(0, str(robot_file.parent))
# Change the current directory to the usercode folder
os.chdir(robot_file.parent)
with TemporaryDirectory() as tmpdir:
# Setup metadata (zone, game_mode)
Path(tmpdir).joinpath('astoria.json').write_text(json.dumps({
"arena": "simulator",
"zone": robot_zone,
"mode": 'COMP' if game_mode == 'comp' else 'DEV',
}))
os.environ['SBOT_METADATA_PATH'] = tmpdir
os.environ['SBOT_USBKEY_PATH'] = str(Path.cwd())
# Run the usercode
# pass robot object to the usercode for keyboard robot control
runpy.run_path(str(robot_file), init_globals={'__robot__': robot})
def main() -> bool:
"""
The main entry point for the usercode runner.
This function is responsible for setting up the environment, starting the
devices, and running the usercode.
The zone number is passed as the first argument to the script using
controllerArgs on the robot.
On completion, the devices are stopped and atexit functions are run.
"""
zone = int(sys.argv[1])
game_mode = get_game_mode()
# Get the robot file
try:
robot_file = get_robot_file(zone)
except FileNotFoundError as e:
print(e.args[0])
robot.step()
# Not having a robot file is not an error in dev mode
return game_mode != 'comp'
# Setup log file
prefix_and_tee_streams(
robot_file.parent / f'log-zone-{zone}-{get_match_identifier()}.txt',
prefix=lambda: f'[{zone}| {robot.getTime():0.3f}] ',
)
# Setup devices
devices = start_devices()
# Print the simulation version
print_simulation_version()
# Pass the devices to the usercode
os.environ['WEBOTS_SIMULATOR'] = '1'
os.environ['WEBOTS_ROBOT'] = devices.links_formatted()
# Start devices in a separate thread
thread = threading.Thread(target=devices.run)
thread.start()
# Run the usercode
try:
run_usercode(robot_file, zone, game_mode)
finally:
# Run cleanup code registered in the usercode
atexit._run_exitfuncs() # noqa: SLF001
# Cleanup devices
devices.completed = True
devices.stop_event.set()
return True
if __name__ == '__main__':
exit(0 if main() else 1)