Writing Integration Tests with Python Embedded Device Manager (pyedmgr) and Pigweed RPC

Introduction

This tutorial will introduce the Python Embedded Device Manager (pyedmgr) and Pigweed RPC libraries and show how they can be used to create integration tests.

  • pyedmgr is a Python library for automating discovery, flashing and communication with clusters of boards and FVPs. In test scenarios, pyedmgr manages the lifetime of the devices used in the test, making the tests easier to write and understand. The documentation for pyedmgr can be viewed at https://iot.sites.arm.com/open-iot-sdk/tools/pyedmgr/
  • Pigweed RPC provides Python and C++ libraries for RPC communication between devices using protocol buffers. A single binary can provide RPC APIs to access the features to be tested. The same binary can then be reused across a suite of tests, and because the RPC response objects contain typed fields, the test script and system-under-test can exchange typed objects without any manual string parsing. Pigweed's documentation is available at https://pigweed.dev/

We will be using the rpc-socket example as a base for the tutorial. This example spawns two FVPs and uses RPC to command the first device to bind to a socket as a server, and the second device to connect as a client. The two FVPs then exchange messages and close their sockets. We will use code from the example to demonstrate how Pigweed RPC and pyedmgr work together.
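
As a rough host-side analogy of the flow described above (bind as server, connect as client, exchange messages, close), the following sketch uses Python's standard socket module on the loopback interface. It is not code from the rpc-socket example: in the example the equivalent calls run on the FVPs and are triggered over RPC. The helper name recv_exact and the message contents are made up for illustration.

```python
import socket


def recv_exact(sock: socket.socket, count: int) -> bytes:
    """Read exactly `count` bytes, since recv() may return fewer."""
    data = b""
    while len(data) < count:
        chunk = sock.recv(count - len(data))
        if not chunk:
            raise ConnectionError("peer closed the connection early")
        data += chunk
    return data


# "Server" side: create a socket, bind it and listen.
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.bind(("127.0.0.1", 0))  # port 0 lets the OS pick a free port
server_sock.listen(1)
server_address = server_sock.getsockname()

# "Client" side: create a socket and connect to the server.
client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_sock.connect(server_address)
connection, _peer = server_sock.accept()

# Exchange one message in each direction.
client_sock.sendall(b"hello")
received_by_server = recv_exact(connection, 5)
connection.sendall(b"world")
received_by_client = recv_exact(client_sock, 5)

# Close every socket, as the two FVPs do at the end of the example.
for sock in (connection, client_sock, server_sock):
    sock.close()
```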

Setup

You can follow the rpc-socket example's readme to learn how to set up pyedmgr and Pigweed RPC.

Pigweed RPC

Pigweed RPC provides remote procedure calls from our Python script to our native binary on the FVP. It can perform RPC over any stream-like interface, but in this example we will be using the FVP's simulated UART.
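
On the Python side, "stream-like" roughly means that the RPC client only needs one callable that returns bytes and one that accepts bytes. The sketch below shows one way such a pair could be built over a socket. The function name make_stream_io is hypothetical, and socket.socketpair() stands in for the FVP's simulated UART so that the snippet runs without any hardware or FVP.

```python
import socket
from typing import Callable, Tuple


def make_stream_io(
    sock: socket.socket,
) -> Tuple[Callable[[int], bytes], Callable[[bytes], None]]:
    """Return (read, write) callables backed by a connected socket."""

    def read(count: int) -> bytes:
        # Returns up to `count` bytes, blocking until some data arrives.
        return sock.recv(count)

    def write(data: bytes) -> None:
        # Blocks until all of `data` has been handed to the OS.
        sock.sendall(data)

    return read, write


# Stand-in for the device: a connected pair of local sockets.
host_end, device_end = socket.socketpair()
read, write = make_stream_io(host_end)

write(b"ping")                        # host -> "device"
seen_by_device = device_end.recv(4)
device_end.sendall(b"pong")           # "device" -> host
reply = read(4)

host_end.close()
device_end.close()
```

A pair like this has the same shape as the read and write functions passed to the RPC client later in this tutorial.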

Define an RPC Service

The first requirement is to create a .proto file which defines our RPC API. The RPC API for the rpc-socket example is a wrapper around the IoT Socket API. For example, to create a socket we use the iotSocketCreate function:

int32_t iotSocketCreate(int32_t af, int32_t type, int32_t protocol);

We can create a .proto file to expose this as follows:

iot-socket.proto

syntax = "proto3";

package iotsdk.socket.pw_rpc;

message CreateRequest
{
    int32 af = 1;
    int32 type = 2;
    int32 protocol = 3;
}

message CreateResponse
{
    int32 status = 1;
}

service IotSocketService
{
    rpc Create(CreateRequest) returns (CreateResponse)
    {
    }
}

This defines an IotSocketService API with a single call, Create, which takes a CreateRequest and returns a CreateResponse. The status field of the response indicates whether the request was successful.

In our CMake we link to the necessary Pigweed modules:

CMakeLists.txt

# Create protobuf library target. We specify the proto file as a SOURCE, which will be compiled when we link to the
# target. We can also create an 'options' file and pass it as 'INPUTS'. The options file is used for configuration such
# as maximum sizes of buffers and strings.
pw_proto_library(iotsdk-examples-rpc-socket-proto
    SOURCES
        src/iot-socket.proto
)

# ...

# We will use the IoT SDK's pw_sys_io implementation for I/O. We can override the buffer sizes:
target_compile_definitions(pw_iot_sdk_config
    INTERFACE
        PW_IOTSDK_SYS_IO_TX_BUFFER_SIZE=2048
        PW_IOTSDK_SYS_IO_RX_BUFFER_SIZE=2048
)

# ...

target_link_libraries(iotsdk-examples-rpc-socket
    # ...

    # We will use pw_log with the UART backend.
    pw_log
    pw_log_mcu_driver_hal
    pw_log_mdh_cmsis_rtos_lock

    # Threads.
    pw_sync.interrupt_spin_lock
    pw_sync_cmsis_rtos.interrupt_spin_lock

    # IO
    pw_sys_io
    pw_sys_io_mcu_driver_hal

    # RPC
    pw_bytes
    pw_protobuf
    pw_rpc.server
    pw_hdlc

    # Utilities
    pw_span
    pw_string

    # Link to the library. The .pwpb_rpc suffix is required to select the protocol buffer implementation. We could also
    # choose .nanopb_rpc.
    iotsdk-examples-rpc-socket-proto.pwpb_rpc

    # ...
)

Of particular interest are the pw_proto_library call and the link library with the .pwpb_rpc suffix. The pw_proto_library CMake macro creates a target which causes CMake to compile the .proto file when we link to it. The .pwpb_rpc suffix on the target name selects the Protocol Buffers implementation, in this case Pigweed Protobuf (nanopb is another option, which you can select with .nanopb_rpc).

This generates headers containing the required classes to implement the service.

Implement the Server

We then implement the server. At present this can only be done in C++.

server.cc

#include <algorithm>
#include <array>
#include <cstddef>
#include <inttypes.h>
#include <string.h>

#include "iot_socket.h"
#include "pw_rpc/server.h"

#include "pw_hdlc/rpc_channel.h"
#include "pw_hdlc/rpc_packets.h"
#include "pw_log/log.h"
#include "pw_rpc/channel.h"
#include "pw_stream/sys_io_stream.h"
#include "pw_string/util.h"

// Generated headers
#include "iot-socket.pwpb.h"
#include "iot-socket.rpc.pwpb.h"

namespace {
// Alias required types into the current namespace. There is one Message type per request and response,
// and a base class for the Service.
using CreateRequest = iotsdk::socket::pw_rpc::CreateRequest::Message;
using CreateResponse = iotsdk::socket::pw_rpc::CreateResponse::Message;
template <class T> using ServiceBase = iotsdk::socket::pw_rpc::pw_rpc::pwpb::IotSocketService::Service<T>;

// Incoming packets decode buffer
constexpr size_t kBufferSize = 2048;

// Create RPC server with pw_sys_io as I/O, and one channel
pw::stream::SysIoWriter writer;
pw::hdlc::RpcChannelOutput hdlc_channel_output(writer, pw::hdlc::kDefaultRpcAddress, "HDLC output");
std::array<pw::rpc::Channel, 1> channels{pw::rpc::Channel::Create<1>(&hdlc_channel_output)};
pw::rpc::Server server(channels);

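// Implement the RPC calls in the Service.
struct Service : ServiceBase<Service> {
    pw::Status Create(const CreateRequest &request, CreateResponse &response)
    {
        response.status = iotSocketCreate(request.af, request.type, request.protocol);

        // A negative value is an IoT Socket error code; otherwise it is the new socket's ID.
        if (response.status < 0) {
            PW_LOG_WARN("iotSocketCreate: %" PRId32, response.status);
        } else {
            PW_LOG_DEBUG("Socket %" PRId32 " created", response.status);
        }

        // The RPC itself succeeded; the socket result is reported in response.status.
        return pw::OkStatus();
    }
};

// The service instance registered with the server in start_rpc_app().
Service service;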
} // namespace

// Start the RPC server. This is called from a thread created by main.
void start_rpc_app()
{
    static std::array<std::byte, kBufferSize> buffer;

    // Register the service.
    server.RegisterService(service);

    PW_LOG_INFO("Starting server");

    // Start the server. This function will not return.
    pw::hdlc::ReadAndProcessPackets(server, hdlc_channel_output, buffer);
}

On the Python side, Pigweed RPC again compiles the .proto file, but uses it to dynamically create a Python client object:

example.py

import time
from functools import partial

from pw_hdlc.rpc import HdlcRpcClient, default_channels

PROTO = "iot-socket.proto"

# Dummy function which reads n bytes from the running FVP
def recv_from_fvp(n:int) -> bytes:
    ...

# Dummy function which writes the given bytes to the FVP
def send_to_fvp(bs:bytes):
    ...

# Create HdlcRpcClient. We pass the functions to read and write data and the
# protobuf file's path
client = HdlcRpcClient(
    partial(recv_from_fvp, 1),
    [PROTO],
    default_channels(send_to_fvp),
)

# Wait for protoc
time.sleep(1)

# We can now access the RPC using the generated object
service = client.rpcs().iotsdk.socket.pw_rpc.IotSocketService

AF_INET = 1
SOCK_STREAM = 1
PROTO_TCP = 1

status, response = service.Create(af=AF_INET, type=SOCK_STREAM, protocol=PROTO_TCP)
if not status.ok():
    raise RuntimeError(f'RPC error: {status}')
if response.status < 0:
    raise RuntimeError(f'IoT socket error: {response.status}')

print(f'Opened socket: {response.status}')
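
The two checks above (RPC status first, then the socket status carried in the response) tend to repeat for every call, so a test suite may want to wrap them. The following is a minimal sketch of such a wrapper. The name socket_id_or_raise is hypothetical, and SimpleNamespace objects stand in for the real status and response objects so that the snippet runs without Pigweed installed.

```python
from types import SimpleNamespace


def socket_id_or_raise(status, response) -> int:
    """Return the socket ID from a Create call, or raise on any failure."""
    # Transport or RPC-level failure: the call did not complete.
    if not status.ok():
        raise RuntimeError(f"RPC error: {status}")
    # The call completed, but the device reported a negative error code.
    if response.status < 0:
        raise RuntimeError(f"IoT socket error: {response.status}")
    return response.status


# Stand-ins for the objects returned by service.Create(...).
ok_status = SimpleNamespace(ok=lambda: True)
failed_status = SimpleNamespace(ok=lambda: False)

socket_id = socket_id_or_raise(ok_status, SimpleNamespace(status=3))
```

With the real client this could be called as socket_id_or_raise(*service.Create(...)), since Create returns a (status, response) pair as shown above.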

For further Pigweed RPC information see https://pigweed.dev/pw_rpc/

pyedmgr

pyedmgr launches our FVPs, flashes our firmware binary and provides the means to communicate with the FVP.

To use pyedmgr for integration testing we can create a pytest test case:

import pytest

from pyedmgr import (
    AbstractChannel,
    SynchronousSocketChannel,
    TestCaseContext,
    TestDevice,
    fixture_test_case,
)

@pytest.mark.asyncio
@pytest.mark.parametrize(
    "fixture_test_case",
    [
        # Test case descriptor
        {
            "FVP_Corstone_SSE-300_Ethos-U55": {
                1: ["__build/iotsdk-examples-rpc-socket-server"],
                2: ["__build/iotsdk-examples-rpc-socket-server"],
            }
        }
    ],
    # Required to call fixture_test_case as a function rather than binding the
    # above dictionary to the variable named fixture_test_case.
    indirect=["fixture_test_case"]
)
async def test__pyedmgr_pw_rpc(fixture_test_case):
    # When the "async with ..." line completes and we enter the scope, the
    # devices will be running the firmware and accessible through the context
    # object.
    async with fixture_test_case as context:
        pass
    # When we leave the scope, whether due to normal control flow or an
    # exception, the resources are released.

We recommend reading the pyedmgr documentation for details. In short, the test case descriptor instructs pyedmgr to spawn two FVPs and flash the RPC server binary to each one. This happens when the async with statement is entered, so inside the block we have a TestCaseContext object which gives access to the running FVPs.
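
The acquire-on-entry, release-on-exit behaviour of async with can be illustrated without pyedmgr. The sketch below uses a stand-in context manager (fake_test_case, a made-up name that is not part of pyedmgr) to show that the release step still runs when the test body raises.

```python
import asyncio
from contextlib import asynccontextmanager

events = []


@asynccontextmanager
async def fake_test_case(device_count: int):
    """Stand-in for a fixture that starts devices and later releases them."""
    events.append("start")
    devices = [f"fvp-{index}" for index in range(device_count)]
    try:
        yield devices
    finally:
        # Runs on normal exit and when the body raises.
        events.append("release")


async def run_failing_test() -> None:
    async with fake_test_case(2) as devices:
        events.append(f"running with {len(devices)} devices")
        raise RuntimeError("simulated test failure")


try:
    asyncio.run(run_failing_test())
except RuntimeError:
    events.append("failure propagated")
```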

Putting them together

The following snippet shows how to use pyedmgr and Pigweed RPC together to create the integration test. In this code we will flash the RPC server to two FVPs and create a socket on each one. You can then continue to the full rpc-socket example which goes on to connect the FVPs to each other.

import logging
from functools import partial
import os
import pytest
import socket
import time
from threading import Thread
from typing import Optional, Tuple
from pw_hdlc.rpc import HdlcRpcClient, default_channels

from pyedmgr import (
    AbstractChannel,
    SynchronousSocketChannel,
    TestCaseContext,
    TestDevice,
    fixture_test_case,
)

# .proto file passed as PROTO environment variable
PROTO = os.environ["PROTO"]

# iotSocketCreate options
AF_INET     = 1
SOCK_STREAM = 1
PROTO_TCP   = 1

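# The rpc-socket example uses UART0 for logging and UART1 for RPC
RPC_TERMINAL = 1

# Number of bytes requested per read from the RPC channel. The value is an
# assumption that mirrors the 1-byte reads in the earlier snippet.
RPC_READ_SIZE = 1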

@pytest.mark.asyncio
@pytest.mark.parametrize(
    "fixture_test_case",
    [
        {
            "FVP_Corstone_SSE-300_Ethos-U55": {
                1: ["__build/iotsdk-examples-rpc-socket-server"],
                2: ["__build/iotsdk-examples-rpc-socket-server"],
            }
        }
    ],
    indirect=["fixture_test_case"]
)
async def test__pyedmgr_pw_rpc(fixture_test_case):
    async with fixture_test_case as context:
        # Create a SynchronousSocketChannel (simulated UART) connection to the device which will act as server
        server_device = context.allocated_devices[0]
        server_device.channel.close()
        server_channel = server_device.controller.get_channel(terminal=RPC_TERMINAL, sync=True)
        server_channel.open()

        # And the same for the client
        client_device = context.allocated_devices[1]
        client_device.channel.close()
        client_channel = client_device.controller.get_channel(terminal=RPC_TERMINAL, sync=True)
        client_channel.open()

        # Tell Pigweed how to communicate with the channels
        server_client = HdlcRpcClient(
            partial(server_channel.read, RPC_READ_SIZE),
            [PROTO],
            default_channels(server_channel.write),
        )
        client_client = HdlcRpcClient(
            partial(client_channel.read, RPC_READ_SIZE),
            [PROTO],
            default_channels(client_channel.write),
        )

        time.sleep(1)

        # Generate the Python clients
        server = server_client.rpcs().iotsdk.socket.pw_rpc.IotSocketService
        client = client_client.rpcs().iotsdk.socket.pw_rpc.IotSocketService

        # Create socket on server
        status, response = server.Create(af=AF_INET, type=SOCK_STREAM, protocol=PROTO_TCP)
        assert status.ok(), "RPC error"
        assert response.status >= 0, "IoT socket error"

        # Create socket on client
        status, response = client.Create(af=AF_INET, type=SOCK_STREAM, protocol=PROTO_TCP)
        assert status.ok(), "RPC error"
        assert response.status >= 0, "IoT socket error"
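
The channel setup in the test above is identical for both devices, so it could be factored into a helper. The sketch below is one possible shape. The name open_rpc_channel is hypothetical, and it uses only the calls that appear in the test above (device.channel.close(), device.controller.get_channel(...) and channel.open()). A MagicMock stands in for a pyedmgr device so that the snippet runs without pyedmgr or an FVP.

```python
from unittest.mock import MagicMock

# The rpc-socket example uses UART1 for RPC.
RPC_TERMINAL = 1


def open_rpc_channel(device, terminal: int = RPC_TERMINAL):
    """Swap the device's default channel for a synchronous RPC channel."""
    # Close the default channel before opening the RPC terminal.
    device.channel.close()
    channel = device.controller.get_channel(terminal=terminal, sync=True)
    channel.open()
    return channel


# Stand-in for an entry of context.allocated_devices.
fake_device = MagicMock()
rpc_channel = open_rpc_channel(fake_device)
```

In the test this would replace the two four-line setup blocks with server_channel = open_rpc_channel(server_device) and client_channel = open_rpc_channel(client_device).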