Event Manager APIs
The EventManager class enables anonymous publish/subscribe interactions between different software components and contexts in the system. It provides a singleton class which manages the registry of publishers and subscribers in the system and provides loose coupling between them. To subscribe to events/data, a component must register itself with the manager by calling add_subscriber() - which will register a callback function associated with that component for the event/topic provided. All callback functions for a given topic/event are called from the same thread/context - a thread that is started and managed by the EventManager. As noted in a few places, it is recommended to use a (de-)serialization library such as espp::serialization / alpaca for transforming data structures to/from std::vector<uint8_t> for publishing/subscribing.
API Reference
Header File
Classes
-
class EventManager : public espp::BaseComponent
Singleton class for managing events. Provides mechanisms for anonymous publish / subscribe interactions - enabling one to one, one to many, many to one, and many to many data distribution with loose coupling and low overhead. Each topic runs a thread for that topic’s subscribers, executing all the callbacks in sequence and then going to sleep again until new data is published.
Event Manager Example
espp::EventManager::get().set_log_level(espp::Logger::Verbosity::WARN);

// NOTE: we'll use a simple string for publishing on drive/control, but
// normally you'd use a struct and a serialization library like alpaca, so
// that's what we'll do for battery/state

// let's define a struct to contain data for an event
struct BatteryState {
  float voltage{48.0f};
  float current{0.0f};
  bool is_charging{false};
  float temperature_celsius{25.0f};
  float state_of_charge{100.0f};
};

// let's make some event names that we'll use for this example
const std::string event1 = "battery/state";
const std::string event2 = "drive/control";

static int num_published = 0;
static int num_received = 0;

// Make a task which has a pub/sub in it. NOTE: in real code, this would
// likely be within a custom class and the registration would happen in the
// constructor, with the remove_publisher / remove_subscriber calls in its
// destructor. Also NOTE: you would likely use a serialization library like
// alpaca (wrapped in espp serialization component) for serializing and
// deserializing the data structures being passed to string data.
auto task_1_fn = [&](auto &m, auto &cv) {
  {
    // Just for fun, we'll only define the subscriber callback within the
    // context of this task function
    static auto event2_cb = [&](const std::vector<uint8_t> &data) {
      // we know this is a string, so just convert it to a string and
      // print it
      std::string data_str(data.begin(), data.end());
      logger.debug("Task 1 cb got data: '{}'", data_str);
      num_received++;
      // block here like we're doing work
      std::this_thread::sleep_for(10ms);
    };
    // we only want to register once, so use the std::call_once /
    // std::once_flag functionality to only register the first time the
    // task is run
    static std::once_flag flag;
    std::call_once(flag, [&]() {
      auto &em = espp::EventManager::get();
      logger.info("Task 1 registering!");
      auto did_pub = em.add_publisher(event1, "task 1");
      auto did_sub = em.add_subscriber(event2, "task 1", event2_cb);
      logger.info("Task 1 publishing: {}", did_pub);
      logger.info("Task 1 subscribing: {}", did_sub);
      // sleep for a little bit to let the other task register its
      // subscribers/publishers before we start publishing, ensuring that the
      // subscriber callback is registered before the publisher publishes
      std::this_thread::sleep_for(10ms);
    });
    // periodically publish on event1
    logger.debug("Task 1 publishing on {}", event1);
    static BatteryState bs;
    bs.current = 1.0f;
    bs.voltage -= 0.1f;
    bs.state_of_charge -= 5.0f;
    bs.temperature_celsius += 0.2f;
    std::vector<uint8_t> buffer;
    espp::serialize(bs, buffer);
    espp::EventManager::get().publish(event1, buffer);
    num_published++;
    buffer.clear();
    bs.current = -1.0f;
    espp::serialize(bs, buffer);
    espp::EventManager::get().publish(event1, buffer);
    num_published++;
    std::unique_lock<std::mutex> lk(m);
    cv.wait_for(lk, 500ms);
  }
  // we don't want to stop, so return false
  return false;
};
auto task1 = espp::Task({.callback = task_1_fn, .task_config = {.name = "Task 1"}});

// Now let's make another task which will have pub/sub as well
auto task_2_fn = [&](auto &m, auto &cv) {
  {
    // Just for fun, we'll only define the subscriber callback within the
    // context of this task function
    static auto event1_cb = [&](const std::vector<uint8_t> &data) {
      // we know the data is a BatteryState struct, so deserialize it and
      // print it
      std::error_code ec;
      auto bs = espp::deserialize<BatteryState>(data, ec);
      if (ec) {
        logger.error("Couldn't deserialize BatteryState: {}", ec.message());
        return;
      }
      logger.debug("Task 2 got battery state data:\n"
                   " voltage: {:.2f}\n"
                   " current: {:.2f}\n"
                   " is_charging: {}\n"
                   " temperature_celsius: {:.2f}\n"
                   " state_of_charge: {:.2f}",
                   bs.voltage, bs.current, bs.is_charging, bs.temperature_celsius,
                   bs.state_of_charge);
      num_received++;
      // block here like we're doing work
      std::this_thread::sleep_for(10ms);
    };
    // we only want to register once, so use the std::call_once /
    // std::once_flag functionality to only register the first time the
    // task is run
    static std::once_flag flag;
    std::call_once(flag, [&]() {
      auto &em = espp::EventManager::get();
      logger.info("Task 2 registering!");
      auto did_pub = em.add_publisher(event2, "task 2");
      // NOTE: we're using a custom task config here to show how you can
      // configure the task that the subscriber callback will run in
      // (priority, stack size, etc.). Only the first subscription on a topic
      // will use the task config, any subsequent subscriptions will use the
      // same task as the first subscription.
      espp::Task::BaseConfig task_config{
          .name = "Task 2 subscriber task",
          .stack_size_bytes = 8192,
          .priority = 10, // 5 is default, 10 is higher
      };
      auto did_sub = em.add_subscriber(event1, "task 2", event1_cb, task_config);
      logger.info("Task 2 publishing: {}", did_pub);
      logger.info("Task 2 subscribing: {}", did_sub);
      // sleep for a little bit to let the other task register its
      // subscribers/publishers, ensuring that the subscriber callback is
      // registered before the publisher publishes
      std::this_thread::sleep_for(10ms);
    });
    // periodically publish on event2
    logger.debug("Task 2 publishing on {}", event2);
    static int iteration = 0;
    std::string data = fmt::format("Task 2 data {}", iteration++);
    std::vector<uint8_t> buffer(data.begin(), data.end());
    espp::EventManager::get().publish(event2, buffer);
    num_published++;
    espp::EventManager::get().publish(event2, buffer);
    num_published++;
    std::unique_lock<std::mutex> lk(m);
    cv.wait_for(lk, 500ms);
  }
  // we don't want to stop, so return false
  return false;
};
auto task2 = espp::Task({.callback = task_2_fn, .task_config = {.name = "Task 2"}});

// now start the tasks
task1.start();
task2.start();

// Now let's just wait for a little while for those tasks to run, showcasing
// the pub/sub interactions in the log output.
logger.info("Sleeping for 5s...");
std::this_thread::sleep_for(5s);

task1.stop();
task2.stop();

// since the tasks are done, let's remove their publishers/subscribers here
// (though as noted above, this would normally be done in a class
// destructor)
auto &em = espp::EventManager::get();
em.remove_publisher(event1, "task 1");
em.remove_subscriber(event2, "task 1");
em.remove_publisher(event2, "task 2");
em.remove_subscriber(event1, "task 2");
Note
In C++ classes, it’s recommended to call the add_publisher/add_subscriber functions in the class constructor and then to call the remove_publisher/remove_subscriber functions in the class destructor.
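The constructor/destructor pattern from the note above can be sketched as a small RAII wrapper. This is only an illustration under stated assumptions: ScopedSubscriber and StubManager are hypothetical names that are not part of espp, and StubManager merely stands in for the real manager so the sketch compiles on its own. With espp available, the Manager argument would presumably be the reference returned by espp::EventManager::get().

```cpp
#include <cstdint>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the real manager (NOT espp code). It only counts
// registrations so the wrapper below can be exercised without espp.
struct StubManager {
  using callback_fn = std::function<void(const std::vector<uint8_t> &)>;
  int active{0};
  bool add_subscriber(const std::string &, const std::string &, const callback_fn &) {
    ++active;
    return true;
  }
  bool remove_subscriber(const std::string &, const std::string &) {
    --active;
    return true;
  }
};

// Hypothetical RAII wrapper: subscribes in the constructor and unsubscribes
// in the destructor. Manager is any type exposing add_subscriber() and
// remove_subscriber() with the argument lists documented on this page.
template <typename Manager> class ScopedSubscriber {
public:
  using callback_fn = std::function<void(const std::vector<uint8_t> &)>;

  ScopedSubscriber(Manager &manager, std::string topic, std::string component,
                   const callback_fn &callback)
      : manager_(manager), topic_(std::move(topic)), component_(std::move(component)) {
    registered_ = manager_.add_subscriber(topic_, component_, callback);
  }

  ~ScopedSubscriber() {
    // Only remove what this object actually registered.
    if (registered_) {
      manager_.remove_subscriber(topic_, component_);
    }
  }

  // Non-copyable: a copy would unsubscribe the same component twice.
  ScopedSubscriber(const ScopedSubscriber &) = delete;
  ScopedSubscriber &operator=(const ScopedSubscriber &) = delete;

  bool registered() const { return registered_; }

private:
  Manager &manager_;
  std::string topic_;
  std::string component_;
  bool registered_{false};
};
```

Tying the subscription to an object's lifetime means a callback that captures that object's state is not left registered after the object is destroyed.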
Note
Unless you only care about the event itself (not its data) or only need to transmit actual strings, it is recommended to use a serialization library (such as espp::serialization, which wraps alpaca) to serialize your data structures to a byte vector (std::vector<uint8_t>) when publishing, and to deserialize them from that byte vector in the subscriber callbacks.
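To make the byte-vector round trip concrete without depending on espp or alpaca, here is a minimal sketch that copies a trivially copyable struct into and out of a std::vector<uint8_t>. The helpers to_bytes, from_bytes, and example_round_trip are hypothetical names, not part of espp. A raw memory copy is only safe between endpoints that share the same compiler, architecture, and struct layout, which is why a real serialization library is generally the better choice.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <optional>
#include <type_traits>
#include <vector>

// Same fields as the BatteryState struct used in the example on this page.
struct BatteryState {
  float voltage{48.0f};
  float current{0.0f};
  bool is_charging{false};
  float temperature_celsius{25.0f};
  float state_of_charge{100.0f};
};

// Hypothetical helper (not espp API): copy the raw bytes of a trivially
// copyable value into a buffer with the type that publish() expects.
template <typename T> std::vector<uint8_t> to_bytes(const T &value) {
  static_assert(std::is_trivially_copyable_v<T>, "T must be trivially copyable");
  std::vector<uint8_t> buffer(sizeof(T));
  std::memcpy(buffer.data(), &value, sizeof(T));
  return buffer;
}

// Hypothetical helper (not espp API): rebuild a value from a buffer, or
// return std::nullopt if the buffer does not have exactly sizeof(T) bytes.
template <typename T> std::optional<T> from_bytes(const std::vector<uint8_t> &data) {
  static_assert(std::is_trivially_copyable_v<T>, "T must be trivially copyable");
  if (data.size() != sizeof(T)) {
    return std::nullopt;
  }
  T value;
  std::memcpy(&value, data.data(), sizeof(T));
  return value;
}

// Usage sketch: the buffer is what a publisher would hand to publish(), and
// from_bytes() is what a subscriber callback would call on its argument.
inline void example_round_trip() {
  BatteryState sent;
  sent.voltage = 47.9f;
  const std::vector<uint8_t> buffer = to_bytes(sent);
  const std::optional<BatteryState> received = from_bytes<BatteryState>(buffer);
  assert(received.has_value() && received->voltage == sent.voltage);
}
```

The size check in from_bytes plays the same role as the std::error_code check in the example's subscriber callback: a malformed payload is rejected instead of being read as a struct.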
Public Types
-
typedef std::function<void(const std::vector<uint8_t>&)> event_callback_fn
Function definition for function prototypes to be called when subscription/event data is available.
- Param std::vector<uint8_t>&
The data associated with the event
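As a small illustration of a callable that fits this signature, the sketch below builds a callback that treats the payload as text. The alias is redeclared locally so the snippet compiles without espp, and make_string_callback is a hypothetical helper name rather than anything espp provides.

```cpp
#include <cstdint>
#include <functional>
#include <string>
#include <vector>

// Local copy of the documented signature, so the sketch builds without espp.
using event_callback_fn = std::function<void(const std::vector<uint8_t> &)>;

// Hypothetical helper: returns a callback that stores each payload as text in
// last_message. The referenced string must outlive the returned callback.
inline event_callback_fn make_string_callback(std::string &last_message) {
  return [&last_message](const std::vector<uint8_t> &data) {
    // The payload is plain bytes; here they are interpreted as characters.
    last_message.assign(data.begin(), data.end());
  };
}
```

Because the callback receives only bytes, publisher and subscriber have to agree on the encoding out of band. Here that agreement is simply "the payload is a string".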
Public Functions
-
bool add_publisher(const std::string &topic, const std::string &component)
Register a publisher for component on topic.
- Parameters
topic – Topic name for the data being published.
component – Name of the component publishing data.
- Returns
True if the publisher was added, false if it was already registered for that component.
-
bool add_subscriber(const std::string &topic, const std::string &component, const espp::EventManager::event_callback_fn &callback, const size_t stack_size_bytes = 8192)
Register a subscriber for component on topic.
Note
The stack size is only used if a subscriber is not already registered for that topic. If a subscriber is already registered for that topic, the stack size is ignored.
- Parameters
topic – Topic name for the data being subscribed to.
component – Name of the component subscribing to the data.
callback – The event_callback_fn to be called when receiving data on topic.
stack_size_bytes – The stack size in bytes to use for the subscriber.
- Returns
True if the subscriber was added, false if it was already registered for that component.
-
bool add_subscriber(const std::string &topic, const std::string &component, const espp::EventManager::event_callback_fn &callback, const espp::Task::BaseConfig &task_config)
Register a subscriber for component on topic.
Note
The task_config is only used if a subscriber is not already registered for that topic. If a subscriber is already registered for that topic, the task_config is ignored.
- Parameters
topic – Topic name for the data being subscribed to.
component – Name of the component subscribing to the data.
callback – The event_callback_fn to be called when receiving data on topic.
task_config – The task configuration to use for the subscriber.
- Returns
True if the subscriber was added, false if it was already registered for that component.
-
bool publish(const std::string &topic, const std::vector<uint8_t> &data)
Publish data on topic.
- Parameters
topic – Topic to publish data on.
data – Data to publish, within a vector container.
- Returns
True if data was successfully published to topic, false otherwise. Publish will not occur (and will return false) if there are no subscribers for this topic.
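The return values documented for add_subscriber, remove_subscriber, and publish can be modeled with a toy registry. This is a sketch of the documented contract only, not of the espp implementation: ToyEventRegistry is a hypothetical class, it calls the callbacks synchronously on the caller's thread (the real EventManager runs them on a thread it manages), and it leaves out publisher registration and thread safety.

```cpp
#include <cstdint>
#include <functional>
#include <map>
#include <string>
#include <vector>

// Toy model (NOT espp code) of the documented return-value behavior.
class ToyEventRegistry {
public:
  using callback_fn = std::function<void(const std::vector<uint8_t> &)>;

  // Returns false if `component` is already subscribed to `topic`.
  bool add_subscriber(const std::string &topic, const std::string &component,
                      const callback_fn &callback) {
    return subscribers_[topic].emplace(component, callback).second;
  }

  // Returns false if `component` was not subscribed to `topic`.
  bool remove_subscriber(const std::string &topic, const std::string &component) {
    auto it = subscribers_.find(topic);
    if (it == subscribers_.end()) {
      return false;
    }
    const bool removed = it->second.erase(component) > 0;
    if (it->second.empty()) {
      subscribers_.erase(it); // drop topics that have no subscribers left
    }
    return removed;
  }

  // Returns false, without calling anything, if `topic` has no subscribers.
  // Otherwise every callback for the topic is called in sequence.
  bool publish(const std::string &topic, const std::vector<uint8_t> &data) {
    auto it = subscribers_.find(topic);
    if (it == subscribers_.end() || it->second.empty()) {
      return false;
    }
    for (const auto &entry : it->second) {
      entry.second(data);
    }
    return true;
  }

private:
  // topic -> (component -> callback)
  std::map<std::string, std::map<std::string, callback_fn>> subscribers_;
};
```

One practical consequence of this contract is that a publisher that starts before any subscriber has registered sees publish return false. The example on this page sleeps briefly after registration for that reason.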
-
bool remove_publisher(const std::string &topic, const std::string &component)
Remove component's publisher for topic.
- Parameters
topic – The topic that component was publishing on.
component – The component for which the publisher was registered.
- Returns
True if the publisher was removed, false if it was not registered.
-
bool remove_subscriber(const std::string &topic, const std::string &component)
Remove component's subscriber for topic.
- Parameters
topic – The topic that component was subscribing to.
component – The component for which the subscriber was registered.
- Returns
True if the subscriber was removed, false if it was not registered.
-
inline const std::string &get_name() const
Get the name of the component
Note
This is the tag of the logger
- Returns
A const reference to the name of the component
-
inline void set_log_tag(const std::string_view &tag)
Set the tag for the logger
- Parameters
tag – The tag to use for the logger
-
inline espp::Logger::Verbosity get_log_level() const
Get the log level for the logger
- Returns
The verbosity level of the logger
-
inline void set_log_level(espp::Logger::Verbosity level)
Set the log level for the logger
- Parameters
level – The verbosity level to use for the logger
-
inline void set_log_verbosity(espp::Logger::Verbosity level)
Set the log verbosity for the logger
Note
This is a convenience method that calls set_log_level
- Parameters
level – The verbosity level to use for the logger
-
inline espp::Logger::Verbosity get_log_verbosity() const
Get the log verbosity for the logger
Note
This is a convenience method that calls get_log_level
- Returns
The verbosity level of the logger
-
inline void set_log_rate_limit(std::chrono::duration<float> rate_limit)
Set the rate limit for the logger
Note
Only calls to the logger that have _rate_limit suffix will be rate limited
- Parameters
rate_limit – The rate limit to use for the logger
Public Static Functions
-
static inline EventManager &get()
Get the singleton instance of the EventManager.
- Returns
A reference to the EventManager singleton.