Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions ublox_gps/include/ublox_gps/async_worker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@

#include <asio/buffer.hpp>
#include <asio/error_code.hpp>
#include <asio/io_service.hpp>
#include <asio/io_context.hpp>
#include <asio/placeholders.hpp>
#include <asio/write.hpp>
#include <asio/ip/udp.hpp>
Expand All @@ -59,11 +59,11 @@ class AsyncWorker final : public Worker {
/**
* @brief Construct an Asynchronous I/O worker.
* @param stream the stream for th I/O service
* @param io_service the I/O service
* @param io_context the I/O service
* @param buffer_size the size of the input and output buffers
*/
explicit AsyncWorker(std::shared_ptr<StreamT> stream,
std::shared_ptr<asio::io_service> io_service,
std::shared_ptr<asio::io_context> io_context,
std::size_t buffer_size,
int debug,
const rclcpp::Logger& logger);
Expand Down Expand Up @@ -124,7 +124,7 @@ class AsyncWorker final : public Worker {
void doClose();

std::shared_ptr<StreamT> stream_; //!< The I/O stream
std::shared_ptr<asio::io_service> io_service_; //!< The I/O service
std::shared_ptr<asio::io_context> io_context_; //!< The I/O service

std::mutex read_mutex_; //!< Lock for the input buffer
std::condition_variable read_condition_;
Expand All @@ -150,24 +150,24 @@ class AsyncWorker final : public Worker {

template <typename StreamT>
AsyncWorker<StreamT>::AsyncWorker(std::shared_ptr<StreamT> stream,
std::shared_ptr<asio::io_service> io_service,
std::shared_ptr<asio::io_context> io_context,
std::size_t buffer_size,
int debug,
const rclcpp::Logger& logger)
: stream_(stream), io_service_(io_service), in_buffer_size_(0), stopping_(false), debug_(debug), logger_(logger) {
: stream_(stream), io_context_(io_context), in_buffer_size_(0), stopping_(false), debug_(debug), logger_(logger) {
in_.resize(buffer_size);

out_.reserve(buffer_size);

io_service_->post(std::bind(&AsyncWorker<StreamT>::doRead, this));
background_thread_ = std::make_shared<std::thread>([this]{ io_service_->run(); });
asio::post(io_context_->get_executor(), std::bind(&AsyncWorker<StreamT>::doRead, this));
background_thread_ = std::make_shared<std::thread>([this]{ io_context_->run(); });
}

template <typename StreamT>
AsyncWorker<StreamT>::~AsyncWorker() {
io_service_->post(std::bind(&AsyncWorker<StreamT>::doClose, this));
asio::post(io_context_->get_executor(), std::bind(&AsyncWorker<StreamT>::doClose, this));
background_thread_->join();
//io_service_->reset();
//io_context_->reset();
}

template <typename StreamT>
Expand All @@ -185,7 +185,7 @@ bool AsyncWorker<StreamT>::send(const unsigned char* data,
}
out_.insert(out_.end(), data, data + size);

io_service_->post(std::bind(&AsyncWorker<StreamT>::doWrite, this));
asio::post(io_context_->get_executor(), std::bind(&AsyncWorker<StreamT>::doWrite, this));
return true;
}

Expand Down Expand Up @@ -314,7 +314,7 @@ void AsyncWorker<StreamT>::readEnd(const asio::error_code& error,
}

if (!stopping_) {
io_service_->post(std::bind(&AsyncWorker<StreamT>::doRead, this));
asio::post(io_context_->get_executor(), std::bind(&AsyncWorker<StreamT>::doRead, this));
}
}

Expand Down
59 changes: 30 additions & 29 deletions ublox_gps/src/gps.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
#include <stdexcept>
#include <thread>

#include <asio/io_service.hpp>
#include <asio/connect.hpp>
#include <asio/io_context.hpp>
#include <asio/serial_port.hpp>
#include <asio/serial_port_base.hpp>
#include <asio/ip/tcp.hpp>
Expand Down Expand Up @@ -129,8 +130,8 @@ void Gps::processUpdSosAck(const ublox_msgs::msg::UpdSOSAck &m) {
void Gps::initializeSerial(const std::string & port, unsigned int baudrate,
uint16_t uart_in, uint16_t uart_out) {
port_ = port;
auto io_service = std::make_shared<asio::io_service>();
auto serial = std::make_shared<asio::serial_port>(*io_service);
auto io_context = std::make_shared<asio::io_context>();
auto serial = std::make_shared<asio::serial_port>(*io_context);

// open serial port
try {
Expand All @@ -152,7 +153,7 @@ void Gps::initializeSerial(const std::string & port, unsigned int baudrate,
if (worker_) {
return;
}
setWorker(std::make_shared<AsyncWorker<asio::serial_port>>(serial, io_service, 8192, debug_, logger_));
setWorker(std::make_shared<AsyncWorker<asio::serial_port>>(serial, io_context, 8192, debug_, logger_));

configured_ = false;

Expand Down Expand Up @@ -185,8 +186,8 @@ void Gps::initializeSerial(const std::string & port, unsigned int baudrate,
}

void Gps::resetSerial(const std::string & port) {
auto io_service = std::make_shared<asio::io_service>();
auto serial = std::make_shared<asio::serial_port>(*io_service);
auto io_context = std::make_shared<asio::io_context>();
auto serial = std::make_shared<asio::serial_port>(*io_context);

// open serial port
try {
Expand All @@ -202,7 +203,7 @@ void Gps::resetSerial(const std::string & port) {
if (worker_) {
return;
}
setWorker(std::make_shared<AsyncWorker<asio::serial_port>>(serial, io_service, 8192, debug_, logger_));
setWorker(std::make_shared<AsyncWorker<asio::serial_port>>(serial, io_context, 8192, debug_, logger_));
configured_ = false;

// Poll UART PRT Config
Expand All @@ -227,69 +228,69 @@ void Gps::resetSerial(const std::string & port) {
void Gps::initializeTcp(const std::string & host, const std::string & port) {
host_ = host;
port_ = port;
auto io_service = std::make_shared<asio::io_service>();
asio::ip::tcp::resolver::iterator endpoint;
auto io_context = std::make_shared<asio::io_context>();
asio::ip::tcp::resolver::results_type endpoint;

try {
asio::ip::tcp::resolver resolver(*io_service);
asio::ip::tcp::resolver resolver(*io_context);
endpoint =
resolver.resolve(asio::ip::tcp::resolver::query(host, port));
resolver.resolve(host, port);
} catch (const std::runtime_error& e) {
throw std::runtime_error("U-Blox: Could not resolve" + host + " " +
port + " " + e.what());
}

auto socket = std::make_shared<asio::ip::tcp::socket>(*io_service);
auto socket = std::make_shared<asio::ip::tcp::socket>(*io_context);

try {
socket->connect(*endpoint);
asio::connect(*socket, endpoint);
} catch (const std::runtime_error& e) {
throw std::runtime_error("U-Blox: Could not connect to " +
endpoint->host_name() + ":" +
endpoint->service_name() + ": " + e.what());
endpoint.begin()->host_name() + ":" +
endpoint.begin()->service_name() + ": " + e.what());
}

RCLCPP_INFO(logger_, "U-Blox: Connected to %s:%s.", endpoint->host_name().c_str(),
endpoint->service_name().c_str());
RCLCPP_INFO(logger_, "U-Blox: Connected to %s:%s.", endpoint.begin()->host_name().c_str(),
endpoint.begin()->service_name().c_str());

if (worker_) {
return;
}
setWorker(std::make_shared<AsyncWorker<asio::ip::tcp::socket>>(socket, io_service, 8192, debug_, logger_));
setWorker(std::make_shared<AsyncWorker<asio::ip::tcp::socket>>(socket, io_context, 8192, debug_, logger_));
}

void Gps::initializeUdp(const std::string & host, const std::string & port) {
host_ = host;
port_ = port;
auto io_service = std::make_shared<asio::io_service>();
asio::ip::udp::resolver::iterator endpoint;
auto io_context = std::make_shared<asio::io_context>();
asio::ip::udp::resolver::results_type endpoint;

try {
asio::ip::udp::resolver resolver(*io_service);
asio::ip::udp::resolver resolver(*io_context);
endpoint =
resolver.resolve(asio::ip::udp::resolver::query(host, port));
resolver.resolve(host, port);
} catch (const std::runtime_error& e) {
throw std::runtime_error("U-Blox: Could not resolve" + host + " " +
port + " " + e.what());
}

auto socket = std::make_shared<asio::ip::udp::socket>(*io_service);
auto socket = std::make_shared<asio::ip::udp::socket>(*io_context);

try {
socket->connect(*endpoint);
asio::connect(*socket, endpoint);
} catch (const std::runtime_error& e) {
throw std::runtime_error("U-Blox: Could not connect to " +
endpoint->host_name() + ":" +
endpoint->service_name() + ": " + e.what());
endpoint.begin()->host_name() + ":" +
endpoint.begin()->service_name() + ": " + e.what());
}

RCLCPP_INFO(logger_, "U-Blox: Connected to %s:%s.", endpoint->host_name().c_str(),
endpoint->service_name().c_str());
RCLCPP_INFO(logger_, "U-Blox: Connected to %s:%s.", endpoint.begin()->host_name().c_str(),
endpoint.begin()->service_name().c_str());

if (worker_) {
return;
}
setWorker(std::make_shared<AsyncWorker<asio::ip::udp::socket>>(socket, io_service, 8192, debug_, logger_));
setWorker(std::make_shared<AsyncWorker<asio::ip::udp::socket>>(socket, io_context, 8192, debug_, logger_));
}

void Gps::close() {
Expand Down