Event Manager APIs

The EventManager class enables anonymous publish/subscribe interactions between different software components and contexts in the system. It provides a singleton class which manages the registry of publishers and subscribers in the system and provides loose coupling between them. To subscribe to events/data, a component must register itself with the manager by calling add_subscriber() - which will register a callback function associated with that component for the event/topic provided. All callback functions for a given topic/event are called from the same thread/context - a thread that is started and managed by the EventManager. As noted in a few places, it is recommended to use a (de-)serialization library such as espp::serialization / alpaca for transforming data structures to/from std::vector<uint8_t> for publishing/subscribing.

API Reference

Header File

Classes

class EventManager : public espp::BaseComponent

Singleton class for managing events. Provides mechanisms for anonymous publish / subscribe interactions - enabling one to one, one to many, many to one, and many to many data distribution with loose coupling and low overhead. Each topic runs a thread for that topic’s subscribers, executing all the callbacks in sequence and then going to sleep again until new data is published.

Event Manager Example

  espp::EventManager::get().set_log_level(espp::Logger::Verbosity::WARN);

  // NOTE: we'll use a simple string for publishing on drive/control, but
  // normally you'd use a struct and a serialization library like alpaca, so
  // that's what we'll do for battery/state

  // let's define a struct to contain data for an event
  struct BatteryState {
    float voltage{48.0f};
    float current{0.0f};
    bool is_charging{false};
    float temperature_celsius{25.0f};
    float state_of_charge{100.0f};
  };

  // let's make some event names that we'll use for this example
  const std::string event1 = "battery/state";
  const std::string event2 = "drive/control";

  static int num_published = 0;
  static int num_received = 0;

  // Make a task which has a pub/sub in it. NOTE: in real code, this would
  // likely be within a custom class and the registration would happen in the
  // constructor, with the remove_publisher / remove_subscriber calls in its
  // destructor. Also NOTE: you would likely use a serialization library like
  // alpaca (wrapped in espp serialization component) for serializing and
  // deserializing the data structures being passed to string data.
  auto task_1_fn = [&](auto &m, auto &cv) {
    {
      // Just for fun, we'll only define the subscriber callback within the
      // context of this task function
      static auto event2_cb = [&](const std::vector<uint8_t> &data) {
        // we know this is a string, so just convert it to a string and
        // print it
        std::string data_str(data.begin(), data.end());
        logger.debug("Task 1 cb got data: '{}'", data_str);
        num_received++;
        // block here like we're doing work
        std::this_thread::sleep_for(10ms);
      };
      // we only want to register once, so ust the std::call_once /
      // std::once_flag functionality to only register the first time the
      // task is run
      static std::once_flag flag;
      std::call_once(flag, [&]() {
        auto &em = espp::EventManager::get();
        logger.info("Task 1 registering!");
        auto did_pub = em.add_publisher(event1, "task 1");
        auto did_sub = em.add_subscriber(event2, "task 1", event2_cb);
        logger.info("Task 1 publishing:  {}", did_pub);
        logger.info("Task 1 subscribing: {}", did_sub);
        // sleep for a little bit to let the other task register its
        // subscribers/publishers before we start publishing ensuring that the
        // subscriber callback is registered before the publisher publishes
        std::this_thread::sleep_for(10ms);
      });
      // periodically publish on event1
      logger.debug("Task 1 publishing on {}", event1);
      static BatteryState bs;
      bs.current = 1.0f;
      bs.voltage -= 0.1f;
      bs.state_of_charge -= 5.0f;
      bs.temperature_celsius += 0.2f;
      std::vector<uint8_t> buffer;
      espp::serialize(bs, buffer);
      espp::EventManager::get().publish(event1, buffer);
      num_published++;
      buffer.clear();
      bs.current = -1.0f;
      espp::serialize(bs, buffer);
      espp::EventManager::get().publish(event1, buffer);
      num_published++;
      std::unique_lock<std::mutex> lk(m);
      cv.wait_for(lk, 500ms);
    }
    // we don't want to stop, so return false
    return false;
  };
  auto task1 = espp::Task({.callback = task_1_fn, .task_config = {.name = "Task 1"}});

  // Now let's make another task which will have pub/sub as well
  auto task_2_fn = [&](auto &m, auto &cv) {
    {
      // Just for fun, we'll only define the subscriber callback within the
      // context of this task function
      static auto event1_cb = [&](const std::vector<uint8_t> &data) {
        // we know the data is a BatteryState struct, so deserialize it and
        // print it
        std::error_code ec;
        auto bs = espp::deserialize<BatteryState>(data, ec);
        if (ec) {
          logger.error("Couldn't deserialize BatteryState: {}", ec.message());
          return;
        }
        logger.debug("Task 2 got battery state data:\n"
                     "  voltage:             {:.2f}\n"
                     "  current:             {:.2f}\n"
                     "  is_charging:         {}\n"
                     "  temperature_celsius: {:.2f}\n"
                     "  state_of_charge:     {:.2f}",
                     bs.voltage, bs.current, bs.is_charging, bs.temperature_celsius,
                     bs.state_of_charge);
        num_received++;
        // block here like we're doing work
        std::this_thread::sleep_for(10ms);
      };
      // we only want to register once, so ust the std::call_once /
      // std::once_flag functionality to only register the first time the
      // task is run
      static std::once_flag flag;
      std::call_once(flag, [&]() {
        auto &em = espp::EventManager::get();
        logger.info("Task 2 registering!");
        auto did_pub = em.add_publisher(event2, "task 2");
        // NOTE: we're using a custom task config here to show how you can
        // configure the task that the subscriber callback will run in
        // (priority, stack size, etc.). Only the first subscription on a topic
        // will use the task config, any subsequent subscriptions will use the
        // same task as the first subscription.
        espp::Task::BaseConfig task_config{
            .name = "Task 2 subscriber task",
            .stack_size_bytes = 8192,
            .priority = 10, // 5 is default, 10 is higher
        };
        auto did_sub = em.add_subscriber(event1, "task 2", event1_cb, task_config);
        logger.info("Task 2 publishing:  {}", did_pub);
        logger.info("Task 2 subscribing: {}", did_sub);
        // sleep for a little bit to let the other task register its
        // subscribers/publishers, ensuring that the subscriber callback is
        // registered before the publisher publishes
        std::this_thread::sleep_for(10ms);
      });
      // periodically publish on event2
      logger.debug("Task 2 publishing on {}", event2);
      static int iteration = 0;
      std::string data = fmt::format("Task 2 data {}", iteration++);
      std::vector<uint8_t> buffer(data.begin(), data.end());
      espp::EventManager::get().publish(event2, buffer);
      num_published++;
      espp::EventManager::get().publish(event2, buffer);
      num_published++;
      std::unique_lock<std::mutex> lk(m);
      cv.wait_for(lk, 500ms);
    }
    // we don't want to stop, so return false
    return false;
  };
  auto task2 = espp::Task({.callback = task_2_fn, .task_config = {.name = "Task 2"}});

  // now start the tasks
  task1.start();
  task2.start();

  // Now let's just wait for a little while for those tasks to run, showcasing
  // the pub/sub interactions in the log output.
  logger.info("Sleeping for 5s...");
  std::this_thread::sleep_for(5s);

  task1.stop();
  task2.stop();

  // since the tasks are done, let's remove their publishers/subscibers here
  // (though as noted above, this would normally be done in a class
  // destructor)
  auto &em = espp::EventManager::get();
  em.remove_publisher(event1, "task 1");
  em.remove_subscriber(event2, "task 1");
  em.remove_publisher(event2, "task 2");
  em.remove_subscriber(event1, "task 2");

Note

In c++ objects, it’s recommended to call the add_publisher/add_subscriber functions in the class constructor and then to call the remove_publisher/remove_subscriber functions in the class destructor.

Note

It is recommended (unless you are only interested in events and not data or are only needing to transmit actual strings) to use a serialization library (such as espp::serialization - which wraps alpaca) to serialize your data structures to string when publishing and then deserialize your data from string in the subscriber callbacks.

Public Types

typedef std::function<void(const std::vector<uint8_t>&)> event_callback_fn

Function definition for function prototypes to be called when subscription/event data is available.

Param std::vector<uint8_t>&:

The data associated with the event

Public Functions

bool add_publisher(const std::string &topic, const std::string &component)

Register a publisher for component on topic.

Parameters:
  • topic – Topic name for the data being published.

  • component – Name of the component publishing data.

Returns:

True if the publisher was added, false if it was already registered for that component.

bool add_subscriber(const std::string &topic, const std::string &component, const espp::EventManager::event_callback_fn &callback, const size_t stack_size_bytes = 8192)

Register a subscriber for component on topic.

Note

The stack size is only used if a subscriber is not already registered for that topic. If a subscriber is already registered for that topic, the stack size is ignored.

Parameters:
  • topic – Topic name for the data being subscribed to.

  • component – Name of the component publishing data.

  • callback – The event_callback_fn to be called when receicing data on topic.

  • stack_size_bytes – The stack size in bytes to use for the subscriber

Returns:

True if the subscriber was added, false if it was already registered for that component.

bool add_subscriber(const std::string &topic, const std::string &component, const espp::EventManager::event_callback_fn &callback, const espp::Task::BaseConfig &task_config)

Register a subscriber for component on topic.

Note

The task_config is only used if a subscriber is not already registered for that topic. If a subscriber is already registered for that topic, the task_config is ignored.

Parameters:
  • topic – Topic name for the data being subscribed to.

  • component – Name of the component publishing data.

  • callback – The event_callback_fn to be called when receicing data on topic.

  • task_config – The task configuration to use for the subscriber.

Returns:

True if the subscriber was added, false if it was already registered for that component.

bool publish(const std::string &topic, const std::vector<uint8_t> &data)

Publish data on topic.

Parameters:
  • topic – Topic to publish data on.

  • data – Data to publish, within a vector container.

Returns:

True if data was successfully published to topic, false otherwise. Publish will not occur (and will return false) if there are no subscribers for this topic.

bool remove_publisher(const std::string &topic, const std::string &component)

Remove component's publisher for topic.

Parameters:
  • topic – The topic that component was publishing on.

  • component – The component for which the publisher was registered.

Returns:

True if the publisher was removed, false if it was not registered.

bool remove_subscriber(const std::string &topic, const std::string &component)

Remove component's subscriber for topic.

Parameters:
  • topic – The topic that component was subscribing to.

  • component – The component for which the subscriber was registered.

Returns:

True if the subscriber was removed, false if it was not registered.

inline const std::string &get_name() const

Get the name of the component

Note

This is the tag of the logger

Returns:

A const reference to the name of the component

inline void set_log_tag(const std::string_view &tag)

Set the tag for the logger

Parameters:

tag – The tag to use for the logger

inline espp::Logger::Verbosity get_log_level() const

Get the log level for the logger

Returns:

The verbosity level of the logger

inline void set_log_level(espp::Logger::Verbosity level)

Set the log level for the logger

Parameters:

level – The verbosity level to use for the logger

inline void set_log_verbosity(espp::Logger::Verbosity level)

Set the log verbosity for the logger

See also

set_log_level

Note

This is a convenience method that calls set_log_level

Parameters:

level – The verbosity level to use for the logger

inline espp::Logger::Verbosity get_log_verbosity() const

Get the log verbosity for the logger

See also

get_log_level

Note

This is a convenience method that calls get_log_level

Returns:

The verbosity level of the logger

inline void set_log_rate_limit(std::chrono::duration<float> rate_limit)

Set the rate limit for the logger

Note

Only calls to the logger that have _rate_limit suffix will be rate limited

Parameters:

rate_limit – The rate limit to use for the logger

Public Static Functions

static inline EventManager &get()

Get the singleton instance of the EventManager.

Returns:

A reference to the EventManager singleton.