Robot Hardware Abstraction Layer

Overview

The JOSHUA Robot Layer provides a comprehensive hardware abstraction layer (HAL) that decouples high-level robot logic from the specifics of physical actuators, sensors, and communication buses. By programming against well-defined interfaces rather than concrete hardware drivers, the same control code can target a Feetech STS3215 servo arm on a bench, a LEGO SPIKE Prime over BLE, or a fully mocked test harness—without changing a single line of application logic.

Key Design Principles The Robot Layer uses the factory pattern for device instantiation, driven entirely by Protocol Buffers configuration. Both C++ and Python implementations are available for all core interfaces, enabling high-performance drivers alongside rapid-prototyping scripts. Mock implementations are provided for every device type to support testing without physical hardware.

The layer is organized into three primary subsystems:

Architecture

The Robot Layer follows a layered architecture where each subsystem is built on abstract interfaces, with concrete implementations registered in factory classes. Configuration drives which implementations are instantiated at runtime.

Robot Layer Architecture
+------------------------------------------------------------------+
|                     Configuration (Proto)                        |
|  RobotConfig { action_config, perception_config, comm_config }   |
+------------------------------------------------------------------+
          |                    |                     |
          v                    v                     v
+------------------+  +-------------------+  +------------------+
|  ActionFactory   |  | PerceptionFactory |  |   CommFactory    |
|                  |  |                   |  |                  |
| ActionType enum  |  | PerceptionType    |  | CommType enum    |
| -> Actuator impl |  | -> Sensor impl    |  | -> Transport     |
+------------------+  +-------------------+  +------------------+
          |                    |                     |
          v                    v                     v
+------------------+  +-------------------+  +------------------+
| ActionInterface  |  | PerceptionIface   |  | CommInterface    |
| ActuatorInterface|  | CameraInterface   |  | SerialComm       |
|                  |  | EncoderInterface  |  | BLEComm          |
| - STS3215 Servo  |  | LidarInterface    |  |                  |
| - Mock Motor     |  |                   |  |                  |
| - SPIKE Motor    |  | - Camera (OpenCV) |  |                  |
|                  |  | - Encoder         |  |                  |
|                  |  | - LiDAR (LDS01)   |  |                  |
|                  |  | - Mock variants   |  |                  |
+------------------+  +-------------------+  +------------------+
          |                    |                     |
          +--------------------+---------------------+
                               |
                               v
                    +---------------------+
                    |   ROS2 Node Runner   |
                    | (per-device nodes)   |
                    +---------------------+

Each factory reads its corresponding section from the Protocol Buffers configuration and returns the appropriate concrete implementation. This design ensures that new hardware support can be added by implementing an interface and registering it in the factory—no changes to the orchestration or node layer are required.

Action System (Actuators)

The Action System provides a unified interface for sending commands to physical actuators. It is built around two core abstract interfaces:

ActionInterface

The top-level abstract base class for all action devices. It defines the contract for writing commands to hardware.

// C++ - ActionInterface (abstract base)
class ActionInterface {
public:
  virtual ~ActionInterface() = default;

  // Initialize the action device with configuration
  virtual bool initialize(const ActionConfig& config) = 0;

  // Write a command packet to the hardware
  virtual bool write(const ActionPacket& packet) = 0;

  // Shut down the device gracefully
  virtual void shutdown() = 0;

  // Check if the device is ready to accept commands
  virtual bool isReady() const = 0;
};

ActuatorInterface

A more specific interface extending ActionInterface for motor-based actuators, adding methods for position, speed, and torque control.

// C++ - ActuatorInterface (extends ActionInterface)
class ActuatorInterface : public ActionInterface {
public:
  // Set target position for the actuator (in device units)
  virtual bool setPosition(int id, int position) = 0;

  // Set movement speed
  virtual bool setSpeed(int id, int speed) = 0;

  // Enable or disable torque
  virtual bool setTorque(int id, bool enable) = 0;

  // Read current position from the actuator
  virtual int readPosition(int id) = 0;
};

ActionFactory Pattern

The ActionFactory creates actuator instances based on the ActionType enum defined in the configuration proto. This is the single point of entry for all actuator instantiation.

// C++ - ActionFactory
class ActionFactory {
public:
  static std::unique_ptr<ActionInterface> create(const ActionConfig& config) {
    switch (config.action_type()) {
      case ActionType::STS3215_SERVO:
        return std::make_unique<STS3215Servo>(config);

      case ActionType::MOCK_MOTOR:
        return std::make_unique<MockMotor>(config);

      case ActionType::SPIKE_MOTOR:
        return std::make_unique<SpikeMotor>(config);

      default:
        throw std::runtime_error("Unknown ActionType: " +
          std::to_string(config.action_type()));
    }
  }
};
# Python - ActionFactory
class ActionFactory:
    """Factory for creating action device instances from configuration."""

    _registry: Dict[ActionType, Type[ActionInterface]] = {
        ActionType.STS3215_SERVO: STS3215Servo,
        ActionType.MOCK_MOTOR: MockMotor,
        ActionType.SPIKE_MOTOR: SpikeMotor,
    }

    @classmethod
    def create(cls, config: ActionConfig) -> ActionInterface:
        action_class = cls._registry.get(config.action_type)
        if action_class is None:
            raise ValueError(f"Unknown ActionType: {config.action_type}")
        return action_class(config)
Configuration-Driven The factory reads the action_type field from the proto config, so switching from real hardware to a mock driver is a single-line config change—no recompilation or code changes needed.

Supported Actuators

Actuator ActionType Enum Language Transport Use Case
STS3215 Servo STS3215_SERVO C++ + Python Serial (TTL bus) SO-ARM100 robotic arms, precision joint control
Mock Motor MOCK_MOTOR Python None (in-memory) Unit testing, CI pipelines, development without hardware
SPIKE Motor SPIKE_MOTOR Python BLE (Pybricks) LEGO SPIKE Prime educational robotics

STS3215 Servo

The Feetech STS3215 is a TTL bus servo used in the SO-ARM100 robotic arm. It supports daisy-chaining multiple servos on a single serial bus, with each servo addressed by a unique ID. The driver implements the Feetech serial protocol for position, speed, and torque control.

# Python - STS3215 Servo usage
from joshua.robot.action import ActionFactory, ActionConfig

config = ActionConfig(
    action_type=ActionType.STS3215_SERVO,
    serial_port="/dev/ttyUSB0",
    baudrate=1000000,
    servo_ids=[1, 2, 3, 4, 5, 6],
)

servo = ActionFactory.create(config)
servo.initialize(config)

# Set position for servo ID 1 to midpoint
servo.setPosition(id=1, position=2048)

# Set movement speed for servo ID 2
servo.setSpeed(id=2, speed=500)

# Enable torque on all servos
for sid in config.servo_ids:
    servo.setTorque(id=sid, enable=True)

Mock Motor

The Mock Motor is a Python-only testing implementation that simulates actuator behavior without requiring physical hardware. All commands are logged to an internal buffer for assertion in unit tests, and position reads return the last commanded value.

# Python - Mock Motor usage
from joshua.robot.action import ActionFactory, ActionConfig

config = ActionConfig(
    action_type=ActionType.MOCK_MOTOR,
    servo_ids=[1, 2, 3],
)

mock = ActionFactory.create(config)
mock.initialize(config)

# Commands are logged, not sent to hardware
mock.setPosition(id=1, position=1024)
mock.setTorque(id=1, enable=True)

# Retrieve command log for test assertions
assert mock.get_command_log()[-1] == {"id": 1, "torque": True}
print(f"Commands logged: {len(mock.get_command_log())}")

SPIKE Motor

The SPIKE Motor driver supports LEGO SPIKE Prime hubs running Pybricks firmware, communicating via Bluetooth Low Energy (BLE). The PybricksMotorDriver wraps the BLE transport layer to send motor commands to the hub.

# Python - SPIKE Motor with Pybricks BLE
from joshua.robot.action import ActionFactory, ActionConfig

config = ActionConfig(
    action_type=ActionType.SPIKE_MOTOR,
    ble_device_name="Pybricks Hub",
    motor_ports=["A", "B", "C"],
)

spike = ActionFactory.create(config)
spike.initialize(config)  # Initiates BLE connection and handshake

# Run motor on port A to position 90 degrees
spike.setPosition(id=0, position=90)

# Set continuous speed on port B
spike.setSpeed(id=1, speed=500)

ActionPacket Proto

All commands flowing through the Action System are serialized as ActionPacket Protocol Buffers messages. This provides a structured, versioned, and language-agnostic wire format for motor commands.

// Proto - ActionPacket definition
syntax = "proto3";
package joshua.robot;

message ActionPacket {
  // Target servo/motor identifier
  int32 device_id = 1;

  // Primary command fields
  int32 position = 2;       // Target position (device units)
  int32 torque = 3;         // Torque value or enable flag
  int32 speed = 4;          // Movement speed

  // Preset commands for common operations
  enum PresetCommand {
    NONE = 0;
    HOME = 1;               // Move to home/zero position
    PARK = 2;               // Move to park/safe position
    EMERGENCY_STOP = 3;     // Immediate stop, disable torque
  }
  PresetCommand preset = 5;

  // Complex multi-parameter actions
  message CompoundAction {
    repeated int32 target_positions = 1;  // Synchronized multi-joint
    repeated int32 target_speeds = 2;     // Per-joint speeds
    int32 interpolation_ms = 3;           // Interpolation duration
  }
  CompoundAction compound = 6;

  // Timestamp for synchronization
  int64 timestamp_ns = 7;
}
Multi-Joint Synchronization The CompoundAction field enables synchronized multi-joint movements with per-joint speed profiles and configurable interpolation time, which is essential for smooth robotic arm trajectories.

Perception System (Sensors)

The Perception System provides a unified interface for reading data from sensors. Like the Action System, it is built on abstract interfaces with factory-pattern instantiation.

PerceptionInterface

The top-level abstract base class for all perception devices.

// C++ - PerceptionInterface (abstract base)
class PerceptionInterface {
public:
  virtual ~PerceptionInterface() = default;

  // Initialize the sensor with configuration
  virtual bool initialize(const PerceptionConfig& config) = 0;

  // Read a data packet from the sensor
  virtual PerceptionPacket read() = 0;

  // Shut down the sensor gracefully
  virtual void shutdown() = 0;

  // Check if the sensor has new data available
  virtual bool hasData() const = 0;
};

Specialized Sensor Interfaces

More specific interfaces extend PerceptionInterface for different sensor modalities:

// C++ - Specialized sensor interfaces

class CameraInterface : public PerceptionInterface {
public:
  virtual cv::Mat captureFrame() = 0;
  virtual bool setResolution(int width, int height) = 0;
  virtual bool setFPS(int fps) = 0;
};

class EncoderInterface : public PerceptionInterface {
public:
  virtual double readAngle(int id) = 0;
  virtual double readVelocity(int id) = 0;
  virtual void setDataMode(EncoderDataMode mode) = 0;
};

class LidarInterface : public PerceptionInterface {
public:
  virtual std::vector<float> readRanges() = 0;
  virtual float getAngleMin() const = 0;
  virtual float getAngleMax() const = 0;
  virtual float getAngleIncrement() const = 0;
};

PerceptionFactory Pattern

// C++ - PerceptionFactory
class PerceptionFactory {
public:
  static std::unique_ptr<PerceptionInterface> create(
      const PerceptionConfig& config) {
    switch (config.perception_type()) {
      case PerceptionType::CAMERA:
        return std::make_unique<CameraDriver>(config);

      case PerceptionType::ENCODER:
        return std::make_unique<EncoderDriver>(config);

      case PerceptionType::LIDAR:
        return std::make_unique<LidarDriver>(config);

      case PerceptionType::MOCK_CAMERA:
        return std::make_unique<MockCamera>(config);

      case PerceptionType::MOCK_ENCODER:
        return std::make_unique<MockEncoder>(config);

      case PerceptionType::MOCK_LIDAR:
        return std::make_unique<MockLidar>(config);

      default:
        throw std::runtime_error("Unknown PerceptionType");
    }
  }
};

Supported Sensors

Sensor PerceptionType Enum Language ROS2 Message Type Use Case
Camera (OpenCV) CAMERA C++ + Python sensor_msgs/Image Visual perception, AI inference input
Encoder (STS3215) ENCODER C++ + Python std_msgs/Float64MultiArray Joint position feedback, teleoperation
LiDAR (LDS01) LIDAR C++ sensor_msgs/LaserScan, PointCloud2 Obstacle detection, mapping
Mock Camera MOCK_CAMERA Python sensor_msgs/Image Testing without camera hardware
Mock Encoder MOCK_ENCODER Python std_msgs/Float64MultiArray Testing without encoder hardware
Mock LiDAR MOCK_LIDAR Python sensor_msgs/LaserScan Testing without LiDAR hardware

Camera

The Camera driver uses OpenCV (cv::VideoCapture in C++, cv2.VideoCapture in Python) to capture frames from USB or CSI cameras. It supports configurable resolution, frame rate, and FourCC codec settings, and publishes frames as sensor_msgs/Image ROS2 messages.

# Python - Camera configuration and usage
from joshua.robot.perception import PerceptionFactory, PerceptionConfig

config = PerceptionConfig(
    perception_type=PerceptionType.CAMERA,
    camera_config=CameraConfig(
        device_index=0,
        width=640,
        height=480,
        fps=30,
        fourcc="MJPG",
    ),
    topic_name="/camera/image_raw",
)

camera = PerceptionFactory.create(config)
camera.initialize(config)

# Capture a single frame
packet = camera.read()
# packet.image_data contains raw pixel data
# packet.width, packet.height, packet.encoding describe the format

Encoder

The Encoder driver reads position feedback from STS3215 servo encoders. It supports multiple data normalization modes to provide position data in the format most useful for downstream consumers (raw device units, normalized floats, or radians).

Encoder Data Modes

Mode Enum Value Range Description
Raw RAW 0–4095 Raw 12-bit encoder ticks as read from hardware
Normalized [0, 1] NORMALIZED_0_1 0.0–1.0 Linear mapping of raw range to unit interval
Normalized [-1, 1] NORMALIZED_NEG1_1 -1.0–1.0 Centered normalization for symmetric joint ranges
Radians NORMALIZED_RADIAN 0–2π Angular position in radians for kinematic calculations
// C++ - Encoder configuration example
PerceptionConfig config;
config.set_perception_type(PerceptionType::ENCODER);
config.mutable_encoder_config()->set_serial_port("/dev/ttyUSB0");
config.mutable_encoder_config()->set_baudrate(1000000);
config.mutable_encoder_config()->set_data_mode(EncoderDataMode::NORMALIZED_NEG1_1);

// Add servo IDs to read from
for (int id : {1, 2, 3, 4, 5, 6}) {
  config.mutable_encoder_config()->add_servo_ids(id);
}

auto encoder = PerceptionFactory::create(config);
encoder->initialize(config);

// Read normalized positions for all servos
PerceptionPacket packet = encoder->read();
// packet.position_data() contains [-1.0, 1.0] values per servo

LiDAR

The LiDAR driver supports the HLS-LFCD LDS01 (LDS-01) 2D laser distance sensor. It publishes both sensor_msgs/LaserScan for 2D navigation and sensor_msgs/PointCloud2 for 3D visualization. A built-in visualizer node can render the scan data for debugging.

// C++ - LiDAR driver initialization
PerceptionConfig config;
config.set_perception_type(PerceptionType::LIDAR);
config.mutable_lidar_config()->set_serial_port("/dev/ttyUSB1");
config.mutable_lidar_config()->set_baudrate(230400);
config.mutable_lidar_config()->set_frame_id("laser_frame");

auto lidar = PerceptionFactory::create(config);
lidar->initialize(config);

// Read a full 360-degree scan
PerceptionPacket packet = lidar->read();
// packet.point_cloud_data() for PointCloud2 format
// packet.sensor_data() for LaserScan ranges

Mock Sensors

Mock implementations are available for all sensor types. They generate synthetic data (test patterns for cameras, sinusoidal values for encoders, randomized range data for LiDAR) and are used extensively in CI pipelines and unit tests.

# Python - Using mock sensors in tests
import pytest
from joshua.robot.perception import PerceptionFactory, PerceptionConfig

def test_perception_pipeline():
    """Test the full perception pipeline with mock sensors."""
    config = PerceptionConfig(
        perception_type=PerceptionType.MOCK_CAMERA,
        camera_config=CameraConfig(width=640, height=480, fps=30),
    )

    camera = PerceptionFactory.create(config)
    camera.initialize(config)

    packet = camera.read()
    assert packet.image_data is not None
    assert packet.width == 640
    assert packet.height == 480

PerceptionPacket Proto

All sensor data flows through PerceptionPacket Protocol Buffers messages, providing a unified envelope for heterogeneous sensor data.

// Proto - PerceptionPacket definition
syntax = "proto3";
package joshua.robot;

message PerceptionPacket {
  // Image data (camera output)
  message ImageData {
    bytes data = 1;            // Raw pixel data
    int32 width = 2;
    int32 height = 3;
    string encoding = 4;       // e.g., "rgb8", "bgr8", "mono8"
    int32 step = 5;            // Row stride in bytes
  }
  ImageData image_data = 1;

  // Position data (encoder output)
  message PositionData {
    repeated double values = 1;          // Position values per joint
    EncoderDataMode data_mode = 2;       // Normalization mode used
    repeated int32 device_ids = 3;       // Corresponding device IDs
  }
  PositionData position_data = 2;

  // Generic sensor data (LiDAR LaserScan)
  message SensorData {
    repeated float ranges = 1;           // Distance measurements
    repeated float intensities = 2;      // Signal intensities
    float angle_min = 3;
    float angle_max = 4;
    float angle_increment = 5;
    float range_min = 6;
    float range_max = 7;
  }
  SensorData sensor_data = 3;

  // Point cloud data (LiDAR PointCloud2)
  message PointCloudData {
    bytes data = 1;                      // Serialized point cloud
    int32 width = 2;
    int32 height = 3;
    int32 point_step = 4;
    int32 row_step = 5;
    bool is_dense = 6;
  }
  PointCloudData point_cloud_data = 4;

  // Metadata
  string frame_id = 10;
  int64 timestamp_ns = 11;
}

Communication Layer

The Communication Layer abstracts transport protocols used by action and perception devices. It provides a uniform interface for sending and receiving bytes, regardless of whether the underlying transport is serial UART, Bluetooth LE, or another protocol.

// C++ - CommInterface (abstract base)
class CommInterface {
public:
  virtual ~CommInterface() = default;

  virtual bool open(const CommConfig& config) = 0;
  virtual bool close() = 0;
  virtual bool isOpen() const = 0;

  virtual size_t write(const uint8_t* data, size_t length) = 0;
  virtual size_t read(uint8_t* buffer, size_t max_length) = 0;

  // Async read with callback (Boost.Asio-based)
  virtual void asyncRead(
      std::function<void(const uint8_t*, size_t)> callback) = 0;
};

Serial Communication

The Serial implementation uses Boost.Asio for asynchronous I/O, supporting configurable port paths and baud rates. It handles the half-duplex TTL bus protocol required by Feetech servos and full-duplex UART for LiDAR sensors.

Parameter Type Default Description
port string /dev/ttyUSB0 Serial device path
baudrate uint32 1000000 Communication speed in baud
timeout_ms uint32 100 Read/write timeout in milliseconds
data_bits uint32 8 Number of data bits
parity enum NONE Parity mode (NONE, ODD, EVEN)
stop_bits enum ONE Number of stop bits (ONE, TWO)
// C++ - Serial communication with Boost.Asio
#include "joshua/robot/comm/serial_comm.h"

CommConfig config;
config.set_comm_type(CommType::SERIAL);
config.mutable_serial_config()->set_port("/dev/ttyUSB0");
config.mutable_serial_config()->set_baudrate(1000000);
config.mutable_serial_config()->set_timeout_ms(100);

auto serial = CommFactory::create(config);
serial->open(config);

// Synchronous write
uint8_t cmd[] = {0xFF, 0xFF, 0x01, 0x04, 0x03, 0x2A, 0x00, 0xCC};
serial->write(cmd, sizeof(cmd));

// Asynchronous read with callback
serial->asyncRead([](const uint8_t* data, size_t len) {
    // Process response from servo
    std::cout << "Received " << len << " bytes" << std::endl;
});

BLE Communication

The BLE implementation provides the transport layer for Pybricks SPIKE Prime motors. It handles BLE device scanning, connection management, characteristic discovery, and notification-based data reception.

# Python - BLE communication for Pybricks
from joshua.robot.comm import CommFactory, CommConfig

config = CommConfig(
    comm_type=CommType.BLE,
    ble_config=BLEConfig(
        device_name="Pybricks Hub",
        service_uuid="c5f50001-8280-46da-89f4-6d8051e4aeef",
        char_uuid="c5f50002-8280-46da-89f4-6d8051e4aeef",
        auto_reconnect=True,
    ),
)

ble = CommFactory.create(config)
ble.open(config)

# Send motor command over BLE
command_bytes = build_pybricks_command(port="A", speed=500)
ble.write(command_bytes)

# Register notification callback for motor feedback
ble.asyncRead(callback=on_motor_feedback)

CommFactory

// C++ - CommFactory
class CommFactory {
public:
  static std::unique_ptr<CommInterface> create(const CommConfig& config) {
    switch (config.comm_type()) {
      case CommType::SERIAL:
        return std::make_unique<SerialComm>(config);

      case CommType::BLE:
        return std::make_unique<BLEComm>(config);

      default:
        throw std::runtime_error("Unknown CommType");
    }
  }
};

ROS2 Node Integration

Each hardware device in the Robot Layer runs as a separate ROS2 node, managed by the Node Generator orchestrator. The node type determines which publishers and subscribers are created, and data flows through the system as serialized ActionPacket and PerceptionPacket Protocol Buffers messages wrapped in ROS2 message types.

Node Type Mapping

Node Type Direction ROS2 Interface Proto Type
Actuator Node Subscriber std_msgs/ByteMultiArray ActionPacket
Camera Node Publisher sensor_msgs/Image PerceptionPacket.ImageData
Encoder Node Publisher std_msgs/Float64MultiArray PerceptionPacket.PositionData
LiDAR Node Publisher sensor_msgs/LaserScan PerceptionPacket.SensorData
LiDAR Node Publisher sensor_msgs/PointCloud2 PerceptionPacket.PointCloudData

Data Flow

ROS2 Data Flow
  Configuration (.pbtxt)
          |
          v
  +------------------+
  |  Node Generator   |   Parses config, spawns per-device nodes
  +------------------+
          |
    +-----+-----+-----+------ ...
    |           |           |
    v           v           v
+---------+ +---------+ +---------+
| Actuator| | Camera  | | Encoder |   Each device = 1 ROS2 node
|  Node   | |  Node   | |  Node   |
+---------+ +---------+ +---------+
    |           |           |
    | sub       | pub       | pub
    v           v           v
  [ActionPacket]  [Image]  [Float64MultiArray]
  (serialized)   (sensor)   (sensor)
    |           |           |
    v           v           v
  Hardware    Hardware    Hardware
  (Servos)    (Camera)    (Encoders)
# Python - Example: Actuator ROS2 node runner
import rclpy
from rclpy.node import Node
from std_msgs.msg import ByteMultiArray
from joshua.robot.action import ActionFactory

class ActuatorNode(Node):
    def __init__(self, config):
        super().__init__(f"actuator_{config.name}")
        self.actuator = ActionFactory.create(config)
        self.actuator.initialize(config)

        # Subscribe to command topic
        self.subscription = self.create_subscription(
            ByteMultiArray,
            f"/{config.name}/command",
            self.command_callback,
            10,  # QoS depth
        )

    def command_callback(self, msg):
        # Deserialize ActionPacket from ROS2 message
        packet = ActionPacket()
        packet.ParseFromString(bytes(msg.data))
        self.actuator.write(packet)

Adding New Hardware

The Robot Layer is designed for extensibility. Adding support for new hardware follows a consistent four-step process:

Step 1: Define Proto Configuration

Add the new device type to the appropriate enum and define its configuration message in the Protocol Buffers schema.

// Proto - Add new actuator type
enum ActionType {
  STS3215_SERVO = 0;
  MOCK_MOTOR = 1;
  SPIKE_MOTOR = 2;
  MY_NEW_MOTOR = 3;    // <-- Add new enum value
}

message MyNewMotorConfig {
  string port = 1;
  int32 baudrate = 2;
  repeated int32 motor_ids = 3;
  // ... device-specific parameters
}

message ActionConfig {
  ActionType action_type = 1;
  // ...existing fields...
  MyNewMotorConfig my_new_motor_config = 10;  // <-- Add config field
}

Step 2: Implement the Interface

Create a new class implementing ActuatorInterface (for action devices) or PerceptionInterface (for sensors).

// C++ - Implement the interface
class MyNewMotor : public ActuatorInterface {
public:
  explicit MyNewMotor(const ActionConfig& config);

  bool initialize(const ActionConfig& config) override;
  bool write(const ActionPacket& packet) override;
  void shutdown() override;
  bool isReady() const override;

  bool setPosition(int id, int position) override;
  bool setSpeed(int id, int speed) override;
  bool setTorque(int id, bool enable) override;
  int readPosition(int id) override;

private:
  std::unique_ptr<CommInterface> comm_;
  // ... device-specific state
};

Step 3: Register in Factory

Add a case for the new device type in the appropriate factory class.

// C++ - Register in ActionFactory
case ActionType::MY_NEW_MOTOR:
  return std::make_unique<MyNewMotor>(config);

Step 4: Add to Node Runner

Ensure the Node Generator can spawn a node for the new device type. In most cases, this is automatic because the factory handles instantiation—but you may need to add topic mappings if your device uses non-standard message types.

# Example .pbtxt configuration using the new hardware
robot_config {
  name: "my_robot"
  action_nodes {
    name: "new_motor_arm"
    action_config {
      action_type: MY_NEW_MOTOR
      my_new_motor_config {
        port: "/dev/ttyACM0"
        baudrate: 115200
        motor_ids: [1, 2, 3]
      }
    }
    topic: "/new_motor_arm/command"
  }
}
Testing Your New Driver Always create a corresponding mock implementation alongside the real driver. This enables the full test suite and CI pipeline to validate the integration without requiring physical hardware.