Robot Raconteur Core C++ Library
Loading...
Searching...
No Matches
PipeMember.h
Go to the documentation of this file.
1
23
24#pragma once
25
30#include <boost/function.hpp>
31#include <boost/unordered_map.hpp>
32#include <boost/call_traits.hpp>
33#include <list>
34
35#ifdef _MSVC_VER
36#pragma warning(push)
37#pragma warning(disable : 4250)
38#pragma warning(disable : 4996)
39#endif
40
41#include <boost/signals2.hpp>
42
43namespace RobotRaconteur
44{
45
46class ROBOTRACONTEUR_CORE_API PipeBase;
47class ROBOTRACONTEUR_CORE_API PipeEndpointBaseListener;
48namespace detail
49{
50class PipeSubscription_connection;
51}
52
59class ROBOTRACONTEUR_CORE_API PipeEndpointBase : public RR_ENABLE_SHARED_FROM_THIS<PipeEndpointBase>,
60 private boost::noncopyable
61{
62 friend class PipeBase;
63 friend class PipeClientBase;
64 friend class PipeServerBase;
65 friend class PipeBroadcasterBase;
66 friend class PipeSubscriptionBase;
67 friend class detail::PipeSubscription_connection;
68
69 public:
70 virtual ~PipeEndpointBase() {}
71
77 virtual int32_t GetIndex();
78
87 virtual uint32_t GetEndpoint();
88
96
106 void SetRequestPacketAck(bool ack);
107
115 virtual void Close();
116
125 virtual void AsyncClose(boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)> handler,
126 int32_t timeout = RR_TIMEOUT_INFINITE);
127
135 virtual size_t Available();
136
147
158
179 void SetIgnoreReceived(bool ignore);
180
181 virtual void AddListener(const RR_SHARED_PTR<PipeEndpointBaseListener>& listener);
182
183 RR_SHARED_PTR<RobotRaconteurNode> GetNode();
184
185 protected:
186 virtual void RemoteClose();
187
188 PipeEndpointBase(const RR_SHARED_PTR<PipeBase>& parent, int32_t index, uint32_t endpoint = 0,
189 bool unreliable = false, MemberDefinition_Direction direction = MemberDefinition_Direction_both);
190
191 bool unreliable;
193
194 bool RequestPacketAck;
195
196 void AsyncSendPacketBase(const RR_INTRUSIVE_PTR<RRValue>& packet,
197 RR_MOVE_ARG(boost::function<void(uint32_t, const RR_SHARED_PTR<RobotRaconteurException>&)>)
198 handler);
199
200 RR_INTRUSIVE_PTR<RRValue> ReceivePacketBase();
201 RR_INTRUSIVE_PTR<RRValue> PeekPacketBase();
202
203 RR_INTRUSIVE_PTR<RRValue> ReceivePacketBaseWait(int32_t timeout = RR_TIMEOUT_INFINITE);
204 RR_INTRUSIVE_PTR<RRValue> PeekPacketBaseWait(int32_t timeout = RR_TIMEOUT_INFINITE);
205
206 bool TryReceivePacketBaseWait(RR_INTRUSIVE_PTR<RRValue>& packet, int32_t timeout = RR_TIMEOUT_INFINITE,
207 bool peek = false);
208
209 boost::mutex sendlock;
210 boost::mutex recvlock;
211
212 std::deque<RR_INTRUSIVE_PTR<RRValue> > recv_packets;
213 boost::condition_variable recv_packets_wait;
214
215 uint32_t increment_packet_number(uint32_t packetnum);
216
217 void PipePacketReceived(const RR_INTRUSIVE_PTR<RRValue>& packet, uint32_t packetnum);
218
219 void PipePacketAckReceived(uint32_t packetnum);
220
221 void Shutdown();
222
223 virtual void fire_PipeEndpointClosedCallback() = 0;
224
225 virtual void fire_PacketReceivedEvent() = 0;
226
227 virtual void fire_PacketAckReceivedEvent(uint32_t packetnum) = 0;
228
229 RR_SHARED_PTR<PipeBase> GetParent();
230
231 bool closed;
232
233 uint32_t send_packet_number;
234 uint32_t recv_packet_number;
235
236 RR_WEAK_PTR<PipeBase> parent;
237 int32_t index;
238 uint32_t endpoint;
239 std::string service_path;
240 std::string member_name;
241
242 RR_UNORDERED_MAP<uint32_t, RR_INTRUSIVE_PTR<RRValue> > out_of_order_packets;
243
244 bool ignore_incoming_packets;
245
246 boost::mutex listeners_lock;
247 std::list<RR_WEAK_PTR<PipeEndpointBaseListener> > listeners;
248
249 detail::async_signal_semaphore pipe_packet_received_semaphore;
250
251 RR_WEAK_PTR<RobotRaconteurNode> node;
252};
253
281
282template <typename T>
283class PipeEndpoint : public PipeEndpointBase
284{
285 private:
286 boost::function<void(RR_SHARED_PTR<PipeEndpoint<T> >)> PipeEndpointClosedCallback;
287 boost::mutex PipeEndpointClosedCallback_lock;
288
289 public:
295 boost::function<void(RR_SHARED_PTR<PipeEndpoint<T> >)> GetPipeEndpointClosedCallback()
296 {
297 boost::mutex::scoped_lock lock(PipeEndpointClosedCallback_lock);
298 return PipeEndpointClosedCallback;
299 }
300
311 void SetPipeEndpointClosedCallback(boost::function<void(const RR_SHARED_PTR<PipeEndpoint<T> >&)> callback)
312 {
313 boost::mutex::scoped_lock lock(PipeEndpointClosedCallback_lock);
314 PipeEndpointClosedCallback = callback;
315 }
316
323 boost::signals2::signal<void(const RR_SHARED_PTR<PipeEndpoint<T> >&)> PacketReceivedEvent;
324
334 boost::signals2::signal<void(const RR_SHARED_PTR<PipeEndpoint<T> >&, uint32_t)> PacketAckReceivedEvent;
335
347 virtual uint32_t SendPacket(typename boost::call_traits<T>::param_type packet)
348 {
349 ROBOTRACONTEUR_ASSERT_MULTITHREADED(node);
350
351 RR_SHARED_PTR<detail::sync_async_handler<uint32_t> > t =
352 RR_MAKE_SHARED<detail::sync_async_handler<uint32_t> >();
353 boost::function<void(const RR_SHARED_PTR<uint32_t>&, const RR_SHARED_PTR<RobotRaconteurException>&)> h =
354 boost::bind(&detail::sync_async_handler<uint32_t>::operator(), t, RR_BOOST_PLACEHOLDERS(_1),
355 RR_BOOST_PLACEHOLDERS(_2));
357 packet, boost::bind(&PipeEndpoint::send_handler, RR_BOOST_PLACEHOLDERS(_1), RR_BOOST_PLACEHOLDERS(_2), h));
358 return *t->end();
359 }
360
369 virtual void AsyncSendPacket(typename boost::call_traits<T>::param_type packet,
370 boost::function<void(uint32_t, const RR_SHARED_PTR<RobotRaconteurException>&)> handler)
371 {
372 AsyncSendPacketBase(RRPrimUtil<T>::PrePack(packet), RR_MOVE(handler));
373 }
374
384 virtual T ReceivePacket() { return RRPrimUtil<T>::PreUnpack(ReceivePacketBase()); }
385
395 virtual T PeekNextPacket() { return RRPrimUtil<T>::PreUnpack(PeekPacketBase()); }
396
405 virtual T ReceivePacketWait(int32_t timeout = RR_TIMEOUT_INFINITE)
406 {
407 return RRPrimUtil<T>::PreUnpack(ReceivePacketBaseWait(timeout));
408 }
409
418 virtual T PeekNextPacketWait(int32_t timeout = RR_TIMEOUT_INFINITE)
419 {
420 return RRPrimUtil<T>::PreUnpack(PeekPacketBaseWait(timeout));
421 }
422
438 virtual bool TryReceivePacketWait(T& val, int32_t timeout = RR_TIMEOUT_INFINITE, bool peek = false)
439 {
440 RR_INTRUSIVE_PTR<RRValue> o;
441 if (!TryReceivePacketBaseWait(o, timeout, peek))
442 return false;
443 val = RRPrimUtil<T>::PreUnpack(o);
444 return true;
445 }
446
447 PipeEndpoint(const RR_SHARED_PTR<PipeBase>& parent, int32_t index, uint32_t endpoint = 0, bool unreliable = false,
449 : PipeEndpointBase(parent, index, endpoint, unreliable, direction){};
450
451 protected:
452 RR_OVIRTUAL void fire_PipeEndpointClosedCallback() RR_OVERRIDE
453 {
454 boost::function<void(RR_SHARED_PTR<PipeEndpoint<T> >)> c = GetPipeEndpointClosedCallback();
455 if (!c)
456 return;
457 c(RR_STATIC_POINTER_CAST<PipeEndpoint<T> >(shared_from_this()));
458 }
459
460 RR_OVIRTUAL void fire_PacketReceivedEvent() RR_OVERRIDE
461 {
462 PacketReceivedEvent(RR_STATIC_POINTER_CAST<PipeEndpoint<T> >(shared_from_this()));
463 }
464
465 RR_OVIRTUAL void fire_PacketAckReceivedEvent(uint32_t packetnum) RR_OVERRIDE
466 {
467 PacketAckReceivedEvent(RR_STATIC_POINTER_CAST<PipeEndpoint<T> >(shared_from_this()), packetnum);
468 }
469
470 static void send_handler(uint32_t packetnumber, const RR_SHARED_PTR<RobotRaconteurException>& err,
471 const boost::function<void(const RR_SHARED_PTR<uint32_t>&,
472 const RR_SHARED_PTR<RobotRaconteurException>&)>& handler)
473 {
474 handler(RR_MAKE_SHARED<uint32_t>(packetnumber), err);
475 }
476
477 public:
478 RR_OVIRTUAL void Close() RR_OVERRIDE
479 {
481 {
482
483 boost::mutex::scoped_lock lock(PipeEndpointClosedCallback_lock);
484 PipeEndpointClosedCallback.clear();
485 }
486 PacketReceivedEvent.disconnect_all_slots();
487 PacketAckReceivedEvent.disconnect_all_slots();
488 }
489
490 protected:
491 virtual void AsyncClose1(const RR_SHARED_PTR<RobotRaconteurException>& err,
492 const boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)>& handler)
493 {
494 try
495 {
496 {
497 boost::mutex::scoped_lock lock(PipeEndpointClosedCallback_lock);
498 PipeEndpointClosedCallback.clear();
499 }
500 PacketReceivedEvent.disconnect_all_slots();
501 PacketAckReceivedEvent.disconnect_all_slots();
502 }
503 catch (std::exception&)
504 {}
505
506 handler(err);
507 }
508
509 public:
510 RR_OVIRTUAL void AsyncClose(boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)> handler,
511 int32_t timeout = 2000) RR_OVERRIDE
512 {
513 PipeEndpointBase::AsyncClose(boost::bind(&PipeEndpoint<T>::AsyncClose1,
514 RR_STATIC_POINTER_CAST<PipeEndpoint<T> >(shared_from_this()),
515 RR_BOOST_PLACEHOLDERS(_1), handler),
516 timeout);
517 }
518
519 protected:
520 RR_OVIRTUAL void RemoteClose() RR_OVERRIDE
521 {
522 PipeEndpointBase::RemoteClose();
523 {
524 boost::mutex::scoped_lock lock(PipeEndpointClosedCallback_lock);
525 PipeEndpointClosedCallback.clear();
526 }
527 PacketReceivedEvent.disconnect_all_slots();
528 PacketAckReceivedEvent.disconnect_all_slots();
529 }
530};
531
538class ROBOTRACONTEUR_CORE_API PipeBase : public RR_ENABLE_SHARED_FROM_THIS<PipeBase>, private boost::noncopyable
539{
540 friend class PipeEndpointBase;
541
542 public:
543 virtual ~PipeBase() {}
544
550 static const int32_t ANY_INDEX = -1;
551
557 virtual std::string GetMemberName() = 0;
558
559 virtual void PipePacketReceived(const RR_INTRUSIVE_PTR<MessageEntry>& m, uint32_t e = 0) = 0;
560
561 virtual void Shutdown() = 0;
562
563 virtual std::string GetServicePath() = 0;
564
565 virtual void AsyncClose(const RR_SHARED_PTR<PipeEndpointBase>& endpoint, bool remote, uint32_t ee,
566 RR_MOVE_ARG(boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)>) handler,
567 int32_t timeout) = 0;
568
569 protected:
570 PipeBase();
571
572 bool unreliable;
573
574 virtual void AsyncSendPipePacket(
575 const RR_INTRUSIVE_PTR<RRValue>& data, int32_t index, uint32_t packetnumber, bool requestack, uint32_t endpoint,
576 bool unreliable,
577 RR_MOVE_ARG(boost::function<void(uint32_t, const RR_SHARED_PTR<RobotRaconteurException>&)>) handler) = 0;
578
579 bool rawelements;
580
581 void DispatchPacketAck(const RR_INTRUSIVE_PTR<MessageElement>& me, const RR_SHARED_PTR<PipeEndpointBase>& e);
582
583 bool DispatchPacket(const RR_INTRUSIVE_PTR<MessageElement>& me, const RR_SHARED_PTR<PipeEndpointBase>& e,
584 uint32_t& packetnumber);
585
586 RR_INTRUSIVE_PTR<MessageElement> PackPacket(const RR_INTRUSIVE_PTR<RRValue>& data, int32_t index,
587 uint32_t packetnumber, bool requestack);
588
589 virtual void DeleteEndpoint(const RR_SHARED_PTR<PipeEndpointBase>& e) = 0;
590
591 virtual RR_INTRUSIVE_PTR<MessageElementData> PackData(const RR_INTRUSIVE_PTR<RRValue>& data)
592 {
593 return GetNode()->PackVarType(data);
594 }
595
596 virtual RR_INTRUSIVE_PTR<RRValue> UnpackData(const RR_INTRUSIVE_PTR<MessageElement>& mdata)
597 {
598 return GetNode()->UnpackVarType(mdata);
599 }
600
601 RR_WEAK_PTR<RobotRaconteurNode> node;
602
603 MemberDefinition_Direction direction;
604
605 public:
606 RR_SHARED_PTR<RobotRaconteurNode> GetNode();
607
618
629};
630
664template <typename T>
665class Pipe : public virtual PipeBase
666{
667 public:
668 friend class PipeEndpointBase;
669
670 Pipe(boost::function<void(const RR_INTRUSIVE_PTR<RRValue>&)> verify) { this->verify = RR_MOVE(verify); }
671
672 RR_OVIRTUAL ~Pipe() RR_OVERRIDE {}
673
681 virtual boost::function<void(const RR_SHARED_PTR<PipeEndpoint<T> >&)> GetPipeConnectCallback() = 0;
682
699 virtual void SetPipeConnectCallback(boost::function<void(const RR_SHARED_PTR<PipeEndpoint<T> >&)> function) = 0;
700
714 virtual RR_SHARED_PTR<PipeEndpoint<T> > Connect(int32_t index) = 0;
715
727 virtual void AsyncConnect(
728 int32_t index,
729 boost::function<void(const RR_SHARED_PTR<PipeEndpoint<T> >&, const RR_SHARED_PTR<RobotRaconteurException>&)>
730 handler,
731 int32_t timeout = RR_TIMEOUT_INFINITE) = 0;
732
743 virtual void AsyncConnect(
744 boost::function<void(const RR_SHARED_PTR<PipeEndpoint<T> >&, const RR_SHARED_PTR<RobotRaconteurException>&)>
745 handler,
746 int32_t timeout = RR_TIMEOUT_INFINITE)
747 {
748 AsyncConnect(-1, RR_MOVE(handler), timeout);
749 }
750
751 RR_OVIRTUAL RR_INTRUSIVE_PTR<MessageElementData> PackData(const RR_INTRUSIVE_PTR<RRValue>& data) RR_OVERRIDE
752 {
753 if (verify)
754 {
755 verify(data);
756 }
757 return GetNode()->template PackAnyType<typename RRPrimUtil<T>::BoxedType>(data);
758 }
759
760 RR_OVIRTUAL RR_INTRUSIVE_PTR<RRValue> UnpackData(const RR_INTRUSIVE_PTR<MessageElement>& mdata) RR_OVERRIDE
761 {
762 if (!verify)
763 {
764 return GetNode()->template UnpackAnyType<typename RRPrimUtil<T>::BoxedType>(mdata);
765 }
766 else
767 {
768 RR_INTRUSIVE_PTR<RRValue> ret = GetNode()->template UnpackAnyType<typename RRPrimUtil<T>::BoxedType>(mdata);
769 verify(ret);
770 return ret;
771 }
772 }
773
774 protected:
775 boost::function<void(const RR_INTRUSIVE_PTR<RRValue>&)> verify;
776};
777
778class ROBOTRACONTEUR_CORE_API ServiceStub;
779
780class ROBOTRACONTEUR_CORE_API PipeClientBase : public virtual PipeBase
781{
782 public:
783 friend class PipeSubscriptionBase;
784 friend class detail::PipeSubscription_connection;
785
786 RR_OVIRTUAL ~PipeClientBase() RR_OVERRIDE {}
787
788 RR_OVIRTUAL std::string GetMemberName() RR_OVERRIDE;
789
790 RR_OVIRTUAL void PipePacketReceived(const RR_INTRUSIVE_PTR<MessageEntry>& m, uint32_t e = 0) RR_OVERRIDE;
791
792 RR_OVIRTUAL void Shutdown() RR_OVERRIDE;
793
794 RR_OVIRTUAL void AsyncClose(const RR_SHARED_PTR<PipeEndpointBase>& endpoint, bool remote, uint32_t ee,
795 RR_MOVE_ARG(boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)>)
796 handler,
797 int32_t timeout) RR_OVERRIDE;
798
799 RR_SHARED_PTR<ServiceStub> GetStub();
800
801 RR_OVIRTUAL std::string GetServicePath() RR_OVERRIDE;
802
803 protected:
804 RR_OVIRTUAL void AsyncSendPipePacket(
805 const RR_INTRUSIVE_PTR<RRValue>& data, int32_t index, uint32_t packetnumber, bool requestack, uint32_t endpoint,
806 bool unreliable,
807 RR_MOVE_ARG(boost::function<void(uint32_t, const RR_SHARED_PTR<RobotRaconteurException>&)>)
808 handler) RR_OVERRIDE;
809
810 std::string m_MemberName;
811
812 RR_UNORDERED_MAP<int32_t, RR_SHARED_PTR<PipeEndpointBase> > pipeendpoints;
813 boost::mutex pipeendpoints_lock;
814
815 RR_WEAK_PTR<ServiceStub> stub;
816
817 std::list<boost::tuple<int32_t, int32_t> > connecting_endpoints;
818 int32_t connecting_key_count;
819 RR_UNORDERED_MAP<int32_t, RR_SHARED_PTR<PipeEndpointBase> > early_endpoints;
820 std::string service_path;
821 uint32_t endpoint;
822
823 void AsyncConnect_internal(int32_t index,
824 RR_MOVE_ARG(boost::function<void(const RR_SHARED_PTR<PipeEndpointBase>&,
825 const RR_SHARED_PTR<RobotRaconteurException>&)>)
826 handler,
827 int32_t timeout);
828
829 void AsyncConnect_internal1(const RR_INTRUSIVE_PTR<MessageEntry>& ret,
830 const RR_SHARED_PTR<RobotRaconteurException>& err, int32_t index, int32_t key,
831 boost::function<void(const RR_SHARED_PTR<PipeEndpointBase>&,
832 const RR_SHARED_PTR<RobotRaconteurException>&)>& handler);
833
834 PipeClientBase(boost::string_ref name, const RR_SHARED_PTR<ServiceStub>& stub, bool unreliable,
835 MemberDefinition_Direction direction);
836
837 virtual RR_SHARED_PTR<PipeEndpointBase> CreateNewPipeEndpoint(int32_t index, bool unreliable,
838 MemberDefinition_Direction direction) = 0;
839
840 RR_OVIRTUAL void DeleteEndpoint(const RR_SHARED_PTR<PipeEndpointBase>& e) RR_OVERRIDE;
841};
842
843template <typename T>
844class PipeClient : public virtual Pipe<T>, public virtual PipeClientBase
845{
846 public:
847 RR_OVIRTUAL ~PipeClient() RR_OVERRIDE {}
848
849 RR_OVIRTUAL boost::function<void(const RR_SHARED_PTR<PipeEndpoint<T> >&)> GetPipeConnectCallback() RR_OVERRIDE
850 {
851 ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, endpoint, service_path, m_MemberName,
852 "GetPipeConnectCallback is not valid for PipeClient");
853 throw InvalidOperationException("Not valid for client");
854 }
855
856 RR_OVIRTUAL void SetPipeConnectCallback(boost::function<void(const RR_SHARED_PTR<PipeEndpoint<T> >&)> function)
857 RR_OVERRIDE
858 {
859 RR_UNUSED(function);
860 ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, endpoint, service_path, m_MemberName,
861 "SetPipeConnectCallback is not valid for PipeClient");
862 throw InvalidOperationException("Not valid for client");
863 }
864
865 RR_OVIRTUAL void AsyncConnect(
866 int32_t index,
867 boost::function<void(const RR_SHARED_PTR<PipeEndpoint<T> >&, const RR_SHARED_PTR<RobotRaconteurException>&)>
868 handler,
869 int32_t timeout = RR_TIMEOUT_INFINITE) RR_OVERRIDE
870 {
871
872 AsyncConnect_internal(index,
873 boost::bind(handler,
874 boost::bind(&PipeClient<T>::AsyncConnect_cast, RR_BOOST_PLACEHOLDERS(_1)),
875 RR_BOOST_PLACEHOLDERS(_2)),
876 timeout);
877 }
878
879 RR_OVIRTUAL RR_SHARED_PTR<PipeEndpoint<T> > Connect(int32_t index) RR_OVERRIDE
880 {
881 ROBOTRACONTEUR_ASSERT_MULTITHREADED(node);
882
883 RR_SHARED_PTR<detail::sync_async_handler<PipeEndpoint<T> > > t =
884 RR_MAKE_SHARED<detail::sync_async_handler<PipeEndpoint<T> > >();
885 AsyncConnect(index,
886 boost::bind(&detail::sync_async_handler<PipeEndpoint<T> >::operator(), t,
887 RR_BOOST_PLACEHOLDERS(_1), RR_BOOST_PLACEHOLDERS(_2)),
888 GetNode()->GetRequestTimeout());
889 return t->end();
890 }
891
892 PipeClient(boost::string_ref name, const RR_SHARED_PTR<ServiceStub>& stub, bool unreliable = false,
893 MemberDefinition_Direction direction = MemberDefinition_Direction_both,
894 boost::function<void(const RR_INTRUSIVE_PTR<RRValue>&)> verify = RR_NULL_FN)
895 : Pipe<T>(verify), PipeClientBase(name, stub, unreliable, direction)
896 {
897 rawelements = (boost::is_same<T, RR_INTRUSIVE_PTR<MessageElement> >::value);
898 }
899
900 using PipeClientBase::AsyncClose;
901 using PipeClientBase::AsyncSendPipePacket;
902 using PipeClientBase::GetMemberName;
903 using PipeClientBase::PipePacketReceived;
904 using PipeClientBase::Shutdown;
905
906 protected:
907 static RR_SHARED_PTR<PipeEndpoint<T> > AsyncConnect_cast(const RR_SHARED_PTR<PipeEndpointBase>& b)
908 {
909 return rr_cast<PipeEndpoint<T> >(b);
910 }
911
912 RR_OVIRTUAL RR_SHARED_PTR<PipeEndpointBase> CreateNewPipeEndpoint(int32_t index, bool unreliable,
913 MemberDefinition_Direction direction) RR_OVERRIDE
914 {
915 return RR_MAKE_SHARED<PipeEndpoint<T> >(RR_STATIC_POINTER_CAST<PipeBase>(shared_from_this()), index, 0,
916 unreliable, direction);
917 }
918};
919
920class ROBOTRACONTEUR_CORE_API ServiceSkel;
921class ROBOTRACONTEUR_CORE_API PipeServerBase : public virtual PipeBase
922{
923 public:
924 RR_OVIRTUAL ~PipeServerBase() RR_OVERRIDE {}
925
926 RR_OVIRTUAL std::string GetMemberName() RR_OVERRIDE;
927
928 RR_OVIRTUAL void PipePacketReceived(const RR_INTRUSIVE_PTR<MessageEntry>& m, uint32_t e = 0) RR_OVERRIDE;
929
930 RR_OVIRTUAL void Shutdown() RR_OVERRIDE;
931
932 RR_OVIRTUAL void AsyncSendPipePacket(
933 const RR_INTRUSIVE_PTR<RRValue>& data, int32_t index, uint32_t packetnumber, bool requestack, uint32_t endpoint,
934 bool unreliable,
935 RR_MOVE_ARG(boost::function<void(uint32_t, const RR_SHARED_PTR<RobotRaconteurException>&)>)
936 handler) RR_OVERRIDE;
937
938 RR_OVIRTUAL void AsyncClose(const RR_SHARED_PTR<PipeEndpointBase>& endpoint, bool remote, uint32_t ee,
939 RR_MOVE_ARG(boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)>)
940 handler,
941 int32_t timeout) RR_OVERRIDE;
942
943 virtual RR_INTRUSIVE_PTR<MessageEntry> PipeCommand(const RR_INTRUSIVE_PTR<MessageEntry>& m, uint32_t e);
944
945 RR_SHARED_PTR<ServiceSkel> GetSkel();
946
947 RR_OVIRTUAL std::string GetServicePath() RR_OVERRIDE;
948
949 protected:
950 std::string m_MemberName;
951 std::string service_path;
952
953 struct pipe_endpoint_server_id
954 {
955 pipe_endpoint_server_id(uint32_t endpoint, int32_t index)
956 {
957 this->endpoint = endpoint;
958 this->index = index;
959 }
960
961 uint32_t endpoint;
962 int32_t index;
963
964 bool operator==(const pipe_endpoint_server_id& rhs) const
965 {
966 return (endpoint == rhs.endpoint && index == rhs.index);
967 }
968 };
969
970 struct hash_value
971 {
972 std::size_t operator()(pipe_endpoint_server_id const& e) const
973 {
974 std::size_t seed = 0;
975 boost::hash_combine(seed, e.endpoint);
976 boost::hash_combine(seed, e.index);
977 return seed;
978 }
979 };
980
981 RR_UNORDERED_MAP<pipe_endpoint_server_id, RR_SHARED_PTR<PipeEndpointBase>, hash_value> pipeendpoints;
982 boost::mutex pipeendpoints_lock;
983
984 RR_WEAK_PTR<ServiceSkel> skel;
985
986 PipeServerBase(boost::string_ref name, const RR_SHARED_PTR<ServiceSkel>& skel, bool unreliable,
987 MemberDefinition_Direction direction);
988
989 virtual RR_SHARED_PTR<PipeEndpointBase> CreateNewPipeEndpoint(int32_t index, uint32_t endpoint, bool unreliable,
990 MemberDefinition_Direction direction) = 0;
991
992 RR_OVIRTUAL void DeleteEndpoint(const RR_SHARED_PTR<PipeEndpointBase>& e) RR_OVERRIDE;
993
994 virtual void fire_PipeConnectCallback(const RR_SHARED_PTR<PipeEndpointBase>& e) = 0;
995
996 bool init;
997 boost::signals2::connection listener_connection;
998
999 public:
1000 void ClientDisconnected(const RR_SHARED_PTR<ServerContext>& context, ServerServiceListenerEventType ev,
1001 const RR_SHARED_PTR<void>& param);
1002};
1003
1004template <typename T>
1005class PipeServer : public virtual PipeServerBase, public virtual Pipe<T>
1006{
1007
1008 public:
1009 RR_OVIRTUAL ~PipeServer() RR_OVERRIDE {}
1010
1011 RR_OVIRTUAL boost::function<void(const RR_SHARED_PTR<PipeEndpoint<T> >&)> GetPipeConnectCallback() RR_OVERRIDE
1012 {
1013 return callback;
1014 }
1015
1016 RR_OVIRTUAL void SetPipeConnectCallback(boost::function<void(const RR_SHARED_PTR<PipeEndpoint<T> >&)> function)
1017 RR_OVERRIDE
1018 {
1019 callback = function;
1020 }
1021
1022 RR_OVIRTUAL RR_SHARED_PTR<PipeEndpoint<T> > Connect(int32_t index) RR_OVERRIDE
1023 {
1024 RR_UNUSED(index);
1025 ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, -1, service_path, m_MemberName,
1026 "Connect is not valid for PipeServer");
1027 throw InvalidOperationException("Not valid for server");
1028 }
1029
1030 RR_OVIRTUAL void AsyncConnect(
1031 int32_t index,
1032 boost::function<void(const RR_SHARED_PTR<PipeEndpoint<T> >&, const RR_SHARED_PTR<RobotRaconteurException>&)>
1033 handler,
1034 int32_t timeout = RR_TIMEOUT_INFINITE) RR_OVERRIDE
1035 {
1036 RR_UNUSED(index);
1037 RR_UNUSED(handler);
1038 RR_UNUSED(timeout);
1039 ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, -1, service_path, m_MemberName,
1040 "AsyncConnect is not valid for PipeServer");
1041 throw InvalidOperationException("Not valid for server");
1042 }
1043
1044 PipeServer(boost::string_ref name, const RR_SHARED_PTR<ServiceSkel>& skel, bool unreliable = false,
1045 MemberDefinition_Direction direction = MemberDefinition_Direction_both,
1046 boost::function<void(const RR_INTRUSIVE_PTR<RRValue>&)> verify = RR_NULL_FN)
1047 : PipeServerBase(name, skel, unreliable, direction), Pipe<T>(verify)
1048 {
1049 rawelements = (boost::is_same<T, RR_INTRUSIVE_PTR<MessageElement> >::value);
1050 }
1051
1052 protected:
1053 RR_OVIRTUAL RR_SHARED_PTR<PipeEndpointBase> CreateNewPipeEndpoint(int32_t index, uint32_t endpoint, bool unreliable,
1054 MemberDefinition_Direction direction) RR_OVERRIDE
1055 {
1056 return RR_MAKE_SHARED<PipeEndpoint<T> >(RR_STATIC_POINTER_CAST<PipeBase>(shared_from_this()), index, endpoint,
1057 unreliable, direction);
1058 }
1059
1060 boost::function<void(RR_SHARED_PTR<PipeEndpoint<T> >)> callback;
1061
1062 RR_OVIRTUAL void fire_PipeConnectCallback(const RR_SHARED_PTR<PipeEndpointBase>& e) RR_OVERRIDE
1063 {
1064 if (!callback)
1065 return;
1066 callback(RR_STATIC_POINTER_CAST<PipeEndpoint<T> >(e));
1067 }
1068
1069 public:
1070 RR_OVIRTUAL void Shutdown() RR_OVERRIDE
1071 {
1072 PipeServerBase::Shutdown();
1073
1074 callback.clear();
1075 }
1076};
1077
1078namespace detail
1079{
1080class PipeBroadcasterBase_connected_endpoint;
1081struct PipeBroadcasterBase_async_send_operation;
1082} // namespace detail
1083
1090class ROBOTRACONTEUR_CORE_API PipeBroadcasterBase : public RR_ENABLE_SHARED_FROM_THIS<PipeBroadcasterBase>,
1091 private boost::noncopyable
1092{
1093 public:
1094 virtual ~PipeBroadcasterBase();
1095
1096 size_t GetActivePipeEndpointCount();
1097
1104 boost::function<bool(const RR_SHARED_PTR<PipeBroadcasterBase>&, uint32_t, int32_t)> GetPredicate();
1105
1124 void SetPredicate(boost::function<bool(const RR_SHARED_PTR<PipeBroadcasterBase>&, uint32_t, int32_t)> f);
1125
1131 int32_t GetMaxBacklog();
1132
1142 void SetMaxBacklog(int32_t maximum_backlog);
1143
1144 protected:
1145 PipeBroadcasterBase();
1146
1147 void InitBase(const RR_SHARED_PTR<PipeBase>& pipe, int32_t maximum_backlog = -1);
1148
1149 void EndpointConnectedBase(const RR_SHARED_PTR<PipeEndpointBase>& ep);
1150
1151 void EndpointClosedBase(const RR_SHARED_PTR<detail::PipeBroadcasterBase_connected_endpoint>& ep);
1152
1153 void PacketAckReceivedBase(const RR_SHARED_PTR<detail::PipeBroadcasterBase_connected_endpoint>& ep, uint32_t id);
1154
1155 void handle_send(int32_t id, const RR_SHARED_PTR<RobotRaconteurException>& err,
1156 const RR_SHARED_PTR<detail::PipeBroadcasterBase_connected_endpoint>& ep,
1157 const RR_SHARED_PTR<detail::PipeBroadcasterBase_async_send_operation>& op, int32_t key,
1158 int32_t send_key, const boost::function<void()>& handler);
1159
1160 void SendPacketBase(const RR_INTRUSIVE_PTR<RRValue>& packet);
1161
1162 void AsyncSendPacketBase(const RR_INTRUSIVE_PTR<RRValue>& packet, RR_MOVE_ARG(boost::function<void()>) handler);
1163
1164 virtual void AttachPipeServerEvents(const RR_SHARED_PTR<PipeServerBase>& p);
1165
1166 virtual void AttachPipeEndpointEvents(const RR_SHARED_PTR<PipeEndpointBase>& p,
1167 const RR_SHARED_PTR<detail::PipeBroadcasterBase_connected_endpoint>& cep);
1168
1169 RR_SHARED_PTR<PipeBase> GetPipeBase();
1170
1171 std::list<RR_SHARED_PTR<detail::PipeBroadcasterBase_connected_endpoint> > endpoints;
1172 boost::mutex endpoints_lock;
1173
1174 RR_WEAK_PTR<PipeServerBase> pipe;
1175 RR_WEAK_PTR<RobotRaconteurNode> node;
1176 int32_t maximum_backlog;
1177 std::string service_path;
1178 std::string member_name;
1179
1180 bool copy_element;
1181
1182 boost::function<bool(const RR_SHARED_PTR<PipeBroadcasterBase>&, uint32_t, int32_t)> predicate;
1183};
1184
1216template <typename T>
1217class PipeBroadcaster : public PipeBroadcasterBase
1218{
1219
1220 public:
1229
1239 void Init(RR_SHARED_PTR<Pipe<T> > pipe, int32_t maximum_backlog = -1) { InitBase(pipe, maximum_backlog); }
1240
1248 void SendPacket(T packet) { SendPacketBase(RRPrimUtil<T>::PrePack(packet)); }
1249
1258 void AsyncSendPacket(T packet, boost::function<void()> handler)
1259 {
1260 AsyncSendPacketBase(RRPrimUtil<T>::PrePack(packet), RR_MOVE(handler));
1261 }
1262
1268 RR_SHARED_PTR<Pipe<T> > GetPipe() { return rr_cast<Pipe<T> >(GetPipeBase()); }
1269
1270 protected:
1271 RR_OVIRTUAL void AttachPipeServerEvents(const RR_SHARED_PTR<PipeServerBase>& p) RR_OVERRIDE
1272 {
1273 RR_SHARED_PTR<PipeServer<T> > p_T = rr_cast<PipeServer<T> >(p);
1274
1275 p_T->SetPipeConnectCallback(
1276 boost::bind(&PipeBroadcaster::EndpointConnectedBase, shared_from_this(), RR_BOOST_PLACEHOLDERS(_1)));
1277 }
1278
1279 RR_OVIRTUAL void AttachPipeEndpointEvents(const RR_SHARED_PTR<PipeEndpointBase>& ep,
1280 const RR_SHARED_PTR<detail::PipeBroadcasterBase_connected_endpoint>& cep)
1281 RR_OVERRIDE
1282 {
1283 RR_SHARED_PTR<PipeEndpoint<T> > ep_T = rr_cast<PipeEndpoint<T> >(ep);
1284
1285 ep_T->SetPipeEndpointClosedCallback(boost::bind(&PipeBroadcaster::EndpointClosedBase, shared_from_this(), cep));
1286 ep_T->PacketAckReceivedEvent.connect(
1287 boost::bind(&PipeBroadcaster::PacketAckReceivedBase, shared_from_this(), cep, RR_BOOST_PLACEHOLDERS(_2)));
1288 }
1289};
1290#ifndef ROBOTRACONTEUR_NO_CXX11_TEMPLATE_ALIASES
1292using PipeEndpointBasePtr = RR_SHARED_PTR<PipeEndpointBase>;
1294template <typename T>
1295using PipeEndpointPtr = RR_SHARED_PTR<PipeEndpoint<T> >;
1297using PipeBasePtr = RR_SHARED_PTR<PipeBase>;
1299template <typename T>
1300using PipePtr = RR_SHARED_PTR<Pipe<T> >;
1302template <typename T>
1303using PipeBroadcasterPtr = RR_SHARED_PTR<PipeBroadcaster<T> >;
1304#endif
1305} // namespace RobotRaconteur
1306
1307#ifdef _MSVC_VER
1308#pragma warning(pop)
1309#endif
static boost::shared_ptr< T > rr_cast(const boost::shared_ptr< U > &objin)
Dynamic cast a RR_SHARED_PTR type. Throws DataTypeMismatchException if cast is invalid instead of ret...
Definition DataTypes.h:199
boost::shared_ptr< PipeBase > PipeBasePtr
Convenience alias for PipeBase shared_ptr.
Definition PipeMember.h:1297
boost::shared_ptr< PipeEndpoint< T > > PipeEndpointPtr
Convenience alias for PipeEndpoint shared_ptr.
Definition PipeMember.h:1295
boost::shared_ptr< PipeEndpointBase > PipeEndpointBasePtr
Convenience alias for PipeEndpointBase shared_ptr.
Definition PipeMember.h:1292
boost::shared_ptr< Pipe< T > > PipePtr
Convenience alias for Pipe shared_ptr.
Definition PipeMember.h:1300
boost::shared_ptr< PipeBroadcaster< T > > PipeBroadcasterPtr
Convenience alias for PipeBroadcaster shared_ptr.
Definition PipeMember.h:1303
MemberDefinition_Direction
Member direction enum.
Definition RobotRaconteurConstants.h:534
@ MemberDefinition_Direction_both
member supports read and write
Definition RobotRaconteurConstants.h:536
#define RR_TIMEOUT_INFINITE
Disable timeout for asynchronous operations.
Definition RobotRaconteurConstants.h:566
Base class for Pipe.
Definition PipeMember.h:539
static const int32_t ANY_INDEX
Dynamically select pipe endpoint index.
Definition PipeMember.h:550
virtual std::string GetMemberName()=0
Get the member name of the pipe.
bool IsUnreliable()
Get if pipe is declared unreliable.
MemberDefinition_Direction Direction()
The direction of the pipe.
void SetPredicate(boost::function< bool(const boost::shared_ptr< PipeBroadcasterBase > &, uint32_t, int32_t)> f)
Set the predicate callback function.
void SetMaxBacklog(int32_t maximum_backlog)
Set the maximum backlog.
boost::function< bool(const boost::shared_ptr< PipeBroadcasterBase > &, uint32_t, int32_t)> GetPredicate()
Get the current predicate callback function.
int32_t GetMaxBacklog()
Gets the currently configured maximum backlog.
void Init(boost::shared_ptr< Pipe< T > > pipe, int32_t maximum_backlog=-1)
Initialize the PipeBroadcaster.
Definition PipeMember.h:1239
boost::shared_ptr< Pipe< T > > GetPipe()
Get the associated pipe.
Definition PipeMember.h:1268
void AsyncSendPacket(T packet, boost::function< void()> handler)
Asynchronously send packet to all connected pipe endpoint clients.
Definition PipeMember.h:1258
void SendPacket(T packet)
Send a packet to all connected pipe endpoint clients.
Definition PipeMember.h:1248
PipeBroadcaster()
Construct a new PipeBroadcaster.
Definition PipeMember.h:1228
Base class for PipeEndpoint.
Definition PipeMember.h:61
bool GetRequestPacketAck()
Get if pipe endpoint is requesting acks.
bool IsUnreliable()
Get if pipe endpoint is unreliable.
MemberDefinition_Direction Direction()
The direction of the pipe.
void SetRequestPacketAck(bool ack)
Set if pipe endpoint should request packet acks.
virtual int32_t GetIndex()
Returns the pipe endpoint index used when endpoint connected.
void SetIgnoreReceived(bool ignore)
Set whether pipe endpoint should ignore incoming packets.
virtual void Close()
Close the pipe endpoint.
bool GetIgnoreReceived()
Get if pipe endpoint is ignoring incoming packets.
virtual void AsyncClose(boost::function< void(const boost::shared_ptr< RobotRaconteurException > &)> handler, int32_t timeout=RR_TIMEOUT_INFINITE)
Asynchronously close the pipe endpoint.
virtual uint32_t GetEndpoint()
Returns the Robot Raconteur node Endpoint ID.
virtual size_t Available()
Return number of packets in the receive queue.
Pipe endpoint used to transmit reliable or unreliable data streams.
Definition PipeMember.h:284
virtual T PeekNextPacketWait(int32_t timeout=RR_TIMEOUT_INFINITE)
Peek the next packet in the receive queue, block if queue is empty.
Definition PipeMember.h:418
boost::signals2::signal< void(const boost::shared_ptr< PipeEndpoint< T > > &, uint32_t)> PacketAckReceivedEvent
Signal called when a packet ack has been received.
Definition PipeMember.h:334
RR_OVIRTUAL void Close() RR_OVERRIDE
Close the pipe endpoint.
Definition PipeMember.h:478
virtual void AsyncSendPacket(typename boost::call_traits< T >::param_type packet, boost::function< void(uint32_t, const boost::shared_ptr< RobotRaconteurException > &)> handler)
Send a packet to the peer endpoint asynchronously.
Definition PipeMember.h:369
boost::signals2::signal< void(const boost::shared_ptr< PipeEndpoint< T > > &)> PacketReceivedEvent
Signal called when a packet has been received.
Definition PipeMember.h:323
virtual T ReceivePacket()
Receive the next packet in the receive queue.
Definition PipeMember.h:384
void SetPipeEndpointClosedCallback(boost::function< void(const boost::shared_ptr< PipeEndpoint< T > > &)> callback)
Set the endpoint closed callback function.
Definition PipeMember.h:311
RR_OVIRTUAL void AsyncClose(boost::function< void(const boost::shared_ptr< RobotRaconteurException > &)> handler, int32_t timeout=2000) RR_OVERRIDE
Asynchronously close the pipe endpoint.
Definition PipeMember.h:510
virtual bool TryReceivePacketWait(T &val, int32_t timeout=RR_TIMEOUT_INFINITE, bool peek=false)
Try receiving a packet, optionally blocking if the queue is empty.
Definition PipeMember.h:438
virtual T ReceivePacketWait(int32_t timeout=RR_TIMEOUT_INFINITE)
Receive the next packet in the receive queue, block if queue is empty.
Definition PipeMember.h:405
virtual uint32_t SendPacket(typename boost::call_traits< T >::param_type packet)
Sends a packet to the peer endpoint.
Definition PipeMember.h:347
boost::function< void(boost::shared_ptr< PipeEndpoint< T > >)> GetPipeEndpointClosedCallback()
Get the currently configured endpoint closed callback function.
Definition PipeMember.h:295
virtual T PeekNextPacket()
Peeks the next packet in the receive queue.
Definition PipeMember.h:395
pipe member type interface
Definition PipeMember.h:666
virtual void AsyncConnect(int32_t index, boost::function< void(const boost::shared_ptr< PipeEndpoint< T > > &, const boost::shared_ptr< RobotRaconteurException > &)> handler, int32_t timeout=RR_TIMEOUT_INFINITE)=0
Asynchronously connect a pipe endpoint.
virtual void SetPipeConnectCallback(boost::function< void(const boost::shared_ptr< PipeEndpoint< T > > &)> function)=0
Set the pipe endpoint connected callback function.
virtual boost::function< void(const boost::shared_ptr< PipeEndpoint< T > > &)> GetPipeConnectCallback()=0
Get the currently configured pipe endpoint connected callback function.
virtual boost::shared_ptr< PipeEndpoint< T > > Connect(int32_t index)=0
Connect a pipe endpoint.
virtual void AsyncConnect(boost::function< void(const boost::shared_ptr< PipeEndpoint< T > > &, const boost::shared_ptr< RobotRaconteurException > &)> handler, int32_t timeout=RR_TIMEOUT_INFINITE)
Asynchronously connect a pipe endpoint.
Definition PipeMember.h:743