18 #ifndef IGNITION_TRANSPORT_DISCOVERY_HH_ 19 #define IGNITION_TRANSPORT_DISCOVERY_HH_ 33 #include <sys/types.h> 35 #include <sys/socket.h> 39 #include <arpa/inet.h> 43 #include <netinet/in.h> 49 #pragma warning(push, 0) 54 #pragma warning(disable: 4503) 56 #pragma warning(disable: 4996) 60 #include <condition_variable> 68 #include "ignition/transport/config.hh" 69 #include "ignition/transport/Export.hh" 82 inline namespace IGNITION_TRANSPORT_VERSION_NAMESPACE {
112 template<
typename Pub>
122 const bool _verbose =
false)
126 silenceInterval(kDefSilenceInterval),
127 activityInterval(kDefActivityInterval),
128 heartbeatInterval(kDefHeartbeatInterval),
129 connectionCb(nullptr),
130 disconnectionCb(nullptr),
133 numHeartbeatsUninitialized(0),
138 if (
env(
"IGN_IP", ignIp) && !ignIp.
empty())
139 this->hostInterfaces = {ignIp};
147 WORD wVersionRequested;
151 wVersionRequested = MAKEWORD(2, 2);
153 if (WSAStartup(wVersionRequested, &wsaData) != 0)
159 for (
const auto &netIface : this->hostInterfaces)
161 auto succeed = this->RegisterNetIface(netIface);
166 if (netIface == this->hostAddr && !succeed)
168 this->RegisterNetIface(
"127.0.0.1");
169 std::cerr <<
"Did you set the environment variable IGN_IP with a " 171 <<
" [" << netIface <<
"] seems an invalid local IP " 173 <<
" Using 127.0.0.1 as hostname." <<
std::endl;
174 this->hostAddr =
"127.0.0.1";
183 if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEADDR,
184 reinterpret_cast<const char *
>(&reuseAddr),
sizeof(reuseAddr)) != 0)
186 std::cerr <<
"Error setting socket option (SO_REUSEADDR)." 198 if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEPORT,
199 reinterpret_cast<const char *
>(&reusePort),
sizeof(reusePort)) != 0)
201 std::cerr <<
"Error setting socket option (SO_REUSEPORT)." 207 sockaddr_in localAddr;
208 memset(&localAddr, 0,
sizeof(localAddr));
209 localAddr.sin_family = AF_INET;
210 localAddr.sin_addr.s_addr = htonl(INADDR_ANY);
211 localAddr.sin_port = htons(static_cast<u_short>(this->port));
213 if (bind(this->sockets.at(0),
214 reinterpret_cast<sockaddr *
>(&localAddr),
sizeof(sockaddr_in)) < 0)
221 memset(&this->mcastAddr, 0,
sizeof(this->mcastAddr));
222 this->mcastAddr.sin_family = AF_INET;
223 this->mcastAddr.sin_addr.s_addr =
224 inet_addr(this->kMulticastGroup.c_str());
225 this->mcastAddr.sin_port = htons(static_cast<u_short>(this->port));
229 if (
env(
"IGN_RELAY", ignRelay) && !ignRelay.
empty())
235 for (
auto const &relayAddr : relays)
236 this->AddRelayAddress(relayAddr);
239 this->PrintCurrentState();
246 this->exitMutex.lock();
248 this->exitMutex.unlock();
251 if (this->threadReception.joinable())
252 this->threadReception.join();
260 for (
const auto &sock : this->sockets)
283 this->enabled =
true;
287 this->timeNextHeartbeat = now;
288 this->timeNextActivity = now;
291 this->threadReception =
std::thread(&Discovery::RecvMessages,
this);
307 if (!this->info.AddPublisher(_publisher))
341 cb = this->connectionCb;
345 pub.SetTopic(_topic);
346 pub.SetPUuid(this->pUuid);
353 found = this->info.Publishers(_topic, addresses);
359 for (
const auto &proc : addresses)
361 for (
const auto &node : proc.second)
393 return this->info.Publishers(_topic, _publishers);
414 if (!this->info.Publisher(_topic, this->pUuid, _nUuid, inf))
418 this->info.DelPublisherByNode(_topic, this->pUuid, _nUuid);
434 return this->hostAddr;
444 return this->activityInterval;
455 return this->heartbeatInterval;
465 return this->silenceInterval;
474 this->activityInterval = _ms;
483 this->heartbeatInterval = _ms;
492 this->silenceInterval = _ms;
502 this->connectionCb = _cb;
512 this->disconnectionCb = _cb;
526 std::cout <<
"\tActivity: " << this->activityInterval
528 std::cout <<
"\tHeartbeat: " << this->heartbeatInterval
530 std::cout <<
"\tSilence: " << this->silenceInterval
539 if (this->activity.empty())
543 for (
auto &proc : this->activity)
563 this->info.TopicList(_topics);
572 if (!this->initialized)
574 this->initializedCv.wait(lk, [
this]{
return this->initialized;});
581 private:
void UpdateActivity()
594 if (now < this->timeNextActivity)
597 disconnectCb = this->disconnectionCb;
599 for (
auto it = this->activity.cbegin(); it != this->activity.cend();)
602 auto elapsed = now - it->second;
605 if (std::chrono::duration_cast<std::chrono::milliseconds>
606 (elapsed).count() > this->silenceInterval)
609 this->info.DelPublishersByProc(it->first);
614 this->activity.
erase(it++);
630 for (
auto const &uuid : uuids)
633 publisher.SetPUuid(uuid);
634 disconnectCb(publisher);
639 private:
void UpdateHeartbeat()
646 if (now < this->timeNextHeartbeat)
658 this->info.PublishersByProc(this->pUuid, nodes);
661 for (
const auto &topic : nodes)
663 for (
const auto &node : topic.second)
669 if (!this->initialized)
671 ++this->numHeartbeatsUninitialized;
672 if (this->numHeartbeatsUninitialized == 2)
676 this->initialized =
true;
679 this->initializedCv.notify_all();
697 private:
int NextTimeout()
const 700 auto timeUntilNextHeartbeat = this->timeNextHeartbeat - now;
701 auto timeUntilNextActivity = this->timeNextActivity - now;
703 int t =
static_cast<int>(
705 (
std::min(timeUntilNextHeartbeat, timeUntilNextActivity)).count());
706 int t2 =
std::min(t, this->kTimeout);
711 private:
void RecvMessages()
713 bool timeToExit =
false;
717 int timeout = this->NextTimeout();
721 this->RecvDiscoveryUpdate();
724 this->PrintCurrentState();
727 this->UpdateHeartbeat();
728 this->UpdateActivity();
740 private:
void RecvDiscoveryUpdate()
742 char rcvStr[Discovery::kMaxRcvStr];
745 sockaddr_in clntAddr;
746 socklen_t addrLen =
sizeof(clntAddr);
748 if ((recvfrom(this->sockets.at(0),
749 reinterpret_cast<raw_type *
>(rcvStr),
751 reinterpret_cast<sockaddr *>(&clntAddr),
752 reinterpret_cast<socklen_t *
>(&addrLen))) < 0)
754 std::cerr <<
"Discovery::RecvDiscoveryUpdate() recvfrom error" 758 srcAddr = inet_ntoa(clntAddr.sin_addr);
759 srcPort = ntohs(clntAddr.sin_port);
763 std::cout <<
"\nReceived discovery update from " << srcAddr <<
": " 767 this->DispatchDiscoveryMsg(srcAddr, rcvStr);
774 private:
void DispatchDiscoveryMsg(
const std::string &_fromIp,
779 memcpy(&len, _msg,
sizeof(len));
782 char *headerPtr = _msg +
sizeof(len);
787 if (this->kWireVersion != header.Version())
790 auto recvPUuid = header.PUuid();
793 if (recvPUuid == this->pUuid)
796 uint16_t flags = header.Flags();
812 header.SetFlags(flags);
813 header.Pack(headerPtr);
814 this->SendBytesMulticast(_msg, len);
818 this->AddRelayAddress(_fromIp);
829 header.SetFlags(flags);
830 header.Pack(headerPtr);
831 this->SendBytesUnicast(_msg, len);
840 connectCb = this->connectionCb;
841 disconnectCb = this->disconnectionCb;
844 char *pBody = headerPtr + header.HeaderLength();
846 switch (header.Type())
857 _fromIp != this->hostAddr))
866 added = this->info.AddPublisher(advMsg.
Publisher());
869 if (added && connectCb)
882 auto recvTopic = subMsg.
Topic();
888 if (!this->info.HasAnyPublishers(recvTopic, this->pUuid))
893 if (!this->info.Publishers(recvTopic, addresses))
897 for (
const auto &nodeInfo : addresses[this->pUuid])
902 _fromIp != this->hostAddr))
923 this->activity.erase(recvPUuid);
929 pub.SetPUuid(recvPUuid);
937 this->info.DelPublishersByProc(recvPUuid);
951 _fromIp != this->hostAddr))
965 this->info.DelPublisherByNode(advMsg.
Publisher().Topic(),
973 std::cerr <<
"Unknown message type [" << header.Type() <<
"]\n";
985 private:
template<
typename T>
989 const uint16_t _flags = 0)
const 992 Header header(this->Version(), _pub.PUuid(), _type, _flags);
993 uint16_t lengthField = 0u;
1008 advMsg.
Pack(reinterpret_cast<char*>(&buffer[
sizeof(lengthField)]));
1018 subMsg.
Pack(reinterpret_cast<char*>(&buffer[
sizeof(lengthField)]));
1025 buffer.
resize(header.HeaderLength() +
sizeof(lengthField));
1026 header.Pack(reinterpret_cast<char*>(&buffer[
sizeof(lengthField)]));
1030 std::cerr <<
"Discovery::SendMsg() error: Unrecognized message" 1031 <<
" type [" << _type <<
"]" <<
std::endl;
1035 lengthField =
static_cast<uint16_t
>(buffer.
size());
1036 memcpy(&buffer[0], &lengthField,
sizeof(lengthField));
1037 char *headerPtr = &buffer[0] +
sizeof(lengthField);
1042 this->SendBytesMulticast(&buffer[0], buffer.
size());
1050 uint16_t flags = header.Flags();
1052 header.SetFlags(flags);
1053 header.Pack(headerPtr);
1054 this->SendBytesUnicast(&buffer[0], buffer.
size());
1060 <<
" msg [" << topic <<
"]" <<
std::endl;
1067 private:
void SendBytesUnicast(
char *_buffer,
1068 uint16_t _len)
const 1071 for (
const auto &sockAddr : this->relayAddrs)
1073 auto sent = sendto(this->sockets.at(0),
1074 reinterpret_cast<const raw_type *
>(
1075 reinterpret_cast<const unsigned char*
>(_buffer)),
1077 reinterpret_cast<const sockaddr *
>(&sockAddr),
1091 private:
void SendBytesMulticast(
char *_buffer,
1092 uint16_t _len)
const 1096 for (
const auto &sock : this->Sockets())
1098 if (sendto(sock, reinterpret_cast<const raw_type *>(
1099 reinterpret_cast<const unsigned char*>(_buffer)),
1101 reinterpret_cast<const sockaddr *>(this->MulticastAddr()),
1102 sizeof(*(this->MulticastAddr()))) != _len)
1112 if (errno != EPERM && errno != ENOBUFS)
1114 std::cerr <<
"Exception sending a multicast message:" 1126 return this->sockets;
1131 private:
const sockaddr_in *MulticastAddr()
const 1133 return &this->mcastAddr;
1138 private: uint8_t Version()
const 1140 return this->kWireVersion;
1147 private:
bool RegisterNetIface(
const std::string &_ip)
1150 int sock =
static_cast<int>(socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP));
1160 struct in_addr ifAddr;
1161 ifAddr.s_addr = inet_addr(_ip.
c_str());
1162 if (setsockopt(sock, IPPROTO_IP, IP_MULTICAST_IF,
1163 reinterpret_cast<const char*>(&ifAddr),
sizeof(ifAddr)) != 0)
1165 std::cerr <<
"Error setting socket option (IP_MULTICAST_IF)." 1170 this->sockets.push_back(sock);
1175 struct ip_mreq group;
1176 group.imr_multiaddr.s_addr =
1177 inet_addr(this->kMulticastGroup.c_str());
1178 group.imr_interface.s_addr = inet_addr(_ip.
c_str());
1179 if (setsockopt(this->sockets.at(0), IPPROTO_IP, IP_ADD_MEMBERSHIP,
1180 reinterpret_cast<const char*
>(&group),
sizeof(group)) != 0)
1182 std::cerr <<
"Error setting socket option (IP_ADD_MEMBERSHIP)." 1192 private:
void AddRelayAddress(
const std::string &_ip)
1195 for (
auto const &addr : this->relayAddrs)
1197 if (addr.sin_addr.s_addr == inet_addr(_ip.
c_str()))
1202 memset(&addr, 0,
sizeof(addr));
1203 addr.sin_family = AF_INET;
1204 addr.sin_addr.s_addr = inet_addr(_ip.
c_str());
1205 addr.sin_port = htons(static_cast<u_short>(this->port));
1207 this->relayAddrs.push_back(addr);
1213 private:
static const unsigned int kDefActivityInterval = 100;
1218 private:
static const unsigned int kDefHeartbeatInterval = 1000;
1223 private:
static const unsigned int kDefSilenceInterval = 3000;
1226 private:
const std::string kMulticastGroup =
"224.0.0.7";
1229 private:
const int kTimeout = 250;
1232 private:
static const int kMaxRcvStr = 65536;
1236 private:
static const uint8_t kWireVersion = 9;
1253 private:
unsigned int silenceInterval;
1258 private:
unsigned int activityInterval;
1263 private:
unsigned int heartbeatInterval;
1281 private:
bool verbose;
1287 private: sockaddr_in mcastAddr;
1311 private:
bool initialized;
1314 private:
unsigned int numHeartbeatsUninitialized;
1323 private:
bool enabled;
static const uint16_t FlagRelay
Definition: Packet.hh:48
Topic/service available to any subscriber (default scope).
bool Unadvertise(const std::string &_topic, const std::string &_nUuid)
Unadvertise a new message. Broadcast a discovery message that will cancel all the discovery informati...
Definition: Discovery.hh:403
std::map< std::string, Timestamp > activity
Activity information. Every time there is a message from a remote node, its activity information is u...
Definition: Discovery.hh:1278
This class stores all the information about a publisher. It stores the topic name that publishes...
Definition: Publisher.hh:42
std::vector< std::string > determineInterfaces()
Determine the list of network interfaces for this machine. Reference: https://github.com/ros/ros_comm/blob/hydro-devel/clients/ roscpp/src/libros/network.cpp.
Send data via multicast only.
Topic/service only available to subscribers in the same machine as the publisher. ...
size_t Unpack(const char *_buffer)
Unserialize a stream of bytes into a Sub.
Subscription packet used in the discovery protocol for requesting information about a given topic...
Definition: Packet.hh:177
void Start()
Start the discovery service. You probably want to register the callbacks for receiving discovery noti...
Definition: Discovery.hh:274
void WaitForInit() const
Check if ready/initialized. If not, then wait on the initializedCv condition variable.
Definition: Discovery.hh:568
size_t Unpack(const char *_buffer)
Unserialize a stream of bytes into an AdvertiseMessage.
Definition: Packet.hh:334
unsigned int HeartbeatInterval() const
Each node broadcasts periodic heartbeats to keep its topic information alive in other nodes...
Definition: Discovery.hh:452
static const uint8_t HeartbeatType
Definition: Packet.hh:42
void raw_type
Definition: Discovery.hh:45
static const uint8_t SubType
Definition: Packet.hh:40
T duration_cast(T... args)
A discovery class that implements a distributed topic discovery protocol. It uses UDP multicast for s...
Definition: Discovery.hh:113
size_t Pack(char *_buffer) const
Serialize the subscription message.
static const uint8_t ByeType
Definition: Packet.hh:43
void TopicList(std::vector< std::string > &_topics) const
Get the list of topics currently advertised in the network.
Definition: Discovery.hh:559
size_t MsgLength() const
Get the total length of the message.
Definition: Packet.hh:307
void SetSilenceInterval(const unsigned int _ms)
Set the maximum silence interval.
Definition: Discovery.hh:489
std::string determineHost()
Determine IP or hostname. Reference: https://github.com/ros/ros_comm/blob/hydro-devel/clients/ roscpp...
static const uint8_t UnadvType
Definition: Packet.hh:41
virtual ~Discovery()
Destructor.
Definition: Discovery.hh:243
A class for customizing the publication options for a topic or service advertised. E.g.: Set the scope of a topic/service.
Definition: AdvertiseOptions.hh:59
size_t MsgLength() const
Get the total length of the message.
Discovery(const std::string &_pUuid, const int _port, const bool _verbose=false)
Constructor.
Definition: Discovery.hh:120
const TopicStorage< Pub > & Info() const
Get the discovery information.
Definition: Discovery.hh:379
bool Publishers(const std::string &_topic, Addresses_M< Pub > &_publishers) const
Get all the publishers' information known for a given topic.
Definition: Discovery.hh:389
std::string HostAddr() const
Get the IP address of this host.
Definition: Discovery.hh:431
T & Publisher()
Get the publisher of this message.
Definition: Packet.hh:284
unsigned int ActivityInterval() const
The discovery checks the validity of the topic information every 'activity interval' milliseconds...
Definition: Discovery.hh:441
static const uint8_t AdvType
Definition: Packet.hh:39
std::vector< std::string > split(const std::string &_orig, char _delim)
split at a one character delimiter to get a vector of something
unsigned int SilenceInterval() const
Get the maximum time allowed without receiving any discovery information from a node before canceling...
Definition: Discovery.hh:462
Advertise packet used in the discovery protocol to broadcast information about the node advertising a...
Definition: Packet.hh:258
static const std::vector< std::string > MsgTypesStr
Used for debugging the message type received/send.
Definition: Packet.hh:54
void PrintCurrentState() const
Print the current discovery state.
Definition: Discovery.hh:516
void DisconnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery disconnection events. Each time a topic is no longer active...
Definition: Discovery.hh:509
void ConnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery connection events. Each time a new topic is connected...
Definition: Discovery.hh:499
bool Advertise(const Pub &_publisher)
Advertise a new message.
Definition: Discovery.hh:298
Topic/service only available to subscribers in the same process as the publisher. ...
bool pollSockets(const std::vector< int > &_sockets, const int _timeout)
std::chrono::steady_clock::time_point Timestamp
Definition: TransportTypes.hh:155
DestinationType
Options for sending discovery messages.
Definition: Discovery.hh:84
void SetActivityInterval(const unsigned int _ms)
Set the activity interval.
Definition: Discovery.hh:471
Send data via unicast only.
Definition: AdvertiseOptions.hh:28
std::string Topic() const
Get the topic.
size_t Pack(char *_buffer) const
Serialize the advertise message.
Definition: Packet.hh:315
void SetHeartbeatInterval(const unsigned int _ms)
Set the heartbeat interval.
Definition: Discovery.hh:480
static const uint16_t FlagNoRelay
Definition: Packet.hh:51
bool env(const std::string &_name, std::string &_value)
Find the environment variable '_name' and return its value.
bool Discover(const std::string &_topic) const
Request discovery information about a topic. When using this method, the user might want to use SetCo...
Definition: Discovery.hh:329