Ignition Transport

API Reference

9.1.2
Discovery.hh
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2014 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 
18 #ifndef IGNITION_TRANSPORT_DISCOVERY_HH_
19 #define IGNITION_TRANSPORT_DISCOVERY_HH_
20 #include <errno.h>
21 #include <string.h>
22 
23 #ifdef _WIN32
24  // For socket(), connect(), send(), and recv().
25  #include <Winsock2.h>
26  #include <Ws2def.h>
27  #include <Ws2ipdef.h>
28  #include <Ws2tcpip.h>
29  // Type used for raw data on this platform.
30  using raw_type = char;
31 #else
32  // For data types
33  #include <sys/types.h>
34  // For socket(), connect(), send(), and recv()
35  #include <sys/socket.h>
36  // For gethostbyname()
37  #include <netdb.h>
38  // For inet_addr()
39  #include <arpa/inet.h>
40  // For close()
41  #include <unistd.h>
42  // For sockaddr_in
43  #include <netinet/in.h>
44  // Type used for raw data on this platform
45  using raw_type = void;
46 #endif
47 
48 #ifdef _WIN32
49  #pragma warning(push, 0)
50 #endif
51 #ifdef _WIN32
52  #pragma warning(pop)
53  // Suppress "decorated name length exceed" warning in STL.
54  #pragma warning(disable: 4503)
55  // Suppress "depreted API warnings" in WINSOCK.
56  #pragma warning(disable: 4996)
57 #endif
58 
59 #include <ignition/msgs/discovery.pb.h>
60 
61 #include <algorithm>
62 #include <condition_variable>
63 #include <limits>
64 #include <map>
65 #include <memory>
66 #include <mutex>
67 #include <string>
68 #include <thread>
69 #include <vector>
70 
71 #include <ignition/msgs/Utility.hh>
72 
73 #include "ignition/transport/config.hh"
74 #include "ignition/transport/Export.hh"
80 
81 namespace ignition
82 {
83  namespace transport
84  {
85  // Inline bracket to help doxygen filtering.
86  inline namespace IGNITION_TRANSPORT_VERSION_NAMESPACE {
88  enum class DestinationType
89  {
91  UNICAST,
93  MULTICAST,
95  ALL
96  };
97 
98  //
104  bool IGNITION_TRANSPORT_VISIBLE pollSockets(
105  const std::vector<int> &_sockets,
106  const int _timeout);
107 
116  template<typename Pub>
117  class Discovery
118  {
124  public: Discovery(const std::string &_pUuid,
125  const int _port,
126  const bool _verbose = false)
127  : port(_port),
128  hostAddr(determineHost()),
129  pUuid(_pUuid),
130  silenceInterval(kDefSilenceInterval),
131  activityInterval(kDefActivityInterval),
132  heartbeatInterval(kDefHeartbeatInterval),
133  connectionCb(nullptr),
134  disconnectionCb(nullptr),
135  verbose(_verbose),
136  initialized(false),
137  numHeartbeatsUninitialized(0),
138  exit(false),
139  enabled(false)
140  {
141  std::string ignIp;
142  if (env("IGN_IP", ignIp) && !ignIp.empty())
143  this->hostInterfaces = {ignIp};
144  else
145  {
146  // Get the list of network interfaces in this host.
147  this->hostInterfaces = determineInterfaces();
148  }
149 
150 #ifdef _WIN32
151  WORD wVersionRequested;
152  WSADATA wsaData;
153 
154  // Request WinSock v2.2.
155  wVersionRequested = MAKEWORD(2, 2);
156  // Load WinSock DLL.
157  if (WSAStartup(wVersionRequested, &wsaData) != 0)
158  {
159  std::cerr << "Unable to load WinSock DLL" << std::endl;
160  return;
161  }
162 #endif
163  for (const auto &netIface : this->hostInterfaces)
164  {
165  auto succeed = this->RegisterNetIface(netIface);
166 
167  // If the IP address that we're selecting as the main IP address of
168  // the host is invalid, we change it to 127.0.0.1 .
169  // This is probably because IGN_IP is set to a wrong value.
170  if (netIface == this->hostAddr && !succeed)
171  {
172  this->RegisterNetIface("127.0.0.1");
173  std::cerr << "Did you set the environment variable IGN_IP with a "
174  << "correct IP address? " << std::endl
175  << " [" << netIface << "] seems an invalid local IP "
176  << "address." << std::endl
177  << " Using 127.0.0.1 as hostname." << std::endl;
178  this->hostAddr = "127.0.0.1";
179  }
180  }
181 
182  // Socket option: SO_REUSEADDR. This options is used only for receiving
183  // data. We can reuse the same socket for receiving multicast data from
184  // multiple interfaces. We will use the socket at position 0 for
185  // receiving data.
186  int reuseAddr = 1;
187  if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEADDR,
188  reinterpret_cast<const char *>(&reuseAddr), sizeof(reuseAddr)) != 0)
189  {
190  std::cerr << "Error setting socket option (SO_REUSEADDR)."
191  << std::endl;
192  return;
193  }
194 
195 #ifdef SO_REUSEPORT
196  // Socket option: SO_REUSEPORT. This options is used only for receiving
197  // data. We can reuse the same socket for receiving multicast data from
198  // multiple interfaces. We will use the socket at position 0 for
199  // receiving data.
200  int reusePort = 1;
201  // cppcheck-suppress ConfigurationNotChecked
202  if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEPORT,
203  reinterpret_cast<const char *>(&reusePort), sizeof(reusePort)) != 0)
204  {
205  std::cerr << "Error setting socket option (SO_REUSEPORT)."
206  << std::endl;
207  return;
208  }
209 #endif
210  // Bind the first socket to the discovery port.
211  sockaddr_in localAddr;
212  memset(&localAddr, 0, sizeof(localAddr));
213  localAddr.sin_family = AF_INET;
214  localAddr.sin_addr.s_addr = htonl(INADDR_ANY);
215  localAddr.sin_port = htons(static_cast<u_short>(this->port));
216 
217  if (bind(this->sockets.at(0),
218  reinterpret_cast<sockaddr *>(&localAddr), sizeof(sockaddr_in)) < 0)
219  {
220  std::cerr << "Binding to a local port failed." << std::endl;
221  return;
222  }
223 
224  // Set 'mcastAddr' to the multicast discovery group.
225  memset(&this->mcastAddr, 0, sizeof(this->mcastAddr));
226  this->mcastAddr.sin_family = AF_INET;
227  this->mcastAddr.sin_addr.s_addr =
228  inet_addr(this->kMulticastGroup.c_str());
229  this->mcastAddr.sin_port = htons(static_cast<u_short>(this->port));
230 
232  std::string ignRelay = "";
233  if (env("IGN_RELAY", ignRelay) && !ignRelay.empty())
234  {
235  relays = transport::split(ignRelay, ':');
236  }
237 
238  // Register all unicast relays.
239  for (auto const &relayAddr : relays)
240  this->AddRelayAddress(relayAddr);
241 
242  if (this->verbose)
243  this->PrintCurrentState();
244  }
245 
247  public: virtual ~Discovery()
248  {
249  // Tell the service thread to terminate.
250  this->exitMutex.lock();
251  this->exit = true;
252  this->exitMutex.unlock();
253 
254  // Wait for the service threads to finish before exit.
255  if (this->threadReception.joinable())
256  this->threadReception.join();
257 
258  // Broadcast a BYE message to trigger the remote cancellation of
259  // all our advertised topics.
260  this->SendMsg(DestinationType::ALL, msgs::Discovery::BYE,
261  Publisher("", "", this->pUuid, "", AdvertiseOptions()));
262 
263  // Close sockets.
264  for (const auto &sock : this->sockets)
265  {
266 #ifdef _WIN32
267  closesocket(sock);
268  WSACleanup();
269 #else
270  close(sock);
271 #endif
272  }
273  }
274 
278  public: void Start()
279  {
280  {
281  std::lock_guard<std::mutex> lock(this->mutex);
282 
283  // The service is already running.
284  if (this->enabled)
285  return;
286 
287  this->enabled = true;
288  }
289 
290  auto now = std::chrono::steady_clock::now();
291  this->timeNextHeartbeat = now;
292  this->timeNextActivity = now;
293 
294  // Start the thread that receives discovery information.
295  this->threadReception = std::thread(&Discovery::RecvMessages, this);
296  }
297 
302  public: bool Advertise(const Pub &_publisher)
303  {
304  {
305  std::lock_guard<std::mutex> lock(this->mutex);
306 
307  if (!this->enabled)
308  return false;
309 
310  // Add the addressing information (local publisher).
311  if (!this->info.AddPublisher(_publisher))
312  return false;
313  }
314 
315  // Only advertise a message outside this process if the scope
316  // is not 'Process'
317  if (_publisher.Options().Scope() != Scope_t::PROCESS)
318  this->SendMsg(DestinationType::ALL, msgs::Discovery::ADVERTISE,
319  _publisher);
320 
321  return true;
322  }
323 
334  public: bool Discover(const std::string &_topic) const
335  {
337  bool found;
338  Addresses_M<Pub> addresses;
339 
340  {
341  std::lock_guard<std::mutex> lock(this->mutex);
342 
343  if (!this->enabled)
344  return false;
345 
346  cb = this->connectionCb;
347  }
348 
349  Pub pub;
350  pub.SetTopic(_topic);
351  pub.SetPUuid(this->pUuid);
352 
353  // Send a discovery request.
354  this->SendMsg(DestinationType::ALL, msgs::Discovery::SUBSCRIBE, pub);
355 
356  {
357  std::lock_guard<std::mutex> lock(this->mutex);
358  found = this->info.Publishers(_topic, addresses);
359  }
360 
361  if (found)
362  {
363  // I already have information about this topic.
364  for (const auto &proc : addresses)
365  {
366  for (const auto &node : proc.second)
367  {
368  if (cb)
369  {
370  // Execute the user's callback for a service request. Notice
371  // that we only execute one callback for preventing receive
372  // multiple service responses for a single request.
373  cb(node);
374  }
375  }
376  }
377  }
378 
379  return true;
380  }
381 
384  public: void Register(const MessagePublisher &_pub) const
385  {
386  this->SendMsg(
387  DestinationType::ALL, msgs::Discovery::NEW_CONNECTION, _pub);
388  }
389 
392  public: void Unregister(const MessagePublisher &_pub) const
393  {
394  this->SendMsg(
395  DestinationType::ALL, msgs::Discovery::END_CONNECTION, _pub);
396  }
397 
400  public: const TopicStorage<Pub> &Info() const
401  {
402  std::lock_guard<std::mutex> lock(this->mutex);
403  return this->info;
404  }
405 
410  public: bool Publishers(const std::string &_topic,
411  Addresses_M<Pub> &_publishers) const
412  {
413  std::lock_guard<std::mutex> lock(this->mutex);
414  return this->info.Publishers(_topic, _publishers);
415  }
416 
424  public: bool Unadvertise(const std::string &_topic,
425  const std::string &_nUuid)
426  {
427  Pub inf;
428  {
429  std::lock_guard<std::mutex> lock(this->mutex);
430 
431  if (!this->enabled)
432  return false;
433 
434  // Don't do anything if the topic is not advertised by any of my nodes
435  if (!this->info.Publisher(_topic, this->pUuid, _nUuid, inf))
436  return true;
437 
438  // Remove the topic information.
439  this->info.DelPublisherByNode(_topic, this->pUuid, _nUuid);
440  }
441 
442  // Only unadvertise a message outside this process if the scope
443  // is not 'Process'.
444  if (inf.Options().Scope() != Scope_t::PROCESS)
445  {
446  this->SendMsg(DestinationType::ALL,
447  msgs::Discovery::UNADVERTISE, inf);
448  }
449 
450  return true;
451  }
452 
455  public: std::string HostAddr() const
456  {
457  std::lock_guard<std::mutex> lock(this->mutex);
458  return this->hostAddr;
459  }
460 
465  public: unsigned int ActivityInterval() const
466  {
467  std::lock_guard<std::mutex> lock(this->mutex);
468  return this->activityInterval;
469  }
470 
476  public: unsigned int HeartbeatInterval() const
477  {
478  std::lock_guard<std::mutex> lock(this->mutex);
479  return this->heartbeatInterval;
480  }
481 
486  public: unsigned int SilenceInterval() const
487  {
488  std::lock_guard<std::mutex> lock(this->mutex);
489  return this->silenceInterval;
490  }
491 
495  public: void SetActivityInterval(const unsigned int _ms)
496  {
497  std::lock_guard<std::mutex> lock(this->mutex);
498  this->activityInterval = _ms;
499  }
500 
504  public: void SetHeartbeatInterval(const unsigned int _ms)
505  {
506  std::lock_guard<std::mutex> lock(this->mutex);
507  this->heartbeatInterval = _ms;
508  }
509 
513  public: void SetSilenceInterval(const unsigned int _ms)
514  {
515  std::lock_guard<std::mutex> lock(this->mutex);
516  this->silenceInterval = _ms;
517  }
518 
523  public: void ConnectionsCb(const DiscoveryCallback<Pub> &_cb)
524  {
525  std::lock_guard<std::mutex> lock(this->mutex);
526  this->connectionCb = _cb;
527  }
528 
533  public: void DisconnectionsCb(const DiscoveryCallback<Pub> &_cb)
534  {
535  std::lock_guard<std::mutex> lock(this->mutex);
536  this->disconnectionCb = _cb;
537  }
538 
542  public: void RegistrationsCb(const DiscoveryCallback<Pub> &_cb)
543  {
544  std::lock_guard<std::mutex> lock(this->mutex);
545  this->registrationCb = _cb;
546  }
547 
551  public: void UnregistrationsCb(const DiscoveryCallback<Pub> &_cb)
552  {
553  std::lock_guard<std::mutex> lock(this->mutex);
554  this->unregistrationCb = _cb;
555  }
556 
558  public: void PrintCurrentState() const
559  {
560  std::lock_guard<std::mutex> lock(this->mutex);
561 
562  std::cout << "---------------" << std::endl;
563  std::cout << std::boolalpha << "Enabled: "
564  << this->enabled << std::endl;
565  std::cout << "Discovery state" << std::endl;
566  std::cout << "\tUUID: " << this->pUuid << std::endl;
567  std::cout << "Settings" << std::endl;
568  std::cout << "\tActivity: " << this->activityInterval
569  << " ms." << std::endl;
570  std::cout << "\tHeartbeat: " << this->heartbeatInterval
571  << "ms." << std::endl;
572  std::cout << "\tSilence: " << this->silenceInterval
573  << " ms." << std::endl;
574  std::cout << "Known information:" << std::endl;
575  this->info.Print();
576 
577  // Used to calculate the elapsed time.
579 
580  std::cout << "Activity" << std::endl;
581  if (this->activity.empty())
582  std::cout << "\t<empty>" << std::endl;
583  else
584  {
585  for (auto &proc : this->activity)
586  {
587  // Elapsed time since the last update from this publisher.
588  std::chrono::duration<double> elapsed = now - proc.second;
589 
590  std::cout << "\t" << proc.first << std::endl;
591  std::cout << "\t\t" << "Since: " << std::chrono::duration_cast<
592  std::chrono::milliseconds>(elapsed).count() << " ms. ago. "
593  << std::endl;
594  }
595  }
596  std::cout << "---------------" << std::endl;
597  }
598 
601  public: void TopicList(std::vector<std::string> &_topics) const
602  {
603  this->WaitForInit();
604  std::lock_guard<std::mutex> lock(this->mutex);
605  this->info.TopicList(_topics);
606  }
607 
610  public: void WaitForInit() const
611  {
612  std::unique_lock<std::mutex> lk(this->mutex);
613 
614  if (!this->initialized)
615  {
616  this->initializedCv.wait(lk, [this]{return this->initialized;});
617  }
618  }
619 
623  private: void UpdateActivity()
624  {
625  // The UUIDs of the processes that have expired.
627 
628  // A copy of the disconnection callback.
629  DiscoveryCallback<Pub> disconnectCb;
630 
632 
633  {
634  std::lock_guard<std::mutex> lock(this->mutex);
635 
636  if (now < this->timeNextActivity)
637  return;
638 
639  disconnectCb = this->disconnectionCb;
640 
641  for (auto it = this->activity.cbegin(); it != this->activity.cend();)
642  {
643  // Elapsed time since the last update from this publisher.
644  auto elapsed = now - it->second;
645 
646  // This publisher has expired.
647  if (std::chrono::duration_cast<std::chrono::milliseconds>
648  (elapsed).count() > this->silenceInterval)
649  {
650  // Remove all the info entries for this process UUID.
651  this->info.DelPublishersByProc(it->first);
652 
653  uuids.push_back(it->first);
654 
655  // Remove the activity entry.
656  this->activity.erase(it++);
657  }
658  else
659  ++it;
660  }
661 
662  this->timeNextActivity = std::chrono::steady_clock::now() +
663  std::chrono::milliseconds(this->activityInterval);
664  }
665 
666  if (!disconnectCb)
667  return;
668 
669  // Notify without topic information. This is useful to inform the
670  // client that a remote node is gone, even if we were not
671  // interested in its topics.
672  for (auto const &uuid : uuids)
673  {
674  Pub publisher;
675  publisher.SetPUuid(uuid);
676  disconnectCb(publisher);
677  }
678  }
679 
681  private: void UpdateHeartbeat()
682  {
684 
685  {
686  std::lock_guard<std::mutex> lock(this->mutex);
687 
688  if (now < this->timeNextHeartbeat)
689  return;
690  }
691 
692  Publisher pub("", "", this->pUuid, "", AdvertiseOptions());
693  this->SendMsg(DestinationType::ALL, msgs::Discovery::HEARTBEAT, pub);
694 
696  {
697  std::lock_guard<std::mutex> lock(this->mutex);
698 
699  // Re-advertise topics that are advertised inside this process.
700  this->info.PublishersByProc(this->pUuid, nodes);
701  }
702 
703  for (const auto &topic : nodes)
704  {
705  for (const auto &node : topic.second)
706  {
707  this->SendMsg(DestinationType::ALL,
708  msgs::Discovery::ADVERTISE, node);
709  }
710  }
711 
712  {
713  std::lock_guard<std::mutex> lock(this->mutex);
714  if (!this->initialized)
715  {
716  ++this->numHeartbeatsUninitialized;
717  if (this->numHeartbeatsUninitialized == 2)
718  {
719  // We consider the discovery initialized after two cycles of
720  // heartbeats sent.
721  this->initialized = true;
722 
723  // Notify anyone waiting for the initialization phase to finish.
724  this->initializedCv.notify_all();
725  }
726  }
727 
728  this->timeNextHeartbeat = std::chrono::steady_clock::now() +
729  std::chrono::milliseconds(this->heartbeatInterval);
730  }
731  }
732 
742  private: int NextTimeout() const
743  {
744  auto now = std::chrono::steady_clock::now();
745  auto timeUntilNextHeartbeat = this->timeNextHeartbeat - now;
746  auto timeUntilNextActivity = this->timeNextActivity - now;
747 
748  int t = static_cast<int>(
750  (std::min(timeUntilNextHeartbeat, timeUntilNextActivity)).count());
751  int t2 = std::min(t, this->kTimeout);
752  return std::max(t2, 0);
753  }
754 
756  private: void RecvMessages()
757  {
758  bool timeToExit = false;
759  while (!timeToExit)
760  {
761  // Calculate the timeout.
762  int timeout = this->NextTimeout();
763 
764  if (pollSockets(this->sockets, timeout))
765  {
766  this->RecvDiscoveryUpdate();
767 
768  if (this->verbose)
769  this->PrintCurrentState();
770  }
771 
772  this->UpdateHeartbeat();
773  this->UpdateActivity();
774 
775  // Is it time to exit?
776  {
777  std::lock_guard<std::mutex> lock(this->exitMutex);
778  if (this->exit)
779  timeToExit = true;
780  }
781  }
782  }
783 
785  private: void RecvDiscoveryUpdate()
786  {
787  char rcvStr[Discovery::kMaxRcvStr];
788  sockaddr_in clntAddr;
789  socklen_t addrLen = sizeof(clntAddr);
790 
791  uint16_t received = recvfrom(this->sockets.at(0),
792  reinterpret_cast<raw_type *>(rcvStr),
793  this->kMaxRcvStr, 0,
794  reinterpret_cast<sockaddr *>(&clntAddr),
795  reinterpret_cast<socklen_t *>(&addrLen));
796  if (received > 0)
797  {
798  uint16_t len = 0;
799  memcpy(&len, &rcvStr[0], sizeof(len));
800 
801  // Ignition Transport delimits each discovery message with a
802  // frame_delimiter that contains byte size information.
803  // A discovery message has the form:
804  //
805  // <frame_delimiter><frame_body>
806  //
807  // Ignition Transport version < 8 sends a frame delimiter that
808  // contains the value of sizeof(frame_delimiter)
809  // + sizeof(frame_body). In other words, the frame_delimiter
810  // contains a value that represents the total size of the
811  // frame_body and frame_delimiter in bytes.
812  //
813  // Ignition Transport version >= 8 sends a frame_delimiter
814  // that contains the value of sizeof(frame_body). In other
815  // words, the frame_delimiter contains a value that represents
816  // the total size of only the frame_body.
817  //
818  // It is possible that two incompatible versions of Ignition
819  // Transport exist on the same network. If we receive an
820  // unexpected size, then we ignore the message.
821 
822  // If-condition for version 8+
823  if (len + sizeof(len) == received)
824  {
825  std::string srcAddr = inet_ntoa(clntAddr.sin_addr);
826  uint16_t srcPort = ntohs(clntAddr.sin_port);
827 
828  if (this->verbose)
829  {
830  std::cout << "\nReceived discovery update from "
831  << srcAddr << ": " << srcPort << std::endl;
832  }
833 
834  this->DispatchDiscoveryMsg(srcAddr, rcvStr + sizeof(len), len);
835  }
836  }
837  else if (received < 0)
838  {
839  std::cerr << "Discovery::RecvDiscoveryUpdate() recvfrom error"
840  << std::endl;
841  }
842  }
843 
848  private: void DispatchDiscoveryMsg(const std::string &_fromIp,
849  char *_msg, uint16_t _len)
850  {
851  ignition::msgs::Discovery msg;
852 
853  // Parse the message, and return if parsing failed. Parsing could
854  // fail when another discovery node is publishing messages using an
855  // older (or newer) format.
856  if (!msg.ParseFromArray(_msg, _len))
857  return;
858 
859  // Discard the message if the wire protocol is different than mine.
860  if (this->Version() != msg.version())
861  return;
862 
863  std::string recvPUuid = msg.process_uuid();
864 
865  // Discard our own discovery messages.
866  if (recvPUuid == this->pUuid)
867  return;
868 
869  // Forwarding summary:
870  // - From a unicast peer -> to multicast group (with NO_RELAY flag).
871  // - From multicast group -> to unicast peers (with RELAY flag).
872 
873  // If the RELAY flag is set, this discovery message is coming via a
874  // unicast transmission. In this case, we don't process it, we just
875  // forward it to the multicast group, and it will be dispatched once
876  // received there. Note that we also unset the RELAY flag and set the
877  // NO_RELAY flag, to avoid forwarding the message anymore.
878  if (msg.has_flags() && msg.flags().relay())
879  {
880  // Unset the RELAY flag in the header and set the NO_RELAY.
881  msg.mutable_flags()->set_relay(false);
882  msg.mutable_flags()->set_no_relay(true);
883  this->SendMulticast(msg);
884 
885  // A unicast peer contacted me. I need to save its address for
886  // sending future messages in the future.
887  this->AddRelayAddress(_fromIp);
888  return;
889  }
890  // If we are receiving this discovery message via the multicast channel
891  // and the NO_RELAY flag is not set, we forward this message via unicast
892  // to all our relays. Note that this is the most common case, where we
893  // receive a regular multicast message and we forward it to any remote
894  // relays.
895  else if (!msg.has_flags() || !msg.flags().no_relay())
896  {
897  msg.mutable_flags()->set_relay(true);
898  this->SendUnicast(msg);
899  }
900 
901  bool isSenderLocal = (std::find(this->hostInterfaces.begin(),
902  this->hostInterfaces.end(), _fromIp) != this->hostInterfaces.end()) ||
903  (_fromIp.find("127.") == 0);
904 
905  // Update timestamp and cache the callbacks.
906  DiscoveryCallback<Pub> connectCb;
907  DiscoveryCallback<Pub> disconnectCb;
908  DiscoveryCallback<Pub> registerCb;
909  DiscoveryCallback<Pub> unregisterCb;
910  {
911  std::lock_guard<std::mutex> lock(this->mutex);
912  this->activity[recvPUuid] = std::chrono::steady_clock::now();
913  connectCb = this->connectionCb;
914  disconnectCb = this->disconnectionCb;
915  registerCb = this->registrationCb;
916  unregisterCb = this->unregistrationCb;
917  }
918 
919  switch (msg.type())
920  {
921  case msgs::Discovery::ADVERTISE:
922  {
923  // Read the rest of the fields.
924  Pub publisher;
925  publisher.SetFromDiscovery(msg);
926 
927  // Check scope of the topic.
928  if ((publisher.Options().Scope() == Scope_t::PROCESS) ||
929  (publisher.Options().Scope() == Scope_t::HOST &&
930  !isSenderLocal))
931  {
932  return;
933  }
934 
935  // Register an advertised address for the topic.
936  bool added;
937  {
938  std::lock_guard<std::mutex> lock(this->mutex);
939  added = this->info.AddPublisher(publisher);
940  }
941 
942  if (added && connectCb)
943  {
944  // Execute the client's callback.
945  connectCb(publisher);
946  }
947 
948  break;
949  }
950  case msgs::Discovery::SUBSCRIBE:
951  {
952  std::string recvTopic;
953  // Read the topic information.
954  if (msg.has_sub())
955  {
956  recvTopic = msg.sub().topic();
957  }
958  else
959  {
960  std::cerr << "Subscription discovery message is missing "
961  << "Subscriber information.\n";
962  break;
963  }
964 
965  // Check if at least one of my nodes advertises the topic requested.
966  Addresses_M<Pub> addresses;
967  {
968  std::lock_guard<std::mutex> lock(this->mutex);
969  if (!this->info.HasAnyPublishers(recvTopic, this->pUuid))
970  {
971  break;
972  }
973 
974  if (!this->info.Publishers(recvTopic, addresses))
975  break;
976  }
977 
978  for (const auto &nodeInfo : addresses[this->pUuid])
979  {
980  // Check scope of the topic.
981  if ((nodeInfo.Options().Scope() == Scope_t::PROCESS) ||
982  (nodeInfo.Options().Scope() == Scope_t::HOST &&
983  !isSenderLocal))
984  {
985  continue;
986  }
987 
988  // Answer an ADVERTISE message.
989  this->SendMsg(DestinationType::ALL,
990  msgs::Discovery::ADVERTISE, nodeInfo);
991  }
992 
993  break;
994  }
995  case msgs::Discovery::NEW_CONNECTION:
996  {
997  // Read the rest of the fields.
998  Pub publisher;
999  publisher.SetFromDiscovery(msg);
1000 
1001  if (registerCb)
1002  registerCb(publisher);
1003 
1004  break;
1005  }
1006  case msgs::Discovery::END_CONNECTION:
1007  {
1008  // Read the rest of the fields.
1009  Pub publisher;
1010  publisher.SetFromDiscovery(msg);
1011 
1012  if (unregisterCb)
1013  unregisterCb(publisher);
1014 
1015  break;
1016  }
1017  case msgs::Discovery::HEARTBEAT:
1018  {
1019  // The timestamp has already been updated.
1020  break;
1021  }
1022  case msgs::Discovery::BYE:
1023  {
1024  // Remove the activity entry for this publisher.
1025  {
1026  std::lock_guard<std::mutex> lock(this->mutex);
1027  this->activity.erase(recvPUuid);
1028  }
1029 
1030  if (disconnectCb)
1031  {
1032  Pub pub;
1033  pub.SetPUuid(recvPUuid);
1034  // Notify the new disconnection.
1035  disconnectCb(pub);
1036  }
1037 
1038  // Remove the address entry for this topic.
1039  {
1040  std::lock_guard<std::mutex> lock(this->mutex);
1041  this->info.DelPublishersByProc(recvPUuid);
1042  }
1043 
1044  break;
1045  }
1046  case msgs::Discovery::UNADVERTISE:
1047  {
1048  // Read the address.
1049  Pub publisher;
1050  publisher.SetFromDiscovery(msg);
1051 
1052  // Check scope of the topic.
1053  if ((publisher.Options().Scope() == Scope_t::PROCESS) ||
1054  (publisher.Options().Scope() == Scope_t::HOST &&
1055  !isSenderLocal))
1056  {
1057  return;
1058  }
1059 
1060  if (disconnectCb)
1061  {
1062  // Notify the new disconnection.
1063  disconnectCb(publisher);
1064  }
1065 
1066  // Remove the address entry for this topic.
1067  {
1068  std::lock_guard<std::mutex> lock(this->mutex);
1069  this->info.DelPublisherByNode(publisher.Topic(),
1070  publisher.PUuid(), publisher.NUuid());
1071  }
1072 
1073  break;
1074  }
1075  default:
1076  {
1077  std::cerr << "Unknown message type [" << msg.type() << "].\n";
1078  break;
1079  }
1080  }
1081  }
1082 
1089  private: template<typename T>
1090  void SendMsg(const DestinationType &_destType,
1091  const msgs::Discovery::Type _type,
1092  const T &_pub) const
1093  {
1094  ignition::msgs::Discovery discoveryMsg;
1095  discoveryMsg.set_version(this->Version());
1096  discoveryMsg.set_type(_type);
1097  discoveryMsg.set_process_uuid(this->pUuid);
1098 
1099  switch (_type)
1100  {
1101  case msgs::Discovery::ADVERTISE:
1102  case msgs::Discovery::UNADVERTISE:
1103  case msgs::Discovery::NEW_CONNECTION:
1104  case msgs::Discovery::END_CONNECTION:
1105  {
1106  _pub.FillDiscovery(discoveryMsg);
1107  break;
1108  }
1109  case msgs::Discovery::SUBSCRIBE:
1110  {
1111  discoveryMsg.mutable_sub()->set_topic(_pub.Topic());
1112  break;
1113  }
1114  case msgs::Discovery::HEARTBEAT:
1115  case msgs::Discovery::BYE:
1116  break;
1117  default:
1118  std::cerr << "Discovery::SendMsg() error: Unrecognized message"
1119  << " type [" << _type << "]" << std::endl;
1120  return;
1121  }
1122 
1123  if (_destType == DestinationType::MULTICAST ||
1124  _destType == DestinationType::ALL)
1125  {
1126  this->SendMulticast(discoveryMsg);
1127  }
1128 
1129  // Send the discovery message to the unicast relays.
1130  if (_destType == DestinationType::UNICAST ||
1131  _destType == DestinationType::ALL)
1132  {
1133  // Set the RELAY flag in the header.
1134  discoveryMsg.mutable_flags()->set_relay(true);
1135  this->SendUnicast(discoveryMsg);
1136  }
1137 
1138  if (this->verbose)
1139  {
1140  std::cout << "\t* Sending " << msgs::ToString(_type)
1141  << " msg [" << _pub.Topic() << "]" << std::endl;
1142  }
1143  }
1144 
1147  private: void SendUnicast(const msgs::Discovery &_msg) const
1148  {
1149  uint16_t msgSize;
1150 
1151 #if GOOGLE_PROTOBUF_VERSION >= 3004000
1152  size_t msgSizeFull = _msg.ByteSizeLong();
1153 #else
1154  int msgSizeFull = _msg.ByteSize();
1155 #endif
1156  if (msgSizeFull + sizeof(msgSize) > this->kMaxRcvStr)
1157  {
1158  std::cerr << "Discovery message too large to send. Discovery won't "
1159  << "work. This shouldn't happen.\n";
1160  return;
1161  }
1162  msgSize = msgSizeFull;
1163 
1164  uint16_t totalSize = sizeof(msgSize) + msgSize;
1165  char *buffer = static_cast<char *>(new char[totalSize]);
1166  memcpy(&buffer[0], &msgSize, sizeof(msgSize));
1167 
1168  if (_msg.SerializeToArray(buffer + sizeof(msgSize), msgSize))
1169  {
1170  // Send the discovery message to the unicast relays.
1171  for (const auto &sockAddr : this->relayAddrs)
1172  {
1173  errno = 0;
1174  auto sent = sendto(this->sockets.at(0),
1175  reinterpret_cast<const raw_type *>(
1176  reinterpret_cast<const unsigned char*>(buffer)),
1177  totalSize, 0,
1178  reinterpret_cast<const sockaddr *>(&sockAddr),
1179  sizeof(sockAddr));
1180 
1181  if (sent != totalSize)
1182  {
1183  std::cerr << "Exception sending a unicast message:" << std::endl;
1184  std::cerr << " Return value: " << sent << std::endl;
1185  std::cerr << " Error code: " << strerror(errno) << std::endl;
1186  break;
1187  }
1188  }
1189  }
1190  else
1191  {
1192  std::cerr << "Discovery::SendUnicast: Error serializing data."
1193  << std::endl;
1194  }
1195 
1196  delete [] buffer;
1197  }
1198 
1201  private: void SendMulticast(const msgs::Discovery &_msg) const
1202  {
1203  uint16_t msgSize;
1204 
1205 #if GOOGLE_PROTOBUF_VERSION >= 3004000
1206  size_t msgSizeFull = _msg.ByteSizeLong();
1207 #else
1208  int msgSizeFull = _msg.ByteSize();
1209 #endif
1210  if (msgSizeFull + sizeof(msgSize) > this->kMaxRcvStr)
1211  {
1212  std::cerr << "Discovery message too large to send. Discovery won't "
1213  << "work. This shouldn't happen.\n";
1214  return;
1215  }
1216 
1217  msgSize = msgSizeFull;
1218  uint16_t totalSize = sizeof(msgSize) + msgSize;
1219  char *buffer = static_cast<char *>(new char[totalSize]);
1220  memcpy(&buffer[0], &msgSize, sizeof(msgSize));
1221 
1222  if (_msg.SerializeToArray(buffer + sizeof(msgSize), msgSize))
1223  {
1224  // Send the discovery message to the multicast group through all the
1225  // sockets.
1226  for (const auto &sock : this->Sockets())
1227  {
1228  errno = 0;
1229  if (sendto(sock, reinterpret_cast<const raw_type *>(
1230  reinterpret_cast<const unsigned char*>(buffer)),
1231  totalSize, 0,
1232  reinterpret_cast<const sockaddr *>(this->MulticastAddr()),
1233  sizeof(*(this->MulticastAddr()))) != totalSize)
1234  {
1235  // Ignore EPERM and ENOBUFS errors.
1236  //
1237  // See issue #106
1238  //
1239  // Rationale drawn from:
1240  //
1241  // * https://groups.google.com/forum/#!topic/comp.protocols.tcp-ip/Qou9Sfgr77E
1242  // * https://stackoverflow.com/questions/16555101/sendto-dgrams-do-not-block-for-enobufs-on-osx
1243  if (errno != EPERM && errno != ENOBUFS)
1244  {
1245  std::cerr << "Exception sending a multicast message:"
1246  << strerror(errno) << std::endl;
1247  }
1248  break;
1249  }
1250  }
1251  }
1252  else
1253  {
1254  std::cerr << "Discovery::SendMulticast: Error serializing data."
1255  << std::endl;
1256  }
1257 
1258  delete [] buffer;
1259  }
1260 
1263  private: const std::vector<int> &Sockets() const
1264  {
1265  return this->sockets;
1266  }
1267 
1270  private: const sockaddr_in *MulticastAddr() const
1271  {
1272  return &this->mcastAddr;
1273  }
1274 
1277  private: uint8_t Version() const
1278  {
1279  static std::string ignStats;
1280  static int topicStats =
1281  (env("IGN_TRANSPORT_TOPIC_STATISTICS", ignStats) && ignStats == "1");
1282  return this->kWireVersion + (topicStats * 100);
1283  }
1284 
1289  private: bool RegisterNetIface(const std::string &_ip)
1290  {
1291  // Make a new socket for sending discovery information.
1292  int sock = static_cast<int>(socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP));
1293  if (sock < 0)
1294  {
1295  std::cerr << "Socket creation failed." << std::endl;
1296  return false;
1297  }
1298 
1299  // Socket option: IP_MULTICAST_IF.
1300  // This socket option needs to be applied to each socket used to send
1301  // data. This option selects the source interface for outgoing messages.
1302  struct in_addr ifAddr;
1303  ifAddr.s_addr = inet_addr(_ip.c_str());
1304  if (setsockopt(sock, IPPROTO_IP, IP_MULTICAST_IF,
1305  reinterpret_cast<const char*>(&ifAddr), sizeof(ifAddr)) != 0)
1306  {
1307  std::cerr << "Error setting socket option (IP_MULTICAST_IF)."
1308  << std::endl;
1309  return false;
1310  }
1311 
1312  this->sockets.push_back(sock);
1313 
1314  // Join the multicast group. We have to do it for each network interface
1315  // but we can do it on the same socket. We will use the socket at
1316  // position 0 for receiving multicast information.
1317  struct ip_mreq group;
1318  group.imr_multiaddr.s_addr =
1319  inet_addr(this->kMulticastGroup.c_str());
1320  group.imr_interface.s_addr = inet_addr(_ip.c_str());
1321  if (setsockopt(this->sockets.at(0), IPPROTO_IP, IP_ADD_MEMBERSHIP,
1322  reinterpret_cast<const char*>(&group), sizeof(group)) != 0)
1323  {
1324  std::cerr << "Error setting socket option (IP_ADD_MEMBERSHIP)."
1325  << std::endl;
1326  return false;
1327  }
1328 
1329  return true;
1330  }
1331 
1334  private: void AddRelayAddress(const std::string &_ip)
1335  {
1336  // Sanity check: Make sure that this IP address is not already saved.
1337  for (auto const &addr : this->relayAddrs)
1338  {
1339  if (addr.sin_addr.s_addr == inet_addr(_ip.c_str()))
1340  return;
1341  }
1342 
1343  sockaddr_in addr;
1344  memset(&addr, 0, sizeof(addr));
1345  addr.sin_family = AF_INET;
1346  addr.sin_addr.s_addr = inet_addr(_ip.c_str());
1347  addr.sin_port = htons(static_cast<u_short>(this->port));
1348 
1349  this->relayAddrs.push_back(addr);
1350  }
1351 
1355  private: static const unsigned int kDefActivityInterval = 100;
1356 
1360  private: static const unsigned int kDefHeartbeatInterval = 1000;
1361 
1365  private: static const unsigned int kDefSilenceInterval = 3000;
1366 
1368  private: const std::string kMulticastGroup = "224.0.0.7";
1369 
1371  private: const int kTimeout = 250;
1372 
1374  private: static const uint16_t kMaxRcvStr =
1376 
1379  private: static const uint8_t kWireVersion = 10;
1380 
1382  private: int port;
1383 
1385  private: std::string hostAddr;
1386 
1388  private: std::vector<std::string> hostInterfaces;
1389 
1391  private: std::string pUuid;
1392 
1396  private: unsigned int silenceInterval;
1397 
1401  private: unsigned int activityInterval;
1402 
1406  private: unsigned int heartbeatInterval;
1407 
1409  private: DiscoveryCallback<Pub> connectionCb;
1410 
1412  private: DiscoveryCallback<Pub> disconnectionCb;
1413 
1415  private: DiscoveryCallback<Pub> registrationCb;
1416 
1418  private: DiscoveryCallback<Pub> unregistrationCb;
1419 
1421  private: TopicStorage<Pub> info;
1422 
1428 
1430  private: bool verbose;
1431 
1433  private: std::vector<int> sockets;
1434 
1436  private: sockaddr_in mcastAddr;
1437 
1439  private: std::vector<sockaddr_in> relayAddrs;
1440 
1442  private: mutable std::mutex mutex;
1443 
1445  private: std::thread threadReception;
1446 
1448  private: Timestamp timeNextHeartbeat;
1449 
1451  private: Timestamp timeNextActivity;
1452 
1454  private: std::mutex exitMutex;
1455 
1460  private: bool initialized;
1461 
1463  private: unsigned int numHeartbeatsUninitialized;
1464 
1466  private: mutable std::condition_variable initializedCv;
1467 
1469  private: bool exit;
1470 
1472  private: bool enabled;
1473  };
1474 
1478 
1482  }
1483  }
1484 }
1485 
1486 #endif
Topic/service available to any subscriber (default scope).
bool Unadvertise(const std::string &_topic, const std::string &_nUuid)
Unadvertise a new message. Broadcast a discovery message that will cancel all the discovery informati...
Definition: Discovery.hh:424
T empty(T... args)
std::map< std::string, Timestamp > activity
Activity information. Every time there is a message from a remote node, its activity information is u...
Definition: Discovery.hh:1427
void Unregister(const MessagePublisher &_pub) const
Unregister a node from this process as a remote subscriber.
Definition: Discovery.hh:392
This class stores all the information about a publisher. It stores the topic name that publishes...
Definition: Publisher.hh:44
std::vector< std::string > determineInterfaces()
Determine the list of network interfaces for this machine. Reference: https://github.com/ros/ros_comm/blob/hydro-devel/clients/ roscpp/src/libros/network.cpp.
Topic/service only available to subscribers in the same machine as the publisher. ...
void Start()
Start the discovery service. You probably want to register the callbacks for receiving discovery noti...
Definition: Discovery.hh:278
void WaitForInit() const
Check if ready/initialized. If not, then wait on the initializedCv condition variable.
Definition: Discovery.hh:610
T endl(T... args)
unsigned int HeartbeatInterval() const
Each node broadcasts periodic heartbeats to keep its topic information alive in other nodes...
Definition: Discovery.hh:476
void raw_type
Definition: Discovery.hh:45
T duration_cast(T... args)
A discovery class that implements a distributed topic discovery protocol. It uses UDP multicast for s...
Definition: Discovery.hh:117
void TopicList(std::vector< std::string > &_topics) const
Get the list of topics currently advertised in the network.
Definition: Discovery.hh:601
T boolalpha(T... args)
void SetSilenceInterval(const unsigned int _ms)
Set the maximum silence interval.
Definition: Discovery.hh:513
STL class.
void UnregistrationsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive an event when a remote node unsubscribes to a topic within this proces...
Definition: Discovery.hh:551
void Register(const MessagePublisher &_pub) const
Register a node from this process as a remote subscriber.
Definition: Discovery.hh:384
std::string determineHost()
Determine IP or hostname. Reference: https://github.com/ros/ros_comm/blob/hydro-devel/clients/ roscpp...
STL class.
virtual ~Discovery()
Destructor.
Definition: Discovery.hh:247
T min(T... args)
A class for customizing the publication options for a topic or service advertised. E.g.: Set the scope of a topic/service.
Definition: AdvertiseOptions.hh:59
Discovery(const std::string &_pUuid, const int _port, const bool _verbose=false)
Constructor.
Definition: Discovery.hh:124
T push_back(T... args)
const TopicStorage< Pub > & Info() const
Get the discovery information.
Definition: Discovery.hh:400
bool Publishers(const std::string &_topic, Addresses_M< Pub > &_publishers) const
Get all the publishers&#39; information known for a given topic.
Definition: Discovery.hh:410
std::string HostAddr() const
Get the IP address of this host.
Definition: Discovery.hh:455
unsigned int ActivityInterval() const
The discovery checks the validity of the topic information every &#39;activity interval&#39; milliseconds...
Definition: Discovery.hh:465
T erase(T... args)
std::vector< std::string > split(const std::string &_orig, char _delim)
split at a one character delimiter to get a vector of something
unsigned int SilenceInterval() const
Get the maximum time allowed without receiving any discovery information from a node before canceling...
Definition: Discovery.hh:486
T max(T... args)
void PrintCurrentState() const
Print the current discovery state.
Definition: Discovery.hh:558
T find(T... args)
This class stores all the information about a message publisher.
Definition: Publisher.hh:190
void DisconnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery disconnection events. Each time a topic is no longer active...
Definition: Discovery.hh:533
T c_str(T... args)
void ConnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery connection events. Each time a new topic is connected...
Definition: Discovery.hh:523
bool Advertise(const Pub &_publisher)
Advertise a new message.
Definition: Discovery.hh:302
Topic/service only available to subscribers in the same process as the publisher. ...
bool pollSockets(const std::vector< int > &_sockets, const int _timeout)
std::chrono::steady_clock::time_point Timestamp
Definition: TransportTypes.hh:155
DestinationType
Options for sending discovery messages.
Definition: Discovery.hh:88
void SetActivityInterval(const unsigned int _ms)
Set the activity interval.
Definition: Discovery.hh:495
void RegistrationsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive an event when a new remote node subscribes to a topic within this proc...
Definition: Discovery.hh:542
Definition: AdvertiseOptions.hh:28
void SetHeartbeatInterval(const unsigned int _ms)
Set the heartbeat interval.
Definition: Discovery.hh:504
STL class.
bool env(const std::string &_name, std::string &_value)
Find the environment variable &#39;_name&#39; and return its value.
bool Discover(const std::string &_topic) const
Request discovery information about a topic. When using this method, the user might want to use SetCo...
Definition: Discovery.hh:334