Event Manager APIs

The EventManager class enables anonymous publish/subscribe interactions between different software components and contexts in the system. It is a singleton that manages the registry of publishers and subscribers and keeps them loosely coupled. To subscribe to events/data, a component registers itself with the manager by calling add_subscriber(), which registers a callback function associated with that component for the given event/topic. All callback functions for a given topic/event are called from the same thread/context: a thread that is started and managed by the EventManager. Event data is passed as std::vector<uint8_t>, so it is recommended to use a (de-)serialization library such as espp::serialization / alpaca to transform data structures to and from that byte buffer when publishing and subscribing.

API Reference

Header File

Classes

class EventManager : public espp::BaseComponent

Singleton class for managing events. Provides mechanisms for anonymous publish/subscribe interactions, enabling one-to-one, one-to-many, many-to-one, and many-to-many data distribution with loose coupling and low overhead. Each topic runs a thread for that topic’s subscribers, which executes all the callbacks in sequence and then goes back to sleep until new data is published.
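The per-topic thread described above can be pictured with a small standard-library sketch. This is not the EventManager implementation: TopicWorker and its members are hypothetical names, and the queueing and shutdown behavior are assumptions made only to illustrate "run every callback in sequence on one thread, then sleep until new data arrives".

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for one topic's dispatch thread; not espp code.
class TopicWorker {
public:
  using Callback = std::function<void(const std::vector<uint8_t> &)>;

  TopicWorker() : thread_([this] { run(); }) {}

  // Wake the thread, let it drain any queued data, then join it.
  ~TopicWorker() {
    {
      std::lock_guard<std::mutex> lk(mutex_);
      stop_ = true;
    }
    cv_.notify_one();
    thread_.join();
  }

  void add_callback(Callback cb) {
    assert(cb && "callback must be callable");
    std::lock_guard<std::mutex> lk(mutex_);
    callbacks_.push_back(std::move(cb));
  }

  // Queue the data and wake the worker; the caller does not run callbacks.
  void publish(std::vector<uint8_t> data) {
    {
      std::lock_guard<std::mutex> lk(mutex_);
      queue_.push_back(std::move(data));
    }
    cv_.notify_one();
  }

private:
  void run() {
    std::unique_lock<std::mutex> lk(mutex_);
    while (true) {
      // Sleep until there is data to deliver or a stop was requested.
      cv_.wait(lk, [this] { return stop_ || !queue_.empty(); });
      if (queue_.empty())
        return; // stop requested and nothing left to deliver
      auto data = std::move(queue_.front());
      queue_.pop_front();
      auto callbacks = callbacks_; // copy so callbacks run without the lock held
      lk.unlock();
      for (const auto &cb : callbacks)
        cb(data); // every callback runs in sequence on this one thread
      lk.lock();
    }
  }

  std::mutex mutex_;
  std::condition_variable cv_;
  std::deque<std::vector<uint8_t>> queue_;
  std::vector<Callback> callbacks_;
  bool stop_{false};
  std::thread thread_; // declared last so the other members exist before it starts
};
```

Because every callback for a topic shares one thread in this model, a callback that blocks delays the callbacks registered after it, which is worth keeping in mind when a subscriber does real work.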

Event Manager Example

  espp::EventManager::get().set_log_level(espp::Logger::Verbosity::WARN);

  // NOTE: we'll use a simple string for publishing on drive/control, but
  // normally you'd use a struct and a serialization library like alpaca, so
  // that's what we'll do for battery/state

  // let's define a struct to contain data for an event
  struct BatteryState {
    float voltage{48.0f};
    float current{0.0f};
    bool is_charging{false};
    float temperature_celsius{25.0f};
    float state_of_charge{100.0f};
  };

  // let's make some event names that we'll use for this example
  const std::string event1 = "battery/state";
  const std::string event2 = "drive/control";

  static int num_published = 0;
  static int num_received = 0;

  // Make a task which has a pub/sub in it. NOTE: in real code, this would
  // likely be within a custom class and the registration would happen in the
  // constructor, with the remove_publisher / remove_subscriber calls in its
  // destructor. Also NOTE: you would likely use a serialization library like
  // alpaca (wrapped in espp serialization component) for serializing the data
  // structures to byte buffers and deserializing them in the callbacks.
  auto task_1_fn = [&](auto &m, auto &cv) {
    {
      // Just for fun, we'll only define the subscriber callback within the
      // context of this task function
      static auto event2_cb = [&](const std::vector<uint8_t> &data) {
        // we know this is a string, so just convert it to a string and
        // print it
        std::string data_str(data.begin(), data.end());
        logger.debug("Task 1 cb got data: '{}'", data_str);
        num_received++;
        // block here like we're doing work
        std::this_thread::sleep_for(10ms);
      };
      // we only want to register once, so use the std::call_once /
      // std::once_flag functionality to only register the first time the
      // task is run
      static std::once_flag flag;
      std::call_once(flag, [&]() {
        auto &em = espp::EventManager::get();
        logger.info("Task 1 registering!");
        auto did_pub = em.add_publisher(event1, "task 1");
        auto did_sub = em.add_subscriber(event2, "task 1", event2_cb);
        logger.info("Task 1 publishing:  {}", did_pub);
        logger.info("Task 1 subscribing: {}", did_sub);
        // sleep for a little bit to let the other task register its
        // subscribers/publishers before we start publishing, ensuring that the
        // subscriber callback is registered before the publisher publishes
        std::this_thread::sleep_for(10ms);
      });
      // periodically publish on event1
      logger.debug("Task 1 publishing on {}", event1);
      static BatteryState bs;
      bs.current = 1.0f;
      bs.voltage -= 0.1f;
      bs.state_of_charge -= 5.0f;
      bs.temperature_celsius += 0.2f;
      std::vector<uint8_t> buffer;
      espp::serialize(bs, buffer);
      espp::EventManager::get().publish(event1, buffer);
      num_published++;
      buffer.clear();
      bs.current = -1.0f;
      espp::serialize(bs, buffer);
      espp::EventManager::get().publish(event1, buffer);
      num_published++;
      std::unique_lock<std::mutex> lk(m);
      cv.wait_for(lk, 500ms);
    }
    // we don't want to stop, so return false
    return false;
  };
  auto task1 = espp::Task({.callback = task_1_fn, .task_config = {.name = "Task 1"}});

  // Now let's make another task which will have pub/sub as well
  auto task_2_fn = [&](auto &m, auto &cv) {
    {
      // Just for fun, we'll only define the subscriber callback within the
      // context of this task function
      static auto event1_cb = [&](const std::vector<uint8_t> &data) {
        // we know the data is a BatteryState struct, so deserialize it and
        // print it
        std::error_code ec;
        auto bs = espp::deserialize<BatteryState>(data, ec);
        if (ec) {
          logger.error("Couldn't deserialize BatteryState: {}", ec.message());
          return;
        }
        logger.debug("Task 2 got battery state data:\n"
                     "  voltage:             {:.2f}\n"
                     "  current:             {:.2f}\n"
                     "  is_charging:         {}\n"
                     "  temperature_celsius: {:.2f}\n"
                     "  state_of_charge:     {:.2f}",
                     bs.voltage, bs.current, bs.is_charging, bs.temperature_celsius,
                     bs.state_of_charge);
        num_received++;
        // block here like we're doing work
        std::this_thread::sleep_for(10ms);
      };
      // we only want to register once, so use the std::call_once /
      // std::once_flag functionality to only register the first time the
      // task is run
      static std::once_flag flag;
      std::call_once(flag, [&]() {
        auto &em = espp::EventManager::get();
        logger.info("Task 2 registering!");
        auto did_pub = em.add_publisher(event2, "task 2");
        // NOTE: we're using a custom task config here to show how you can
        // configure the task that the subscriber callback will run in
        // (priority, stack size, etc.). Only the first subscription on a topic
        // will use the task config, any subsequent subscriptions will use the
        // same task as the first subscription.
        espp::Task::BaseConfig task_config{
            .name = "Task 2 subscriber task",
            .stack_size_bytes = 8192,
            .priority = 10, // 5 is default, 10 is higher
        };
        auto did_sub = em.add_subscriber(event1, "task 2", event1_cb, task_config);
        logger.info("Task 2 publishing:  {}", did_pub);
        logger.info("Task 2 subscribing: {}", did_sub);
        // sleep for a little bit to let the other task register its
        // subscribers/publishers, ensuring that the subscriber callback is
        // registered before the publisher publishes
        std::this_thread::sleep_for(10ms);
      });
      // periodically publish on event2
      logger.debug("Task 2 publishing on {}", event2);
      static int iteration = 0;
      std::string data = fmt::format("Task 2 data {}", iteration++);
      std::vector<uint8_t> buffer(data.begin(), data.end());
      espp::EventManager::get().publish(event2, buffer);
      num_published++;
      espp::EventManager::get().publish(event2, buffer);
      num_published++;
      std::unique_lock<std::mutex> lk(m);
      cv.wait_for(lk, 500ms);
    }
    // we don't want to stop, so return false
    return false;
  };
  auto task2 = espp::Task({.callback = task_2_fn, .task_config = {.name = "Task 2"}});

  // now start the tasks
  task1.start();
  task2.start();

  // Now let's just wait for a little while for those tasks to run, showcasing
  // the pub/sub interactions in the log output.
  logger.info("Sleeping for 5s...");
  std::this_thread::sleep_for(5s);

  task1.stop();
  task2.stop();

  // since the tasks are done, let's remove their publishers/subscribers here
  // (though as noted above, this would normally be done in a class
  // destructor)
  auto &em = espp::EventManager::get();
  em.remove_publisher(event1, "task 1");
  em.remove_subscriber(event2, "task 1");
  em.remove_publisher(event2, "task 2");
  em.remove_subscriber(event1, "task 2");

Note

In C++ classes, it’s recommended to call the add_publisher/add_subscriber functions in the class constructor and then to call the remove_publisher/remove_subscriber functions in the class destructor.
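One way to follow the constructor/destructor recommendation is a small RAII guard. The sketch below is an illustration under assumptions: ScopedSubscriber and FakeManager are hypothetical names, and FakeManager only mimics the add_subscriber/remove_subscriber return values documented on this page so that the example builds without espp. With the real singleton, the guard would presumably be instantiated with espp::EventManager and passed espp::EventManager::get().

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <set>
#include <string>
#include <utility>
#include <vector>

using event_callback_fn = std::function<void(const std::vector<uint8_t> &)>;

// Hypothetical stand-in that only tracks (topic, component) pairs.
struct FakeManager {
  std::set<std::pair<std::string, std::string>> subscribers;

  bool add_subscriber(const std::string &topic, const std::string &component,
                      const event_callback_fn &) {
    return subscribers.emplace(topic, component).second;
  }
  bool remove_subscriber(const std::string &topic, const std::string &component) {
    return subscribers.erase(std::make_pair(topic, component)) > 0;
  }
};

// Hypothetical RAII guard: subscribes on construction, unsubscribes on
// destruction, and only removes a subscription that it added itself.
template <typename Manager> class ScopedSubscriber {
public:
  ScopedSubscriber(Manager &manager, std::string topic, std::string component,
                   const event_callback_fn &callback)
      : manager_(manager), topic_(std::move(topic)), component_(std::move(component)),
        registered_(manager_.add_subscriber(topic_, component_, callback)) {}

  ~ScopedSubscriber() {
    if (registered_)
      manager_.remove_subscriber(topic_, component_);
  }

  // Copying would lead to a double removal, so the guard is non-copyable.
  ScopedSubscriber(const ScopedSubscriber &) = delete;
  ScopedSubscriber &operator=(const ScopedSubscriber &) = delete;

  bool registered() const { return registered_; }

private:
  Manager &manager_;
  std::string topic_;
  std::string component_;
  bool registered_;
};

// Usage: the subscription lives exactly as long as the guard object.
inline void scoped_subscriber_example() {
  FakeManager manager;
  {
    ScopedSubscriber<FakeManager> guard(manager, "battery/state", "display",
                                        [](const std::vector<uint8_t> &) {});
    assert(guard.registered());
    assert(manager.subscribers.size() == 1);
  }
  assert(manager.subscribers.empty()); // removed by the guard's destructor
}
```

Holding such a guard as a class member ties the subscription to the lifetime of the owning object, which is the effect the note above asks for.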

Note

Unless you only care about the event itself and not its data, or only need to transmit actual strings, it is recommended to use a serialization library (such as espp::serialization, which wraps alpaca) to serialize your data structures into the std::vector<uint8_t> buffer when publishing, and to deserialize them from that buffer in the subscriber callbacks.
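To make the byte-buffer boundary concrete, here is a hand-rolled round trip for the BatteryState struct from the example. It is deliberately not espp::serialize or alpaca: to_bytes and from_bytes are hypothetical helpers that copy the raw object representation, which only works for trivially copyable types and depends on the compiler's layout and the machine's endianness. Those limits are a good part of why a real serialization library is the recommended route.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <optional>
#include <type_traits>
#include <vector>

// Same fields as the BatteryState struct in the example.
struct BatteryState {
  float voltage{48.0f};
  float current{0.0f};
  bool is_charging{false};
  float temperature_celsius{25.0f};
  float state_of_charge{100.0f};
};

// Hypothetical helper: copy the raw bytes of a trivially copyable value.
template <typename T> std::vector<uint8_t> to_bytes(const T &value) {
  static_assert(std::is_trivially_copyable_v<T>, "raw copy needs a trivially copyable type");
  std::vector<uint8_t> buffer(sizeof(T));
  std::memcpy(buffer.data(), &value, sizeof(T));
  return buffer;
}

// Hypothetical helper: rebuild the value, rejecting buffers of the wrong size.
template <typename T> std::optional<T> from_bytes(const std::vector<uint8_t> &buffer) {
  static_assert(std::is_trivially_copyable_v<T>, "raw copy needs a trivially copyable type");
  if (buffer.size() != sizeof(T))
    return std::nullopt;
  T value;
  std::memcpy(&value, buffer.data(), sizeof(T));
  return value;
}

// Usage: what a publisher would send and what a subscriber callback would decode.
inline void round_trip_example() {
  BatteryState sent;
  sent.voltage = 47.5f;
  sent.is_charging = true;
  const std::vector<uint8_t> buffer = to_bytes(sent);
  const std::optional<BatteryState> received = from_bytes<BatteryState>(buffer);
  assert(received.has_value());
  assert(received->voltage == 47.5f && received->is_charging);
}
```

The size check in from_bytes plays the role that the std::error_code plays in the example's deserialize call: a subscriber should be able to reject a payload it cannot decode instead of reading garbage.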

Public Types

typedef std::function<void(const std::vector<uint8_t>&)> event_callback_fn

Signature of the callback functions that are called when subscription/event data is available.

Param std::vector<uint8_t>&

The data associated with the event
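Any callable with this signature can serve as a callback. A common need is routing the event to a member function, which can be sketched with a capturing lambda. The alias below mirrors the typedef above so the snippet builds on its own; DriveListener and its members are hypothetical names used only for illustration.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <string>
#include <vector>

// Mirrors the event_callback_fn typedef documented above.
using event_callback_fn = std::function<void(const std::vector<uint8_t> &)>;

// Hypothetical component that keeps the last text payload it received.
class DriveListener {
public:
  // Handler with the same parameter type as event_callback_fn.
  void on_drive_control(const std::vector<uint8_t> &data) {
    last_message_.assign(data.begin(), data.end());
    ++count_;
  }

  // Wrap the member function in a lambda so it fits event_callback_fn.
  // The listener must outlive every copy of the returned callback.
  event_callback_fn make_callback() {
    return [this](const std::vector<uint8_t> &data) { on_drive_control(data); };
  }

  const std::string &last_message() const { return last_message_; }
  int count() const { return count_; }

private:
  std::string last_message_;
  int count_{0};
};

// Usage: invoke the callback the way a dispatcher would.
inline void callback_example() {
  DriveListener listener;
  const event_callback_fn callback = listener.make_callback();
  const std::string text = "forward";
  callback(std::vector<uint8_t>(text.begin(), text.end()));
  assert(listener.last_message() == "forward");
  assert(listener.count() == 1);
}
```

Since the lambda captures this, the subscription should be removed before the object is destroyed; otherwise the callback would run against a dangling pointer.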

Public Functions

bool add_publisher(const std::string &topic, const std::string &component)

Register a publisher for component on topic.

Parameters
  • topic – Topic name for the data being published.

  • component – Name of the component publishing data.

Returns

True if the publisher was added, false if it was already registered for that component.

bool add_subscriber(const std::string &topic, const std::string &component, const espp::EventManager::event_callback_fn &callback, const size_t stack_size_bytes = 8192)

Register a subscriber for component on topic.

Note

The stack size is only used if a subscriber is not already registered for that topic. If a subscriber is already registered for that topic, the stack size is ignored.

Parameters
  • topic – Topic name for the data being subscribed to.

  • component – Name of the component subscribing to the data.

  • callback – The event_callback_fn to be called when receiving data on topic.

  • stack_size_bytes – The stack size in bytes to use for the subscriber task.

Returns

True if the subscriber was added, false if it was already registered for that component.

bool add_subscriber(const std::string &topic, const std::string &component, const espp::EventManager::event_callback_fn &callback, const espp::Task::BaseConfig &task_config)

Register a subscriber for component on topic.

Note

The task_config is only used if a subscriber is not already registered for that topic. If a subscriber is already registered for that topic, the task_config is ignored.

Parameters
  • topic – Topic name for the data being subscribed to.

  • component – Name of the component subscribing to the data.

  • callback – The event_callback_fn to be called when receiving data on topic.

  • task_config – The task configuration to use for the subscriber.

Returns

True if the subscriber was added, false if it was already registered for that component.
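The "first subscription wins" rule from the note above can be modeled in a few lines. This is a sketch of the documented behavior only, not of how EventManager stores its tasks: TaskConfig is a trimmed-down hypothetical stand-in for espp::Task::BaseConfig, and TopicTasks is a hypothetical registry.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <set>
#include <string>

// Hypothetical, trimmed-down stand-in for a task configuration.
struct TaskConfig {
  std::string name;
  std::size_t stack_size_bytes{8192};
  int priority{5};
};

// Hypothetical registry that keeps one configuration per topic.
class TopicTasks {
  struct Entry {
    TaskConfig config;
    std::set<std::string> components;
  };

public:
  // Returns false if the component is already subscribed to the topic.
  bool add_subscriber(const std::string &topic, const std::string &component,
                      const TaskConfig &config) {
    // try_emplace only inserts when the topic has no entry yet, so the first
    // subscriber's config is kept and later configs are ignored.
    auto it = topics_.try_emplace(topic, Entry{config, {}}).first;
    return it->second.components.insert(component).second;
  }

  // Returns the configuration in effect for the topic, or nullptr if none.
  const TaskConfig *config_for(const std::string &topic) const {
    auto it = topics_.find(topic);
    return it == topics_.end() ? nullptr : &it->second.config;
  }

private:
  std::map<std::string, Entry> topics_;
};

// Usage: the second subscriber's configuration has no effect.
inline void first_config_wins_example() {
  TopicTasks tasks;
  const bool first = tasks.add_subscriber("battery/state", "task 2", {"high priority", 8192, 10});
  const bool second = tasks.add_subscriber("battery/state", "logger", {"low priority", 4096, 1});
  assert(first && second);
  assert(tasks.config_for("battery/state")->priority == 10);
}
```

In practice this means the component that cares about stack size or priority should be the first to subscribe to the topic, or all subscribers should agree on the configuration.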

bool publish(const std::string &topic, const std::vector<uint8_t> &data)

Publish data on topic.

Parameters
  • topic – Topic to publish data on.

  • data – Data to publish, within a vector container.

Returns

True if data was successfully published to topic, false otherwise. Publish will not occur (and will return false) if there are no subscribers for this topic.
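Because publish() reports false when nobody is subscribed, callers can tell a delivered event from a dropped one. The sketch below models only the return values documented on this page: MiniBus is a hypothetical stand-in that calls subscribers inline on the caller's thread, whereas the EventManager hands the data to the topic's own thread.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <string>
#include <vector>

// Hypothetical, synchronous stand-in for the documented return values.
class MiniBus {
public:
  using event_callback_fn = std::function<void(const std::vector<uint8_t> &)>;

  // False if the component is already subscribed to the topic.
  bool add_subscriber(const std::string &topic, const std::string &component,
                      const event_callback_fn &callback) {
    return subscribers_[topic].emplace(component, callback).second;
  }

  // False if the component was not subscribed to the topic.
  bool remove_subscriber(const std::string &topic, const std::string &component) {
    auto it = subscribers_.find(topic);
    if (it == subscribers_.end())
      return false;
    const bool removed = it->second.erase(component) > 0;
    if (it->second.empty())
      subscribers_.erase(it); // forget topics that have no subscribers left
    return removed;
  }

  // False (and nothing is delivered) if the topic has no subscribers.
  bool publish(const std::string &topic, const std::vector<uint8_t> &data) {
    auto it = subscribers_.find(topic);
    if (it == subscribers_.end() || it->second.empty())
      return false;
    for (const auto &entry : it->second)
      entry.second(data);
    return true;
  }

private:
  std::map<std::string, std::map<std::string, event_callback_fn>> subscribers_;
};

// Usage: the same publish call fails before and succeeds after subscribing.
inline void mini_bus_example() {
  MiniBus bus;
  const std::vector<uint8_t> payload{'g', 'o'};
  int received = 0;
  const bool before = bus.publish("drive/control", payload);
  const bool added = bus.add_subscriber(
      "drive/control", "motor", [&received](const std::vector<uint8_t> &) { ++received; });
  const bool after = bus.publish("drive/control", payload);
  assert(!before && added && after);
  assert(received == 1);
}
```

This is also why the example sleeps briefly after registering: data published before the subscriber is registered is not queued for it in this model, it is simply reported as not published.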

bool remove_publisher(const std::string &topic, const std::string &component)

Remove component's publisher for topic.

Parameters
  • topic – The topic that component was publishing on.

  • component – The component for which the publisher was registered.

Returns

True if the publisher was removed, false if it was not registered.

bool remove_subscriber(const std::string &topic, const std::string &component)

Remove component's subscriber for topic.

Parameters
  • topic – The topic that component was subscribing to.

  • component – The component for which the subscriber was registered.

Returns

True if the subscriber was removed, false if it was not registered.

inline const std::string &get_name() const

Get the name of the component

Note

This is the tag of the logger

Returns

A const reference to the name of the component

inline void set_log_tag(const std::string_view &tag)

Set the tag for the logger

Parameters

tag – The tag to use for the logger

inline espp::Logger::Verbosity get_log_level() const

Get the log level for the logger

Returns

The verbosity level of the logger

inline void set_log_level(espp::Logger::Verbosity level)

Set the log level for the logger

Parameters

level – The verbosity level to use for the logger

inline void set_log_verbosity(espp::Logger::Verbosity level)

Set the log verbosity for the logger

See also

set_log_level

Note

This is a convenience method that calls set_log_level

Parameters

level – The verbosity level to use for the logger

inline espp::Logger::Verbosity get_log_verbosity() const

Get the log verbosity for the logger

See also

get_log_level

Note

This is a convenience method that calls get_log_level

Returns

The verbosity level of the logger

inline void set_log_rate_limit(std::chrono::duration<float> rate_limit)

Set the rate limit for the logger

Note

Only calls to the logger that have _rate_limit suffix will be rate limited

Parameters

rate_limit – The rate limit to use for the logger

Public Static Functions

static inline EventManager &get()

Get the singleton instance of the EventManager.

Returns

A reference to the EventManager singleton.