Ignition Transport

API Reference

7.1.0
Discovery.hh
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2014 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 
18 #ifndef IGNITION_TRANSPORT_DISCOVERY_HH_
19 #define IGNITION_TRANSPORT_DISCOVERY_HH_
20 #include <errno.h>
21 #include <string.h>
22 
23 #ifdef _WIN32
24  // For socket(), connect(), send(), and recv().
25  #include <Winsock2.h>
26  #include <Ws2def.h>
27  #include <Ws2ipdef.h>
28  #include <Ws2tcpip.h>
29  // Type used for raw data on this platform.
30  using raw_type = char;
31 #else
32  // For data types
33  #include <sys/types.h>
34  // For socket(), connect(), send(), and recv()
35  #include <sys/socket.h>
36  // For gethostbyname()
37  #include <netdb.h>
38  // For inet_addr()
39  #include <arpa/inet.h>
40  // For close()
41  #include <unistd.h>
42  // For sockaddr_in
43  #include <netinet/in.h>
44  // Type used for raw data on this platform
45  using raw_type = void;
46 #endif
47 
48 #ifdef _WIN32
49  #pragma warning(push, 0)
50 #endif
51 #ifdef _WIN32
52  #pragma warning(pop)
53  // Suppress "decorated name length exceeded" warning in STL.
54  #pragma warning(disable: 4503)
55  // Suppress "deprecated API" warnings in WINSOCK.
56  #pragma warning(disable: 4996)
57 #endif
58 
59 #include <algorithm>
60 #include <condition_variable>
61 #include <map>
62 #include <memory>
63 #include <mutex>
64 #include <string>
65 #include <thread>
66 #include <vector>
67 
68 #include "ignition/transport/config.hh"
69 #include "ignition/transport/Export.hh"
76 
77 namespace ignition
78 {
79  namespace transport
80  {
81  // Inline bracket to help doxygen filtering.
82  inline namespace IGNITION_TRANSPORT_VERSION_NAMESPACE {
/// \brief Selects where Discovery::SendMsg() sends an outgoing discovery
/// message.
84  enum class DestinationType
85  {
/// \brief Send only to the unicast relay addresses.
87  UNICAST,
/// \brief Send only to the multicast group.
89  MULTICAST,
/// \brief Send to both the multicast group and the unicast relays.
91  ALL
92  };
93 
94  //
/// \brief Poll a list of sockets. Only declared here; the definition lives
/// elsewhere.
/// \param[in] _sockets Socket descriptors to poll.
/// \param[in] _timeout Timeout for the poll. Discovery::RecvMessages() passes
/// the result of NextTimeout() -- presumably milliseconds, TODO confirm
/// against the definition.
/// \return Discovery::RecvMessages() treats a true result as "there is a
/// discovery update ready to be read".
100  bool IGNITION_TRANSPORT_VISIBLE pollSockets(
101  const std::vector<int> &_sockets,
102  const int _timeout);
103 
112  template<typename Pub>
113  class Discovery
114  {
120  public: Discovery(const std::string &_pUuid,
121  const int _port,
122  const bool _verbose = false)
123  : port(_port),
124  hostAddr(determineHost()),
125  pUuid(_pUuid),
126  silenceInterval(kDefSilenceInterval),
127  activityInterval(kDefActivityInterval),
128  heartbeatInterval(kDefHeartbeatInterval),
129  connectionCb(nullptr),
130  disconnectionCb(nullptr),
131  verbose(_verbose),
132  initialized(false),
133  numHeartbeatsUninitialized(0),
134  exit(false),
135  enabled(false)
136  {
137  std::string ignIp;
138  if (env("IGN_IP", ignIp) && !ignIp.empty())
139  this->hostInterfaces = {ignIp};
140  else
141  {
142  // Get the list of network interfaces in this host.
143  this->hostInterfaces = determineInterfaces();
144  }
145 
146 #ifdef _WIN32
147  WORD wVersionRequested;
148  WSADATA wsaData;
149 
150  // Request WinSock v2.2.
151  wVersionRequested = MAKEWORD(2, 2);
152  // Load WinSock DLL.
153  if (WSAStartup(wVersionRequested, &wsaData) != 0)
154  {
155  std::cerr << "Unable to load WinSock DLL" << std::endl;
156  return;
157  }
158 #endif
159  for (const auto &netIface : this->hostInterfaces)
160  {
161  auto succeed = this->RegisterNetIface(netIface);
162 
163  // If the IP address that we're selecting as the main IP address of
164  // the host is invalid, we change it to 127.0.0.1 .
165  // This is probably because IGN_IP is set to a wrong value.
166  if (netIface == this->hostAddr && !succeed)
167  {
168  this->RegisterNetIface("127.0.0.1");
169  std::cerr << "Did you set the environment variable IGN_IP with a "
170  << "correct IP address? " << std::endl
171  << " [" << netIface << "] seems an invalid local IP "
172  << "address." << std::endl
173  << " Using 127.0.0.1 as hostname." << std::endl;
174  this->hostAddr = "127.0.0.1";
175  }
176  }
177 
178  // Socket option: SO_REUSEADDR. This options is used only for receiving
179  // data. We can reuse the same socket for receiving multicast data from
180  // multiple interfaces. We will use the socket at position 0 for
181  // receiving data.
182  int reuseAddr = 1;
183  if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEADDR,
184  reinterpret_cast<const char *>(&reuseAddr), sizeof(reuseAddr)) != 0)
185  {
186  std::cerr << "Error setting socket option (SO_REUSEADDR)."
187  << std::endl;
188  return;
189  }
190 
191 #ifdef SO_REUSEPORT
192  // Socket option: SO_REUSEPORT. This options is used only for receiving
193  // data. We can reuse the same socket for receiving multicast data from
194  // multiple interfaces. We will use the socket at position 0 for
195  // receiving data.
196  int reusePort = 1;
197  // cppcheck-suppress ConfigurationNotChecked
198  if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEPORT,
199  reinterpret_cast<const char *>(&reusePort), sizeof(reusePort)) != 0)
200  {
201  std::cerr << "Error setting socket option (SO_REUSEPORT)."
202  << std::endl;
203  return;
204  }
205 #endif
206  // Bind the first socket to the discovery port.
207  sockaddr_in localAddr;
208  memset(&localAddr, 0, sizeof(localAddr));
209  localAddr.sin_family = AF_INET;
210  localAddr.sin_addr.s_addr = htonl(INADDR_ANY);
211  localAddr.sin_port = htons(static_cast<u_short>(this->port));
212 
213  if (bind(this->sockets.at(0),
214  reinterpret_cast<sockaddr *>(&localAddr), sizeof(sockaddr_in)) < 0)
215  {
216  std::cerr << "Binding to a local port failed." << std::endl;
217  return;
218  }
219 
220  // Set 'mcastAddr' to the multicast discovery group.
221  memset(&this->mcastAddr, 0, sizeof(this->mcastAddr));
222  this->mcastAddr.sin_family = AF_INET;
223  this->mcastAddr.sin_addr.s_addr =
224  inet_addr(this->kMulticastGroup.c_str());
225  this->mcastAddr.sin_port = htons(static_cast<u_short>(this->port));
226 
228  std::string ignRelay = "";
229  if (env("IGN_RELAY", ignRelay) && !ignRelay.empty())
230  {
231  relays = transport::split(ignRelay, ':');
232  }
233 
234  // Register all unicast relays.
235  for (auto const &relayAddr : relays)
236  this->AddRelayAddress(relayAddr);
237 
238  if (this->verbose)
239  this->PrintCurrentState();
240  }
241 
243  public: virtual ~Discovery()
244  {
245  // Tell the service thread to terminate.
246  this->exitMutex.lock();
247  this->exit = true;
248  this->exitMutex.unlock();
249 
250  // Wait for the service threads to finish before exit.
251  if (this->threadReception.joinable())
252  this->threadReception.join();
253 
254  // Broadcast a BYE message to trigger the remote cancellation of
255  // all our advertised topics.
256  this->SendMsg(DestinationType::ALL, ByeType,
257  Publisher("", "", this->pUuid, "", AdvertiseOptions()));
258 
259  // Close sockets.
260  for (const auto &sock : this->sockets)
261  {
262 #ifdef _WIN32
263  closesocket(sock);
264  WSACleanup();
265 #else
266  close(sock);
267 #endif
268  }
269  }
270 
274  public: void Start()
275  {
276  {
277  std::lock_guard<std::mutex> lock(this->mutex);
278 
279  // The service is already running.
280  if (this->enabled)
281  return;
282 
283  this->enabled = true;
284  }
285 
286  auto now = std::chrono::steady_clock::now();
287  this->timeNextHeartbeat = now;
288  this->timeNextActivity = now;
289 
290  // Start the thread that receives discovery information.
291  this->threadReception = std::thread(&Discovery::RecvMessages, this);
292  }
293 
298  public: bool Advertise(const Pub &_publisher)
299  {
300  {
301  std::lock_guard<std::mutex> lock(this->mutex);
302 
303  if (!this->enabled)
304  return false;
305 
306  // Add the addressing information (local publisher).
307  if (!this->info.AddPublisher(_publisher))
308  return false;
309  }
310 
311  // Only advertise a message outside this process if the scope
312  // is not 'Process'
313  if (_publisher.Options().Scope() != Scope_t::PROCESS)
314  this->SendMsg(DestinationType::ALL, AdvType, _publisher);
315 
316  return true;
317  }
318 
329  public: bool Discover(const std::string &_topic) const
330  {
332  bool found;
333  Addresses_M<Pub> addresses;
334 
335  {
336  std::lock_guard<std::mutex> lock(this->mutex);
337 
338  if (!this->enabled)
339  return false;
340 
341  cb = this->connectionCb;
342  }
343 
344  Pub pub;
345  pub.SetTopic(_topic);
346  pub.SetPUuid(this->pUuid);
347 
348  // Send a discovery request.
349  this->SendMsg(DestinationType::ALL, SubType, pub);
350 
351  {
352  std::lock_guard<std::mutex> lock(this->mutex);
353  found = this->info.Publishers(_topic, addresses);
354  }
355 
356  if (found)
357  {
358  // I already have information about this topic.
359  for (const auto &proc : addresses)
360  {
361  for (const auto &node : proc.second)
362  {
363  if (cb)
364  {
365  // Execute the user's callback for a service request. Notice
366  // that we only execute one callback for preventing receive
367  // multiple service responses for a single request.
368  cb(node);
369  }
370  }
371  }
372  }
373 
374  return true;
375  }
376 
379  public: const TopicStorage<Pub> &Info() const
380  {
381  std::lock_guard<std::mutex> lock(this->mutex);
382  return this->info;
383  }
384 
389  public: bool Publishers(const std::string &_topic,
390  Addresses_M<Pub> &_publishers) const
391  {
392  std::lock_guard<std::mutex> lock(this->mutex);
393  return this->info.Publishers(_topic, _publishers);
394  }
395 
403  public: bool Unadvertise(const std::string &_topic,
404  const std::string &_nUuid)
405  {
406  Pub inf;
407  {
408  std::lock_guard<std::mutex> lock(this->mutex);
409 
410  if (!this->enabled)
411  return false;
412 
413  // Don't do anything if the topic is not advertised by any of my nodes
414  if (!this->info.Publisher(_topic, this->pUuid, _nUuid, inf))
415  return true;
416 
417  // Remove the topic information.
418  this->info.DelPublisherByNode(_topic, this->pUuid, _nUuid);
419  }
420 
421  // Only unadvertise a message outside this process if the scope
422  // is not 'Process'.
423  if (inf.Options().Scope() != Scope_t::PROCESS)
424  this->SendMsg(DestinationType::ALL, UnadvType, inf);
425 
426  return true;
427  }
428 
431  public: std::string HostAddr() const
432  {
433  std::lock_guard<std::mutex> lock(this->mutex);
434  return this->hostAddr;
435  }
436 
441  public: unsigned int ActivityInterval() const
442  {
443  std::lock_guard<std::mutex> lock(this->mutex);
444  return this->activityInterval;
445  }
446 
452  public: unsigned int HeartbeatInterval() const
453  {
454  std::lock_guard<std::mutex> lock(this->mutex);
455  return this->heartbeatInterval;
456  }
457 
462  public: unsigned int SilenceInterval() const
463  {
464  std::lock_guard<std::mutex> lock(this->mutex);
465  return this->silenceInterval;
466  }
467 
471  public: void SetActivityInterval(const unsigned int _ms)
472  {
473  std::lock_guard<std::mutex> lock(this->mutex);
474  this->activityInterval = _ms;
475  }
476 
480  public: void SetHeartbeatInterval(const unsigned int _ms)
481  {
482  std::lock_guard<std::mutex> lock(this->mutex);
483  this->heartbeatInterval = _ms;
484  }
485 
489  public: void SetSilenceInterval(const unsigned int _ms)
490  {
491  std::lock_guard<std::mutex> lock(this->mutex);
492  this->silenceInterval = _ms;
493  }
494 
499  public: void ConnectionsCb(const DiscoveryCallback<Pub> &_cb)
500  {
501  std::lock_guard<std::mutex> lock(this->mutex);
502  this->connectionCb = _cb;
503  }
504 
509  public: void DisconnectionsCb(const DiscoveryCallback<Pub> &_cb)
510  {
511  std::lock_guard<std::mutex> lock(this->mutex);
512  this->disconnectionCb = _cb;
513  }
514 
516  public: void PrintCurrentState() const
517  {
518  std::lock_guard<std::mutex> lock(this->mutex);
519 
520  std::cout << "---------------" << std::endl;
521  std::cout << std::boolalpha << "Enabled: "
522  << this->enabled << std::endl;
523  std::cout << "Discovery state" << std::endl;
524  std::cout << "\tUUID: " << this->pUuid << std::endl;
525  std::cout << "Settings" << std::endl;
526  std::cout << "\tActivity: " << this->activityInterval
527  << " ms." << std::endl;
528  std::cout << "\tHeartbeat: " << this->heartbeatInterval
529  << "ms." << std::endl;
530  std::cout << "\tSilence: " << this->silenceInterval
531  << " ms." << std::endl;
532  std::cout << "Known information:" << std::endl;
533  this->info.Print();
534 
535  // Used to calculate the elapsed time.
537 
538  std::cout << "Activity" << std::endl;
539  if (this->activity.empty())
540  std::cout << "\t<empty>" << std::endl;
541  else
542  {
543  for (auto &proc : this->activity)
544  {
545  // Elapsed time since the last update from this publisher.
546  std::chrono::duration<double> elapsed = now - proc.second;
547 
548  std::cout << "\t" << proc.first << std::endl;
549  std::cout << "\t\t" << "Since: " << std::chrono::duration_cast<
550  std::chrono::milliseconds>(elapsed).count() << " ms. ago. "
551  << std::endl;
552  }
553  }
554  std::cout << "---------------" << std::endl;
555  }
556 
559  public: void TopicList(std::vector<std::string> &_topics) const
560  {
561  this->WaitForInit();
562  std::lock_guard<std::mutex> lock(this->mutex);
563  this->info.TopicList(_topics);
564  }
565 
568  public: void WaitForInit() const
569  {
570  std::unique_lock<std::mutex> lk(this->mutex);
571 
572  if (!this->initialized)
573  {
574  this->initializedCv.wait(lk, [this]{return this->initialized;});
575  }
576  }
577 
581  private: void UpdateActivity()
582  {
583  // The UUIDs of the processes that have expired.
585 
586  // A copy of the disconnection callback.
587  DiscoveryCallback<Pub> disconnectCb;
588 
590 
591  {
592  std::lock_guard<std::mutex> lock(this->mutex);
593 
594  if (now < this->timeNextActivity)
595  return;
596 
597  disconnectCb = this->disconnectionCb;
598 
599  for (auto it = this->activity.cbegin(); it != this->activity.cend();)
600  {
601  // Elapsed time since the last update from this publisher.
602  auto elapsed = now - it->second;
603 
604  // This publisher has expired.
605  if (std::chrono::duration_cast<std::chrono::milliseconds>
606  (elapsed).count() > this->silenceInterval)
607  {
608  // Remove all the info entries for this process UUID.
609  this->info.DelPublishersByProc(it->first);
610 
611  uuids.push_back(it->first);
612 
613  // Remove the activity entry.
614  this->activity.erase(it++);
615  }
616  else
617  ++it;
618  }
619 
620  this->timeNextActivity = std::chrono::steady_clock::now() +
621  std::chrono::milliseconds(this->activityInterval);
622  }
623 
624  if (!disconnectCb)
625  return;
626 
627  // Notify without topic information. This is useful to inform the
628  // client that a remote node is gone, even if we were not
629  // interested in its topics.
630  for (auto const &uuid : uuids)
631  {
632  Pub publisher;
633  publisher.SetPUuid(uuid);
634  disconnectCb(publisher);
635  }
636  }
637 
639  private: void UpdateHeartbeat()
640  {
642 
643  {
644  std::lock_guard<std::mutex> lock(this->mutex);
645 
646  if (now < this->timeNextHeartbeat)
647  return;
648  }
649 
650  Publisher pub("", "", this->pUuid, "", AdvertiseOptions());
651  this->SendMsg(DestinationType::ALL, HeartbeatType, pub);
652 
654  {
655  std::lock_guard<std::mutex> lock(this->mutex);
656 
657  // Re-advertise topics that are advertised inside this process.
658  this->info.PublishersByProc(this->pUuid, nodes);
659  }
660 
661  for (const auto &topic : nodes)
662  {
663  for (const auto &node : topic.second)
664  this->SendMsg(DestinationType::ALL, AdvType, node);
665  }
666 
667  {
668  std::lock_guard<std::mutex> lock(this->mutex);
669  if (!this->initialized)
670  {
671  ++this->numHeartbeatsUninitialized;
672  if (this->numHeartbeatsUninitialized == 2)
673  {
674  // We consider the discovery initialized after two cycles of
675  // heartbeats sent.
676  this->initialized = true;
677 
678  // Notify anyone waiting for the initialization phase to finish.
679  this->initializedCv.notify_all();
680  }
681  }
682 
683  this->timeNextHeartbeat = std::chrono::steady_clock::now() +
684  std::chrono::milliseconds(this->heartbeatInterval);
685  }
686  }
687 
697  private: int NextTimeout() const
698  {
699  auto now = std::chrono::steady_clock::now();
700  auto timeUntilNextHeartbeat = this->timeNextHeartbeat - now;
701  auto timeUntilNextActivity = this->timeNextActivity - now;
702 
703  int t = static_cast<int>(
705  (std::min(timeUntilNextHeartbeat, timeUntilNextActivity)).count());
706  int t2 = std::min(t, this->kTimeout);
707  return std::max(t2, 0);
708  }
709 
711  private: void RecvMessages()
712  {
713  bool timeToExit = false;
714  while (!timeToExit)
715  {
716  // Calculate the timeout.
717  int timeout = this->NextTimeout();
718 
719  if (pollSockets(this->sockets, timeout))
720  {
721  this->RecvDiscoveryUpdate();
722 
723  if (this->verbose)
724  this->PrintCurrentState();
725  }
726 
727  this->UpdateHeartbeat();
728  this->UpdateActivity();
729 
730  // Is it time to exit?
731  {
732  std::lock_guard<std::mutex> lock(this->exitMutex);
733  if (this->exit)
734  timeToExit = true;
735  }
736  }
737  }
738 
740  private: void RecvDiscoveryUpdate()
741  {
742  char rcvStr[Discovery::kMaxRcvStr];
743  std::string srcAddr;
744  uint16_t srcPort;
745  sockaddr_in clntAddr;
746  socklen_t addrLen = sizeof(clntAddr);
747 
748  if ((recvfrom(this->sockets.at(0),
749  reinterpret_cast<raw_type *>(rcvStr),
750  this->kMaxRcvStr, 0,
751  reinterpret_cast<sockaddr *>(&clntAddr),
752  reinterpret_cast<socklen_t *>(&addrLen))) < 0)
753  {
754  std::cerr << "Discovery::RecvDiscoveryUpdate() recvfrom error"
755  << std::endl;
756  return;
757  }
758  srcAddr = inet_ntoa(clntAddr.sin_addr);
759  srcPort = ntohs(clntAddr.sin_port);
760 
761  if (this->verbose)
762  {
763  std::cout << "\nReceived discovery update from " << srcAddr << ": "
764  << srcPort << std::endl;
765  }
766 
767  this->DispatchDiscoveryMsg(srcAddr, rcvStr);
768  }
769 
770 
774  private: void DispatchDiscoveryMsg(const std::string &_fromIp,
775  char *_msg)
776  {
777  // Entire length of the package in octets.
778  uint16_t len;
779  memcpy(&len, _msg, sizeof(len));
780 
781  // Create the header from the raw bytes.
782  char *headerPtr = _msg + sizeof(len);
783  Header header;
784  header.Unpack(headerPtr);
785 
786  // Discard the message if the wire protocol is different than mine.
787  if (this->kWireVersion != header.Version())
788  return;
789 
790  auto recvPUuid = header.PUuid();
791 
792  // Discard our own discovery messages.
793  if (recvPUuid == this->pUuid)
794  return;
795 
796  uint16_t flags = header.Flags();
797 
798  // Forwarding summary:
799  // - From a unicast peer -> to multicast group (with NO_RELAY flag).
800  // - From multicast group -> to unicast peers (with RELAY flag).
801 
802  // If the RELAY flag is set, this discovery message is coming via a
803  // unicast transmission. In this case, we don't process it, we just
804  // forward it to the multicast group, and it will be dispatched once
805  // received there. Note that we also unset the RELAY flag and set the
806  // NO_RELAY flag, to avoid forwarding the message anymore.
807  if (flags & FlagRelay)
808  {
809  // Unset the RELAY flag in the header and set the NO_RELAY.
810  flags &= ~FlagRelay;
811  flags |= FlagNoRelay;
812  header.SetFlags(flags);
813  header.Pack(headerPtr);
814  this->SendBytesMulticast(_msg, len);
815 
816  // A unicast peer contacted me. I need to save its address for
817  // sending future messages in the future.
818  this->AddRelayAddress(_fromIp);
819  return;
820  }
821  // If we are receiving this discovery message via the multicast channel
822  // and the NO_RELAY flag is not set, we forward this message via unicast
823  // to all our relays. Note that this is the most common case, where we
824  // receive a regular multicast message and we forward it to any remote
825  // relays.
826  else if (!(flags & FlagNoRelay))
827  {
828  flags |= FlagRelay;
829  header.SetFlags(flags);
830  header.Pack(headerPtr);
831  this->SendBytesUnicast(_msg, len);
832  }
833 
834  // Update timestamp and cache the callbacks.
835  DiscoveryCallback<Pub> connectCb;
836  DiscoveryCallback<Pub> disconnectCb;
837  {
838  std::lock_guard<std::mutex> lock(this->mutex);
839  this->activity[recvPUuid] = std::chrono::steady_clock::now();
840  connectCb = this->connectionCb;
841  disconnectCb = this->disconnectionCb;
842  }
843 
844  char *pBody = headerPtr + header.HeaderLength();
845 
846  switch (header.Type())
847  {
848  case AdvType:
849  {
850  // Read the rest of the fields.
852  advMsg.Unpack(pBody);
853 
854  // Check scope of the topic.
855  if ((advMsg.Publisher().Options().Scope() == Scope_t::PROCESS) ||
856  (advMsg.Publisher().Options().Scope() == Scope_t::HOST &&
857  _fromIp != this->hostAddr))
858  {
859  return;
860  }
861 
862  // Register an advertised address for the topic.
863  bool added;
864  {
865  std::lock_guard<std::mutex> lock(this->mutex);
866  added = this->info.AddPublisher(advMsg.Publisher());
867  }
868 
869  if (added && connectCb)
870  {
871  // Execute the client's callback.
872  connectCb(advMsg.Publisher());
873  }
874 
875  break;
876  }
877  case SubType:
878  {
879  // Read the rest of the fields.
880  SubscriptionMsg subMsg;
881  subMsg.Unpack(pBody);
882  auto recvTopic = subMsg.Topic();
883 
884  // Check if at least one of my nodes advertises the topic requested.
885  Addresses_M<Pub> addresses;
886  {
887  std::lock_guard<std::mutex> lock(this->mutex);
888  if (!this->info.HasAnyPublishers(recvTopic, this->pUuid))
889  {
890  break;
891  }
892 
893  if (!this->info.Publishers(recvTopic, addresses))
894  break;
895  }
896 
897  for (const auto &nodeInfo : addresses[this->pUuid])
898  {
899  // Check scope of the topic.
900  if ((nodeInfo.Options().Scope() == Scope_t::PROCESS) ||
901  (nodeInfo.Options().Scope() == Scope_t::HOST &&
902  _fromIp != this->hostAddr))
903  {
904  continue;
905  }
906 
907  // Answer an ADVERTISE message.
908  this->SendMsg(DestinationType::ALL, AdvType, nodeInfo);
909  }
910 
911  break;
912  }
913  case HeartbeatType:
914  {
915  // The timestamp has already been updated.
916  break;
917  }
918  case ByeType:
919  {
920  // Remove the activity entry for this publisher.
921  {
922  std::lock_guard<std::mutex> lock(this->mutex);
923  this->activity.erase(recvPUuid);
924  }
925 
926  if (disconnectCb)
927  {
928  Pub pub;
929  pub.SetPUuid(recvPUuid);
930  // Notify the new disconnection.
931  disconnectCb(pub);
932  }
933 
934  // Remove the address entry for this topic.
935  {
936  std::lock_guard<std::mutex> lock(this->mutex);
937  this->info.DelPublishersByProc(recvPUuid);
938  }
939 
940  break;
941  }
942  case UnadvType:
943  {
944  // Read the address.
946  advMsg.Unpack(pBody);
947 
948  // Check scope of the topic.
949  if ((advMsg.Publisher().Options().Scope() == Scope_t::PROCESS) ||
950  (advMsg.Publisher().Options().Scope() == Scope_t::HOST &&
951  _fromIp != this->hostAddr))
952  {
953  return;
954  }
955 
956  if (disconnectCb)
957  {
958  // Notify the new disconnection.
959  disconnectCb(advMsg.Publisher());
960  }
961 
962  // Remove the address entry for this topic.
963  {
964  std::lock_guard<std::mutex> lock(this->mutex);
965  this->info.DelPublisherByNode(advMsg.Publisher().Topic(),
966  advMsg.Publisher().PUuid(), advMsg.Publisher().NUuid());
967  }
968 
969  break;
970  }
971  default:
972  {
973  std::cerr << "Unknown message type [" << header.Type() << "]\n";
974  break;
975  }
976  }
977  }
978 
985  private: template<typename T>
986  void SendMsg(const DestinationType &_destType,
987  const uint8_t _type,
988  const T &_pub,
989  const uint16_t _flags = 0) const
990  {
991  // Create the header.
992  Header header(this->Version(), _pub.PUuid(), _type, _flags);
993  uint16_t lengthField = 0u;
994  std::vector<char> buffer;
995 
996  std::string topic = _pub.Topic();
997 
998  switch (_type)
999  {
1000  case AdvType:
1001  case UnadvType:
1002  {
1003  // Create the [UN]ADVERTISE message.
1004  transport::AdvertiseMessage<T> advMsg(header, _pub);
1005 
1006  // Allocate a buffer and serialize the message.
1007  buffer.resize(advMsg.MsgLength() + sizeof(lengthField));
1008  advMsg.Pack(reinterpret_cast<char*>(&buffer[sizeof(lengthField)]));
1009  break;
1010  }
1011  case SubType:
1012  {
1013  // Create the [UN]SUBSCRIBE message.
1014  SubscriptionMsg subMsg(header, topic);
1015 
1016  // Allocate a buffer and serialize the message.
1017  buffer.resize(subMsg.MsgLength() + sizeof(lengthField));
1018  subMsg.Pack(reinterpret_cast<char*>(&buffer[sizeof(lengthField)]));
1019  break;
1020  }
1021  case HeartbeatType:
1022  case ByeType:
1023  {
1024  // Allocate a buffer and serialize the message.
1025  buffer.resize(header.HeaderLength() + sizeof(lengthField));
1026  header.Pack(reinterpret_cast<char*>(&buffer[sizeof(lengthField)]));
1027  break;
1028  }
1029  default:
1030  std::cerr << "Discovery::SendMsg() error: Unrecognized message"
1031  << " type [" << _type << "]" << std::endl;
1032  return;
1033  }
1034 
1035  lengthField = static_cast<uint16_t>(buffer.size());
1036  memcpy(&buffer[0], &lengthField, sizeof(lengthField));
1037  char *headerPtr = &buffer[0] + sizeof(lengthField);
1038 
1039  if (_destType == DestinationType::MULTICAST ||
1040  _destType == DestinationType::ALL)
1041  {
1042  this->SendBytesMulticast(&buffer[0], buffer.size());
1043  }
1044 
1045  // Send the discovery message to the unicast relays.
1046  if (_destType == DestinationType::UNICAST ||
1047  _destType == DestinationType::ALL)
1048  {
1049  // Set the RELAY flag in the header.
1050  uint16_t flags = header.Flags();
1051  flags |= FlagRelay;
1052  header.SetFlags(flags);
1053  header.Pack(headerPtr);
1054  this->SendBytesUnicast(&buffer[0], buffer.size());
1055  }
1056 
1057  if (this->verbose)
1058  {
1059  std::cout << "\t* Sending " << MsgTypesStr[_type]
1060  << " msg [" << topic << "]" << std::endl;
1061  }
1062  }
1063 
1067  private: void SendBytesUnicast(char *_buffer,
1068  uint16_t _len) const
1069  {
1070  // Send the discovery message to the unicast relays.
1071  for (const auto &sockAddr : this->relayAddrs)
1072  {
1073  auto sent = sendto(this->sockets.at(0),
1074  reinterpret_cast<const raw_type *>(
1075  reinterpret_cast<const unsigned char*>(_buffer)),
1076  _len, 0,
1077  reinterpret_cast<const sockaddr *>(&sockAddr),
1078  sizeof(sockAddr));
1079 
1080  if (sent != _len)
1081  {
1082  std::cerr << "Exception sending a unicast message" << std::endl;
1083  return;
1084  }
1085  }
1086  }
1087 
1091  private: void SendBytesMulticast(char *_buffer,
1092  uint16_t _len) const
1093  {
1094  // Send the discovery message to the multicast group through all the
1095  // sockets.
1096  for (const auto &sock : this->Sockets())
1097  {
1098  if (sendto(sock, reinterpret_cast<const raw_type *>(
1099  reinterpret_cast<const unsigned char*>(_buffer)),
1100  _len, 0,
1101  reinterpret_cast<const sockaddr *>(this->MulticastAddr()),
1102  sizeof(*(this->MulticastAddr()))) != _len)
1103  {
1104  // Ignore EPERM and ENOBUFS errors.
1105  //
1106  // See issue #106
1107  //
1108  // Rationale drawn from:
1109  //
1110  // * https://groups.google.com/forum/#!topic/comp.protocols.tcp-ip/Qou9Sfgr77E
1111  // * https://stackoverflow.com/questions/16555101/sendto-dgrams-do-not-block-for-enobufs-on-osx
1112  if (errno != EPERM && errno != ENOBUFS)
1113  {
1114  std::cerr << "Exception sending a multicast message:"
1115  << strerror(errno) << std::endl;
1116  }
1117  return;
1118  }
1119  }
1120  }
1121 
1124  private: const std::vector<int> &Sockets() const
1125  {
1126  return this->sockets;
1127  }
1128 
1131  private: const sockaddr_in *MulticastAddr() const
1132  {
1133  return &this->mcastAddr;
1134  }
1135 
1138  private: uint8_t Version() const
1139  {
1140  return this->kWireVersion;
1141  }
1142 
1147  private: bool RegisterNetIface(const std::string &_ip)
1148  {
1149  // Make a new socket for sending discovery information.
1150  int sock = static_cast<int>(socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP));
1151  if (sock < 0)
1152  {
1153  std::cerr << "Socket creation failed." << std::endl;
1154  return false;
1155  }
1156 
1157  // Socket option: IP_MULTICAST_IF.
1158  // This socket option needs to be applied to each socket used to send
1159  // data. This option selects the source interface for outgoing messages.
1160  struct in_addr ifAddr;
1161  ifAddr.s_addr = inet_addr(_ip.c_str());
1162  if (setsockopt(sock, IPPROTO_IP, IP_MULTICAST_IF,
1163  reinterpret_cast<const char*>(&ifAddr), sizeof(ifAddr)) != 0)
1164  {
1165  std::cerr << "Error setting socket option (IP_MULTICAST_IF)."
1166  << std::endl;
1167  return false;
1168  }
1169 
1170  this->sockets.push_back(sock);
1171 
1172  // Join the multicast group. We have to do it for each network interface
1173  // but we can do it on the same socket. We will use the socket at
1174  // position 0 for receiving multicast information.
1175  struct ip_mreq group;
1176  group.imr_multiaddr.s_addr =
1177  inet_addr(this->kMulticastGroup.c_str());
1178  group.imr_interface.s_addr = inet_addr(_ip.c_str());
1179  if (setsockopt(this->sockets.at(0), IPPROTO_IP, IP_ADD_MEMBERSHIP,
1180  reinterpret_cast<const char*>(&group), sizeof(group)) != 0)
1181  {
1182  std::cerr << "Error setting socket option (IP_ADD_MEMBERSHIP)."
1183  << std::endl;
1184  return false;
1185  }
1186 
1187  return true;
1188  }
1189 
1192  private: void AddRelayAddress(const std::string &_ip)
1193  {
1194  // Sanity check: Make sure that this IP address is not already saved.
1195  for (auto const &addr : this->relayAddrs)
1196  {
1197  if (addr.sin_addr.s_addr == inet_addr(_ip.c_str()))
1198  return;
1199  }
1200 
1201  sockaddr_in addr;
1202  memset(&addr, 0, sizeof(addr));
1203  addr.sin_family = AF_INET;
1204  addr.sin_addr.s_addr = inet_addr(_ip.c_str());
1205  addr.sin_port = htons(static_cast<u_short>(this->port));
1206 
1207  this->relayAddrs.push_back(addr);
1208  }
1209 
1213  private: static const unsigned int kDefActivityInterval = 100;
1214 
1218  private: static const unsigned int kDefHeartbeatInterval = 1000;
1219 
1223  private: static const unsigned int kDefSilenceInterval = 3000;
1224 
1226  private: const std::string kMulticastGroup = "224.0.0.7";
1227 
1229  private: const int kTimeout = 250;
1230 
1232  private: static const int kMaxRcvStr = 65536;
1233 
1236  private: static const uint8_t kWireVersion = 9;
1237 
1239  private: int port;
1240 
1242  private: std::string hostAddr;
1243 
1245  private: std::vector<std::string> hostInterfaces;
1246 
1248  private: std::string pUuid;
1249 
1253  private: unsigned int silenceInterval;
1254 
1258  private: unsigned int activityInterval;
1259 
1263  private: unsigned int heartbeatInterval;
1264 
1266  private: DiscoveryCallback<Pub> connectionCb;
1267 
1269  private: DiscoveryCallback<Pub> disconnectionCb;
1270 
1272  private: TopicStorage<Pub> info;
1273 
1279 
1281  private: bool verbose;
1282 
1284  private: std::vector<int> sockets;
1285 
1287  private: sockaddr_in mcastAddr;
1288 
1290  private: std::vector<sockaddr_in> relayAddrs;
1291 
1293  private: mutable std::mutex mutex;
1294 
1296  private: std::thread threadReception;
1297 
1299  private: Timestamp timeNextHeartbeat;
1300 
1302  private: Timestamp timeNextActivity;
1303 
1305  private: std::mutex exitMutex;
1306 
1311  private: bool initialized;
1312 
1314  private: unsigned int numHeartbeatsUninitialized;
1315 
1317  private: mutable std::condition_variable initializedCv;
1318 
1320  private: bool exit;
1321 
1323  private: bool enabled;
1324  };
1325 
1329 
1333  }
1334  }
1335 }
1336 
1337 #endif
static const uint16_t FlagRelay
Definition: Packet.hh:48
Topic/service available to any subscriber (default scope).
bool Unadvertise(const std::string &_topic, const std::string &_nUuid)
Unadvertise a new message. Broadcast a discovery message that will cancel all the discovery informati...
Definition: Discovery.hh:403
T empty(T... args)
std::map< std::string, Timestamp > activity
Activity information. Every time there is a message from a remote node, its activity information is u...
Definition: Discovery.hh:1278
This class stores all the information about a publisher. It stores the topic name that publishes...
Definition: Publisher.hh:42
Header included in each discovery message containing the version of the discovery protocol...
Definition: Packet.hh:64
std::vector< std::string > determineInterfaces()
Determine the list of network interfaces for this machine. Reference: https://github.com/ros/ros_comm/blob/hydro-devel/clients/roscpp/src/libros/network.cpp.
Topic/service only available to subscribers in the same machine as the publisher. ...
size_t Unpack(const char *_buffer)
Unserialize a stream of bytes into a Sub.
Subscription packet used in the discovery protocol for requesting information about a given topic...
Definition: Packet.hh:177
void Start()
Start the discovery service. You probably want to register the callbacks for receiving discovery noti...
Definition: Discovery.hh:274
void WaitForInit() const
Check if ready/initialized. If not, then wait on the initializedCv condition variable.
Definition: Discovery.hh:568
size_t Unpack(const char *_buffer)
Unserialize a stream of bytes into an AdvertiseMessage.
Definition: Packet.hh:334
T endl(T... args)
unsigned int HeartbeatInterval() const
Each node broadcasts periodic heartbeats to keep its topic information alive in other nodes...
Definition: Discovery.hh:452
static const uint8_t HeartbeatType
Definition: Packet.hh:42
size_t Unpack(const char *_buffer)
Unserialize the header.
void raw_type
Definition: Discovery.hh:45
static const uint8_t SubType
Definition: Packet.hh:40
T duration_cast(T... args)
A discovery class that implements a distributed topic discovery protocol. It uses UDP multicast for s...
Definition: Discovery.hh:113
size_t Pack(char *_buffer) const
Serialize the subscription message.
static const uint8_t ByeType
Definition: Packet.hh:43
void TopicList(std::vector< std::string > &_topics) const
Get the list of topics currently advertised in the network.
Definition: Discovery.hh:559
size_t MsgLength() const
Get the total length of the message.
Definition: Packet.hh:307
T boolalpha(T... args)
void SetSilenceInterval(const unsigned int _ms)
Set the maximum silence interval.
Definition: Discovery.hh:489
STL class.
std::string determineHost()
Determine IP or hostname. Reference: https://github.com/ros/ros_comm/blob/hydro-devel/clients/roscpp...
T resize(T... args)
static const uint8_t UnadvType
Definition: Packet.hh:41
STL class.
virtual ~Discovery()
Destructor.
Definition: Discovery.hh:243
T min(T... args)
A class for customizing the publication options for a topic or service advertised. E.g.: Set the scope of a topic/service.
Definition: AdvertiseOptions.hh:59
size_t MsgLength() const
Get the total length of the message.
Discovery(const std::string &_pUuid, const int _port, const bool _verbose=false)
Constructor.
Definition: Discovery.hh:120
T push_back(T... args)
const TopicStorage< Pub > & Info() const
Get the discovery information.
Definition: Discovery.hh:379
bool Publishers(const std::string &_topic, Addresses_M< Pub > &_publishers) const
Get all the publishers' information known for a given topic.
Definition: Discovery.hh:389
std::string HostAddr() const
Get the IP address of this host.
Definition: Discovery.hh:431
T & Publisher()
Get the publisher of this message.
Definition: Packet.hh:284
unsigned int ActivityInterval() const
The discovery checks the validity of the topic information every 'activity interval' milliseconds...
Definition: Discovery.hh:441
T erase(T... args)
static const uint8_t AdvType
Definition: Packet.hh:39
std::vector< std::string > split(const std::string &_orig, char _delim)
Split a string at a one-character delimiter and return the resulting pieces as a vector of strings.
unsigned int SilenceInterval() const
Get the maximum time allowed without receiving any discovery information from a node before canceling...
Definition: Discovery.hh:462
T max(T... args)
Advertise packet used in the discovery protocol to broadcast information about the node advertising a...
Definition: Packet.hh:258
static const std::vector< std::string > MsgTypesStr
Used for debugging the message type received/sent.
Definition: Packet.hh:54
void PrintCurrentState() const
Print the current discovery state.
Definition: Discovery.hh:516
T size(T... args)
void DisconnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery disconnection events. Each time a topic is no longer active...
Definition: Discovery.hh:509
T c_str(T... args)
void ConnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery connection events. Each time a new topic is connected...
Definition: Discovery.hh:499
bool Advertise(const Pub &_publisher)
Advertise a new message.
Definition: Discovery.hh:298
Topic/service only available to subscribers in the same process as the publisher. ...
bool pollSockets(const std::vector< int > &_sockets, const int _timeout)
std::chrono::steady_clock::time_point Timestamp
Definition: TransportTypes.hh:155
DestinationType
Options for sending discovery messages.
Definition: Discovery.hh:84
void SetActivityInterval(const unsigned int _ms)
Set the activity interval.
Definition: Discovery.hh:471
Definition: AdvertiseOptions.hh:28
std::string Topic() const
Get the topic.
size_t Pack(char *_buffer) const
Serialize the advertise message.
Definition: Packet.hh:315
void SetHeartbeatInterval(const unsigned int _ms)
Set the heartbeat interval.
Definition: Discovery.hh:480
STL class.
static const uint16_t FlagNoRelay
Definition: Packet.hh:51
bool env(const std::string &_name, std::string &_value)
Find the environment variable '_name' and return its value.
bool Discover(const std::string &_topic) const
Request discovery information about a topic. When using this method, the user might want to use SetCo...
Definition: Discovery.hh:329