18 #ifndef IGNITION_TRANSPORT_DISCOVERY_HH_ 19 #define IGNITION_TRANSPORT_DISCOVERY_HH_ 33 #include <sys/types.h> 35 #include <sys/socket.h> 39 #include <arpa/inet.h> 43 #include <netinet/in.h> 49 #pragma warning(push, 0) 54 #pragma warning(disable: 4503) 56 #pragma warning(disable: 4996) 59 #include <ignition/msgs/discovery.pb.h> 62 #include <condition_variable> 71 #include <ignition/msgs/Utility.hh> 73 #include "ignition/transport/config.hh" 74 #include "ignition/transport/Export.hh" 86 inline namespace IGNITION_TRANSPORT_VERSION_NAMESPACE {
116 template<
typename Pub>
126 const bool _verbose =
false)
130 silenceInterval(kDefSilenceInterval),
131 activityInterval(kDefActivityInterval),
132 heartbeatInterval(kDefHeartbeatInterval),
133 connectionCb(nullptr),
134 disconnectionCb(nullptr),
137 numHeartbeatsUninitialized(0),
142 if (
env(
"IGN_IP", ignIp) && !ignIp.
empty())
143 this->hostInterfaces = {ignIp};
151 WORD wVersionRequested;
155 wVersionRequested = MAKEWORD(2, 2);
157 if (WSAStartup(wVersionRequested, &wsaData) != 0)
163 for (
const auto &netIface : this->hostInterfaces)
165 auto succeed = this->RegisterNetIface(netIface);
170 if (netIface == this->hostAddr && !succeed)
172 this->RegisterNetIface(
"127.0.0.1");
173 std::cerr <<
"Did you set the environment variable IGN_IP with a " 175 <<
" [" << netIface <<
"] seems an invalid local IP " 177 <<
" Using 127.0.0.1 as hostname." <<
std::endl;
178 this->hostAddr =
"127.0.0.1";
187 if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEADDR,
188 reinterpret_cast<const char *
>(&reuseAddr),
sizeof(reuseAddr)) != 0)
190 std::cerr <<
"Error setting socket option (SO_REUSEADDR)." 202 if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEPORT,
203 reinterpret_cast<const char *
>(&reusePort),
sizeof(reusePort)) != 0)
205 std::cerr <<
"Error setting socket option (SO_REUSEPORT)." 211 sockaddr_in localAddr;
212 memset(&localAddr, 0,
sizeof(localAddr));
213 localAddr.sin_family = AF_INET;
214 localAddr.sin_addr.s_addr = htonl(INADDR_ANY);
215 localAddr.sin_port = htons(static_cast<u_short>(this->port));
217 if (bind(this->sockets.at(0),
218 reinterpret_cast<sockaddr *
>(&localAddr),
sizeof(sockaddr_in)) < 0)
225 memset(&this->mcastAddr, 0,
sizeof(this->mcastAddr));
226 this->mcastAddr.sin_family = AF_INET;
227 this->mcastAddr.sin_addr.s_addr =
228 inet_addr(this->kMulticastGroup.c_str());
229 this->mcastAddr.sin_port = htons(static_cast<u_short>(this->port));
233 if (
env(
"IGN_RELAY", ignRelay) && !ignRelay.
empty())
239 for (
auto const &relayAddr : relays)
240 this->AddRelayAddress(relayAddr);
243 this->PrintCurrentState();
250 this->exitMutex.lock();
252 this->exitMutex.unlock();
255 if (this->threadReception.joinable())
256 this->threadReception.join();
264 for (
const auto &sock : this->sockets)
287 this->enabled =
true;
291 this->timeNextHeartbeat = now;
292 this->timeNextActivity = now;
295 this->threadReception =
std::thread(&Discovery::RecvMessages,
this);
311 if (!this->info.AddPublisher(_publisher))
346 cb = this->connectionCb;
350 pub.SetTopic(_topic);
351 pub.SetPUuid(this->pUuid);
358 found = this->info.Publishers(_topic, addresses);
364 for (
const auto &proc : addresses)
366 for (
const auto &node : proc.second)
414 return this->info.Publishers(_topic, _publishers);
435 if (!this->info.Publisher(_topic, this->pUuid, _nUuid, inf))
439 this->info.DelPublisherByNode(_topic, this->pUuid, _nUuid);
447 msgs::Discovery::UNADVERTISE, inf);
458 return this->hostAddr;
468 return this->activityInterval;
479 return this->heartbeatInterval;
489 return this->silenceInterval;
498 this->activityInterval = _ms;
507 this->heartbeatInterval = _ms;
516 this->silenceInterval = _ms;
526 this->connectionCb = _cb;
536 this->disconnectionCb = _cb;
545 this->registrationCb = _cb;
554 this->unregistrationCb = _cb;
568 std::cout <<
"\tActivity: " << this->activityInterval
570 std::cout <<
"\tHeartbeat: " << this->heartbeatInterval
572 std::cout <<
"\tSilence: " << this->silenceInterval
581 if (this->activity.empty())
585 for (
auto &proc : this->activity)
605 this->info.TopicList(_topics);
614 if (!this->initialized)
616 this->initializedCv.wait(lk, [
this]{
return this->initialized;});
623 private:
void UpdateActivity()
636 if (now < this->timeNextActivity)
639 disconnectCb = this->disconnectionCb;
641 for (
auto it = this->activity.cbegin(); it != this->activity.cend();)
644 auto elapsed = now - it->second;
647 if (std::chrono::duration_cast<std::chrono::milliseconds>
648 (elapsed).count() > this->silenceInterval)
651 this->info.DelPublishersByProc(it->first);
656 this->activity.
erase(it++);
672 for (
auto const &uuid : uuids)
675 publisher.SetPUuid(uuid);
676 disconnectCb(publisher);
681 private:
void UpdateHeartbeat()
688 if (now < this->timeNextHeartbeat)
700 this->info.PublishersByProc(this->pUuid, nodes);
703 for (
const auto &topic : nodes)
705 for (
const auto &node : topic.second)
708 msgs::Discovery::ADVERTISE, node);
714 if (!this->initialized)
716 ++this->numHeartbeatsUninitialized;
717 if (this->numHeartbeatsUninitialized == 2)
721 this->initialized =
true;
724 this->initializedCv.notify_all();
742 private:
int NextTimeout()
const 745 auto timeUntilNextHeartbeat = this->timeNextHeartbeat - now;
746 auto timeUntilNextActivity = this->timeNextActivity - now;
748 int t =
static_cast<int>(
750 (
std::min(timeUntilNextHeartbeat, timeUntilNextActivity)).count());
751 int t2 =
std::min(t, this->kTimeout);
756 private:
void RecvMessages()
758 bool timeToExit =
false;
762 int timeout = this->NextTimeout();
766 this->RecvDiscoveryUpdate();
769 this->PrintCurrentState();
772 this->UpdateHeartbeat();
773 this->UpdateActivity();
785 private:
void RecvDiscoveryUpdate()
787 char rcvStr[Discovery::kMaxRcvStr];
788 sockaddr_in clntAddr;
789 socklen_t addrLen =
sizeof(clntAddr);
791 uint16_t received = recvfrom(this->sockets.at(0),
792 reinterpret_cast<raw_type *
>(rcvStr),
794 reinterpret_cast<sockaddr *>(&clntAddr),
795 reinterpret_cast<socklen_t *
>(&addrLen));
799 memcpy(&len, &rcvStr[0],
sizeof(len));
823 if (len +
sizeof(len) == received)
825 std::string srcAddr = inet_ntoa(clntAddr.sin_addr);
826 uint16_t srcPort = ntohs(clntAddr.sin_port);
830 std::cout <<
"\nReceived discovery update from " 831 << srcAddr <<
": " << srcPort <<
std::endl;
834 this->DispatchDiscoveryMsg(srcAddr, rcvStr +
sizeof(len), len);
837 else if (received < 0)
839 std::cerr <<
"Discovery::RecvDiscoveryUpdate() recvfrom error" 848 private:
void DispatchDiscoveryMsg(
const std::string &_fromIp,
849 char *_msg, uint16_t _len)
851 ignition::msgs::Discovery msg;
856 if (!msg.ParseFromArray(_msg, _len))
860 if (this->Version() != msg.version())
866 if (recvPUuid == this->pUuid)
878 if (msg.has_flags() && msg.flags().relay())
881 msg.mutable_flags()->set_relay(
false);
882 msg.mutable_flags()->set_no_relay(
true);
883 this->SendMulticast(msg);
887 this->AddRelayAddress(_fromIp);
895 else if (!msg.has_flags() || !msg.flags().no_relay())
897 msg.mutable_flags()->set_relay(
true);
898 this->SendUnicast(msg);
909 connectCb = this->connectionCb;
910 disconnectCb = this->disconnectionCb;
911 registerCb = this->registrationCb;
912 unregisterCb = this->unregistrationCb;
917 case msgs::Discovery::ADVERTISE:
921 publisher.SetFromDiscovery(msg);
926 _fromIp != this->hostAddr))
935 added = this->info.AddPublisher(publisher);
938 if (added && connectCb)
941 connectCb(publisher);
946 case msgs::Discovery::SUBSCRIBE:
952 recvTopic = msg.sub().topic();
956 std::cerr <<
"Subscription discovery message is missing " 957 <<
"Subscriber information.\n";
965 if (!this->info.HasAnyPublishers(recvTopic, this->pUuid))
970 if (!this->info.Publishers(recvTopic, addresses))
974 for (
const auto &nodeInfo : addresses[this->pUuid])
979 _fromIp != this->hostAddr))
986 msgs::Discovery::ADVERTISE, nodeInfo);
991 case msgs::Discovery::NEW_CONNECTION:
995 publisher.SetFromDiscovery(msg);
998 registerCb(publisher);
1002 case msgs::Discovery::END_CONNECTION:
1006 publisher.SetFromDiscovery(msg);
1009 unregisterCb(publisher);
1013 case msgs::Discovery::HEARTBEAT:
1018 case msgs::Discovery::BYE:
1023 this->activity.erase(recvPUuid);
1029 pub.SetPUuid(recvPUuid);
1037 this->info.DelPublishersByProc(recvPUuid);
1042 case msgs::Discovery::UNADVERTISE:
1046 publisher.SetFromDiscovery(msg);
1051 _fromIp != this->hostAddr))
1059 disconnectCb(publisher);
1065 this->info.DelPublisherByNode(publisher.Topic(),
1066 publisher.PUuid(), publisher.NUuid());
1073 std::cerr <<
"Unknown message type [" << msg.type() <<
"].\n";
1085 private:
template<
typename T>
1087 const msgs::Discovery::Type _type,
1088 const T &_pub)
const 1090 ignition::msgs::Discovery discoveryMsg;
1091 discoveryMsg.set_version(this->Version());
1092 discoveryMsg.set_type(_type);
1093 discoveryMsg.set_process_uuid(this->pUuid);
1097 case msgs::Discovery::ADVERTISE:
1098 case msgs::Discovery::UNADVERTISE:
1099 case msgs::Discovery::NEW_CONNECTION:
1100 case msgs::Discovery::END_CONNECTION:
1102 _pub.FillDiscovery(discoveryMsg);
1105 case msgs::Discovery::SUBSCRIBE:
1107 discoveryMsg.mutable_sub()->set_topic(_pub.Topic());
1110 case msgs::Discovery::HEARTBEAT:
1111 case msgs::Discovery::BYE:
1114 std::cerr <<
"Discovery::SendMsg() error: Unrecognized message" 1115 <<
" type [" << _type <<
"]" <<
std::endl;
1122 this->SendMulticast(discoveryMsg);
1130 discoveryMsg.mutable_flags()->set_relay(
true);
1131 this->SendUnicast(discoveryMsg);
1136 std::cout <<
"\t* Sending " << msgs::ToString(_type)
1137 <<
" msg [" << _pub.Topic() <<
"]" <<
std::endl;
1143 private:
void SendUnicast(
const msgs::Discovery &_msg)
const 1147 #if GOOGLE_PROTOBUF_VERSION >= 3004000 1148 size_t msgSizeFull = _msg.ByteSizeLong();
1150 int msgSizeFull = _msg.ByteSize();
1152 if (msgSizeFull +
sizeof(msgSize) > this->kMaxRcvStr)
1154 std::cerr <<
"Discovery message too large to send. Discovery won't " 1155 <<
"work. This shouldn't happen.\n";
1158 msgSize = msgSizeFull;
1160 uint16_t totalSize =
sizeof(msgSize) + msgSize;
1161 char *buffer =
static_cast<char *
>(
new char[totalSize]);
1162 memcpy(&buffer[0], &msgSize,
sizeof(msgSize));
1164 if (_msg.SerializeToArray(buffer +
sizeof(msgSize), msgSize))
1167 for (
const auto &sockAddr : this->relayAddrs)
1169 auto sent = sendto(this->sockets.at(0),
1170 reinterpret_cast<const raw_type *
>(
1171 reinterpret_cast<const unsigned char*
>(buffer)),
1173 reinterpret_cast<const sockaddr *
>(&sockAddr),
1176 if (sent != totalSize)
1185 std::cerr <<
"Discovery::SendUnicast: Error serializing data." 1194 private:
void SendMulticast(
const msgs::Discovery &_msg)
const 1198 #if GOOGLE_PROTOBUF_VERSION >= 3004000 1199 size_t msgSizeFull = _msg.ByteSizeLong();
1201 int msgSizeFull = _msg.ByteSize();
1203 if (msgSizeFull +
sizeof(msgSize) > this->kMaxRcvStr)
1205 std::cerr <<
"Discovery message too large to send. Discovery won't " 1206 <<
"work. This shouldn't happen.\n";
1210 msgSize = msgSizeFull;
1211 uint16_t totalSize =
sizeof(msgSize) + msgSize;
1212 char *buffer =
static_cast<char *
>(
new char[totalSize]);
1213 memcpy(&buffer[0], &msgSize,
sizeof(msgSize));
1215 if (_msg.SerializeToArray(buffer +
sizeof(msgSize), msgSize))
1219 for (
const auto &sock : this->Sockets())
1222 if (sendto(sock, reinterpret_cast<const raw_type *>(
1223 reinterpret_cast<const unsigned char*>(buffer)),
1225 reinterpret_cast<const sockaddr *>(this->MulticastAddr()),
1226 sizeof(*(this->MulticastAddr()))) != totalSize)
1236 if (errno != EPERM && errno != ENOBUFS)
1238 std::cerr <<
"Exception sending a multicast message:" 1247 std::cerr <<
"Discovery::SendMulticast: Error serializing data." 1258 return this->sockets;
1263 private:
const sockaddr_in *MulticastAddr()
const 1265 return &this->mcastAddr;
1270 private: uint8_t Version()
const 1273 static int topicStats =
1274 (
env(
"IGN_TRANSPORT_TOPIC_STATISTICS", ignStats) && ignStats ==
"1");
1275 return this->kWireVersion + (topicStats * 100);
1282 private:
bool RegisterNetIface(
const std::string &_ip)
1285 int sock =
static_cast<int>(socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP));
1295 struct in_addr ifAddr;
1296 ifAddr.s_addr = inet_addr(_ip.
c_str());
1297 if (setsockopt(sock, IPPROTO_IP, IP_MULTICAST_IF,
1298 reinterpret_cast<const char*>(&ifAddr),
sizeof(ifAddr)) != 0)
1300 std::cerr <<
"Error setting socket option (IP_MULTICAST_IF)." 1305 this->sockets.push_back(sock);
1310 struct ip_mreq group;
1311 group.imr_multiaddr.s_addr =
1312 inet_addr(this->kMulticastGroup.c_str());
1313 group.imr_interface.s_addr = inet_addr(_ip.
c_str());
1314 if (setsockopt(this->sockets.at(0), IPPROTO_IP, IP_ADD_MEMBERSHIP,
1315 reinterpret_cast<const char*
>(&group),
sizeof(group)) != 0)
1317 std::cerr <<
"Error setting socket option (IP_ADD_MEMBERSHIP)." 1327 private:
void AddRelayAddress(
const std::string &_ip)
1330 for (
auto const &addr : this->relayAddrs)
1332 if (addr.sin_addr.s_addr == inet_addr(_ip.
c_str()))
1337 memset(&addr, 0,
sizeof(addr));
1338 addr.sin_family = AF_INET;
1339 addr.sin_addr.s_addr = inet_addr(_ip.
c_str());
1340 addr.sin_port = htons(static_cast<u_short>(this->port));
1342 this->relayAddrs.push_back(addr);
1348 private:
static const unsigned int kDefActivityInterval = 100;
1353 private:
static const unsigned int kDefHeartbeatInterval = 1000;
1358 private:
static const unsigned int kDefSilenceInterval = 3000;
1361 private:
const std::string kMulticastGroup =
"224.0.0.7";
1364 private:
const int kTimeout = 250;
1367 private:
static const uint16_t kMaxRcvStr =
1372 private:
static const uint8_t kWireVersion = 10;
1389 private:
unsigned int silenceInterval;
1394 private:
unsigned int activityInterval;
1399 private:
unsigned int heartbeatInterval;
1425 private:
bool verbose;
1431 private: sockaddr_in mcastAddr;
1455 private:
bool initialized;
1458 private:
unsigned int numHeartbeatsUninitialized;
1467 private:
bool enabled;
Topic/service available to any subscriber (default scope).
bool Unadvertise(const std::string &_topic, const std::string &_nUuid)
Unadvertise a new message. Broadcast a discovery message that will cancel all the discovery informati...
Definition: Discovery.hh:424
std::map< std::string, Timestamp > activity
Activity information. Every time there is a message from a remote node, its activity information is u...
Definition: Discovery.hh:1422
void Unregister(const MessagePublisher &_pub) const
Unregister a node from this process as a remote subscriber.
Definition: Discovery.hh:392
This class stores all the information about a publisher. It stores the topic name that publishes...
Definition: Publisher.hh:44
std::vector< std::string > determineInterfaces()
Determine the list of network interfaces for this machine. Reference: https://github.com/ros/ros_comm/blob/hydro-devel/clients/ roscpp/src/libros/network.cpp.
Send data via multicast only.
Topic/service only available to subscribers in the same machine as the publisher. ...
void Start()
Start the discovery service. You probably want to register the callbacks for receiving discovery noti...
Definition: Discovery.hh:278
void WaitForInit() const
Check if ready/initialized. If not, then wait on the initializedCv condition variable.
Definition: Discovery.hh:610
unsigned int HeartbeatInterval() const
Each node broadcasts periodic heartbeats to keep its topic information alive in other nodes...
Definition: Discovery.hh:476
void raw_type
Definition: Discovery.hh:45
T duration_cast(T... args)
A discovery class that implements a distributed topic discovery protocol. It uses UDP multicast for s...
Definition: Discovery.hh:117
void TopicList(std::vector< std::string > &_topics) const
Get the list of topics currently advertised in the network.
Definition: Discovery.hh:601
void SetSilenceInterval(const unsigned int _ms)
Set the maximum silence interval.
Definition: Discovery.hh:513
void UnregistrationsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive an event when a remote node unsubscribes to a topic within this proces...
Definition: Discovery.hh:551
void Register(const MessagePublisher &_pub) const
Register a node from this process as a remote subscriber.
Definition: Discovery.hh:384
std::string determineHost()
Determine IP or hostname. Reference: https://github.com/ros/ros_comm/blob/hydro-devel/clients/ roscpp...
virtual ~Discovery()
Destructor.
Definition: Discovery.hh:247
A class for customizing the publication options for a topic or service advertised. E.g.: Set the scope of a topic/service.
Definition: AdvertiseOptions.hh:59
Discovery(const std::string &_pUuid, const int _port, const bool _verbose=false)
Constructor.
Definition: Discovery.hh:124
const TopicStorage< Pub > & Info() const
Get the discovery information.
Definition: Discovery.hh:400
bool Publishers(const std::string &_topic, Addresses_M< Pub > &_publishers) const
Get all the publishers' information known for a given topic.
Definition: Discovery.hh:410
std::string HostAddr() const
Get the IP address of this host.
Definition: Discovery.hh:455
unsigned int ActivityInterval() const
The discovery checks the validity of the topic information every 'activity interval' milliseconds...
Definition: Discovery.hh:465
std::vector< std::string > split(const std::string &_orig, char _delim)
split at a one character delimiter to get a vector of something
unsigned int SilenceInterval() const
Get the maximum time allowed without receiving any discovery information from a node before canceling...
Definition: Discovery.hh:486
void PrintCurrentState() const
Print the current discovery state.
Definition: Discovery.hh:558
This class stores all the information about a message publisher.
Definition: Publisher.hh:222
void DisconnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery disconnection events. Each time a topic is no longer active...
Definition: Discovery.hh:533
void ConnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery connection events. Each time a new topic is connected...
Definition: Discovery.hh:523
bool Advertise(const Pub &_publisher)
Advertise a new message.
Definition: Discovery.hh:302
Topic/service only available to subscribers in the same process as the publisher. ...
bool pollSockets(const std::vector< int > &_sockets, const int _timeout)
std::chrono::steady_clock::time_point Timestamp
Definition: TransportTypes.hh:155
DestinationType
Options for sending discovery messages.
Definition: Discovery.hh:88
void SetActivityInterval(const unsigned int _ms)
Set the activity interval.
Definition: Discovery.hh:495
void RegistrationsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive an event when a new remote node subscribes to a topic within this proc...
Definition: Discovery.hh:542
Send data via unicast only.
Definition: AdvertiseOptions.hh:28
void SetHeartbeatInterval(const unsigned int _ms)
Set the heartbeat interval.
Definition: Discovery.hh:504
bool env(const std::string &_name, std::string &_value)
Find the environment variable '_name' and return its value.
bool Discover(const std::string &_topic) const
Request discovery information about a topic. When using this method, the user might want to use SetCo...
Definition: Discovery.hh:334