18 #ifndef IGNITION_TRANSPORT_DISCOVERY_HH_ 19 #define IGNITION_TRANSPORT_DISCOVERY_HH_ 33 #include <sys/types.h> 35 #include <sys/socket.h> 39 #include <arpa/inet.h> 43 #include <netinet/in.h> 49 #pragma warning(push, 0) 54 #pragma warning(disable: 4503) 56 #pragma warning(disable: 4996) 60 #include <condition_variable> 68 #include "ignition/transport/config.hh" 69 #include "ignition/transport/Export.hh" 82 inline namespace IGNITION_TRANSPORT_VERSION_NAMESPACE {
101 template<
typename Pub>
111 const bool _verbose =
false)
115 silenceInterval(kDefSilenceInterval),
116 activityInterval(kDefActivityInterval),
117 heartbeatInterval(kDefHeartbeatInterval),
118 connectionCb(nullptr),
119 disconnectionCb(nullptr),
122 numHeartbeatsUninitialized(0),
127 if (
env(
"IGN_IP", ignIp) && !ignIp.
empty())
128 this->hostInterfaces = {ignIp};
136 WORD wVersionRequested;
140 wVersionRequested = MAKEWORD(2, 2);
142 if (WSAStartup(wVersionRequested, &wsaData) != 0)
148 for (
const auto &netIface : this->hostInterfaces)
150 auto succeed = this->RegisterNetIface(netIface);
155 if (netIface == this->hostAddr && !succeed)
157 this->RegisterNetIface(
"127.0.0.1");
158 std::cerr <<
"Did you set the environment variable IGN_IP with a " 160 <<
" [" << netIface <<
"] seems an invalid local IP " 162 <<
" Using 127.0.0.1 as hostname." <<
std::endl;
163 this->hostAddr =
"127.0.0.1";
172 if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEADDR,
173 reinterpret_cast<const char *
>(&reuseAddr),
sizeof(reuseAddr)) != 0)
175 std::cerr <<
"Error setting socket option (SO_REUSEADDR)." 187 if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEPORT,
188 reinterpret_cast<const char *
>(&reusePort),
sizeof(reusePort)) != 0)
190 std::cerr <<
"Error setting socket option (SO_REUSEPORT)." 196 sockaddr_in localAddr;
197 memset(&localAddr, 0,
sizeof(localAddr));
198 localAddr.sin_family = AF_INET;
199 localAddr.sin_addr.s_addr = htonl(INADDR_ANY);
200 localAddr.sin_port = htons(static_cast<u_short>(this->port));
202 if (bind(this->sockets.at(0),
203 reinterpret_cast<sockaddr *
>(&localAddr),
sizeof(sockaddr_in)) < 0)
210 memset(&this->mcastAddr, 0,
sizeof(this->mcastAddr));
211 this->mcastAddr.sin_family = AF_INET;
212 this->mcastAddr.sin_addr.s_addr =
213 inet_addr(this->kMulticastGroup.c_str());
214 this->mcastAddr.sin_port = htons(static_cast<u_short>(this->port));
217 this->PrintCurrentState();
224 this->exitMutex.lock();
226 this->exitMutex.unlock();
229 if (this->threadReception.joinable())
230 this->threadReception.join();
238 for (
const auto &sock : this->sockets)
261 this->enabled =
true;
265 this->timeNextHeartbeat = now;
266 this->timeNextActivity = now;
269 this->threadReception =
std::thread(&Discovery::RecvMessages,
this);
285 if (!this->info.AddPublisher(_publisher))
292 this->SendMsg(
AdvType, _publisher);
319 cb = this->connectionCb;
323 pub.SetTopic(_topic);
324 pub.SetPUuid(this->pUuid);
331 found = this->info.Publishers(_topic, addresses);
337 for (
const auto &proc : addresses)
339 for (
const auto &node : proc.second)
371 return this->info.Publishers(_topic, _publishers);
392 if (!this->info.Publisher(_topic, this->pUuid, _nUuid, inf))
396 this->info.DelPublisherByNode(_topic, this->pUuid, _nUuid);
412 return this->hostAddr;
422 return this->activityInterval;
433 return this->heartbeatInterval;
443 return this->silenceInterval;
452 this->activityInterval = _ms;
461 this->heartbeatInterval = _ms;
470 this->silenceInterval = _ms;
480 this->connectionCb = _cb;
490 this->disconnectionCb = _cb;
504 std::cout <<
"\tActivity: " << this->activityInterval
506 std::cout <<
"\tHeartbeat: " << this->heartbeatInterval
508 std::cout <<
"\tSilence: " << this->silenceInterval
517 if (this->activity.empty())
521 for (
auto &proc : this->activity)
541 this->info.TopicList(_topics);
550 if (!this->initialized)
552 this->initializedCv.wait(lk, [
this]{
return this->initialized;});
559 private:
void UpdateActivity()
572 if (now < this->timeNextActivity)
575 disconnectCb = this->disconnectionCb;
577 for (
auto it = this->activity.cbegin(); it != this->activity.cend();)
580 auto elapsed = now - it->second;
583 if (std::chrono::duration_cast<std::chrono::milliseconds>
584 (elapsed).count() > this->silenceInterval)
587 this->info.DelPublishersByProc(it->first);
592 this->activity.
erase(it++);
608 for (
auto const &uuid : uuids)
611 publisher.SetPUuid(uuid);
612 disconnectCb(publisher);
617 private:
void UpdateHeartbeat()
624 if (now < this->timeNextHeartbeat)
636 this->info.PublishersByProc(this->pUuid, nodes);
639 for (
const auto &topic : nodes)
641 for (
const auto &node : topic.second)
647 if (!this->initialized)
649 ++this->numHeartbeatsUninitialized;
650 if (this->numHeartbeatsUninitialized == 2)
654 this->initialized =
true;
657 this->initializedCv.notify_all();
675 private:
int NextTimeout()
const 678 auto timeUntilNextHeartbeat = this->timeNextHeartbeat - now;
679 auto timeUntilNextActivity = this->timeNextActivity - now;
681 int t =
static_cast<int>(
683 (
std::min(timeUntilNextHeartbeat, timeUntilNextActivity)).count());
684 int t2 =
std::min(t, this->kTimeout);
689 private:
void RecvMessages()
691 bool timeToExit =
false;
695 int timeout = this->NextTimeout();
699 this->RecvDiscoveryUpdate();
702 this->PrintCurrentState();
705 this->UpdateHeartbeat();
706 this->UpdateActivity();
718 private:
void RecvDiscoveryUpdate()
720 char rcvStr[Discovery::kMaxRcvStr];
723 sockaddr_in clntAddr;
724 socklen_t addrLen =
sizeof(clntAddr);
726 if ((recvfrom(this->sockets.at(0),
727 reinterpret_cast<raw_type *
>(rcvStr),
729 reinterpret_cast<sockaddr *>(&clntAddr),
730 reinterpret_cast<socklen_t *
>(&addrLen))) < 0)
732 std::cerr <<
"Discovery::RecvDiscoveryUpdate() recvfrom error" 736 srcAddr = inet_ntoa(clntAddr.sin_addr);
737 srcPort = ntohs(clntAddr.sin_port);
741 std::cout <<
"\nReceived discovery update from " << srcAddr <<
": " 745 this->DispatchDiscoveryMsg(srcAddr, rcvStr);
752 private:
void DispatchDiscoveryMsg(
const std::string &_fromIp,
763 if (this->kWireVersion != header.
Version())
766 auto recvPUuid = header.
PUuid();
769 if (recvPUuid == this->pUuid)
778 connectCb = this->connectionCb;
779 disconnectCb = this->disconnectionCb;
782 switch (header.
Type())
793 _fromIp != this->hostAddr))
802 added = this->info.AddPublisher(advMsg.
Publisher());
805 if (added && connectCb)
818 auto recvTopic = subMsg.
Topic();
824 if (!this->info.HasAnyPublishers(recvTopic, this->pUuid))
829 if (!this->info.Publishers(recvTopic, addresses))
833 for (
const auto &nodeInfo : addresses[this->pUuid])
838 _fromIp != this->hostAddr))
844 this->SendMsg(
AdvType, nodeInfo);
859 this->activity.erase(recvPUuid);
865 pub.SetPUuid(recvPUuid);
873 this->info.DelPublishersByProc(recvPUuid);
887 _fromIp != this->hostAddr))
901 this->info.DelPublisherByNode(advMsg.
Publisher().Topic(),
909 std::cerr <<
"Unknown message type [" << header.
Type() <<
"]\n";
921 private:
template<
typename T>
922 void SendMsg(
const uint8_t _type,
924 const uint16_t _flags = 0)
const 927 Header header(this->Version(), _pub.PUuid(), _type, _flags);
943 advMsg.
Pack(reinterpret_cast<char*>(&buffer[0]));
944 msgLength =
static_cast<int>(advMsg.
MsgLength());
954 subMsg.
Pack(reinterpret_cast<char*>(&buffer[0]));
955 msgLength =
static_cast<int>(subMsg.
MsgLength());
962 buffer.
resize(header.HeaderLength());
963 header.Pack(reinterpret_cast<char*>(&buffer[0]));
964 msgLength = header.HeaderLength();
968 std::cerr <<
"Discovery::SendMsg() error: Unrecognized message" 969 <<
" type [" << _type <<
"]" <<
std::endl;
975 for (
const auto &sock : this->Sockets())
977 if (sendto(sock, reinterpret_cast<const raw_type *>(
978 reinterpret_cast<unsigned char*>(&buffer[0])),
980 reinterpret_cast<const sockaddr *>(this->MulticastAddr()),
981 sizeof(*(this->MulticastAddr()))) != msgLength)
991 if (errno != EPERM && errno != ENOBUFS)
993 std::cerr <<
"Exception sending a message:" 1000 if (this->Verbose())
1003 <<
" msg [" << topic <<
"]" <<
std::endl;
1011 return this->sockets;
1016 private:
const sockaddr_in *MulticastAddr()
const 1018 return &this->mcastAddr;
1023 private:
bool Verbose()
const 1025 return this->verbose;
1030 private: uint8_t Version()
const 1032 return this->kWireVersion;
1039 private:
bool RegisterNetIface(
const std::string &_ip)
1042 int sock =
static_cast<int>(socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP));
1052 struct in_addr ifAddr;
1053 ifAddr.s_addr = inet_addr(_ip.
c_str());
1054 if (setsockopt(sock, IPPROTO_IP, IP_MULTICAST_IF,
1055 reinterpret_cast<const char*>(&ifAddr),
sizeof(ifAddr)) != 0)
1057 std::cerr <<
"Error setting socket option (IP_MULTICAST_IF)." 1062 this->sockets.push_back(sock);
1067 struct ip_mreq group;
1068 group.imr_multiaddr.s_addr =
1069 inet_addr(this->kMulticastGroup.c_str());
1070 group.imr_interface.s_addr = inet_addr(_ip.
c_str());
1071 if (setsockopt(this->sockets.at(0), IPPROTO_IP, IP_ADD_MEMBERSHIP,
1072 reinterpret_cast<const char*
>(&group),
sizeof(group)) != 0)
1074 std::cerr <<
"Error setting socket option (IP_ADD_MEMBERSHIP)." 1085 private:
static const unsigned int kDefActivityInterval = 100;
1090 private:
static const unsigned int kDefHeartbeatInterval = 1000;
1095 private:
static const unsigned int kDefSilenceInterval = 3000;
1098 private:
const std::string kMulticastGroup =
"224.0.0.7";
1101 private:
const int kTimeout = 250;
1104 private:
static const int kMaxRcvStr = 65536;
1108 private:
static const uint8_t kWireVersion = 8;
1125 private:
unsigned int silenceInterval;
1130 private:
unsigned int activityInterval;
1135 private:
unsigned int heartbeatInterval;
1153 private:
bool verbose;
1159 private: sockaddr_in mcastAddr;
1180 private:
bool initialized;
1183 private:
unsigned int numHeartbeatsUninitialized;
1192 private:
bool enabled;
bool Unadvertise(const std::string &_topic, const std::string &_nUuid)
Unadvertise a new message. Broadcast a discovery message that will cancel all the discovery informati...
Definition: Discovery.hh:381
std::map< std::string, Timestamp > activity
Activity information. Every time there is a message from a remote node, its activity information is u...
Definition: Discovery.hh:1150
This class stores all the information about a publisher. It stores the topic name that publishes...
Definition: Publisher.hh:42
std::vector< std::string > determineInterfaces()
Determine the list of network interfaces for this machine. Reference: https://github.com/ros/ros_comm/blob/hydro-devel/clients/roscpp/src/libros/network.cpp.
Topic/service only available to subscribers in the same machine as the publisher. ...
size_t Unpack(const char *_buffer)
Unserialize a stream of bytes into a Sub.
Subscription packet used in the discovery protocol for requesting information about a given topic...
Definition: Packet.hh:171
void Start()
Start the discovery service. You probably want to register the callbacks for receiving discovery noti...
Definition: Discovery.hh:252
void WaitForInit() const
Check if ready/initialized. If not, then wait on the initializedCv condition variable.
Definition: Discovery.hh:546
size_t Unpack(const char *_buffer)
Unserialize a stream of bytes into an AdvertiseMessage.
Definition: Packet.hh:328
unsigned int HeartbeatInterval() const
Each node broadcasts periodic heartbeats to keep its topic information alive in other nodes...
Definition: Discovery.hh:430
static const uint8_t HeartbeatType
Definition: Packet.hh:42
void raw_type
Definition: Discovery.hh:45
static const uint8_t SubType
Definition: Packet.hh:40
T duration_cast(T... args)
A discovery class that implements a distributed topic discovery protocol. It uses UDP multicast for s...
Definition: Discovery.hh:102
size_t Pack(char *_buffer) const
Serialize the subscription message.
static const uint8_t ByeType
Definition: Packet.hh:43
void TopicList(std::vector< std::string > &_topics) const
Get the list of topics currently advertised in the network.
Definition: Discovery.hh:537
size_t MsgLength() const
Get the total length of the message.
Definition: Packet.hh:301
void SetSilenceInterval(const unsigned int _ms)
Set the maximum silence interval.
Definition: Discovery.hh:467
std::string determineHost()
Determine IP or hostname. Reference: https://github.com/ros/ros_comm/blob/hydro-devel/clients/roscpp...
static const uint8_t UnadvType
Definition: Packet.hh:41
virtual ~Discovery()
Destructor.
Definition: Discovery.hh:221
A class for customizing the publication options for a topic or service advertised. E.g.: Set the scope of a topic/service.
Definition: AdvertiseOptions.hh:59
size_t MsgLength() const
Get the total length of the message.
Discovery(const std::string &_pUuid, const int _port, const bool _verbose=false)
Constructor.
Definition: Discovery.hh:109
const TopicStorage< Pub > & Info() const
Get the discovery information.
Definition: Discovery.hh:357
bool Publishers(const std::string &_topic, Addresses_M< Pub > &_publishers) const
Get all the publishers' information known for a given topic.
Definition: Discovery.hh:367
std::string HostAddr() const
Get the IP address of this host.
Definition: Discovery.hh:409
T & Publisher()
Get the publisher of this message.
Definition: Packet.hh:278
unsigned int ActivityInterval() const
The discovery checks the validity of the topic information every 'activity interval' milliseconds...
Definition: Discovery.hh:419
static const uint8_t AdvType
Definition: Packet.hh:39
unsigned int SilenceInterval() const
Get the maximum time allowed without receiving any discovery information from a node before canceling...
Definition: Discovery.hh:440
Advertise packet used in the discovery protocol to broadcast information about the node advertising a...
Definition: Packet.hh:252
static const std::vector< std::string > MsgTypesStr
Used for debugging the message type received/send.
Definition: Packet.hh:48
void PrintCurrentState() const
Print the current discovery state.
Definition: Discovery.hh:494
void DisconnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery disconnection events. Each time a topic is no longer active...
Definition: Discovery.hh:487
void ConnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery connection events. Each time a new topic is connected...
Definition: Discovery.hh:477
bool Advertise(const Pub &_publisher)
Advertise a new message.
Definition: Discovery.hh:276
Topic/service only available to subscribers in the same process as the publisher. ...
bool pollSockets(const std::vector< int > &_sockets, const int _timeout)
std::chrono::steady_clock::time_point Timestamp
Definition: TransportTypes.hh:155
void SetActivityInterval(const unsigned int _ms)
Set the activity interval.
Definition: Discovery.hh:449
Definition: AdvertiseOptions.hh:28
std::string Topic() const
Get the topic.
size_t Pack(char *_buffer) const
Serialize the advertise message.
Definition: Packet.hh:309
void SetHeartbeatInterval(const unsigned int _ms)
Set the heartbeat interval.
Definition: Discovery.hh:458
bool env(const std::string &_name, std::string &_value)
Find the environment variable '_name' and return its value.
bool Discover(const std::string &_topic) const
Request discovery information about a topic. When using this method, the user might want to use SetCo...
Definition: Discovery.hh:307