Ignition Transport

API Reference

6.0.0
Discovery.hh
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2014 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 
18 #ifndef IGNITION_TRANSPORT_DISCOVERY_HH_
19 #define IGNITION_TRANSPORT_DISCOVERY_HH_
20 #include <errno.h>
21 #include <string.h>
22 
23 #ifdef _WIN32
24  // For socket(), connect(), send(), and recv().
25  #include <Winsock2.h>
26  #include <Ws2def.h>
27  #include <Ws2ipdef.h>
28  #include <Ws2tcpip.h>
29  // Type used for raw data on this platform.
30  using raw_type = char;
31 #else
32  // For data types
33  #include <sys/types.h>
34  // For socket(), connect(), send(), and recv()
35  #include <sys/socket.h>
36  // For gethostbyname()
37  #include <netdb.h>
38  // For inet_addr()
39  #include <arpa/inet.h>
40  // For close()
41  #include <unistd.h>
42  // For sockaddr_in
43  #include <netinet/in.h>
44  // Type used for raw data on this platform
45  using raw_type = void;
46 #endif
47 
48 #ifdef _WIN32
49  #pragma warning(push, 0)
50 #endif
51 #ifdef _WIN32
52  #pragma warning(pop)
53  // Suppress "decorated name length exceed" warning in STL.
54  #pragma warning(disable: 4503)
55  // Suppress "deprecated API warnings" in WINSOCK.
56  #pragma warning(disable: 4996)
57 #endif
58 
59 #include <algorithm>
60 #include <condition_variable>
61 #include <map>
62 #include <memory>
63 #include <mutex>
64 #include <string>
65 #include <thread>
66 #include <vector>
67 
68 #include "ignition/transport/config.hh"
69 #include "ignition/transport/Export.hh"
76 
77 namespace ignition
78 {
79  namespace transport
80  {
81  // Inline bracket to help doxygen filtering.
82  inline namespace IGNITION_TRANSPORT_VERSION_NAMESPACE {
83  //
89  bool IGNITION_TRANSPORT_VISIBLE pollSockets(
90  const std::vector<int> &_sockets,
91  const int _timeout);
92 
101  template<typename Pub>
102  class Discovery
103  {
109  public: Discovery(const std::string &_pUuid,
110  const int _port,
111  const bool _verbose = false)
112  : port(_port),
113  hostAddr(determineHost()),
114  pUuid(_pUuid),
115  silenceInterval(kDefSilenceInterval),
116  activityInterval(kDefActivityInterval),
117  heartbeatInterval(kDefHeartbeatInterval),
118  connectionCb(nullptr),
119  disconnectionCb(nullptr),
120  verbose(_verbose),
121  initialized(false),
122  numHeartbeatsUninitialized(0),
123  exit(false),
124  enabled(false)
125  {
126  std::string ignIp;
127  if (env("IGN_IP", ignIp) && !ignIp.empty())
128  this->hostInterfaces = {ignIp};
129  else
130  {
131  // Get the list of network interfaces in this host.
132  this->hostInterfaces = determineInterfaces();
133  }
134 
135 #ifdef _WIN32
136  WORD wVersionRequested;
137  WSADATA wsaData;
138 
139  // Request WinSock v2.2.
140  wVersionRequested = MAKEWORD(2, 2);
141  // Load WinSock DLL.
142  if (WSAStartup(wVersionRequested, &wsaData) != 0)
143  {
144  std::cerr << "Unable to load WinSock DLL" << std::endl;
145  return;
146  }
147 #endif
148  for (const auto &netIface : this->hostInterfaces)
149  {
150  auto succeed = this->RegisterNetIface(netIface);
151 
152  // If the IP address that we're selecting as the main IP address of
153  // the host is invalid, we change it to 127.0.0.1 .
154  // This is probably because IGN_IP is set to a wrong value.
155  if (netIface == this->hostAddr && !succeed)
156  {
157  this->RegisterNetIface("127.0.0.1");
158  std::cerr << "Did you set the environment variable IGN_IP with a "
159  << "correct IP address? " << std::endl
160  << " [" << netIface << "] seems an invalid local IP "
161  << "address." << std::endl
162  << " Using 127.0.0.1 as hostname." << std::endl;
163  this->hostAddr = "127.0.0.1";
164  }
165  }
166 
167  // Socket option: SO_REUSEADDR. This options is used only for receiving
168  // data. We can reuse the same socket for receiving multicast data from
169  // multiple interfaces. We will use the socket at position 0 for
170  // receiving data.
171  int reuseAddr = 1;
172  if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEADDR,
173  reinterpret_cast<const char *>(&reuseAddr), sizeof(reuseAddr)) != 0)
174  {
175  std::cerr << "Error setting socket option (SO_REUSEADDR)."
176  << std::endl;
177  return;
178  }
179 
180 #ifdef SO_REUSEPORT
181  // Socket option: SO_REUSEPORT. This options is used only for receiving
182  // data. We can reuse the same socket for receiving multicast data from
183  // multiple interfaces. We will use the socket at position 0 for
184  // receiving data.
185  int reusePort = 1;
186  // cppcheck-suppress ConfigurationNotChecked
187  if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEPORT,
188  reinterpret_cast<const char *>(&reusePort), sizeof(reusePort)) != 0)
189  {
190  std::cerr << "Error setting socket option (SO_REUSEPORT)."
191  << std::endl;
192  return;
193  }
194 #endif
195  // Bind the first socket to the discovery port.
196  sockaddr_in localAddr;
197  memset(&localAddr, 0, sizeof(localAddr));
198  localAddr.sin_family = AF_INET;
199  localAddr.sin_addr.s_addr = htonl(INADDR_ANY);
200  localAddr.sin_port = htons(static_cast<u_short>(this->port));
201 
202  if (bind(this->sockets.at(0),
203  reinterpret_cast<sockaddr *>(&localAddr), sizeof(sockaddr_in)) < 0)
204  {
205  std::cerr << "Binding to a local port failed." << std::endl;
206  return;
207  }
208 
209  // Set 'mcastAddr' to the multicast discovery group.
210  memset(&this->mcastAddr, 0, sizeof(this->mcastAddr));
211  this->mcastAddr.sin_family = AF_INET;
212  this->mcastAddr.sin_addr.s_addr =
213  inet_addr(this->kMulticastGroup.c_str());
214  this->mcastAddr.sin_port = htons(static_cast<u_short>(this->port));
215 
216  if (this->verbose)
217  this->PrintCurrentState();
218  }
219 
221  public: virtual ~Discovery()
222  {
223  // Tell the service thread to terminate.
224  this->exitMutex.lock();
225  this->exit = true;
226  this->exitMutex.unlock();
227 
228  // Wait for the service threads to finish before exit.
229  if (this->threadReception.joinable())
230  this->threadReception.join();
231 
232  // Broadcast a BYE message to trigger the remote cancellation of
233  // all our advertised topics.
234  this->SendMsg(ByeType,
235  Publisher("", "", this->pUuid, "", AdvertiseOptions()));
236 
237  // Close sockets.
238  for (const auto &sock : this->sockets)
239  {
240 #ifdef _WIN32
241  closesocket(sock);
242  WSACleanup();
243 #else
244  close(sock);
245 #endif
246  }
247  }
248 
252  public: void Start()
253  {
254  {
255  std::lock_guard<std::mutex> lock(this->mutex);
256 
257  // The service is already running.
258  if (this->enabled)
259  return;
260 
261  this->enabled = true;
262  }
263 
264  auto now = std::chrono::steady_clock::now();
265  this->timeNextHeartbeat = now;
266  this->timeNextActivity = now;
267 
268  // Start the thread that receives discovery information.
269  this->threadReception = std::thread(&Discovery::RecvMessages, this);
270  }
271 
276  public: bool Advertise(const Pub &_publisher)
277  {
278  {
279  std::lock_guard<std::mutex> lock(this->mutex);
280 
281  if (!this->enabled)
282  return false;
283 
284  // Add the addressing information (local publisher).
285  if (!this->info.AddPublisher(_publisher))
286  return false;
287  }
288 
289  // Only advertise a message outside this process if the scope
290  // is not 'Process'
291  if (_publisher.Options().Scope() != Scope_t::PROCESS)
292  this->SendMsg(AdvType, _publisher);
293 
294  return true;
295  }
296 
307  public: bool Discover(const std::string &_topic) const
308  {
310  bool found;
311  Addresses_M<Pub> addresses;
312 
313  {
314  std::lock_guard<std::mutex> lock(this->mutex);
315 
316  if (!this->enabled)
317  return false;
318 
319  cb = this->connectionCb;
320  }
321 
322  Pub pub;
323  pub.SetTopic(_topic);
324  pub.SetPUuid(this->pUuid);
325 
326  // Send a discovery request.
327  this->SendMsg(SubType, pub);
328 
329  {
330  std::lock_guard<std::mutex> lock(this->mutex);
331  found = this->info.Publishers(_topic, addresses);
332  }
333 
334  if (found)
335  {
336  // I already have information about this topic.
337  for (const auto &proc : addresses)
338  {
339  for (const auto &node : proc.second)
340  {
341  if (cb)
342  {
343  // Execute the user's callback for a service request. Notice
344  // that we only execute one callback for preventing receive
345  // multiple service responses for a single request.
346  cb(node);
347  }
348  }
349  }
350  }
351 
352  return true;
353  }
354 
357  public: const TopicStorage<Pub> &Info() const
358  {
359  std::lock_guard<std::mutex> lock(this->mutex);
360  return this->info;
361  }
362 
367  public: bool Publishers(const std::string &_topic,
368  Addresses_M<Pub> &_publishers) const
369  {
370  std::lock_guard<std::mutex> lock(this->mutex);
371  return this->info.Publishers(_topic, _publishers);
372  }
373 
381  public: bool Unadvertise(const std::string &_topic,
382  const std::string &_nUuid)
383  {
384  Pub inf;
385  {
386  std::lock_guard<std::mutex> lock(this->mutex);
387 
388  if (!this->enabled)
389  return false;
390 
391  // Don't do anything if the topic is not advertised by any of my nodes
392  if (!this->info.Publisher(_topic, this->pUuid, _nUuid, inf))
393  return true;
394 
395  // Remove the topic information.
396  this->info.DelPublisherByNode(_topic, this->pUuid, _nUuid);
397  }
398 
399  // Only unadvertise a message outside this process if the scope
400  // is not 'Process'.
401  if (inf.Options().Scope() != Scope_t::PROCESS)
402  this->SendMsg(UnadvType, inf);
403 
404  return true;
405  }
406 
409  public: std::string HostAddr() const
410  {
411  std::lock_guard<std::mutex> lock(this->mutex);
412  return this->hostAddr;
413  }
414 
419  public: unsigned int ActivityInterval() const
420  {
421  std::lock_guard<std::mutex> lock(this->mutex);
422  return this->activityInterval;
423  }
424 
430  public: unsigned int HeartbeatInterval() const
431  {
432  std::lock_guard<std::mutex> lock(this->mutex);
433  return this->heartbeatInterval;
434  }
435 
440  public: unsigned int SilenceInterval() const
441  {
442  std::lock_guard<std::mutex> lock(this->mutex);
443  return this->silenceInterval;
444  }
445 
449  public: void SetActivityInterval(const unsigned int _ms)
450  {
451  std::lock_guard<std::mutex> lock(this->mutex);
452  this->activityInterval = _ms;
453  }
454 
458  public: void SetHeartbeatInterval(const unsigned int _ms)
459  {
460  std::lock_guard<std::mutex> lock(this->mutex);
461  this->heartbeatInterval = _ms;
462  }
463 
467  public: void SetSilenceInterval(const unsigned int _ms)
468  {
469  std::lock_guard<std::mutex> lock(this->mutex);
470  this->silenceInterval = _ms;
471  }
472 
477  public: void ConnectionsCb(const DiscoveryCallback<Pub> &_cb)
478  {
479  std::lock_guard<std::mutex> lock(this->mutex);
480  this->connectionCb = _cb;
481  }
482 
487  public: void DisconnectionsCb(const DiscoveryCallback<Pub> &_cb)
488  {
489  std::lock_guard<std::mutex> lock(this->mutex);
490  this->disconnectionCb = _cb;
491  }
492 
494  public: void PrintCurrentState() const
495  {
496  std::lock_guard<std::mutex> lock(this->mutex);
497 
498  std::cout << "---------------" << std::endl;
499  std::cout << std::boolalpha << "Enabled: "
500  << this->enabled << std::endl;
501  std::cout << "Discovery state" << std::endl;
502  std::cout << "\tUUID: " << this->pUuid << std::endl;
503  std::cout << "Settings" << std::endl;
504  std::cout << "\tActivity: " << this->activityInterval
505  << " ms." << std::endl;
506  std::cout << "\tHeartbeat: " << this->heartbeatInterval
507  << "ms." << std::endl;
508  std::cout << "\tSilence: " << this->silenceInterval
509  << " ms." << std::endl;
510  std::cout << "Known information:" << std::endl;
511  this->info.Print();
512 
513  // Used to calculate the elapsed time.
515 
516  std::cout << "Activity" << std::endl;
517  if (this->activity.empty())
518  std::cout << "\t<empty>" << std::endl;
519  else
520  {
521  for (auto &proc : this->activity)
522  {
523  // Elapsed time since the last update from this publisher.
524  std::chrono::duration<double> elapsed = now - proc.second;
525 
526  std::cout << "\t" << proc.first << std::endl;
527  std::cout << "\t\t" << "Since: " << std::chrono::duration_cast<
528  std::chrono::milliseconds>(elapsed).count() << " ms. ago. "
529  << std::endl;
530  }
531  }
532  std::cout << "---------------" << std::endl;
533  }
534 
537  public: void TopicList(std::vector<std::string> &_topics) const
538  {
539  this->WaitForInit();
540  std::lock_guard<std::mutex> lock(this->mutex);
541  this->info.TopicList(_topics);
542  }
543 
546  public: void WaitForInit() const
547  {
548  std::unique_lock<std::mutex> lk(this->mutex);
549 
550  if (!this->initialized)
551  {
552  this->initializedCv.wait(lk, [this]{return this->initialized;});
553  }
554  }
555 
559  private: void UpdateActivity()
560  {
561  // The UUIDs of the processes that have expired.
563 
564  // A copy of the disconnection callback.
565  DiscoveryCallback<Pub> disconnectCb;
566 
568 
569  {
570  std::lock_guard<std::mutex> lock(this->mutex);
571 
572  if (now < this->timeNextActivity)
573  return;
574 
575  disconnectCb = this->disconnectionCb;
576 
577  for (auto it = this->activity.cbegin(); it != this->activity.cend();)
578  {
579  // Elapsed time since the last update from this publisher.
580  auto elapsed = now - it->second;
581 
582  // This publisher has expired.
583  if (std::chrono::duration_cast<std::chrono::milliseconds>
584  (elapsed).count() > this->silenceInterval)
585  {
586  // Remove all the info entries for this process UUID.
587  this->info.DelPublishersByProc(it->first);
588 
589  uuids.push_back(it->first);
590 
591  // Remove the activity entry.
592  this->activity.erase(it++);
593  }
594  else
595  ++it;
596  }
597 
598  this->timeNextActivity = std::chrono::steady_clock::now() +
599  std::chrono::milliseconds(this->activityInterval);
600  }
601 
602  if (!disconnectCb)
603  return;
604 
605  // Notify without topic information. This is useful to inform the
606  // client that a remote node is gone, even if we were not
607  // interested in its topics.
608  for (auto const &uuid : uuids)
609  {
610  Pub publisher;
611  publisher.SetPUuid(uuid);
612  disconnectCb(publisher);
613  }
614  }
615 
617  private: void UpdateHeartbeat()
618  {
620 
621  {
622  std::lock_guard<std::mutex> lock(this->mutex);
623 
624  if (now < this->timeNextHeartbeat)
625  return;
626  }
627 
628  Publisher pub("", "", this->pUuid, "", AdvertiseOptions());
629  this->SendMsg(HeartbeatType, pub);
630 
632  {
633  std::lock_guard<std::mutex> lock(this->mutex);
634 
635  // Re-advertise topics that are advertised inside this process.
636  this->info.PublishersByProc(this->pUuid, nodes);
637  }
638 
639  for (const auto &topic : nodes)
640  {
641  for (const auto &node : topic.second)
642  this->SendMsg(AdvType, node);
643  }
644 
645  {
646  std::lock_guard<std::mutex> lock(this->mutex);
647  if (!this->initialized)
648  {
649  ++this->numHeartbeatsUninitialized;
650  if (this->numHeartbeatsUninitialized == 2)
651  {
652  // We consider the discovery initialized after two cycles of
653  // heartbeats sent.
654  this->initialized = true;
655 
656  // Notify anyone waiting for the initialization phase to finish.
657  this->initializedCv.notify_all();
658  }
659  }
660 
661  this->timeNextHeartbeat = std::chrono::steady_clock::now() +
662  std::chrono::milliseconds(this->heartbeatInterval);
663  }
664  }
665 
675  private: int NextTimeout() const
676  {
677  auto now = std::chrono::steady_clock::now();
678  auto timeUntilNextHeartbeat = this->timeNextHeartbeat - now;
679  auto timeUntilNextActivity = this->timeNextActivity - now;
680 
681  int t = static_cast<int>(
683  (std::min(timeUntilNextHeartbeat, timeUntilNextActivity)).count());
684  int t2 = std::min(t, this->kTimeout);
685  return std::max(t2, 0);
686  }
687 
689  private: void RecvMessages()
690  {
691  bool timeToExit = false;
692  while (!timeToExit)
693  {
694  // Calculate the timeout.
695  int timeout = this->NextTimeout();
696 
697  if (pollSockets(this->sockets, timeout))
698  {
699  this->RecvDiscoveryUpdate();
700 
701  if (this->verbose)
702  this->PrintCurrentState();
703  }
704 
705  this->UpdateHeartbeat();
706  this->UpdateActivity();
707 
708  // Is it time to exit?
709  {
710  std::lock_guard<std::mutex> lock(this->exitMutex);
711  if (this->exit)
712  timeToExit = true;
713  }
714  }
715  }
716 
718  private: void RecvDiscoveryUpdate()
719  {
720  char rcvStr[Discovery::kMaxRcvStr];
721  std::string srcAddr;
722  uint16_t srcPort;
723  sockaddr_in clntAddr;
724  socklen_t addrLen = sizeof(clntAddr);
725 
726  if ((recvfrom(this->sockets.at(0),
727  reinterpret_cast<raw_type *>(rcvStr),
728  this->kMaxRcvStr, 0,
729  reinterpret_cast<sockaddr *>(&clntAddr),
730  reinterpret_cast<socklen_t *>(&addrLen))) < 0)
731  {
732  std::cerr << "Discovery::RecvDiscoveryUpdate() recvfrom error"
733  << std::endl;
734  return;
735  }
736  srcAddr = inet_ntoa(clntAddr.sin_addr);
737  srcPort = ntohs(clntAddr.sin_port);
738 
739  if (this->verbose)
740  {
741  std::cout << "\nReceived discovery update from " << srcAddr << ": "
742  << srcPort << std::endl;
743  }
744 
745  this->DispatchDiscoveryMsg(srcAddr, rcvStr);
746  }
747 
748 
752  private: void DispatchDiscoveryMsg(const std::string &_fromIp,
753  char *_msg)
754  {
755  Header header;
756  char *pBody = _msg;
757 
758  // Create the header from the raw bytes.
759  header.Unpack(_msg);
760  pBody += header.HeaderLength();
761 
762  // Discard the message if the wire protocol is different than mine.
763  if (this->kWireVersion != header.Version())
764  return;
765 
766  auto recvPUuid = header.PUuid();
767 
768  // Discard our own discovery messages.
769  if (recvPUuid == this->pUuid)
770  return;
771 
772  // Update timestamp and cache the callbacks.
773  DiscoveryCallback<Pub> connectCb;
774  DiscoveryCallback<Pub> disconnectCb;
775  {
776  std::lock_guard<std::mutex> lock(this->mutex);
777  this->activity[recvPUuid] = std::chrono::steady_clock::now();
778  connectCb = this->connectionCb;
779  disconnectCb = this->disconnectionCb;
780  }
781 
782  switch (header.Type())
783  {
784  case AdvType:
785  {
786  // Read the rest of the fields.
788  advMsg.Unpack(pBody);
789 
790  // Check scope of the topic.
791  if ((advMsg.Publisher().Options().Scope() == Scope_t::PROCESS) ||
792  (advMsg.Publisher().Options().Scope() == Scope_t::HOST &&
793  _fromIp != this->hostAddr))
794  {
795  return;
796  }
797 
798  // Register an advertised address for the topic.
799  bool added;
800  {
801  std::lock_guard<std::mutex> lock(this->mutex);
802  added = this->info.AddPublisher(advMsg.Publisher());
803  }
804 
805  if (added && connectCb)
806  {
807  // Execute the client's callback.
808  connectCb(advMsg.Publisher());
809  }
810 
811  break;
812  }
813  case SubType:
814  {
815  // Read the rest of the fields.
816  SubscriptionMsg subMsg;
817  subMsg.Unpack(pBody);
818  auto recvTopic = subMsg.Topic();
819 
820  // Check if at least one of my nodes advertises the topic requested.
821  Addresses_M<Pub> addresses;
822  {
823  std::lock_guard<std::mutex> lock(this->mutex);
824  if (!this->info.HasAnyPublishers(recvTopic, this->pUuid))
825  {
826  break;
827  }
828 
829  if (!this->info.Publishers(recvTopic, addresses))
830  break;
831  }
832 
833  for (const auto &nodeInfo : addresses[this->pUuid])
834  {
835  // Check scope of the topic.
836  if ((nodeInfo.Options().Scope() == Scope_t::PROCESS) ||
837  (nodeInfo.Options().Scope() == Scope_t::HOST &&
838  _fromIp != this->hostAddr))
839  {
840  continue;
841  }
842 
843  // Answer an ADVERTISE message.
844  this->SendMsg(AdvType, nodeInfo);
845  }
846 
847  break;
848  }
849  case HeartbeatType:
850  {
851  // The timestamp has already been updated.
852  break;
853  }
854  case ByeType:
855  {
856  // Remove the activity entry for this publisher.
857  {
858  std::lock_guard<std::mutex> lock(this->mutex);
859  this->activity.erase(recvPUuid);
860  }
861 
862  if (disconnectCb)
863  {
864  Pub pub;
865  pub.SetPUuid(recvPUuid);
866  // Notify the new disconnection.
867  disconnectCb(pub);
868  }
869 
870  // Remove the address entry for this topic.
871  {
872  std::lock_guard<std::mutex> lock(this->mutex);
873  this->info.DelPublishersByProc(recvPUuid);
874  }
875 
876  break;
877  }
878  case UnadvType:
879  {
880  // Read the address.
882  advMsg.Unpack(pBody);
883 
884  // Check scope of the topic.
885  if ((advMsg.Publisher().Options().Scope() == Scope_t::PROCESS) ||
886  (advMsg.Publisher().Options().Scope() == Scope_t::HOST &&
887  _fromIp != this->hostAddr))
888  {
889  return;
890  }
891 
892  if (disconnectCb)
893  {
894  // Notify the new disconnection.
895  disconnectCb(advMsg.Publisher());
896  }
897 
898  // Remove the address entry for this topic.
899  {
900  std::lock_guard<std::mutex> lock(this->mutex);
901  this->info.DelPublisherByNode(advMsg.Publisher().Topic(),
902  advMsg.Publisher().PUuid(), advMsg.Publisher().NUuid());
903  }
904 
905  break;
906  }
907  default:
908  {
909  std::cerr << "Unknown message type [" << header.Type() << "]\n";
910  break;
911  }
912  }
913  }
914 
921  private: template<typename T>
922  void SendMsg(const uint8_t _type,
923  const T &_pub,
924  const uint16_t _flags = 0) const
925  {
926  // Create the header.
927  Header header(this->Version(), _pub.PUuid(), _type, _flags);
928  auto msgLength = 0;
929  std::vector<char> buffer;
930 
931  std::string topic = _pub.Topic();
932 
933  switch (_type)
934  {
935  case AdvType:
936  case UnadvType:
937  {
938  // Create the [UN]ADVERTISE message.
939  transport::AdvertiseMessage<T> advMsg(header, _pub);
940 
941  // Allocate a buffer and serialize the message.
942  buffer.resize(advMsg.MsgLength());
943  advMsg.Pack(reinterpret_cast<char*>(&buffer[0]));
944  msgLength = static_cast<int>(advMsg.MsgLength());
945  break;
946  }
947  case SubType:
948  {
949  // Create the [UN]SUBSCRIBE message.
950  SubscriptionMsg subMsg(header, topic);
951 
952  // Allocate a buffer and serialize the message.
953  buffer.resize(subMsg.MsgLength());
954  subMsg.Pack(reinterpret_cast<char*>(&buffer[0]));
955  msgLength = static_cast<int>(subMsg.MsgLength());
956  break;
957  }
958  case HeartbeatType:
959  case ByeType:
960  {
961  // Allocate a buffer and serialize the message.
962  buffer.resize(header.HeaderLength());
963  header.Pack(reinterpret_cast<char*>(&buffer[0]));
964  msgLength = header.HeaderLength();
965  break;
966  }
967  default:
968  std::cerr << "Discovery::SendMsg() error: Unrecognized message"
969  << " type [" << _type << "]" << std::endl;
970  return;
971  }
972 
973  // Send the discovery message to the multicast group through all the
974  // sockets.
975  for (const auto &sock : this->Sockets())
976  {
977  if (sendto(sock, reinterpret_cast<const raw_type *>(
978  reinterpret_cast<unsigned char*>(&buffer[0])),
979  msgLength, 0,
980  reinterpret_cast<const sockaddr *>(this->MulticastAddr()),
981  sizeof(*(this->MulticastAddr()))) != msgLength)
982  {
983  // Ignore EPERM and ENOBUFS errors.
984  //
985  // See issue #106
986  //
987  // Rationale drawn from:
988  //
989  // * https://groups.google.com/forum/#!topic/comp.protocols.tcp-ip/Qou9Sfgr77E
990  // * https://stackoverflow.com/questions/16555101/sendto-dgrams-do-not-block-for-enobufs-on-osx
991  if (errno != EPERM && errno != ENOBUFS)
992  {
993  std::cerr << "Exception sending a message:"
994  << strerror(errno) << std::endl;
995  }
996  return;
997  }
998  }
999 
1000  if (this->Verbose())
1001  {
1002  std::cout << "\t* Sending " << MsgTypesStr[_type]
1003  << " msg [" << topic << "]" << std::endl;
1004  }
1005  }
1006 
1009  private: const std::vector<int> &Sockets() const
1010  {
1011  return this->sockets;
1012  }
1013 
1016  private: const sockaddr_in *MulticastAddr() const
1017  {
1018  return &this->mcastAddr;
1019  }
1020 
1023  private: bool Verbose() const
1024  {
1025  return this->verbose;
1026  }
1027 
1030  private: uint8_t Version() const
1031  {
1032  return this->kWireVersion;
1033  }
1034 
1039  private: bool RegisterNetIface(const std::string &_ip)
1040  {
1041  // Make a new socket for sending discovery information.
1042  int sock = static_cast<int>(socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP));
1043  if (sock < 0)
1044  {
1045  std::cerr << "Socket creation failed." << std::endl;
1046  return false;
1047  }
1048 
1049  // Socket option: IP_MULTICAST_IF.
1050  // This socket option needs to be applied to each socket used to send
1051  // data. This option selects the source interface for outgoing messages.
1052  struct in_addr ifAddr;
1053  ifAddr.s_addr = inet_addr(_ip.c_str());
1054  if (setsockopt(sock, IPPROTO_IP, IP_MULTICAST_IF,
1055  reinterpret_cast<const char*>(&ifAddr), sizeof(ifAddr)) != 0)
1056  {
1057  std::cerr << "Error setting socket option (IP_MULTICAST_IF)."
1058  << std::endl;
1059  return false;
1060  }
1061 
1062  this->sockets.push_back(sock);
1063 
1064  // Join the multicast group. We have to do it for each network interface
1065  // but we can do it on the same socket. We will use the socket at
1066  // position 0 for receiving multicast information.
1067  struct ip_mreq group;
1068  group.imr_multiaddr.s_addr =
1069  inet_addr(this->kMulticastGroup.c_str());
1070  group.imr_interface.s_addr = inet_addr(_ip.c_str());
1071  if (setsockopt(this->sockets.at(0), IPPROTO_IP, IP_ADD_MEMBERSHIP,
1072  reinterpret_cast<const char*>(&group), sizeof(group)) != 0)
1073  {
1074  std::cerr << "Error setting socket option (IP_ADD_MEMBERSHIP)."
1075  << std::endl;
1076  return false;
1077  }
1078 
1079  return true;
1080  }
1081 
1085  private: static const unsigned int kDefActivityInterval = 100;
1086 
1090  private: static const unsigned int kDefHeartbeatInterval = 1000;
1091 
1095  private: static const unsigned int kDefSilenceInterval = 3000;
1096 
1098  private: const std::string kMulticastGroup = "224.0.0.7";
1099 
1101  private: const int kTimeout = 250;
1102 
1104  private: static const int kMaxRcvStr = 65536;
1105 
1108  private: static const uint8_t kWireVersion = 8;
1109 
1111  private: int port;
1112 
1114  private: std::string hostAddr;
1115 
1117  private: std::vector<std::string> hostInterfaces;
1118 
1120  private: std::string pUuid;
1121 
1125  private: unsigned int silenceInterval;
1126 
1130  private: unsigned int activityInterval;
1131 
1135  private: unsigned int heartbeatInterval;
1136 
1138  private: DiscoveryCallback<Pub> connectionCb;
1139 
1141  private: DiscoveryCallback<Pub> disconnectionCb;
1142 
1144  private: TopicStorage<Pub> info;
1145 
1151 
1153  private: bool verbose;
1154 
1156  private: std::vector<int> sockets;
1157 
1159  private: sockaddr_in mcastAddr;
1160 
1162  private: mutable std::mutex mutex;
1163 
1165  private: std::thread threadReception;
1166 
1168  private: Timestamp timeNextHeartbeat;
1169 
1171  private: Timestamp timeNextActivity;
1172 
1174  private: std::mutex exitMutex;
1175 
1180  private: bool initialized;
1181 
1183  private: unsigned int numHeartbeatsUninitialized;
1184 
1186  private: mutable std::condition_variable initializedCv;
1187 
1189  private: bool exit;
1190 
1192  private: bool enabled;
1193  };
1194 
1198 
1202  }
1203  }
1204 }
1205 
1206 #endif
bool Unadvertise(const std::string &_topic, const std::string &_nUuid)
Unadvertise a new message. Broadcast a discovery message that will cancel all the discovery informati...
Definition: Discovery.hh:381
T empty(T... args)
std::map< std::string, Timestamp > activity
Activity information. Every time there is a message from a remote node, its activity information is u...
Definition: Discovery.hh:1150
This class stores all the information about a publisher. It stores the topic name that publishes...
Definition: Publisher.hh:42
Header included in each discovery message containing the version of the discovery protocol...
Definition: Packet.hh:58
std::vector< std::string > determineInterfaces()
Determine the list of network interfaces for this machine. Reference: https://github.com/ros/ros_comm/blob/hydro-devel/clients/ roscpp/src/libros/network.cpp.
Topic/service only available to subscribers in the same machine as the publisher. ...
size_t Unpack(const char *_buffer)
Unserialize a stream of bytes into a Sub.
int HeaderLength() const
Get the header length.
Subscription packet used in the discovery protocol for requesting information about a given topic...
Definition: Packet.hh:171
void Start()
Start the discovery service. You probably want to register the callbacks for receiving discovery noti...
Definition: Discovery.hh:252
void WaitForInit() const
Check if ready/initialized. If not, then wait on the initializedCv condition variable.
Definition: Discovery.hh:546
size_t Unpack(const char *_buffer)
Unserialize a stream of bytes into an AdvertiseMessage.
Definition: Packet.hh:328
T endl(T... args)
unsigned int HeartbeatInterval() const
Each node broadcasts periodic heartbeats to keep its topic information alive in other nodes...
Definition: Discovery.hh:430
static const uint8_t HeartbeatType
Definition: Packet.hh:42
size_t Unpack(const char *_buffer)
Unserialize the header.
void raw_type
Definition: Discovery.hh:45
static const uint8_t SubType
Definition: Packet.hh:40
T duration_cast(T... args)
A discovery class that implements a distributed topic discovery protocol. It uses UDP multicast for s...
Definition: Discovery.hh:102
size_t Pack(char *_buffer) const
Serialize the subscription message.
static const uint8_t ByeType
Definition: Packet.hh:43
void TopicList(std::vector< std::string > &_topics) const
Get the list of topics currently advertised in the network.
Definition: Discovery.hh:537
size_t MsgLength() const
Get the total length of the message.
Definition: Packet.hh:301
T boolalpha(T... args)
void SetSilenceInterval(const unsigned int _ms)
Set the maximum silence interval.
Definition: Discovery.hh:467
STL class.
std::string determineHost()
Determine IP or hostname. Reference: https://github.com/ros/ros_comm/blob/hydro-devel/clients/ roscpp...
T resize(T... args)
static const uint8_t UnadvType
Definition: Packet.hh:41
STL class.
virtual ~Discovery()
Destructor.
Definition: Discovery.hh:221
T min(T... args)
A class for customizing the publication options for a topic or service advertised. E.g.: Set the scope of a topic/service.
Definition: AdvertiseOptions.hh:59
size_t MsgLength() const
Get the total length of the message.
Discovery(const std::string &_pUuid, const int _port, const bool _verbose=false)
Constructor.
Definition: Discovery.hh:109
T push_back(T... args)
const TopicStorage< Pub > & Info() const
Get the discovery information.
Definition: Discovery.hh:357
bool Publishers(const std::string &_topic, Addresses_M< Pub > &_publishers) const
Get all the publishers' information known for a given topic.
Definition: Discovery.hh:367
std::string HostAddr() const
Get the IP address of this host.
Definition: Discovery.hh:409
T & Publisher()
Get the publisher of this message.
Definition: Packet.hh:278
unsigned int ActivityInterval() const
The discovery checks the validity of the topic information every 'activity interval' milliseconds...
Definition: Discovery.hh:419
T erase(T... args)
uint8_t Type() const
Get the message type.
static const uint8_t AdvType
Definition: Packet.hh:39
unsigned int SilenceInterval() const
Get the maximum time allowed without receiving any discovery information from a node before canceling...
Definition: Discovery.hh:440
T max(T... args)
Advertise packet used in the discovery protocol to broadcast information about the node advertising a...
Definition: Packet.hh:252
std::string PUuid() const
Get the process uuid.
static const std::vector< std::string > MsgTypesStr
Used for debugging the message type received/send.
Definition: Packet.hh:48
void PrintCurrentState() const
Print the current discovery state.
Definition: Discovery.hh:494
void DisconnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery disconnection events. Each time a topic is no longer active...
Definition: Discovery.hh:487
T c_str(T... args)
void ConnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery connection events. Each time a new topic is connected...
Definition: Discovery.hh:477
bool Advertise(const Pub &_publisher)
Advertise a new message.
Definition: Discovery.hh:276
Topic/service only available to subscribers in the same process as the publisher. ...
bool pollSockets(const std::vector< int > &_sockets, const int _timeout)
std::chrono::steady_clock::time_point Timestamp
Definition: TransportTypes.hh:155
void SetActivityInterval(const unsigned int _ms)
Set the activity interval.
Definition: Discovery.hh:449
Definition: AdvertiseOptions.hh:28
std::string Topic() const
Get the topic.
uint16_t Version() const
Get the discovery protocol version.
size_t Pack(char *_buffer) const
Serialize the advertise message.
Definition: Packet.hh:309
void SetHeartbeatInterval(const unsigned int _ms)
Set the heartbeat interval.
Definition: Discovery.hh:458
STL class.
bool env(const std::string &_name, std::string &_value)
Find the environment variable '_name' and return its value.
bool Discover(const std::string &_topic) const
Request discovery information about a topic. When using this method, the user might want to use SetCo...
Definition: Discovery.hh:307