diff --git a/src/lib/flight_tasks/tasks/Offboard/FlightTaskOffboard.cpp b/src/lib/flight_tasks/tasks/Offboard/FlightTaskOffboard.cpp index c7e0a2cb61..f0925a060f 100644 --- a/src/lib/flight_tasks/tasks/Offboard/FlightTaskOffboard.cpp +++ b/src/lib/flight_tasks/tasks/Offboard/FlightTaskOffboard.cpp @@ -45,11 +45,6 @@ bool FlightTaskOffboard::updateInitialize() { bool ret = FlightTask::updateInitialize(); - _sub_triplet_setpoint.update(); - - // require a valid triplet - ret = ret && _sub_triplet_setpoint.get().current.valid; - // require valid position / velocity in xy return ret && PX4_ISFINITE(_position(0)) && PX4_ISFINITE(_position(1)) @@ -73,6 +68,8 @@ bool FlightTaskOffboard::update() // reset setpoint for every loop _resetSetpoints(); + _sub_triplet_setpoint.update(); + if (!_sub_triplet_setpoint.get().current.valid) { _setDefaultConstraints(); _position_setpoint = _position; diff --git a/src/modules/uORB/Subscription.cpp b/src/modules/uORB/Subscription.cpp index 0783ba1ffe..8e5f096c02 100644 --- a/src/modules/uORB/Subscription.cpp +++ b/src/modules/uORB/Subscription.cpp @@ -65,16 +65,10 @@ bool Subscription::subscribe() _node = node; _node->add_internal_subscriber(); - // If there were any previous publications, allow the subscriber to read them const unsigned curr_gen = _node->published_message_count(); - const uint8_t q_size = _node->get_queue_size(); - if (q_size < curr_gen) { - _last_generation = curr_gen - q_size; - - } else { - _last_generation = 0; - } + // If there were any previous publications allow the subscriber to read them + _last_generation = curr_gen - math::min((unsigned)_node->get_queue_size(), curr_gen); return true; } diff --git a/src/modules/uORB/Subscription.hpp b/src/modules/uORB/Subscription.hpp index 0160e40900..61ff5c977d 100644 --- a/src/modules/uORB/Subscription.hpp +++ b/src/modules/uORB/Subscription.hpp @@ -68,6 +68,7 @@ public: _orb_id(id), _instance(instance) { + subscribe(); } /** @@ -80,6 +81,7 @@ public: _orb_id((meta == nullptr) ? ORB_ID::INVALID : static_cast(meta->o_id)), _instance(instance) { + subscribe(); } ~Subscription() @@ -111,19 +113,19 @@ public: /** * Check if there is a new update. * */ - bool updated() { return advertised() ? (_node->published_message_count() != _last_generation) : false; } + bool updated() { return advertised() && (_node->published_message_count() != _last_generation); } /** * Update the struct * @param dst The uORB message struct we are updating. */ - bool update(void *dst) { return updated() ? copy(dst) : false; } + bool update(void *dst) { return updated() && _node->copy(dst, _last_generation); } /** * Copy the struct * @param dst The uORB message struct we are updating. */ - bool copy(void *dst) { return advertised() ? _node->copy(dst, _last_generation) : false; } + bool copy(void *dst) { return advertised() && _node->copy(dst, _last_generation); } uint8_t get_instance() const { return _instance; } unsigned get_last_generation() const { return _last_generation; } diff --git a/src/modules/uORB/uORBDeviceNode.cpp b/src/modules/uORB/uORBDeviceNode.cpp index 4557a14276..1bc85814f9 100644 --- a/src/modules/uORB/uORBDeviceNode.cpp +++ b/src/modules/uORB/uORBDeviceNode.cpp @@ -42,6 +42,8 @@ #include "uORBCommunicator.hpp" #endif /* ORB_COMMUNICATOR */ +static uORB::SubscriptionInterval *filp_to_subscription(cdev::file_t *filp) { return static_cast(filp->f_priv); } + uORB::DeviceNode::DeviceNode(const struct orb_metadata *meta, const uint8_t instance, const char *path, ORB_PRIO priority, uint8_t queue_size) : CDev(path), @@ -77,22 +79,16 @@ uORB::DeviceNode::open(cdev::file_t *filp) if (filp->f_oflags == PX4_F_RDONLY) { /* allocate subscriber data */ - SubscriberData *sd = new SubscriberData{}; + SubscriptionInterval *sd = new SubscriptionInterval(_meta, 0, _instance); if (nullptr == sd) { return -ENOMEM; } - /* If there were any previous publications, allow the subscriber to read them */ - const unsigned gen = published_message_count(); - sd->generation = gen - (_queue_size < gen ? _queue_size : gen); - filp->f_priv = (void *)sd; int ret = CDev::open(filp); - add_internal_subscriber(); - if (ret != PX4_OK) { PX4_ERR("CDev::open failed"); delete sd; @@ -113,93 +109,63 @@ int uORB::DeviceNode::close(cdev::file_t *filp) { if (filp->f_oflags == PX4_F_RDONLY) { /* subscriber */ - SubscriberData *sd = static_cast(filp->f_priv); - - if (sd != nullptr) { - remove_internal_subscriber(); - - delete sd; - sd = nullptr; - } + SubscriptionInterval *sd = filp_to_subscription(filp); + delete sd; } return CDev::close(filp); } -bool -uORB::DeviceNode::copy_locked(void *dst, unsigned &generation) const -{ - bool updated = false; - - if ((dst != nullptr) && (_data != nullptr)) { - const unsigned current_generation = _generation.load(); - - if (current_generation > generation + _queue_size) { - // Reader is too far behind: some messages are lost - generation = current_generation - _queue_size; - } - - if ((current_generation == generation) && (generation > 0)) { - /* The subscriber already read the latest message, but nothing new was published yet. - * Return the previous message - */ - --generation; - } - - memcpy(dst, _data + (_meta->o_size * (generation % _queue_size)), _meta->o_size); - - if (generation < current_generation) { - ++generation; - } - - updated = true; - } - - return updated; -} - bool uORB::DeviceNode::copy(void *dst, unsigned &generation) { - ATOMIC_ENTER; + if ((dst != nullptr) && (_data != nullptr)) { + if (_queue_size == 1) { + ATOMIC_ENTER; + memcpy(dst, _data, _meta->o_size); + generation = _generation.load(); + ATOMIC_LEAVE; + return true; - bool updated = copy_locked(dst, generation); + } else { + ATOMIC_ENTER; + const unsigned current_generation = _generation.load(); - ATOMIC_LEAVE; + if (current_generation > generation + _queue_size) { + // Reader is too far behind: some messages are lost + generation = current_generation - _queue_size; + } - return updated; + if ((current_generation == generation) && (generation > 0)) { + /* The subscriber already read the latest message, but nothing new was published yet. + * Return the previous message + */ + --generation; + } + + memcpy(dst, _data + (_meta->o_size * (generation % _queue_size)), _meta->o_size); + ATOMIC_LEAVE; + + if (generation < current_generation) { + ++generation; + } + + return true; + } + } + + return false; } ssize_t uORB::DeviceNode::read(cdev::file_t *filp, char *buffer, size_t buflen) { - /* if the object has not been written yet, return zero */ - if (_data == nullptr) { - return 0; - } - /* if the caller's buffer is the wrong size, that's an error */ if (buflen != _meta->o_size) { return -EIO; } - SubscriberData *sd = static_cast(filp->f_priv); - - /* - * Perform an atomic copy & state update - */ - ATOMIC_ENTER; - - // if subscriber has an interval track the last update time - if (sd->update_interval) { - sd->update_interval->last_update = hrt_absolute_time(); - } - - copy_locked(buffer, sd->generation); - - ATOMIC_LEAVE; - - return _meta->o_size; + return filp_to_subscription(filp)->copy(buffer) ? _meta->o_size : 0; } ssize_t @@ -272,42 +238,14 @@ uORB::DeviceNode::ioctl(cdev::file_t *filp, int cmd, unsigned long arg) switch (cmd) { case ORBIOCUPDATED: { ATOMIC_ENTER; - *(bool *)arg = appears_updated(filp); + *(bool *)arg = filp_to_subscription(filp)->updated(); ATOMIC_LEAVE; return PX4_OK; } - case ORBIOCSETINTERVAL: { - int ret = PX4_OK; - lock(); - - SubscriberData *sd = static_cast(filp->f_priv); - - if (arg == 0) { - if (sd->update_interval) { - delete (sd->update_interval); - sd->update_interval = nullptr; - } - - } else { - if (sd->update_interval) { - sd->update_interval->interval = arg; - - } else { - sd->update_interval = new UpdateIntervalData(); - - if (sd->update_interval) { - sd->update_interval->interval = arg; - - } else { - ret = -ENOMEM; - } - } - } - - unlock(); - return ret; - } + case ORBIOCSETINTERVAL: + filp_to_subscription(filp)->set_interval_us(arg); + return PX4_OK; case ORBIOCGADVERTISER: *(uintptr_t *)arg = (uintptr_t)this; @@ -324,23 +262,14 @@ uORB::DeviceNode::ioctl(cdev::file_t *filp, int cmd, unsigned long arg) return ret; } - case ORBIOCGETINTERVAL: { - SubscriberData *sd = static_cast(filp->f_priv); - - if (sd->update_interval) { - *(unsigned *)arg = sd->update_interval->interval; - - } else { - *(unsigned *)arg = 0; - } - } - - return OK; + case ORBIOCGETINTERVAL: + *(unsigned *)arg = filp_to_subscription(filp)->get_interval_us(); + return PX4_OK; case ORBIOCISADVERTISED: *(unsigned long *)arg = _advertised; - return OK; + return PX4_OK; default: /* give it to the superclass */ @@ -450,43 +379,18 @@ px4_pollevent_t uORB::DeviceNode::poll_state(cdev::file_t *filp) { // If the topic appears updated to the subscriber, say so. - if (appears_updated(filp)) { - return POLLIN; - } - - return 0; + return filp_to_subscription(filp)->updated() ? POLLIN : 0; } void uORB::DeviceNode::poll_notify_one(px4_pollfd_struct_t *fds, px4_pollevent_t events) { // If the topic looks updated to the subscriber, go ahead and notify them. - if (appears_updated((cdev::file_t *)fds->priv)) { + if (filp_to_subscription((cdev::file_t *)fds->priv)->updated()) { CDev::poll_notify_one(fds, events); } } -bool -uORB::DeviceNode::appears_updated(cdev::file_t *filp) -{ - // check if this topic has been published yet, if not bail out - if (_data == nullptr) { - return false; - } - - SubscriberData *sd = static_cast(filp->f_priv); - - // if subscriber has interval check time since last update - if (sd->update_interval != nullptr) { - if (hrt_elapsed_time(&sd->update_interval->last_update) < sd->update_interval->interval) { - return false; - } - } - - // finally, compare the generation - return (sd->generation != published_message_count()); -} - bool uORB::DeviceNode::print_statistics(int max_topic_length) { diff --git a/src/modules/uORB/uORBDeviceNode.hpp b/src/modules/uORB/uORBDeviceNode.hpp index 0eb34bc36a..25070cc983 100644 --- a/src/modules/uORB/uORBDeviceNode.hpp +++ b/src/modules/uORB/uORBDeviceNode.hpp @@ -229,31 +229,6 @@ protected: private: - /** - * Copies data and the corresponding generation - * from a node to the buffer provided. Caller handles locking. - * - * @param dst - * The buffer into which the data is copied. - * @param generation - * The generation that was copied. - * @return bool - * Returns true if the data was copied. - */ - bool copy_locked(void *dst, unsigned &generation) const; - - struct UpdateIntervalData { - uint64_t last_update{0}; /**< time at which the last update was provided, used when update_interval is nonzero */ - unsigned interval{0}; /**< if nonzero minimum interval between updates */ - }; - - struct SubscriberData { - ~SubscriberData() { if (update_interval) { delete (update_interval); } } - - unsigned generation{0}; /**< last generation the subscriber has seen */ - UpdateIntervalData *update_interval{nullptr}; /**< if null, no update interval */ - }; - const orb_metadata *_meta; /**< object metadata information */ uint8_t *_data{nullptr}; /**< allocated object buffer */ @@ -265,15 +240,4 @@ private: bool _advertised{false}; /**< has ever been advertised (not necessarily published data yet) */ uint8_t _queue_size; /**< maximum number of elements in the queue */ int8_t _subscriber_count{0}; - - /** - * Check whether a topic appears updated to a subscriber. - * - * Lock must already be held when calling this. - * - * @param sd The subscriber for whom to check. - * @return True if the topic should appear updated to the subscriber - */ - bool appears_updated(cdev::file_t *filp); - };