XRootD
XrdCl::Stream Class Reference

Stream.

#include <XrdClStream.hh>

Public Types

enum  StreamStatus {
  Disconnected = 0 ,
  Connected = 1 ,
  Connecting = 2 ,
  Error = 3
}
 Status of the stream.
 

Public Member Functions

 Stream (const URL *url, const URL &prefer=URL())
 Constructor.
 
 ~Stream ()
 Destructor.
 
bool CanCollapse (const URL &url)
 
void DisableIfEmpty (uint16_t subStream)
 Disables respective uplink if empty.
 
void Disconnect (bool force=false)
 Disconnect the stream.
 
XRootDStatus EnableLink (PathID &path)
 
void ForceConnect ()
 Force connection.
 
void ForceError (XRootDStatus status, bool hush=false)
 Force error.
 
const std::string & GetName () const
 Return stream name.
 
const URL * GetURL () const
 Get the URL.
 
XRootDStatus Initialize ()
 Initializer.
 
uint16_t InspectStatusRsp (uint16_t stream, MsgHandler *&incHandler)
 
MsgHandler * InstallIncHandler (std::shared_ptr< Message > &msg, uint16_t stream)
 
void OnConnect (uint16_t subStream)
 Call back when a substream has been connected.
 
void OnConnectError (uint16_t subStream, XRootDStatus status)
 On connect error.
 
void OnError (uint16_t subStream, XRootDStatus status)
 On error.
 
void OnIncoming (uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
 Call back when a message has been reconstructed.
 
void OnMessageSent (uint16_t subStream, Message *msg, uint32_t bytesSent)
 
bool OnReadTimeout (uint16_t subStream) XRD_WARN_UNUSED_RESULT
 On read timeout.
 
std::pair< Message *, MsgHandler * > OnReadyToWrite (uint16_t subStream)
 
bool OnWriteTimeout (uint16_t subStream) XRD_WARN_UNUSED_RESULT
 On write timeout.
 
Status Query (uint16_t query, AnyObject &result)
 Query the stream.
 
void RegisterEventHandler (ChannelEventHandler *handler)
 Register channel event handler.
 
void RemoveEventHandler (ChannelEventHandler *handler)
 Remove a channel event handler.
 
XRootDStatus Send (Message *msg, MsgHandler *handler, bool stateful, time_t expires)
 Queue the message for sending.
 
void SetChannelData (AnyObject *channelData)
 Set the channel data.
 
void SetIncomingQueue (InQueue *incomingQueue)
 Set the incoming queue.
 
void SetJobManager (JobManager *jobManager)
 Set job manager.
 
void SetOnDataConnectHandler (std::shared_ptr< Job > &onConnJob)
 Set the on-connect handler for data streams.
 
void SetPoller (Poller *poller)
 Set the poller.
 
void SetTaskManager (TaskManager *taskManager)
 Set task manager.
 
void SetTransport (TransportHandler *transport)
 Set the transport.
 
void Tick (time_t now)
 

Detailed Description

Stream.

Definition at line 51 of file XrdClStream.hh.

Member Enumeration Documentation

◆ StreamStatus

Status of the stream.

Enumerator
Disconnected 

Not connected.

Connected 

Connected.

Connecting 

In the process of being connected.

Error 

Broken.

Definition at line 57 of file XrdClStream.hh.

{
  Disconnected = 0,
  Connected = 1,
  Connecting = 2,
  Error = 3
};

Constructor & Destructor Documentation

◆ Stream()

XrdCl::Stream::Stream ( const URL * url,
const URL & prefer = URL() )

Constructor.

Definition at line 96 of file XrdClStream.cc.

Stream::Stream( const URL *url, const URL &prefer ):
  pUrl( url ),
  pPrefer( prefer ),
  pTransport( 0 ),
  pPoller( 0 ),
  pTaskManager( 0 ),
  pJobManager( 0 ),
  pIncomingQueue( 0 ),
  pChannelData( 0 ),
  pLastStreamError( 0 ),
  pConnectionCount( 0 ),
  pConnectionInitTime( 0 ),
  pAddressType( Utils::IPAll ),
  pSessionId( 0 ),
  pBytesSent( 0 ),
  pBytesReceived( 0 )
{
  pConnectionStarted.tv_sec = 0; pConnectionStarted.tv_usec = 0;
  pConnectionDone.tv_sec = 0; pConnectionDone.tv_usec = 0;

  std::ostringstream o;
  o << pUrl->GetHostId();
  pStreamName = o.str();

  pConnectionWindow = Utils::GetIntParameter( *url, "ConnectionWindow",
                                              DefaultConnectionWindow );
  pConnectionRetry = Utils::GetIntParameter( *url, "ConnectionRetry",
                                             DefaultConnectionRetry );
  pStreamErrorWindow = Utils::GetIntParameter( *url, "StreamErrorWindow",
                                               DefaultStreamErrorWindow );

  std::string netStack = Utils::GetStringParameter( *url, "NetworkStack",
                                                    DefaultNetworkStack );

  pAddressType = Utils::String2AddressType( netStack );
  if( pAddressType == Utils::AddressType::IPAuto )
  {
    XrdNetUtils::NetProt stacks = XrdNetUtils::NetConfig( XrdNetUtils::qryINIF );
    if( !( stacks & XrdNetUtils::hasIP64 ) )
    {
      if( stacks & XrdNetUtils::hasIPv4 )
        pAddressType = Utils::AddressType::IPv4;
      else if( stacks & XrdNetUtils::hasIPv6 )
        pAddressType = Utils::AddressType::IPv6;
    }
  }

  Log *log = DefaultEnv::GetLog();
  log->Debug( PostMasterMsg, "[%s] Stream parameters: Network Stack: %s, "
              "Connection Window: %d, ConnectionRetry: %d, Stream Error "
              "Window: %d", pStreamName.c_str(), netStack.c_str(),
              pConnectionWindow, pConnectionRetry, pStreamErrorWindow );
}

References XrdCl::Log::Debug(), XrdCl::DefaultConnectionRetry, XrdCl::DefaultConnectionWindow, XrdCl::DefaultNetworkStack, XrdCl::DefaultStreamErrorWindow, XrdCl::URL::GetHostId(), XrdCl::Utils::GetIntParameter(), XrdCl::DefaultEnv::GetLog(), XrdCl::Utils::GetStringParameter(), XrdNetUtils::hasIP64, XrdNetUtils::hasIPv4, XrdNetUtils::hasIPv6, XrdCl::Utils::IPAuto, XrdCl::Utils::IPv4, XrdCl::Utils::IPv6, XrdNetUtils::NetConfig(), XrdCl::PostMasterMsg, XrdNetUtils::qryINIF, and XrdCl::Utils::String2AddressType().

◆ ~Stream()

XrdCl::Stream::~Stream ( )

Destructor.

Definition at line 153 of file XrdClStream.cc.

{
  Disconnect( true );

  Log *log = DefaultEnv::GetLog();
  log->Debug( PostMasterMsg, "[%s] Destroying stream",
              pStreamName.c_str() );

  MonitorDisconnection( XRootDStatus() );

  SubStreamList::iterator it;
  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
    delete *it;
}

References XrdCl::Log::Debug(), Disconnect(), XrdCl::DefaultEnv::GetLog(), and XrdCl::PostMasterMsg.

Member Function Documentation

◆ CanCollapse()

bool XrdCl::Stream::CanCollapse ( const URL & url)
Returns
: true if this channel can be collapsed using this URL, false otherwise

Definition at line 1173 of file XrdClStream.cc.

{
  Log *log = DefaultEnv::GetLog();

  //--------------------------------------------------------------------------
  // Resolve all the addresses of the host we're supposed to connect to
  //--------------------------------------------------------------------------
  std::vector<XrdNetAddr> prefaddrs;
  XRootDStatus st = Utils::GetHostAddresses( prefaddrs, url, pAddressType );
  if( !st.IsOK() )
  {
    log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
                , pStreamName.c_str(), url.GetHostName().c_str() );
    return false;
  }

  //--------------------------------------------------------------------------
  // Resolve all the addresses of the alias
  //--------------------------------------------------------------------------
  std::vector<XrdNetAddr> aliasaddrs;
  st = Utils::GetHostAddresses( aliasaddrs, *pUrl, pAddressType );
  if( !st.IsOK() )
  {
    log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
                , pStreamName.c_str(), pUrl->GetHostName().c_str() );
    return false;
  }

  //--------------------------------------------------------------------------
  // Now check if the preferred host is part of the alias
  //--------------------------------------------------------------------------
  auto itr = prefaddrs.begin();
  for( ; itr != prefaddrs.end() ; ++itr )
  {
    auto itr2 = aliasaddrs.begin();
    for( ; itr2 != aliasaddrs.end() ; ++itr2 )
      if( itr->Same( &*itr2 ) ) return true;
  }

  return false;
}

References XrdCl::Log::Error(), XrdCl::Utils::GetHostAddresses(), XrdCl::URL::GetHostName(), XrdCl::DefaultEnv::GetLog(), XrdCl::Status::IsOK(), and XrdCl::PostMasterMsg.

Referenced by XrdCl::Channel::CanCollapse().
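
The final loop of CanCollapse() is an overlap test between two resolved address lists. A minimal standalone sketch of that test follows; it is not XRootD code. Plain strings stand in for XrdNetAddr and operator== stands in for XrdNetAddr::Same(); both substitutions, and the name SharesAddress, are assumptions made for illustration.

```cpp
#include <string>
#include <vector>

// Returns true when at least one address appears in both lists.
// Strings replace XrdNetAddr and == replaces XrdNetAddr::Same()
// (assumptions of this sketch).
bool SharesAddress( const std::vector<std::string> &prefaddrs,
                    const std::vector<std::string> &aliasaddrs )
{
  for( const auto &pref : prefaddrs )
    for( const auto &alias : aliasaddrs )
      if( pref == alias ) return true;
  return false;
}
```

The nested loop is quadratic, which is usually acceptable here because a host name typically resolves to only a handful of addresses.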


◆ DisableIfEmpty()

void XrdCl::Stream::DisableIfEmpty ( uint16_t subStream)

Disables respective uplink if empty.

Definition at line 568 of file XrdClStream.cc.

{
  XrdSysMutexHelper scopedLock( pMutex );
  Log *log = DefaultEnv::GetLog();

  if( pSubStreams[subStream]->outQueue->IsEmpty() )
  {
    log->Dump( PostMasterMsg, "[%s] All messages consumed, disable uplink",
               pSubStreams[subStream]->socket->GetStreamName().c_str() );
    pSubStreams[subStream]->socket->DisableUplink();
  }
}

References XrdCl::Log::Dump(), XrdCl::DefaultEnv::GetLog(), and XrdCl::PostMasterMsg.

Referenced by XrdCl::AsyncSocketHandler::OnWrite().


◆ Disconnect()

void XrdCl::Stream::Disconnect ( bool force = false)

Disconnect the stream.

Definition at line 363 of file XrdClStream.cc.

{
  XrdSysMutexHelper scopedLock( pMutex );
  SubStreamList::iterator it;
  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
  {
    (*it)->socket->Close();
    (*it)->status = Socket::Disconnected;
  }
}

References XrdCl::Socket::Disconnected.

Referenced by ~Stream().

◆ EnableLink()

XRootDStatus XrdCl::Stream::EnableLink ( PathID & path)

Connect if needed; otherwise make sure that the underlying socket handler gets write-readiness events. The path is updated to reflect what has actually been enabled.

Definition at line 187 of file XrdClStream.cc.

{
  XrdSysMutexHelper scopedLock( pMutex );

  //--------------------------------------------------------------------------
  // We are in the process of connecting the main stream, so we do nothing
  // because when the main stream connection is established it will connect
  // all the other streams
  //--------------------------------------------------------------------------
  if( pSubStreams[0]->status == Socket::Connecting )
    return XRootDStatus();

  //--------------------------------------------------------------------------
  // The main stream is connected, so we can verify whether we have
  // the up and the down stream connected and ready to handle data.
  // If anything is not right we fall back to stream 0.
  //--------------------------------------------------------------------------
  if( pSubStreams[0]->status == Socket::Connected )
  {
    if( pSubStreams[path.down]->status != Socket::Connected )
      path.down = 0;

    if( pSubStreams[path.up]->status == Socket::Disconnected )
    {
      path.up = 0;
      return pSubStreams[0]->socket->EnableUplink();
    }

    if( pSubStreams[path.up]->status == Socket::Connected )
      return pSubStreams[path.up]->socket->EnableUplink();

    return XRootDStatus();
  }

  //--------------------------------------------------------------------------
  // The main stream is not connected, we need to check whether enough time
  // has passed since we last encountered an error (if any) so that we could
  // re-attempt the connection
  //--------------------------------------------------------------------------
  Log *log = DefaultEnv::GetLog();
  time_t now = ::time(0);

  if( now-pLastStreamError < pStreamErrorWindow )
    return pLastFatalError;

  gettimeofday( &pConnectionStarted, 0 );
  ++pConnectionCount;

  //--------------------------------------------------------------------------
  // Resolve all the addresses of the host we're supposed to connect to
  //--------------------------------------------------------------------------
  XRootDStatus st = Utils::GetHostAddresses( pAddresses, *pUrl, pAddressType );
  if( !st.IsOK() )
  {
    log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for "
                "the host", pStreamName.c_str() );
    pLastStreamError = now;
    st.status = stFatal;
    pLastFatalError = st;
    return st;
  }

  if( pPrefer.IsValid() )
  {
    std::vector<XrdNetAddr> addrresses;
    XRootDStatus st = Utils::GetHostAddresses( addrresses, pPrefer, pAddressType );
    if( !st.IsOK() )
    {
      log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s",
                  pStreamName.c_str(), pPrefer.GetHostName().c_str() );
    }
    else
    {
      std::vector<XrdNetAddr> tmp;
      tmp.reserve( pAddresses.size() );
      // first add all remaining addresses
      auto itr = pAddresses.begin();
      for( ; itr != pAddresses.end() ; ++itr )
      {
        if( !HasNetAddr( *itr, addrresses ) )
          tmp.push_back( *itr );
      }
      // then copy all 'preferred' addresses
      std::copy( addrresses.begin(), addrresses.end(), std::back_inserter( tmp ) );
      // and keep the result
      pAddresses.swap( tmp );
    }
  }

  Utils::LogHostAddresses( log, PostMasterMsg, pUrl->GetHostId(),
                           pAddresses );

  while( !pAddresses.empty() )
  {
    pSubStreams[0]->socket->SetAddress( pAddresses.back() );
    pAddresses.pop_back();
    pConnectionInitTime = ::time( 0 );
    st = pSubStreams[0]->socket->Connect( pConnectionWindow );
    if( st.IsOK() )
    {
      pSubStreams[0]->status = Socket::Connecting;
      break;
    }
  }
  return st;
}

References XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Socket::Disconnected, XrdCl::PathID::down, XrdCl::Log::Error(), XrdCl::Utils::GetHostAddresses(), XrdCl::URL::GetHostId(), XrdCl::URL::GetHostName(), XrdCl::DefaultEnv::GetLog(), XrdCl::Status::IsOK(), XrdCl::URL::IsValid(), XrdCl::Utils::LogHostAddresses(), XrdCl::PostMasterMsg, XrdCl::Status::status, XrdCl::stFatal, and XrdCl::PathID::up.

Referenced by ForceConnect(), OnConnectError(), OnError(), and Send().
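
EnableLink() takes connection candidates from the back of pAddresses (back() followed by pop_back()), so the addresses of the preferred host are appended last to be tried first. The sketch below models only that reordering step and is not XRootD code. Strings stand in for XrdNetAddr, std::find stands in for HasNetAddr(), and the function name MovePreferredToBack is hypothetical.

```cpp
#include <algorithm>
#include <iterator>
#include <string>
#include <vector>

// Reorders `addresses` so that every entry of `preferred` sits at the back.
// Candidates are consumed with back()/pop_back(), so the back is tried first.
// Strings replace XrdNetAddr (an assumption of this sketch).
void MovePreferredToBack( std::vector<std::string>       &addresses,
                          const std::vector<std::string> &preferred )
{
  std::vector<std::string> tmp;
  tmp.reserve( addresses.size() + preferred.size() );
  // first keep all addresses that are not preferred
  for( const auto &addr : addresses )
    if( std::find( preferred.begin(), preferred.end(), addr ) == preferred.end() )
      tmp.push_back( addr );
  // then append the preferred ones, so that they are popped first
  std::copy( preferred.begin(), preferred.end(), std::back_inserter( tmp ) );
  // and keep the result
  addresses.swap( tmp );
}
```

As in the listing, every preferred address is appended even if it was not part of the original list.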


◆ ForceConnect()

void XrdCl::Stream::ForceConnect ( )

Force connection.

Definition at line 347 of file XrdClStream.cc.

{
  XrdSysMutexHelper scopedLock( pMutex );
  if( pSubStreams[0]->status == Socket::Connecting )
  {
    pSubStreams[0]->status = Socket::Disconnected;
    XrdCl::PathID path( 0, 0 );
    XrdCl::XRootDStatus st = EnableLink( path );
    if( !st.IsOK() )
      OnConnectError( 0, st );
  }
}

References XrdCl::Socket::Connecting, XrdCl::Socket::Disconnected, EnableLink(), XrdCl::Status::IsOK(), and OnConnectError().

Referenced by XrdCl::Channel::ForceReconnect().


◆ ForceError()

void XrdCl::Stream::ForceError ( XRootDStatus status,
bool hush = false )

Force error.

Definition at line 913 of file XrdClStream.cc.

{
  XrdSysMutexHelper scopedLock( pMutex );
  Log *log = DefaultEnv::GetLog();
  for( size_t substream = 0; substream < pSubStreams.size(); ++substream )
  {
    if( pSubStreams[substream]->status != Socket::Connected ) continue;
    pSubStreams[substream]->socket->Close();
    pSubStreams[substream]->status = Socket::Disconnected;

    if( !hush )
      log->Error( PostMasterMsg, "[%s] Forcing error on disconnect: %s.",
                  pStreamName.c_str(), status.ToString().c_str() );

    //--------------------------------------------------------------------
    // Reinsert the stuff that we have failed to send
    //--------------------------------------------------------------------
    if( pSubStreams[substream]->outMsgHelper.msg )
    {
      OutQueue::MsgHelper &h = pSubStreams[substream]->outMsgHelper;
      pSubStreams[substream]->outQueue->PushFront( h.msg, h.handler, h.expires,
                                                   h.stateful );
      pSubStreams[substream]->outMsgHelper.Reset();
    }

    //--------------------------------------------------------------------
    // Reinsert the receiving handler and reset any partially read partial
    //--------------------------------------------------------------------
    if( pSubStreams[substream]->inMsgHelper.handler )
    {
      InMessageHelper &h = pSubStreams[substream]->inMsgHelper;
      pIncomingQueue->ReAddMessageHandler( h.handler, h.expires );
      XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( h.handler );
      if( xrdHandler ) xrdHandler->PartialReceived();
      h.Reset();
    }
  }

  pConnectionCount = 0;

  //------------------------------------------------------------------------
  // We're done here, unlock the stream mutex to avoid deadlocks and
  // report the disconnection event to the handlers
  //------------------------------------------------------------------------
  log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
              "message handlers.", pStreamName.c_str() );

  SubStreamList::iterator it;
  OutQueue q;
  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
    q.GrabItems( *(*it)->outQueue );
  scopedLock.UnLock();

  q.Report( status );

  pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
  pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
}

References XrdCl::MsgHandler::Broken, XrdCl::Socket::Connected, XrdCl::Log::Debug(), XrdCl::Socket::Disconnected, XrdCl::Log::Error(), XrdCl::InMessageHelper::expires, XrdCl::OutQueue::MsgHelper::expires, XrdCl::DefaultEnv::GetLog(), XrdCl::OutQueue::GrabItems(), XrdCl::InMessageHelper::handler, XrdCl::OutQueue::MsgHelper::handler, XrdCl::OutQueue::MsgHelper::msg, XrdCl::XRootDMsgHandler::PartialReceived(), XrdCl::PostMasterMsg, XrdCl::InQueue::ReAddMessageHandler(), XrdCl::OutQueue::Report(), XrdCl::ChannelHandlerList::ReportEvent(), XrdCl::InQueue::ReportStreamEvent(), XrdCl::InMessageHelper::Reset(), XrdCl::OutQueue::MsgHelper::stateful, XrdCl::ChannelEventHandler::StreamBroken, XrdCl::Status::ToString(), and XrdSysMutexHelper::UnLock().

Referenced by XrdCl::Channel::ForceDisconnect(), and XrdCl::AsyncSocketHandler::OnHeaderCorruption().
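
ForceError() collects the queued items while holding the stream mutex and only reports to the handlers after calling UnLock(), so that a handler can call back into the stream without deadlocking. The following is a rough standalone model of that pattern, not XRootD code. MiniStream, its members, and the use of std::mutex and std::function are all hypothetical stand-ins for the real classes.

```cpp
#include <cstddef>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical miniature of a stream with one queue of pending callbacks.
class MiniStream
{
  public:
    void Queue( std::function<void(int)> cb )
    {
      std::lock_guard<std::mutex> lock( pMutex );
      pOutQueue.push_back( std::move( cb ) );
    }

    // Drain the queue under the lock, then notify with the lock released so
    // that a callback may safely call back into the stream (e.g. Queue()).
    void ForceError( int status )
    {
      std::vector<std::function<void(int)>> grabbed;
      {
        std::lock_guard<std::mutex> lock( pMutex );
        grabbed.swap( pOutQueue );
      }
      for( auto &cb : grabbed ) cb( status );
    }

    std::size_t Pending()
    {
      std::lock_guard<std::mutex> lock( pMutex );
      return pOutQueue.size();
    }

  private:
    std::mutex                            pMutex;
    std::vector<std::function<void(int)>> pOutQueue;
};
```

If the callbacks were invoked while the lock was still held, a callback that re-queued a message would block forever on the non-recursive std::mutex used in this sketch.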


◆ GetName()

const std::string & XrdCl::Stream::GetName ( ) const
inline

Return stream name.

Definition at line 170 of file XrdClStream.hh.

{
  return pStreamName;
}

◆ GetURL()

const URL * XrdCl::Stream::GetURL ( ) const
inline

Get the URL.

Definition at line 157 of file XrdClStream.hh.

{
  return pUrl;
}

Referenced by XrdCl::AsyncSocketHandler::OnConnectionReturn().

◆ Initialize()

XRootDStatus XrdCl::Stream::Initialize ( )

Initializer.

Definition at line 171 of file XrdClStream.cc.

{
  if( !pTransport || !pPoller || !pChannelData )
    return XRootDStatus( stError, errUninitialized );

  AsyncSocketHandler *s = new AsyncSocketHandler( *pUrl, pPoller, pTransport,
                                                  pChannelData, 0, this );
  pSubStreams.push_back( new SubStreamData() );
  pSubStreams[0]->socket = s;
  return XRootDStatus();
}

References XrdCl::errUninitialized, and XrdCl::stError.

Referenced by XrdCl::Channel::Channel().

◆ InspectStatusRsp()

uint16_t XrdCl::Stream::InspectStatusRsp ( uint16_t stream,
MsgHandler *& incHandler )

In case the message is a kXR_status response it needs further attention

Returns
: the action to take; incHandler is set to the message handler when raw data needs to be read out

Definition at line 1142 of file XrdClStream.cc.

{
  InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
  if( !mh.handler )
    // (early-return statement not present in this listing)

  uint16_t action = mh.handler->InspectStatusRsp();
  mh.action |= action;

  if( action & MsgHandler::RemoveHandler )
    pIncomingQueue->RemoveMessageHandler( mh.handler );

  if( action & MsgHandler::Raw )
  {
    incHandler = mh.handler;
    return MsgHandler::Raw;
  }

  if( action & MsgHandler::Corrupted )
    return MsgHandler::Corrupted;

  if( action & MsgHandler::More )
    return MsgHandler::More;

  return MsgHandler::None;
}

References XrdCl::InMessageHelper::action, XrdCl::MsgHandler::Corrupted, XrdCl::InMessageHelper::handler, XrdCl::MsgHandler::InspectStatusRsp(), XrdCl::MsgHandler::More, XrdCl::MsgHandler::None, XrdCl::MsgHandler::Raw, XrdCl::MsgHandler::RemoveHandler, and XrdCl::InQueue::RemoveMessageHandler().

Referenced by XrdCl::AsyncMsgReader::Read().
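
The tail of InspectStatusRsp() reduces a bitmask of actions to a single return value with a fixed precedence: Raw, then Corrupted, then More, otherwise None. A small standalone sketch of that reduction follows. The numeric flag values and the name PickAction are invented for illustration; the real MsgHandler constants are not listed on this page.

```cpp
#include <cstdint>

// Hypothetical flag values, chosen only so that each action is a distinct bit.
enum Action : uint16_t
{
  None          = 0x0000,
  Raw           = 0x0001,
  RemoveHandler = 0x0002,
  Corrupted     = 0x0004,
  More          = 0x0008
};

// Collapses a bitmask of actions into the single value reported to the
// caller, using the precedence of the listing: Raw, Corrupted, More.
uint16_t PickAction( uint16_t action )
{
  if( action & Raw )       return Raw;
  if( action & Corrupted ) return Corrupted;
  if( action & More )      return More;
  return None;
}
```

RemoveHandler has no branch of its own in this reduction, because in the listing it triggers a side effect (removing the handler from the incoming queue) rather than a distinct return value.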


◆ InstallIncHandler()

MsgHandler * XrdCl::Stream::InstallIncHandler ( std::shared_ptr< Message > & msg,
uint16_t stream )

Install a message handler for the given message if one is available. If the handler wants to be called in raw mode, it is returned.

Parameters
msg     message header
stream  stream concerned
Returns
the handler if it wants to be called in raw mode, nullptr otherwise

Definition at line 1121 of file XrdClStream.cc.

{
  InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
  if( !mh.handler )
    mh.handler = pIncomingQueue->GetHandlerForMessage( msg,
                                                       mh.expires,
                                                       mh.action );

  if( !mh.handler )
    return nullptr;

  if( mh.action & MsgHandler::Raw )
    return mh.handler;
  return nullptr;
}

References XrdCl::InMessageHelper::action, XrdCl::InMessageHelper::expires, XrdCl::InQueue::GetHandlerForMessage(), XrdCl::InMessageHelper::handler, and XrdCl::MsgHandler::Raw.

Referenced by XrdCl::AsyncMsgReader::Read().
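
The listing caches the handler in the per-substream helper and hands it back to the caller only when the recorded action requests raw mode. A simplified standalone model of that logic is sketched below. Handler, Helper, kRaw and InstallHandler are hypothetical names, and the candidate arguments stand in for the result of GetHandlerForMessage().

```cpp
#include <cstdint>

// Hypothetical, simplified stand-ins for MsgHandler and InMessageHelper.
constexpr uint16_t kRaw = 0x0001;   // invented flag value
struct Handler {};
struct Helper
{
  Handler  *handler = nullptr;
  uint16_t  action  = 0;
};

// Looks up a handler only if none is cached yet, then returns it to the
// caller only when the recorded action asks for raw mode.
Handler *InstallHandler( Helper &mh, Handler *candidate, uint16_t candidateAction )
{
  if( !mh.handler )
  {
    mh.handler = candidate;                       // stands in for the queue lookup
    mh.action  = candidate ? candidateAction : 0;
  }
  if( !mh.handler ) return nullptr;
  return ( mh.action & kRaw ) ? mh.handler : nullptr;
}
```

Note that a non-raw handler is still installed in the helper even though the function returns nullptr for it.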


◆ OnConnect()

void XrdCl::Stream::OnConnect ( uint16_t subStream)

Call back when a substream has been connected.

Definition at line 610 of file XrdClStream.cc.

{
  XrdSysMutexHelper scopedLock( pMutex );
  pSubStreams[subStream]->status = Socket::Connected;

  std::string ipstack( pSubStreams[0]->socket->GetIpStack() );
  Log *log = DefaultEnv::GetLog();
  log->Debug( PostMasterMsg, "[%s] Stream %d connected (%s).", pStreamName.c_str(),
              subStream, ipstack.c_str() );

  if( subStream == 0 )
  {
    pLastStreamError = 0;
    pLastFatalError = XRootDStatus();
    pConnectionCount = 0;
    uint16_t numSub = pTransport->SubStreamNumber( *pChannelData );
    pSessionId = ++sSessCntGen;

    //------------------------------------------------------------------------
    // Create the streams if they don't exist yet
    //------------------------------------------------------------------------
    if( pSubStreams.size() == 1 && numSub > 1 )
    {
      for( uint16_t i = 1; i < numSub; ++i )
      {
        URL url = pTransport->GetBindPreference( *pUrl, *pChannelData );
        AsyncSocketHandler *s = new AsyncSocketHandler( url, pPoller, pTransport,
                                                        pChannelData, i, this );
        pSubStreams.push_back( new SubStreamData() );
        pSubStreams[i]->socket = s;
      }
    }

    //------------------------------------------------------------------------
    // Connect the extra streams, if we fail we move all the outgoing items
    // to stream 0, we don't need to enable the uplink here, because it
    // should be already enabled after the handshaking process is completed.
    //------------------------------------------------------------------------
    if( pSubStreams.size() > 1 )
    {
      log->Debug( PostMasterMsg, "[%s] Attempting to connect %d additional "
                  "streams.", pStreamName.c_str(), pSubStreams.size()-1 );
      for( size_t i = 1; i < pSubStreams.size(); ++i )
      {
        pSubStreams[i]->socket->SetAddress( pSubStreams[0]->socket->GetAddress() );
        XRootDStatus st = pSubStreams[i]->socket->Connect( pConnectionWindow );
        if( !st.IsOK() )
        {
          pSubStreams[0]->outQueue->GrabItems( *pSubStreams[i]->outQueue );
          pSubStreams[i]->socket->Close();
        }
        else
        {
          pSubStreams[i]->status = Socket::Connecting;
        }
      }
    }

    //------------------------------------------------------------------------
    // Inform monitoring
    //------------------------------------------------------------------------
    pBytesSent = 0;
    pBytesReceived = 0;
    gettimeofday( &pConnectionDone, 0 );
    Monitor *mon = DefaultEnv::GetMonitor();
    if( mon )
    {
      Monitor::ConnectInfo i;
      i.server = pUrl->GetHostId();
      i.sTOD = pConnectionStarted;
      i.eTOD = pConnectionDone;
      i.streams = pSubStreams.size();

      AnyObject qryResult;
      std::string *qryResponse = 0;
      pTransport->Query( TransportQuery::Auth, qryResult, *pChannelData );
      qryResult.Get( qryResponse );
      i.auth = *qryResponse;
      delete qryResponse;
      mon->Event( Monitor::EvConnect, &i );
    }

    //------------------------------------------------------------------------
    // For every connected control-stream call the global on-connect handler
    //------------------------------------------------------------------------
    DefaultEnv::GetPostMaster()->NotifyConnectHandler( *pUrl );
  }
  else if( pOnDataConnJob )
  {
    //------------------------------------------------------------------------
    // For every connected data-stream call the on-connect handler
    //------------------------------------------------------------------------
    pJobManager->QueueJob( pOnDataConnJob.get(), 0 );
  }
}

References XrdCl::TransportQuery::Auth, XrdCl::Monitor::ConnectInfo::auth, XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Log::Debug(), XrdCl::Monitor::ConnectInfo::eTOD, XrdCl::Monitor::EvConnect, XrdCl::Monitor::Event(), XrdCl::AnyObject::Get(), XrdCl::TransportHandler::GetBindPreference(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetMonitor(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Status::IsOK(), XrdCl::PostMaster::NotifyConnectHandler(), XrdCl::PostMasterMsg, XrdCl::TransportHandler::Query(), XrdCl::JobManager::QueueJob(), XrdCl::Monitor::ConnectInfo::server, XrdCl::Monitor::ConnectInfo::sTOD, XrdCl::Monitor::ConnectInfo::streams, and XrdCl::TransportHandler::SubStreamNumber().

Referenced by XrdCl::AsyncSocketHandler::HandShakeNextStep().
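
When an additional substream cannot be connected, OnConnect() moves its outgoing items to substream 0 via GrabItems(). The sketch below models that fallback in isolation and is not XRootD code. A std::deque of strings stands in for XrdCl::OutQueue, the append-at-end behaviour of GrabItems() is an assumption of this sketch, and the connectOk flags and FallBackToMainStream are hypothetical.

```cpp
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

using OutQueue = std::deque<std::string>;   // stand-in for XrdCl::OutQueue

// Appends everything from `from` to `to` and leaves `from` empty, which is
// the behaviour this sketch assumes for OutQueue::GrabItems().
void GrabItems( OutQueue &to, OutQueue &from )
{
  to.insert( to.end(), from.begin(), from.end() );
  from.clear();
}

// For each extra substream whose connection attempt failed, fold its pending
// messages into substream 0. `connectOk[i]` is a hypothetical result flag.
void FallBackToMainStream( std::vector<OutQueue>   &queues,
                           const std::vector<bool> &connectOk )
{
  for( std::size_t i = 1; i < queues.size(); ++i )
    if( !connectOk[i] )
      GrabItems( queues[0], queues[i] );
}
```

Messages queued on a failed substream are therefore not lost; they simply travel over the main stream instead.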


◆ OnConnectError()

void XrdCl::Stream::OnConnectError ( uint16_t subStream,
XRootDStatus status )

On connect error.

Definition at line 709 of file XrdClStream.cc.

710 {
711 XrdSysMutexHelper scopedLock( pMutex );
712 Log *log = DefaultEnv::GetLog();
713 pSubStreams[subStream]->socket->Close();
714 time_t now = ::time(0);
715
716 //--------------------------------------------------------------------------
717 // For every connection error call the global connection error handler
718 //--------------------------------------------------------------------------
720
721 //--------------------------------------------------------------------------
722 // If we connected subStream == 0 and cannot connect >0 then we just give
723 // up and move the outgoing messages to another queue
724 //--------------------------------------------------------------------------
725 if( subStream > 0 )
726 {
727 pSubStreams[subStream]->status = Socket::Disconnected;
728 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
729 if( pSubStreams[0]->status == Socket::Connected )
730 {
731 XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
732 if( !st.IsOK() )
733 OnFatalError( 0, st, scopedLock );
734 return;
735 }
736
737 if( pSubStreams[0]->status == Socket::Connecting )
738 return;
739
740 OnFatalError( subStream, status, scopedLock );
741 return;
742 }
743
744 //--------------------------------------------------------------------------
745 // Check if we still have time to try and do something in the current window
746 //--------------------------------------------------------------------------
747 time_t elapsed = now-pConnectionInitTime;
748 log->Error( PostMasterMsg, "[%s] elapsed = %d, pConnectionWindow = %d "
749 "seconds.", pStreamName.c_str(), elapsed, pConnectionWindow );
750
751 //------------------------------------------------------------------------
752 // If we have some IP addresses left we try them
753 //------------------------------------------------------------------------
754 if( !pAddresses.empty() )
755 {
756 XRootDStatus st;
757 do
758 {
759 pSubStreams[0]->socket->SetAddress( pAddresses.back() );
760 pAddresses.pop_back();
761 pConnectionInitTime = ::time( 0 );
762 st = pSubStreams[0]->socket->Connect( pConnectionWindow );
763 }
764 while( !pAddresses.empty() && !st.IsOK() );
765
766 if( !st.IsOK() )
767 OnFatalError( subStream, st, scopedLock );
768
769 return;
770 }
771 //------------------------------------------------------------------------
772 // If we still can retry with the same host name, we sleep until the end
773 // of the connection window and try
774 //------------------------------------------------------------------------
775 else if( elapsed < pConnectionWindow && pConnectionCount < pConnectionRetry
776 && !status.IsFatal() )
777 {
778 log->Info( PostMasterMsg, "[%s] Attempting reconnection in %d "
779 "seconds.", pStreamName.c_str(), pConnectionWindow-elapsed );
780
781 Task *task = new ::StreamConnectorTask( *pUrl, pStreamName );
782 pTaskManager->RegisterTask( task, pConnectionInitTime+pConnectionWindow );
783 return;
784 }
785 //--------------------------------------------------------------------------
786 // We are out of the connection window, the only thing we can do here
787 // is re-resolving the host name and retrying if we still can
788 //--------------------------------------------------------------------------
789 else if( pConnectionCount < pConnectionRetry && !status.IsFatal() )
790 {
791 pAddresses.clear();
792 pSubStreams[0]->status = Socket::Disconnected;
793 PathID path( 0, 0 );
794 XRootDStatus st = EnableLink( path );
795 if( !st.IsOK() )
796 OnFatalError( subStream, st, scopedLock );
797 return;
798 }
799
800 //--------------------------------------------------------------------------
801 // Else, we fail
802 //--------------------------------------------------------------------------
803 OnFatalError( subStream, status, scopedLock );
804 }

References XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Socket::Disconnected, EnableLink(), XrdCl::Log::Error(), XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Log::Info(), XrdCl::Status::IsFatal(), XrdCl::Status::IsOK(), XrdCl::PostMaster::NotifyConnErrHandler(), XrdCl::PostMasterMsg, and XrdCl::TaskManager::RegisterTask().

Referenced by ForceConnect(), XrdCl::AsyncSocketHandler::OnConnectionReturn(), and XrdCl::AsyncSocketHandler::OnFaultWhileHandshaking().
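
The sub-stream 0 branch of the listing above boils down to a four-way decision. The following is a minimal standalone sketch of that decision only, not XRootD code: `RecoveryAction`, `ConnectState` and `DecideRecovery` are hypothetical names introduced for illustration, and the fields merely stand in for `pAddresses`, `pConnectionWindow`, `pConnectionCount`, `pConnectionRetry` and `status.IsFatal()`.

```cpp
#include <cstddef>
#include <ctime>

// Hypothetical stand-ins for the state consulted by OnConnectError().
enum class RecoveryAction { TryNextAddress, RetryAtWindowEnd, ReResolve, Fail };

struct ConnectState
{
  std::size_t addressesLeft;    // resolved addresses not yet tried
  std::time_t elapsed;          // seconds since the current attempt started
  std::time_t connectionWindow; // length of one connection window
  unsigned    connectionCount;  // attempts made so far
  unsigned    connectionRetry;  // maximum number of attempts
  bool        fatal;            // the reported status is fatal
};

// Applies the checks in the same order as the listing does for sub-stream 0.
RecoveryAction DecideRecovery( const ConnectState &s )
{
  if( s.addressesLeft > 0 )
    return RecoveryAction::TryNextAddress;

  const bool canRetry = s.connectionCount < s.connectionRetry && !s.fatal;
  if( canRetry && s.elapsed < s.connectionWindow )
    return RecoveryAction::RetryAtWindowEnd;
  if( canRetry )
    return RecoveryAction::ReResolve;
  return RecoveryAction::Fail;
}
```

Note that, as in the listing, remaining addresses are tried even when the reported status is fatal; the fatal flag only blocks the two retry branches.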

◆ OnError()

void XrdCl::Stream::OnError ( uint16_t subStream,
XRootDStatus status )

On error.

Definition at line 809 of file XrdClStream.cc.

810 {
811 XrdSysMutexHelper scopedLock( pMutex );
812 Log *log = DefaultEnv::GetLog();
813 pSubStreams[subStream]->socket->Close();
814 pSubStreams[subStream]->status = Socket::Disconnected;
815
816 log->Debug( PostMasterMsg, "[%s] Recovering error for stream #%d: %s.",
817 pStreamName.c_str(), subStream, status.ToString().c_str() );
818
819 //--------------------------------------------------------------------------
820 // Reinsert the message that we have failed to send
821 //--------------------------------------------------------------------------
822 if( pSubStreams[subStream]->outMsgHelper.msg )
823 {
824 OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
825 pSubStreams[subStream]->outQueue->PushFront( h.msg, h.handler, h.expires,
826 h.stateful );
827 pSubStreams[subStream]->outMsgHelper.Reset();
828 }
829
830 //--------------------------------------------------------------------------
831 // Reinsert the receiving handler and reset any partially read message
832 //--------------------------------------------------------------------------
833 if( pSubStreams[subStream]->inMsgHelper.handler )
834 {
835 InMessageHelper &h = pSubStreams[subStream]->inMsgHelper;
836 pIncomingQueue->ReAddMessageHandler( h.handler, h.expires );
837 XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( h.handler );
838 if( xrdHandler ) xrdHandler->PartialReceived();
839 h.Reset();
840 }
841
842 //--------------------------------------------------------------------------
843 // We are dealing with an error of a peripheral stream. If we don't have
844 // anything to send don't bother recovering. Otherwise move the requests
845 // to stream 0 if possible.
846 //--------------------------------------------------------------------------
847 if( subStream > 0 )
848 {
849 if( pSubStreams[subStream]->outQueue->IsEmpty() )
850 return;
851
852 if( pSubStreams[0]->status != Socket::Disconnected )
853 {
854 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
855 if( pSubStreams[0]->status == Socket::Connected )
856 {
857 XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
858 if( !st.IsOK() )
859 OnFatalError( 0, st, scopedLock );
860 return;
861 }
862 }
863 OnFatalError( subStream, status, scopedLock );
864 return;
865 }
866
867 //--------------------------------------------------------------------------
868 // If we lost stream 0 we have lost the session. We re-enable the
869 // stream if we still have things in one of the outgoing queues; otherwise
870 // there is no point in recovering at this point.
871 //--------------------------------------------------------------------------
872 if( subStream == 0 )
873 {
874 MonitorDisconnection( status );
875
876 SubStreamList::iterator it;
877 size_t outstanding = 0;
878 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
879 outstanding += (*it)->outQueue->GetSizeStateless();
880
881 if( outstanding )
882 {
883 PathID path( 0, 0 );
884 XRootDStatus st = EnableLink( path );
885 if( !st.IsOK() )
886 {
887 OnFatalError( 0, st, scopedLock );
888 return;
889 }
890 }
891
892 //------------------------------------------------------------------------
893 // We're done here, unlock the stream mutex to avoid deadlocks and
894 // report the disconnection event to the handlers
895 //------------------------------------------------------------------------
896 log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
897 "message handlers.", pStreamName.c_str() );
898 OutQueue q;
899 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
900 q.GrabStateful( *(*it)->outQueue );
901 scopedLock.UnLock();
902
903 q.Report( status );
904 pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
905 pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
906 return;
907 }
908 }

References XrdCl::MsgHandler::Broken, XrdCl::Socket::Connected, XrdCl::Log::Debug(), XrdCl::Socket::Disconnected, EnableLink(), XrdCl::InMessageHelper::expires, XrdCl::OutQueue::MsgHelper::expires, XrdCl::DefaultEnv::GetLog(), XrdCl::OutQueue::GrabStateful(), XrdCl::InMessageHelper::handler, XrdCl::OutQueue::MsgHelper::handler, XrdCl::Status::IsOK(), XrdCl::OutQueue::MsgHelper::msg, XrdCl::XRootDMsgHandler::PartialReceived(), XrdCl::PostMasterMsg, XrdCl::InQueue::ReAddMessageHandler(), XrdCl::OutQueue::Report(), XrdCl::ChannelHandlerList::ReportEvent(), XrdCl::InQueue::ReportStreamEvent(), XrdCl::InMessageHelper::Reset(), XrdCl::OutQueue::MsgHelper::stateful, XrdCl::ChannelEventHandler::StreamBroken, XrdCl::Status::ToString(), and XrdSysMutexHelper::UnLock().

Referenced by XrdCl::AsyncSocketHandler::OnFault(), and OnReadTimeout().
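
The peripheral-stream branch above hands the pending requests of the failed sub-stream to sub-stream 0 whenever stream 0 is not disconnected. A rough standalone sketch of that hand-over follows. `LinkStatus`, `SubStreamSketch` and `MoveToMainStream` are hypothetical names, the queue is a plain `std::deque` rather than `XrdCl::OutQueue`, and the sketch assumes that `GrabItems()` appends the moved items in order.

```cpp
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

enum class LinkStatus { Disconnected, Connected, Connecting };

struct SubStreamSketch
{
  LinkStatus              status;
  std::deque<std::string> outQueue; // pending requests, oldest first
};

// Returns true if the requests of the failed sub-stream were moved to
// sub-stream 0, false if there was nothing to move or nowhere to move it.
bool MoveToMainStream( std::vector<SubStreamSketch> &streams, std::size_t failed )
{
  if( failed == 0 || failed >= streams.size() )
    return false;

  SubStreamSketch &src = streams[failed];
  SubStreamSketch &dst = streams[0];
  src.status = LinkStatus::Disconnected;

  if( src.outQueue.empty() || dst.status == LinkStatus::Disconnected )
    return false;

  // Append in order (assumed to match what GrabItems() does), then empty
  // the source queue.
  dst.outQueue.insert( dst.outQueue.end(),
                       src.outQueue.begin(), src.outQueue.end() );
  src.outQueue.clear();
  return true;
}
```

The sketch covers only the queue hand-over; enabling the uplink on stream 0 and the fatal-error path are left out.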

◆ OnIncoming()

void XrdCl::Stream::OnIncoming ( uint16_t subStream,
std::shared_ptr< Message > msg,
uint32_t bytesReceived )

Call back when a message has been reconstructed.

Definition at line 471 of file XrdClStream.cc.

474 {
475 msg->SetSessionId( pSessionId );
476 pBytesReceived += bytesReceived;
477
478 MsgHandler *handler = nullptr;
479 uint16_t action = 0;
480 {
481 InMessageHelper &mh = pSubStreams[subStream]->inMsgHelper;
482 handler = mh.handler;
483 action = mh.action;
484 mh.Reset();
485 }
486
487 if( !IsPartial( *msg ) )
488 {
489 uint32_t streamAction = pTransport->MessageReceived( *msg, subStream,
490 *pChannelData );
491 if( streamAction & TransportHandler::DigestMsg )
492 return;
493
494 if( streamAction & TransportHandler::RequestClose )
495 {
496 RequestClose( *msg );
497 return;
498 }
499 }
500
501 Log *log = DefaultEnv::GetLog();
502
503 //--------------------------------------------------------------------------
504 // No handler, we discard the message ...
505 //--------------------------------------------------------------------------
506 if( !handler )
507 {
508 ServerResponse *rsp = (ServerResponse*)msg->GetBuffer();
509 log->Warning( PostMasterMsg, "[%s] Discarding received message: 0x%x "
510 "(status=%d, SID=[%d,%d]), no MsgHandler found.",
511 pStreamName.c_str(), msg.get(), rsp->hdr.status,
512 rsp->hdr.streamid[0], rsp->hdr.streamid[1] );
513 return;
514 }
515
516 //--------------------------------------------------------------------------
517 // We have a handler, so we call the callback
518 //--------------------------------------------------------------------------
519 log->Dump( PostMasterMsg, "[%s] Handling received message: 0x%x.",
520 pStreamName.c_str(), msg.get() );
521
522 if( action & (MsgHandler::NoProcess|MsgHandler::Ignore) )
523 {
524 log->Dump( PostMasterMsg, "[%s] Ignoring the processing handler for: %s.",
525 pStreamName.c_str(), msg->GetObfuscatedDescription().c_str() );
526
527 // if we are handling partial response we have to take down the timeout fence
528 if( IsPartial( *msg ) )
529 {
530 XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( handler );
531 if( xrdHandler ) xrdHandler->PartialReceived();
532 }
533
534 return;
535 }
536
537 Job *job = new HandleIncMsgJob( handler );
538 pJobManager->QueueJob( job );
539 }

References XrdCl::InMessageHelper::action, XrdCl::TransportHandler::DigestMsg, XrdCl::Log::Dump(), XrdCl::DefaultEnv::GetLog(), XrdCl::InMessageHelper::handler, ServerResponse::hdr, XrdCl::MsgHandler::Ignore, XrdCl::TransportHandler::MessageReceived(), XrdCl::MsgHandler::NoProcess, XrdCl::XRootDMsgHandler::PartialReceived(), XrdCl::PostMasterMsg, XrdCl::JobManager::QueueJob(), XrdCl::TransportHandler::RequestClose, XrdCl::InMessageHelper::Reset(), ServerResponseHeader::status, ServerResponseHeader::streamid, and XrdCl::Log::Warning().

Referenced by XrdCl::AsyncMsgReader::Read().

◆ OnMessageSent()

void XrdCl::Stream::OnMessageSent ( uint16_t subStream,
Message * msg,
uint32_t bytesSent )

Definition at line 584 of file XrdClStream.cc.

587 {
588 pTransport->MessageSent( msg, subStream, bytesSent,
589 *pChannelData );
590 OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
591 pBytesSent += bytesSent;
592 if( h.handler )
593 {
594 h.handler->OnStatusReady( msg, XRootDStatus() );
595 bool rmMsg = false;
596 pIncomingQueue->AddMessageHandler( h.handler, h.handler->GetExpiration(), rmMsg );
597 if( rmMsg )
598 {
599 Log *log = DefaultEnv::GetLog();
600 log->Warning( PostMasterMsg, "[%s] Removed a leftover msg from the in-queue.",
601 pStreamName.c_str(), subStream );
602 }
603 }
604 pSubStreams[subStream]->outMsgHelper.Reset();
605 }

References XrdCl::InQueue::AddMessageHandler(), XrdCl::MsgHandler::GetExpiration(), XrdCl::DefaultEnv::GetLog(), XrdCl::OutQueue::MsgHelper::handler, XrdCl::TransportHandler::MessageSent(), XrdCl::MsgHandler::OnStatusReady(), XrdCl::PostMasterMsg, and XrdCl::Log::Warning().

Referenced by XrdCl::AsyncMsgWriter::Write().

◆ OnReadTimeout()

bool XrdCl::Stream::OnReadTimeout ( uint16_t subStream)

On read timeout.

Definition at line 1029 of file XrdClStream.cc.

1030 {
1031 //--------------------------------------------------------------------------
1032 // We only take the main stream into account
1033 //--------------------------------------------------------------------------
1034 if( substream != 0 )
1035 return true;
1036
1037 //--------------------------------------------------------------------------
1038 // Check if there are no outgoing messages and if the stream TTL has elapsed.
1039 // It is assumed that the underlying transport makes sure that there are no
1040 // pending requests that are not answered, i.e. all possible virtual streams
1041 // are de-allocated
1042 //--------------------------------------------------------------------------
1043 Log *log = DefaultEnv::GetLog();
1044 SubStreamList::iterator it;
1045 time_t now = time(0);
1046
1047 XrdSysMutexHelper scopedLock( pMutex );
1048 uint32_t outgoingMessages = 0;
1049 time_t lastActivity = 0;
1050 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1051 {
1052 outgoingMessages += (*it)->outQueue->GetSize();
1053 time_t sockLastActivity = (*it)->socket->GetLastActivity();
1054 if( lastActivity < sockLastActivity )
1055 lastActivity = sockLastActivity;
1056 }
1057
1058 if( !outgoingMessages )
1059 {
1060 bool disconnect = pTransport->IsStreamTTLElapsed( now-lastActivity,
1061 *pChannelData );
1062 if( disconnect )
1063 {
1064 log->Debug( PostMasterMsg, "[%s] Stream TTL elapsed, disconnecting...",
1065 pStreamName.c_str() );
1066 scopedLock.UnLock();
1067 //----------------------------------------------------------------------
1068 // Important note!
1069 //
1070 // This destroys the Stream object itself, the underlying
1071 // AsyncSocketHandler object (that called this method) and the Channel
1072 // object that aggregates this Stream.
1073 //----------------------------------------------------------------------
1074 DefaultEnv::GetPostMaster()->ForceDisconnect( *pUrl );
1075 return false;
1076 }
1077 }
1078
1079 //--------------------------------------------------------------------------
1080 // Check if the stream is broken
1081 //--------------------------------------------------------------------------
1082 XRootDStatus st = pTransport->IsStreamBroken( now-lastActivity,
1083 *pChannelData );
1084 if( !st.IsOK() )
1085 {
1086 scopedLock.UnLock();
1087 OnError( substream, st );
1088 return false;
1089 }
1090 return true;
1091 }

References XrdCl::Log::Debug(), XrdCl::PostMaster::ForceDisconnect(), XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Status::IsOK(), XrdCl::TransportHandler::IsStreamBroken(), XrdCl::TransportHandler::IsStreamTTLElapsed(), OnError(), XrdCl::PostMasterMsg, and XrdSysMutexHelper::UnLock().

Referenced by XrdCl::AsyncSocketHandler::OnReadTimeout().
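
The idle check in the listing above sums the outgoing queue sizes and takes the most recent socket activity across all sub-streams before asking the transport whether the TTL has elapsed. The sketch below illustrates that aggregation in isolation. `SubStreamActivity` and `IsIdleBeyondTTL` are hypothetical names, and the fixed `ttl` threshold is an assumption of the sketch: in the listing the decision is delegated to `TransportHandler::IsStreamTTLElapsed()`.

```cpp
#include <algorithm>
#include <cstddef>
#include <ctime>
#include <vector>

struct SubStreamActivity
{
  std::size_t queued;       // messages waiting in the outgoing queue
  std::time_t lastActivity; // last time the socket saw traffic
};

// True when nothing is queued on any sub-stream and the most recently active
// socket has been idle for at least ttl seconds.
bool IsIdleBeyondTTL( const std::vector<SubStreamActivity> &subStreams,
                      std::time_t now, std::time_t ttl )
{
  std::size_t outgoing     = 0;
  std::time_t lastActivity = 0;
  for( const SubStreamActivity &s : subStreams )
  {
    outgoing    += s.queued;
    lastActivity = std::max( lastActivity, s.lastActivity );
  }
  return outgoing == 0 && ( now - lastActivity ) >= ttl;
}
```

Using the latest activity time across all sub-streams means a single busy data stream keeps the whole stream alive.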

◆ OnReadyToWrite()

std::pair< Message *, MsgHandler * > XrdCl::Stream::OnReadyToWrite ( uint16_t subStream)

Definition at line 545 of file XrdClStream.cc.

546 {
547 XrdSysMutexHelper scopedLock( pMutex );
548 Log *log = DefaultEnv::GetLog();
549 if( pSubStreams[subStream]->outQueue->IsEmpty() )
550 {
551 log->Dump( PostMasterMsg, "[%s] Nothing to write, disable uplink",
552 pSubStreams[subStream]->socket->GetStreamName().c_str() );
553
554 pSubStreams[subStream]->socket->DisableUplink();
555 return std::make_pair( (Message *)0, (MsgHandler *)0 );
556 }
557
558 OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
559 h.msg = pSubStreams[subStream]->outQueue->PopMessage( h.handler,
560 h.expires,
561 h.stateful );
562 scopedLock.UnLock();
563 if( h.handler )
564 h.handler->OnReadyToSend( h.msg );
565 return std::make_pair( h.msg, h.handler );
566 }

References XrdCl::Log::Dump(), XrdCl::OutQueue::MsgHelper::expires, XrdCl::DefaultEnv::GetLog(), XrdCl::OutQueue::MsgHelper::handler, XrdCl::OutQueue::MsgHelper::msg, XrdCl::MsgHandler::OnReadyToSend(), XrdCl::PostMasterMsg, XrdCl::OutQueue::MsgHelper::stateful, and XrdSysMutexHelper::UnLock().

Referenced by XrdCl::AsyncMsgWriter::Write().

◆ OnWriteTimeout()

bool XrdCl::Stream::OnWriteTimeout ( uint16_t subStream)

On write timeout.

Definition at line 1096 of file XrdClStream.cc.

1097 {
1098 return true;
1099 }

Referenced by XrdCl::AsyncSocketHandler::OnWriteTimeout().

◆ Query()

Status XrdCl::Stream::Query ( uint16_t query,
AnyObject & result )

Query the stream.

Definition at line 1218 of file XrdClStream.cc.

1219 {
1220 switch( query )
1221 {
1222 case StreamQuery::IpAddr:
1223 {
1224 result.Set( new std::string( pSubStreams[0]->socket->GetIpAddr() ), false );
1225 return Status();
1226 }
1227
1228 case StreamQuery::IpStack:
1229 {
1230 result.Set( new std::string( pSubStreams[0]->socket->GetIpStack() ), false );
1231 return Status();
1232 }
1233
1234 case StreamQuery::HostName:
1235 {
1236 result.Set( new std::string( pSubStreams[0]->socket->GetHostName() ), false );
1237 return Status();
1238 }
1239
1240 default:
1241 return Status( stError, errQueryNotSupported );
1242 }
1243 }

References XrdCl::errQueryNotSupported, XrdCl::StreamQuery::HostName, XrdCl::StreamQuery::IpAddr, XrdCl::StreamQuery::IpStack, XrdCl::AnyObject::Set(), and XrdCl::stError.

Referenced by XrdCl::Channel::QueryTransport().
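
Query() is a plain dispatch on the query code: three codes are answered from the socket of sub-stream 0 and everything else is rejected. The standalone sketch below mirrors that shape. `SketchQuery`, `SocketInfo` and `QuerySketch` are hypothetical, the numeric codes are made up for the example (the real ones are defined in `XrdCl::StreamQuery`), and a `bool` plus `std::string` replace `Status` and `AnyObject`.

```cpp
#include <cstdint>
#include <string>

// Hypothetical query codes; the real values live in XrdCl::StreamQuery.
namespace SketchQuery
{
  const std::uint16_t IpAddr   = 1;
  const std::uint16_t IpStack  = 2;
  const std::uint16_t HostName = 3;
}

struct SocketInfo
{
  std::string ipAddr;
  std::string ipStack;
  std::string hostName;
};

// Returns true and fills result for a supported code, false otherwise
// (the listing reports errQueryNotSupported in that case).
bool QuerySketch( const SocketInfo &info, std::uint16_t query,
                  std::string &result )
{
  switch( query )
  {
    case SketchQuery::IpAddr:   result = info.ipAddr;   return true;
    case SketchQuery::IpStack:  result = info.ipStack;  return true;
    case SketchQuery::HostName: result = info.hostName; return true;
    default:                    return false;
  }
}
```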

◆ RegisterEventHandler()

void XrdCl::Stream::RegisterEventHandler ( ChannelEventHandler * handler)

Register channel event handler.

Definition at line 1104 of file XrdClStream.cc.

1105 {
1106 pChannelEvHandlers.AddHandler( handler );
1107 }

References XrdCl::ChannelHandlerList::AddHandler().

Referenced by XrdCl::Channel::RegisterEventHandler().

◆ RemoveEventHandler()

void XrdCl::Stream::RemoveEventHandler ( ChannelEventHandler * handler)

Remove a channel event handler.

Definition at line 1112 of file XrdClStream.cc.

1113 {
1114 pChannelEvHandlers.RemoveHandler( handler );
1115 }

References XrdCl::ChannelHandlerList::RemoveHandler().

Referenced by XrdCl::Channel::RemoveEventHandler().

◆ Send()

XRootDStatus XrdCl::Stream::Send ( Message * msg,
MsgHandler * handler,
bool stateful,
time_t expires )

Queue the message for sending.

Definition at line 297 of file XrdClStream.cc.

301 {
302 XrdSysMutexHelper scopedLock( pMutex );
303 Log *log = DefaultEnv::GetLog();
304
305 //--------------------------------------------------------------------------
306 // Check the session ID and bounce if needed
307 //--------------------------------------------------------------------------
308 if( msg->GetSessionId() &&
309 (pSubStreams[0]->status != Socket::Connected ||
310 pSessionId != msg->GetSessionId()) )
311 return XRootDStatus( stError, errInvalidSession );
312
313 //--------------------------------------------------------------------------
314 // Decide on the path to send the message
315 //--------------------------------------------------------------------------
316 PathID path = pTransport->MultiplexSubStream( msg, *pChannelData );
317 if( pSubStreams.size() <= path.up )
318 {
319 log->Warning( PostMasterMsg, "[%s] Unable to send message %s through "
320 "substream %d, using 0 instead", pStreamName.c_str(),
321 msg->GetObfuscatedDescription().c_str(), path.up );
322 path.up = 0;
323 }
324
325 log->Dump( PostMasterMsg, "[%s] Sending message %s (0x%x) through "
326 "substream %d expecting answer at %d", pStreamName.c_str(),
327 msg->GetObfuscatedDescription().c_str(), msg, path.up, path.down );
328
329 //--------------------------------------------------------------------------
330 // Enable *a* path and insert the message to the right queue
331 //--------------------------------------------------------------------------
332 XRootDStatus st = EnableLink( path );
333 if( st.IsOK() )
334 {
335 pTransport->MultiplexSubStream( msg, *pChannelData, &path );
336 pSubStreams[path.up]->outQueue->PushBack( msg, handler,
337 expires, stateful );
338 }
339 else
340 st.status = stFatal;
341 return st;
342 }

References XrdCl::Socket::Connected, XrdCl::PathID::down, XrdCl::Log::Dump(), EnableLink(), XrdCl::errInvalidSession, XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::Message::GetSessionId(), XrdCl::Status::IsOK(), XrdCl::TransportHandler::MultiplexSubStream(), XrdCl::PostMasterMsg, XrdCl::Status::status, XrdCl::stError, XrdCl::stFatal, XrdCl::PathID::up, and XrdCl::Log::Warning().

Referenced by XrdCl::Channel::Send().
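
Before queuing, Send() applies two guards that are easy to miss in the listing: a message bound to a session is bounced unless stream 0 is connected with the same session ID, and an uplink index beyond the available sub-streams falls back to 0. The sketch below isolates both checks. `PathSketch`, `SessionMatches` and `ClampPath` are hypothetical names, and plain integers stand in for the XRootD message and path types.

```cpp
#include <cstddef>
#include <cstdint>

struct PathSketch
{
  std::uint16_t up;   // sub-stream used for sending
  std::uint16_t down; // sub-stream on which the answer is expected
};

// A message with session ID 0 is not bound to a session and always passes.
// A bound message passes only if stream 0 is connected with the same session.
bool SessionMatches( std::uint64_t msgSession, std::uint64_t streamSession,
                     bool mainConnected )
{
  if( msgSession == 0 )
    return true;
  return mainConnected && msgSession == streamSession;
}

// Falls back to sub-stream 0 when the requested uplink does not exist.
// Only the uplink is changed, as in the listing.
PathSketch ClampPath( PathSketch path, std::size_t subStreamCount )
{
  if( subStreamCount <= path.up )
    path.up = 0;
  return path;
}
```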

◆ SetChannelData()

void XrdCl::Stream::SetChannelData ( AnyObject * channelData)
inline

Set the channel data.

Definition at line 115 of file XrdClStream.hh.

116 {
117 pChannelData = channelData;
118 }

Referenced by XrdCl::Channel::Channel().

◆ SetIncomingQueue()

void XrdCl::Stream::SetIncomingQueue ( InQueue * incomingQueue)
inline

Set the incoming queue.

Definition at line 107 of file XrdClStream.hh.

108 {
109 pIncomingQueue = incomingQueue;
110 }

Referenced by XrdCl::Channel::Channel().

◆ SetJobManager()

void XrdCl::Stream::SetJobManager ( JobManager * jobManager)
inline

Set job manager.

Definition at line 131 of file XrdClStream.hh.

132 {
133 pJobManager = jobManager;
134 }

Referenced by XrdCl::Channel::Channel().

◆ SetOnDataConnectHandler()

void XrdCl::Stream::SetOnDataConnectHandler ( std::shared_ptr< Job > & onConnJob)
inline

Set the on-connect handler for data streams.

Definition at line 263 of file XrdClStream.hh.

264 {
265 XrdSysMutexHelper scopedLock( pMutex );
266 pOnDataConnJob = onConnJob;
267 }

Referenced by XrdCl::Channel::SetOnDataConnectHandler().

◆ SetPoller()

void XrdCl::Stream::SetPoller ( Poller * poller)
inline

Set the poller.

Definition at line 99 of file XrdClStream.hh.

100 {
101 pPoller = poller;
102 }

Referenced by XrdCl::Channel::Channel().

◆ SetTaskManager()

void XrdCl::Stream::SetTaskManager ( TaskManager * taskManager)
inline

Set task manager.

Definition at line 123 of file XrdClStream.hh.

124 {
125 pTaskManager = taskManager;
126 }

Referenced by XrdCl::Channel::Channel().

◆ SetTransport()

void XrdCl::Stream::SetTransport ( TransportHandler * transport)
inline

Set the transport.

Definition at line 91 of file XrdClStream.hh.

92 {
93 pTransport = transport;
94 }

Referenced by XrdCl::Channel::Channel().

◆ Tick()

void XrdCl::Stream::Tick ( time_t now)

Handle a clock event generated either by a socket timeout or by a task manager event.

Definition at line 377 of file XrdClStream.cc.

378 {
379 //--------------------------------------------------------------------------
380 // Check for timed-out requests and incoming handlers
381 //--------------------------------------------------------------------------
382 pMutex.Lock();
383 OutQueue q;
384 SubStreamList::iterator it;
385 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
386 q.GrabExpired( *(*it)->outQueue, now );
387 pMutex.UnLock();
388
389 q.Report( XRootDStatus( stError, errOperationExpired ) );
390 pIncomingQueue->ReportTimeout( now );
391 }

References XrdCl::errOperationExpired, XrdCl::OutQueue::GrabExpired(), XrdSysMutex::Lock(), XrdCl::OutQueue::Report(), XrdCl::InQueue::ReportTimeout(), XrdCl::stError, and XrdSysMutex::UnLock().

Referenced by XrdCl::Channel::Tick().
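
Tick() follows a collect-then-report pattern: expired requests are pulled out of every outgoing queue while the stream mutex is held, and the handlers are notified only after the mutex is released. A minimal standalone sketch of the collection step is given below. `PendingItem` and `GrabExpiredSketch` are hypothetical names, `std::mutex` replaces `XrdSysMutex`, and treating `expires <= now` as expired is an assumption about what `OutQueue::GrabExpired()` does.

```cpp
#include <ctime>
#include <list>
#include <mutex>
#include <string>
#include <vector>

struct PendingItem
{
  std::string name;    // label used only to identify the request
  std::time_t expires; // absolute deadline
};

// Moves every item whose deadline has passed out of the guarded queues and
// returns them, so the caller can notify handlers without holding the lock.
std::vector<PendingItem> GrabExpiredSketch(
    std::mutex &mtx, std::vector<std::list<PendingItem>> &queues,
    std::time_t now )
{
  std::vector<PendingItem> expired;
  std::lock_guard<std::mutex> lock( mtx );
  for( std::list<PendingItem> &q : queues )
  {
    for( auto it = q.begin(); it != q.end(); )
    {
      if( it->expires <= now )
      {
        expired.push_back( *it );
        it = q.erase( it ); // erase returns the next valid iterator
      }
      else
        ++it;
    }
  }
  return expired;
}
```

Reporting outside the lock matters because a handler may call back into the stream, which would otherwise risk a deadlock on the same mutex.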

The documentation for this class was generated from the following files: