microRTPS: agent: fix double free memory problem on SIGINT

It fixes also some minor issues when exiting the agent, including the printed stats
This commit is contained in:
TSC21
2021-05-28 16:28:51 +02:00
committed by Nuno Marques
parent 6d5f12d2a2
commit d3a23cee40
6 changed files with 50 additions and 65 deletions

View File

@@ -98,7 +98,7 @@ bool RtpsTopics::init(std::condition_variable *t_send_queue_cv, std::mutex *t_se
}
@[if send_topics]@
void RtpsTopics::publish(uint8_t topic_ID, char data_buffer[], size_t len)
void RtpsTopics::publish(const uint8_t topic_ID, char data_buffer[], size_t len)
{
switch (topic_ID) {
@[for topic in send_topics]@
@@ -109,7 +109,7 @@ void RtpsTopics::publish(uint8_t topic_ID, char data_buffer[], size_t len)
eprosima::fastcdr::Cdr cdr_des(cdrbuffer);
st.deserialize(cdr_des);
@[ if topic == 'Timesync' or topic == 'timesync']@
_timesync->processTimesyncMsg(&st);
_timesync->processTimesyncMsg(&st, &_@(topic)_pub);
if (getMsgSysID(&st) == 1) {
@[ end if]@
@@ -130,7 +130,7 @@ void RtpsTopics::publish(uint8_t topic_ID, char data_buffer[], size_t len)
@[end for]@
default:
printf("\033[1;33m[ micrortps_agent ]\tUnexpected topic ID '%hhu' to publish Please make sure the agent is capable of parsing the message associated to the topic ID '%hhu'\033[0m\n",
printf("\033[1;33m[ micrortps_agent ]\tUnexpected topic ID '%hhu' to publish. Please make sure the agent is capable of parsing the message associated to the topic ID '%hhu'\033[0m\n",
topic_ID, topic_ID);
break;
}

View File

@@ -98,7 +98,7 @@ public:
const std::string &ns);
void set_timesync(const std::shared_ptr<TimeSync> &timesync) { _timesync = timesync; };
@[if send_topics]@
void publish(uint8_t topic_ID, char data_buffer[], size_t len);
void publish(const uint8_t topic_ID, char data_buffer[], size_t len);
@[end if]@
@[if recv_topics]@
bool getMsg(const uint8_t topic_ID, eprosima::fastcdr::Cdr &scdr);

View File

@@ -178,13 +178,6 @@ static int parse_options(int argc, char **argv)
return 0;
}
void signal_handler(int signum)
{
printf("\033[1;33m[ micrortps_agent ]\tInterrupt signal (%d) received.\033[0m\n", signum);
running = 0;
transport_node->close();
}
@[if recv_topics]@
std::atomic<bool> exit_sender_thread(false);
std::condition_variable t_send_queue_cv;
@@ -195,15 +188,16 @@ void t_send(void *)
{
char data_buffer[BUFFER_SIZE] = {};
uint32_t length = 0;
uint8_t topic_ID = 255;
while (running && !exit_sender_thread.load()) {
while (running && !exit_sender_thread) {
std::unique_lock<std::mutex> lk(t_send_queue_mutex);
while (t_send_queue.empty() && !exit_sender_thread.load()) {
while (t_send_queue.empty() && !exit_sender_thread) {
t_send_queue_cv.wait(lk);
}
uint8_t topic_ID = t_send_queue.front();
topic_ID = t_send_queue.front();
t_send_queue.pop();
lk.unlock();
@@ -212,6 +206,7 @@ void t_send(void *)
eprosima::fastcdr::FastBuffer cdrbuffer(&data_buffer[header_length], sizeof(data_buffer) - header_length);
eprosima::fastcdr::Cdr scdr(cdrbuffer);
if (!exit_sender_thread) {
if (topics.getMsg(topic_ID, scdr)) {
length = scdr.getSerializedDataLength();
@@ -221,9 +216,17 @@ void t_send(void *)
}
}
}
}
}
@[end if]@
void signal_handler(int signum)
{
printf("\n\033[1;33m[ micrortps_agent ]\tInterrupt signal (%d) received.\033[0m\n", signum);
running = 0;
transport_node->close();
}
int main(int argc, char **argv)
{
if (-1 == parse_options(argc, argv)) {
@@ -282,9 +285,7 @@ int main(int argc, char **argv)
@[end if]@
// Init timesync
std::shared_ptr<TimeSync> timeSync = std::make_shared<TimeSync>(_options.verbose_debug);
topics.set_timesync(timeSync);
topics.set_timesync(std::make_shared<TimeSync>(_options.verbose_debug));
@[if recv_topics]@
topics.init(&t_send_queue_cv, &t_send_queue_mutex, &t_send_queue, _options.ns);
@@ -298,28 +299,16 @@ int main(int argc, char **argv)
while (running) {
@[if send_topics]@
++loop;
if (!receiving) { start = std::chrono::steady_clock::now(); }
// Publish messages received from UART
while (0 < (length = transport_node->read(&topic_ID, data_buffer, BUFFER_SIZE))) {
if (0 < (length = transport_node->read(&topic_ID, data_buffer, BUFFER_SIZE))) {
topics.publish(topic_ID, data_buffer, sizeof(data_buffer));
++received;
total_read += length;
receiving = true;
end = std::chrono::steady_clock::now();
}
if ((receiving && std::chrono::duration<double>(std::chrono::steady_clock::now() - end).count() > WAIT_CNST) ||
(!running && loop > 1)) {
std::chrono::duration<double> elapsed_secs = end - start;
printf("[ micrortps_agent ]\tSENT: %lumessages \t- %lubytes\n", (unsigned long)sent, (unsigned long)total_sent);
printf("[ micrortps_agent ]\tRECEIVED: %dmessages \t- %dbytes; %d LOOPS - %.03f seconds - %.02fKB/s\n",
received, total_read, loop, elapsed_secs.count(), (double)total_read / (1000 * elapsed_secs.count()));
received = sent = total_read = total_sent = 0;
receiving = false;
}
@[else]@
usleep(_options.sleep_us);
@[end if]@
@@ -329,12 +318,22 @@ int main(int argc, char **argv)
exit_sender_thread = true;
t_send_queue_cv.notify_one();
sender_thread.join();
std::chrono::duration<double> elapsed_secs = end - start;
if (received > 0) {
printf("[ micrortps_agent ]\tRECEIVED: %d messages - %d bytes; %d LOOPS - %.03f seconds - %.02fKB/s\n",
received, total_read, loop, elapsed_secs.count(), static_cast<double>(total_read) / (1000 * elapsed_secs.count()));
}
@[end if]@
@[if recv_topics]@
if (sent > 0) {
printf("[ micrortps_agent ]\tSENT: %lu messages - %lu bytes\n", static_cast<unsigned long>(sent),
static_cast<unsigned long>(total_sent));
}
@[end if]@
delete transport_node;
transport_node = nullptr;
timeSync->stop();
timeSync->reset();
return 0;
}

View File

@@ -78,17 +78,15 @@ TimeSync::TimeSync(bool debug)
TimeSync::~TimeSync() { stop(); }
void TimeSync::start(const TimesyncPublisher *pub)
void TimeSync::start(TimesyncPublisher *pub)
{
stop();
_timesync_pub = (*pub);
auto run = [this]() {
auto run = [this, pub]() {
while (!_request_stop) {
timesync_msg_t msg = newTimesyncMsg();
_timesync_pub.publish(&msg);
pub->publish(&msg);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
@@ -102,8 +100,6 @@ void TimeSync::stop()
_request_stop = true;
if (_send_timesync_thread && _send_timesync_thread->joinable()) { _send_timesync_thread->join(); }
_send_timesync_thread.reset();
}
void TimeSync::reset()
@@ -185,7 +181,7 @@ bool TimeSync::addMeasurement(int64_t local_t1_ns, int64_t remote_t2_ns, int64_t
return true;
}
void TimeSync::processTimesyncMsg(timesync_msg_t *msg)
void TimeSync::processTimesyncMsg(timesync_msg_t *msg, TimesyncPublisher *pub)
{
if (getMsgSysID(msg) == 1 && getMsgSeq(msg) != _last_remote_msg_seq) {
_last_remote_msg_seq = getMsgSeq(msg);
@@ -201,7 +197,7 @@ void TimeSync::processTimesyncMsg(timesync_msg_t *msg)
setMsgSeq(msg, getMsgSeq(msg) + 1);
setMsgTC1(msg, getTimeNSec());
_timesync_pub.publish(msg);
pub->publish(msg);
}
}
}

View File

@@ -71,10 +71,8 @@ except AttributeError:
@[if ros2_distro]@
#include "Timesync_Publisher.h"
#include "Timesync_Subscriber.h"
@[else]@
#include "timesync_Publisher.h"
#include "timesync_Subscriber.h"
@[end if]@
static constexpr double ALPHA_INITIAL = 0.05;
@@ -117,7 +115,7 @@ public:
* @@brief Starts the timesync publishing thread
* @@param[in] pub The timesync publisher entity to use
*/
void start(const TimesyncPublisher *pub);
void start(TimesyncPublisher *pub);
/**
* @@brief Resets the filter
@@ -154,7 +152,7 @@ public:
* @@brief Processes DDS timesync message
* @@param[in,out] msg The timestamp msg to be processed
*/
void processTimesyncMsg(timesync_msg_t *msg);
void processTimesyncMsg(timesync_msg_t *msg, TimesyncPublisher *pub);
/**
* @@brief Creates a new timesync DDS message to be sent from the agent to the client
@@ -191,14 +189,6 @@ private:
bool _debug;
@[if ros2_distro]@
Timesync_Publisher _timesync_pub;
Timesync_Subscriber _timesync_sub;
@[else]@
timesync_Publisher _timesync_pub;
timesync_Subscriber _timesync_sub;
@[end if]@
std::unique_ptr<std::thread> _send_timesync_thread;
std::atomic<bool> _request_stop{false};

View File

@@ -630,7 +630,7 @@ int UDP_node::init_receiver(uint16_t udp_port)
}
#ifndef PX4_INFO
printf("[ micrortps_transport ]\tUDP transport: Trying to connect...");
printf("[ micrortps_transport ]\tUDP transport: Trying to connect...\n");
#else
PX4_INFO("UDP transport: Trying to connect...");
#endif /* PX4_INFO */
@@ -720,12 +720,12 @@ ssize_t UDP_node::node_read(void *buffer, size_t len)
return -1;
}
int ret = 0;
ssize_t ret = 0;
#if !defined (__PX4_NUTTX) || (defined (CONFIG_NET) && defined (__PX4_NUTTX))
// Blocking call
static socklen_t addrlen = sizeof(receiver_outaddr);
ret = recvfrom(receiver_fd, buffer, len, 0, (struct sockaddr *) &receiver_outaddr, &addrlen);
#endif /* __PX4_NUTTX */
ret = recvfrom(receiver_fd, buffer, len, 0, (struct sockaddr *)&receiver_outaddr, &addrlen);
#endif /* !defined (__PX4_NUTTX) || (defined (CONFIG_NET) && defined (__PX4_NUTTX)) */
return ret;
}
@@ -735,9 +735,9 @@ ssize_t UDP_node::node_write(void *buffer, size_t len)
return -1;
}
int ret = 0;
ssize_t ret = 0;
#if !defined (__PX4_NUTTX) || (defined (CONFIG_NET) && defined (__PX4_NUTTX))
ret = sendto(sender_fd, buffer, len, 0, (struct sockaddr *)&sender_outaddr, sizeof(sender_outaddr));
#endif /* __PX4_NUTTX */
#endif /* !defined (__PX4_NUTTX) || (defined (CONFIG_NET) && defined (__PX4_NUTTX)) */
return ret;
}