Actors in Cpp

2024 06 02 head

There is a strong need for higher-level frameworks ensuring safe concurrent programming.

The Actor Model is one of the best approaches to safe concurrency and legible code [1]. The model is based on Message Passing.

Although multiple actors can run at the same time, each actor processes its messages sequentially. If you send three messages to the same actor, it executes them one at a time.

Messages are sent asynchronously, so an actor needs to store incoming messages somewhere while it is processing another one. The message queue is where these messages are stored. Messages are normally processed in FIFO (first in, first out) order.
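The FIFO behavior of a mailbox can be illustrated with a small fixed-capacity ring buffer. This is only a sketch under stated assumptions: the names Mailbox, push and pop are hypothetical and not part of the library, and the sketch is single-threaded, whereas a real mailbox would rely on the queue primitive of the underlying RTOS.

```cpp
#include <array>
#include <cstddef>
#include <optional>

// Hypothetical fixed-capacity FIFO mailbox (illustration only, not the library API).
// Storage is a std::array, so no heap allocation takes place.
// Not thread-safe: a real mailbox needs the synchronization of the RTOS queue.
template<typename T, std::size_t N>
class Mailbox {
public:
    // Append a message at the tail; returns false if the mailbox is full.
    bool push(const T& msg) {
        if (_count == N) return false;
        _items[(_head + _count) % N] = msg;
        ++_count;
        return true;
    }

    // Remove and return the oldest message, or std::nullopt if the mailbox is empty.
    std::optional<T> pop() {
        if (_count == 0) return std::nullopt;
        T msg = _items[_head];
        _head = (_head + 1) % N;
        --_count;
        return msg;
    }

    std::size_t size() const noexcept { return _count; }

private:
    std::array<T, N> _items{};
    std::size_t _head = 0;   // index of the oldest message
    std::size_t _count = 0;  // number of stored messages
};
```

Messages leave the mailbox in the order they were pushed, which is the sequential processing guarantee described above.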

Another interesting aspect of the actor model is location transparency: it does not matter whether the actor receiving a message runs locally or on another node.

An actor is a computational entity that, in response to a message it receives, can concurrently:

  • Send a finite number of messages to other actors.

  • Create a finite number of new actors.

  • Designate the behavior to be used for the next message it receives.

Actors

An actor has exactly one message queue representing its mailbox. It is the sole owner of the message queue and will process all messages stored in the queue.

No other actor has access to the private message queue of a specific actor. The actor is the sole owner of all messages stored in its mailbox.

The action of sending a message from one actor to another implies the transfer of ownership for this message. We strongly suggest implementing messages as immutable value objects.

Messages

The message abstraction structure is:

    template<typename T>
    class Message {
    public:
        Message(int kind, T data) : _kind{kind}, _data(data) {}

        inline int kind() const noexcept { return _kind; }                     (1)

        inline void kind(int kind) { _kind = kind; }

        inline T data() const noexcept { return _data; }                       (2)

        inline void data(T data) { _data = data; }
    private:
        int _kind;
        T _data;
    };
1 The field can be used to identify the type of the payload. If you use variants, either store the variant index or define constants for the various payload types. The kind field is largely a legacy field because variants provide built-in checking mechanisms.
2 The payload of the message.
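A short usage sketch of the class above. The payload type Temperature, the constant KIND_TEMPERATURE and the helper makeTemperatureMsg are hypothetical names introduced for illustration. The Message class is repeated so that the snippet compiles on its own.

```cpp
// Message class as defined above, repeated so the sketch is self-contained.
template<typename T>
class Message {
public:
    Message(int kind, T data) : _kind{kind}, _data(data) {}
    int kind() const noexcept { return _kind; }
    void kind(int kind) { _kind = kind; }
    T data() const noexcept { return _data; }
    void data(T data) { _data = data; }
private:
    int _kind;
    T _data;
};

// Hypothetical payload and kind constant, for illustration only.
struct Temperature {
    int milliCelsius;
};
constexpr int KIND_TEMPERATURE = 1;

// Build a message carrying a temperature reading.
inline Message<Temperature> makeTemperatureMsg(int milliCelsius) {
    return Message<Temperature>{KIND_TEMPERATURE, Temperature{milliCelsius}};
}
```

The receiver reads the kind field first and then interprets the payload returned by data() accordingly.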

Application Messages

Each application will define the payload for the messages sent between actors. We recommend defining an overall message type for all messages defined in the application.

struct PayloadOne {};                                                          (1)
struct PayloadTwo {};

struct Data;                                                                   (2)

typedef std::variant<PayloadOne, PayloadTwo> Variants;                         (3)

struct Data {                                                                  (4)
    Variants data;
};

Message<Data> build(PayloadOne data) {                                         (5)
    Data dataVariant{data};
    return {static_cast<int>(dataVariant.data.index()), dataVariant};
}

Message<Data>* build(PayloadOne& data) {                                       (6)
    Data dataVariant{data};
    Message<Data>* msg = msgPool.acquire();
    // _kind and _data are private, so the setters are used
    msg->kind(static_cast<int>(dataVariant.data.index()));
    msg->data(dataVariant);
    return msg;
}
1 Define the various payloads for the different types of messages exchanged in the application.
2 Forward declaration of the payload structure so that it can be referenced by the message class template.
3 Define a type-safe union of all payload types.
4 Define the structure we declared before as forward declaration.
5 Optional build function to simplify the creation of message objects.
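6 Optional build function if you are using a message pool. Use it instead of the variant in (5): declared side by side, the two overloads are ambiguous when called with an lvalue argument.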
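The following sketch puts the pieces above together and shows one way to dispatch on the payload on the receiving side. It uses std::visit with an overload helper, which is a swapped-in technique and not necessarily what the library does. The payload members value and flag, the Overloaded helper and the function handle are assumptions made for illustration.

```cpp
#include <variant>

// Message class as defined above, reduced to what the sketch needs.
template<typename T>
class Message {
public:
    Message(int kind, T data) : _kind{kind}, _data(data) {}
    int kind() const noexcept { return _kind; }
    T data() const noexcept { return _data; }
private:
    int _kind;
    T _data;
};

// Hypothetical payload members, added only to have something to inspect.
struct PayloadOne { int value; };
struct PayloadTwo { bool flag; };

using Variants = std::variant<PayloadOne, PayloadTwo>;
struct Data { Variants data; };

// One build function per payload type; kind mirrors the variant index.
inline Message<Data> build(PayloadOne payload) {
    Data d{payload};
    return {static_cast<int>(d.data.index()), d};
}
inline Message<Data> build(PayloadTwo payload) {
    Data d{payload};
    return {static_cast<int>(d.data.index()), d};
}

// Common C++17 helper that merges several lambdas into one visitor.
template<class... Ts> struct Overloaded : Ts... { using Ts::operator()...; };
template<class... Ts> Overloaded(Ts...) -> Overloaded<Ts...>;

// Hypothetical handler: the compiler rejects the call if a payload type is not covered.
inline int handle(const Message<Data>& msg) {
    return std::visit(Overloaded{
        [](const PayloadOne& p) { return p.value; },
        [](const PayloadTwo& p) { return p.flag ? 1 : 0; }
    }, msg.data().data);
}
```

Because the visitor must cover every alternative of the variant, adding a new payload type without handling it becomes a compile-time error rather than a silent runtime miss.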

The above approach works without trouble if all fields are destructible.

The more delicate problem depends on how the underlying realtime layer that transfers messages is implemented. If the layer uses C++ standard library constructs, you are safe. You are also safe if all the fields of your payloads are trivially destructible.

If the layer uses a C approach with raw byte copying of messages, you must take care of RAII and destructor calls when the message object leaves its C++ scope. In this case, you must use a message allocator with explicit acquire and release operations to circumvent automatic object destruction.
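Whether a payload is safe for a byte-copying transport can be checked at compile time with the standard type traits. The following is a minimal sketch; the payload SensorReading and the helper isRawCopySafe are hypothetical names, not part of the library.

```cpp
#include <type_traits>

// Hypothetical payload for illustration.
struct SensorReading {
    int channel;
    float value;
};

// True if T can be moved through a queue that copies raw bytes
// and never runs constructors or destructors.
template<typename T>
constexpr bool isRawCopySafe() {
    return std::is_trivially_copyable_v<T> && std::is_trivially_destructible_v<T>;
}

// Fails the build as soon as someone adds, for example, a std::string member.
static_assert(isRawCopySafe<SensorReading>(),
              "payload must be safe for raw byte copying");
```

Placing such a static_assert next to each payload definition documents the constraint and lets the compiler enforce it.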

A reference implementation is provided with the vinci library.

Actor Implementation

The actor structure source code is:

/**
 * Define the abstraction of an actor. An actor is an active object with a private mailbox
 * containing a queue of messages.
 * Copy constructor and assignment operator are deleted to avoid copying of the actor.
 * @tparam T type of the message payload
 */
template<typename T>
class Actor {
public:
    /**
     * Maximum length of an actor name. To avoid heap usage, statically sized char
     * strings are used.
     */
    static const int ACTOR_NAME_SIZE = 32;

    virtual ~Actor();

    /**
     * Return the human-readable name of the actor.
     * @return name of the actor
     */
    const char* name() const noexcept;

    /**
     * Join with the actor thread.
     */
    virtual void join() = 0;

    /**
     * Send a message to the given actor.
     * @param actor actor receiving the message
     * @param msg message to be sent
     */
    static void send(Actor<T>& actor, Message<T>* msg);

protected:
    /**
     * Constructor of the class.
     * @param name name of the actor. The name should be human-readable and a unique
     * identifier of the active object.
     * @param pool pool providing message instances.
     */
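    Actor(const char* name, MessagePool<T>& pool) : _pool{pool} {
        // strncpy does not terminate the string if name is too long, so terminate explicitly
        strncpy(_name, name, ACTOR_NAME_SIZE - 1);
        _name[ACTOR_NAME_SIZE - 1] = '\0';
    }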

    Actor(const Actor<T>&) = delete;
    Actor<T>& operator=(const Actor<T>&) = delete;

    /**
     * Process the next received message. Override the method to implement the business
     * logic of your actor.
     * The message can be nullptr if the waitDelay is set to a value bigger than 0.
     * @param  msg message to process
     * @return flag indicating if the processing should continue
     */
    virtual bool processMsg(Message<T>* msg) = 0;

    /**
     * Receive a message.
     * @param msg received message.
     */
    virtual void receive(Message<T>* msg) = 0;

    /**
     * Get the next message of the message queue. The call blocks until a message is
     * received.
     * @return the next message
     */
    virtual Message<T>* message() = 0;

    /**
     * Get the next message of the message queue. The call blocks until a message is
     * received or a timeout occurs.
     * @param ticks timeout in ticks
     * @return the next message or nullptr if timeout occurred
     */
    virtual Message<T>* message(const uint32_t ticks) = 0;

    /**
     * Suspend the actor for the specified time.
     * @param ticks delay in ticks
     */
    virtual void delay(const uint32_t ticks) = 0;

    /**
     * Run the body of the actor. Override this method only for special needs not covered
     * by the regular actor semantics.
     * The default behavior is to wait for the next message and process it with the @ref
     * processMsg(Message<T>*).
     */
    virtual void runBody();

    inline MessagePool<T>& pool() { return _pool; }

    inline uint32_t waitDelay() { return _waitDelay; }

    inline void waitDelay(uint32_t waitDelay) {
        _waitDelay = waitDelay;
    }
private:
    char _name[ACTOR_NAME_SIZE];
    MessagePool<T>& _pool;
    uint32_t _waitDelay = 0;
};

Implementing Actors

The processing logic for an actor has the following form. The actions and guards should only have one parameter to pass the message being processed.

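bool MyActor::processMsg(Message<Data>* msg) {
    bool continues = true;
    if (msg == nullptr) {
        return continues;   // timeout while waiting, nothing to process
    }
    Variants variants = msg->data().data;
    // std::get_if expects a pointer to the variant and returns nullptr on a type mismatch
    if (const MyActorData* data = std::get_if<MyActorData>(&variants)) {
        processMsgInFsm(*data);
    }
    return continues;
}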

If the message was acquired from the message pool, release the message back to the pool before leaving the method.

Finite State Machines

Embedded applications often have relatively simple flat state machines describing their behavior. Implement flat state machines as a doubly nested switch statement. The outer statement selects the actor's current state, the inner statement selects the type of message to be processed. A guard is implemented as a conditional statement.

State state = INIT;                                        (1)

void processMsgInFsm(const MyActorData& event) {
    switch (state) {
        case STATE_1:
            switch (event.id) {                            (2)
                case (ID_A):
                    if (guard_1(event)) {                  (3)
                        action_a_1(event);                 (4)
                        state = STATE_2;
                    } else if (guard_2(event)) {
                        action_a_2(event);
                        state = STATE_3;
                    }
                    break;
                case (ID_B):
                    action_b(event);
                    state = STATE_N;
                    break;
                ...
            }
            break;
        ...
    }
}
1 Current state of the actor. The type of the variable should be an enumeration.
2 Identify the message through its identifier. A message should be a value object.
3 Evaluate an optional guard condition to decide if the transition will be selected.
4 Implement the transition from state STATE_1 to STATE_2 and execute the associated action action_a_1. It is customary to pass the message as a parameter to the function.

The state machine can be documented with a table describing, for each state, the event to be processed, an optional guard, the target state and the action.
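As a complete instance of the nested switch pattern and its table documentation, consider the following sketch. The states, events, guard and action are hypothetical and were chosen only to keep the example small.

```cpp
// Transition table of the hypothetical machine:
//
//   State    | Event | Guard      | Target   | Action
//   ---------+-------+------------+----------+------------------
//   Idle     | Start | value > 0  | Running  | count the start
//   Running  | Stop  |            | Done     |
//
// All other state/event combinations are ignored.

enum class State { Idle, Running, Done };
enum class EventId { Start, Stop };

struct Event {
    EventId id;
    int value;   // evaluated by the guard
};

class Fsm {
public:
    State state() const noexcept { return _state; }
    int startCount() const noexcept { return _starts; }

    void process(const Event& event) {
        switch (_state) {                             // outer switch: current state
            case State::Idle:
                switch (event.id) {                   // inner switch: event type
                    case EventId::Start:
                        if (event.value > 0) {        // guard
                            ++_starts;                // action
                            _state = State::Running;  // transition
                        }
                        break;
                    case EventId::Stop:
                        break;                        // ignored in Idle
                }
                break;
            case State::Running:
                switch (event.id) {
                    case EventId::Stop:
                        _state = State::Done;
                        break;
                    case EventId::Start:
                        break;                        // ignored in Running
                }
                break;
            case State::Done:
                break;                                // final state, ignores all events
        }
    }

private:
    State _state = State::Idle;
    int _starts = 0;
};
```

Using enum class for states and events lets the compiler warn about unhandled cases in each switch, which keeps the code and the table in sync.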

Finite state machines can contain timeout events or guards. Two major approaches exist to implement these timeout dependencies. Either you have access to a global timer manager or you wait on an event with a timeout clause.

We recommend using the timeout clause of the call waiting for the next message in C++ implementations. The code is straightforward and legible.
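With this approach, a timeout simply arrives in the state machine as a missing message. The sketch below assumes, as in the Actor class above, that the timed wait returns nullptr when the timeout expires. The states, the Ack payload and the step function are hypothetical names for illustration.

```cpp
// Hypothetical states and payload, for illustration only.
enum class State { WaitingForAck, Acked, TimedOut };

struct Ack {
    int sequence;
};

// Compute the next state. A nullptr message models a timed wait that
// expired before any message arrived.
inline State step(State state, const Ack* msg) {
    switch (state) {
        case State::WaitingForAck:
            if (msg == nullptr) {
                return State::TimedOut;   // timeout transition
            }
            return State::Acked;          // regular transition
        case State::Acked:
        case State::TimedOut:
            return state;                 // final states ignore further input
    }
    return state;
}
```

No separate timer object or callback is needed: the timeout is handled in the same switch statement as all other events.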

The Java solution tends to use the timer manager approach because of the native introspection support. The C++ approach implements events statically with std::variant<..>, whereas the Java approach uses sealed class hierarchies and the instanceof operator.

Always choose an implementation reflecting the idioms of the chosen technology stack. Pattern matching is part of the modern Java language but not of the C++ language.

This design approach improves the maintainability of the application [2, 3, 4].

More complex state machines can be implemented with the help of statechart notation [2].

Message Handling

Message objects should not be allocated on the heap.

If your RTOS is C based, avoid using classes that allocate memory from the heap. Beware that C++ standard library strings use the heap. You also have to be careful about the scope of your message declarations to avoid destruction of the objects when leaving the declaring scope.

The library provides a static message pool to avoid heap usage and RAII problems when leaving a declaration scope. Tracking the maximum number of simultaneously used messages in development mode is an additional advantage.

    template<typename T>
    class MsgPoolLogic {
    public:
        /**
         * Acquire a message from the pool.
         * @return pointer to the acquired message
         */
        Message<T>* acquire();

        /**
         * Release the message and return it to the pool.
         * @param msg pointer to the message to release
         */
        void release(Message<T>* msg);

        /**
         * Return the number of free messages in the pool.
         * @return number of free messages
         */
        int nrOfFreeMsgs();
    };
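To make the acquire and release semantics concrete, here is a minimal sketch of a statically sized pool. It is not the library's implementation: the class name StaticPool, the element type Msg and the maxUsed counter are assumptions, the element type must be default-constructible, and the sketch is not thread-safe.

```cpp
#include <array>
#include <cstddef>

// Hypothetical message type for illustration only.
struct Msg {
    int kind = 0;
    int data = 0;
};

// Hypothetical fixed-size pool. All slots live inside the pool object,
// so a statically allocated pool never touches the heap.
template<typename M, std::size_t N>
class StaticPool {
public:
    // Return a free slot, or nullptr if the pool is exhausted.
    M* acquire() {
        for (std::size_t i = 0; i < N; ++i) {
            if (!_used[i]) {
                _used[i] = true;
                ++_inUse;
                if (_inUse > _maxInUse) _maxInUse = _inUse;
                return &_msgs[i];
            }
        }
        return nullptr;
    }

    // Return a slot to the pool. Unknown or already free pointers are ignored.
    void release(M* msg) {
        for (std::size_t i = 0; i < N; ++i) {
            if (&_msgs[i] == msg && _used[i]) {
                _used[i] = false;
                --_inUse;
                return;
            }
        }
    }

    int nrOfFreeMsgs() const { return static_cast<int>(N - _inUse); }

    // High-water mark of simultaneously used messages, useful when sizing the pool.
    int maxUsed() const { return static_cast<int>(_maxInUse); }

private:
    std::array<M, N> _msgs{};
    std::array<bool, N> _used{};   // zero-initialized: all slots free
    std::size_t _inUse = 0;
    std::size_t _maxInUse = 0;
};
```

The linear search keeps the sketch short; for larger pools a free list would make acquire and release constant time.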

Lessons Learnt

The concepts implemented in the library are available for C and C++ based embedded applications. The presented version is the object-oriented C++ based variant.

We use char arrays and std::array instead of std::string and std::vector to eliminate dynamic memory allocation in the library. This approach has advantages for memory-constrained microcontrollers.
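A minimal sketch of this heap-free string handling follows. The constant NAME_SIZE mirrors ACTOR_NAME_SIZE from the Actor class, and the helper makeName is a hypothetical name, not a library function.

```cpp
#include <array>
#include <cstddef>
#include <cstring>

constexpr std::size_t NAME_SIZE = 32;   // mirrors ACTOR_NAME_SIZE

// Copy a C string into a fixed-size buffer. Names that are too long are
// truncated, and the result is always null-terminated.
inline std::array<char, NAME_SIZE> makeName(const char* name) {
    std::array<char, NAME_SIZE> buffer{};   // zero-initialized
    // Copy at most NAME_SIZE - 1 characters so the last byte stays '\0'.
    std::strncpy(buffer.data(), name, NAME_SIZE - 1);
    return buffer;
}
```

The buffer has a fixed footprint known at compile time, which makes the memory budget of each actor easy to reason about.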

The simplest approach to implement an actor-based embedded realtime system is:

  • Use the above-described abstractions.

  • Define all the actors of your system as static variables, so that no actors are dynamically allocated on the heap. Give all actors the same priority. The behavior of the system should not depend on the priority. The priority only influences how fast a message is processed, not how it is processed.

  • Define the message payload as a std::variant.

  • Use the message pool to acquire and release message instances. Therefore, no allocation of messages on the heap is performed.

  • Interrupt routines can process the data and send a message to an actor with the static void send(Actor<T>& actor, Message<T>* msg) call.

The implementation of the library uses solely primitive types and avoids heap usage.

Overall, we do not see any reason not to use the C++ language for realtime embedded applications. We acknowledge that the language has a steep initial learning curve. The advantages are more static checks from the compiler and better libraries.

References

[1] V. Vernon, Reactive Messaging Patterns with the Actor Model: Applications and Integration in Scala and Akka. Addison-Wesley Professional, 2015 [Online]. Available: https://www.amazon.com/dp/B011S8YC5G

[2] D. Farley, Modern Software Engineering. Pearson Education, Limited, 2022 [Online]. Available: https://www.amazon.com/dp/B09GG6XKS4

[3] J. Humble and D. Farley, Continuous Delivery. Addison-Wesley Professional, 2010 [Online]. Available: https://www.amazon.com/dp/0321601912

[4] D. Farley, Continuous Delivery Pipelines. 2021 [Online]. Available: https://www.amazon.com/dp/B096YGZVZ9