custom_outbox.cpp 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393
  1. /*
  2. * SPDX-FileCopyrightText: 2023 Espressif Systems (Shanghai) CO LTD
  3. *
  4. * SPDX-License-Identifier: Unlicense OR CC0-1.0
  5. */
#include <algorithm>
#include <array>
#include <cinttypes>
#include <concepts>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <exception>
#include <memory>
#include <memory_resource>
#include <ranges>
#include <string>
#include <string_view>
#include <utility>
#include <vector>
#include "esp_log.h"
#include "mqtt_outbox.h"
  19. constexpr auto TAG = "custom_outbox";
  20. /*
  21. * The trace resource class is created here as an example on how to build a custom memory resource
  22. * The class is only needed to show where we are allocating from and to track allocations and deallocations.
  23. */
  24. class trace_resource : public std::pmr::memory_resource {
  25. public:
  26. explicit trace_resource(std::string resource_name, std::pmr::memory_resource *upstream_resource = std::pmr::get_default_resource()) : upstream{upstream_resource}, name{std::move(resource_name)} {}
  27. [[nodiscard]] std::string_view get_name() const noexcept
  28. {
  29. return std::string_view(name);
  30. }
  31. [[nodiscard]] auto upstream_resource() const
  32. {
  33. return upstream;
  34. }
  35. private:
  36. void *do_allocate(std::size_t bytes, std::size_t alignment) override
  37. {
  38. auto *allocated = upstream->allocate(bytes, alignment);
  39. allocated_total += bytes;
  40. ESP_LOGI(name.c_str(), "%s: %zu bytes allocated, %zu total bytes in use", name.c_str(), bytes, allocated_total);
  41. return allocated;
  42. }
  43. void do_deallocate(void *ptr, std::size_t bytes, std::size_t alignment) override
  44. {
  45. upstream->deallocate(ptr, bytes, alignment);
  46. ESP_LOGI(name.c_str(), "%s: %zu bytes deallocated, %zu total bytes in use", name.c_str(), bytes, allocated_total);
  47. }
  48. [[nodiscard]] bool do_is_equal(const std::pmr::memory_resource &other) const noexcept override
  49. {
  50. return this == &other;
  51. }
  52. size_t allocated_total{};
  53. std::pmr::memory_resource *upstream;
  54. std::string name;
  55. };
  56. struct outbox_item {
  57. /* Defining the allocator_type to let compiler know that our type is allocator aware,
  58. * This way the allocator used for the outbox is propagated to the messages*/
  59. using allocator_type = std::pmr::polymorphic_allocator<>;
  60. /* Few strong types to diferetiate parameters*/
  61. enum class id_t : int {};
  62. enum class type_t : int {};
  63. enum class qos_t : int {};
  64. /* Allocator aware constructors */
  65. outbox_item(
  66. std::pmr::vector<uint8_t> message,
  67. id_t msg_id,
  68. type_t msg_type,
  69. qos_t msg_qos,
  70. outbox_tick_t tick,
  71. pending_state_t pending_state,
  72. allocator_type alloc = {}
  73. ) : message(std::move(message), alloc), id(msg_id), type(msg_type), qos(msg_qos), tick(tick), pending_state(pending_state) {}
  74. /*Copy and move constructors have an extra allocator parameter, for copy default and allocator aware are the same.*/
  75. outbox_item(const outbox_item &other, allocator_type alloc = {}) : message(other.message, alloc), id(other.id), type(other.type), qos(other.qos), tick(other.tick), pending_state(other.pending_state) {}
  76. outbox_item(outbox_item &&other, allocator_type alloc) noexcept : message(std::move(other.message), alloc), id(other.id), type(other.type), qos(other.qos), tick(other.tick), pending_state(other.pending_state)
  77. {}
  78. outbox_item(const outbox_item &) = default;
  79. outbox_item(outbox_item &&other) = default;
  80. outbox_item &operator=(const outbox_item &rhs) = default;
  81. outbox_item &operator=(outbox_item &&other) = default;
  82. ~outbox_item() = default;
  83. /* Getters to support outbox operation */
  84. [[nodiscard]] auto state() const noexcept
  85. {
  86. return pending_state;
  87. }
  88. [[nodiscard]] allocator_type get_allocator() const
  89. {
  90. return message.get_allocator();
  91. }
  92. void set(pending_state state) noexcept
  93. {
  94. pending_state = state;
  95. }
  96. void set(outbox_tick_t n_tick) noexcept
  97. {
  98. tick = n_tick;
  99. }
  100. [[nodiscard]] auto get_id() const noexcept
  101. {
  102. return id;
  103. }
  104. [[nodiscard]] auto get_type() const noexcept
  105. {
  106. return type;
  107. }
  108. [[nodiscard]] auto get_tick() const noexcept
  109. {
  110. return tick;
  111. }
  112. [[nodiscard]] auto get_data(size_t *len, uint16_t *msg_id, int *msg_type, int *msg_qos)
  113. {
  114. *len = message.size();
  115. *msg_id = static_cast<uint16_t>(id);
  116. *msg_type = static_cast<int>(type);
  117. *msg_qos = static_cast<int>(qos);
  118. return message.data();
  119. }
  120. [[nodiscard]] auto get_size() const noexcept
  121. {
  122. return message.size();
  123. }
  124. private:
  125. std::pmr::vector<uint8_t> message;
  126. id_t id;
  127. type_t type;
  128. qos_t qos;
  129. outbox_tick_t tick;
  130. pending_state_t pending_state;
  131. };
  132. /*
  133. * For the outbox_t we let the special member functions as default and
  134. * we don't extend the allocator aware versions for the sake of the simplicity, since the operations are not needed in the usage.
  135. */
  136. struct outbox_t {
  137. using allocator_type = std::pmr::polymorphic_allocator<>;
  138. explicit outbox_t(allocator_type alloc = {}) : queue(alloc) {}
  139. outbox_item_handle_t get(outbox_item::id_t msg_id)
  140. {
  141. if (auto item = std::ranges::find_if(queue, [msg_id](auto & item) {
  142. return item.get_id() == msg_id;
  143. });
  144. item != std::end(queue)) {
  145. return &(*item);
  146. }
  147. return nullptr;
  148. }
  149. int delete_expired(outbox_tick_t current_tick, outbox_tick_t timeout)
  150. {
  151. return std::erase_if(queue, [current_tick, timeout, this](const outbox_item & item) {
  152. if (current_tick - item.get_tick() > timeout) {
  153. total_size -= item.get_size();
  154. return true;
  155. }
  156. return false;
  157. });
  158. }
  159. outbox_item::id_t delete_single_expired(outbox_tick_t current_tick, outbox_tick_t timeout)
  160. {
  161. if (auto erase = std::ranges::find_if(queue, [current_tick, timeout](auto & item) {
  162. return (current_tick - item.get_tick() > timeout);
  163. }); erase != std::end(queue)) {
  164. auto msg_id = erase->get_id();
  165. total_size -= erase->get_size();
  166. queue.erase(erase);
  167. return msg_id;
  168. }
  169. return outbox_item::id_t{-1};
  170. }
  171. auto erase(outbox_item_handle_t to_erase)
  172. {
  173. return erase_if([to_erase](auto & item) {
  174. return &item == to_erase;
  175. });
  176. }
  177. auto erase(outbox_item::id_t msg_id, outbox_item::type_t msg_type)
  178. {
  179. return erase_if([msg_id, msg_type](auto & item) {
  180. return (item.get_id() == msg_id && (item.get_type() == msg_type));
  181. });
  182. }
  183. [[nodiscard]] auto size() const noexcept
  184. {
  185. return total_size;
  186. }
  187. void clear()
  188. {
  189. queue.clear();
  190. }
  191. outbox_item_handle_t enqueue(outbox_message_handle_t message, outbox_tick_t tick) noexcept
  192. {
  193. try {
  194. auto &item =
  195. queue.emplace_back(std::pmr::vector<uint8_t> {message->data, message->data + message->len},
  196. outbox_item::id_t{message->msg_id},
  197. outbox_item::type_t{message->msg_type},
  198. outbox_item::qos_t{message->msg_qos},
  199. tick,
  200. QUEUED
  201. );
  202. total_size += item.get_size();
  203. ESP_LOGD(TAG, "ENQUEUE msgid=%d, msg_type=%d, len=%d, size=%" PRIu64, message->msg_id, message->msg_type, message->len + message->remaining_len, outbox_get_size(this));
  204. return &item;
  205. } catch (const std::exception &e) {
  206. return nullptr;
  207. }
  208. }
  209. outbox_item_handle_t dequeue(pending_state_t state, outbox_tick_t *tick)
  210. {
  211. if (auto item = std::ranges::find_if(queue, [state](auto & item) {
  212. return item.state() == state;
  213. });
  214. item != std::end(queue)) {
  215. if (tick != nullptr) {
  216. *tick = item->get_tick();
  217. }
  218. return &(*item);
  219. }
  220. return nullptr;
  221. }
  222. [[nodiscard]] allocator_type get_allocator() const
  223. {
  224. return queue.get_allocator();
  225. }
  226. private:
  227. [[nodiscard]] esp_err_t erase_if(std::predicate<outbox_item &> auto &&predicate)
  228. {
  229. if (auto to_erase = std::ranges::find_if(queue, predicate); to_erase != std::end(queue)) {
  230. total_size -= to_erase->get_size();
  231. queue.erase(to_erase);
  232. return ESP_OK;
  233. }
  234. return ESP_FAIL;
  235. }
  236. std::size_t total_size{};
  237. std::pmr::deque<outbox_item> queue ;
  238. };
  239. extern "C" {
  240. outbox_handle_t outbox_init()
  241. {
  242. /* First we create a fixed size memory buffer to be used. */
  243. static constexpr auto work_memory_size = 16 * 1024;
  244. static std::array<std::byte, work_memory_size> resource_buffer{};
  245. try {
  246. /*
  247. * Since the outbox is managed by a C API we can't rely on C++ automatic cleanup and smart pointers but, on production code it would be better to add the
  248. * memory resources to outbox_t, applying RAII principles, and make only outbox_item allocator aware. For the sake of the example we are keeping them
  249. * separated to explictly show the relations.
  250. * First we create the monotonic buffer and add null_memory_resource as upstream. This way if our working memory is exausted an exception is thrown.
  251. */
  252. auto *monotonic_resource = new std::pmr::monotonic_buffer_resource{resource_buffer.data(), resource_buffer.size(), std::pmr::null_memory_resource()};
  253. /*Here we add our custom trace wrapper type to trace allocations and deallocations*/
  254. auto *trace_monotonic = new trace_resource("Monotonic", monotonic_resource);
  255. /* We compose monotonic buffer with pool resource, since the monotonic deallocate is a no-op and we need to remove messages to not go out of memory.*/
  256. auto *pool_resource = new std::pmr::unsynchronized_pool_resource{trace_monotonic};
  257. auto *trace_pool = new trace_resource("Pool", pool_resource);
  258. /* Our outbox class is created using the trace_pool as memory resource */
  259. auto *outbox = new outbox_t{trace_pool};
  260. return outbox;
  261. } catch (const std::exception &e) {
  262. ESP_LOGD(TAG, "Not enough memory to construct the outbox, review the resource_buffer size");
  263. return nullptr;
  264. }
  265. }
  266. outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, outbox_tick_t tick)
  267. {
  268. return outbox->enqueue(message, tick);
  269. }
  270. outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id)
  271. {
  272. return outbox->get(outbox_item::id_t{msg_id});
  273. }
  274. outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending, outbox_tick_t *tick)
  275. {
  276. return outbox->dequeue(pending, tick);
  277. }
  278. }
  279. uint8_t *outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t *msg_id, int *msg_type, int *qos)
  280. {
  281. if (item == nullptr) {
  282. return nullptr;
  283. }
  284. return item->get_data(len, msg_id, msg_type, qos);
  285. }
  286. esp_err_t outbox_delete_item(outbox_handle_t outbox, outbox_item_handle_t item_to_delete)
  287. {
  288. return outbox->erase(item_to_delete);
  289. }
  290. esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type)
  291. {
  292. return outbox->erase(outbox_item::id_t{msg_id}, outbox_item::type_t{msg_type});
  293. }
  294. int outbox_delete_single_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout)
  295. {
  296. return static_cast<int>(outbox->delete_single_expired(current_tick, timeout));
  297. }
  298. int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout)
  299. {
  300. return outbox->delete_expired(current_tick, timeout);
  301. }
  302. esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t pending)
  303. {
  304. if (auto *item = outbox->get(outbox_item::id_t{msg_id}); item != nullptr) {
  305. item->set(pending);
  306. return ESP_OK;
  307. }
  308. return ESP_FAIL;
  309. }
  310. pending_state_t outbox_item_get_pending(outbox_item_handle_t item)
  311. {
  312. if (item != nullptr) {
  313. return item->state();
  314. }
  315. return QUEUED;
  316. }
  317. esp_err_t outbox_set_tick(outbox_handle_t outbox, int msg_id, outbox_tick_t tick)
  318. {
  319. if (auto *item = outbox->get(outbox_item::id_t{msg_id}); item != nullptr) {
  320. item->set(tick);
  321. return ESP_OK;
  322. }
  323. return ESP_FAIL;
  324. }
  325. uint64_t outbox_get_size(outbox_handle_t outbox)
  326. {
  327. return outbox->size();
  328. }
  329. void outbox_delete_all_items(outbox_handle_t outbox)
  330. {
  331. outbox->clear();
  332. }
  333. void outbox_destroy(outbox_handle_t outbox)
  334. {
  335. auto *trace_pool = static_cast<trace_resource *>(outbox->get_allocator().resource());
  336. auto *pool_resource = static_cast<std::pmr::unsynchronized_pool_resource *>(trace_pool->upstream_resource());
  337. auto *trace_monotonic = static_cast<trace_resource *>(pool_resource->upstream_resource());
  338. auto *monotonic_resource = static_cast<std::pmr::monotonic_buffer_resource *>(trace_monotonic->upstream_resource());
  339. delete monotonic_resource;
  340. delete trace_monotonic;
  341. delete pool_resource;
  342. delete trace_pool;
  343. delete outbox;
  344. }