RunQueue APIs
RunQueue
The RunQueue component provides an implementation of a task runqueue which can be used to run functions asynchronously and with optional relative priorities. The runqueue itself is implemented as a priority queue of functions that is handled within its own task. This task is just a standard espp::Task, so it can be configured with custom task priority and core id (as well as stack size and other parameters).
Code examples for the runqueue API are provided in the runqueue example folder.
API Reference
Header File
Classes
-
class RunQueue : public espp::BaseComponent
This class implements a run queue for generic functions.
The functions can be added to the queue with a priority. The functions are executed in the order of their priority, but the currently executing function will run to completion before the next function is executed.
The RunQueue is implemented as a Task that runs the functions in the queue. The Task will run the highest priority function in the queue (if any) and will block indefinitely until either either a new function is added to the queue or the RunQueue is stopped.
Because the RunQueue is implemented as an espp::Task, you can customize its task configuration, such as priority, stack size, core id, etc.
The RunQueue can be configured to start automatically or manually. If the RunQueue is configured to start automatically, it will start when it is constructed. If it is configured to start manually, it will not start until the start() method is called.
The RunQueue is thread-safe and can be used from multiple threads.
RunQueue Example
espp::RunQueue runqueue({.log_level = espp::Logger::Verbosity::DEBUG}); // make some functions to run and schedule them with different priorities to // show the priority queue works std::mutex done_mutex; std::condition_variable done_cv; bool done = false; auto task = [&logger](int id) { auto core = xPortGetCoreID(); logger.info("Task {} running on core {}", id, core); std::this_thread::sleep_for(1s); logger.info("Task {} done on core {}", id, core); }; auto stop_task = [&]() { logger.info("stop task running!"); std::this_thread::sleep_for(1s); logger.info("Stop task done!"); std::unique_lock lock(done_mutex); done = true; done_cv.notify_one(); }; logger.info("Scheduling tasks..."); auto task0 = std::bind(task, 0); auto task1 = std::bind(task, 1); auto task2 = std::bind(task, 2); auto task3 = std::bind(task, 3); auto task0_id = runqueue.add_function( task0, espp::RunQueue::MIN_PRIORITY); // this will run first because it was first to be added auto task1_id = runqueue.add_function(task1, espp::RunQueue::MIN_PRIORITY + 1); // this will run fourth auto task3_id = runqueue.add_function(task3, espp::RunQueue::MAX_PRIORITY); // this will run second auto task2_id = runqueue.add_function(task2, espp::RunQueue::MIN_PRIORITY + 2); // this will run third auto stop_task_id = runqueue.add_function(stop_task, espp::RunQueue::MIN_PRIORITY); // this will run last logger.info("All tasks scheduled!"); // print our the task ids logger.info("Task 0 id: {}", task0_id); logger.info("Task 1 id: {}", task1_id); logger.info("Task 2 id: {}", task2_id); logger.info("Task 3 id: {}", task3_id); logger.info("Stop task id: {}", stop_task_id); // check the API for queue_size logger.info("Queue size: {}", runqueue.queue_size()); // check the API for is_running logger.info("RunQueue is running: {}", runqueue.is_running()); // check the API for is_function_queued logger.info("Task 0 is queued: {}", runqueue.is_function_queued(task0_id)); logger.info("Task 1 is queued: {}", runqueue.is_function_queued(task1_id)); logger.info("Task 2 is queued: {}", runqueue.is_function_queued(task2_id)); logger.info("Task 3 is queued: {}", runqueue.is_function_queued(task3_id)); logger.info("Stop task is queued: {}", runqueue.is_function_queued(stop_task_id)); // check the API for removing an invalid id if (runqueue.remove_function(espp::RunQueue::INVALID_ID)) { logger.error("Incorrectly Removed invalid id!"); } else { logger.info("Correctly failed to remove invalid id!"); } // check the api for removing a valid but non-existent id if (runqueue.remove_function(999)) { logger.error("Incorrectly Removed non-existent id!"); } else { logger.info("Correctly failed to remove non-existent id!"); } // check the api for removing the currently running id (which should fail) if (runqueue.remove_function(runqueue.get_running_id().value())) { logger.error("Incorrectly Removed currently running id!"); } else { logger.info("Correctly failed to remove currently running id!"); } // NOTE: in the next example (below) we'll check removing a valid ID // check the API for get_queued_ids(bool include_running) auto queued_ids = runqueue.get_queued_ids(true); logger.info("Queued ids (including running): {}", queued_ids); // check the API for get_running_id auto running_id = runqueue.get_running_id(); logger.info("Running id: {}", running_id); logger.info("Waiting for stop task to complete..."); std::unique_lock lock(done_mutex); done_cv.wait(lock, [&done] { return done; }); // check the API for get_running_id again (should return nullopt) running_id = runqueue.get_running_id(); logger.info("Running id: {}", running_id); // check the api for get_queued_ids again queued_ids = runqueue.get_queued_ids(true); logger.info("Queued ids (including running): {}", queued_ids); // check the api for is_function_queued again logger.info("Task 0 is queued: {}", runqueue.is_function_queued(task0_id)); logger.info("Task 1 is queued: {}", runqueue.is_function_queued(task1_id)); logger.info("Task 2 is queued: {}", runqueue.is_function_queued(task2_id)); logger.info("Task 3 is queued: {}", runqueue.is_function_queued(task3_id)); logger.info("Stop task is queued: {}", runqueue.is_function_queued(stop_task_id)); logger.info("All tasks done!");
Multiple RunQueues Example
std::mutex done_mutex; std::condition_variable done_cv; bool done = false; auto task = [&logger](int id) { auto core = xPortGetCoreID(); logger.info("Task {} running on core {}", id, core); std::this_thread::sleep_for(1s); logger.info("Task {} done on core {}", id, core); }; auto stop_task = [&]() { auto core = xPortGetCoreID(); logger.info("stop task running on core {}", core); std::this_thread::sleep_for(1s); logger.info("Stop task done on core {}", core); std::unique_lock lock(done_mutex); done = true; done_cv.notify_one(); }; espp::RunQueue runqueue0({.task_config = {.name = "core0 runq", .core_id = 0}}); espp::RunQueue runqueue1({.task_config = {.name = "core1 runq", .core_id = 1}}); logger.info("Scheduling tasks..."); auto task0 = std::bind(task, 0); auto task1 = std::bind(task, 1); auto task2 = std::bind(task, 2); runqueue0.add_function( task0, espp::RunQueue::MIN_PRIORITY); // this will run first because it was first to be added runqueue0.add_function(task1, espp::RunQueue::MIN_PRIORITY + 1); // this will run third runqueue0.add_function(task2, espp::RunQueue::MIN_PRIORITY + 2); // this will run second auto id_to_remove = runqueue0.add_function(stop_task, espp::RunQueue::MIN_PRIORITY); // this will run last runqueue1.add_function( task0, espp::RunQueue::MIN_PRIORITY); // this will run first because it was first to be added runqueue1.add_function(task1, espp::RunQueue::MIN_PRIORITY + 1); // this will run third runqueue1.add_function(task2, espp::RunQueue::MIN_PRIORITY + 2); // this will run second runqueue1.add_function(stop_task, espp::RunQueue::MIN_PRIORITY); // this will run last logger.info("All tasks scheduled!"); // now remove the stop task from runqueue0 if (!runqueue0.remove_function(id_to_remove)) { logger.error("Failed to remove task from runqueue0!"); } else { logger.info("Removed task from runqueue0!"); } logger.info("Waiting for stop task to complete..."); std::unique_lock lock(done_mutex); done_cv.wait(lock, [&done] { return done; });
Note
Function priorities are relative to each other and are only compared to other tasks in the specific RunQueue object. Different RunQueue objects may be active at the same time and the priorities of functions in one RunQueue are not compared to the priorities of functions in another. Similarly, the priorities of functions is not taken into account in the FreeRTOS scheduler - all functions in a given RunQueue will run with the same task priority as the RunQueue’s task.
Note
All functions within a RunQueue share the same task context, so you should set the stack size for the RunQueue accordingly.
Warning
Care should be taken to ensure that no functions in the queue are blocking on each other, and all functions in the queue must return (they cannot be infinite loops).
Warning
Functions take a long time to run may delay or prevent the execution of other functions in the queue. For this reason it’s recommended to try to keep the functions as short-lived as possible, and to minimize the priorities of any functions which take longer to execute.
Public Types
-
using Priority = std::uint8_t
The type used to represent the priority of a function.
-
using Id = std::uint32_t
The type used to represent the id of a function.
-
using Function = std::function<void(void)>
A function that takes no arguments and returns void.
Public Functions
-
explicit RunQueue(const Config &config)
Construct a RunQueue.
- Parameters
config – The configuration for the RunQueue.
-
std::size_t queue_size() const
Get the number of functions in the queue.
- Returns
The number of functions in the queue.
-
bool is_running() const
Get whether the run queue is running.
- Returns
True if the run queue is running.
-
void start()
Start the run queue.
-
void stop()
Stop the run queue.
Note
This must wait until the currently running function (if any) has completed before stopping the run queue.
-
Id add_function(const Function &function, Priority priority = MIN_PRIORITY)
Add a function to the queue.
- Parameters
function – The function to add.
priority – The priority of the function. Defaults to MIN_PRIORITY.
- Returns
The id of the function. This will be auto-generated and can be used to query or remove the function from the queue.
-
bool remove_function(Id id)
Remove a function from the queue.
Note
This will not remove or stop the currently running function (if any).
Note
This will return false if the id is INVALID_ID.
Note
This will return false if the id is the id of the currently running function.
- Parameters
id – The id of the function to remove. Cannot be INVALID_ID.
- Returns
True if the function was removed.
-
bool is_function_queued(Id id)
Check if a function is queued or running.
Note
This will return false if the the id is INVALID_ID.
- Parameters
id – The id of the function to check. Cannot be INVALID_ID.
- Returns
True if the function is queued or is currently running.
-
void clear_queue()
Remove all functions from the queue.
Note
This will not remove or stop the currently running function (if any).
-
std::vector<Id> get_queued_ids(bool include_running = false)
Get the ids of all functions in the queue in order of priority.
Note
The vector will be in order of priority, with the highest priority function at the end of the vector. This means that if include_running is true, the id of the currently running function will be the last element in the vector.
- Parameters
include_running – Whether to include the id of the currently running function (if any) in the returned vector.
- Returns
A vector of the ids of all functions in the queue, optionally including the id of the currently running function. Maybe empty if there are no functions queued or running.
-
std::optional<Id> get_running_id()
Get the id of the currently running function.
Note
This may return nullopt if the currently running function has completed but the runner task has not yet fetched the next function from the queue.
- Returns
The id of the currently running function, or std::nullopt if no function is currently running.
-
inline const std::string &get_name() const
Get the name of the component
Note
This is the tag of the logger
- Returns
A const reference to the name of the component
-
inline void set_log_tag(const std::string_view &tag)
Set the tag for the logger
- Parameters
tag – The tag to use for the logger
-
inline espp::Logger::Verbosity get_log_level() const
Get the log level for the logger
See also
See also
- Returns
The verbosity level of the logger
-
inline void set_log_level(espp::Logger::Verbosity level)
Set the log level for the logger
See also
See also
- Parameters
level – The verbosity level to use for the logger
-
inline void set_log_verbosity(espp::Logger::Verbosity level)
Set the log verbosity for the logger
See also
See also
See also
Note
This is a convenience method that calls set_log_level
- Parameters
level – The verbosity level to use for the logger
-
inline espp::Logger::Verbosity get_log_verbosity() const
Get the log verbosity for the logger
See also
See also
See also
Note
This is a convenience method that calls get_log_level
- Returns
The verbosity level of the logger
-
inline void set_log_rate_limit(std::chrono::duration<float> rate_limit)
Set the rate limit for the logger
See also
Note
Only calls to the logger that have _rate_limit suffix will be rate limited
- Parameters
rate_limit – The rate limit to use for the logger
Public Static Attributes
-
static constexpr Priority MIN_PRIORITY = std::numeric_limits<Priority>::min()
The minimum priority a function can have.
Friends
-
inline friend bool operator<(const PriorityFunction &lhs, const PriorityFunction &rhs)
Less than operator for PriorityFunction.
- Parameters
lhs – The left hand side of the comparison.
rhs – The right hand side of the comparison.
- Returns
True if the right hand side has a greater priority.
-
inline friend bool operator>(const PriorityFunction &lhs, const PriorityFunction &rhs)
Greater than operator for PriorityFunction.
- Parameters
lhs – The left hand side of the comparison.
rhs – The right hand side of the comparison.
- Returns
True if the left hand side has a greater priority.
-
inline friend bool operator==(const PriorityFunction &lhs, const PriorityFunction &rhs)
Equality operator for PriorityFunction.
- Parameters
lhs – The left hand side of the comparison.
rhs – The right hand side of the comparison.
- Returns
True if the left and right hand side have the same priority and id.
-
struct PriorityFunction
A pair of a priority and a function.
-
using Priority = std::uint8_t