30#include <boost/function.hpp>
31#include <boost/unordered_map.hpp>
32#include <boost/call_traits.hpp>
37#pragma warning(disable : 4250)
38#pragma warning(disable : 4996)
41#include <boost/signals2.hpp>
43namespace RobotRaconteur
46class ROBOTRACONTEUR_CORE_API
PipeBase;
47class ROBOTRACONTEUR_CORE_API PipeEndpointBaseListener;
50class PipeSubscription_connection;
59class ROBOTRACONTEUR_CORE_API PipeEndpointBase :
public RR_ENABLE_SHARED_FROM_THIS<PipeEndpointBase>,
60 private boost::noncopyable
62 friend class PipeBase;
63 friend class PipeClientBase;
64 friend class PipeServerBase;
65 friend class PipeBroadcasterBase;
66 friend class PipeSubscriptionBase;
67 friend class detail::PipeSubscription_connection;
70 virtual ~PipeEndpointBase() {}
125 virtual void AsyncClose(boost::function<
void(
const RR_SHARED_PTR<RobotRaconteurException>&)> handler,
181 virtual void AddListener(
const RR_SHARED_PTR<PipeEndpointBaseListener>& listener);
183 RR_SHARED_PTR<RobotRaconteurNode> GetNode();
186 virtual void RemoteClose();
188 PipeEndpointBase(
const RR_SHARED_PTR<PipeBase>& parent, int32_t index, uint32_t endpoint = 0,
194 bool RequestPacketAck;
196 void AsyncSendPacketBase(
const RR_INTRUSIVE_PTR<RRValue>& packet,
197 RR_MOVE_ARG(boost::function<
void(uint32_t,
const RR_SHARED_PTR<RobotRaconteurException>&)>)
200 RR_INTRUSIVE_PTR<RRValue> ReceivePacketBase();
201 RR_INTRUSIVE_PTR<RRValue> PeekPacketBase();
206 bool TryReceivePacketBaseWait(RR_INTRUSIVE_PTR<RRValue>& packet, int32_t timeout =
RR_TIMEOUT_INFINITE,
209 boost::mutex sendlock;
210 boost::mutex recvlock;
212 std::deque<RR_INTRUSIVE_PTR<RRValue> > recv_packets;
213 boost::condition_variable recv_packets_wait;
215 uint32_t increment_packet_number(uint32_t packetnum);
217 void PipePacketReceived(
const RR_INTRUSIVE_PTR<RRValue>& packet, uint32_t packetnum);
219 void PipePacketAckReceived(uint32_t packetnum);
223 virtual void fire_PipeEndpointClosedCallback() = 0;
225 virtual void fire_PacketReceivedEvent() = 0;
227 virtual void fire_PacketAckReceivedEvent(uint32_t packetnum) = 0;
229 RR_SHARED_PTR<PipeBase> GetParent();
233 uint32_t send_packet_number;
234 uint32_t recv_packet_number;
236 RR_WEAK_PTR<PipeBase> parent;
239 std::string service_path;
240 std::string member_name;
242 RR_UNORDERED_MAP<uint32_t, RR_INTRUSIVE_PTR<RRValue> > out_of_order_packets;
244 bool ignore_incoming_packets;
246 boost::mutex listeners_lock;
247 std::list<RR_WEAK_PTR<PipeEndpointBaseListener> > listeners;
249 detail::async_signal_semaphore pipe_packet_received_semaphore;
251 RR_WEAK_PTR<RobotRaconteurNode> node;
283class PipeEndpoint :
public PipeEndpointBase
286 boost::function<void(RR_SHARED_PTR<PipeEndpoint<T> >)> PipeEndpointClosedCallback;
287 boost::mutex PipeEndpointClosedCallback_lock;
297 boost::mutex::scoped_lock lock(PipeEndpointClosedCallback_lock);
298 return PipeEndpointClosedCallback;
313 boost::mutex::scoped_lock lock(PipeEndpointClosedCallback_lock);
314 PipeEndpointClosedCallback = callback;
347 virtual uint32_t
SendPacket(
typename boost::call_traits<T>::param_type packet)
349 ROBOTRACONTEUR_ASSERT_MULTITHREADED(node);
351 RR_SHARED_PTR<detail::sync_async_handler<uint32_t> > t =
352 RR_MAKE_SHARED<detail::sync_async_handler<uint32_t> >();
353 boost::function<void(
const RR_SHARED_PTR<uint32_t>&,
const RR_SHARED_PTR<RobotRaconteurException>&)> h =
354 boost::bind(&detail::sync_async_handler<uint32_t>::operator(), t, RR_BOOST_PLACEHOLDERS(_1),
355 RR_BOOST_PLACEHOLDERS(_2));
357 packet, boost::bind(&PipeEndpoint::send_handler, RR_BOOST_PLACEHOLDERS(_1), RR_BOOST_PLACEHOLDERS(_2), h));
370 boost::function<
void(uint32_t,
const RR_SHARED_PTR<RobotRaconteurException>&)> handler)
372 AsyncSendPacketBase(RRPrimUtil<T>::PrePack(packet), RR_MOVE(handler));
384 virtual T
ReceivePacket() {
return RRPrimUtil<T>::PreUnpack(ReceivePacketBase()); }
395 virtual T
PeekNextPacket() {
return RRPrimUtil<T>::PreUnpack(PeekPacketBase()); }
407 return RRPrimUtil<T>::PreUnpack(ReceivePacketBaseWait(timeout));
420 return RRPrimUtil<T>::PreUnpack(PeekPacketBaseWait(timeout));
440 RR_INTRUSIVE_PTR<RRValue> o;
441 if (!TryReceivePacketBaseWait(o, timeout, peek))
443 val = RRPrimUtil<T>::PreUnpack(o);
447 PipeEndpoint(
const RR_SHARED_PTR<PipeBase>& parent, int32_t index, uint32_t endpoint = 0,
bool unreliable =
false,
452 RR_OVIRTUAL
void fire_PipeEndpointClosedCallback() RR_OVERRIDE
457 c(RR_STATIC_POINTER_CAST<PipeEndpoint<T> >(shared_from_this()));
460 RR_OVIRTUAL
void fire_PacketReceivedEvent() RR_OVERRIDE
465 RR_OVIRTUAL
void fire_PacketAckReceivedEvent(uint32_t packetnum) RR_OVERRIDE
470 static void send_handler(uint32_t packetnumber,
const RR_SHARED_PTR<RobotRaconteurException>& err,
471 const boost::function<
void(
const RR_SHARED_PTR<uint32_t>&,
472 const RR_SHARED_PTR<RobotRaconteurException>&)>& handler)
474 handler(RR_MAKE_SHARED<uint32_t>(packetnumber), err);
478 RR_OVIRTUAL
void Close() RR_OVERRIDE
483 boost::mutex::scoped_lock lock(PipeEndpointClosedCallback_lock);
484 PipeEndpointClosedCallback.clear();
491 virtual void AsyncClose1(
const RR_SHARED_PTR<RobotRaconteurException>& err,
492 const boost::function<
void(
const RR_SHARED_PTR<RobotRaconteurException>&)>& handler)
497 boost::mutex::scoped_lock lock(PipeEndpointClosedCallback_lock);
498 PipeEndpointClosedCallback.clear();
503 catch (std::exception&)
510 RR_OVIRTUAL
void AsyncClose(boost::function<
void(
const RR_SHARED_PTR<RobotRaconteurException>&)> handler,
511 int32_t timeout = 2000) RR_OVERRIDE
514 RR_STATIC_POINTER_CAST<PipeEndpoint<T> >(shared_from_this()),
515 RR_BOOST_PLACEHOLDERS(_1), handler),
520 RR_OVIRTUAL
void RemoteClose() RR_OVERRIDE
522 PipeEndpointBase::RemoteClose();
524 boost::mutex::scoped_lock lock(PipeEndpointClosedCallback_lock);
525 PipeEndpointClosedCallback.clear();
538class ROBOTRACONTEUR_CORE_API PipeBase :
public RR_ENABLE_SHARED_FROM_THIS<PipeBase>,
private boost::noncopyable
540 friend class PipeEndpointBase;
543 virtual ~PipeBase() {}
559 virtual void PipePacketReceived(
const RR_INTRUSIVE_PTR<MessageEntry>& m, uint32_t e = 0) = 0;
561 virtual void Shutdown() = 0;
563 virtual std::string GetServicePath() = 0;
565 virtual void AsyncClose(
const RR_SHARED_PTR<PipeEndpointBase>& endpoint,
bool remote, uint32_t ee,
566 RR_MOVE_ARG(boost::function<
void(
const RR_SHARED_PTR<RobotRaconteurException>&)>) handler,
567 int32_t timeout) = 0;
574 virtual void AsyncSendPipePacket(
575 const RR_INTRUSIVE_PTR<RRValue>& data, int32_t index, uint32_t packetnumber,
bool requestack, uint32_t endpoint,
577 RR_MOVE_ARG(boost::function<
void(uint32_t,
const RR_SHARED_PTR<RobotRaconteurException>&)>) handler) = 0;
581 void DispatchPacketAck(
const RR_INTRUSIVE_PTR<MessageElement>& me,
const RR_SHARED_PTR<PipeEndpointBase>& e);
583 bool DispatchPacket(
const RR_INTRUSIVE_PTR<MessageElement>& me,
const RR_SHARED_PTR<PipeEndpointBase>& e,
584 uint32_t& packetnumber);
586 RR_INTRUSIVE_PTR<MessageElement> PackPacket(
const RR_INTRUSIVE_PTR<RRValue>& data, int32_t index,
587 uint32_t packetnumber,
bool requestack);
589 virtual void DeleteEndpoint(
const RR_SHARED_PTR<PipeEndpointBase>& e) = 0;
591 virtual RR_INTRUSIVE_PTR<MessageElementData> PackData(
const RR_INTRUSIVE_PTR<RRValue>& data)
593 return GetNode()->PackVarType(data);
596 virtual RR_INTRUSIVE_PTR<RRValue> UnpackData(
const RR_INTRUSIVE_PTR<MessageElement>& mdata)
598 return GetNode()->UnpackVarType(mdata);
601 RR_WEAK_PTR<RobotRaconteurNode> node;
603 MemberDefinition_Direction direction;
606 RR_SHARED_PTR<RobotRaconteurNode> GetNode();
665class Pipe :
public virtual PipeBase
668 friend class PipeEndpointBase;
670 Pipe(boost::function<
void(
const RR_INTRUSIVE_PTR<RRValue>&)> verify) { this->verify = RR_MOVE(verify); }
672 RR_OVIRTUAL ~Pipe() RR_OVERRIDE {}
714 virtual RR_SHARED_PTR<PipeEndpoint<T> >
Connect(int32_t index) = 0;
729 boost::function<
void(
const RR_SHARED_PTR<
PipeEndpoint<T> >&,
const RR_SHARED_PTR<RobotRaconteurException>&)>
744 boost::function<
void(
const RR_SHARED_PTR<
PipeEndpoint<T> >&,
const RR_SHARED_PTR<RobotRaconteurException>&)>
751 RR_OVIRTUAL RR_INTRUSIVE_PTR<MessageElementData> PackData(
const RR_INTRUSIVE_PTR<RRValue>& data) RR_OVERRIDE
757 return GetNode()->template PackAnyType<typename RRPrimUtil<T>::BoxedType>(data);
760 RR_OVIRTUAL RR_INTRUSIVE_PTR<RRValue> UnpackData(
const RR_INTRUSIVE_PTR<MessageElement>& mdata) RR_OVERRIDE
764 return GetNode()->template UnpackAnyType<typename RRPrimUtil<T>::BoxedType>(mdata);
768 RR_INTRUSIVE_PTR<RRValue> ret = GetNode()->template UnpackAnyType<typename RRPrimUtil<T>::BoxedType>(mdata);
775 boost::function<void(
const RR_INTRUSIVE_PTR<RRValue>&)> verify;
778class ROBOTRACONTEUR_CORE_API ServiceStub;
780class ROBOTRACONTEUR_CORE_API PipeClientBase :
public virtual PipeBase
783 friend class PipeSubscriptionBase;
784 friend class detail::PipeSubscription_connection;
786 RR_OVIRTUAL ~PipeClientBase() RR_OVERRIDE {}
788 RR_OVIRTUAL std::string GetMemberName() RR_OVERRIDE;
790 RR_OVIRTUAL
void PipePacketReceived(const RR_INTRUSIVE_PTR<MessageEntry>& m, uint32_t e = 0) RR_OVERRIDE;
792 RR_OVIRTUAL
void Shutdown() RR_OVERRIDE;
794 RR_OVIRTUAL
void AsyncClose(const RR_SHARED_PTR<PipeEndpointBase>& endpoint,
bool remote, uint32_t ee,
795 RR_MOVE_ARG(boost::function<
void(const RR_SHARED_PTR<RobotRaconteurException>&)>)
797 int32_t timeout) RR_OVERRIDE;
799 RR_SHARED_PTR<ServiceStub> GetStub();
801 RR_OVIRTUAL std::
string GetServicePath() RR_OVERRIDE;
804 RR_OVIRTUAL
void AsyncSendPipePacket(
805 const RR_INTRUSIVE_PTR<RRValue>& data, int32_t index, uint32_t packetnumber,
bool requestack, uint32_t endpoint,
807 RR_MOVE_ARG(boost::function<
void(uint32_t, const RR_SHARED_PTR<RobotRaconteurException>&)>)
808 handler) RR_OVERRIDE;
810 std::
string m_MemberName;
812 RR_UNORDERED_MAP<int32_t, RR_SHARED_PTR<PipeEndpointBase> > pipeendpoints;
813 boost::mutex pipeendpoints_lock;
815 RR_WEAK_PTR<ServiceStub> stub;
817 std::list<boost::tuple<int32_t, int32_t> > connecting_endpoints;
818 int32_t connecting_key_count;
819 RR_UNORDERED_MAP<int32_t, RR_SHARED_PTR<PipeEndpointBase> > early_endpoints;
820 std::
string service_path;
823 void AsyncConnect_internal(int32_t index,
824 RR_MOVE_ARG(boost::function<
void(const RR_SHARED_PTR<PipeEndpointBase>&,
825 const RR_SHARED_PTR<RobotRaconteurException>&)>)
829 void AsyncConnect_internal1(const RR_INTRUSIVE_PTR<MessageEntry>& ret,
830 const RR_SHARED_PTR<RobotRaconteurException>& err, int32_t index, int32_t key,
831 boost::function<
void(const RR_SHARED_PTR<PipeEndpointBase>&,
832 const RR_SHARED_PTR<RobotRaconteurException>&)>& handler);
834 PipeClientBase(boost::string_ref name, const RR_SHARED_PTR<ServiceStub>& stub,
bool unreliable,
835 MemberDefinition_Direction direction);
837 virtual RR_SHARED_PTR<PipeEndpointBase> CreateNewPipeEndpoint(int32_t index,
bool unreliable,
838 MemberDefinition_Direction direction) = 0;
840 RR_OVIRTUAL
void DeleteEndpoint(const RR_SHARED_PTR<PipeEndpointBase>& e) RR_OVERRIDE;
844class PipeClient : public virtual Pipe<T>, public virtual PipeClientBase
847 RR_OVIRTUAL ~PipeClient() RR_OVERRIDE {}
849 RR_OVIRTUAL boost::function<void(
const RR_SHARED_PTR<PipeEndpoint<T> >&)> GetPipeConnectCallback() RR_OVERRIDE
851 ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, endpoint, service_path, m_MemberName,
852 "GetPipeConnectCallback is not valid for PipeClient");
853 throw InvalidOperationException(
"Not valid for client");
856 RR_OVIRTUAL
void SetPipeConnectCallback(boost::function<
void(
const RR_SHARED_PTR<PipeEndpoint<T> >&)> function)
860 ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, endpoint, service_path, m_MemberName,
861 "SetPipeConnectCallback is not valid for PipeClient");
862 throw InvalidOperationException(
"Not valid for client");
865 RR_OVIRTUAL
void AsyncConnect(
867 boost::function<
void(
const RR_SHARED_PTR<PipeEndpoint<T> >&,
const RR_SHARED_PTR<RobotRaconteurException>&)>
872 AsyncConnect_internal(index,
874 boost::bind(&PipeClient<T>::AsyncConnect_cast, RR_BOOST_PLACEHOLDERS(_1)),
875 RR_BOOST_PLACEHOLDERS(_2)),
879 RR_OVIRTUAL RR_SHARED_PTR<PipeEndpoint<T> > Connect(int32_t index) RR_OVERRIDE
881 ROBOTRACONTEUR_ASSERT_MULTITHREADED(node);
883 RR_SHARED_PTR<detail::sync_async_handler<PipeEndpoint<T> > > t =
884 RR_MAKE_SHARED<detail::sync_async_handler<PipeEndpoint<T> > >();
886 boost::bind(&detail::sync_async_handler<PipeEndpoint<T> >::operator(), t,
887 RR_BOOST_PLACEHOLDERS(_1), RR_BOOST_PLACEHOLDERS(_2)),
888 GetNode()->GetRequestTimeout());
892 PipeClient(boost::string_ref name,
const RR_SHARED_PTR<ServiceStub>& stub,
bool unreliable =
false,
893 MemberDefinition_Direction direction = MemberDefinition_Direction_both,
894 boost::function<
void(
const RR_INTRUSIVE_PTR<RRValue>&)> verify = RR_NULL_FN)
895 : Pipe<T>(verify), PipeClientBase(name, stub, unreliable, direction)
897 rawelements = (boost::is_same<T, RR_INTRUSIVE_PTR<MessageElement> >::value);
900 using PipeClientBase::AsyncClose;
901 using PipeClientBase::AsyncSendPipePacket;
902 using PipeClientBase::GetMemberName;
903 using PipeClientBase::PipePacketReceived;
904 using PipeClientBase::Shutdown;
907 static RR_SHARED_PTR<PipeEndpoint<T> > AsyncConnect_cast(
const RR_SHARED_PTR<PipeEndpointBase>& b)
909 return rr_cast<PipeEndpoint<T> >(b);
912 RR_OVIRTUAL RR_SHARED_PTR<PipeEndpointBase> CreateNewPipeEndpoint(int32_t index,
bool unreliable,
913 MemberDefinition_Direction direction) RR_OVERRIDE
915 return RR_MAKE_SHARED<PipeEndpoint<T> >(RR_STATIC_POINTER_CAST<PipeBase>(shared_from_this()), index, 0,
916 unreliable, direction);
920class ROBOTRACONTEUR_CORE_API ServiceSkel;
921class ROBOTRACONTEUR_CORE_API PipeServerBase :
public virtual PipeBase
924 RR_OVIRTUAL ~PipeServerBase() RR_OVERRIDE {}
926 RR_OVIRTUAL std::string GetMemberName() RR_OVERRIDE;
928 RR_OVIRTUAL
void PipePacketReceived(const RR_INTRUSIVE_PTR<MessageEntry>& m, uint32_t e = 0) RR_OVERRIDE;
930 RR_OVIRTUAL
void Shutdown() RR_OVERRIDE;
932 RR_OVIRTUAL
void AsyncSendPipePacket(
933 const RR_INTRUSIVE_PTR<RRValue>& data, int32_t index, uint32_t packetnumber,
bool requestack, uint32_t endpoint,
935 RR_MOVE_ARG(boost::function<
void(uint32_t, const RR_SHARED_PTR<RobotRaconteurException>&)>)
936 handler) RR_OVERRIDE;
938 RR_OVIRTUAL
void AsyncClose(const RR_SHARED_PTR<PipeEndpointBase>& endpoint,
bool remote, uint32_t ee,
939 RR_MOVE_ARG(boost::function<
void(const RR_SHARED_PTR<RobotRaconteurException>&)>)
941 int32_t timeout) RR_OVERRIDE;
943 virtual RR_INTRUSIVE_PTR<MessageEntry> PipeCommand(const RR_INTRUSIVE_PTR<MessageEntry>& m, uint32_t e);
945 RR_SHARED_PTR<ServiceSkel> GetSkel();
947 RR_OVIRTUAL std::
string GetServicePath() RR_OVERRIDE;
950 std::
string m_MemberName;
951 std::
string service_path;
953 struct pipe_endpoint_server_id
955 pipe_endpoint_server_id(uint32_t endpoint, int32_t index)
957 this->endpoint = endpoint;
964 bool operator==(
const pipe_endpoint_server_id& rhs)
const
966 return (endpoint == rhs.endpoint && index == rhs.index);
972 std::size_t operator()(pipe_endpoint_server_id
const& e)
const
974 std::size_t seed = 0;
975 boost::hash_combine(seed, e.endpoint);
976 boost::hash_combine(seed, e.index);
981 RR_UNORDERED_MAP<pipe_endpoint_server_id, RR_SHARED_PTR<PipeEndpointBase>, hash_value> pipeendpoints;
982 boost::mutex pipeendpoints_lock;
984 RR_WEAK_PTR<ServiceSkel> skel;
986 PipeServerBase(boost::string_ref name,
const RR_SHARED_PTR<ServiceSkel>& skel,
bool unreliable,
987 MemberDefinition_Direction direction);
989 virtual RR_SHARED_PTR<PipeEndpointBase> CreateNewPipeEndpoint(int32_t index, uint32_t endpoint,
bool unreliable,
990 MemberDefinition_Direction direction) = 0;
992 RR_OVIRTUAL
void DeleteEndpoint(
const RR_SHARED_PTR<PipeEndpointBase>& e) RR_OVERRIDE;
994 virtual void fire_PipeConnectCallback(
const RR_SHARED_PTR<PipeEndpointBase>& e) = 0;
997 boost::signals2::connection listener_connection;
1000 void ClientDisconnected(
const RR_SHARED_PTR<ServerContext>& context, ServerServiceListenerEventType ev,
1001 const RR_SHARED_PTR<void>& param);
1004template <
typename T>
1005class PipeServer :
public virtual PipeServerBase,
public virtual Pipe<T>
1009 RR_OVIRTUAL ~PipeServer() RR_OVERRIDE {}
1011 RR_OVIRTUAL boost::function<void(
const RR_SHARED_PTR<PipeEndpoint<T> >&)> GetPipeConnectCallback() RR_OVERRIDE
1016 RR_OVIRTUAL
void SetPipeConnectCallback(boost::function<
void(
const RR_SHARED_PTR<PipeEndpoint<T> >&)> function)
1019 callback = function;
1022 RR_OVIRTUAL RR_SHARED_PTR<PipeEndpoint<T> > Connect(int32_t index) RR_OVERRIDE
1025 ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, -1, service_path, m_MemberName,
1026 "Connect is not valid for PipeServer");
1027 throw InvalidOperationException(
"Not valid for server");
1030 RR_OVIRTUAL
void AsyncConnect(
1032 boost::function<
void(
const RR_SHARED_PTR<PipeEndpoint<T> >&,
const RR_SHARED_PTR<RobotRaconteurException>&)>
1039 ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, -1, service_path, m_MemberName,
1040 "AsyncConnect is not valid for PipeServer");
1041 throw InvalidOperationException(
"Not valid for server");
1044 PipeServer(boost::string_ref name,
const RR_SHARED_PTR<ServiceSkel>& skel,
bool unreliable =
false,
1045 MemberDefinition_Direction direction = MemberDefinition_Direction_both,
1046 boost::function<
void(
const RR_INTRUSIVE_PTR<RRValue>&)> verify = RR_NULL_FN)
1047 : PipeServerBase(name, skel, unreliable, direction), Pipe<T>(verify)
1049 rawelements = (boost::is_same<T, RR_INTRUSIVE_PTR<MessageElement> >::value);
1053 RR_OVIRTUAL RR_SHARED_PTR<PipeEndpointBase> CreateNewPipeEndpoint(int32_t index, uint32_t endpoint,
bool unreliable,
1054 MemberDefinition_Direction direction) RR_OVERRIDE
1056 return RR_MAKE_SHARED<PipeEndpoint<T> >(RR_STATIC_POINTER_CAST<PipeBase>(shared_from_this()), index, endpoint,
1057 unreliable, direction);
1060 boost::function<void(RR_SHARED_PTR<PipeEndpoint<T> >)> callback;
1062 RR_OVIRTUAL
void fire_PipeConnectCallback(
const RR_SHARED_PTR<PipeEndpointBase>& e) RR_OVERRIDE
1066 callback(RR_STATIC_POINTER_CAST<PipeEndpoint<T> >(e));
1070 RR_OVIRTUAL
void Shutdown() RR_OVERRIDE
1072 PipeServerBase::Shutdown();
1080class PipeBroadcasterBase_connected_endpoint;
1081struct PipeBroadcasterBase_async_send_operation;
1090class ROBOTRACONTEUR_CORE_API PipeBroadcasterBase :
public RR_ENABLE_SHARED_FROM_THIS<PipeBroadcasterBase>,
1091 private boost::noncopyable
1094 virtual ~PipeBroadcasterBase();
1096 size_t GetActivePipeEndpointCount();
1104 boost::function<bool(
const RR_SHARED_PTR<PipeBroadcasterBase>&, uint32_t, int32_t)>
GetPredicate();
1124 void SetPredicate(boost::function<
bool(
const RR_SHARED_PTR<PipeBroadcasterBase>&, uint32_t, int32_t)> f);
1145 PipeBroadcasterBase();
1147 void InitBase(
const RR_SHARED_PTR<PipeBase>& pipe, int32_t maximum_backlog = -1);
1149 void EndpointConnectedBase(
const RR_SHARED_PTR<PipeEndpointBase>& ep);
1151 void EndpointClosedBase(
const RR_SHARED_PTR<detail::PipeBroadcasterBase_connected_endpoint>& ep);
1153 void PacketAckReceivedBase(
const RR_SHARED_PTR<detail::PipeBroadcasterBase_connected_endpoint>& ep, uint32_t
id);
1155 void handle_send(int32_t
id,
const RR_SHARED_PTR<RobotRaconteurException>& err,
1156 const RR_SHARED_PTR<detail::PipeBroadcasterBase_connected_endpoint>& ep,
1157 const RR_SHARED_PTR<detail::PipeBroadcasterBase_async_send_operation>& op, int32_t key,
1158 int32_t send_key,
const boost::function<
void()>& handler);
1160 void SendPacketBase(
const RR_INTRUSIVE_PTR<RRValue>& packet);
1162 void AsyncSendPacketBase(
const RR_INTRUSIVE_PTR<RRValue>& packet, RR_MOVE_ARG(boost::function<
void()>) handler);
1164 virtual void AttachPipeServerEvents(
const RR_SHARED_PTR<PipeServerBase>& p);
1166 virtual void AttachPipeEndpointEvents(
const RR_SHARED_PTR<PipeEndpointBase>& p,
1167 const RR_SHARED_PTR<detail::PipeBroadcasterBase_connected_endpoint>& cep);
1169 RR_SHARED_PTR<PipeBase> GetPipeBase();
1171 std::list<RR_SHARED_PTR<detail::PipeBroadcasterBase_connected_endpoint> > endpoints;
1172 boost::mutex endpoints_lock;
1174 RR_WEAK_PTR<PipeServerBase> pipe;
1175 RR_WEAK_PTR<RobotRaconteurNode> node;
1176 int32_t maximum_backlog;
1177 std::string service_path;
1178 std::string member_name;
1182 boost::function<bool(
const RR_SHARED_PTR<PipeBroadcasterBase>&, uint32_t, int32_t)> predicate;
1216template <
typename T>
1239 void Init(RR_SHARED_PTR<
Pipe<T> > pipe, int32_t maximum_backlog = -1) { InitBase(pipe, maximum_backlog); }
1248 void SendPacket(T packet) { SendPacketBase(RRPrimUtil<T>::PrePack(packet)); }
1260 AsyncSendPacketBase(RRPrimUtil<T>::PrePack(packet), RR_MOVE(handler));
1271 RR_OVIRTUAL
void AttachPipeServerEvents(
const RR_SHARED_PTR<PipeServerBase>& p) RR_OVERRIDE
1275 p_T->SetPipeConnectCallback(
1276 boost::bind(&PipeBroadcaster::EndpointConnectedBase, shared_from_this(), RR_BOOST_PLACEHOLDERS(_1)));
1279 RR_OVIRTUAL
void AttachPipeEndpointEvents(
const RR_SHARED_PTR<PipeEndpointBase>& ep,
1280 const RR_SHARED_PTR<detail::PipeBroadcasterBase_connected_endpoint>& cep)
1283 RR_SHARED_PTR<PipeEndpoint<T> > ep_T = rr_cast<PipeEndpoint<T> >(ep);
1285 ep_T->SetPipeEndpointClosedCallback(boost::bind(&PipeBroadcaster::EndpointClosedBase, shared_from_this(), cep));
1286 ep_T->PacketAckReceivedEvent.connect(
1287 boost::bind(&PipeBroadcaster::PacketAckReceivedBase, shared_from_this(), cep, RR_BOOST_PLACEHOLDERS(_2)));
1290#ifndef ROBOTRACONTEUR_NO_CXX11_TEMPLATE_ALIASES
1294template <
typename T>
1299template <
typename T>
1302template <
typename T>
static boost::shared_ptr< T > rr_cast(const boost::shared_ptr< U > &objin)
Dynamic cast a RR_SHARED_PTR type. Throws DataTypeMismatchException if cast is invalid instead of ret...
Definition DataTypes.h:199
boost::shared_ptr< PipeBase > PipeBasePtr
Convenience alias for PipeBase shared_ptr.
Definition PipeMember.h:1297
boost::shared_ptr< PipeEndpoint< T > > PipeEndpointPtr
Convenience alias for PipeEndpoint shared_ptr.
Definition PipeMember.h:1295
boost::shared_ptr< PipeEndpointBase > PipeEndpointBasePtr
Convenience alias for PipeEndpointBase shared_ptr.
Definition PipeMember.h:1292
boost::shared_ptr< Pipe< T > > PipePtr
Convenience alias for Pipe shared_ptr.
Definition PipeMember.h:1300
boost::shared_ptr< PipeBroadcaster< T > > PipeBroadcasterPtr
Convenience alias for PipeBroadcaster shared_ptr.
Definition PipeMember.h:1303
MemberDefinition_Direction
Member direction enum.
Definition RobotRaconteurConstants.h:534
@ MemberDefinition_Direction_both
member supports read and write
Definition RobotRaconteurConstants.h:536
#define RR_TIMEOUT_INFINITE
Disable timeout for asynchronous operations.
Definition RobotRaconteurConstants.h:566
Base class for Pipe.
Definition PipeMember.h:539
static const int32_t ANY_INDEX
Dynamically select pipe endpoint index.
Definition PipeMember.h:550
virtual std::string GetMemberName()=0
Get the member name of the pipe.
bool IsUnreliable()
Get if pipe is declared unreliable.
MemberDefinition_Direction Direction()
The direction of the pipe.
void SetPredicate(boost::function< bool(const boost::shared_ptr< PipeBroadcasterBase > &, uint32_t, int32_t)> f)
Set the predicate callback function.
void SetMaxBacklog(int32_t maximum_backlog)
Set the maximum backlog.
boost::function< bool(const boost::shared_ptr< PipeBroadcasterBase > &, uint32_t, int32_t)> GetPredicate()
Get the current predicate callback function.
int32_t GetMaxBacklog()
Gets the currently configured maximum backlog.
void Init(boost::shared_ptr< Pipe< T > > pipe, int32_t maximum_backlog=-1)
Initialize the PipeBroadcaster.
Definition PipeMember.h:1239
boost::shared_ptr< Pipe< T > > GetPipe()
Get the associated pipe.
Definition PipeMember.h:1268
void AsyncSendPacket(T packet, boost::function< void()> handler)
Asynchronously send packet to all connected pipe endpoint clients.
Definition PipeMember.h:1258
void SendPacket(T packet)
Send a packet to all connected pipe endpoint clients.
Definition PipeMember.h:1248
PipeBroadcaster()
Construct a new PipeBroadcaster.
Definition PipeMember.h:1228
Base class for PipeEndpoint.
Definition PipeMember.h:61
bool GetRequestPacketAck()
Get if pipe endpoint is requesting acks.
bool IsUnreliable()
Get if pipe endpoint is unreliable.
MemberDefinition_Direction Direction()
The direction of the pipe.
void SetRequestPacketAck(bool ack)
Set if pipe endpoint should request packet acks.
virtual int32_t GetIndex()
Returns the pipe endpoint index used when endpoint connected.
void SetIgnoreReceived(bool ignore)
Set whether pipe endpoint should ignore incoming packets.
virtual void Close()
Close the pipe endpoint.
bool GetIgnoreReceived()
Get if pipe endpoint is ignoring incoming packets.
virtual void AsyncClose(boost::function< void(const boost::shared_ptr< RobotRaconteurException > &)> handler, int32_t timeout=RR_TIMEOUT_INFINITE)
Asynchronously close the pipe endpoint.
virtual uint32_t GetEndpoint()
Returns the Robot Raconteur node Endpoint ID.
virtual size_t Available()
Return number of packets in the receive queue.
Pipe endpoint used to transmit reliable or unreliable data streams.
Definition PipeMember.h:284
virtual T PeekNextPacketWait(int32_t timeout=RR_TIMEOUT_INFINITE)
Peek the next packet in the receive queue, block if queue is empty.
Definition PipeMember.h:418
boost::signals2::signal< void(const boost::shared_ptr< PipeEndpoint< T > > &, uint32_t)> PacketAckReceivedEvent
Signal called when a packet ack has been received.
Definition PipeMember.h:334
RR_OVIRTUAL void Close() RR_OVERRIDE
Close the pipe endpoint.
Definition PipeMember.h:478
virtual void AsyncSendPacket(typename boost::call_traits< T >::param_type packet, boost::function< void(uint32_t, const boost::shared_ptr< RobotRaconteurException > &)> handler)
Send a packet to the peer endpoint asynchronously.
Definition PipeMember.h:369
boost::signals2::signal< void(const boost::shared_ptr< PipeEndpoint< T > > &)> PacketReceivedEvent
Signal called when a packet has been received.
Definition PipeMember.h:323
virtual T ReceivePacket()
Receive the next packet in the receive queue.
Definition PipeMember.h:384
void SetPipeEndpointClosedCallback(boost::function< void(const boost::shared_ptr< PipeEndpoint< T > > &)> callback)
Set the endpoint closed callback function.
Definition PipeMember.h:311
RR_OVIRTUAL void AsyncClose(boost::function< void(const boost::shared_ptr< RobotRaconteurException > &)> handler, int32_t timeout=2000) RR_OVERRIDE
Asynchronously close the pipe endpoint.
Definition PipeMember.h:510
virtual bool TryReceivePacketWait(T &val, int32_t timeout=RR_TIMEOUT_INFINITE, bool peek=false)
Try receiving a packet, optionally blocking if the queue is empty.
Definition PipeMember.h:438
virtual T ReceivePacketWait(int32_t timeout=RR_TIMEOUT_INFINITE)
Receive the next packet in the receive queue, block if queue is empty.
Definition PipeMember.h:405
virtual uint32_t SendPacket(typename boost::call_traits< T >::param_type packet)
Sends a packet to the peer endpoint.
Definition PipeMember.h:347
boost::function< void(boost::shared_ptr< PipeEndpoint< T > >)> GetPipeEndpointClosedCallback()
Get the currently configured endpoint closed callback function.
Definition PipeMember.h:295
virtual T PeekNextPacket()
Peeks the next packet in the receive queue.
Definition PipeMember.h:395
pipe member type interface
Definition PipeMember.h:666
virtual void AsyncConnect(int32_t index, boost::function< void(const boost::shared_ptr< PipeEndpoint< T > > &, const boost::shared_ptr< RobotRaconteurException > &)> handler, int32_t timeout=RR_TIMEOUT_INFINITE)=0
Asynchronously connect a pipe endpoint.
virtual void SetPipeConnectCallback(boost::function< void(const boost::shared_ptr< PipeEndpoint< T > > &)> function)=0
Set the pipe endpoint connected callback function.
virtual boost::function< void(const boost::shared_ptr< PipeEndpoint< T > > &)> GetPipeConnectCallback()=0
Get the currently configured pipe endpoint connected callback function.
virtual boost::shared_ptr< PipeEndpoint< T > > Connect(int32_t index)=0
Connect a pipe endpoint.
virtual void AsyncConnect(boost::function< void(const boost::shared_ptr< PipeEndpoint< T > > &, const boost::shared_ptr< RobotRaconteurException > &)> handler, int32_t timeout=RR_TIMEOUT_INFINITE)
Asynchronously connect a pipe endpoint.
Definition PipeMember.h:743