Commit c0048557 authored by Christian Sponfeldner's avatar Christian Sponfeldner

rework emros_dispatcher: Implement state machine for connection handling

parent 22d63b03
Subproject commit 9e946c8005ed282c652a8d5a650ab5fb404bc258
Subproject commit 518eed7beb3d07d7a392a5b3c81dd22e0d8aa34d
......@@ -14,13 +14,15 @@
#include "emros_dispatcher/eth.h"
#include "utility_ofc/ethprotocol.h"
#include "emros_common/emros_commands.h"
#include "emros_dispatcher/udpsocket.h"
#include "iostream"
#include "utility_ofc/timeout.h"
#include "utility_ofc/leaky_bucket_timeout.h"
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include "time_stamp_helper.h"
#include "emros_dispatcher/udpsocket.h"
#include "emros_dispatcher/time_stamp_helper.h"
using namespace EMROSCmdType;
......@@ -31,25 +33,54 @@ public:
~Dispatcher();
private:
static constexpr double READ_SOCKET_INTERVAL_SEC_ = 0.0025;
static constexpr double ACCEPTED_DRIFT_OF_TIME_AXIS_SEC_ = 0.02;
static constexpr double ACCEPTED_DRIFT_OF_TIME_AXIS_SEC_ = 0.04;
static constexpr double INTERVAL_CONNECTION_MANAGEMENT_SEC_ = 0.01;
static constexpr double INTERVAL_HEARTBEAT_SEC = Ethprotocol::GetDownlinkHeartbeatIntervalInSec();
static constexpr double INTERVAL_READ_SOCKET_SEC_ = 0.01;
static constexpr double INTERVAL_CONNECTION_BUILD_UP_CYCLES_SEC_ = 0.1;
static constexpr unsigned int CYCLES_BETWEEN_CONNECTION_ATTEMPTS_ = 10;
void checkForNewEthMSG(Ethprotocol::DataframeRxBuffer& rx_data);
void disableSync();
void dispatcherRxCallback(const emros_dispatcher::eth::ConstPtr&);
ros::Timer createTimerNoAutostart(double interval_sec, void (Dispatcher::*callback)(const ros::TimerEvent&));
void dispatcherTxCallback(const emros_dispatcher::eth::ConstPtr&);
void enableSync();
void generateDownlinkHeartbeat(const ros::TimerEvent& event);
void getParam();
void processIncommingSocketData(__attribute__((unused)) const ros::TimerEvent& event);
void processIncommingEtherprotocolFrames(const Ethprotocol::DataframeRxBuffer& rx_data);
void sendEthMsg(Ethprotocol::EthDataframe& data);
void syncTiming();
void enterStateNotConnected();
void enterStateSyncEnabled();
void enterStateTimeAxisSynchronised();
bool executeIfSecurityCommand(const Ethprotocol::EthDataframe&);
bool executeIfServiceCommand(const Ethprotocol::EthDataframe&);
void processConnectionStateMachine(const ros::TimerEvent&);
void pickUpNewEthMsgs(Ethprotocol::DataframeRxBuffer&);
void processCyclicDuties(const ros::TimerEvent&);
void processIncommingSocketData(const ros::TimerEvent&);
void processIncommingEtherprotocolFrame(const Ethprotocol::EthDataframe&);
void processIncommingEtherprotocolFrames(const Ethprotocol::DataframeRxBuffer&);
void processStateNotConnected();
void processStateSyncEnabled();
void processStateTimeAxisSynchronised();
void publishEtherprotocolFrame(const Ethprotocol::EthDataframe&);
void sendDisableSync();
void sendDownlinkHeartbeat();
void sendEthMsg(Ethprotocol::EthDataframe&);
void requestToEnableSync();
void requestToSynchroniseTimeAxis();
void updateMembersViaParameterServer();
enum class ConnectionStates
{
NOT_CONNECTED,
TIME_AXIS_SYNCHRONISED,
SYNC_ENABLED
} connection_state_;
UdpSocket& socket_;
boost::mutex mutex_send_;
ros::Timer timer_downlink_heartbeat_generation_;
ros::Timer timer_cyclic_readout_udpsocket_;
ros::NodeHandle node_handler_;
ros::Subscriber subscriber_dispatcher_tx_;
ros::Publisher publisher_dispatcher_rx_;
ros::Timer timer_cyclic_duties_;
ros::Timer timer_readout_udpsocket_;
ros::Timer timer_state_machine_;
std::string source_ip_; // IP address of this system (ROS-PC)
std::string target_ip_; // IP address of EM5
......@@ -57,11 +88,12 @@ private:
uint16_t target_port_; // port at EM5
bool sync_enabled_;
bool connection_request_failed_;
bool recieved_sync_time_ack_;
bool recieved_sync_enabled_ack_;
bool recieved_disable_sync_;
ros::NodeHandle node_handler_;
ros::Subscriber subscriber_dispatcher_tx_;
ros::Subscriber subscriber_dispatcher_rx_;
ros::Publisher publisher_dispatcher_rx_;
unsigned int cycles_since_connection_request_;
TimeStampHelper time_stamp_helper_;
};
......
This diff is collapsed.
......@@ -49,12 +49,12 @@ void TimeStampHelper::warnAboutAnyNonUniformTimestamps(const Ethprotocol::Datafr
std::stringstream times;
bool non_uniform_timestamp_detected = false;
for (size_t i = 0; i < rx_data.pos; i++)
for (size_t i = 0; i < rx_data.number_of_frames; i++)
{
ids << rx_data.dataframe[i].getData().id << ",";
times << rx_data.dataframe[i].getData().time << ",";
ids << rx_data.dataframes[i].getData().id << ",";
times << rx_data.dataframes[i].getData().time << ",";
if (rx_data.dataframe[i].getData().time != rx_data.dataframe[0].getData().time)
if (rx_data.dataframes[i].getData().time != rx_data.dataframes[0].getData().time)
{
non_uniform_timestamp_detected = true;
}
......
......@@ -56,8 +56,13 @@ int UdpSocket::open_socket(uint16_t target_port, const char* target_ip, uint16_t
ssize_t UdpSocket::send_udp(unsigned char* data, size_t length)
{
return sendto(socket_file_descriptor_, data, length, 0, reinterpret_cast<struct sockaddr*>(&target_address_),
sizeof(target_address_));
ssize_t return_code = sendto(socket_file_descriptor_, data, length, 0,
reinterpret_cast<struct sockaddr*>(&target_address_), sizeof(target_address_));
if (return_code == -1)
{
ROS_WARN("Failed to sent UDP message.");
}
return return_code;
}
void UdpSocket::listen_port(Ethprotocol::StreamBuffer& stream_buffer)
......
Subproject commit d4506cc9c4d8fcd16f9f9d1d483c5964119c77e0
Subproject commit 1a2a1c18d8dc9bc5cec11a07ed0cc3bdeb382cc7
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment