From d19b54481a34383ca14eccd08f2126d02392234a Mon Sep 17 00:00:00 2001 From: FengShun Date: Fri, 25 Sep 2020 23:11:55 +0800 Subject: [PATCH] fix: uORB topics lost messages when publications overflow --- msg/gps_inject_data.msg | 2 +- msg/mavlink_log.msg | 2 +- msg/orb_test_medium.msg | 2 +- msg/sensor_mag.msg | 2 +- msg/subsystem_info.msg | 2 +- msg/transponder_report.msg | 2 +- msg/tune_control.msg | 2 +- msg/uavcan_parameter_request.msg | 2 +- msg/ulog_stream.msg | 2 +- msg/vehicle_command.msg | 2 +- msg/vehicle_command_ack.msg | 2 +- src/modules/uORB/Subscription.cpp | 5 +- src/modules/uORB/Subscription.hpp | 2 +- src/modules/uORB/SubscriptionCallback.hpp | 2 +- src/modules/uORB/uORBDeviceMaster.cpp | 9 +- src/modules/uORB/uORBDeviceNode.cpp | 72 ++++++-- src/modules/uORB/uORBDeviceNode.hpp | 22 ++- .../uORB/uORB_tests/uORBTest_UnitTest.cpp | 164 +++++++++++++++++- .../uORB/uORB_tests/uORBTest_UnitTest.hpp | 11 ++ 19 files changed, 274 insertions(+), 35 deletions(-) diff --git a/msg/gps_inject_data.msg b/msg/gps_inject_data.msg index 5daae0623e..66d431143b 100644 --- a/msg/gps_inject_data.msg +++ b/msg/gps_inject_data.msg @@ -3,4 +3,4 @@ uint8 len # length of data uint8 flags # LSB: 1=fragmented uint8[182] data # data to write to GPS device (RTCM message) -uint8 ORB_QUEUE_LENGTH = 6 +uint8 ORB_QUEUE_LENGTH = 8 diff --git a/msg/mavlink_log.msg b/msg/mavlink_log.msg index a3f3c1a2f5..e1ed1e98c9 100644 --- a/msg/mavlink_log.msg +++ b/msg/mavlink_log.msg @@ -3,4 +3,4 @@ uint64 timestamp # time since system start (microseconds) char[127] text uint8 severity # log level (same as in the linux kernel, starting with 0) -uint8 ORB_QUEUE_LENGTH = 5 +uint8 ORB_QUEUE_LENGTH = 4 diff --git a/msg/orb_test_medium.msg b/msg/orb_test_medium.msg index 57334b213f..b25ae1c851 100644 --- a/msg/orb_test_medium.msg +++ b/msg/orb_test_medium.msg @@ -4,4 +4,4 @@ int32 val uint8[64] junk -# TOPICS orb_test_medium orb_test_medium_multi orb_test_medium_queue orb_test_medium_queue_poll +# TOPICS orb_test_medium orb_test_medium_multi orb_test_medium_wrap_around orb_test_medium_queue orb_test_medium_queue_poll diff --git a/msg/sensor_mag.msg b/msg/sensor_mag.msg index e515477f45..12ade417e6 100644 --- a/msg/sensor_mag.msg +++ b/msg/sensor_mag.msg @@ -13,4 +13,4 @@ uint32 error_count bool is_external # if true the mag is external (i.e. not built into the board) -uint8 ORB_QUEUE_LENGTH = 3 +uint8 ORB_QUEUE_LENGTH = 4 diff --git a/msg/subsystem_info.msg b/msg/subsystem_info.msg index afbc05d510..d66e654a14 100644 --- a/msg/subsystem_info.msg +++ b/msg/subsystem_info.msg @@ -36,4 +36,4 @@ bool enabled bool ok uint64 subsystem_type -uint8 ORB_QUEUE_LENGTH = 5 +uint8 ORB_QUEUE_LENGTH = 8 diff --git a/msg/transponder_report.msg b/msg/transponder_report.msg index 32a5ef5850..6a1ca06418 100644 --- a/msg/transponder_report.msg +++ b/msg/transponder_report.msg @@ -47,4 +47,4 @@ uint16 ADSB_EMITTER_TYPE_SERVICE_SURFACE=18 uint16 ADSB_EMITTER_TYPE_POINT_OBSTACLE=19 uint16 ADSB_EMITTER_TYPE_ENUM_END=20 -uint8 ORB_QUEUE_LENGTH = 10 +uint8 ORB_QUEUE_LENGTH = 8 diff --git a/msg/tune_control.msg b/msg/tune_control.msg index 000b2c3248..dba37757d2 100644 --- a/msg/tune_control.msg +++ b/msg/tune_control.msg @@ -35,4 +35,4 @@ uint8 VOLUME_LEVEL_MIN = 0 uint8 VOLUME_LEVEL_DEFAULT = 40 uint8 VOLUME_LEVEL_MAX = 100 -uint8 ORB_QUEUE_LENGTH = 3 +uint8 ORB_QUEUE_LENGTH = 4 diff --git a/msg/uavcan_parameter_request.msg b/msg/uavcan_parameter_request.msg index 601b13d947..3cfec15f81 100644 --- a/msg/uavcan_parameter_request.msg +++ b/msg/uavcan_parameter_request.msg @@ -20,4 +20,4 @@ uint8 param_type # MAVLink parameter type int64 int_value # current value if param_type is int-like float32 real_value # current value if param_type is float-like -uint8 ORB_QUEUE_LENGTH = 3 +uint8 ORB_QUEUE_LENGTH = 4 diff --git a/msg/ulog_stream.msg b/msg/ulog_stream.msg index 9bee7e7dcb..d206b4a425 100644 --- a/msg/ulog_stream.msg +++ b/msg/ulog_stream.msg @@ -16,4 +16,4 @@ uint16 msg_sequence # allows determine drops uint8 flags # see FLAGS_* uint8[249] data # ulog data -uint8 ORB_QUEUE_LENGTH = 14 # TODO: we might be able to reduce this if mavlink polled on the topic +uint8 ORB_QUEUE_LENGTH = 16 # TODO: we might be able to reduce this if mavlink polled on the topic diff --git a/msg/vehicle_command.msg b/msg/vehicle_command.msg index 9a4a631e89..229a40e893 100644 --- a/msg/vehicle_command.msg +++ b/msg/vehicle_command.msg @@ -133,7 +133,7 @@ uint8 FAILURE_TYPE_SLOW = 5 uint8 FAILURE_TYPE_DELAYED = 6 uint8 FAILURE_TYPE_INTERMITTENT = 7 -uint8 ORB_QUEUE_LENGTH = 3 +uint8 ORB_QUEUE_LENGTH = 4 float32 param1 # Parameter 1, as defined by MAVLink uint16 VEHICLE_CMD enum. float32 param2 # Parameter 2, as defined by MAVLink uint16 VEHICLE_CMD enum. diff --git a/msg/vehicle_command_ack.msg b/msg/vehicle_command_ack.msg index 4dedf1879d..a1861c3473 100644 --- a/msg/vehicle_command_ack.msg +++ b/msg/vehicle_command_ack.msg @@ -13,7 +13,7 @@ uint16 ARM_AUTH_DENIED_REASON_TIMEOUT = 3 uint16 ARM_AUTH_DENIED_REASON_AIRSPACE_IN_USE = 4 uint16 ARM_AUTH_DENIED_REASON_BAD_WEATHER = 5 -uint8 ORB_QUEUE_LENGTH = 3 +uint8 ORB_QUEUE_LENGTH = 4 uint16 command uint8 result diff --git a/src/modules/uORB/Subscription.cpp b/src/modules/uORB/Subscription.cpp index abbc16291f..09acd60702 100644 --- a/src/modules/uORB/Subscription.cpp +++ b/src/modules/uORB/Subscription.cpp @@ -65,10 +65,7 @@ bool Subscription::subscribe() _node = node; _node->add_internal_subscriber(); - const unsigned curr_gen = _node->published_message_count(); - - // If there were any previous publications allow the subscriber to read them - _last_generation = curr_gen - math::min((unsigned)_node->get_queue_size(), curr_gen); + _last_generation = _node->get_initial_generation(); return true; } diff --git a/src/modules/uORB/Subscription.hpp b/src/modules/uORB/Subscription.hpp index 195fbb0a9c..8e082d2a60 100644 --- a/src/modules/uORB/Subscription.hpp +++ b/src/modules/uORB/Subscription.hpp @@ -137,7 +137,7 @@ public: /** * Check if there is a new update. */ - bool updated() { return advertised() && (_node->published_message_count() != _last_generation); } + bool updated() { return advertised() && _node->updates_available(_last_generation); } /** * Update the struct diff --git a/src/modules/uORB/SubscriptionCallback.hpp b/src/modules/uORB/SubscriptionCallback.hpp index 63bb0771c7..9795f50a59 100644 --- a/src/modules/uORB/SubscriptionCallback.hpp +++ b/src/modules/uORB/SubscriptionCallback.hpp @@ -160,7 +160,7 @@ public: { // schedule immediately if updated (queue depth or subscription interval) if ((_required_updates == 0) - || (_subscription.get_node()->published_message_count() >= (_subscription.get_last_generation() + _required_updates))) { + || (_subscription.get_node()->updates_available(_subscription.get_last_generation()) >= _required_updates)) { if (updated()) { _work_item->ScheduleNow(); } diff --git a/src/modules/uORB/uORBDeviceMaster.cpp b/src/modules/uORB/uORBDeviceMaster.cpp index 8f9cfe2fdc..3998db275e 100644 --- a/src/modules/uORB/uORBDeviceMaster.cpp +++ b/src/modules/uORB/uORBDeviceMaster.cpp @@ -268,7 +268,8 @@ int uORB::DeviceMaster::addNewDeviceNodes(DeviceNodeStatisticsData **first_node, max_topic_name_length = name_length; } - last_node->last_pub_msg_count = last_node->node->published_message_count(); + // Pass in 0 to get the index of the latest published data + last_node->last_pub_msg_count = last_node->node->updates_available(0); } return 0; @@ -374,9 +375,9 @@ void uORB::DeviceMaster::showTop(char **topic_filter, int num_filters) cur_node = first_node; while (cur_node) { - unsigned int num_msgs = cur_node->node->published_message_count(); - cur_node->pub_msg_delta = roundf((num_msgs - cur_node->last_pub_msg_count) / dt); - cur_node->last_pub_msg_count = num_msgs; + unsigned int num_msgs = cur_node->node->updates_available(cur_node->last_pub_msg_count); + cur_node->pub_msg_delta = roundf(num_msgs / dt); + cur_node->last_pub_msg_count += num_msgs; cur_node = cur_node->next; } diff --git a/src/modules/uORB/uORBDeviceNode.cpp b/src/modules/uORB/uORBDeviceNode.cpp index bd2afb5e59..387feae1c7 100644 --- a/src/modules/uORB/uORBDeviceNode.cpp +++ b/src/modules/uORB/uORBDeviceNode.cpp @@ -44,12 +44,48 @@ static uORB::SubscriptionInterval *filp_to_subscription(cdev::file_t *filp) { return static_cast(filp->f_priv); } +// Determine the data range +static inline bool is_in_range(unsigned left, unsigned value, unsigned right) +{ + if (right > left) { + return (left <= value) && (value <= right); + + } else { // Maybe the data overflowed and a wraparound occurred + return (left <= value) || (value <= right); + } +} + +// round up to nearest power of two +// Such as 0 => 1, 1 => 1, 2 => 2 ,3 => 4, 10 => 16, 60 => 64, 65...255 => 128 +// Note: When the input value > 128, the output is always 128 +static inline uint8_t round_pow_of_two_8(uint8_t n) +{ + if (n == 0) { + return 1; + } + + // Avoid is already a power of 2 + uint8_t value = n - 1; + + // Fill 1 + value |= value >> 1U; + value |= value >> 2U; + value |= value >> 4U; + + // Unable to round-up, take the value of round-down + if (value == UINT8_MAX) { + value >>= 1U; + } + + return value + 1; +} + uORB::DeviceNode::DeviceNode(const struct orb_metadata *meta, const uint8_t instance, const char *path, uint8_t queue_size) : CDev(path), _meta(meta), _instance(instance), - _queue_size(queue_size) + _queue_size(round_pow_of_two_8(queue_size)) { } @@ -130,24 +166,23 @@ uORB::DeviceNode::copy(void *dst, unsigned &generation) ATOMIC_ENTER; const unsigned current_generation = _generation.load(); - if (current_generation > generation + _queue_size) { - // Reader is too far behind: some messages are lost - generation = current_generation - _queue_size; - } - - if ((current_generation == generation) && (generation > 0)) { + if (current_generation == generation) { /* The subscriber already read the latest message, but nothing new was published yet. * Return the previous message */ --generation; } + // Compatible with normal and overflow conditions + if (!is_in_range(current_generation - _queue_size, generation, current_generation - 1)) { + // Reader is too far behind: some messages are lost + generation = current_generation - _queue_size; + } + memcpy(dst, _data + (_meta->o_size * (generation % _queue_size)), _meta->o_size); ATOMIC_LEAVE; - if (generation < current_generation) { - ++generation; - } + ++generation; return true; } @@ -223,6 +258,9 @@ uORB::DeviceNode::write(cdev::file_t *filp, const char *buffer, size_t buflen) item->call(); } + /* Mark at least one data has been published */ + _data_valid = true; + ATOMIC_LEAVE; /* notify any poll waiters */ @@ -502,10 +540,22 @@ int uORB::DeviceNode::update_queue_size(unsigned int queue_size) return PX4_ERROR; } - _queue_size = queue_size; + _queue_size = round_pow_of_two_8(queue_size); return PX4_OK; } +unsigned uORB::DeviceNode::get_initial_generation() +{ + ATOMIC_ENTER; + + // If there any previous publications allow the subscriber to read them + unsigned generation = _generation.load() - (_data_valid ? 1 : 0); + + ATOMIC_LEAVE; + + return generation; +} + bool uORB::DeviceNode::register_callback(uORB::SubscriptionCallback *callback_sub) { diff --git a/src/modules/uORB/uORBDeviceNode.hpp b/src/modules/uORB/uORBDeviceNode.hpp index 93f90d4934..e6bab77564 100644 --- a/src/modules/uORB/uORBDeviceNode.hpp +++ b/src/modules/uORB/uORBDeviceNode.hpp @@ -50,6 +50,11 @@ class Manager; class SubscriptionCallback; } +namespace uORBTest +{ +class UnitTest; +} + /** * Per-object device instance. */ @@ -188,7 +193,18 @@ public: int8_t subscriber_count() const { return _subscriber_count; } - unsigned published_message_count() const { return _generation.load(); } + /** + * Returns the number of updated data relative to the parameter 'generation' + * We can get the correct value regardless of wrap-around or not. + * @param generation The generation of subscriber + */ + unsigned updates_available(unsigned generation) const { return _generation.load() - generation; } + + /** + * Return the initial generation to the subscriber + * @return The initial generation. + */ + unsigned get_initial_generation(); const orb_metadata *get_meta() const { return _meta; } @@ -224,10 +240,12 @@ protected: void poll_notify_one(px4_pollfd_struct_t *fds, px4_pollevent_t events) override; private: + friend uORBTest::UnitTest; const orb_metadata *_meta; /**< object metadata information */ - uint8_t *_data{nullptr}; /**< allocated object buffer */ + uint8_t *_data{nullptr}; /**< allocated object buffer */ + bool _data_valid{false}; /**< At least one valid data */ px4::atomic _generation{0}; /**< object generation count */ List _callbacks; diff --git a/src/modules/uORB/uORB_tests/uORBTest_UnitTest.cpp b/src/modules/uORB/uORB_tests/uORBTest_UnitTest.cpp index 8de57e3ad0..ce6b7d5613 100644 --- a/src/modules/uORB/uORB_tests/uORBTest_UnitTest.cpp +++ b/src/modules/uORB/uORB_tests/uORBTest_UnitTest.cpp @@ -202,6 +202,12 @@ int uORBTest::UnitTest::test() return ret; } + ret = test_wrap_around(); + + if (ret != OK) { + return ret; + } + ret = test_queue(); if (ret != OK) { @@ -582,6 +588,162 @@ int uORBTest::UnitTest::test_multi_reversed() return test_note("PASS multi-topic reversed"); } +int uORBTest::UnitTest::test_wrap_around() +{ + test_note("Testing orb wrap-around"); + + orb_test_medium_s t{}; + orb_test_medium_s u{}; + orb_advert_t ptopic{nullptr}; + bool updated{false}; + + // Advertise but not publish topics, only generate device_node, which is convenient for modifying DeviceNode::_generation + const int queue_size = 16; + ptopic = orb_advertise_queue(ORB_ID(orb_test_medium_wrap_around), nullptr, queue_size); + + if (ptopic == nullptr) { + return test_fail("advertise failed: %d", errno); + } + + auto node = uORB::Manager::get_instance()->get_device_master()->getDeviceNode(ORB_ID(orb_test_medium_wrap_around), 0); + + if (node == nullptr) { + return test_fail("get device node failed."); + } + + // Set generation + { + // Set generation to the location where wrap-around is about to be: (0 - queue_size / 2) => (UINT_MAX - queue_size / 2 + 1) + set_generation(*node, unsigned(-(queue_size / 2))); + + if (node->updates_available(0) != unsigned(-(queue_size / 2))) { + return test_fail("set generation failed."); + } + } + + t.val = 0; + orb_publish(ORB_ID(orb_test_medium_wrap_around), ptopic, &t); + + int sfd = orb_subscribe(ORB_ID(orb_test_medium_wrap_around)); + + if (sfd < 0) { + return test_fail("subscribe failed: %d", errno); + } + + orb_check(sfd, &updated); + + if (!updated) { + return test_fail("update flag not set"); + } + + if (PX4_OK != orb_copy(ORB_ID(orb_test_medium_wrap_around), sfd, &u)) { + return test_fail("copy(1) failed: %d", errno); + } + + if (u.val != t.val) { + return test_fail("copy(1) mismatch: %d expected %d", u.val, t.val); + } + + orb_check(sfd, &updated); + + if (updated) { + return test_fail("spurious updated flag"); + } + +#define CHECK_UPDATED(element) \ + orb_check(sfd, &updated); \ + if (!updated) { \ + return test_fail("update flag not set, element %i", element); \ + } +#define CHECK_NOT_UPDATED(element) \ + orb_check(sfd, &updated); \ + if (updated) { \ + return test_fail("update flag set, element %i", element); \ + } +#define CHECK_COPY(i_got, i_correct) \ + orb_copy(ORB_ID(orb_test_medium_wrap_around), sfd, &u); \ + if (i_got != i_correct) { \ + return test_fail("got wrong element from the queue (got %i, should be %i)", i_got, i_correct); \ + } + + //no messages in the queue anymore + + test_note(" Testing to write some elements..."); + + for (int i = 0; i < queue_size - 2; ++i) { + t.val = i; + orb_publish(ORB_ID(orb_test_medium_wrap_around), ptopic, &t); + } + + for (int i = 0; i < queue_size - 2; ++i) { + CHECK_UPDATED(i); + CHECK_COPY(u.val, i); + } + + CHECK_NOT_UPDATED(queue_size); + +#define SET_GENERTATION() \ + set_generation(*node, unsigned(-(queue_size / 2))); \ + if (node->updates_available(0) != unsigned(-(queue_size / 2))) { \ + return test_fail("set generation failed."); \ + } \ + for (int i = 0; i < queue_size; i++) { \ + if (PX4_OK != orb_copy(ORB_ID(orb_test_medium_wrap_around), sfd, &u)) { \ + break; \ + } \ + } + + test_note(" Testing overflow..."); + int overflow_by = 3; + + SET_GENERTATION(); + + for (int i = 0; i < queue_size + overflow_by; ++i) { + t.val = i; + orb_publish(ORB_ID(orb_test_medium_wrap_around), ptopic, &t); + } + + for (int i = 0; i < queue_size; ++i) { + CHECK_UPDATED(i); + CHECK_COPY(u.val, i + overflow_by); + } + + CHECK_NOT_UPDATED(queue_size); + + test_note(" Testing underflow..."); + + SET_GENERTATION(); + + t.val = queue_size; + orb_publish(ORB_ID(orb_test_medium_wrap_around), ptopic, &t); + + CHECK_UPDATED(-1); + CHECK_COPY(u.val, t.val); + + for (int i = 0; i < queue_size; ++i) { + CHECK_NOT_UPDATED(i); + CHECK_COPY(u.val, t.val); + } + +#undef SET_GENERTATION + + t.val = 943; + orb_publish(ORB_ID(orb_test_medium_wrap_around), ptopic, &t); + CHECK_UPDATED(-1); + test_note("before copy"); + CHECK_COPY(u.val, t.val); + +#undef CHECK_COPY +#undef CHECK_UPDATED +#undef CHECK_NOT_UPDATED + + test_note("before unadvertise"); + orb_unadvertise(ptopic); + orb_unsubscribe(sfd); + + return test_note("PASS orb queuing"); +} + int uORBTest::UnitTest::test_queue() { test_note("Testing orb queuing"); @@ -597,7 +759,7 @@ int uORBTest::UnitTest::test_queue() return test_fail("subscribe failed: %d", errno); } - const int queue_size = 11; + const int queue_size = 16; t.val = 0; ptopic = orb_advertise_queue(ORB_ID(orb_test_medium_queue), &t, queue_size); diff --git a/src/modules/uORB/uORB_tests/uORBTest_UnitTest.hpp b/src/modules/uORB/uORB_tests/uORBTest_UnitTest.hpp index 8812f92a16..6e3026429b 100644 --- a/src/modules/uORB/uORB_tests/uORBTest_UnitTest.hpp +++ b/src/modules/uORB/uORB_tests/uORBTest_UnitTest.hpp @@ -35,6 +35,9 @@ #define _uORBTest_UnitTest_hpp_ #include +#include +#include +#include #include #include #include @@ -68,6 +71,12 @@ public: // Disallow copy UnitTest(const uORBTest::UnitTest & /*unused*/) = delete; + // Assist in testing the wrap-around situation + static void set_generation(uORB::DeviceNode &node, unsigned generation) + { + node._generation.store(generation); + } + private: UnitTest() : pubsubtest_passed(false), pubsubtest_print(false) {} @@ -94,6 +103,8 @@ private: int test_multi2(); + int test_wrap_around(); + /* queuing tests */ int test_queue(); static int pub_test_queue_entry(int argc, char *argv[]);