Robot Hardware Abstraction Layer
Overview
The JOSHUA Robot Layer provides a comprehensive hardware abstraction layer (HAL) that decouples high-level robot logic from the specifics of physical actuators, sensors, and communication buses. By programming against well-defined interfaces rather than concrete hardware drivers, the same control code can target a Feetech STS3215 servo arm on a bench, a LEGO SPIKE Prime over BLE, or a fully mocked test harness—without changing a single line of application logic.
The layer is organized into three primary subsystems:
- Action System — Controls actuators (servo motors, motor drivers) through a unified command interface.
- Perception System — Reads data from sensors (cameras, encoders, LiDAR) through a unified sensor interface.
- Communication Layer — Manages transport protocols (Serial, BLE) used by action and perception devices.
Architecture
The Robot Layer follows a layered architecture where each subsystem is built on abstract interfaces, with concrete implementations registered in factory classes. Configuration drives which implementations are instantiated at runtime.
+------------------------------------------------------------------+
|                      Configuration (Proto)                       |
|  RobotConfig { action_config, perception_config, comm_config }   |
+------------------------------------------------------------------+
         |                      |                     |
         v                      v                     v
+------------------+  +-------------------+  +------------------+
|  ActionFactory   |  | PerceptionFactory |  |   CommFactory    |
|                  |  |                   |  |                  |
| ActionType enum  |  | PerceptionType    |  | CommType enum    |
| -> Actuator impl |  | -> Sensor impl    |  | -> Transport     |
+------------------+  +-------------------+  +------------------+
         |                      |                     |
         v                      v                     v
+------------------+  +-------------------+  +------------------+
| ActionInterface  |  | PerceptionIface   |  | CommInterface    |
| ActuatorInterface|  | CameraInterface   |  | SerialComm       |
|                  |  | EncoderInterface  |  | BLEComm          |
| - STS3215 Servo  |  | LidarInterface    |  |                  |
| - Mock Motor     |  |                   |  |                  |
| - SPIKE Motor    |  | - Camera (OpenCV) |  |                  |
|                  |  | - Encoder         |  |                  |
|                  |  | - LiDAR (LDS01)   |  |                  |
|                  |  | - Mock variants   |  |                  |
+------------------+  +-------------------+  +------------------+
         |                      |                     |
         +----------------------+---------------------+
                                |
                                v
                     +---------------------+
                     |  ROS2 Node Runner   |
                     |  (per-device nodes) |
                     +---------------------+
Each factory reads its corresponding section from the Protocol Buffers configuration and returns the appropriate concrete implementation. This design ensures that new hardware support can be added by implementing an interface and registering it in the factory—no changes to the orchestration or node layer are required.
Action System (Actuators)
The Action System provides a unified interface for sending commands to physical actuators. It is built around two core abstract interfaces:
ActionInterface
The top-level abstract base class for all action devices. It defines the contract for writing commands to hardware.
// C++ - ActionInterface (abstract base)
class ActionInterface {
 public:
  virtual ~ActionInterface() = default;

  // Initialize the action device with configuration
  virtual bool initialize(const ActionConfig& config) = 0;

  // Write a command packet to the hardware
  virtual bool write(const ActionPacket& packet) = 0;

  // Shut down the device gracefully
  virtual void shutdown() = 0;

  // Check if the device is ready to accept commands
  virtual bool isReady() const = 0;
};
ActuatorInterface
A more specific interface extending ActionInterface for motor-based actuators, adding methods for position, speed, and torque control.
// C++ - ActuatorInterface (extends ActionInterface)
class ActuatorInterface : public ActionInterface {
 public:
  // Set target position for the actuator (in device units)
  virtual bool setPosition(int id, int position) = 0;

  // Set movement speed
  virtual bool setSpeed(int id, int speed) = 0;

  // Enable or disable torque
  virtual bool setTorque(int id, bool enable) = 0;

  // Read current position from the actuator
  virtual int readPosition(int id) = 0;
};
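The same contract can be sketched on the Python side with abc. This is a minimal sketch mirroring the C++ interfaces above; the actual module layout and type annotations in the codebase may differ:

```python
from abc import ABC, abstractmethod


class ActionInterface(ABC):
    """Abstract base for all action devices (mirrors the C++ contract)."""

    @abstractmethod
    def initialize(self, config) -> bool: ...

    @abstractmethod
    def write(self, packet) -> bool: ...

    @abstractmethod
    def shutdown(self) -> None: ...

    @abstractmethod
    def isReady(self) -> bool: ...


class ActuatorInterface(ActionInterface):
    """Adds position, speed, and torque control for motor-based actuators."""

    @abstractmethod
    def setPosition(self, id: int, position: int) -> bool: ...

    @abstractmethod
    def setSpeed(self, id: int, speed: int) -> bool: ...

    @abstractmethod
    def setTorque(self, id: int, enable: bool) -> bool: ...

    @abstractmethod
    def readPosition(self, id: int) -> int: ...
```

Because both classes are abstract, a concrete driver must implement every method before it can be instantiated, which keeps mock and real implementations interchangeable.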
ActionFactory Pattern
The ActionFactory creates actuator instances based on the ActionType enum defined in the configuration proto. This is the single point of entry for all actuator instantiation.
// C++ - ActionFactory
class ActionFactory {
 public:
  static std::unique_ptr<ActionInterface> create(const ActionConfig& config) {
    switch (config.action_type()) {
      case ActionType::STS3215_SERVO:
        return std::make_unique<STS3215Servo>(config);
      case ActionType::MOCK_MOTOR:
        return std::make_unique<MockMotor>(config);
      case ActionType::SPIKE_MOTOR:
        return std::make_unique<SpikeMotor>(config);
      default:
        throw std::runtime_error("Unknown ActionType: " +
                                 std::to_string(config.action_type()));
    }
  }
};
# Python - ActionFactory
from typing import Dict, Type


class ActionFactory:
    """Factory for creating action device instances from configuration."""

    _registry: Dict[ActionType, Type[ActionInterface]] = {
        ActionType.STS3215_SERVO: STS3215Servo,
        ActionType.MOCK_MOTOR: MockMotor,
        ActionType.SPIKE_MOTOR: SpikeMotor,
    }

    @classmethod
    def create(cls, config: ActionConfig) -> ActionInterface:
        action_class = cls._registry.get(config.action_type)
        if action_class is None:
            raise ValueError(f"Unknown ActionType: {config.action_type}")
        return action_class(config)
Both factories key off the action_type field from the proto config, so switching from real hardware to a mock driver is a single-line config change; no recompilation or code changes are needed.
Supported Actuators
| Actuator | ActionType Enum | Language | Transport | Use Case |
|---|---|---|---|---|
| STS3215 Servo | STS3215_SERVO | C++ + Python | Serial (TTL bus) | SO-ARM100 robotic arms, precision joint control |
| Mock Motor | MOCK_MOTOR | Python | None (in-memory) | Unit testing, CI pipelines, development without hardware |
| SPIKE Motor | SPIKE_MOTOR | Python | BLE (Pybricks) | LEGO SPIKE Prime educational robotics |
STS3215 Servo
The Feetech STS3215 is a TTL bus servo used in the SO-ARM100 robotic arm. It supports daisy-chaining multiple servos on a single serial bus, with each servo addressed by a unique ID. The driver implements the Feetech serial protocol for position, speed, and torque control.
- Protocol: Feetech serial protocol over TTL half-duplex UART
- Position range: 0–4095 (12-bit resolution, 0.088 degrees per step)
- Speed control: Configurable movement speed per command
- Torque control: Enable/disable torque per servo; adjustable torque limit
- Feedback: Read-back of current position, speed, load, voltage, and temperature
- Bus topology: Up to 253 servos per bus, daisy-chained on a single serial port
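Because the 12-bit range maps linearly onto one full revolution, converting between raw ticks and degrees is a one-line calculation. A small helper illustrating the math (the function names are hypothetical, not part of the driver API):

```python
TICKS_PER_REV = 4096                   # 12-bit encoder resolution
DEG_PER_TICK = 360.0 / TICKS_PER_REV   # ~0.088 degrees per step


def ticks_to_degrees(ticks: int) -> float:
    """Convert a raw STS3215 position (0-4095) to degrees."""
    return ticks * DEG_PER_TICK


def degrees_to_ticks(deg: float) -> int:
    """Convert degrees to the nearest raw position value."""
    return round(deg / DEG_PER_TICK) % TICKS_PER_REV


# The midpoint position 2048 corresponds to 180 degrees
assert ticks_to_degrees(2048) == 180.0
```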
# Python - STS3215 Servo usage
from joshua.robot.action import ActionFactory, ActionConfig, ActionType

config = ActionConfig(
    action_type=ActionType.STS3215_SERVO,
    serial_port="/dev/ttyUSB0",
    baudrate=1000000,
    servo_ids=[1, 2, 3, 4, 5, 6],
)
servo = ActionFactory.create(config)
servo.initialize(config)

# Set position for servo ID 1 to midpoint
servo.setPosition(id=1, position=2048)

# Set movement speed for servo ID 2
servo.setSpeed(id=2, speed=500)

# Enable torque on all servos
for sid in config.servo_ids:
    servo.setTorque(id=sid, enable=True)
Mock Motor
The Mock Motor is a Python-only testing implementation that simulates actuator behavior without requiring physical hardware. All commands are logged to an internal buffer for assertion in unit tests, and position reads return the last commanded value.
# Python - Mock Motor usage
from joshua.robot.action import ActionFactory, ActionConfig, ActionType

config = ActionConfig(
    action_type=ActionType.MOCK_MOTOR,
    servo_ids=[1, 2, 3],
)
mock = ActionFactory.create(config)
mock.initialize(config)

# Commands are logged, not sent to hardware
mock.setPosition(id=1, position=1024)
mock.setTorque(id=1, enable=True)

# Retrieve command log for test assertions
assert mock.get_command_log()[-1] == {"id": 1, "torque": True}
print(f"Commands logged: {len(mock.get_command_log())}")
SPIKE Motor
The SPIKE Motor driver supports LEGO SPIKE Prime hubs running Pybricks firmware, communicating via Bluetooth Low Energy (BLE). The PybricksMotorDriver wraps the BLE transport layer to send motor commands to the hub.
- Transport: Bluetooth Low Energy (BLE) via the Pybricks protocol
- Firmware: Requires Pybricks firmware installed on the SPIKE Prime hub
- Motor types: Supports SPIKE Prime Large Motor and Medium Motor
- Commands: Position control, speed control, run-for-degrees, run-for-time
- Feedback: Motor angle, speed, and stall detection over BLE notifications
# Python - SPIKE Motor with Pybricks BLE
from joshua.robot.action import ActionFactory, ActionConfig, ActionType

config = ActionConfig(
    action_type=ActionType.SPIKE_MOTOR,
    ble_device_name="Pybricks Hub",
    motor_ports=["A", "B", "C"],
)
spike = ActionFactory.create(config)
spike.initialize(config)  # Initiates BLE connection and handshake

# Run motor on port A to position 90 degrees
spike.setPosition(id=0, position=90)

# Set continuous speed on port B
spike.setSpeed(id=1, speed=500)
ActionPacket Proto
All commands flowing through the Action System are serialized as ActionPacket Protocol Buffers messages. This provides a structured, versioned, and language-agnostic wire format for motor commands.
// Proto - ActionPacket definition
syntax = "proto3";

package joshua.robot;

message ActionPacket {
  // Target servo/motor identifier
  int32 device_id = 1;

  // Primary command fields
  int32 position = 2;  // Target position (device units)
  int32 torque = 3;    // Torque value or enable flag
  int32 speed = 4;     // Movement speed

  // Preset commands for common operations
  enum PresetCommand {
    NONE = 0;
    HOME = 1;            // Move to home/zero position
    PARK = 2;            // Move to park/safe position
    EMERGENCY_STOP = 3;  // Immediate stop, disable torque
  }
  PresetCommand preset = 5;

  // Complex multi-parameter actions
  message CompoundAction {
    repeated int32 target_positions = 1;  // Synchronized multi-joint
    repeated int32 target_speeds = 2;     // Per-joint speeds
    int32 interpolation_ms = 3;           // Interpolation duration
  }
  CompoundAction compound = 6;

  // Timestamp for synchronization
  int64 timestamp_ns = 7;
}
The CompoundAction field enables synchronized multi-joint movements with per-joint speed profiles and a configurable interpolation time, which is essential for smooth robotic arm trajectories.
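To make the interpolation semantics concrete, here is a plain-Python sketch of how a CompoundAction could be expanded into synchronized per-joint waypoints over interpolation_ms. This is illustrative only; the real executor may use smoother velocity profiles, and the function name and step size are assumptions:

```python
def interpolate_compound(current, targets, interpolation_ms, step_ms=20):
    """Yield synchronized per-joint waypoints from `current` to `targets`.

    Linearly interpolates every joint over `interpolation_ms`, emitting one
    waypoint every `step_ms` so all joints arrive at the same time.
    """
    steps = max(1, interpolation_ms // step_ms)
    for s in range(1, steps + 1):
        t = s / steps  # fraction of the move completed
        yield [round(c + (g - c) * t) for c, g in zip(current, targets)]


# Example: a 3-joint move over 100 ms in 20 ms steps (5 waypoints)
waypoints = list(interpolate_compound([0, 1000, 2048], [2048, 1000, 0], 100))
```

The final waypoint always equals the target vector, so the move lands exactly on the commanded positions regardless of rounding along the way.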
Perception System (Sensors)
The Perception System provides a unified interface for reading data from sensors. Like the Action System, it is built on abstract interfaces with factory-pattern instantiation.
PerceptionInterface
The top-level abstract base class for all perception devices.
// C++ - PerceptionInterface (abstract base)
class PerceptionInterface {
 public:
  virtual ~PerceptionInterface() = default;

  // Initialize the sensor with configuration
  virtual bool initialize(const PerceptionConfig& config) = 0;

  // Read a data packet from the sensor
  virtual PerceptionPacket read() = 0;

  // Shut down the sensor gracefully
  virtual void shutdown() = 0;

  // Check if the sensor has new data available
  virtual bool hasData() const = 0;
};
Specialized Sensor Interfaces
More specific interfaces extend PerceptionInterface for different sensor modalities:
// C++ - Specialized sensor interfaces
class CameraInterface : public PerceptionInterface {
 public:
  virtual cv::Mat captureFrame() = 0;
  virtual bool setResolution(int width, int height) = 0;
  virtual bool setFPS(int fps) = 0;
};

class EncoderInterface : public PerceptionInterface {
 public:
  virtual double readAngle(int id) = 0;
  virtual double readVelocity(int id) = 0;
  virtual void setDataMode(EncoderDataMode mode) = 0;
};

class LidarInterface : public PerceptionInterface {
 public:
  virtual std::vector<float> readRanges() = 0;
  virtual float getAngleMin() const = 0;
  virtual float getAngleMax() const = 0;
  virtual float getAngleIncrement() const = 0;
};
PerceptionFactory Pattern
// C++ - PerceptionFactory
class PerceptionFactory {
 public:
  static std::unique_ptr<PerceptionInterface> create(
      const PerceptionConfig& config) {
    switch (config.perception_type()) {
      case PerceptionType::CAMERA:
        return std::make_unique<CameraDriver>(config);
      case PerceptionType::ENCODER:
        return std::make_unique<EncoderDriver>(config);
      case PerceptionType::LIDAR:
        return std::make_unique<LidarDriver>(config);
      case PerceptionType::MOCK_CAMERA:
        return std::make_unique<MockCamera>(config);
      case PerceptionType::MOCK_ENCODER:
        return std::make_unique<MockEncoder>(config);
      case PerceptionType::MOCK_LIDAR:
        return std::make_unique<MockLidar>(config);
      default:
        throw std::runtime_error("Unknown PerceptionType");
    }
  }
};
Supported Sensors
| Sensor | PerceptionType Enum | Language | ROS2 Message Type | Use Case |
|---|---|---|---|---|
| Camera (OpenCV) | CAMERA | C++ + Python | sensor_msgs/Image | Visual perception, AI inference input |
| Encoder (STS3215) | ENCODER | C++ + Python | std_msgs/Float64MultiArray | Joint position feedback, teleoperation |
| LiDAR (LDS01) | LIDAR | C++ | sensor_msgs/LaserScan, PointCloud2 | Obstacle detection, mapping |
| Mock Camera | MOCK_CAMERA | Python | sensor_msgs/Image | Testing without camera hardware |
| Mock Encoder | MOCK_ENCODER | Python | std_msgs/Float64MultiArray | Testing without encoder hardware |
| Mock LiDAR | MOCK_LIDAR | Python | sensor_msgs/LaserScan | Testing without LiDAR hardware |
Camera
The Camera driver uses OpenCV (cv::VideoCapture in C++, cv2.VideoCapture in Python) to capture frames from USB or CSI cameras. It supports configurable resolution, frame rate, and FourCC codec settings, and publishes frames as sensor_msgs/Image ROS2 messages.
- Resolution: Configurable width and height (e.g., 640x480, 1280x720)
- Frame rate: Configurable FPS (default: 30)
- FourCC codec: Configurable video codec (e.g., MJPG, YUYV)
- Device index: Supports multiple cameras by device index or path
- ROS2 output: Publishes sensor_msgs/Image on a configurable topic name
# Python - Camera configuration and usage
from joshua.robot.perception import (
    PerceptionFactory,
    PerceptionConfig,
    PerceptionType,
    CameraConfig,
)

config = PerceptionConfig(
    perception_type=PerceptionType.CAMERA,
    camera_config=CameraConfig(
        device_index=0,
        width=640,
        height=480,
        fps=30,
        fourcc="MJPG",
    ),
    topic_name="/camera/image_raw",
)
camera = PerceptionFactory.create(config)
camera.initialize(config)

# Capture a single frame
packet = camera.read()
# packet.image_data contains raw pixel data
# packet.width, packet.height, packet.encoding describe the format
Encoder
The Encoder driver reads position feedback from STS3215 servo encoders. It supports multiple data normalization modes to provide position data in the format most useful for downstream consumers (raw device units, normalized floats, or radians).
Encoder Data Modes
| Mode | Enum Value | Range | Description |
|---|---|---|---|
| Raw | RAW | 0–4095 | Raw 12-bit encoder ticks as read from hardware |
| Normalized [0, 1] | NORMALIZED_0_1 | 0.0–1.0 | Linear mapping of raw range to unit interval |
| Normalized [-1, 1] | NORMALIZED_NEG1_1 | -1.0–1.0 | Centered normalization for symmetric joint ranges |
| Radians | NORMALIZED_RADIAN | 0–2π | Angular position in radians for kinematic calculations |
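Each mode is a simple linear map of the raw 12-bit reading. A sketch of the conversions (the function name and string-based mode tags are illustrative; the driver uses the proto enum):

```python
import math

RAW_MAX = 4095  # Highest raw 12-bit encoder value


def convert_ticks(raw: int, mode: str) -> float:
    """Map a raw encoder reading (0-4095) into the requested data mode."""
    if mode == "RAW":
        return float(raw)
    if mode == "NORMALIZED_0_1":
        return raw / RAW_MAX                  # 0.0 .. 1.0
    if mode == "NORMALIZED_NEG1_1":
        return 2.0 * raw / RAW_MAX - 1.0      # -1.0 .. 1.0
    if mode == "NORMALIZED_RADIAN":
        return raw / 4096 * 2.0 * math.pi     # 0 .. just under 2*pi
    raise ValueError(f"Unknown data mode: {mode}")
```

Note the radian mode divides by 4096 rather than 4095 so that one tick equals exactly 2π/4096 radians and the range never wraps past 2π.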
// C++ - Encoder configuration example
PerceptionConfig config;
config.set_perception_type(PerceptionType::ENCODER);
config.mutable_encoder_config()->set_serial_port("/dev/ttyUSB0");
config.mutable_encoder_config()->set_baudrate(1000000);
config.mutable_encoder_config()->set_data_mode(EncoderDataMode::NORMALIZED_NEG1_1);

// Add servo IDs to read from
for (int id : {1, 2, 3, 4, 5, 6}) {
  config.mutable_encoder_config()->add_servo_ids(id);
}

auto encoder = PerceptionFactory::create(config);
encoder->initialize(config);

// Read normalized positions for all servos
PerceptionPacket packet = encoder->read();
// packet.position_data() contains [-1.0, 1.0] values per servo
LiDAR
The LiDAR driver supports the HLS-LFCD LDS01 (LDS-01) 2D laser distance sensor. It publishes both sensor_msgs/LaserScan for 2D navigation and sensor_msgs/PointCloud2 for 3D visualization. A built-in visualizer node can render the scan data for debugging.
- Sensor: HLS-LFCD LDS01 (360-degree 2D laser scanner)
- Range: 120mm to 3500mm
- Angular resolution: 1 degree (360 points per scan)
- Scan rate: ~5 Hz (configurable)
- ROS2 output: sensor_msgs/LaserScan and sensor_msgs/PointCloud2
- Visualizer: Built-in scan visualizer node for debugging and calibration
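Downstream consumers often convert the polar range array into Cartesian points using the scan's angular metadata. A minimal sketch assuming LaserScan-style fields (function name and defaults are illustrative; the defaults match the LDS01's 120–3500 mm span):

```python
import math


def scan_to_points(ranges, angle_min, angle_increment,
                   range_min=0.12, range_max=3.5):
    """Convert a LaserScan-style range array to (x, y) points in metres.

    Readings outside [range_min, range_max] are dropped as invalid.
    The beam angle advances by angle_increment per element.
    """
    points = []
    for i, r in enumerate(ranges):
        if not (range_min <= r <= range_max):
            continue  # out-of-range or invalid reading
        theta = angle_min + i * angle_increment
        points.append((r * math.cos(theta), r * math.sin(theta)))
    return points
```

With the LDS01's 1-degree resolution, angle_increment would be 2π/360 and a full scan yields up to 360 points.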
// C++ - LiDAR driver initialization
PerceptionConfig config;
config.set_perception_type(PerceptionType::LIDAR);
config.mutable_lidar_config()->set_serial_port("/dev/ttyUSB1");
config.mutable_lidar_config()->set_baudrate(230400);
config.mutable_lidar_config()->set_frame_id("laser_frame");
auto lidar = PerceptionFactory::create(config);
lidar->initialize(config);
// Read a full 360-degree scan
PerceptionPacket packet = lidar->read();
// packet.point_cloud_data() for PointCloud2 format
// packet.sensor_data() for LaserScan ranges
Mock Sensors
Mock implementations are available for all sensor types. They generate synthetic data (test patterns for cameras, sinusoidal values for encoders, randomized range data for LiDAR) and are used extensively in CI pipelines and unit tests.
# Python - Using mock sensors in tests
from joshua.robot.perception import (
    PerceptionFactory,
    PerceptionConfig,
    PerceptionType,
    CameraConfig,
)

def test_perception_pipeline():
    """Test the full perception pipeline with mock sensors."""
    config = PerceptionConfig(
        perception_type=PerceptionType.MOCK_CAMERA,
        camera_config=CameraConfig(width=640, height=480, fps=30),
    )
    camera = PerceptionFactory.create(config)
    camera.initialize(config)
    packet = camera.read()
    assert packet.image_data is not None
    assert packet.width == 640
    assert packet.height == 480
PerceptionPacket Proto
All sensor data flows through PerceptionPacket Protocol Buffers messages, providing a unified envelope for heterogeneous sensor data.
// Proto - PerceptionPacket definition
syntax = "proto3";

package joshua.robot;

message PerceptionPacket {
  // Image data (camera output)
  message ImageData {
    bytes data = 1;       // Raw pixel data
    int32 width = 2;
    int32 height = 3;
    string encoding = 4;  // e.g., "rgb8", "bgr8", "mono8"
    int32 step = 5;       // Row stride in bytes
  }
  ImageData image_data = 1;

  // Position data (encoder output)
  message PositionData {
    repeated double values = 1;     // Position values per joint
    EncoderDataMode data_mode = 2;  // Normalization mode used
    repeated int32 device_ids = 3;  // Corresponding device IDs
  }
  PositionData position_data = 2;

  // Generic sensor data (LiDAR LaserScan)
  message SensorData {
    repeated float ranges = 1;       // Distance measurements
    repeated float intensities = 2;  // Signal intensities
    float angle_min = 3;
    float angle_max = 4;
    float angle_increment = 5;
    float range_min = 6;
    float range_max = 7;
  }
  SensorData sensor_data = 3;

  // Point cloud data (LiDAR PointCloud2)
  message PointCloudData {
    bytes data = 1;  // Serialized point cloud
    int32 width = 2;
    int32 height = 3;
    int32 point_step = 4;
    int32 row_step = 5;
    bool is_dense = 6;
  }
  PointCloudData point_cloud_data = 4;

  // Metadata
  string frame_id = 10;
  int64 timestamp_ns = 11;
}
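A consumer typically checks which payload sub-message is populated before processing. A plain-Python sketch using a stand-in object (real code would use the generated protobuf classes and their HasField() method; the function name here is illustrative):

```python
from types import SimpleNamespace


def classify_packet(packet) -> str:
    """Return the name of the populated PerceptionPacket payload field.

    Checks the payload fields in a fixed order; with real protobuf
    messages this would be HasField() on the generated class.
    """
    for field in ("image_data", "position_data",
                  "sensor_data", "point_cloud_data"):
        if getattr(packet, field, None) is not None:
            return field
    return "empty"


# Stand-in packet carrying encoder output in normalized [-1, 1] form
pkt = SimpleNamespace(
    image_data=None,
    position_data=SimpleNamespace(values=[0.1, -0.2]),
    sensor_data=None,
    point_cloud_data=None,
)
```

Keeping one envelope type with optional payloads lets every sensor node share the same publishing path while consumers branch only once, on the payload kind.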
Communication Layer
The Communication Layer abstracts transport protocols used by action and perception devices. It provides a uniform interface for sending and receiving bytes, regardless of whether the underlying transport is serial UART, Bluetooth LE, or another protocol.
// C++ - CommInterface (abstract base)
class CommInterface {
 public:
  virtual ~CommInterface() = default;

  virtual bool open(const CommConfig& config) = 0;
  virtual bool close() = 0;
  virtual bool isOpen() const = 0;

  virtual size_t write(const uint8_t* data, size_t length) = 0;
  virtual size_t read(uint8_t* buffer, size_t max_length) = 0;

  // Async read with callback (Boost.Asio-based)
  virtual void asyncRead(
      std::function<void(const uint8_t*, size_t)> callback) = 0;
};
Serial Communication
The Serial implementation uses Boost.Asio for asynchronous I/O, supporting configurable port paths and baud rates. It handles the half-duplex TTL bus protocol required by Feetech servos and full-duplex UART for LiDAR sensors.
| Parameter | Type | Default | Description |
|---|---|---|---|
| port | string | /dev/ttyUSB0 | Serial device path |
| baudrate | uint32 | 1000000 | Communication speed in baud |
| timeout_ms | uint32 | 100 | Read/write timeout in milliseconds |
| data_bits | uint32 | 8 | Number of data bits |
| parity | enum | NONE | Parity mode (NONE, ODD, EVEN) |
| stop_bits | enum | ONE | Number of stop bits (ONE, TWO) |
// C++ - Serial communication with Boost.Asio
#include <iostream>

#include "joshua/robot/comm/serial_comm.h"

CommConfig config;
config.set_comm_type(CommType::SERIAL);
config.mutable_serial_config()->set_port("/dev/ttyUSB0");
config.mutable_serial_config()->set_baudrate(1000000);
config.mutable_serial_config()->set_timeout_ms(100);

auto serial = CommFactory::create(config);
serial->open(config);

// Synchronous write (the trailing byte is the Feetech frame checksum)
uint8_t cmd[] = {0xFF, 0xFF, 0x01, 0x04, 0x03, 0x2A, 0x00, 0xCD};
serial->write(cmd, sizeof(cmd));

// Asynchronous read with callback
serial->asyncRead([](const uint8_t* data, size_t len) {
  // Process response from servo
  std::cout << "Received " << len << " bytes" << std::endl;
});
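Feetech command frames end in a checksum byte: the bitwise complement of the sum of every byte after the two 0xFF header bytes. A sketch of the computation (the function name is illustrative):

```python
def feetech_checksum(frame_body: bytes) -> int:
    """Checksum over ID, length, instruction, and parameter bytes.

    Per the Feetech serial protocol: ~(sum of bytes) & 0xFF, computed
    over everything after the 0xFF 0xFF frame header.
    """
    return (~sum(frame_body)) & 0xFF


# ID=0x01, LEN=0x04, INSTR=0x03 (write), params 0x2A 0x00
body = bytes([0x01, 0x04, 0x03, 0x2A, 0x00])
checksum = feetech_checksum(body)  # 0xCD for this frame body
```

Validating the checksum on every received frame is the cheapest way to catch corruption on the shared half-duplex bus before acting on a response.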
BLE Communication
The BLE implementation provides the transport layer for Pybricks SPIKE Prime motors. It handles BLE device scanning, connection management, characteristic discovery, and notification-based data reception.
- Protocol: Pybricks BLE service UUID for SPIKE Prime hubs
- Connection: Automatic scan, connect, and characteristic discovery
- Write: Commands sent via BLE GATT write operations
- Read: Motor feedback received via BLE GATT notifications
- Reconnection: Automatic reconnection on BLE disconnect
# Python - BLE communication for Pybricks
from joshua.robot.comm import CommFactory, CommConfig, CommType, BLEConfig

config = CommConfig(
    comm_type=CommType.BLE,
    ble_config=BLEConfig(
        device_name="Pybricks Hub",
        service_uuid="c5f50001-8280-46da-89f4-6d8051e4aeef",
        char_uuid="c5f50002-8280-46da-89f4-6d8051e4aeef",
        auto_reconnect=True,
    ),
)
ble = CommFactory.create(config)
ble.open(config)

# Send motor command over BLE
command_bytes = build_pybricks_command(port="A", speed=500)
ble.write(command_bytes)

# Register notification callback for motor feedback
ble.asyncRead(callback=on_motor_feedback)
CommFactory
// C++ - CommFactory
class CommFactory {
 public:
  static std::unique_ptr<CommInterface> create(const CommConfig& config) {
    switch (config.comm_type()) {
      case CommType::SERIAL:
        return std::make_unique<SerialComm>(config);
      case CommType::BLE:
        return std::make_unique<BLEComm>(config);
      default:
        throw std::runtime_error("Unknown CommType");
    }
  }
};
ROS2 Node Integration
Each hardware device in the Robot Layer runs as a separate ROS2 node, managed by the Node Generator orchestrator. The node type determines which publishers and subscribers are created, and data flows through the system as serialized ActionPacket and PerceptionPacket Protocol Buffers messages wrapped in ROS2 message types.
Node Type Mapping
| Node Type | Direction | ROS2 Interface | Proto Type |
|---|---|---|---|
| Actuator Node | Subscriber | std_msgs/ByteMultiArray | ActionPacket |
| Camera Node | Publisher | sensor_msgs/Image | PerceptionPacket.ImageData |
| Encoder Node | Publisher | std_msgs/Float64MultiArray | PerceptionPacket.PositionData |
| LiDAR Node | Publisher | sensor_msgs/LaserScan | PerceptionPacket.SensorData |
| LiDAR Node | Publisher | sensor_msgs/PointCloud2 | PerceptionPacket.PointCloudData |
Data Flow
Configuration (.pbtxt)
         |
         v
+------------------+
|  Node Generator  |   Parses config, spawns per-device nodes
+------------------+
         |
     +---+-------+-----------+------ ...
     |           |           |
     v           v           v
+---------+ +---------+ +---------+
| Actuator| | Camera  | | Encoder |   Each device = 1 ROS2 node
|  Node   | |  Node   | |  Node   |
+---------+ +---------+ +---------+
     |           |           |
     | sub       | pub       | pub
     v           v           v
[ActionPacket]  [Image]  [Float64MultiArray]
 (serialized)   (sensor)     (sensor)
     |           |           |
     v           v           v
  Hardware    Hardware    Hardware
  (Servos)    (Camera)    (Encoders)
# Python - Example: Actuator ROS2 node runner
import rclpy
from rclpy.node import Node
from std_msgs.msg import ByteMultiArray

from joshua.robot.action import ActionFactory
from joshua.robot.proto import ActionPacket  # generated message (import path illustrative)

class ActuatorNode(Node):
    def __init__(self, config):
        super().__init__(f"actuator_{config.name}")
        self.actuator = ActionFactory.create(config)
        self.actuator.initialize(config)
        # Subscribe to command topic
        self.subscription = self.create_subscription(
            ByteMultiArray,
            f"/{config.name}/command",
            self.command_callback,
            10,  # QoS depth
        )

    def command_callback(self, msg):
        # Deserialize ActionPacket from ROS2 message
        packet = ActionPacket()
        packet.ParseFromString(bytes(msg.data))
        self.actuator.write(packet)
Adding New Hardware
The Robot Layer is designed for extensibility. Adding support for new hardware follows a consistent four-step process:
Step 1: Define Proto Configuration
Add the new device type to the appropriate enum and define its configuration message in the Protocol Buffers schema.
// Proto - Add new actuator type
enum ActionType {
  STS3215_SERVO = 0;
  MOCK_MOTOR = 1;
  SPIKE_MOTOR = 2;
  MY_NEW_MOTOR = 3;  // <-- Add new enum value
}

message MyNewMotorConfig {
  string port = 1;
  int32 baudrate = 2;
  repeated int32 motor_ids = 3;
  // ... device-specific parameters
}

message ActionConfig {
  ActionType action_type = 1;
  // ...existing fields...
  MyNewMotorConfig my_new_motor_config = 10;  // <-- Add config field
}
Step 2: Implement the Interface
Create a new class implementing ActuatorInterface (for action devices) or PerceptionInterface (for sensors).
// C++ - Implement the interface
class MyNewMotor : public ActuatorInterface {
 public:
  explicit MyNewMotor(const ActionConfig& config);

  bool initialize(const ActionConfig& config) override;
  bool write(const ActionPacket& packet) override;
  void shutdown() override;
  bool isReady() const override;

  bool setPosition(int id, int position) override;
  bool setSpeed(int id, int speed) override;
  bool setTorque(int id, bool enable) override;
  int readPosition(int id) override;

 private:
  std::unique_ptr<CommInterface> comm_;
  // ... device-specific state
};
Step 3: Register in Factory
Add a case for the new device type in the appropriate factory class.
// C++ - Register in ActionFactory
case ActionType::MY_NEW_MOTOR:
  return std::make_unique<MyNewMotor>(config);
Step 4: Add to Node Runner
Ensure the Node Generator can spawn a node for the new device type. In most cases, this is automatic because the factory handles instantiation—but you may need to add topic mappings if your device uses non-standard message types.
# Example .pbtxt configuration using the new hardware
robot_config {
  name: "my_robot"
  action_nodes {
    name: "new_motor_arm"
    action_config {
      action_type: MY_NEW_MOTOR
      my_new_motor_config {
        port: "/dev/ttyACM0"
        baudrate: 115200
        motor_ids: [1, 2, 3]
      }
    }
    topic: "/new_motor_arm/command"
  }
}
}