Robot Raconteur Core C++ Library
Loading...
Searching...
No Matches
WireMember.h
Go to the documentation of this file.
1
23
24#pragma once
25
30#include <boost/call_traits.hpp>
31
32#ifdef _MSVC_VER
33#pragma warning(push)
34#pragma warning(disable : 4250)
35#pragma warning(disable : 4996)
36#endif
37#include <boost/signals2.hpp>
38
39namespace RobotRaconteur
40{
41class ROBOTRACONTEUR_CORE_API WireBase;
42class ROBOTRACONTEUR_CORE_API WireConnectionBase;
43class ROBOTRACONTEUR_CORE_API WireConnectionBaseListener;
44namespace detail
45{
46class WireSubscription_connection;
47ROBOTRACONTEUR_CORE_API bool WireConnectionBase_IsValueExpired(RR_WEAK_PTR<RobotRaconteurNode> node,
48 const boost::posix_time::ptime& recv_time,
49 int32_t lifespan);
50} // namespace detail
51
58class ROBOTRACONTEUR_CORE_API WireConnectionBase : public RR_ENABLE_SHARED_FROM_THIS<WireConnectionBase>,
59 private boost::noncopyable
60{
61
62 friend class WireBase;
63 friend class WireClientBase;
64 friend class WireServerBase;
65 friend class WireBroadcasterBase;
66 friend class WireSubscriptionBase;
67 friend class detail::WireSubscription_connection;
68
69 public:
78 virtual uint32_t GetEndpoint();
79
88
97
105 virtual void Close();
106
115 virtual void AsyncClose(boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)> handler,
116 int32_t timeout);
117
118 WireConnectionBase(const RR_SHARED_PTR<WireBase>& parent, uint32_t endpoint = 0,
120
121 virtual ~WireConnectionBase() {}
122
123 virtual void WirePacketReceived(TimeSpec timespec, const RR_INTRUSIVE_PTR<RRValue>& packet);
124
134 virtual bool GetInValueValid();
135
145 virtual bool GetOutValueValid();
146
158 bool WaitInValueValid(int32_t timeout = RR_TIMEOUT_INFINITE);
159
171 bool WaitOutValueValid(int32_t timeout = RR_TIMEOUT_INFINITE);
172
173 RR_SHARED_PTR<RobotRaconteurNode> GetNode();
174
184 virtual bool GetIgnoreInValue();
185
195 virtual void SetIgnoreInValue(bool ignore);
196
197 virtual void AddListener(const RR_SHARED_PTR<WireConnectionBaseListener>& listener);
198
209
219 virtual int32_t GetInValueLifespan();
233 virtual void SetInValueLifespan(int32_t millis);
234
244 virtual int32_t GetOutValueLifespan();
258 virtual void SetOutValueLifespan(int32_t millis);
259
260 protected:
261 virtual void RemoteClose();
262
263 RR_INTRUSIVE_PTR<RRValue> inval;
264 RR_INTRUSIVE_PTR<RRValue> outval;
265
266 bool inval_valid;
267 TimeSpec lasttime_send;
268 boost::posix_time::ptime lasttime_send_local;
269
270 bool outval_valid;
271 TimeSpec lasttime_recv;
272 boost::posix_time::ptime lasttime_recv_local;
273
274 boost::condition_variable inval_wait;
275 boost::condition_variable outval_wait;
276
277 int32_t inval_lifespan;
278 int32_t outval_lifespan;
279
280 uint32_t endpoint;
281 RR_WEAK_PTR<WireBase> parent;
282 std::string service_path;
283 std::string member_name;
284
285 boost::mutex sendlock;
286 boost::mutex recvlock;
287
288 bool send_closed;
289 bool recv_closed;
290
291 RR_INTRUSIVE_PTR<RRValue> GetInValueBase();
292
293 RR_INTRUSIVE_PTR<RRValue> GetOutValueBase();
294
295 void SetOutValueBase(const RR_INTRUSIVE_PTR<RRValue>& value);
296
297 bool TryGetInValueBase(RR_INTRUSIVE_PTR<RRValue>& value, TimeSpec& time);
298 bool TryGetOutValueBase(RR_INTRUSIVE_PTR<RRValue>& value, TimeSpec& time);
299
300 virtual void fire_WireValueChanged(const RR_INTRUSIVE_PTR<RRValue>& value, TimeSpec time) = 0;
301
302 virtual void fire_WireClosedCallback() = 0;
303
304 void Shutdown();
305
306 RR_SHARED_PTR<WireBase> GetParent();
307
308 boost::mutex inval_lock;
309 boost::mutex outval_lock;
310
311 bool ignore_inval;
312
313 boost::mutex listeners_lock;
314 std::list<RR_WEAK_PTR<WireConnectionBaseListener> > listeners;
315
316 detail::async_signal_semaphore wire_value_changed_semaphore;
317
318 RR_WEAK_PTR<RobotRaconteurNode> node;
319
321};
322
350template <typename T>
351class WireConnection : public WireConnectionBase
352{
353 private:
354 boost::function<void(RR_SHARED_PTR<WireConnection<T> >)> WireConnectionClosedCallback;
355 boost::mutex WireConnectionClosedCallback_lock;
356
357 public:
364 boost::signals2::signal<void(const RR_SHARED_PTR<WireConnection<T> >& connection, T value, TimeSpec time)>
366
372
373 boost::function<void(RR_SHARED_PTR<WireConnection<T> >)> GetWireConnectionClosedCallback()
374 {
375 boost::mutex::scoped_lock lock(WireConnectionClosedCallback_lock);
376 return WireConnectionClosedCallback;
377 }
378
389 void SetWireConnectionClosedCallback(boost::function<void(const RR_SHARED_PTR<WireConnection<T> >&)> callback)
390 {
391 boost::mutex::scoped_lock lock(WireConnectionClosedCallback_lock);
392 WireConnectionClosedCallback = callback;
393 }
394
395 RR_OVIRTUAL ~WireConnection() RR_OVERRIDE {}
396
406 virtual T GetInValue() { return RRPrimUtil<T>::PreUnpack(GetInValueBase()); }
407
417 virtual T GetOutValue() { return RRPrimUtil<T>::PreUnpack(GetOutValueBase()); }
418
428 virtual void SetOutValue(typename boost::call_traits<T>::param_type value)
429 {
430 SetOutValueBase(RRPrimUtil<T>::PrePack(value));
431 }
432
444 bool TryGetInValue(T& value, TimeSpec& time)
445 {
446 RR_INTRUSIVE_PTR<RRValue> o;
447 if (!TryGetInValueBase(o, time))
448 return false;
449 value = RRPrimUtil<T>::PreUnpack(o);
450 return true;
451 }
452
464 bool TryGetOutValue(T& value, TimeSpec& time)
465 {
466 RR_INTRUSIVE_PTR<RRValue> o;
467 if (!TryGetOutValueBase(o, time))
468 return false;
469 value = RRPrimUtil<T>::PreUnpack(o);
470 return true;
471 }
472
473 WireConnection(const RR_SHARED_PTR<WireBase>& parent, uint32_t endpoint = 0,
475 : WireConnectionBase(parent, endpoint, direction)
476 {}
477
478 protected:
479 RR_OVIRTUAL void fire_WireValueChanged(const RR_INTRUSIVE_PTR<RRValue>& value, TimeSpec time) RR_OVERRIDE
480 {
481 WireValueChanged(RR_STATIC_POINTER_CAST<WireConnection<T> >(shared_from_this()),
482 RRPrimUtil<T>::PreUnpack(value), time);
483 }
484
485 RR_OVIRTUAL void fire_WireClosedCallback() RR_OVERRIDE
486 {
487 boost::function<void(RR_SHARED_PTR<WireConnection<T> >)> c = GetWireConnectionClosedCallback();
488 if (!c)
489 return;
490 c(RR_STATIC_POINTER_CAST<WireConnection<T> >(shared_from_this()));
491 }
492
493 public:
494 RR_OVIRTUAL void Close() RR_OVERRIDE
495 {
497 {
498 boost::mutex::scoped_lock lock(WireConnectionClosedCallback_lock);
499 WireConnectionClosedCallback.clear();
500 }
501 WireValueChanged.disconnect_all_slots();
502 }
503
504 protected:
505 virtual void AsyncClose1(const RR_SHARED_PTR<RobotRaconteurException>& err,
506 const boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)>& handler)
507 {
508 try
509 {
510 {
511 boost::mutex::scoped_lock lock(WireConnectionClosedCallback_lock);
512 WireConnectionClosedCallback.clear();
513 }
514 WireValueChanged.disconnect_all_slots();
515 }
516 catch (std::exception&)
517 {}
518
519 handler(err);
520 }
521
522 public:
523 RR_OVIRTUAL void AsyncClose(boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)> handler,
524 int32_t timeout = 2000) RR_OVERRIDE
525 {
526 WireConnectionBase::AsyncClose(boost::bind(&WireConnection<T>::AsyncClose1,
527 RR_STATIC_POINTER_CAST<WireConnection<T> >(shared_from_this()),
528 RR_BOOST_PLACEHOLDERS(_1), handler),
529 timeout);
530 }
531
532 protected:
533 RR_OVIRTUAL void RemoteClose() RR_OVERRIDE
534 {
535 WireConnectionBase::RemoteClose();
536 {
537 boost::mutex::scoped_lock lock(WireConnectionClosedCallback_lock);
538 WireConnectionClosedCallback.clear();
539 }
540 WireValueChanged.disconnect_all_slots();
541 }
542};
543
548class ROBOTRACONTEUR_CORE_API WireBase : public RR_ENABLE_SHARED_FROM_THIS<WireBase>, private boost::noncopyable
549{
550
551 public:
552 friend class WireConnectionBase;
553
554 virtual ~WireBase() {}
555
561 virtual std::string GetMemberName() = 0;
562
563 virtual std::string GetServicePath() = 0;
564
565 virtual void WirePacketReceived(const RR_INTRUSIVE_PTR<MessageEntry>& m, uint32_t e = 0) = 0;
566
567 virtual void Shutdown() = 0;
568
569 virtual void AsyncClose(const RR_SHARED_PTR<WireConnectionBase>& endpoint, bool remote, uint32_t ee,
570 RR_MOVE_ARG(boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)>) handler,
571 int32_t timeout) = 0;
572
573 protected:
574 WireBase();
575
576 virtual void SendWirePacket(const RR_INTRUSIVE_PTR<RRValue>& data, TimeSpec time, uint32_t endpoint) = 0;
577
578 bool rawelements;
579
580 void DispatchPacket(const RR_INTRUSIVE_PTR<MessageEntry>& me, const RR_SHARED_PTR<WireConnectionBase>& e);
581
582 RR_INTRUSIVE_PTR<RRValue> UnpackPacket(const RR_INTRUSIVE_PTR<MessageEntry>& me, TimeSpec& ts);
583
584 RR_INTRUSIVE_PTR<MessageEntry> PackPacket(const RR_INTRUSIVE_PTR<RRValue>& data, TimeSpec time);
585
586 virtual RR_INTRUSIVE_PTR<MessageElementData> PackData(const RR_INTRUSIVE_PTR<RRValue>& data)
587 {
588 return GetNode()->PackVarType(data);
589 }
590
591 virtual RR_INTRUSIVE_PTR<RRValue> UnpackData(const RR_INTRUSIVE_PTR<MessageElement>& mdata)
592 {
593 return GetNode()->UnpackVarType(mdata);
594 }
595
596 RR_WEAK_PTR<RobotRaconteurNode> node;
597
598 MemberDefinition_Direction direction;
599
600 public:
601 RR_SHARED_PTR<RobotRaconteurNode> GetNode();
602
613};
614
667template <typename T>
668class Wire : public virtual WireBase
669{
670
671 friend class WireConnectionBase;
672
673 public:
674 Wire(boost::function<void(const RR_INTRUSIVE_PTR<RRValue>&)> verify) { this->verify = RR_MOVE(verify); }
675
676 RR_OVIRTUAL ~Wire() RR_OVERRIDE {}
677
691 virtual RR_SHARED_PTR<WireConnection<T> > Connect() = 0;
692
703 virtual void AsyncConnect(
704 boost::function<void(const RR_SHARED_PTR<WireConnection<T> >&, const RR_SHARED_PTR<RobotRaconteurException>&)>
705 handler,
706 int32_t timeout = RR_TIMEOUT_INFINITE) = 0;
707
724 virtual T PeekInValue(TimeSpec& ts) = 0;
725
742 virtual T PeekOutValue(TimeSpec& ts) = 0;
743
757 virtual void PokeOutValue(const T& value) = 0;
758
769 virtual void AsyncPeekInValue(
770 boost::function<void(const T&, const TimeSpec&, const RR_SHARED_PTR<RobotRaconteurException>&)> handler,
771 int32_t timeout = RR_TIMEOUT_INFINITE) = 0;
772
783 virtual void AsyncPeekOutValue(
784 boost::function<void(const T&, const TimeSpec&, const RR_SHARED_PTR<RobotRaconteurException>&)> handler,
785 int32_t timeout = RR_TIMEOUT_INFINITE) = 0;
786
798 virtual void AsyncPokeOutValue(const T& value,
799 boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)> handler,
800 int32_t timeout = RR_TIMEOUT_INFINITE) = 0;
801
809 virtual boost::function<void(const RR_SHARED_PTR<WireConnection<T> >&)> GetWireConnectCallback() = 0;
810
829 virtual void SetWireConnectCallback(boost::function<void(const RR_SHARED_PTR<WireConnection<T> >&)> function) = 0;
830
838 virtual boost::function<T(const uint32_t&)> GetPeekInValueCallback() = 0;
839
863 virtual void SetPeekInValueCallback(boost::function<T(const uint32_t&)> function) = 0;
864
872 virtual boost::function<T(const uint32_t&)> GetPeekOutValueCallback() = 0;
873
897 virtual void SetPeekOutValueCallback(boost::function<T(const uint32_t&)> function) = 0;
898
907 virtual boost::function<void(const T&, const TimeSpec&, const uint32_t&)> GetPokeOutValueCallback() = 0;
908
934 boost::function<void(const T&, const TimeSpec&, const uint32_t&)> function) = 0;
935
936 protected:
937 RR_OVIRTUAL RR_INTRUSIVE_PTR<MessageElementData> PackData(const RR_INTRUSIVE_PTR<RRValue>& data) RR_OVERRIDE
938 {
939 if (verify)
940 {
941 verify(data);
942 }
943 return GetNode()->template PackAnyType<typename RRPrimUtil<T>::BoxedType>(data);
944 }
945
946 RR_OVIRTUAL RR_INTRUSIVE_PTR<RRValue> UnpackData(const RR_INTRUSIVE_PTR<MessageElement>& mdata) RR_OVERRIDE
947 {
948 if (!verify)
949 {
950 return GetNode()->template UnpackAnyType<typename RRPrimUtil<T>::BoxedType>(mdata);
951 }
952 else
953 {
954 RR_INTRUSIVE_PTR<RRValue> ret = GetNode()->template UnpackAnyType<typename RRPrimUtil<T>::BoxedType>(mdata);
955 verify(ret);
956 return ret;
957 }
958 }
959
960 boost::function<void(const RR_INTRUSIVE_PTR<RRValue>&)> verify;
961};
962
963class ROBOTRACONTEUR_CORE_API ServiceStub;
964
965class ROBOTRACONTEUR_CORE_API WireClientBase : public virtual WireBase
966{
967 friend class WireConnectionBase;
968 friend class WireSubscriptionBase;
969 friend class detail::WireSubscription_connection;
970
971 public:
972 RR_OVIRTUAL ~WireClientBase() RR_OVERRIDE {}
973
974 RR_OVIRTUAL std::string GetMemberName() RR_OVERRIDE;
975
976 RR_OVIRTUAL std::string GetServicePath() RR_OVERRIDE;
977
978 RR_OVIRTUAL void WirePacketReceived(const RR_INTRUSIVE_PTR<MessageEntry>& m, uint32_t e = 0) RR_OVERRIDE;
979
980 RR_OVIRTUAL void Shutdown() RR_OVERRIDE;
981
982 RR_OVIRTUAL void AsyncClose(const RR_SHARED_PTR<WireConnectionBase>& endpoint, bool remote, uint32_t ee,
983 RR_MOVE_ARG(boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)>)
984 handler,
985 int32_t timeout) RR_OVERRIDE;
986
987 RR_SHARED_PTR<ServiceStub> GetStub();
988
989 protected:
990 RR_OVIRTUAL void SendWirePacket(const RR_INTRUSIVE_PTR<RRValue>& packet, TimeSpec time,
991 uint32_t endpoint) RR_OVERRIDE;
992
993 std::string m_MemberName;
994 std::string service_path;
995 uint32_t endpoint;
996
997 RR_SHARED_PTR<WireConnectionBase> connection;
998 boost::mutex connection_lock;
999
1000 RR_WEAK_PTR<ServiceStub> stub;
1001
1002 void AsyncConnect_internal(RR_MOVE_ARG(boost::function<void(const RR_SHARED_PTR<WireConnectionBase>&,
1003 const RR_SHARED_PTR<RobotRaconteurException>&)>)
1004 handler,
1005 int32_t timeout);
1006
1007 void AsyncConnect_internal1(const RR_INTRUSIVE_PTR<MessageEntry>& ret,
1008 const RR_SHARED_PTR<RobotRaconteurException>& err,
1009 boost::function<void(const RR_SHARED_PTR<WireConnectionBase>&,
1010 const RR_SHARED_PTR<RobotRaconteurException>&)>& handler);
1011
1012 WireClientBase(boost::string_ref name, const RR_SHARED_PTR<ServiceStub>& stub,
1013 MemberDefinition_Direction direction);
1014
1015 virtual RR_SHARED_PTR<WireConnectionBase> CreateNewWireConnection(MemberDefinition_Direction direction) = 0;
1016
1017 RR_INTRUSIVE_PTR<RRValue> PeekInValueBase(TimeSpec& ts);
1018 RR_INTRUSIVE_PTR<RRValue> PeekOutValueBase(TimeSpec& ts);
1019 void PokeOutValueBase(const RR_INTRUSIVE_PTR<RRValue>& value);
1020
1021 void AsyncPeekInValueBase(RR_MOVE_ARG(boost::function<void(const RR_INTRUSIVE_PTR<RRValue>&, const TimeSpec&,
1022 const RR_SHARED_PTR<RobotRaconteurException>&)>) handler,
1023 int32_t timeout = RR_TIMEOUT_INFINITE);
1024 void AsyncPeekOutValueBase(RR_MOVE_ARG(boost::function<void(const RR_INTRUSIVE_PTR<RRValue>&, const TimeSpec&,
1025 const RR_SHARED_PTR<RobotRaconteurException>&)>)
1026 handler,
1027 int32_t timeout = RR_TIMEOUT_INFINITE);
1028 void AsyncPokeOutValueBase(const RR_INTRUSIVE_PTR<RRValue>& value,
1029 RR_MOVE_ARG(boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)>)
1030 handler,
1031 int32_t timeout = RR_TIMEOUT_INFINITE);
1032
1033 void AsyncPeekValueBaseEnd1(const RR_INTRUSIVE_PTR<MessageEntry>& m,
1034 const RR_SHARED_PTR<RobotRaconteurException>& err,
1035 boost::function<void(const RR_INTRUSIVE_PTR<RRValue>&, const TimeSpec&,
1036 const RR_SHARED_PTR<RobotRaconteurException>&)>& handler);
1037};
1038
1039template <typename T>
1040class WireClient : public virtual Wire<T>, public virtual WireClientBase
1041{
1042 public:
1043 WireClient(boost::string_ref name, const RR_SHARED_PTR<ServiceStub>& stub,
1044 MemberDefinition_Direction direction = MemberDefinition_Direction_both,
1045 boost::function<void(const RR_INTRUSIVE_PTR<RRValue>&)> verify = RR_NULL_FN)
1046 : Wire<T>(verify), WireClientBase(name, stub, direction)
1047 {
1048 rawelements = (boost::is_same<T, RR_INTRUSIVE_PTR<MessageElement> >::value);
1049 }
1050
1051 RR_OVIRTUAL ~WireClient() RR_OVERRIDE {}
1052
1053 RR_OVIRTUAL void AsyncConnect(
1054 boost::function<void(const RR_SHARED_PTR<WireConnection<T> >&, const RR_SHARED_PTR<RobotRaconteurException>&)>
1055 handler,
1056 int32_t timeout = RR_TIMEOUT_INFINITE) RR_OVERRIDE
1057 {
1058 AsyncConnect_internal(boost::bind(handler,
1059 boost::bind(&WireClient<T>::AsyncConnect_cast, RR_BOOST_PLACEHOLDERS(_1)),
1060 RR_BOOST_PLACEHOLDERS(_2)),
1061 timeout);
1062 }
1063
1064 RR_OVIRTUAL RR_SHARED_PTR<WireConnection<T> > Connect() RR_OVERRIDE
1065 {
1066 ROBOTRACONTEUR_ASSERT_MULTITHREADED(node);
1067
1068 RR_SHARED_PTR<detail::sync_async_handler<WireConnection<T> > > t =
1069 RR_MAKE_SHARED<detail::sync_async_handler<WireConnection<T> > >();
1070 AsyncConnect(boost::bind(&detail::sync_async_handler<WireConnection<T> >::operator(), t,
1071 RR_BOOST_PLACEHOLDERS(_1), RR_BOOST_PLACEHOLDERS(_2)),
1072 GetNode()->GetRequestTimeout());
1073 return t->end();
1074 }
1075
1076 protected:
1077 static RR_SHARED_PTR<WireConnection<T> > AsyncConnect_cast(const RR_SHARED_PTR<WireConnectionBase>& b)
1078 {
1079 return rr_cast<WireConnection<T> >(b);
1080 }
1081
1082 void AsyncPeekValueBaseEnd2(
1083 const RR_INTRUSIVE_PTR<RRValue>& value, const TimeSpec& ts, const RR_SHARED_PTR<RobotRaconteurException>& err,
1084 const boost::function<void(const T&, const TimeSpec&, const RR_SHARED_PTR<RobotRaconteurException>&)>& handler)
1085 {
1086
1087 if (err)
1088 {
1089 typename boost::initialized<T> err_value;
1090 handler(err_value, ts, err);
1091 return;
1092 }
1093
1094 T value2;
1095 try
1096 {
1097 value2 = RRPrimUtil<T>::PreUnpack(value);
1098 }
1099 catch (std::exception& exp)
1100 {
1101 typename boost::initialized<T> err_value;
1102 RR_SHARED_PTR<RobotRaconteurException> err = RobotRaconteurExceptionUtil::ExceptionToSharedPtr(exp);
1103 handler(err_value, ts, err);
1104 return;
1105 }
1106
1107 handler(value2, ts, err);
1108 }
1109
1110 public:
1111 RR_OVIRTUAL T PeekInValue(TimeSpec& ts) RR_OVERRIDE { return RRPrimUtil<T>::PreUnpack(PeekInValueBase(ts)); }
1112 RR_OVIRTUAL T PeekOutValue(TimeSpec& ts) RR_OVERRIDE { return RRPrimUtil<T>::PreUnpack(PeekOutValueBase(ts)); }
1113 RR_OVIRTUAL void PokeOutValue(const T& value) RR_OVERRIDE
1114 {
1115 return PokeOutValueBase(RRPrimUtil<T>::PrePack(value));
1116 }
1117 RR_OVIRTUAL void AsyncPeekInValue(
1118 boost::function<void(const T&, const TimeSpec&, const RR_SHARED_PTR<RobotRaconteurException>&)> handler,
1119 int32_t timeout = RR_TIMEOUT_INFINITE) RR_OVERRIDE
1120 {
1121 AsyncPeekInValueBase(boost::bind(&WireClient::AsyncPeekValueBaseEnd2,
1122 RR_DYNAMIC_POINTER_CAST<WireClient>(shared_from_this()),
1123 RR_BOOST_PLACEHOLDERS(_1), RR_BOOST_PLACEHOLDERS(_2),
1124 RR_BOOST_PLACEHOLDERS(_3), RR_MOVE(handler)),
1125 timeout);
1126 }
1127 RR_OVIRTUAL void AsyncPeekOutValue(
1128 boost::function<void(const T&, const TimeSpec&, const RR_SHARED_PTR<RobotRaconteurException>&)> handler,
1129 int32_t timeout = RR_TIMEOUT_INFINITE) RR_OVERRIDE
1130 {
1131 AsyncPeekOutValueBase(boost::bind(&WireClient::AsyncPeekValueBaseEnd2,
1132 RR_DYNAMIC_POINTER_CAST<WireClient>(shared_from_this()),
1133 RR_BOOST_PLACEHOLDERS(_1), RR_BOOST_PLACEHOLDERS(_2),
1134 RR_BOOST_PLACEHOLDERS(_3), RR_MOVE(handler)),
1135 timeout);
1136 }
1137 RR_OVIRTUAL void AsyncPokeOutValue(const T& value,
1138 boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)> handler,
1139 int32_t timeout = RR_TIMEOUT_INFINITE) RR_OVERRIDE
1140 {
1141 AsyncPokeOutValueBase(RRPrimUtil<T>::PrePack(value), RR_MOVE(handler), timeout);
1142 }
1143
1144 // Unused service-side functions
1145 RR_OVIRTUAL boost::function<void(const RR_SHARED_PTR<WireConnection<T> >&)> GetWireConnectCallback() RR_OVERRIDE
1146 {
1147 ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, endpoint, service_path, m_MemberName,
1148 "GetWireConnectCallback is not valid for WireClient");
1149 throw InvalidOperationException("Not valid for client");
1150 }
1151 RR_OVIRTUAL void SetWireConnectCallback(boost::function<void(const RR_SHARED_PTR<WireConnection<T> >&)> function)
1152 RR_OVERRIDE
1153 {
1154 RR_UNUSED(function);
1155 ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, endpoint, service_path, m_MemberName,
1156 "SetWireConnectCallback is not valid for WireClient");
1157 throw InvalidOperationException("Not valid for client");
1158 }
1159 RR_OVIRTUAL boost::function<T(const uint32_t&)> GetPeekInValueCallback() RR_OVERRIDE
1160 {
1161 ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, endpoint, service_path, m_MemberName,
1162 "GetPeekInValueCallback is not valid for WireClient");
1163 throw InvalidOperationException("Not valid for client");
1164 }
1165 RR_OVIRTUAL void SetPeekInValueCallback(boost::function<T(const uint32_t&)> function) RR_OVERRIDE
1166 {
1167 RR_UNUSED(function);
1168 ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, endpoint, service_path, m_MemberName,
1169 "SetPeekInValueCallback is not valid for WireClient");
1170 throw InvalidOperationException("Not valid for client");
1171 }
1172 RR_OVIRTUAL boost::function<T(const uint32_t&)> GetPeekOutValueCallback() RR_OVERRIDE
1173 {
1174 ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, endpoint, service_path, m_MemberName,
1175 "GetPeekOutValueCallback is not valid for WireClient");
1176 throw InvalidOperationException("Not valid for client");
1177 }
1178 RR_OVIRTUAL void SetPeekOutValueCallback(boost::function<T(const uint32_t&)> function) RR_OVERRIDE
1179 {
1180 RR_UNUSED(function);
1181 ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, endpoint, service_path, m_MemberName,
1182 "SetPeekOutValueCallback is not valid for WireClient");
1183 throw InvalidOperationException("Not valid for client");
1184 }
1185 RR_OVIRTUAL boost::function<void(const T&, const TimeSpec&, const uint32_t&)> GetPokeOutValueCallback() RR_OVERRIDE
1186 {
1187 ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, endpoint, service_path, m_MemberName,
1188 "GetPokeOutValueCallback is not valid for WireClient");
1189 throw InvalidOperationException("Not valid for client");
1190 }
1191 RR_OVIRTUAL void SetPokeOutValueCallback(boost::function<void(const T&, const TimeSpec&, const uint32_t&)> function)
1192 RR_OVERRIDE
1193 {
1194 RR_UNUSED(function);
1195 ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, endpoint, service_path, m_MemberName,
1196 "SetPokeOutValueCallback is not valid for WireClient");
1197 throw InvalidOperationException("Not valid for client");
1198 }
1199
1200 protected:
1201 RR_OVIRTUAL RR_SHARED_PTR<WireConnectionBase> CreateNewWireConnection(MemberDefinition_Direction direction)
1202 RR_OVERRIDE
1203 {
1204 return RR_MAKE_SHARED<WireConnection<T> >(RR_STATIC_POINTER_CAST<WireBase>(shared_from_this()), 0, direction);
1205 }
1206};
1207
1208class ROBOTRACONTEUR_CORE_API ServiceSkel;
1209class ROBOTRACONTEUR_CORE_API WireServerBase : public virtual WireBase
1210{
1211 friend class WireConnectionBase;
1212
1213 public:
1214 RR_OVIRTUAL ~WireServerBase() RR_OVERRIDE {}
1215
1216 RR_OVIRTUAL std::string GetMemberName() RR_OVERRIDE;
1217
1218 RR_OVIRTUAL std::string GetServicePath() RR_OVERRIDE;
1219
1220 RR_OVIRTUAL void WirePacketReceived(const RR_INTRUSIVE_PTR<MessageEntry>& m, uint32_t e = 0) RR_OVERRIDE;
1221
1222 RR_OVIRTUAL void Shutdown() RR_OVERRIDE;
1223
1224 RR_OVIRTUAL void AsyncClose(const RR_SHARED_PTR<WireConnectionBase>& endpoint, bool remote, uint32_t ee,
1225 RR_MOVE_ARG(boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)>)
1226 handler,
1227 int32_t timeout) RR_OVERRIDE;
1228
1229 virtual RR_INTRUSIVE_PTR<MessageEntry> WireCommand(const RR_INTRUSIVE_PTR<MessageEntry>& m, uint32_t e);
1230
1231 RR_SHARED_PTR<ServiceSkel> GetSkel();
1232
1233 protected:
1234 RR_OVIRTUAL void SendWirePacket(const RR_INTRUSIVE_PTR<RRValue>& packet, TimeSpec time,
1235 uint32_t endpoint) RR_OVERRIDE;
1236
1237 std::string m_MemberName;
1238 std::string service_path;
1239
1240 RR_UNORDERED_MAP<uint32_t, RR_SHARED_PTR<WireConnectionBase> > connections;
1241 boost::mutex connections_lock;
1242
1243 RR_WEAK_PTR<ServiceSkel> skel;
1244
1245 WireServerBase(boost::string_ref name, const RR_SHARED_PTR<ServiceSkel>& skel,
1246 MemberDefinition_Direction direction);
1247
1248 virtual RR_SHARED_PTR<WireConnectionBase> CreateNewWireConnection(uint32_t e,
1249 MemberDefinition_Direction direction) = 0;
1250
1251 virtual void fire_WireConnectCallback(const RR_SHARED_PTR<WireConnectionBase>& e) = 0;
1252
1253 bool init;
1254 boost::signals2::connection listener_connection;
1255
1256 public:
1257 void ClientDisconnected(const RR_SHARED_PTR<ServerContext>& context, ServerServiceListenerEventType ev,
1258 const RR_SHARED_PTR<void>& param);
1259
1260 protected:
1261 virtual RR_INTRUSIVE_PTR<RRValue> do_PeekInValue(const uint32_t&) = 0;
1262 virtual RR_INTRUSIVE_PTR<RRValue> do_PeekOutValue(const uint32_t&) = 0;
1263 virtual void do_PokeOutValue(const RR_INTRUSIVE_PTR<RRValue>& value, const TimeSpec&, const uint32_t& ep) = 0;
1264};
1265
1266template <typename T>
1267class WireServer : public virtual WireServerBase, public virtual Wire<T>
1268{
1269
1270 public:
1271 WireServer(boost::string_ref name, const RR_SHARED_PTR<ServiceSkel>& skel,
1272 MemberDefinition_Direction direction = MemberDefinition_Direction_both,
1273 boost::function<void(const RR_INTRUSIVE_PTR<RRValue>&)> verify = RR_NULL_FN)
1274 : WireServerBase(name, skel, direction), Wire<T>(verify)
1275 {
1276 rawelements = (boost::is_same<T, RR_INTRUSIVE_PTR<MessageElement> >::value);
1277 }
1278
1279 RR_OVIRTUAL ~WireServer() RR_OVERRIDE {}
1280
1281 RR_OVIRTUAL void AsyncConnect(
1282 boost::function<void(const RR_SHARED_PTR<WireConnection<T> >&, const RR_SHARED_PTR<RobotRaconteurException>&)>
1283 handler,
1284 int32_t timeout = RR_TIMEOUT_INFINITE) RR_OVERRIDE
1285 {
1286 RR_UNUSED(handler);
1287 RR_UNUSED(timeout);
1288 ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, -1, service_path, m_MemberName,
1289 "AsyncConnect is not valid for WireServer");
1290 throw InvalidOperationException("Not valid for server");
1291 }
1292 RR_OVIRTUAL RR_SHARED_PTR<WireConnection<T> > Connect() RR_OVERRIDE
1293 {
1294 ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, -1, service_path, m_MemberName,
1295 "Connect is not valid for WireServer");
1296 throw InvalidOperationException("Not valid for server");
1297 }
1298 RR_OVIRTUAL T PeekInValue(TimeSpec& ts) RR_OVERRIDE
1299 {
1300 RR_UNUSED(ts);
1301 ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, -1, service_path, m_MemberName,
1302 "PeekInValue is not valid for WireServer");
1303 throw InvalidOperationException("Not valid for server");
1304 }
1305 RR_OVIRTUAL T PeekOutValue(TimeSpec& ts) RR_OVERRIDE
1306 {
1307 RR_UNUSED(ts);
1308 ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, -1, service_path, m_MemberName,
1309 "PeekOutValue is not valid for WireServer");
1310 throw InvalidOperationException("Not valid for server");
1311 }
1312 RR_OVIRTUAL void PokeOutValue(const T& value) RR_OVERRIDE
1313 {
1314 RR_UNUSED(value);
1315 ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, -1, service_path, m_MemberName,
1316 "PokeOutValue is not valid for WireServer");
1317 throw InvalidOperationException("Not valid for server");
1318 }
1319 RR_OVIRTUAL void AsyncPeekInValue(
1320 boost::function<void(const T&, const TimeSpec&, const RR_SHARED_PTR<RobotRaconteurException>&)> handler,
1321 int32_t timeout = RR_TIMEOUT_INFINITE) RR_OVERRIDE
1322 {
1323 RR_UNUSED(handler);
1324 RR_UNUSED(timeout);
1325 ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, -1, service_path, m_MemberName,
1326 "AsyncPeekInValue is not valid for WireServer");
1327 throw InvalidOperationException("Not valid for server");
1328 }
1329 RR_OVIRTUAL void AsyncPeekOutValue(
1330 boost::function<void(const T&, const TimeSpec&, const RR_SHARED_PTR<RobotRaconteurException>&)> handler,
1331 int32_t timeout = RR_TIMEOUT_INFINITE) RR_OVERRIDE
1332 {
1333 RR_UNUSED(handler);
1334 RR_UNUSED(timeout);
1335 ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, -1, service_path, m_MemberName,
1336 "AsyncPeekOutValue is not valid for WireServer");
1337 throw InvalidOperationException("Not valid for server");
1338 }
1339 RR_OVIRTUAL void AsyncPokeOutValue(const T& value,
1340 boost::function<void(const RR_SHARED_PTR<RobotRaconteurException>&)> handler,
1341 int32_t timeout = RR_TIMEOUT_INFINITE) RR_OVERRIDE
1342 {
1343 RR_UNUSED(value);
1344 RR_UNUSED(handler);
1345 RR_UNUSED(timeout);
1346 ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, -1, service_path, m_MemberName,
1347 "AsyncPokeOutValue is not valid for WireServer");
1348 throw InvalidOperationException("Not valid for server");
1349 }
1350
1351 RR_OVIRTUAL boost::function<void(const RR_SHARED_PTR<WireConnection<T> >&)> GetWireConnectCallback() RR_OVERRIDE
1352 {
1353 return callback;
1354 }
1355 RR_OVIRTUAL void SetWireConnectCallback(boost::function<void(const RR_SHARED_PTR<WireConnection<T> >&)> function)
1356 RR_OVERRIDE
1357 {
1358 callback = function;
1359 }
1360 RR_OVIRTUAL boost::function<T(const uint32_t&)> GetPeekInValueCallback() RR_OVERRIDE { return peek_in_callback; }
1361 RR_OVIRTUAL void SetPeekInValueCallback(boost::function<T(const uint32_t&)> function) RR_OVERRIDE
1362 {
1363 peek_in_callback = function;
1364 }
1365 RR_OVIRTUAL boost::function<T(const uint32_t&)> GetPeekOutValueCallback() RR_OVERRIDE { return peek_out_callback; }
1366 RR_OVIRTUAL void SetPeekOutValueCallback(boost::function<T(const uint32_t&)> function) RR_OVERRIDE
1367 {
1368 peek_out_callback = function;
1369 }
1370 RR_OVIRTUAL boost::function<void(const T&, const TimeSpec&, const uint32_t&)> GetPokeOutValueCallback() RR_OVERRIDE
1371 {
1372 return poke_out_callback;
1373 }
1374 RR_OVIRTUAL void SetPokeOutValueCallback(boost::function<void(const T&, const TimeSpec&, const uint32_t&)> function)
1375 RR_OVERRIDE
1376 {
1377 poke_out_callback = function;
1378 }
1379
1380 protected:
1381 RR_OVIRTUAL RR_SHARED_PTR<WireConnectionBase> CreateNewWireConnection(
1382 uint32_t e, MemberDefinition_Direction direction) RR_OVERRIDE
1383 {
1384 return RR_MAKE_SHARED<WireConnection<T> >(RR_STATIC_POINTER_CAST<WireBase>(shared_from_this()), e, direction);
1385 }
1386
1387 boost::function<void(RR_SHARED_PTR<WireConnection<T> >)> callback;
1388 boost::function<T(const uint32_t&)> peek_in_callback;
1389 boost::function<T(const uint32_t&)> peek_out_callback;
1390 boost::function<void(const T&, const TimeSpec&, const uint32_t&)> poke_out_callback;
1391
1392 RR_OVIRTUAL void fire_WireConnectCallback(const RR_SHARED_PTR<WireConnectionBase>& e) RR_OVERRIDE
1393 {
1394 if (!callback)
1395 return;
1396 callback(RR_STATIC_POINTER_CAST<WireConnection<T> >(e));
1397 }
1398
1399 RR_OVIRTUAL RR_INTRUSIVE_PTR<RRValue> do_PeekInValue(const uint32_t& ep) RR_OVERRIDE
1400 {
1401 if (!peek_in_callback)
1402 {
1403 ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, ep, service_path, m_MemberName,
1404 "Attempt to call PeekInValue when callback not set");
1405 throw InvalidOperationException("Invalid operation");
1406 }
1407 return RRPrimUtil<T>::PrePack(peek_in_callback(ep));
1408 }
1409
1410 RR_OVIRTUAL RR_INTRUSIVE_PTR<RRValue> do_PeekOutValue(const uint32_t& ep) RR_OVERRIDE
1411 {
1412 if (!peek_out_callback)
1413 {
1414 ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, ep, service_path, m_MemberName,
1415 "Attempt to call PeekOutValue when callback not set");
1416 throw InvalidOperationException("Invalid operation");
1417 }
1418 return RRPrimUtil<T>::PrePack(peek_out_callback(ep));
1419 }
1420
1421 RR_OVIRTUAL void do_PokeOutValue(const RR_INTRUSIVE_PTR<RRValue>& value, const TimeSpec& ts,
1422 const uint32_t& ep) RR_OVERRIDE
1423 {
1424 if (!poke_out_callback)
1425 {
1426 ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, ep, service_path, m_MemberName,
1427 "Attempt to call PokeOutValue when callback not set");
1428 throw InvalidOperationException("Invalid operation");
1429 }
1430 return poke_out_callback(RRPrimUtil<T>::PreUnpack(value), ts, ep);
1431 }
1432
1433 public:
1434 RR_OVIRTUAL void Shutdown() RR_OVERRIDE
1435 {
1436 WireServerBase::Shutdown();
1437 callback.clear();
1438 peek_in_callback.clear();
1439 peek_out_callback.clear();
1440 poke_out_callback.clear();
1441 }
1442};
1443
1444namespace detail
1445{
1446template <typename T>
1447class Wire_traits;
1448
1449template <typename T>
1450class Wire_traits<Wire<T> >
1451{
1452 public:
1453 typedef WireConnection<T> wireconnection_type;
1454 typedef WireClient<T> wireclient_type;
1455 typedef WireServer<T> wireserver_type;
1456};
1457
1458} // namespace detail
1459
1460namespace detail
1461{
1462class WireBroadcaster_connected_connection;
1463}
1464
1471class ROBOTRACONTEUR_CORE_API WireBroadcasterBase : public RR_ENABLE_SHARED_FROM_THIS<WireBroadcasterBase>
1472{
1473 public:
1474 size_t GetActiveWireConnectionCount();
1475
1476 virtual ~WireBroadcasterBase();
1477
1484 boost::function<bool(RR_SHARED_PTR<WireBroadcasterBase>&, uint32_t)> GetPredicate();
1485
1503 void SetPredicate(boost::function<bool(const RR_SHARED_PTR<WireBroadcasterBase>&, uint32_t)> f);
1504
1507
1509 void SetOutValueLifespan(int32_t millis);
1510
1511 protected:
1512 WireBroadcasterBase();
1513
1514 void InitBase(const RR_SHARED_PTR<WireBase>& wire);
1515
1516 void ConnectionClosedBase(const RR_SHARED_PTR<detail::WireBroadcaster_connected_connection>& ep);
1517
1518 void ConnectionConnectedBase(const RR_SHARED_PTR<WireConnectionBase>& ep);
1519
1520 void SetOutValueBase(const RR_INTRUSIVE_PTR<RRValue>& value);
1521
1522 virtual void AttachWireServerEvents(const RR_SHARED_PTR<WireServerBase>& w);
1523
1524 virtual void AttachWireConnectionEvents(const RR_SHARED_PTR<WireConnectionBase>& w,
1525 const RR_SHARED_PTR<detail::WireBroadcaster_connected_connection>& cep);
1526
1527 RR_INTRUSIVE_PTR<RRValue> ClientPeekInValueBase();
1528
1529 std::list<RR_SHARED_PTR<detail::WireBroadcaster_connected_connection> > connected_wires;
1530 boost::mutex connected_wires_lock;
1531 RR_WEAK_PTR<WireServerBase> wire;
1532 RR_WEAK_PTR<RobotRaconteurNode> node;
1533 std::string service_path;
1534 std::string member_name;
1535
1536 bool copy_element;
1537
1538 boost::function<bool(RR_SHARED_PTR<WireBroadcasterBase>&, uint32_t)> predicate;
1539
1540 RR_INTRUSIVE_PTR<RRValue> out_value;
1541 boost::initialized<bool> out_value_valid;
1542
1543 int32_t out_value_lifespan;
1544 boost::posix_time::ptime out_value_lasttime_local;
1545
1546 void ServiceEvent(ServerServiceListenerEventType evt);
1547
1548 RR_SHARED_PTR<WireBase> GetWireBase();
1549};
1550
1576template <typename T>
1577class WireBroadcaster : public WireBroadcasterBase
1578{
1579 public:
1588
1597 void Init(RR_SHARED_PTR<Wire<T> > wire) { InitBase(wire); }
1598
1611 void SetOutValue(T value) { SetOutValueBase(RRPrimUtil<T>::PrePack(value)); }
1612
1613 RR_SHARED_PTR<Wire<T> > GetWire() { return rr_cast<Wire<T> >(GetWireBase()); }
1614
1615 protected:
1616 T ClientPeekInValue() { return RRPrimUtil<T>::PreUnpack(ClientPeekInValueBase()); }
1617
1618 static T ClientPeekOutValue() { throw ReadOnlyMemberException("Read only wire"); }
1619 static T ClientPokeOutValue() { throw ReadOnlyMemberException("Read only wire"); }
1620
1621 RR_OVIRTUAL void AttachWireServerEvents(const RR_SHARED_PTR<WireServerBase>& w) RR_OVERRIDE
1622 {
1623 RR_SHARED_PTR<WireServer<T> > w_T = rr_cast<WireServer<T> >(w);
1624 w_T->SetWireConnectCallback(boost::bind(&WireBroadcaster::ConnectionConnectedBase, this->shared_from_this(),
1625 RR_BOOST_PLACEHOLDERS(_1)));
1626 w_T->SetPeekInValueCallback(boost::bind(&WireBroadcaster<T>::ClientPeekInValue,
1627 RR_STATIC_POINTER_CAST<WireBroadcaster<T> >(this->shared_from_this())));
1628 w_T->SetPeekOutValueCallback(boost::bind(&WireBroadcaster<T>::ClientPeekOutValue));
1629 w_T->SetPokeOutValueCallback(boost::bind(&WireBroadcaster<T>::ClientPokeOutValue));
1630 }
1631
1632 RR_OVIRTUAL void AttachWireConnectionEvents(const RR_SHARED_PTR<WireConnectionBase>& w,
1633 const RR_SHARED_PTR<detail::WireBroadcaster_connected_connection>& c)
1634 RR_OVERRIDE
1635 {
1636 RR_SHARED_PTR<WireConnection<T> > w_T = rr_cast<WireConnection<T> >(w);
1637 w_T->SetWireConnectionClosedCallback(
1638 boost::bind(&WireBroadcaster::ConnectionClosedBase, this->shared_from_this(), c));
1639 }
1640};
1641
1642namespace detail
1643{
1644static void WireUnicastReceiverBase_empty_close_handler(const RR_SHARED_PTR<RobotRaconteurException>& err) {}
1645} // namespace detail
1646
1647template <typename T, typename U>
1648class WireUnicastReceiverBase : public RR_ENABLE_SHARED_FROM_THIS<WireUnicastReceiverBase<T, U> >
1649{
1650 public:
1651 typedef typename detail::Wire_traits<T>::wireserver_type wireserver_type;
1652 typedef typename detail::Wire_traits<T>::wireconnection_type wireconnection_type;
1653
1661 WireUnicastReceiverBase() : in_value_lifespan(-1) {}
1662 virtual ~WireUnicastReceiverBase() {}
1663
1672 void Init(const RR_SHARED_PTR<T>& wire)
1673 {
1674 node = wire->GetNode();
1675 in_value_lifespan = -1;
1676 RR_SHARED_PTR<wireserver_type> wire_server = RR_DYNAMIC_POINTER_CAST<wireserver_type>(wire);
1677 if (!wire_server)
1678 {
1679 ROBOTRACONTEUR_LOG_DEBUG_COMPONENT_PATH(node, Member, -1, service_path, member_name,
1680 "WireUnicastReceiver init must be passed a WireServer");
1681 throw InvalidOperationException("WireServer required for WireUnicastReceiver");
1682 }
1683 this->wire = wire_server;
1684 wire_server->SetWireConnectCallback(boost::bind(&WireUnicastReceiverBase<T, U>::ConnectionConnected,
1685 this->shared_from_this(), RR_BOOST_PLACEHOLDERS(_1)));
1686 wire_server->SetPeekInValueCallback(boost::bind(&WireUnicastReceiverBase<T, U>::ClientPeekInValue));
1687 wire_server->SetPeekOutValueCallback(
1688 boost::bind(&WireUnicastReceiverBase<T, U>::ClientPeekOutValue, this->shared_from_this()));
1689 wire_server->SetPokeOutValueCallback(boost::bind(&WireUnicastReceiverBase<T, U>::ClientPokeOutValue,
1690 this->shared_from_this(), RR_BOOST_PLACEHOLDERS(_1),
1691 RR_BOOST_PLACEHOLDERS(_2), RR_BOOST_PLACEHOLDERS(_3)));
1692
1693 wire_server->GetSkel()->GetContext()->ServerServiceListener.connect(
1694 boost::signals2::signal<void(
1695 const RR_SHARED_PTR<ServerContext>&, ServerServiceListenerEventType,
1696 const RR_SHARED_PTR<void>&)>::slot_type(boost::bind(&WireUnicastReceiverBase::ServiceEvent, this,
1697 RR_BOOST_PLACEHOLDERS(_2)))
1698 .track(this->shared_from_this()));
1699
1700 this->service_path = wire_server->GetServicePath();
1701 this->member_name = wire_server->GetMemberName();
1702
1703 ROBOTRACONTEUR_LOG_TRACE_COMPONENT_PATH(node, Member, -1, service_path, member_name,
1704 "WireUnicastReceiver initialized");
1705 }
1706
1718 U GetInValue(TimeSpec& ts, uint32_t& ep)
1719 {
1720 boost::mutex::scoped_lock lock(this_lock);
1721 if (!in_value_valid.data())
1722 throw ValueNotSetException("Value not set");
1723 if (detail::WireConnectionBase_IsValueExpired(node, in_value_lasttime_local, in_value_lifespan))
1724 {
1725 throw ValueNotSetException("Value expired");
1726 }
1727 ts = in_value_ts;
1728 ep = in_value_ep;
1729 return in_value;
1730 }
1731
1745 bool TryGetInValue(U& value, TimeSpec& ts, uint32_t& ep)
1746 {
1747 boost::mutex::scoped_lock lock(this_lock);
1748 if (!in_value_valid)
1749 return false;
1750 if (detail::WireConnectionBase_IsValueExpired(node, in_value_lasttime_local, in_value_lifespan))
1751 {
1752 return false;
1753 }
1754 value = in_value;
1755 ts = in_value_ts;
1756 ep = in_value_ep;
1757 return true;
1758 }
1759
1766 boost::signals2::signal<void(const U&, const TimeSpec&, const uint32_t&)> InValueChanged;
1767
1769 int32_t GetInValueLifespan()
1770 {
1771 boost::mutex::scoped_lock lock(this_lock);
1772 return in_value_lifespan;
1773 }
1774
1776 void SetInValueLifespan(int32_t millis)
1777 {
1778 boost::mutex::scoped_lock lock(this_lock);
1779 in_value_lifespan = millis;
1780 }
1781
1782 RR_SHARED_PTR<T> GetWire() { return wire; }
1783
1784 protected:
1785 void ConnectionConnected(const RR_SHARED_PTR<wireconnection_type>& connection)
1786 {
1787 boost::mutex::scoped_lock lock(this_lock);
1788 if (active_connection)
1789 {
1790 uint32_t active_ep = active_connection->GetEndpoint();
1791 try
1792 {
1793 active_connection->AsyncClose(&detail::WireUnicastReceiverBase_empty_close_handler);
1794 }
1795 catch (std::exception&)
1796 {}
1797 active_connection.reset();
1798
1799 ROBOTRACONTEUR_LOG_TRACE_COMPONENT_PATH(node, Member, active_ep, service_path, member_name,
1800 "WireUnicastReceiver active wire closed for new connection");
1801 }
1802 active_connection = connection;
1803 connection->SetWireConnectionClosedCallback(boost::bind(&WireUnicastReceiverBase<T, U>::ConnectionClosed,
1804 this->shared_from_this(), RR_BOOST_PLACEHOLDERS(_1)));
1805 connection->WireValueChanged.connect(boost::bind(&WireUnicastReceiverBase<T, U>::ConnectionInValueChanged,
1806 this->shared_from_this(), RR_BOOST_PLACEHOLDERS(_1),
1807 RR_BOOST_PLACEHOLDERS(_2), RR_BOOST_PLACEHOLDERS(_3)));
1808
1809 ROBOTRACONTEUR_LOG_TRACE_COMPONENT_PATH(node, Member, -1, service_path, member_name,
1810 "WireUnicastReceiver wire connected, made active wire");
1811 }
1812
1813 void ConnectionClosed(const RR_SHARED_PTR<wireconnection_type>& connection)
1814 {
1815 boost::mutex::scoped_lock lock(this_lock);
1816 if (active_connection == connection)
1817 {
1818 active_connection.reset();
1819 }
1820 }
1821
1822 void ConnectionInValueChanged(const RR_SHARED_PTR<wireconnection_type>& connection, const U& value,
1823 const TimeSpec& time)
1824 {
1825 ClientPokeOutValue(value, time, connection->GetEndpoint());
1826 }
1827
1828 static U ClientPeekInValue() { throw WriteOnlyMemberException("Write only wire"); }
1829
1830 U ClientPeekOutValue()
1831 {
1832 boost::mutex::scoped_lock lock(this_lock);
1833 if (!in_value_valid)
1834 throw ValueNotSetException("Value not set");
1835 return in_value;
1836 }
1837
1838 void ClientPokeOutValue(const U& value, const TimeSpec& ts, const uint32_t& ep)
1839 {
1840 RR_SHARED_PTR<RobotRaconteurNode> n = node.lock();
1841 boost::mutex::scoped_lock lock(this_lock);
1842 in_value = value;
1843 in_value_ts = ts;
1844 in_value_valid.data() = true;
1845 in_value_ep.data() = ep;
1846 if (n)
1847 {
1848 in_value_lasttime_local = n->NowNodeTime();
1849 }
1850
1851 lock.unlock();
1852
1853 InValueChanged(value, ts, ep);
1854
1855 ROBOTRACONTEUR_LOG_TRACE_COMPONENT_PATH(node, Member, ep, service_path, member_name,
1856 "WireUnicastReceiver value changed");
1857 }
1858
1859 void ServiceEvent(ServerServiceListenerEventType evt)
1860 {
1861 if (evt != ServerServiceListenerEventType_ServiceClosed)
1862 return;
1863 boost::mutex::scoped_lock lock(this_lock);
1864 InValueChanged.disconnect_all_slots();
1865 }
1866
1867 RR_SHARED_PTR<wireserver_type> wire;
1868 RR_SHARED_PTR<wireconnection_type> active_connection;
1869 boost::mutex this_lock;
1870 U in_value;
1871 TimeSpec in_value_ts;
1872 boost::initialized<bool> in_value_valid;
1873 boost::initialized<uint32_t> in_value_ep;
1874 boost::posix_time::ptime in_value_lasttime_local;
1875 int32_t in_value_lifespan;
1876
1877 std::string member_name;
1878 std::string service_path;
1879 RR_WEAK_PTR<RobotRaconteurNode> node;
1880};
1881
1912template <typename T>
1913class WireUnicastReceiver : public WireUnicastReceiverBase<Wire<T>, T>
1914{
1915};
1916#ifndef ROBOTRACONTEUR_NO_CXX11_TEMPLATE_ALIASES
1918using WireConnectionBasePtr = RR_SHARED_PTR<WireConnectionBase>;
1920template <typename T>
1921using WireConnectionPtr = RR_SHARED_PTR<WireConnection<T> >;
1923using WireBasePtr = RR_SHARED_PTR<WireBase>;
1925template <typename T>
1926using WirePtr = RR_SHARED_PTR<Wire<T> >;
1928template <typename T>
1929using WireBroadcasterPtr = RR_SHARED_PTR<WireBroadcaster<T> >;
1931template <typename T>
1932using WireUnicastReceiverPtr = RR_SHARED_PTR<WireUnicastReceiver<T> >;
1933#endif
1934
1935} // namespace RobotRaconteur
1936
1937#ifdef _MSVC_VER
1938#pragma warning(pop)
1939#endif
static boost::shared_ptr< T > rr_cast(const boost::shared_ptr< U > &objin)
Dynamic cast a RR_SHARED_PTR type. Throws DataTypeMismatchException if cast is invalid instead of ret...
Definition DataTypes.h:199
ServerServiceListenerEventType
Enum of service listener events.
Definition RobotRaconteurConstants.h:518
MemberDefinition_Direction
Member direction enum.
Definition RobotRaconteurConstants.h:534
@ MemberDefinition_Direction_both
member supports read and write
Definition RobotRaconteurConstants.h:536
#define RR_TIMEOUT_INFINITE
Disable timeout for asynchronous operations.
Definition RobotRaconteurConstants.h:566
boost::shared_ptr< WireBroadcaster< T > > WireBroadcasterPtr
Convenience alias for WireBroadcaster shared_ptr.
Definition WireMember.h:1929
boost::shared_ptr< Wire< T > > WirePtr
Convenience alias for Wire shared_ptr.
Definition WireMember.h:1926
boost::shared_ptr< WireConnection< T > > WireConnectionPtr
Convenience alias for WireConnection shared_ptr.
Definition WireMember.h:1921
boost::shared_ptr< WireBase > WireBasePtr
Convenience alias for WireBase shared_ptr.
Definition WireMember.h:1923
boost::shared_ptr< WireUnicastReceiver< T > > WireUnicastReceiverPtr
Convenience alias for WireUnicastReceiver shared_ptr.
Definition WireMember.h:1932
boost::shared_ptr< WireConnectionBase > WireConnectionBasePtr
Convenience alias for WireConnectionBase shared_ptr.
Definition WireMember.h:1918
Represents. a point in time. Used by wire members to timestamp packets.
Definition DataTypes.h:2668
Base class for Wire.
Definition WireMember.h:549
virtual std::string GetMemberName()=0
Get the member name of the wire.
MemberDefinition_Direction Direction()
The direction of the wire.
void SetOutValueLifespan(int32_t millis)
Set the lifespan of OutValue.
int32_t GetOutValueLifespan()
Get the lifespan of OutValue.
boost::function< bool(boost::shared_ptr< WireBroadcasterBase > &, uint32_t)> GetPredicate()
Get the current predicate callback function.
void SetPredicate(boost::function< bool(const boost::shared_ptr< WireBroadcasterBase > &, uint32_t)> f)
Set the predicate callback function.
WireBroadcaster()
Construct a new WireBroadcaster.
Definition WireMember.h:1587
void SetOutValue(T value)
Set the OutValue for all connections.
Definition WireMember.h:1611
void Init(boost::shared_ptr< Wire< T > > wire)
Initialize the WireBroadcaster.
Definition WireMember.h:1597
Base class for WireConnection.
Definition WireMember.h:60
virtual int32_t GetOutValueLifespan()
Get the lifespan of OutValue.
virtual void SetIgnoreInValue(bool ignore)
Set whether wire connection should ignore incoming values.
virtual TimeSpec GetLastValueSentTime()
Get the timestamp of the last sent value.
bool WaitInValueValid(int32_t timeout=RR_TIMEOUT_INFINITE)
Waits for InValue to be valid.
virtual void Close()
Close the wire connection.
bool WaitOutValueValid(int32_t timeout=RR_TIMEOUT_INFINITE)
Waits for OutValue to be valid.
virtual uint32_t GetEndpoint()
Returns the Robot Raconteur node Endpoint ID.
virtual void AsyncClose(boost::function< void(const boost::shared_ptr< RobotRaconteurException > &)> handler, int32_t timeout)
Asynchronously close the wire connection.
virtual bool GetOutValueValid()
Get if the OutValue is valid.
MemberDefinition_Direction Direction()
The direction of the wire.
virtual void SetOutValueLifespan(int32_t millis)
Set the lifespan of OutValue.
virtual int32_t GetInValueLifespan()
Get the lifespan of InValue.
virtual bool GetIgnoreInValue()
Get if wire connection is ignoring incoming values.
virtual void SetInValueLifespan(int32_t millis)
Set the lifespan of InValue.
virtual TimeSpec GetLastValueReceivedTime()
Get the timestamp of the last received value.
virtual bool GetInValueValid()
Get if the InValue is valid.
Wire connection used to transmit "most recent" values.
Definition WireMember.h:352
virtual T GetOutValue()
Get the current OutValue.
Definition WireMember.h:417
RR_OVIRTUAL void AsyncClose(boost::function< void(const boost::shared_ptr< RobotRaconteurException > &)> handler, int32_t timeout=2000) RR_OVERRIDE
Asynchronously close the wire connection.
Definition WireMember.h:523
virtual T GetInValue()
Get the current InValue.
Definition WireMember.h:406
bool TryGetInValue(T &value, TimeSpec &time)
Try getting the InValue, returning true on success or false on failure.
Definition WireMember.h:444
RR_OVIRTUAL void Close() RR_OVERRIDE
Close the wire connection.
Definition WireMember.h:494
bool TryGetOutValue(T &value, TimeSpec &time)
Try getting the OutValue, returning true on success or false on failure.
Definition WireMember.h:464
boost::signals2::signal< void(const boost::shared_ptr< WireConnection< T > > &connection, T value, TimeSpec time)> WireValueChanged
Signal invoked when the InValue is changed.
Definition WireMember.h:365
virtual void SetOutValue(typename boost::call_traits< T >::param_type value)
Set the OutValue and transmit to the peer connection.
Definition WireMember.h:428
void SetWireConnectionClosedCallback(boost::function< void(const boost::shared_ptr< WireConnection< T > > &)> callback)
Set the connection closed callback function.
Definition WireMember.h:389
boost::function< void(boost::shared_ptr< WireConnection< T > >)> GetWireConnectionClosedCallback()
Get the currently configured connection closed callback function.
Definition WireMember.h:373
wire member type interface
Definition WireMember.h:669
virtual void SetWireConnectCallback(boost::function< void(const boost::shared_ptr< WireConnection< T > > &)> function)=0
Set wire connected callback function.
virtual boost::function< T(const uint32_t &)> GetPeekOutValueCallback()=0
Get the currently configure PeekOutValue callback.
virtual T PeekInValue(TimeSpec &ts)=0
Peek the current InValue.
virtual void SetPeekInValueCallback(boost::function< T(const uint32_t &)> function)=0
Set the PeekInValue callback function.
virtual boost::shared_ptr< WireConnection< T > > Connect()=0
Connect the wire.
virtual boost::function< T(const uint32_t &)> GetPeekInValueCallback()=0
Get the currently configure PeekInValue callback.
virtual void AsyncConnect(boost::function< void(const boost::shared_ptr< WireConnection< T > > &, const boost::shared_ptr< RobotRaconteurException > &)> handler, int32_t timeout=RR_TIMEOUT_INFINITE)=0
Asynchronously connect the wire.
virtual void AsyncPeekInValue(boost::function< void(const T &, const TimeSpec &, const boost::shared_ptr< RobotRaconteurException > &)> handler, int32_t timeout=RR_TIMEOUT_INFINITE)=0
Asynchronously peek the current InValue.
virtual boost::function< void(const T &, const TimeSpec &, const uint32_t &)> GetPokeOutValueCallback()=0
Get the currently configure PokeOutValue callback.
virtual void SetPokeOutValueCallback(boost::function< void(const T &, const TimeSpec &, const uint32_t &)> function)=0
Set the PokeOutValue callback function.
virtual void SetPeekOutValueCallback(boost::function< T(const uint32_t &)> function)=0
Set the PeekOutValue callback function.
virtual boost::function< void(const boost::shared_ptr< WireConnection< T > > &)> GetWireConnectCallback()=0
Get the currently configured wire connected callback function.
virtual void PokeOutValue(const T &value)=0
Poke the OutValue.
virtual void AsyncPeekOutValue(boost::function< void(const T &, const TimeSpec &, const boost::shared_ptr< RobotRaconteurException > &)> handler, int32_t timeout=RR_TIMEOUT_INFINITE)=0
Asynchronously peek the current OutValue.
virtual void AsyncPokeOutValue(const T &value, boost::function< void(const boost::shared_ptr< RobotRaconteurException > &)> handler, int32_t timeout=RR_TIMEOUT_INFINITE)=0
Asynchronously poke the OutValue.
virtual T PeekOutValue(TimeSpec &ts)=0
Peek the current OutValue.
Receive the InValue from the most recent connection.
Definition WireMember.h:1914