02/25/2026
How-to | Controls

Modbus/RTU Master in a custom snap

DISCLAIMER! Use at your own risk.

There are several ways to use Modbus/RTU with ctrlX CORE: through a ctrlX I/O module or through an RS485-to-USB adapter. If you choose an adapter, the simplest option is Node-RED, which is already described in another How-to. This guide shows an alternative: building your own snap for Modbus/RTU communication through an RS485-to-USB adapter.

Prerequisites:

  1. ctrlX CORE X3/X5/X7 (2.X.X / 3.X.X)

  2. USB to RS485 converter (UT-890A used in this guide)

  3. USB to USB-C adapter for connecting

  4. Modbus/RTU slave for testing (this guide uses the ctrlX I/O RS485/RS422 module, XI522001)

  5. ctrlX AUTOMATION SDK

  6. Build Environment (Local Ubuntu 22.04/Virtual Machine/App Build Environment)

  7. Basic understanding of snapcraft

1. Overview

The snap structure is quite simple. It consists of:

  • 'main.py' - our main code, which runs the Modbus/RTU master and publishes variables to the Datalayer

  • 'setup.py' - script where we can add packages, scripts and folders to our snap

  • 'snap/snapcraft.yaml' - describes how to pack the project into a snap: how the snap communicates with the rest of the system (plugs), which stage and Python packages it needs, and where the source files are

1. Snapcraft.yaml

This project can be cross-built for arm64 on an amd64 machine, so you don't need a native arm64 device when building for ctrlX CORE X3. The file below includes the required plugs, the pymodbus and Datalayer libraries, and the other dependencies:

name: modbus-rtu-master
title: Modbus RTU master
version: 0.0.7
summary: Modbus RTU
description: |
  Settings available through Datalayer
base: core22
confinement: strict
grade: stable

architectures:
  - build-on: [amd64]
    build-for: [arm64]

apps:
  client:
    command: bin/main.py
    plugs:
      - network
      - network-bind
      - datalayer
      - system-observe
      - serial-port
      - raw-usb
      - hardware-observe
    daemon: simple
    restart-condition: always
    passthrough:
      restart-delay: 30s
    environment:
        "LD_LIBRARY_PATH": "$LD_LIBRARY_PATH:$SNAP/usr/lib/python3.10/dist-packages"


parts:
  client:
    plugin: python
    source: .
    build-environment:
      - PYTHONPATH: "$SNAPCRAFT_PART_INSTALL/usr/lib/python3/dist-packages"
    override-build: |
      snapcraftctl build
      ln -sf ../usr/lib/libsnapcraft-preload.so $SNAPCRAFT_PART_INSTALL/lib/libsnapcraft-preload.so
    stage-packages:
      - libzmq5
      - libsystemd-dev
    python-packages:
      - pymodbus
      - ctrlx-datalayer
      - ctrlx-fbs
    build-packages:
      - build-essential


  datalayerdeb:
    plugin: dump
    source: https://github.com/boschrexroth/ctrlx-automation-sdk/releases/download/3.4.0/ctrlx-datalayer-2.8.6.deb
    source-type: deb


plugs:
  datalayer:
    interface: content
    content: datalayer
    target: $SNAP_DATA/.datalayer

2. Setup.py

We include our main script, the helper module for the Datalayer connection, and optionally the Python packages (these can be listed either here or in snapcraft.yaml):

# SPDX-FileCopyrightText: Bosch Rexroth AG
#
# SPDX-License-Identifier: MIT
from setuptools import setup

setup(name = 'modbus-rtu-master',
      version='1.0.0',
      description = 'Test Modbus RTU communication',
      author = 'Akra',
      #install_requires = ['ctrlx-datalayer', 'ctrlx-fbs'],
      packages = ['helper'],
      scripts = ['main.py'],
      license = 'MIT License'
)

3. Datalayer helper

Script for creating datalayer Provider and Client 'helper/ctrlx_datalayer_helper.py':

# SPDX-FileCopyrightText: Bosch Rexroth AG
#
# SPDX-License-Identifier: MIT

import os

import ctrlxdatalayer

"""
This script provides auxiliary methods to create ctrlX Datalayer client and provider connections to ctrlX CORE devices.
It can be used for both running in an app build environment (QEMU VM) and within the snap environment on the ctrlX CORE.

Feel free to use it in your projects and to change it if necessary.

For ease of use, the default values for IP address, user, password and SSL port are chosen to match the settings of a
newly created ctrlX CORE:

    ip="192.168.1.1"
    user="boschrexroth"
    password="boschrexroth"
    ssl_port=443

If these values do not suit your use case, explicitly pass the parameters that require different values.
Here some examples:

1. ctrlX CORE or ctrlX COREvirtual with another IP address, user and password:

    client, client_connection = get_client(system, ip="192.168.1.100", user="admin", password="-$_U/{X$aG}Z3/e<")

2. ctrlX COREvirtual with port forwarding running on the same host as the app build environment (QEMU VM):

    client, client_connection = get_client(system, ip="10.0.2.2", ssl_port=8443)

    Remarks: 
    10.0.2.2 is the IP address of the host from the point of view of the app build environment (QEMU VM).
    8443 is the host port which is forwarded to the SSL port (=443) of the ctrlX COREvirtual


IMPORTANT:
You need not change the parameter settings before building a snap and installing the snap on a ctrlX CORE.
The method get_connection_string detects the snap environment and automatically uses inter-process communication.
Therefore the connection string to the ctrlX Datalayer is:

    "ipc://"

"""


def get_connection_string(
        ip="192.168.1.1",
        user="boschrexroth",
        password="boschrexroth",
        ssl_port=443):
    """
    Combines a ctrlX Datalayer connection string.
    @param[in] ip IP address of the ctrlX CORE. Use "10.0.2.2" to connect to a ctrlX COREvirtual with port forwarding.
    @param[in] user Name of the user.
    @param[in] password Password of the user.
    @param[in] ssl_port Port number for a SSL connection. ctrlX COREvirtual with port forwarding: Use the host port (default 8443) forwarded to the SSL port (443) of the ctrlX COREvirtual.
    @returns connection_string The connection string: "ipc://" for running in snap environment, "tcp://..." for running in a build environment.
    """

    if 'SNAP' in os.environ:
        return "ipc://"

    # Client connection port 2069 resp. Provider connection port 2070 are obsolete
    connection_string = "tcp://"+user+":"+password+"@"+ip

    if ssl_port == 443:
        return connection_string

    return connection_string+"?sslport=" + str(ssl_port)


def get_client(system: ctrlxdatalayer.system.System,
               ip="192.168.1.1",
               user="boschrexroth",
               password="boschrexroth",
               ssl_port=443):
    """
    Creates a ctrlX Datalayer client instance.
    @param[in] system A ctrlxdatalayer.system.System instance
    @param[in] ip IP address of the ctrlX CORE. Use "10.0.2.2" to connect to a ctrlX COREvirtual with port forwarding.
    @param[in] user Name of the user.
    @param[in] password Password of the user.
    @param[in] ssl_port Port number for a SSL connection. ctrlX COREvirtual with port forwarding: Use the host port (default 8443) forwarded to the SSL port (443) of the ctrlX COREvirtual.
    @returns tuple  (client, connection_string)
    @return <client> The ctrlxdatalayer.client.Client instance or None if failed
    @return <connection_string> The connection string or None if failed
    """

    connection_string = get_connection_string(
        ip, user, password, ssl_port)
    client = system.factory().create_client(connection_string)
    if client.is_connected():
        return client, connection_string
    client.close()

    return None, connection_string


def get_provider(system: ctrlxdatalayer.system.System,
                 ip="192.168.1.1",
                 user="boschrexroth",
                 password="boschrexroth",
                 ssl_port=443):
    """
    Creates a ctrlX Datalayer provider instance.
    @param[in] system A ctrlxdatalayer.system.System instance
    @param[in] ip IP address of the ctrlX CORE. Use "10.0.2.2" to connect to a ctrlX COREvirtual with port forwarding.
    @param[in] user Name of the user.
    @param[in] password Password of the user.
    @param[in] ssl_port Port number for a SSL connection. ctrlX COREvirtual with port forwarding: Use the host port (default 8443) forwarded to the SSL port (443) of the ctrlX COREvirtual.
    @returns tuple  (provider, connection_string)
    @return <provider>, a ctrlxdatalayer.provider.Provider instance or None if failed,
    @return <connection_string>, a connection string or None if failed
    """
    connection_string = get_connection_string(
        ip, user, password, ssl_port)
    provider = system.factory().create_provider(connection_string)
    if provider.start() == ctrlxdatalayer.variant.Result.OK and provider.is_connected():
        return provider, connection_string
    provider.close()

    return None, connection_string
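
To see what the helper returns in each environment, the logic of get_connection_string can be exercised on its own. The snippet below is a minimal sketch that copies that logic into a standalone function so it runs without the ctrlxdatalayer package. The function name connection_string and its env parameter are assumptions added for this illustration; they are not part of the SDK helper.

```python
import os


def connection_string(ip="192.168.1.1", user="boschrexroth",
                      password="boschrexroth", ssl_port=443, env=None):
    """Standalone copy of the helper's logic.

    `env` is a hypothetical parameter added for this sketch so that the
    snap case can be shown without running inside a snap.
    """
    env = os.environ if env is None else env

    # Inside a snap the SNAP variable is set and IPC is used.
    if "SNAP" in env:
        return "ipc://"

    base = "tcp://" + user + ":" + password + "@" + ip
    # The default SSL port is not appended to the connection string.
    if ssl_port == 443:
        return base
    return base + "?sslport=" + str(ssl_port)


# Build environment talking to a ctrlX CORE with default settings
print(connection_string(env={}))
# Build environment talking to a ctrlX COREvirtual with port forwarding
print(connection_string(ip="10.0.2.2", ssl_port=8443, env={}))
# Running as an installed snap (the path is only an example value)
print(connection_string(env={"SNAP": "/snap/modbus-rtu-master/current"}))
```

The three calls print a plain tcp:// string, a tcp:// string with the ?sslport=8443 suffix, and "ipc://", which is why the credentials in main.py do not need to be changed before building the snap.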

4. Main code

Let's implement 'main.py'!

4.1. Imports and variable declaration

We include all the required imports and create variables to store the settings for our Modbus/RTU master:

#!/usr/bin/env python3

# SPDX-FileCopyrightText: Bosch Rexroth AG
#
# SPDX-License-Identifier: MIT

import signal
import sys
import threading
from time import sleep
import logging

import pymodbus
from pymodbus.client import ModbusSerialClient as ModbusClient
from pymodbus.framer import FramerType

from helper.ctrlx_datalayer_helper import get_client, get_provider

import ctrlxdatalayer
import flatbuffers

from comm.datalayer import DisplayFormat, Metadata, NodeClass
from ctrlxdatalayer.provider import Provider
from ctrlxdatalayer.provider_node import (
    ProviderNode,
    ProviderNodeCallbacks,
    NodeCallback,
)
from ctrlxdatalayer.variant import Result, Variant, VariantType
from ctrlxdatalayer.metadata_utils import (
    MetadataBuilder,
    AllowedOperation,
    ReferenceType,
)

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')

__close_app = False

# Modbus connection settings (will be read from datalayer)
modbus_port = "/dev/ttyUSB0"
modbus_baudrate = 9600
modbus_parity = "N"
modbus_timeout = 0.1
modbus_start_address = 248
modbus_count = 4
modbus_slave_id = 1
modbus_read_enable = False

# Modbus data storage
modbus_registers = []

# Base address for datalayer nodes
address_base = "modbus_rtu/settings/"

We will read settings for:

  • port (our ttyUSB0 address)

  • baudrate

  • parity

  • timeout

  • register start address

  • registers count

  • slave id

The snap writes the holding registers it reads to an output node. You can optionally add coils if needed.
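
Because these settings can be changed at runtime through the Datalayer, it may be worth checking them before the serial port is opened. The following is a minimal sketch of such a check, not part of the original snap. The ModbusSettings class and its problems method are hypothetical names; the defaults mirror the variables above. The limits for count (1 to 125 registers per read request) and slave id (1 to 247 for unicast) come from the Modbus specification, and restricting parity to N, E and O is an assumption of this sketch.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ModbusSettings:
    """Hypothetical container for the settings listed above."""
    port: str = "/dev/ttyUSB0"
    baudrate: int = 9600
    parity: str = "N"
    timeout: float = 0.1
    start_address: int = 248
    count: int = 4
    slave_id: int = 1

    def problems(self):
        """Return a list of human-readable problems (empty if none)."""
        found = []
        if not self.port:
            found.append("port must not be empty")
        if self.baudrate <= 0:
            found.append("baudrate must be positive")
        if self.parity not in ("N", "E", "O"):
            found.append("parity must be 'N', 'E' or 'O'")
        if self.timeout <= 0:
            found.append("timeout must be positive")
        # Modbus register addresses are 16-bit values.
        if not 0 <= self.start_address <= 0xFFFF:
            found.append("start_address must fit in 16 bits")
        # A single read request may ask for 1..125 holding registers.
        if not 1 <= self.count <= 125:
            found.append("count must be between 1 and 125")
        # Unicast Modbus device addresses are 1..247.
        if not 1 <= self.slave_id <= 247:
            found.append("slave_id must be between 1 and 247")
        return found


print(ModbusSettings().problems())
print(ModbusSettings(parity="X", count=200).problems())
```

The master loop could skip the connection attempt and log the returned messages whenever the list is not empty.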

4.2. Datalayer provider class

This is a wrapper for Datalayer provider nodes. It implements the callbacks for create, remove, browse, read, write and metadata operations:

class MyProviderNode:
    """MyProviderNode"""

    def __init__(self, provider: Provider, address: str, initialValue: Variant):
        """__init__"""
        self._cbs = ProviderNodeCallbacks(
            self.__on_create,
            self.__on_remove,
            self.__on_browse,
            self.__on_read,
            self.__on_write,
            self.__on_metadata,
        )

        self._providerNode = ProviderNode(self._cbs)

        self._provider = provider
        self._address = address
        self._data = initialValue
        self._metadata = MyProviderNode.__get_metadata(address)

    @staticmethod
    def __get_metadata(address: str) -> Variant:
        builder = MetadataBuilder(
            allowed=AllowedOperation.READ | AllowedOperation.WRITE
        )
        builder = builder.set_display_name(address)
        builder = builder.set_node_class(NodeClass.NodeClass.Variable)
        if address.rfind("string") != -1:
            builder.add_reference(ReferenceType.read(), "types/datalayer/string")
            builder.add_reference(ReferenceType.write(), "types/datalayer/string")
        return builder.build()

    def register_node(self):
        """register_node"""
        return self._provider.register_node(self._address, self._providerNode)

    def unregister_node(self):
        """unregister_node"""
        self._provider.unregister_node(self._address)
        self._metadata.close()
        self._data.close()

    def set_value(self, value: Variant):
        """set_value"""
        self._data = value

    def __on_create(
        self,
        userdata: ctrlxdatalayer.clib.userData_c_void_p,
        address: str,
        data: Variant,
        cb: NodeCallback,
    ):
        """__on_create"""
        print("__on_create()", "address:", address, "userdata:", userdata, flush=True)
        cb(Result.OK, data)

    def __on_remove(
        self,
        userdata: ctrlxdatalayer.clib.userData_c_void_p,
        address: str,
        cb: NodeCallback,
    ):
        """__on_remove"""
        print("__on_remove()", "address:", address, "userdata:", userdata, flush=True)
        cb(Result.UNSUPPORTED, None)

    def __on_browse(
        self,
        userdata: ctrlxdatalayer.clib.userData_c_void_p,
        address: str,
        cb: NodeCallback,
    ):
        """__on_browse"""
        print("__on_browse()", "address:", address, "userdata:", userdata, flush=True)
        with Variant() as new_data:
            new_data.set_array_string([])
            cb(Result.OK, new_data)

    def __on_read(
        self,
        userdata: ctrlxdatalayer.clib.userData_c_void_p,
        address: str,
        data: Variant,
        cb: NodeCallback,
    ):
        """__on_read"""
        new_data = self._data
        cb(Result.OK, new_data)

    def __on_write(
        self,
        userdata: ctrlxdatalayer.clib.userData_c_void_p,
        address: str,
        data: Variant,
        cb: NodeCallback,
    ):
        """__on_write"""
        if self._data.get_type() != data.get_type():
            cb(Result.TYPE_MISMATCH, None)
            return

        result, self._data = data.clone()
        cb(Result.OK, self._data)

    def __on_metadata(
        self,
        userdata: ctrlxdatalayer.clib.userData_c_void_p,
        address: str,
        cb: NodeCallback,
    ):
        """__on_metadata"""
        cb(Result.OK, self._metadata)

4.3. Datalayer Provider functions

We set the base address of the nodes in the Datalayer and create one publishing function per data type:

def provide_string(provider: ctrlxdatalayer.provider, name: str, initial_value: str):
    """provide_string"""
    print("Creating string provider node " + address_base + name, flush=True)
    variantString = Variant()
    variantString.set_string(initial_value)
    provider_node_str = MyProviderNode(provider, address_base + name, variantString)
    result = provider_node_str.register_node()
    if result != ctrlxdatalayer.variant.Result.OK:
        print(
            "ERROR Registering node " + address_base + name + " failed with:",
            result,
            flush=True,
        )
    return provider_node_str


def provide_int(provider: ctrlxdatalayer.provider, name: str, initial_value: int):
    """provide_int"""
    print("Creating int provider node " + address_base + name, flush=True)
    variantInt = Variant()
    variantInt.set_int32(initial_value)
    provider_node_int = MyProviderNode(provider, address_base + name, variantInt)
    result = provider_node_int.register_node()
    if result != ctrlxdatalayer.variant.Result.OK:
        print(
            "ERROR Registering node " + address_base + name + " failed with:",
            result,
            flush=True,
        )
    return provider_node_int


def provide_float(provider: ctrlxdatalayer.provider, name: str, initial_value: float):
    """provide_float"""
    print("Creating float provider node " + address_base + name, flush=True)
    variantFloat = Variant()
    variantFloat.set_float32(initial_value)
    provider_node_flt = MyProviderNode(provider, address_base + name, variantFloat)
    result = provider_node_flt.register_node()
    if result != ctrlxdatalayer.variant.Result.OK:
        print(
            "ERROR Registering node " + address_base + name + " failed with:",
            result,
            flush=True,
        )
    return provider_node_flt


def provide_bool(provider: ctrlxdatalayer.provider, name: str, initial_value: bool):
    """provide_bool"""
    print("Creating bool provider node " + address_base + name, flush=True)
    variantBool = Variant()
    variantBool.set_bool8(initial_value)
    provider_node_bl = MyProviderNode(provider, address_base + name, variantBool)
    result = provider_node_bl.register_node()
    if result != ctrlxdatalayer.variant.Result.OK:
        print(
            "ERROR Registering node " + address_base + name + " failed with:",
            result,
            flush=True,
        )
    return provider_node_bl


def provide_int_array(provider: ctrlxdatalayer.provider, name: str):
    """provide_int_array"""
    print("Creating int array provider node " + address_base + name, flush=True)
    variantIntArray = Variant()
    variantIntArray.set_array_int32([])
    provider_node_arr = MyProviderNode(provider, address_base + name, variantIntArray)
    result = provider_node_arr.register_node()
    if result != ctrlxdatalayer.variant.Result.OK:
        print(
            "ERROR Registering node " + address_base + name + " failed with:",
            result,
            flush=True,
        )
    return provider_node_arr
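
The provide_* functions above differ only in the Variant setter they call. As a possible simplification, the setter could be picked from the Python type of the initial value. The sketch below illustrates that idea under stated assumptions: SETTER_BY_TYPE and fill_variant are hypothetical names, and RecordingVariant is only a stand-in for ctrlxdatalayer.variant.Variant so that the example runs without the SDK. The setter names themselves are the ones used in the functions above.

```python
# Variant setter names as used by the provide_* functions above.
SETTER_BY_TYPE = {
    str: "set_string",
    bool: "set_bool8",
    int: "set_int32",
    float: "set_float32",
}


def fill_variant(variant, value):
    """Call the setter that matches the exact Python type of `value`."""
    # type(value) is used instead of isinstance() because bool is a
    # subclass of int and would otherwise be stored as int32.
    setter_name = SETTER_BY_TYPE[type(value)]
    getattr(variant, setter_name)(value)
    return variant


class RecordingVariant:
    """Stand-in for ctrlxdatalayer.variant.Variant, for illustration only."""

    def __init__(self):
        self.calls = []

    def __getattr__(self, name):
        # Record any set_* call instead of talking to the Data Layer.
        if name.startswith("set_"):
            return lambda value: self.calls.append((name, value))
        raise AttributeError(name)


for sample in ("/dev/ttyUSB0", 9600, 0.1, False):
    print(fill_variant(RecordingVariant(), sample).calls)
```

With the real Variant class, a single provide function could call fill_variant and then register the node exactly as the functions above do.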

4.4. Datalayer Provider thread

This thread creates the Datalayer provider, registers the settings and output nodes, runs until shutdown, and then removes the created nodes:

def handler(signum, frame):
    """handler"""
    global __close_app
    __close_app = True

def dl_provider():
    """Datalayer Provider Thread"""
    with ctrlxdatalayer.system.System("") as datalayer_system:
        datalayer_system.start(False)

        provider, connection_string = get_provider(
            datalayer_system, ip="192.168.1.1", user="boschrexroth", password="boschrexroth"
        )
        if provider is None:
            print("ERROR Connecting", connection_string, "failed.", flush=True)
            sys.exit(1)

        with provider:
            result = provider.start()
            if result != Result.OK:
                print(
                    "ERROR Starting ctrlX Data Layer Provider failed with:",
                    result,
                    flush=True,
                )
                return

            # Create provider nodes for Modbus settings
            provide_node_port = provide_string(provider, "port", modbus_port)
            provide_node_baudrate = provide_int(provider, "baudrate", modbus_baudrate)
            provide_node_parity = provide_string(provider, "parity", modbus_parity)
            provide_node_timeout = provide_float(provider, "timeout", modbus_timeout)
            provide_node_start_address = provide_int(provider, "start_address", modbus_start_address)
            provide_node_count = provide_int(provider, "count", modbus_count)
            provide_node_slave_id = provide_int(provider, "slave_id", modbus_slave_id)
            provide_node_read_enable = provide_bool(provider, "read_enable", modbus_read_enable)

            # Output node for holding registers
            provide_node_registers = provide_int_array(provider, "registers")

            print("INFO Running provider loop...", flush=True)
            while provider.is_connected() and not __close_app:
                sleep(1.0)

            print("ERROR ctrlX Data Layer Provider is disconnected", flush=True)

            # Cleanup
            provide_node_port.unregister_node()
            del provide_node_port
            provide_node_baudrate.unregister_node()
            del provide_node_baudrate
            provide_node_parity.unregister_node()
            del provide_node_parity
            provide_node_timeout.unregister_node()
            del provide_node_timeout
            provide_node_start_address.unregister_node()
            del provide_node_start_address
            provide_node_count.unregister_node()
            del provide_node_count
            provide_node_slave_id.unregister_node()
            del provide_node_slave_id
            provide_node_read_enable.unregister_node()
            del provide_node_read_enable
            provide_node_registers.unregister_node()
            del provide_node_registers

            print("Stopping ctrlX Data Layer provider:", end=" ", flush=True)
            result = provider.stop()
            print(result, flush=True)

        stop_ok = datalayer_system.stop(False)
        print("System Stop", stop_ok, flush=True)
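
The cleanup part above repeats the same two lines for every node. One way to shorten it, and to make sure the nodes are also removed if the loop ends with an exception, is to keep the nodes in a list and unregister them in a finally block. This is a minimal sketch of that pattern: run_with_nodes is a hypothetical helper, and FakeNode only stands in for MyProviderNode so the example runs without the SDK.

```python
class FakeNode:
    """Stand-in for MyProviderNode; only records that it was unregistered."""

    def __init__(self, name, log):
        self.name = name
        self.log = log

    def unregister_node(self):
        self.log.append(self.name)


def run_with_nodes(nodes, body):
    """Run `body()` and unregister every node afterwards, even on error."""
    try:
        body()
    finally:
        # Unregister in reverse order of creation.
        for node in reversed(nodes):
            node.unregister_node()


log = []
nodes = [FakeNode(name, log) for name in ("port", "baudrate", "registers")]
run_with_nodes(nodes, lambda: None)  # the body would be the provider loop
print(log)
```

In dl_provider, the body would be the while loop that waits for shutdown, and the list would hold the objects returned by the provide_* functions.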

4.5. Datalayer Client thread

We cyclically read the settings from, and write the register data to, the nodes created by the provider:

def dl_client():
    """Datalayer Client Thread"""
    global modbus_port, modbus_baudrate, modbus_parity, modbus_timeout
    global modbus_start_address, modbus_count, modbus_slave_id
    global modbus_read_enable, modbus_registers

    with ctrlxdatalayer.system.System("") as datalayer_system:
        datalayer_system.start(False)

        datalayer_client, datalayer_client_connection_string = get_client(
            datalayer_system, ip="192.168.1.1", user="boschrexroth", password="boschrexroth"
        )
        if datalayer_client is None:
            print(
                f"ERROR Connecting {datalayer_client_connection_string} failed.",
                flush=True
            )
            sys.exit(1)

        with datalayer_client:
            if datalayer_client.is_connected() is False:
                print(f"ERROR ctrlX Data Layer is NOT connected: {datalayer_client_connection_string}",
                    flush=True
                )
                sys.exit(1)

            while datalayer_client.is_connected() and not __close_app:
                # Read Modbus settings from datalayer
                addr = address_base + "port"
                result, string_var = datalayer_client.read_sync(addr)
                if result == Result.OK:
                    modbus_port = string_var.get_string()

                addr = address_base + "baudrate"
                result, int_var = datalayer_client.read_sync(addr)
                if result == Result.OK:
                    modbus_baudrate = int_var.get_int32()

                addr = address_base + "parity"
                result, string_var = datalayer_client.read_sync(addr)
                if result == Result.OK:
                    modbus_parity = string_var.get_string()

                addr = address_base + "timeout"
                result, float_var = datalayer_client.read_sync(addr)
                if result == Result.OK:
                    modbus_timeout = float_var.get_float32()

                addr = address_base + "start_address"
                result, int_var = datalayer_client.read_sync(addr)
                if result == Result.OK:
                    modbus_start_address = int_var.get_int32()

                addr = address_base + "count"
                result, int_var = datalayer_client.read_sync(addr)
                if result == Result.OK:
                    modbus_count = int_var.get_int32()

                addr = address_base + "slave_id"
                result, int_var = datalayer_client.read_sync(addr)
                if result == Result.OK:
                    modbus_slave_id = int_var.get_int32()

                addr = address_base + "read_enable"
                result, bool_var = datalayer_client.read_sync(addr)
                if result == Result.OK:
                    modbus_read_enable = bool_var.get_bool8()

                # Write registers data back to datalayer
                with Variant() as data:
                    data.set_array_int32(modbus_registers)
                    addr = address_base + "registers"
                    result = datalayer_client.write_sync(addr, data)

                sleep(0.5)

            print("ERROR ctrlX Data Layer Client is NOT connected")

        stop_ok = datalayer_system.stop(False)
        print("System Stop", stop_ok, flush=True)
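
The client thread and the Modbus thread exchange data through module-level globals. Since the settings are updated one by one, the Modbus thread can, for one cycle, see a new port together with an old baudrate. If that matters in your application, the values could be kept behind a lock and handed over as one snapshot. The following sketch shows the idea; SharedState and its methods are hypothetical names and not part of the original code.

```python
import threading


class SharedState:
    """Hypothetical replacement for the module-level modbus_* globals."""

    def __init__(self, **initial):
        self._lock = threading.Lock()
        self._values = dict(initial)

    def update(self, **changes):
        # Apply several changes atomically so readers never see a mix.
        with self._lock:
            self._values.update(changes)

    def snapshot(self):
        # Return a shallow copy so the caller can work without the lock.
        with self._lock:
            return dict(self._values)


state = SharedState(port="/dev/ttyUSB0", baudrate=9600)

# Client thread: store everything that was read in one cycle at once.
state.update(port="/dev/ttyUSB1", baudrate=19200)

# Modbus thread: take one consistent view per cycle.
settings = state.snapshot()
print(settings["port"], settings["baudrate"])
```

Because the snapshot is a shallow copy, list values such as the registers should be replaced with a new list on every update rather than modified in place.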

4.6. Modbus/RTU Master

Now it's time to add the Modbus/RTU master itself. It reads the settings and waits for the trigger signal (read_enable) from the Datalayer before it connects. While reading is enabled, a one-second delay between requests refreshes the register data every second:

def modbus_master():
    """Modbus Reading"""
    global modbus_port, modbus_baudrate, modbus_parity, modbus_timeout
    global modbus_start_address, modbus_count, modbus_slave_id
    global modbus_read_enable, modbus_registers

    client = None
    last_config = None

    while not __close_app:
        # New settings
        current_config = (modbus_port, modbus_baudrate, modbus_parity, modbus_timeout)

        if modbus_read_enable:
            # Reconnect if configuration changed
            if current_config != last_config:
                if client is not None:
                    try:
                        client.close()
                    except Exception:
                        pass

                logger.info(f"Connecting to Modbus RTU: port={modbus_port}, baudrate={modbus_baudrate}, parity={modbus_parity}, timeout={modbus_timeout}")

                try:
                    client = ModbusClient(
                        port=modbus_port,
                        baudrate=modbus_baudrate,
                        parity=modbus_parity,
                        timeout=modbus_timeout
                    )
                    connection = client.connect()
                    if connection:
                        logger.info("Modbus RTU connected successfully")
                        last_config = current_config
                    else:
                        logger.error("Failed to connect to Modbus RTU")
                        client = None
                        last_config = None  # force a new connection attempt on the next cycle
                        sleep(2.0)
                        continue
                except Exception as e:
                    logger.error(f"Error connecting to Modbus RTU: {e}")
                    client = None
                    last_config = None  # force a new connection attempt on the next cycle
                    sleep(2.0)
                    continue

            # Read holding registers
            if client is not None:
                try:
                    read_vals = client.read_holding_registers(
                        modbus_start_address,
                        count=modbus_count,
                        device_id=modbus_slave_id
                    )

                    if read_vals is not None and not read_vals.isError():
                        modbus_registers = list(read_vals.registers)
                        logger.info(f"Read registers: {modbus_registers}")
                    else:
                        logger.error(f"Error reading registers: {read_vals}")
                        modbus_registers = []

                except Exception as e:
                    logger.error(f"Exception reading Modbus registers: {e}")
                    modbus_registers = []

            sleep(1.0)  # Delay
        else:
            # Disconnect when read is disabled
            if client is not None:
                try:
                    client.close()
                except Exception:
                    pass
                client = None
                last_config = None
                logger.info("Modbus reading disabled, disconnected")

            sleep(0.5)
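
The control flow of the loop above can be summarized as a small decision function, which also makes it easy to test without any hardware. This is a simplified sketch, not the code used in the snap: next_action is a hypothetical name, it returns one action per call (the loop above connects and reads in the same cycle), and it treats a missing connection as a reason to connect.

```python
def next_action(read_enable, current_config, last_config, connected):
    """Decide what the master loop should do in this cycle.

    Returns one of "connect", "read", "disconnect" or "idle".
    """
    if not read_enable:
        # Reading is switched off: release the port if it is still open.
        return "disconnect" if connected else "idle"
    if not connected or current_config != last_config:
        # First start, lost connection, or changed serial settings.
        return "connect"
    return "read"


cfg_a = ("/dev/ttyUSB0", 9600, "N", 0.1)
cfg_b = ("/dev/ttyUSB0", 19200, "N", 0.1)

print(next_action(False, cfg_a, None, False))   # nothing to do yet
print(next_action(True, cfg_a, None, False))    # read_enable was just set
print(next_action(True, cfg_a, cfg_a, True))    # normal cyclic reading
print(next_action(True, cfg_b, cfg_a, True))    # baudrate was changed
print(next_action(False, cfg_b, cfg_b, True))   # read_enable was cleared
```

The five calls walk through a typical session: idle, connect, read, reconnect after a settings change, and disconnect.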

4.7. Threading

Now we just add threading to start all parts of our code simultaneously:

if __name__ == "__main__":
    signal.signal(signal.SIGINT, handler)
    signal.signal(signal.SIGTERM, handler)
    signal.signal(signal.SIGABRT, handler)

    try:
        logger.info('Starting Modbus RTU Snap with Datalayer Integration')

        # Start provider thread
        logger.info('Starting Data Layer Provider thread')
        t_provider = threading.Thread(target=dl_provider)
        t_provider.start()

        sleep(3.0)  # Wait for provider to initialize

        # Start client thread
        logger.info('Starting Data Layer Client thread')
        t_client = threading.Thread(target=dl_client)
        t_client.start()

        # Start Modbus master thread
        logger.info('Starting Modbus Master thread')
        t_modbus = threading.Thread(target=modbus_master)
        t_modbus.start()

        t_provider.join()
        t_client.join()
        t_modbus.join()

    except Exception as e:
        logger.error("Error: " + str(e))
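
With a plain boolean flag and sleep(), each thread notices a shutdown request only after its current sleep has finished. A threading.Event from the standard library can serve as both the flag and the delay, so the threads wake up as soon as the signal arrives. The sketch below is one possible variant under stated assumptions: stop_event and worker are hypothetical names, and worker stands in for dl_provider, dl_client or modbus_master.

```python
import threading
import time

stop_event = threading.Event()


def handler(signum, frame):
    """Signal handler: ask all threads to finish."""
    stop_event.set()


def worker(name, period, log):
    """Hypothetical worker standing in for one of the three threads."""
    while not stop_event.is_set():
        log.append(name)
        # wait() returns early as soon as the event is set, unlike sleep().
        stop_event.wait(period)


log = []
thread = threading.Thread(target=worker, args=("modbus", 5.0, log))
started = time.monotonic()
thread.start()

handler(None, None)  # simulate SIGTERM arriving
thread.join(timeout=2.0)
elapsed = time.monotonic() - started
print("thread stopped:", not thread.is_alive(), "within 2 s:", elapsed < 2.0)
```

Although the worker's period is five seconds, it ends well before the join timeout because wait() is interrupted by the event.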

5. Build the snap

Assuming your build environment is already set up, run the build command. Make sure the build-on and build-for entries in 'snapcraft.yaml' match your build machine and your target device:

snapcraft --destructive-mode

6. Install the snap

Now, install the snap on your device:

Modbus/RTU snap

7. Connect and Wire

I use a ctrlX I/O module instead of a real slave in this example. In a real-world application, you wire the slave device directly to the Modbus adapter. You can skip the ctrlX I/O configuration described below if you have an actual slave device.

Connect the converter to the USB port of the ctrlX CORE and wire it to the slave device. In this case I wire the ctrlX I/O RS485 module to the adapter: Tx+ to Tx+, Tx- to Tx-, and Ground to Ground.

RS485 Wiring

If you use ctrlX I/O in slave mode, the essential steps are to configure the module in ctrlX I/O Engineering (RS485 mode, baudrate, parity) and to create a simple PLC program:

  1. Scan bus for I/O modules

  2. Add settings

  3. Download it to the Device

ctrlX I/O Engineering

A simple example of using the I/O as a slave can be found in the library examples:

PLC program for testing

You will need to change input and output addresses depending on your actual I/O:

I/O Addressing

8. Control the app through Datalayer

Now the snap can communicate with the I/O module, and the register data sent from the PLC appears in the snap's Datalayer nodes:

Registers in Datalayer

See the full video of the running process:

Have fun with it!

Related Links:

GitHub - boschrexroth/ctrlx-automation-sdk: ctrlX AUTOMATION Software Development Kit

Various methods to use ModbusRTU (RS485) with ctrlX CORE

Get started with snaps | Snapcraft documentation
