Fawkes API Fawkes Development Version
blackboard_manager.cpp
1
2/***************************************************************************
3 * Protoboard plugin template
4 * - blackboard manager implementation: Watch interfaces specified in
5 * instantiation and call to protobuf sender thread accordingly.
6 * Also specialize templates for peer handling since that is domain
7 * independent.
8 *
9 * Copyright 2019 Victor Mataré
10 ****************************************************************************/
11
12/* This program is free software; you can redistribute it and/or modify
13 * it under the terms of the GNU General Public License as published by
14 * the Free Software Foundation; either version 2 of the License, or
15 * (at your option) any later version.
16 *
17 * This program is distributed in the hope that it will be useful,
18 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 * GNU Library General Public License for more details.
21 *
22 * Read the full text in the LICENSE.GPL file in the doc directory.
23 */
24
25#include "blackboard_manager.h"
26
27#include "protobuf_to_bb.h"
28
29namespace protoboard {
30
31using namespace fawkes;
32
34{
35}
36
38{
39}
40
42: fawkes::Thread("ProtoboardBlackboardManager", Thread::OPMODE_WAITFORWAKEUP),
43 message_handler_(msg_handler),
44 bb_receiving_interfaces_(make_receiving_interfaces_map()),
45 on_message_waker_(nullptr),
46 next_peer_idx_(0)
47{
49}
50
51void
53{
54 pb_sender_.reset(sender);
55}
56
57void
59{
60 pb_sender_->init();
61 peer_iface_ = blackboard->open_for_writing<ProtobufPeerInterface>("/protoboard/peers");
62
63 on_message_waker_ = new fawkes::BlackBoardOnMessageWaker(blackboard, peer_iface_, this);
64
65 for (pb_conversion_map::value_type &c : bb_receiving_interfaces_)
66 c.second->init(blackboard, logger);
67}
68
69void
71{
72 delete on_message_waker_;
73 bb_receiving_interfaces_.clear();
74 pb_sender_->finalize();
75 blackboard->close(peer_iface_);
76}
77
78void
80{
81 // Handle CreatePeer* messages
82 on_interface<ProtobufPeerInterface>{peer_iface_, this}
87
88 // Handle sending blackboard interfaces
89 pb_sender_->process_sending_interfaces();
90
91 // Handle receiving blackboard interfaces
92 while (message_handler_->pb_queue_incoming()) {
93 ProtobufThead::incoming_message inc = message_handler_->pb_queue_pop();
94 pb_conversion_map::iterator it;
95
96 if ((it = bb_receiving_interfaces_.find(inc.msg->GetTypeName()))
97 == bb_receiving_interfaces_.end()) {
99 "Received message of unregistered type `%s'",
100 inc.msg->GetTypeName().c_str());
101 continue;
102 }
103 try {
104 it->second->handle(inc.msg);
105 } catch (std::exception &e) {
107 "Exception while handling %s: %s",
108 inc.msg->GetTypeName().c_str(),
109 e.what());
110 }
111 }
112}
113
116{
117 return blackboard;
118}
119
120void
121BlackboardManager::add_peer(ProtobufPeerInterface *iface, long peer_id)
122{
123 if (next_peer_idx_ >= iface->maxlenof_peers()) {
125 "Maximum number of peers reached. Can't create new peer with index %d!",
126 next_peer_idx_);
127 return;
128 }
129 iface->set_peers(next_peer_idx_++, peer_id);
130 iface->write();
131}
132
133/**
134 * Local specialization for the CreatePeerMessage that establishes ProtoBuf communication
135 * @param iface The ProtobufPeerInterface
136 * @param msg The incoming CreatePeerMessage
137 */
138template <>
139void
142{
143 add_peer(iface, message_handler_->peer_create(msg->address(), msg->port()));
144}
145
146/**
147 * Local specialization for the CreatePeerLocalMessage that establishes ProtoBuf communication
148 * @param iface The ProtobufPeerInterface
149 * @param msg The incoming CreatePeerLocalMessage
150 */
151template <>
152void
155{
156 add_peer(iface,
157 message_handler_->peer_create_local(msg->address(),
158 msg->send_to_port(),
159 msg->recv_on_port()));
160}
161
162/**
163 * Local specialization for the CreatePeerCryptoMessage that establishes ProtoBuf communication
164 * @param iface The ProtobufPeerInterface
165 * @param msg The incoming CreatePeerCryptoMessage
166 */
167template <>
168void
171{
172 add_peer(iface,
173 message_handler_->peer_create_crypto(
174 msg->address(), msg->port(), msg->crypto_key(), msg->cipher()));
175}
176
177/**
178 * Local specialization for the CreatePeerLocalCryptoMessage that establishes ProtoBuf communication
179 * @param iface The ProtobufPeerInterface
180 * @param msg The incoming CreatePeerLocalCryptoMessage
181 */
182template <>
183void
186{
187 add_peer(iface,
188 message_handler_->peer_create_local_crypto(msg->address(),
189 msg->send_to_port(),
190 msg->recv_on_port(),
191 msg->crypto_key(),
192 msg->cipher()));
193}
194
195} // namespace protoboard
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
Definition: blackboard.h:44
Wake threads on receiving a blackboard message.
The BlackBoard abstract class.
Definition: blackboard.h:46
virtual Interface * open_for_writing(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for writing.
virtual void close(Interface *interface)=0
Close interface.
void write()
Write from local copy into BlackBoard memory.
Definition: interface.cpp:501
virtual void log_error(const char *component, const char *format,...)=0
Log error message.
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
CreatePeerCryptoMessage Fawkes BlackBoard Interface Message.
CreatePeerLocalCryptoMessage Fawkes BlackBoard Interface Message.
CreatePeerLocalMessage Fawkes BlackBoard Interface Message.
CreatePeerMessage Fawkes BlackBoard Interface Message.
ProtobufPeerInterface Fawkes BlackBoard Interface.
size_t maxlenof_peers() const
Get maximum length of peers value.
void set_peers(unsigned int index, const int64_t new_peers)
Set peers value at given index.
Thread class encapsulation of pthreads.
Definition: thread.h:46
const char * name() const
Get name of thread.
Definition: thread.h:100
void set_coalesce_wakeups(bool coalesce=true)
Set wakeup coalescing.
Definition: thread.cpp:729
Abstract superclass for sending out ProtoBuf messages.
AbstractProtobufSender(BlackboardManager *bb_mgr)
Constructor.
virtual ~AbstractProtobufSender()
Destructor.
The main thread that is woken each time a message arrives on any of the interfaces watched by a bb_if...
virtual void finalize() override
Finalize the thread.
void set_protobuf_sender(AbstractProtobufSender *sender)
The ProtoBuf sender must be initialized after construction to break a dependency loop.
fawkes::BlackBoard * get_blackboard()
Helper for other classes to get access to the blackboard.
virtual void init() override
Initialize the thread.
virtual void loop() override
Code to execute in the thread.
void handle_message(InterfaceT *iface, MessageT *msg)
Act on a given message on a given blackboard interface.
BlackboardManager(ProtobufThead *msg_handler)
Main thread constructor.
Receive incoming ProtoBuf messages and pass them on to the BlackboardManager for publication to the a...
incoming_message pb_queue_pop()
long int peer_create_crypto(const std::string &host, int port, const std::string &crypto_key="", const std::string &cipher="")
Enable protobuf peer.
long int peer_create(const std::string &host, int port)
Enable protobuf peer.
long int peer_create_local(const std::string &host, int send_to_port, int recv_on_port)
Enable protobuf peer.
long int peer_create_local_crypto(const std::string &host, int send_to_port, int recv_on_port, const std::string &crypto_key="", const std::string &cipher="")
Enable protobuf peer.
Fawkes library namespace.
Wrapper for a ProtoBuf message and its metadata.
std::shared_ptr< google::protobuf::Message > msg
The message itself.