18 #ifndef IGNITION_TRANSPORT_DISCOVERY_HH_ 19 #define IGNITION_TRANSPORT_DISCOVERY_HH_ 33 #include <sys/types.h> 35 #include <sys/socket.h> 39 #include <arpa/inet.h> 43 #include <netinet/in.h> 49 #pragma warning(push, 0) 54 #pragma warning(disable: 4503) 56 #pragma warning(disable: 4996) 59 #include <ignition/msgs/discovery.pb.h> 62 #include <condition_variable> 71 #include <ignition/msgs/Utility.hh> 73 #include "ignition/transport/config.hh" 74 #include "ignition/transport/Export.hh" 86 inline namespace IGNITION_TRANSPORT_VERSION_NAMESPACE {
116 template<
typename Pub>
126 const bool _verbose =
false)
130 silenceInterval(kDefSilenceInterval),
131 activityInterval(kDefActivityInterval),
132 heartbeatInterval(kDefHeartbeatInterval),
133 connectionCb(nullptr),
134 disconnectionCb(nullptr),
137 numHeartbeatsUninitialized(0),
142 if (
env(
"IGN_IP", ignIp) && !ignIp.
empty())
143 this->hostInterfaces = {ignIp};
151 WORD wVersionRequested;
155 wVersionRequested = MAKEWORD(2, 2);
157 if (WSAStartup(wVersionRequested, &wsaData) != 0)
163 for (
const auto &netIface : this->hostInterfaces)
165 auto succeed = this->RegisterNetIface(netIface);
170 if (netIface == this->hostAddr && !succeed)
172 this->RegisterNetIface(
"127.0.0.1");
173 std::cerr <<
"Did you set the environment variable IGN_IP with a " 175 <<
" [" << netIface <<
"] seems an invalid local IP " 177 <<
" Using 127.0.0.1 as hostname." <<
std::endl;
178 this->hostAddr =
"127.0.0.1";
187 if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEADDR,
188 reinterpret_cast<const char *
>(&reuseAddr),
sizeof(reuseAddr)) != 0)
190 std::cerr <<
"Error setting socket option (SO_REUSEADDR)." 202 if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEPORT,
203 reinterpret_cast<const char *
>(&reusePort),
sizeof(reusePort)) != 0)
205 std::cerr <<
"Error setting socket option (SO_REUSEPORT)." 211 sockaddr_in localAddr;
212 memset(&localAddr, 0,
sizeof(localAddr));
213 localAddr.sin_family = AF_INET;
214 localAddr.sin_addr.s_addr = htonl(INADDR_ANY);
215 localAddr.sin_port = htons(static_cast<u_short>(this->port));
217 if (bind(this->sockets.at(0),
218 reinterpret_cast<sockaddr *
>(&localAddr),
sizeof(sockaddr_in)) < 0)
225 memset(&this->mcastAddr, 0,
sizeof(this->mcastAddr));
226 this->mcastAddr.sin_family = AF_INET;
227 this->mcastAddr.sin_addr.s_addr =
228 inet_addr(this->kMulticastGroup.c_str());
229 this->mcastAddr.sin_port = htons(static_cast<u_short>(this->port));
233 if (
env(
"IGN_RELAY", ignRelay) && !ignRelay.
empty())
239 for (
auto const &relayAddr : relays)
240 this->AddRelayAddress(relayAddr);
243 this->PrintCurrentState();
250 this->exitMutex.lock();
252 this->exitMutex.unlock();
255 if (this->threadReception.joinable())
256 this->threadReception.join();
264 for (
const auto &sock : this->sockets)
287 this->enabled =
true;
291 this->timeNextHeartbeat = now;
292 this->timeNextActivity = now;
295 this->threadReception =
std::thread(&Discovery::RecvMessages,
this);
311 if (!this->info.AddPublisher(_publisher))
346 cb = this->connectionCb;
350 pub.SetTopic(_topic);
351 pub.SetPUuid(this->pUuid);
358 found = this->info.Publishers(_topic, addresses);
364 for (
const auto &proc : addresses)
366 for (
const auto &node : proc.second)
414 return this->info.Publishers(_topic, _publishers);
435 if (!this->info.Publisher(_topic, this->pUuid, _nUuid, inf))
439 this->info.DelPublisherByNode(_topic, this->pUuid, _nUuid);
447 msgs::Discovery::UNADVERTISE, inf);
458 return this->hostAddr;
468 return this->activityInterval;
479 return this->heartbeatInterval;
489 return this->silenceInterval;
498 this->activityInterval = _ms;
507 this->heartbeatInterval = _ms;
516 this->silenceInterval = _ms;
526 this->connectionCb = _cb;
536 this->disconnectionCb = _cb;
545 this->registrationCb = _cb;
554 this->unregistrationCb = _cb;
568 std::cout <<
"\tActivity: " << this->activityInterval
570 std::cout <<
"\tHeartbeat: " << this->heartbeatInterval
572 std::cout <<
"\tSilence: " << this->silenceInterval
581 if (this->activity.empty())
585 for (
auto &proc : this->activity)
605 this->info.TopicList(_topics);
614 if (!this->initialized)
616 this->initializedCv.wait(lk, [
this]{
return this->initialized;});
623 private:
void UpdateActivity()
636 if (now < this->timeNextActivity)
639 disconnectCb = this->disconnectionCb;
641 for (
auto it = this->activity.cbegin(); it != this->activity.cend();)
644 auto elapsed = now - it->second;
647 if (std::chrono::duration_cast<std::chrono::milliseconds>
648 (elapsed).count() > this->silenceInterval)
651 this->info.DelPublishersByProc(it->first);
656 this->activity.
erase(it++);
672 for (
auto const &uuid : uuids)
675 publisher.SetPUuid(uuid);
676 disconnectCb(publisher);
681 private:
void UpdateHeartbeat()
688 if (now < this->timeNextHeartbeat)
700 this->info.PublishersByProc(this->pUuid, nodes);
703 for (
const auto &topic : nodes)
705 for (
const auto &node : topic.second)
708 msgs::Discovery::ADVERTISE, node);
714 if (!this->initialized)
716 ++this->numHeartbeatsUninitialized;
717 if (this->numHeartbeatsUninitialized == 2)
721 this->initialized =
true;
724 this->initializedCv.notify_all();
742 private:
int NextTimeout()
const 745 auto timeUntilNextHeartbeat = this->timeNextHeartbeat - now;
746 auto timeUntilNextActivity = this->timeNextActivity - now;
748 int t =
static_cast<int>(
750 (
std::min(timeUntilNextHeartbeat, timeUntilNextActivity)).count());
751 int t2 =
std::min(t, this->kTimeout);
756 private:
void RecvMessages()
758 bool timeToExit =
false;
762 int timeout = this->NextTimeout();
766 this->RecvDiscoveryUpdate();
769 this->PrintCurrentState();
772 this->UpdateHeartbeat();
773 this->UpdateActivity();
785 private:
void RecvDiscoveryUpdate()
787 char rcvStr[Discovery::kMaxRcvStr];
788 sockaddr_in clntAddr;
789 socklen_t addrLen =
sizeof(clntAddr);
791 uint16_t received = recvfrom(this->sockets.at(0),
792 reinterpret_cast<raw_type *
>(rcvStr),
794 reinterpret_cast<sockaddr *>(&clntAddr),
795 reinterpret_cast<socklen_t *
>(&addrLen));
799 memcpy(&len, &rcvStr[0],
sizeof(len));
823 if (len +
sizeof(len) == received)
825 std::string srcAddr = inet_ntoa(clntAddr.sin_addr);
826 uint16_t srcPort = ntohs(clntAddr.sin_port);
830 std::cout <<
"\nReceived discovery update from " 831 << srcAddr <<
": " << srcPort <<
std::endl;
834 this->DispatchDiscoveryMsg(srcAddr, rcvStr +
sizeof(len), len);
837 else if (received < 0)
839 std::cerr <<
"Discovery::RecvDiscoveryUpdate() recvfrom error" 848 private:
void DispatchDiscoveryMsg(
const std::string &_fromIp,
849 char *_msg, uint16_t _len)
851 ignition::msgs::Discovery msg;
856 if (!msg.ParseFromArray(_msg, _len))
860 if (this->Version() != msg.version())
866 if (recvPUuid == this->pUuid)
878 if (msg.has_flags() && msg.flags().relay())
881 msg.mutable_flags()->set_relay(
false);
882 msg.mutable_flags()->set_no_relay(
true);
883 this->SendMulticast(msg);
887 this->AddRelayAddress(_fromIp);
895 else if (!msg.has_flags() || !msg.flags().no_relay())
897 msg.mutable_flags()->set_relay(
true);
898 this->SendUnicast(msg);
901 bool isSenderLocal = (
std::find(this->hostInterfaces.begin(),
902 this->hostInterfaces.end(), _fromIp) != this->hostInterfaces.end()) ||
903 (_fromIp.
find(
"127.") == 0);
913 connectCb = this->connectionCb;
914 disconnectCb = this->disconnectionCb;
915 registerCb = this->registrationCb;
916 unregisterCb = this->unregistrationCb;
921 case msgs::Discovery::ADVERTISE:
925 publisher.SetFromDiscovery(msg);
939 added = this->info.AddPublisher(publisher);
942 if (added && connectCb)
945 connectCb(publisher);
950 case msgs::Discovery::SUBSCRIBE:
956 recvTopic = msg.sub().topic();
960 std::cerr <<
"Subscription discovery message is missing " 961 <<
"Subscriber information.\n";
969 if (!this->info.HasAnyPublishers(recvTopic, this->pUuid))
974 if (!this->info.Publishers(recvTopic, addresses))
978 for (
const auto &nodeInfo : addresses[this->pUuid])
990 msgs::Discovery::ADVERTISE, nodeInfo);
995 case msgs::Discovery::NEW_CONNECTION:
999 publisher.SetFromDiscovery(msg);
1002 registerCb(publisher);
1006 case msgs::Discovery::END_CONNECTION:
1010 publisher.SetFromDiscovery(msg);
1013 unregisterCb(publisher);
1017 case msgs::Discovery::HEARTBEAT:
1022 case msgs::Discovery::BYE:
1027 this->activity.erase(recvPUuid);
1033 pub.SetPUuid(recvPUuid);
1041 this->info.DelPublishersByProc(recvPUuid);
1046 case msgs::Discovery::UNADVERTISE:
1050 publisher.SetFromDiscovery(msg);
1063 disconnectCb(publisher);
1069 this->info.DelPublisherByNode(publisher.Topic(),
1070 publisher.PUuid(), publisher.NUuid());
1077 std::cerr <<
"Unknown message type [" << msg.type() <<
"].\n";
1089 private:
template<
typename T>
1091 const msgs::Discovery::Type _type,
1092 const T &_pub)
const 1094 ignition::msgs::Discovery discoveryMsg;
1095 discoveryMsg.set_version(this->Version());
1096 discoveryMsg.set_type(_type);
1097 discoveryMsg.set_process_uuid(this->pUuid);
1101 case msgs::Discovery::ADVERTISE:
1102 case msgs::Discovery::UNADVERTISE:
1103 case msgs::Discovery::NEW_CONNECTION:
1104 case msgs::Discovery::END_CONNECTION:
1106 _pub.FillDiscovery(discoveryMsg);
1109 case msgs::Discovery::SUBSCRIBE:
1111 discoveryMsg.mutable_sub()->set_topic(_pub.Topic());
1114 case msgs::Discovery::HEARTBEAT:
1115 case msgs::Discovery::BYE:
1118 std::cerr <<
"Discovery::SendMsg() error: Unrecognized message" 1119 <<
" type [" << _type <<
"]" <<
std::endl;
1126 this->SendMulticast(discoveryMsg);
1134 discoveryMsg.mutable_flags()->set_relay(
true);
1135 this->SendUnicast(discoveryMsg);
1140 std::cout <<
"\t* Sending " << msgs::ToString(_type)
1141 <<
" msg [" << _pub.Topic() <<
"]" <<
std::endl;
1147 private:
void SendUnicast(
const msgs::Discovery &_msg)
const 1151 #if GOOGLE_PROTOBUF_VERSION >= 3004000 1152 size_t msgSizeFull = _msg.ByteSizeLong();
1154 int msgSizeFull = _msg.ByteSize();
1156 if (msgSizeFull +
sizeof(msgSize) > this->kMaxRcvStr)
1158 std::cerr <<
"Discovery message too large to send. Discovery won't " 1159 <<
"work. This shouldn't happen.\n";
1162 msgSize = msgSizeFull;
1164 uint16_t totalSize =
sizeof(msgSize) + msgSize;
1165 char *buffer =
static_cast<char *
>(
new char[totalSize]);
1166 memcpy(&buffer[0], &msgSize,
sizeof(msgSize));
1168 if (_msg.SerializeToArray(buffer +
sizeof(msgSize), msgSize))
1171 for (
const auto &sockAddr : this->relayAddrs)
1174 auto sent = sendto(this->sockets.at(0),
1175 reinterpret_cast<const raw_type *
>(
1176 reinterpret_cast<const unsigned char*
>(buffer)),
1178 reinterpret_cast<const sockaddr *
>(&sockAddr),
1181 if (sent != totalSize)
1192 std::cerr <<
"Discovery::SendUnicast: Error serializing data." 1201 private:
void SendMulticast(
const msgs::Discovery &_msg)
const 1205 #if GOOGLE_PROTOBUF_VERSION >= 3004000 1206 size_t msgSizeFull = _msg.ByteSizeLong();
1208 int msgSizeFull = _msg.ByteSize();
1210 if (msgSizeFull +
sizeof(msgSize) > this->kMaxRcvStr)
1212 std::cerr <<
"Discovery message too large to send. Discovery won't " 1213 <<
"work. This shouldn't happen.\n";
1217 msgSize = msgSizeFull;
1218 uint16_t totalSize =
sizeof(msgSize) + msgSize;
1219 char *buffer =
static_cast<char *
>(
new char[totalSize]);
1220 memcpy(&buffer[0], &msgSize,
sizeof(msgSize));
1222 if (_msg.SerializeToArray(buffer +
sizeof(msgSize), msgSize))
1226 for (
const auto &sock : this->Sockets())
1229 if (sendto(sock, reinterpret_cast<const raw_type *>(
1230 reinterpret_cast<const unsigned char*>(buffer)),
1232 reinterpret_cast<const sockaddr *>(this->MulticastAddr()),
1233 sizeof(*(this->MulticastAddr()))) != totalSize)
1243 if (errno != EPERM && errno != ENOBUFS)
1245 std::cerr <<
"Exception sending a multicast message:" 1254 std::cerr <<
"Discovery::SendMulticast: Error serializing data." 1265 return this->sockets;
1270 private:
const sockaddr_in *MulticastAddr()
const 1272 return &this->mcastAddr;
1277 private: uint8_t Version()
const 1280 static int topicStats =
1281 (
env(
"IGN_TRANSPORT_TOPIC_STATISTICS", ignStats) && ignStats ==
"1");
1282 return this->kWireVersion + (topicStats * 100);
1289 private:
bool RegisterNetIface(
const std::string &_ip)
1292 int sock =
static_cast<int>(socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP));
1302 struct in_addr ifAddr;
1303 ifAddr.s_addr = inet_addr(_ip.
c_str());
1304 if (setsockopt(sock, IPPROTO_IP, IP_MULTICAST_IF,
1305 reinterpret_cast<const char*>(&ifAddr),
sizeof(ifAddr)) != 0)
1307 std::cerr <<
"Error setting socket option (IP_MULTICAST_IF)." 1312 this->sockets.push_back(sock);
1317 struct ip_mreq group;
1318 group.imr_multiaddr.s_addr =
1319 inet_addr(this->kMulticastGroup.c_str());
1320 group.imr_interface.s_addr = inet_addr(_ip.
c_str());
1321 if (setsockopt(this->sockets.at(0), IPPROTO_IP, IP_ADD_MEMBERSHIP,
1322 reinterpret_cast<const char*
>(&group),
sizeof(group)) != 0)
1324 std::cerr <<
"Error setting socket option (IP_ADD_MEMBERSHIP)." 1334 private:
void AddRelayAddress(
const std::string &_ip)
1337 for (
auto const &addr : this->relayAddrs)
1339 if (addr.sin_addr.s_addr == inet_addr(_ip.
c_str()))
1344 memset(&addr, 0,
sizeof(addr));
1345 addr.sin_family = AF_INET;
1346 addr.sin_addr.s_addr = inet_addr(_ip.
c_str());
1347 addr.sin_port = htons(static_cast<u_short>(this->port));
1349 this->relayAddrs.push_back(addr);
1355 private:
static const unsigned int kDefActivityInterval = 100;
1360 private:
static const unsigned int kDefHeartbeatInterval = 1000;
1365 private:
static const unsigned int kDefSilenceInterval = 3000;
1368 private:
const std::string kMulticastGroup =
"224.0.0.7";
1371 private:
const int kTimeout = 250;
1374 private:
static const uint16_t kMaxRcvStr =
1379 private:
static const uint8_t kWireVersion = 10;
1396 private:
unsigned int silenceInterval;
1401 private:
unsigned int activityInterval;
1406 private:
unsigned int heartbeatInterval;
1430 private:
bool verbose;
1436 private: sockaddr_in mcastAddr;
1460 private:
bool initialized;
1463 private:
unsigned int numHeartbeatsUninitialized;
1472 private:
bool enabled;
Topic/service available to any subscriber (default scope).
bool Unadvertise(const std::string &_topic, const std::string &_nUuid)
Unadvertise a new message. Broadcast a discovery message that will cancel all the discovery informati...
Definition: Discovery.hh:424
std::map< std::string, Timestamp > activity
Activity information. Every time there is a message from a remote node, its activity information is u...
Definition: Discovery.hh:1427
void Unregister(const MessagePublisher &_pub) const
Unregister a node from this process as a remote subscriber.
Definition: Discovery.hh:392
This class stores all the information about a publisher. It stores the topic name that publishes...
Definition: Publisher.hh:44
std::vector< std::string > determineInterfaces()
Determine the list of network interfaces for this machine. Reference: https://github.com/ros/ros_comm/blob/hydro-devel/clients/ roscpp/src/libros/network.cpp.
Send data via multicast only.
Topic/service only available to subscribers in the same machine as the publisher. ...
void Start()
Start the discovery service. You probably want to register the callbacks for receiving discovery noti...
Definition: Discovery.hh:278
void WaitForInit() const
Check if ready/initialized. If not, then wait on the initializedCv condition variable.
Definition: Discovery.hh:610
unsigned int HeartbeatInterval() const
Each node broadcasts periodic heartbeats to keep its topic information alive in other nodes...
Definition: Discovery.hh:476
void raw_type
Definition: Discovery.hh:45
T duration_cast(T... args)
A discovery class that implements a distributed topic discovery protocol. It uses UDP multicast for s...
Definition: Discovery.hh:117
void TopicList(std::vector< std::string > &_topics) const
Get the list of topics currently advertised in the network.
Definition: Discovery.hh:601
void SetSilenceInterval(const unsigned int _ms)
Set the maximum silence interval.
Definition: Discovery.hh:513
void UnregistrationsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive an event when a remote node unsubscribes to a topic within this proces...
Definition: Discovery.hh:551
void Register(const MessagePublisher &_pub) const
Register a node from this process as a remote subscriber.
Definition: Discovery.hh:384
std::string determineHost()
Determine IP or hostname. Reference: https://github.com/ros/ros_comm/blob/hydro-devel/clients/ roscpp...
virtual ~Discovery()
Destructor.
Definition: Discovery.hh:247
A class for customizing the publication options for a topic or service advertised. E.g.: Set the scope of a topic/service.
Definition: AdvertiseOptions.hh:59
Discovery(const std::string &_pUuid, const int _port, const bool _verbose=false)
Constructor.
Definition: Discovery.hh:124
const TopicStorage< Pub > & Info() const
Get the discovery information.
Definition: Discovery.hh:400
bool Publishers(const std::string &_topic, Addresses_M< Pub > &_publishers) const
Get all the publishers' information known for a given topic.
Definition: Discovery.hh:410
std::string HostAddr() const
Get the IP address of this host.
Definition: Discovery.hh:455
unsigned int ActivityInterval() const
The discovery checks the validity of the topic information every 'activity interval' milliseconds...
Definition: Discovery.hh:465
std::vector< std::string > split(const std::string &_orig, char _delim)
split at a one character delimiter to get a vector of something
unsigned int SilenceInterval() const
Get the maximum time allowed without receiving any discovery information from a node before canceling...
Definition: Discovery.hh:486
void PrintCurrentState() const
Print the current discovery state.
Definition: Discovery.hh:558
This class stores all the information about a message publisher.
Definition: Publisher.hh:190
void DisconnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery disconnection events. Each time a topic is no longer active...
Definition: Discovery.hh:533
void ConnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery connection events. Each time a new topic is connected...
Definition: Discovery.hh:523
bool Advertise(const Pub &_publisher)
Advertise a new message.
Definition: Discovery.hh:302
Topic/service only available to subscribers in the same process as the publisher. ...
bool pollSockets(const std::vector< int > &_sockets, const int _timeout)
std::chrono::steady_clock::time_point Timestamp
Definition: TransportTypes.hh:155
DestinationType
Options for sending discovery messages.
Definition: Discovery.hh:88
void SetActivityInterval(const unsigned int _ms)
Set the activity interval.
Definition: Discovery.hh:495
void RegistrationsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive an event when a new remote node subscribes to a topic within this proc...
Definition: Discovery.hh:542
Send data via unicast only.
Definition: AdvertiseOptions.hh:28
void SetHeartbeatInterval(const unsigned int _ms)
Set the heartbeat interval.
Definition: Discovery.hh:504
bool env(const std::string &_name, std::string &_value)
Find the environment variable '_name' and return its value.
bool Discover(const std::string &_topic) const
Request discovery information about a topic. When using this method, the user might want to use SetCo...
Definition: Discovery.hh:334