XRootD
Loading...
Searching...
No Matches
XrdCl::XRootDTransport Class Reference

XRootD transport handler. More...

#include <XrdClXRootDTransport.hh>

+ Inheritance diagram for XrdCl::XRootDTransport:
+ Collaboration diagram for XrdCl::XRootDTransport:

Public Member Functions

 XRootDTransport ()
 Constructor.
 
 ~XRootDTransport ()
 Destructor.
 
virtual void DecFileInstCnt (AnyObject &channelData)
 Decrement file object instance count bound to this channel.
 
virtual void Disconnect (AnyObject &channelData, uint16_t subStreamId)
 The stream has been disconnected, do the cleanups.
 
virtual void FinalizeChannel (AnyObject &channelData)
 Finalize channel.
 
virtual URL GetBindPreference (const URL &url, AnyObject &channelData)
 Get bind preference for the next data stream.
 
virtual XRootDStatus GetBody (Message &message, Socket *socket)
 
virtual XRootDStatus GetHeader (Message &message, Socket *socket)
 
virtual XRootDStatus GetMore (Message &message, Socket *socket)
 
virtual Status GetSignature (Message *toSign, Message *&sign, AnyObject &channelData)
 Get signature for given message.
 
virtual Status GetSignature (Message *toSign, Message *&sign, XRootDChannelInfo *info)
 Get signature for given message.
 
virtual XRootDStatus HandShake (HandShakeData *handShakeData, AnyObject &channelData)
 HandShake.
 
virtual bool HandShakeDone (HandShakeData *handShakeData, AnyObject &channelData)
 
virtual void InitializeChannel (const URL &url, AnyObject &channelData)
 Initialize channel.
 
virtual Status IsStreamBroken (time_t inactiveTime, AnyObject &channelData)
 
virtual bool IsStreamTTLElapsed (time_t time, AnyObject &channelData)
 Check if the stream should be disconnected.
 
virtual uint32_t MessageReceived (Message &msg, uint16_t subStream, AnyObject &channelData)
 Check if the message invokes a stream action.
 
virtual void MessageSent (Message *msg, uint16_t subStream, uint32_t bytesSent, AnyObject &channelData)
 Notify the transport about a message having been sent.
 
virtual PathID Multiplex (Message *msg, AnyObject &channelData, PathID *hint=0)
 
virtual PathID MultiplexSubStream (Message *msg, AnyObject &channelData, PathID *hint=0)
 
virtual bool NeedControlConnection ()
 
virtual bool NeedEncryption (HandShakeData *handShakeData, AnyObject &channelData)
 
virtual Status Query (uint16_t query, AnyObject &result, AnyObject &channelData)
 Query the channel.
 
virtual uint16_t SubStreamNumber (AnyObject &channelData)
 Return a number of substreams per stream that should be created.
 
virtual void WaitBeforeExit ()
 Wait until the program can safely exit.
 
- Public Member Functions inherited from XrdCl::TransportHandler
virtual ~TransportHandler ()
 

Static Public Member Functions

static void GenerateDescription (char *msg, std::ostringstream &o)
 Get the description of a message.
 
static void LogErrorResponse (const Message &msg)
 Log server error response.
 
static XRootDStatus MarshallRequest (char *msg)
 Marshal the outgoing message.
 
static XRootDStatus MarshallRequest (Message *msg)
 Marshal the outgoing message.
 
static uint16_t NbConnectedStrm (AnyObject &channelData)
 Number of currently connected data streams.
 
static void SetDescription (Message *msg)
 Get the description of a message.
 
static XRootDStatus UnMarchalStatusMore (Message &msg)
 Unmarshall the correction-segment of the status response for pgwrite.
 
static XRootDStatus UnMarshallBody (Message *msg, uint16_t reqType)
 Unmarshall the body of the incoming message.
 
static void UnMarshallHeader (Message &msg)
 Unmarshall the header incoming message.
 
static XRootDStatus UnMarshallRequest (Message *msg)
 
static XRootDStatus UnMarshalStatusBody (Message &msg, uint16_t reqType)
 Unmarshall the body of the status response.
 

Friends

struct PluginUnloadHandler
 

Additional Inherited Members

- Public Types inherited from XrdCl::TransportHandler
enum  StreamAction {
  NoAction = 0x0000 ,
  DigestMsg = 0x0001 ,
  AbortStream = 0x0002 ,
  CloseStream = 0x0004 ,
  ResumeStream = 0x0008 ,
  HoldStream = 0x0010 ,
  RequestClose = 0x0020
}
 Stream actions that may be triggered by incoming control messages. More...
 

Detailed Description

XRootD transport handler.

Definition at line 47 of file XrdClXRootDTransport.hh.

Constructor & Destructor Documentation

◆ XRootDTransport()

XrdCl::XRootDTransport::XRootDTransport ( )

Constructor.

Definition at line 291 of file XrdClXRootDTransport.cc.

291 :
292 pSecUnloadHandler( new PluginUnloadHandler() )
293 {
294 }

◆ ~XRootDTransport()

XrdCl::XRootDTransport::~XRootDTransport ( )

Destructor.

Definition at line 299 of file XrdClXRootDTransport.cc.

300 {
301 delete pSecUnloadHandler; pSecUnloadHandler = 0;
302 }

Member Function Documentation

◆ DecFileInstCnt()

void XrdCl::XRootDTransport::DecFileInstCnt ( AnyObject & channelData)
virtual

Decrement file object instance count bound to this channel.

Implements XrdCl::TransportHandler.

Definition at line 1737 of file XrdClXRootDTransport.cc.

1738 {
1739 XRootDChannelInfo *info = 0;
1740 channelData.Get( info );
1741 if( info->finstcnt.load( std::memory_order_relaxed ) > 0 )
1742 info->finstcnt.fetch_sub( 1, std::memory_order_relaxed );
1743 }

References XrdCl::XRootDChannelInfo::finstcnt, and XrdCl::AnyObject::Get().

+ Here is the call graph for this function:

◆ Disconnect()

void XrdCl::XRootDTransport::Disconnect ( AnyObject & channelData,
uint16_t subStreamId )
virtual

The stream has been disconnected, do the cleanups.

Implements XrdCl::TransportHandler.

Definition at line 1485 of file XrdClXRootDTransport.cc.

1487 {
1488 XRootDChannelInfo *info = 0;
1489 channelData.Get( info );
1490 XrdSysMutexHelper scopedLock( info->mutex );
1491
1492 CleanUpProtection( info );
1493
1494 if( !info->stream.empty() )
1495 {
1496 XRootDStreamInfo &sInfo = info->stream[subStreamId];
1497 sInfo.status = XRootDStreamInfo::Disconnected;
1498 }
1499
1500 if( subStreamId == 0 )
1501 {
1502 info->sidManager->ReleaseAllTimedOut();
1503 info->sentOpens.clear();
1504 info->sentCloses.clear();
1505 info->openFiles = 0;
1506 info->waitBarrier = 0;
1507 }
1508 }

References XrdCl::XRootDStreamInfo::Disconnected, XrdCl::AnyObject::Get(), XrdCl::XRootDChannelInfo::mutex, XrdCl::XRootDChannelInfo::openFiles, XrdCl::XRootDChannelInfo::sentCloses, XrdCl::XRootDChannelInfo::sentOpens, XrdCl::XRootDChannelInfo::sidManager, XrdCl::XRootDStreamInfo::status, XrdCl::XRootDChannelInfo::stream, and XrdCl::XRootDChannelInfo::waitBarrier.

+ Here is the call graph for this function:

◆ FinalizeChannel()

void XrdCl::XRootDTransport::FinalizeChannel ( AnyObject & channelData)
virtual

Finalize channel.

Implements XrdCl::TransportHandler.

Definition at line 460 of file XrdClXRootDTransport.cc.

461 {
462 }

◆ GenerateDescription()

void XrdCl::XRootDTransport::GenerateDescription ( char * msg,
std::ostringstream & o )
static

Get the description of a message.

Definition at line 2898 of file XrdClXRootDTransport.cc.

2899 {
2900 Log *log = DefaultEnv::GetLog();
2901 if( log->GetLevel() < Log::ErrorMsg )
2902 return;
2903
2904 ClientRequestHdr *req = (ClientRequestHdr *)msg;
2905 switch( req->requestid )
2906 {
2907 //------------------------------------------------------------------------
2908 // kXR_open
2909 //------------------------------------------------------------------------
2910 case kXR_open:
2911 {
2912 ClientOpenRequest *sreq = (ClientOpenRequest *)msg;
2913 o << "kXR_open (";
2914 char *fn = GetDataAsString( msg );
2915 o << "file: " << fn << ", ";
2916 delete [] fn;
2917 o << "mode: 0" << std::setbase(8) << sreq->mode << ", ";
2918 o << std::setbase(10);
2919 o << "flags: ";
2920 if( sreq->options == 0 )
2921 o << "none";
2922 else
2923 {
2924 if( sreq->options & kXR_delete )
2925 o << "kXR_delete ";
2926 if( sreq->options & kXR_force )
2927 o << "kXR_force ";
2928 if( sreq->options & kXR_mkpath )
2929 o << "kXR_mkpath ";
2930 if( sreq->options & kXR_new )
2931 o << "kXR_new ";
2932 if( sreq->options & kXR_nowait )
2933 o << "kXR_delete ";
2934 if( sreq->options & kXR_open_apnd )
2935 o << "kXR_open_apnd ";
2936 if( sreq->options & kXR_open_read )
2937 o << "kXR_open_read ";
2938 if( sreq->options & kXR_open_updt )
2939 o << "kXR_open_updt ";
2940 if( sreq->options & kXR_posc )
2941 o << "kXR_posc ";
2942 if( sreq->options & kXR_refresh )
2943 o << "kXR_refresh ";
2944 if( sreq->options & kXR_replica )
2945 o << "kXR_replica ";
2946 if( sreq->options & kXR_seqio )
2947 o << "kXR_seqio ";
2948 if( sreq->options & kXR_async )
2949 o << "kXR_async ";
2950 if( sreq->options & kXR_retstat )
2951 o << "kXR_retstat ";
2952 }
2953 o << ")";
2954 break;
2955 }
2956
2957 //------------------------------------------------------------------------
2958 // kXR_close
2959 //------------------------------------------------------------------------
2960 case kXR_close:
2961 {
2963 o << "kXR_close (";
2964 o << "handle: " << FileHandleToStr( sreq->fhandle );
2965 o << ")";
2966 break;
2967 }
2968
2969 //------------------------------------------------------------------------
2970 // kXR_stat
2971 //------------------------------------------------------------------------
2972 case kXR_stat:
2973 {
2974 ClientStatRequest *sreq = (ClientStatRequest *)msg;
2975 o << "kXR_stat (";
2976 if( sreq->dlen )
2977 {
2978 char *fn = GetDataAsString( msg );;
2979 o << "path: " << fn << ", ";
2980 delete [] fn;
2981 }
2982 else
2983 {
2984 o << "handle: " << FileHandleToStr( sreq->fhandle );
2985 o << ", ";
2986 }
2987 o << "flags: ";
2988 if( sreq->options == 0 )
2989 o << "none";
2990 else
2991 {
2992 if( sreq->options & kXR_vfs )
2993 o << "kXR_vfs";
2994 }
2995 o << ")";
2996 break;
2997 }
2998
2999 //------------------------------------------------------------------------
3000 // kXR_read
3001 //------------------------------------------------------------------------
3002 case kXR_read:
3003 {
3004 ClientReadRequest *sreq = (ClientReadRequest *)msg;
3005 o << "kXR_read (";
3006 o << "handle: " << FileHandleToStr( sreq->fhandle );
3007 o << std::setbase(10);
3008 o << ", ";
3009 o << "offset: " << sreq->offset << ", ";
3010 o << "size: " << sreq->rlen << ")";
3011 break;
3012 }
3013
3014 //------------------------------------------------------------------------
3015 // kXR_pgread
3016 //------------------------------------------------------------------------
3017 case kXR_pgread:
3018 {
3020 o << "kXR_pgread (";
3021 o << "handle: " << FileHandleToStr( sreq->fhandle );
3022 o << std::setbase(10);
3023 o << ", ";
3024 o << "offset: " << sreq->offset << ", ";
3025 o << "size: " << sreq->rlen << ")";
3026 break;
3027 }
3028
3029 //------------------------------------------------------------------------
3030 // kXR_write
3031 //------------------------------------------------------------------------
3032 case kXR_write:
3033 {
3035 o << "kXR_write (";
3036 o << "handle: " << FileHandleToStr( sreq->fhandle );
3037 o << std::setbase(10);
3038 o << ", ";
3039 o << "offset: " << sreq->offset << ", ";
3040 o << "size: " << sreq->dlen << ")";
3041 break;
3042 }
3043
3044 //------------------------------------------------------------------------
3045 // kXR_pgwrite
3046 //------------------------------------------------------------------------
3047 case kXR_pgwrite:
3048 {
3050 o << "kXR_pgwrite (";
3051 o << "handle: " << FileHandleToStr( sreq->fhandle );
3052 o << std::setbase(10);
3053 o << ", ";
3054 o << "offset: " << sreq->offset << ", ";
3055 o << "size: " << sreq->dlen << ")";
3056 break;
3057 }
3058
3059 //------------------------------------------------------------------------
3060 // kXR_sync
3061 //------------------------------------------------------------------------
3062 case kXR_sync:
3063 {
3064 ClientSyncRequest *sreq = (ClientSyncRequest *)msg;
3065 o << "kXR_sync (";
3066 o << "handle: " << FileHandleToStr( sreq->fhandle );
3067 o << ")";
3068 break;
3069 }
3070
3071 //------------------------------------------------------------------------
3072 // kXR_truncate
3073 //------------------------------------------------------------------------
3074 case kXR_truncate:
3075 {
3077 o << "kXR_truncate (";
3078 if( !sreq->dlen )
3079 o << "handle: " << FileHandleToStr( sreq->fhandle );
3080 else
3081 {
3082 char *fn = GetDataAsString( msg );
3083 o << "file: " << fn;
3084 delete [] fn;
3085 }
3086 o << std::setbase(10);
3087 o << ", ";
3088 o << "offset: " << sreq->offset;
3089 o << ")";
3090 break;
3091 }
3092
3093 //------------------------------------------------------------------------
3094 // kXR_readv
3095 //------------------------------------------------------------------------
3096 case kXR_readv:
3097 {
3098 unsigned char *fhandle = 0;
3099 o << "kXR_readv (";
3100
3101 o << "handle: ";
3102 readahead_list *dataChunk = (readahead_list*)(msg + 24 );
3103 fhandle = dataChunk[0].fhandle;
3104 if( fhandle )
3105 o << FileHandleToStr( fhandle );
3106 else
3107 o << "unknown";
3108 o << ", ";
3109 o << std::setbase(10);
3110 o << "chunks: [";
3111 uint64_t size = 0;
3112 for( size_t i = 0; i < req->dlen/sizeof(readahead_list); ++i )
3113 {
3114 size += dataChunk[i].rlen;
3115 o << "(offset: " << dataChunk[i].offset;
3116 o << ", size: " << dataChunk[i].rlen << "); ";
3117 }
3118 o << "], ";
3119 o << "total size: " << size << ")";
3120 break;
3121 }
3122
3123 //------------------------------------------------------------------------
3124 // kXR_writev
3125 //------------------------------------------------------------------------
3126 case kXR_writev:
3127 {
3128 unsigned char *fhandle = 0;
3129 o << "kXR_writev (";
3130
3131 XrdProto::write_list *wrtList =
3132 reinterpret_cast<XrdProto::write_list*>( msg + 24 );
3133 uint64_t size = 0;
3134 uint32_t numChunks = 0;
3135 for( size_t i = 0; i < req->dlen/sizeof(XrdProto::write_list); ++i )
3136 {
3137 fhandle = wrtList[i].fhandle;
3138 size += wrtList[i].wlen;
3139 ++numChunks;
3140 }
3141 o << "handle: ";
3142 if( fhandle )
3143 o << FileHandleToStr( fhandle );
3144 else
3145 o << "unknown";
3146 o << ", ";
3147 o << std::setbase(10);
3148 o << "chunks: " << numChunks << ", ";
3149 o << "total size: " << size << ")";
3150 break;
3151 }
3152
3153 //------------------------------------------------------------------------
3154 // kXR_locate
3155 //------------------------------------------------------------------------
3156 case kXR_locate:
3157 {
3159 char *fn = GetDataAsString( msg );;
3160 o << "kXR_locate (";
3161 o << "path: " << fn << ", ";
3162 delete [] fn;
3163 o << "flags: ";
3164 if( sreq->options == 0 )
3165 o << "none";
3166 else
3167 {
3168 if( sreq->options & kXR_refresh )
3169 o << "kXR_refresh ";
3170 if( sreq->options & kXR_prefname )
3171 o << "kXR_prefname ";
3172 if( sreq->options & kXR_nowait )
3173 o << "kXR_nowait ";
3174 if( sreq->options & kXR_force )
3175 o << "kXR_force ";
3176 if( sreq->options & kXR_compress )
3177 o << "kXR_compress ";
3178 }
3179 o << ")";
3180 break;
3181 }
3182
3183 //------------------------------------------------------------------------
3184 // kXR_mv
3185 //------------------------------------------------------------------------
3186 case kXR_mv:
3187 {
3188 ClientMvRequest *sreq = (ClientMvRequest *)msg;
3189 o << "kXR_mv (";
3190 o << "source: ";
3191 o.write( msg + sizeof( ClientMvRequest ), sreq->arg1len );
3192 o << ", ";
3193 o << "destination: ";
3194 o.write( msg + sizeof( ClientMvRequest ) + sreq->arg1len + 1, sreq->dlen - sreq->arg1len - 1 );
3195 o << ")";
3196 break;
3197 }
3198
3199 //------------------------------------------------------------------------
3200 // kXR_query
3201 //------------------------------------------------------------------------
3202 case kXR_query:
3203 {
3205 o << "kXR_query (";
3206 o << "code: ";
3207 switch( sreq->infotype )
3208 {
3209 case kXR_Qconfig: o << "kXR_Qconfig"; break;
3210 case kXR_Qckscan: o << "kXR_Qckscan"; break;
3211 case kXR_Qcksum: o << "kXR_Qcksum"; break;
3212 case kXR_Qopaque: o << "kXR_Qopaque"; break;
3213 case kXR_Qopaquf: o << "kXR_Qopaquf"; break;
3214 case kXR_Qopaqug: o << "kXR_Qopaqug"; break;
3215 case kXR_QPrep: o << "kXR_QPrep"; break;
3216 case kXR_Qspace: o << "kXR_Qspace"; break;
3217 case kXR_QStats: o << "kXR_QStats"; break;
3218 case kXR_Qvisa: o << "kXR_Qvisa"; break;
3219 case kXR_Qxattr: o << "kXR_Qxattr"; break;
3220 default: o << sreq->infotype; break;
3221 }
3222 o << ", ";
3223
3224 if( sreq->infotype == kXR_Qopaqug || sreq->infotype == kXR_Qvisa )
3225 {
3226 o << "handle: " << FileHandleToStr( sreq->fhandle );
3227 o << ", ";
3228 }
3229
3230 o << "arg length: " << sreq->dlen << ")";
3231 break;
3232 }
3233
3234 //------------------------------------------------------------------------
3235 // kXR_rm
3236 //------------------------------------------------------------------------
3237 case kXR_rm:
3238 {
3239 o << "kXR_rm (";
3240 char *fn = GetDataAsString( msg );;
3241 o << "path: " << fn << ")";
3242 delete [] fn;
3243 break;
3244 }
3245
3246 //------------------------------------------------------------------------
3247 // kXR_mkdir
3248 //------------------------------------------------------------------------
3249 case kXR_mkdir:
3250 {
3252 o << "kXR_mkdir (";
3253 char *fn = GetDataAsString( msg );
3254 o << "path: " << fn << ", ";
3255 delete [] fn;
3256 o << "mode: 0" << std::setbase(8) << sreq->mode << ", ";
3257 o << std::setbase(10);
3258 o << "flags: ";
3259 if( sreq->options[0] == 0 )
3260 o << "none";
3261 else
3262 {
3263 if( sreq->options[0] & kXR_mkdirpath )
3264 o << "kXR_mkdirpath";
3265 }
3266 o << ")";
3267 break;
3268 }
3269
3270 //------------------------------------------------------------------------
3271 // kXR_rmdir
3272 //------------------------------------------------------------------------
3273 case kXR_rmdir:
3274 {
3275 o << "kXR_rmdir (";
3276 char *fn = GetDataAsString( msg );
3277 o << "path: " << fn << ")";
3278 delete [] fn;
3279 break;
3280 }
3281
3282 //------------------------------------------------------------------------
3283 // kXR_chmod
3284 //------------------------------------------------------------------------
3285 case kXR_chmod:
3286 {
3288 o << "kXR_chmod (";
3289 char *fn = GetDataAsString( msg );
3290 o << "path: " << fn << ", ";
3291 delete [] fn;
3292 o << "mode: 0" << std::setbase(8) << sreq->mode << ")";
3293 break;
3294 }
3295
3296 //------------------------------------------------------------------------
3297 // kXR_ping
3298 //------------------------------------------------------------------------
3299 case kXR_ping:
3300 {
3301 o << "kXR_ping ()";
3302 break;
3303 }
3304
3305 //------------------------------------------------------------------------
3306 // kXR_protocol
3307 //------------------------------------------------------------------------
3308 case kXR_protocol:
3309 {
3311 o << "kXR_protocol (";
3312 o << "clientpv: 0x" << std::setbase(16) << sreq->clientpv << ")";
3313 break;
3314 }
3315
3316 //------------------------------------------------------------------------
3317 // kXR_dirlist
3318 //------------------------------------------------------------------------
3319 case kXR_dirlist:
3320 {
3321 o << "kXR_dirlist (";
3322 char *fn = GetDataAsString( msg );;
3323 o << "path: " << fn << ")";
3324 delete [] fn;
3325 break;
3326 }
3327
3328 //------------------------------------------------------------------------
3329 // kXR_set
3330 //------------------------------------------------------------------------
3331 case kXR_set:
3332 {
3333 o << "kXR_set (";
3334 char *fn = GetDataAsString( msg );;
3335 o << "data: " << fn << ")";
3336 delete [] fn;
3337 break;
3338 }
3339
3340 //------------------------------------------------------------------------
3341 // kXR_prepare
3342 //------------------------------------------------------------------------
3343 case kXR_prepare:
3344 {
3346 o << "kXR_prepare (";
3347 o << "flags: ";
3348
3349 if( sreq->options == 0 )
3350 o << "none";
3351 else
3352 {
3353 if( sreq->options & kXR_stage )
3354 o << "kXR_stage ";
3355 if( sreq->options & kXR_wmode )
3356 o << "kXR_wmode ";
3357 if( sreq->options & kXR_coloc )
3358 o << "kXR_coloc ";
3359 if( sreq->options & kXR_fresh )
3360 o << "kXR_fresh ";
3361 }
3362
3363 o << ", priority: " << (int) sreq->prty << ", ";
3364
3365 char *fn = GetDataAsString( msg );
3366 char *cursor;
3367 for( cursor = fn; *cursor; ++cursor )
3368 if( *cursor == '\n' ) *cursor = ' ';
3369
3370 o << "paths: " << fn << ")";
3371 delete [] fn;
3372 break;
3373 }
3374
3375 case kXR_chkpoint:
3376 {
3378 o << "kXR_chkpoint (";
3379 o << "opcode: ";
3380 if( sreq->opcode == kXR_ckpBegin ) o << "kXR_ckpBegin)";
3381 else if( sreq->opcode == kXR_ckpCommit ) o << "kXR_ckpCommit)";
3382 else if( sreq->opcode == kXR_ckpQuery ) o << "kXR_ckpQuery)";
3383 else if( sreq->opcode == kXR_ckpRollback ) o << "kXR_ckpRollback)";
3384 else if( sreq->opcode == kXR_ckpXeq )
3385 {
3386 o << "kXR_ckpXeq) ";
3387 // In this case our request body will be one of kXR_pgwrite,
3388 // kXR_truncate, kXR_write, or kXR_writev request.
3389 GenerateDescription( msg + sizeof( ClientChkPointRequest ), o );
3390 }
3391
3392 break;
3393 }
3394
3395 //------------------------------------------------------------------------
3396 // Default
3397 //------------------------------------------------------------------------
3398 default:
3399 {
3400 o << "kXR_unknown (length: " << req->dlen << ")";
3401 break;
3402 }
3403 };
3404 }
static const int kXR_ckpRollback
Definition XProtocol.hh:215
kXR_int16 arg1len
Definition XProtocol.hh:430
kXR_char fhandle[4]
Definition XProtocol.hh:531
kXR_char fhandle[4]
Definition XProtocol.hh:782
kXR_char fhandle[4]
Definition XProtocol.hh:807
kXR_char fhandle[4]
Definition XProtocol.hh:771
kXR_int32 dlen
Definition XProtocol.hh:431
kXR_unt16 options
Definition XProtocol.hh:481
static const int kXR_ckpXeq
Definition XProtocol.hh:216
@ kXR_compress
Definition XProtocol.hh:452
@ kXR_async
Definition XProtocol.hh:458
@ kXR_delete
Definition XProtocol.hh:453
@ kXR_prefname
Definition XProtocol.hh:461
@ kXR_nowait
Definition XProtocol.hh:467
@ kXR_open_read
Definition XProtocol.hh:456
@ kXR_open_updt
Definition XProtocol.hh:457
@ kXR_mkpath
Definition XProtocol.hh:460
@ kXR_seqio
Definition XProtocol.hh:468
@ kXR_replica
Definition XProtocol.hh:465
@ kXR_posc
Definition XProtocol.hh:466
@ kXR_refresh
Definition XProtocol.hh:459
@ kXR_new
Definition XProtocol.hh:455
@ kXR_force
Definition XProtocol.hh:454
@ kXR_open_apnd
Definition XProtocol.hh:462
@ kXR_retstat
Definition XProtocol.hh:463
kXR_char fhandle[4]
Definition XProtocol.hh:509
kXR_char fhandle[4]
Definition XProtocol.hh:645
kXR_char fhandle[4]
Definition XProtocol.hh:659
kXR_char fhandle[4]
Definition XProtocol.hh:229
kXR_unt16 requestid
Definition XProtocol.hh:157
kXR_char fhandle[4]
Definition XProtocol.hh:633
@ kXR_read
Definition XProtocol.hh:125
@ kXR_open
Definition XProtocol.hh:122
@ kXR_writev
Definition XProtocol.hh:143
@ kXR_readv
Definition XProtocol.hh:137
@ kXR_mkdir
Definition XProtocol.hh:120
@ kXR_sync
Definition XProtocol.hh:128
@ kXR_chmod
Definition XProtocol.hh:114
@ kXR_dirlist
Definition XProtocol.hh:116
@ kXR_rm
Definition XProtocol.hh:126
@ kXR_query
Definition XProtocol.hh:113
@ kXR_write
Definition XProtocol.hh:131
@ kXR_set
Definition XProtocol.hh:130
@ kXR_rmdir
Definition XProtocol.hh:127
@ kXR_truncate
Definition XProtocol.hh:140
@ kXR_protocol
Definition XProtocol.hh:118
@ kXR_mv
Definition XProtocol.hh:121
@ kXR_ping
Definition XProtocol.hh:123
@ kXR_stat
Definition XProtocol.hh:129
@ kXR_pgread
Definition XProtocol.hh:142
@ kXR_chkpoint
Definition XProtocol.hh:124
@ kXR_locate
Definition XProtocol.hh:139
@ kXR_close
Definition XProtocol.hh:115
@ kXR_pgwrite
Definition XProtocol.hh:138
@ kXR_prepare
Definition XProtocol.hh:133
kXR_int32 rlen
Definition XProtocol.hh:660
kXR_char options[1]
Definition XProtocol.hh:416
static const int kXR_ckpCommit
Definition XProtocol.hh:213
kXR_int64 offset
Definition XProtocol.hh:661
@ kXR_vfs
Definition XProtocol.hh:763
@ kXR_mkdirpath
Definition XProtocol.hh:410
@ kXR_wmode
Definition XProtocol.hh:591
@ kXR_fresh
Definition XProtocol.hh:593
@ kXR_coloc
Definition XProtocol.hh:592
@ kXR_stage
Definition XProtocol.hh:590
static const int kXR_ckpQuery
Definition XProtocol.hh:214
@ kXR_QPrep
Definition XProtocol.hh:616
@ kXR_Qopaqug
Definition XProtocol.hh:625
@ kXR_Qconfig
Definition XProtocol.hh:621
@ kXR_Qopaquf
Definition XProtocol.hh:624
@ kXR_Qckscan
Definition XProtocol.hh:620
@ kXR_Qxattr
Definition XProtocol.hh:618
@ kXR_Qspace
Definition XProtocol.hh:619
@ kXR_Qvisa
Definition XProtocol.hh:622
@ kXR_QStats
Definition XProtocol.hh:615
@ kXR_Qcksum
Definition XProtocol.hh:617
@ kXR_Qopaque
Definition XProtocol.hh:623
static const int kXR_ckpBegin
Definition XProtocol.hh:212
static Log * GetLog()
Get default log.
@ ErrorMsg
report errors
Definition XrdClLog.hh:109
static void GenerateDescription(char *msg, std::ostringstream &o)
Get the description of a message.
XrdSysError Log
Definition XrdConfig.cc:112
kXR_char fhandle[4]
Definition XProtocol.hh:832

References ClientMvRequest::arg1len, ClientProtocolRequest::clientpv, ClientMvRequest::dlen, ClientPgWriteRequest::dlen, ClientQueryRequest::dlen, ClientRequestHdr::dlen, ClientStatRequest::dlen, ClientTruncateRequest::dlen, ClientWriteRequest::dlen, XrdCl::Log::ErrorMsg, ClientCloseRequest::fhandle, ClientPgReadRequest::fhandle, ClientPgWriteRequest::fhandle, ClientQueryRequest::fhandle, ClientReadRequest::fhandle, ClientStatRequest::fhandle, ClientSyncRequest::fhandle, ClientTruncateRequest::fhandle, ClientWriteRequest::fhandle, readahead_list::fhandle, XrdProto::write_list::fhandle, GenerateDescription(), XrdCl::Log::GetLevel(), XrdCl::DefaultEnv::GetLog(), ClientQueryRequest::infotype, kXR_async, kXR_chkpoint, kXR_chmod, kXR_ckpBegin, kXR_ckpCommit, kXR_ckpQuery, kXR_ckpRollback, kXR_ckpXeq, kXR_close, kXR_coloc, kXR_compress, kXR_delete, kXR_dirlist, kXR_force, kXR_fresh, kXR_locate, kXR_mkdir, kXR_mkdirpath, kXR_mkpath, kXR_mv, kXR_new, kXR_nowait, kXR_open, kXR_open_apnd, kXR_open_read, kXR_open_updt, kXR_pgread, kXR_pgwrite, kXR_ping, kXR_posc, kXR_prefname, kXR_prepare, kXR_protocol, kXR_Qckscan, kXR_Qcksum, kXR_Qconfig, kXR_Qopaque, kXR_Qopaquf, kXR_Qopaqug, kXR_QPrep, kXR_Qspace, kXR_QStats, kXR_query, kXR_Qvisa, kXR_Qxattr, kXR_read, kXR_readv, kXR_refresh, kXR_replica, kXR_retstat, kXR_rm, kXR_rmdir, kXR_seqio, kXR_set, kXR_stage, kXR_stat, kXR_sync, kXR_truncate, kXR_vfs, kXR_wmode, kXR_write, kXR_writev, ClientChmodRequest::mode, ClientMkdirRequest::mode, ClientOpenRequest::mode, ClientPgReadRequest::offset, ClientPgWriteRequest::offset, ClientReadRequest::offset, ClientTruncateRequest::offset, ClientWriteRequest::offset, readahead_list::offset, ClientChkPointRequest::opcode, ClientLocateRequest::options, ClientMkdirRequest::options, ClientOpenRequest::options, ClientPrepareRequest::options, ClientStatRequest::options, ClientPrepareRequest::prty, ClientRequestHdr::requestid, ClientPgReadRequest::rlen, ClientReadRequest::rlen, readahead_list::rlen, and XrdProto::write_list::wlen.

Referenced by GenerateDescription(), and SetDescription().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ GetBindPreference()

URL XrdCl::XRootDTransport::GetBindPreference ( const URL & url,
AnyObject & channelData )
virtual

Get bind preference for the next data stream.

Implements XrdCl::TransportHandler.

Definition at line 1835 of file XrdClXRootDTransport.cc.

1837 {
1838 XRootDChannelInfo *info = 0;
1839 channelData.Get( info );
1840 if( !bool( info->bindSelector ) )
1841 return url;
1842
1843 return URL( info->bindSelector->Get() );
1844 }

References XrdCl::XRootDChannelInfo::bindSelector, and XrdCl::AnyObject::Get().

+ Here is the call graph for this function:

◆ GetBody()

XRootDStatus XrdCl::XRootDTransport::GetBody ( Message & message,
Socket * socket )
virtual

Read the message body from the socket, the socket is non-blocking, the method may be called multiple times - see GetHeader for details

Parameters
messagethe message buffer containing the header
socketthe socket
Returns
stOK & suDone if the whole message has been processed stOK & suRetry if more data is needed stError on failure

Implements XrdCl::TransportHandler.

Definition at line 347 of file XrdClXRootDTransport.cc.

348 {
349 //--------------------------------------------------------------------------
350 // Retrieve the body
351 //--------------------------------------------------------------------------
352 size_t leftToBeRead = 0;
353 uint32_t bodySize = 0;
354 ServerResponseHeader* rsphdr = (ServerResponseHeader*)message.GetBuffer();
355 bodySize = rsphdr->dlen;
356
357 if( message.GetSize() < bodySize + 8 )
358 message.ReAllocate( bodySize + 8 );
359
360 leftToBeRead = bodySize-(message.GetCursor()-8);
361 while( leftToBeRead )
362 {
363 int bytesRead = 0;
364 XRootDStatus status = socket->Read( message.GetBufferAtCursor(), leftToBeRead, bytesRead );
365
366 if( !status.IsOK() || status.code == suRetry )
367 return status;
368
369 leftToBeRead -= bytesRead;
370 message.AdvanceCursor( bytesRead );
371 }
372
373 return XRootDStatus( stOK, suDone );
374 }
const uint16_t suRetry
const uint16_t stOK
Everything went OK.
const uint16_t suDone

References XrdCl::Buffer::AdvanceCursor(), XrdCl::Status::code, ServerResponseHeader::dlen, XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::GetBufferAtCursor(), XrdCl::Buffer::GetCursor(), XrdCl::Buffer::GetSize(), XrdCl::Status::IsOK(), XrdCl::Socket::Read(), XrdCl::Buffer::ReAllocate(), XrdCl::stOK, XrdCl::suDone, and XrdCl::suRetry.

+ Here is the call graph for this function:

◆ GetHeader()

XRootDStatus XrdCl::XRootDTransport::GetHeader ( Message & message,
Socket * socket )
virtual

Read a message header from the socket, the socket is non-blocking, so if there is not enough data the function should return suRetry in which case it will be called again when more data arrives, with the data previously read stored in the message buffer

Parameters
messagethe message buffer
socketthe socket
Returns
stOK & suDone if the whole message has been processed stOK & suRetry if more data is needed stError on failure

Implements XrdCl::TransportHandler.

Definition at line 307 of file XrdClXRootDTransport.cc.

308 {
309 //--------------------------------------------------------------------------
310 // A new message - allocate the space needed for the header
311 //--------------------------------------------------------------------------
312 if( message.GetCursor() == 0 && message.GetSize() < 8 )
313 message.Allocate( 8 );
314
315 //--------------------------------------------------------------------------
316 // Read the message header
317 //--------------------------------------------------------------------------
318 if( message.GetCursor() < 8 )
319 {
320 size_t leftToBeRead = 8 - message.GetCursor();
321 while( leftToBeRead )
322 {
323 int bytesRead = 0;
324 XRootDStatus status = socket->Read( message.GetBufferAtCursor(),
325 leftToBeRead, bytesRead );
326 if( !status.IsOK() || status.code == suRetry )
327 return status;
328
329 leftToBeRead -= bytesRead;
330 message.AdvanceCursor( bytesRead );
331 }
332 UnMarshallHeader( message );
333
334 uint32_t bodySize = *(uint32_t*)(message.GetBuffer(4));
335 Log *log = DefaultEnv::GetLog();
336 log->Dump( XRootDTransportMsg, "[msg: %p] Expecting %d bytes of message "
337 "body", &message, bodySize );
338
339 return XRootDStatus( stOK, suDone );
340 }
341 return XRootDStatus( stError, errInternal );
342 }
static void UnMarshallHeader(Message &msg)
Unmarshall the header incoming message.
const uint64_t XRootDTransportMsg
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errInternal
Internal error.

References XrdCl::Buffer::AdvanceCursor(), XrdCl::Buffer::Allocate(), XrdCl::Status::code, XrdCl::Log::Dump(), XrdCl::errInternal, XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::GetBufferAtCursor(), XrdCl::Buffer::GetCursor(), XrdCl::DefaultEnv::GetLog(), XrdCl::Buffer::GetSize(), XrdCl::Status::IsOK(), XrdCl::Socket::Read(), XrdCl::stError, XrdCl::stOK, XrdCl::suDone, XrdCl::suRetry, UnMarshallHeader(), and XrdCl::XRootDTransportMsg.

+ Here is the call graph for this function:

◆ GetMore()

XRootDStatus XrdCl::XRootDTransport::GetMore ( Message & message,
Socket * socket )
virtual

Read more of the message body from the socket, the socket is non-blocking the method may be called multiple times - see GetHeader for details

Parameters
messagethe message buffer containing the header
socketthe socket
Returns
stOK & suDone if the whole message has been processed stOK & suRetry if more data is needed stError on failure

Implements XrdCl::TransportHandler.

Definition at line 379 of file XrdClXRootDTransport.cc.

380 {
381 ServerResponseHeader* rsphdr = (ServerResponseHeader*)message.GetBuffer();
382 if( rsphdr->status != kXR_status )
383 return XRootDStatus( stError, errInvalidOp );
384
385 //--------------------------------------------------------------------------
386 // In case of non kXR_status responses we read all the response, including
387 // data. For kXR_status responses we first read only the remainder of the
388 // header. The header must then be unmarshalled, and then a second call to
389 // GetMore (repeated for suRetry as needed) will read the data.
390 //--------------------------------------------------------------------------
391
392 uint32_t bodySize = rsphdr->dlen;
393 if( bodySize+8 < sizeof( ServerResponseStatus ) )
394 return XRootDStatus( stError, errInvalidMessage, 0,
395 "kXR_status: invalid message size." );
396
397 ServerResponseStatus *rspst = (ServerResponseStatus*)message.GetBuffer();
398 bodySize += rspst->bdy.dlen;
399
400 if( message.GetSize() < bodySize + 8 )
401 message.ReAllocate( bodySize + 8 );
402
403 size_t leftToBeRead = bodySize-(message.GetCursor()-8);
404 while( leftToBeRead )
405 {
406 int bytesRead = 0;
407 XRootDStatus status = socket->Read( message.GetBufferAtCursor(), leftToBeRead, bytesRead );
408
409 if( !status.IsOK() || status.code == suRetry )
410 return status;
411
412 leftToBeRead -= bytesRead;
413 message.AdvanceCursor( bytesRead );
414 }
415
416 // Unmarchal to message body
417 Log *log = DefaultEnv::GetLog();
418 XRootDStatus st = XRootDTransport::UnMarchalStatusMore( message );
419 if( !st.IsOK() && st.code == errDataError )
420 {
421 log->Error( XRootDTransportMsg, "[msg: %p] %s", &message,
422 st.GetErrorMessage().c_str() );
423 return st;
424 }
425
426 if( !st.IsOK() )
427 {
428 log->Error( XRootDTransportMsg, "[msg: %p] Failed to unmarshall status body.",
429 &message );
430 return st;
431 }
432
433 return XRootDStatus( stOK, suDone );
434 }
@ kXR_status
Definition XProtocol.hh:907
struct ServerResponseBody_Status bdy
static XRootDStatus UnMarchalStatusMore(Message &msg)
Unmarshall the correction-segment of the status response for pgwrite.
const uint16_t errDataError
data is corrupted
const uint16_t errInvalidOp
const uint16_t errInvalidMessage

References XrdCl::Buffer::AdvanceCursor(), ServerResponseStatus::bdy, XrdCl::Status::code, ServerResponseBody_Status::dlen, ServerResponseHeader::dlen, XrdCl::errDataError, XrdCl::errInvalidMessage, XrdCl::errInvalidOp, XrdCl::Log::Error(), XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::GetBufferAtCursor(), XrdCl::Buffer::GetCursor(), XrdCl::XRootDStatus::GetErrorMessage(), XrdCl::DefaultEnv::GetLog(), XrdCl::Buffer::GetSize(), XrdCl::Status::IsOK(), kXR_status, XrdCl::Socket::Read(), XrdCl::Buffer::ReAllocate(), ServerResponseHeader::status, XrdCl::stError, XrdCl::stOK, XrdCl::suDone, XrdCl::suRetry, UnMarchalStatusMore(), and XrdCl::XRootDTransportMsg.

+ Here is the call graph for this function:

◆ GetSignature() [1/2]

Status XrdCl::XRootDTransport::GetSignature ( Message * toSign,
Message *& sign,
AnyObject & channelData )
virtual

Get signature for given message.

Implements XrdCl::TransportHandler.

Definition at line 1697 of file XrdClXRootDTransport.cc.

1698 {
1699 XRootDChannelInfo *info = 0;
1700 channelData.Get( info );
1701 return GetSignature( toSign, sign, info );
1702 }
virtual Status GetSignature(Message *toSign, Message *&sign, AnyObject &channelData)
Get signature for given message.

References XrdCl::AnyObject::Get(), and GetSignature().

Referenced by GetSignature().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ GetSignature() [2/2]

Status XrdCl::XRootDTransport::GetSignature ( Message * toSign,
Message *& sign,
XRootDChannelInfo * info )
virtual

Get signature for given message.

Definition at line 1707 of file XrdClXRootDTransport.cc.

1710 {
1711 XrdSysRWLockHelper scope( pSecUnloadHandler->lock );
1712 if( pSecUnloadHandler->unloaded ) return Status( stError, errInvalidOp );
1713
1714 ClientRequest *thereq = reinterpret_cast<ClientRequest*>( toSign->GetBuffer() );
1715 if( !info ) return Status( stError, errInternal );
1716 if( info->protection )
1717 {
1718 SecurityRequest *newreq = 0;
1719 // check if we have to secure the request in the first place
1720 if( !( NEED2SECURE ( info->protection )( *thereq ) ) ) return Status();
1721 // secure (sign/encrypt) the request
1722 int rc = info->protection->Secure( newreq, *thereq, 0 );
1723 // there was an error
1724 if( rc < 0 )
1725 return Status( stError, errInternal, -rc );
1726
1727 sign = new Message();
1728 sign->Grab( reinterpret_cast<char*>( newreq ), rc );
1729 }
1730
1731 return Status();
1732 }
#define NEED2SECURE(protP)
This class implements the XRootD protocol security protection.

References XrdCl::errInternal, XrdCl::errInvalidOp, XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::Grab(), XrdCl::PluginUnloadHandler::lock, NEED2SECURE, XrdCl::XRootDChannelInfo::protection, XrdSecProtect::Secure(), XrdCl::stError, and XrdCl::PluginUnloadHandler::unloaded.

+ Here is the call graph for this function:

◆ HandShake()

XRootDStatus XrdCl::XRootDTransport::HandShake ( HandShakeData * handShakeData,
AnyObject & channelData )
virtual

HandShake.

Implements XrdCl::TransportHandler.

Definition at line 467 of file XrdClXRootDTransport.cc.

469 {
470 XRootDChannelInfo *info = 0;
471 channelData.Get( info );
472 XrdSysMutexHelper scopedLock( info->mutex );
473
474 if( info->stream.size() <= handShakeData->subStreamId )
475 {
476 Log *log = DefaultEnv::GetLog();
477 log->Error( XRootDTransportMsg,
478 "[%s] Internal error: not enough substreams",
479 handShakeData->streamName.c_str() );
480 return XRootDStatus( stFatal, errInternal );
481 }
482
483 if( handShakeData->subStreamId == 0 )
484 {
485 info->streamName = handShakeData->streamName;
486 return HandShakeMain( handShakeData, channelData );
487 }
488 return HandShakeParallel( handShakeData, channelData );
489 }
const uint16_t stFatal
Fatal error, it's still an error.

References XrdCl::errInternal, XrdCl::Log::Error(), XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetLog(), XrdCl::XRootDChannelInfo::mutex, XrdCl::stFatal, XrdCl::XRootDChannelInfo::stream, XrdCl::HandShakeData::streamName, XrdCl::XRootDChannelInfo::streamName, XrdCl::HandShakeData::subStreamId, and XrdCl::XRootDTransportMsg.

+ Here is the call graph for this function:

◆ HandShakeDone()

bool XrdCl::XRootDTransport::HandShakeDone ( HandShakeData * handShakeData,
AnyObject & channelData )
virtual

Implements XrdCl::TransportHandler.

Definition at line 727 of file XrdClXRootDTransport.cc.

729 {
730 XRootDChannelInfo *info = 0;
731 channelData.Get( info );
732 XRootDStreamInfo &sInfo = info->stream[handShakeData->subStreamId];
733 return ( sInfo.status == XRootDStreamInfo::Connected );
734 }

References XrdCl::XRootDStreamInfo::Connected, XrdCl::AnyObject::Get(), XrdCl::XRootDStreamInfo::status, XrdCl::XRootDChannelInfo::stream, and XrdCl::HandShakeData::subStreamId.

+ Here is the call graph for this function:

◆ InitializeChannel()

void XrdCl::XRootDTransport::InitializeChannel ( const URL & url,
AnyObject & channelData )
virtual

Initialize channel.

Implements XrdCl::TransportHandler.

Definition at line 439 of file XrdClXRootDTransport.cc.

441 {
442 XRootDChannelInfo *info = new XRootDChannelInfo( url );
443 XrdSysMutexHelper scopedLock( info->mutex );
444 channelData.Set( info );
445
446 Env *env = DefaultEnv::GetEnv();
447 int streams = DefaultSubStreamsPerChannel;
448 env->GetInt( "SubStreamsPerChannel", streams );
449 if( streams < 1 ) streams = 1;
450 info->stream.resize( streams );
451 info->strmSelector.reset( new StreamSelector( streams ) );
452 info->encrypted = url.IsSecure();
453 info->istpc = url.IsTPC();
454 info->logintoken = url.GetLoginToken();
455 }
static Env * GetEnv()
Get default client environment.
const int DefaultSubStreamsPerChannel

References XrdCl::DefaultSubStreamsPerChannel, XrdCl::XRootDChannelInfo::encrypted, XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), XrdCl::URL::GetLoginToken(), XrdCl::URL::IsSecure(), XrdCl::URL::IsTPC(), XrdCl::XRootDChannelInfo::istpc, XrdCl::XRootDChannelInfo::logintoken, XrdCl::XRootDChannelInfo::mutex, XrdCl::AnyObject::Set(), XrdCl::XRootDChannelInfo::stream, and XrdCl::XRootDChannelInfo::strmSelector.

+ Here is the call graph for this function:

◆ IsStreamBroken()

Status XrdCl::XRootDTransport::IsStreamBroken ( time_t inactiveTime,
AnyObject & channelData )
virtual

Check the stream is broken - ie. TCP connection got broken and went undetected by the TCP stack

Implements XrdCl::TransportHandler.

Definition at line 785 of file XrdClXRootDTransport.cc.

787 {
788 XRootDChannelInfo *info = 0;
789 channelData.Get( info );
790 Env *env = DefaultEnv::GetEnv();
791 Log *log = DefaultEnv::GetLog();
792
793 int streamTimeout = DefaultStreamTimeout;
794 env->GetInt( "StreamTimeout", streamTimeout );
795
796 XrdSysMutexHelper scopedLock( info->mutex );
797
798 const time_t now = time(0);
799 const bool anySID =
800 info->sidManager->IsAnySIDOldAs( now - streamTimeout );
801
802 log->Dump( XRootDTransportMsg, "[%s] Stream inactive since %ld seconds, "
803 "stream timeout: %d, any SID: %d, wait barrier: %s",
804 info->streamName.c_str(), inactiveTime, streamTimeout,
805 anySID, Utils::TimeToString(info->waitBarrier).c_str() );
806
807 if( inactiveTime < streamTimeout )
808 return Status();
809
810 if( now < info->waitBarrier )
811 return Status();
812
813 if( !anySID )
814 return Status();
815
816 return Status( stError, errSocketTimeout );
817 }
static std::string TimeToString(time_t timestamp)
Convert timestamp to a string.
const uint16_t errSocketTimeout
const int DefaultStreamTimeout

References XrdCl::DefaultStreamTimeout, XrdCl::Log::Dump(), XrdCl::errSocketTimeout, XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), XrdCl::DefaultEnv::GetLog(), XrdCl::XRootDChannelInfo::mutex, XrdCl::XRootDChannelInfo::sidManager, XrdCl::stError, XrdCl::XRootDChannelInfo::streamName, XrdCl::Utils::TimeToString(), XrdCl::XRootDChannelInfo::waitBarrier, and XrdCl::XRootDTransportMsg.

+ Here is the call graph for this function:

◆ IsStreamTTLElapsed()

bool XrdCl::XRootDTransport::IsStreamTTLElapsed ( time_t time,
AnyObject & channelData )
virtual

Check if the stream should be disconnected.

Implements XrdCl::TransportHandler.

Definition at line 739 of file XrdClXRootDTransport.cc.

741 {
742 XRootDChannelInfo *info = 0;
743 channelData.Get( info );
744 Env *env = DefaultEnv::GetEnv();
745 Log *log = DefaultEnv::GetLog();
746
747 //--------------------------------------------------------------------------
748 // Check the TTL settings for the current server
749 //--------------------------------------------------------------------------
750 int ttl;
751 if( info->serverFlags & kXR_isServer )
752 {
754 env->GetInt( "DataServerTTL", ttl );
755 }
756 else
757 {
759 env->GetInt( "LoadBalancerTTL", ttl );
760 }
761
762 //--------------------------------------------------------------------------
763 // See whether we can give a go-ahead for the disconnection
764 //--------------------------------------------------------------------------
765 XrdSysMutexHelper scopedLock( info->mutex );
766 uint16_t allocatedSIDs = info->sidManager->GetNumberOfAllocatedSIDs();
767 log->Dump( XRootDTransportMsg, "[%s] Stream inactive since %ld seconds, "
768 "TTL: %d, allocated SIDs: %d, open files: %d, bound file objects: %d",
769 info->streamName.c_str(), inactiveTime, ttl, allocatedSIDs,
770 info->openFiles, info->finstcnt.load( std::memory_order_relaxed ) );
771
772 if( info->openFiles != 0 && info->finstcnt.load( std::memory_order_relaxed ) != 0 )
773 return false;
774
775 if( !allocatedSIDs && inactiveTime > ttl )
776 return true;
777
778 return false;
779 }
#define kXR_isServer
const int DefaultLoadBalancerTTL
const int DefaultDataServerTTL

References XrdCl::DefaultDataServerTTL, XrdCl::DefaultLoadBalancerTTL, XrdCl::Log::Dump(), XrdCl::XRootDChannelInfo::finstcnt, XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), XrdCl::DefaultEnv::GetLog(), kXR_isServer, XrdCl::XRootDChannelInfo::mutex, XrdCl::XRootDChannelInfo::openFiles, XrdCl::XRootDChannelInfo::serverFlags, XrdCl::XRootDChannelInfo::sidManager, XrdCl::XRootDChannelInfo::streamName, and XrdCl::XRootDTransportMsg.

+ Here is the call graph for this function:

◆ LogErrorResponse()

void XrdCl::XRootDTransport::LogErrorResponse ( const Message & msg)
static

Log server error response.

Definition at line 1454 of file XrdClXRootDTransport.cc.

1455 {
1456 Log *log = DefaultEnv::GetLog();
1457 ServerResponse *rsp = (ServerResponse *)msg.GetBuffer();
1458 char *errmsg = new char[rsp->hdr.dlen-3]; errmsg[rsp->hdr.dlen-4] = 0;
1459 memcpy( errmsg, rsp->body.error.errmsg, rsp->hdr.dlen-4 );
1460 log->Error( XRootDTransportMsg, "Server responded with an error [%d]: %s",
1461 rsp->body.error.errnum, errmsg );
1462 delete [] errmsg;
1463 }
union ServerResponse::@040373375333017131300127053271011057331004327334 body
ServerResponseHeader hdr

References ServerResponse::body, ServerResponseHeader::dlen, XrdCl::Log::Error(), XrdCl::Buffer::GetBuffer(), XrdCl::DefaultEnv::GetLog(), ServerResponse::hdr, and XrdCl::XRootDTransportMsg.

+ Here is the call graph for this function:

◆ MarshallRequest() [1/2]

XRootDStatus XrdCl::XRootDTransport::MarshallRequest ( char * msg)
static

Marshal the outgoing message.

Definition at line 1050 of file XrdClXRootDTransport.cc.

1051 {
1052 ClientRequest *req = (ClientRequest*)msg;
1053 switch( req->header.requestid )
1054 {
1055 //------------------------------------------------------------------------
1056 // kXR_protocol
1057 //------------------------------------------------------------------------
1058 case kXR_protocol:
1059 req->protocol.clientpv = htonl( req->protocol.clientpv );
1060 break;
1061
1062 //------------------------------------------------------------------------
1063 // kXR_login
1064 //------------------------------------------------------------------------
1065 case kXR_login:
1066 req->login.pid = htonl( req->login.pid );
1067 break;
1068
1069 //------------------------------------------------------------------------
1070 // kXR_locate
1071 //------------------------------------------------------------------------
1072 case kXR_locate:
1073 req->locate.options = htons( req->locate.options );
1074 break;
1075
1076 //------------------------------------------------------------------------
1077 // kXR_query
1078 //------------------------------------------------------------------------
1079 case kXR_query:
1080 req->query.infotype = htons( req->query.infotype );
1081 break;
1082
1083 //------------------------------------------------------------------------
1084 // kXR_truncate
1085 //------------------------------------------------------------------------
1086 case kXR_truncate:
1087 req->truncate.offset = htonll( req->truncate.offset );
1088 break;
1089
1090 //------------------------------------------------------------------------
1091 // kXR_mkdir
1092 //------------------------------------------------------------------------
1093 case kXR_mkdir:
1094 req->mkdir.mode = htons( req->mkdir.mode );
1095 break;
1096
1097 //------------------------------------------------------------------------
1098 // kXR_chmod
1099 //------------------------------------------------------------------------
1100 case kXR_chmod:
1101 req->chmod.mode = htons( req->chmod.mode );
1102 break;
1103
1104 //------------------------------------------------------------------------
1105 // kXR_open
1106 //------------------------------------------------------------------------
1107 case kXR_open:
1108 req->open.mode = htons( req->open.mode );
1109 req->open.options = htons( req->open.options );
1110 break;
1111
1112 //------------------------------------------------------------------------
1113 // kXR_read
1114 //------------------------------------------------------------------------
1115 case kXR_read:
1116 req->read.offset = htonll( req->read.offset );
1117 req->read.rlen = htonl( req->read.rlen );
1118 break;
1119
1120 //------------------------------------------------------------------------
1121 // kXR_write
1122 //------------------------------------------------------------------------
1123 case kXR_write:
1124 req->write.offset = htonll( req->write.offset );
1125 break;
1126
1127 //------------------------------------------------------------------------
1128 // kXR_mv
1129 //------------------------------------------------------------------------
1130 case kXR_mv:
1131 req->mv.arg1len = htons( req->mv.arg1len );
1132 break;
1133
1134 //------------------------------------------------------------------------
1135 // kXR_readv
1136 //------------------------------------------------------------------------
1137 case kXR_readv:
1138 {
1139 uint16_t numChunks = (req->readv.dlen)/16;
1140 readahead_list *dataChunk = (readahead_list*)( msg + 24 );
1141 for( size_t i = 0; i < numChunks; ++i )
1142 {
1143 dataChunk[i].rlen = htonl( dataChunk[i].rlen );
1144 dataChunk[i].offset = htonll( dataChunk[i].offset );
1145 }
1146 break;
1147 }
1148
1149 //------------------------------------------------------------------------
1150 // kXR_writev
1151 //------------------------------------------------------------------------
1152 case kXR_writev:
1153 {
1154 uint16_t numChunks = (req->writev.dlen)/16;
1155 XrdProto::write_list *wrtList =
1156 reinterpret_cast<XrdProto::write_list*>( msg + 24 );
1157 for( size_t i = 0; i < numChunks; ++i )
1158 {
1159 wrtList[i].wlen = htonl( wrtList[i].wlen );
1160 wrtList[i].offset = htonll( wrtList[i].offset );
1161 }
1162
1163 break;
1164 }
1165
1166 case kXR_pgread:
1167 {
1168 req->pgread.offset = htonll( req->pgread.offset );
1169 req->pgread.rlen = htonl( req->pgread.rlen );
1170 break;
1171 }
1172
1173 case kXR_pgwrite:
1174 {
1175 req->pgwrite.offset = htonll( req->pgwrite.offset );
1176 break;
1177 }
1178
1179 //------------------------------------------------------------------------
1180 // kXR_prepare
1181 //------------------------------------------------------------------------
1182 case kXR_prepare:
1183 {
1184 req->prepare.optionX = htons( req->prepare.optionX );
1185 req->prepare.port = htons( req->prepare.port );
1186 break;
1187 }
1188
1189 case kXR_chkpoint:
1190 {
1191 if( req->chkpoint.opcode == kXR_ckpXeq )
1192 MarshallRequest( msg + 24 );
1193 break;
1194 }
1195 };
1196
1197 req->header.requestid = htons( req->header.requestid );
1198 req->header.dlen = htonl( req->header.dlen );
1199 return XRootDStatus();
1200 }
struct ClientTruncateRequest truncate
Definition XProtocol.hh:875
struct ClientPgReadRequest pgread
Definition XProtocol.hh:861
struct ClientMkdirRequest mkdir
Definition XProtocol.hh:858
struct ClientPgWriteRequest pgwrite
Definition XProtocol.hh:862
struct ClientReadVRequest readv
Definition XProtocol.hh:868
struct ClientOpenRequest open
Definition XProtocol.hh:860
struct ClientRequestHdr header
Definition XProtocol.hh:846
struct ClientWriteVRequest writev
Definition XProtocol.hh:877
struct ClientLoginRequest login
Definition XProtocol.hh:857
@ kXR_login
Definition XProtocol.hh:119
struct ClientChmodRequest chmod
Definition XProtocol.hh:850
struct ClientQueryRequest query
Definition XProtocol.hh:866
struct ClientReadRequest read
Definition XProtocol.hh:867
struct ClientMvRequest mv
Definition XProtocol.hh:859
struct ClientChkPointRequest chkpoint
Definition XProtocol.hh:849
struct ClientPrepareRequest prepare
Definition XProtocol.hh:864
struct ClientWriteRequest write
Definition XProtocol.hh:876
struct ClientProtocolRequest protocol
Definition XProtocol.hh:865
struct ClientLocateRequest locate
Definition XProtocol.hh:856
static XRootDStatus MarshallRequest(Message *msg)
Marshal the outgoing message.

References ClientMvRequest::arg1len, ClientRequest::chkpoint, ClientRequest::chmod, ClientProtocolRequest::clientpv, ClientReadVRequest::dlen, ClientRequestHdr::dlen, ClientWriteVRequest::dlen, ClientRequest::header, ClientQueryRequest::infotype, kXR_chkpoint, kXR_chmod, kXR_ckpXeq, kXR_locate, kXR_login, kXR_mkdir, kXR_mv, kXR_open, kXR_pgread, kXR_pgwrite, kXR_prepare, kXR_protocol, kXR_query, kXR_read, kXR_readv, kXR_truncate, kXR_write, kXR_writev, ClientRequest::locate, ClientRequest::login, MarshallRequest(), ClientRequest::mkdir, ClientChmodRequest::mode, ClientMkdirRequest::mode, ClientOpenRequest::mode, ClientRequest::mv, ClientPgReadRequest::offset, ClientPgWriteRequest::offset, ClientReadRequest::offset, ClientTruncateRequest::offset, ClientWriteRequest::offset, readahead_list::offset, XrdProto::write_list::offset, ClientChkPointRequest::opcode, ClientRequest::open, ClientLocateRequest::options, ClientOpenRequest::options, ClientPrepareRequest::optionX, ClientRequest::pgread, ClientRequest::pgwrite, ClientLoginRequest::pid, ClientPrepareRequest::port, ClientRequest::prepare, ClientRequest::protocol, ClientRequest::query, ClientRequest::read, ClientRequest::readv, ClientRequestHdr::requestid, ClientPgReadRequest::rlen, ClientReadRequest::rlen, readahead_list::rlen, ClientRequest::truncate, XrdProto::write_list::wlen, ClientRequest::write, and ClientRequest::writev.

+ Here is the call graph for this function:

◆ MarshallRequest() [2/2]

static XRootDStatus XrdCl::XRootDTransport::MarshallRequest ( Message * msg)
inlinestatic

Marshal the outgoing message.

Definition at line 175 of file XrdClXRootDTransport.hh.

176 {
177 MarshallRequest( msg->GetBuffer() );
178 msg->SetIsMarshalled( true );
179 return XRootDStatus();
180 }

References XrdCl::Buffer::GetBuffer(), MarshallRequest(), and XrdCl::Message::SetIsMarshalled().

Referenced by MarshallRequest(), MarshallRequest(), MultiplexSubStream(), XrdCl::MessageUtils::RedirectMessage(), XrdCl::MessageUtils::SendMessage(), and UnMarshallRequest().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ MessageReceived()

uint32_t XrdCl::XRootDTransport::MessageReceived ( Message & msg,
uint16_t subStream,
AnyObject & channelData )
virtual

Check if the message invokes a stream action.

Implements XrdCl::TransportHandler.

Definition at line 1561 of file XrdClXRootDTransport.cc.

1564 {
1565 XRootDChannelInfo *info = 0;
1566 channelData.Get( info );
1567 XrdSysMutexHelper scopedLock( info->mutex );
1568 Log *log = DefaultEnv::GetLog();
1569
1570 //--------------------------------------------------------------------------
1571 // Update the substream queues
1572 //--------------------------------------------------------------------------
1573 info->strmSelector->MsgReceived( subStream );
1574
1575 //--------------------------------------------------------------------------
1576 // Check whether this message is a response to a request that has
1577 // timed out, and if so, drop it
1578 //--------------------------------------------------------------------------
1579 ServerResponse *rsp = (ServerResponse*)msg.GetBuffer();
1580 if( rsp->hdr.status == kXR_attn )
1581 {
1582 return NoAction;
1583 }
1584
1585 if( info->sidManager->IsTimedOut( rsp->hdr.streamid ) )
1586 {
1587 log->Error( XRootDTransportMsg, "Message %p, stream [%d, %d] is a "
1588 "response that we're no longer interested in (timed out)",
1589 &msg, rsp->hdr.streamid[0], rsp->hdr.streamid[1] );
1590 //------------------------------------------------------------------------
1591 // If it is kXR_waitresp there will be another one,
1592 // so we don't release the sid yet
1593 //------------------------------------------------------------------------
1594 if( rsp->hdr.status != kXR_waitresp )
1595 info->sidManager->ReleaseTimedOut( rsp->hdr.streamid );
1596 //------------------------------------------------------------------------
1597 // If it is a successful response to an open request
1598 // that timed out, we need to send a close
1599 //------------------------------------------------------------------------
1600 uint16_t sid; memcpy( &sid, rsp->hdr.streamid, 2 );
1601 std::set<uint16_t>::iterator sidIt = info->sentOpens.find( sid );
1602 if( sidIt != info->sentOpens.end() )
1603 {
1604 info->sentOpens.erase( sidIt );
1605 if( rsp->hdr.status == kXR_ok ) return RequestClose;
1606 }
1607 return DigestMsg;
1608 }
1609
1610 //--------------------------------------------------------------------------
1611 // If we have a wait or waitresp
1612 //--------------------------------------------------------------------------
1613 uint32_t seconds = 0;
1614 if( rsp->hdr.status == kXR_wait )
1615 seconds = ntohl( rsp->body.wait.seconds ) + 5; // we need extra time
1616 // to re-send the request
1617 else if( rsp->hdr.status == kXR_waitresp )
1618 {
1619 seconds = ntohl( rsp->body.waitresp.seconds );
1620
1621 log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response of %u seconds, "
1622 "setting up wait barrier.",
1623 info->streamName.c_str(),
1624 seconds );
1625 }
1626
1627 time_t barrier = time(0) + seconds;
1628 if( info->waitBarrier < barrier )
1629 info->waitBarrier = barrier;
1630
1631 //--------------------------------------------------------------------------
1632 // If we got a response to an open request, we may need to bump the counter
1633 // of open files
1634 //--------------------------------------------------------------------------
1635 uint16_t sid; memcpy( &sid, rsp->hdr.streamid, 2 );
1636 std::set<uint16_t>::iterator sidIt = info->sentOpens.find( sid );
1637 if( sidIt != info->sentOpens.end() )
1638 {
1639 if( rsp->hdr.status == kXR_waitresp )
1640 return NoAction;
1641 info->sentOpens.erase( sidIt );
1642 if( rsp->hdr.status == kXR_ok )
1643 {
1644 ++info->openFiles;
1645 info->finstcnt.fetch_add( 1, std::memory_order_relaxed ); // another file File object instance has been bound with this connection
1646 }
1647 return NoAction;
1648 }
1649
1650 //--------------------------------------------------------------------------
1651 // If we got a response to a close, we may need to decrement the counter of
1652 // open files
1653 //--------------------------------------------------------------------------
1654 sidIt = info->sentCloses.find( sid );
1655 if( sidIt != info->sentCloses.end() )
1656 {
1657 if( rsp->hdr.status == kXR_waitresp )
1658 return NoAction;
1659 info->sentCloses.erase( sidIt );
1660 --info->openFiles;
1661 return NoAction;
1662 }
1663 return NoAction;
1664 }
kXR_char streamid[2]
Definition XProtocol.hh:914
@ kXR_waitresp
Definition XProtocol.hh:906
@ kXR_ok
Definition XProtocol.hh:899
@ kXR_attn
Definition XProtocol.hh:901
@ kXR_wait
Definition XProtocol.hh:905
@ RequestClose
Send a close request.
const uint64_t XRootDMsg

References ServerResponse::body, XrdCl::TransportHandler::DigestMsg, XrdCl::Log::Dump(), XrdCl::Log::Error(), XrdCl::XRootDChannelInfo::finstcnt, XrdCl::AnyObject::Get(), XrdCl::Buffer::GetBuffer(), XrdCl::DefaultEnv::GetLog(), ServerResponse::hdr, kXR_attn, kXR_ok, kXR_wait, kXR_waitresp, XrdCl::XRootDChannelInfo::mutex, XrdCl::TransportHandler::NoAction, XrdCl::XRootDChannelInfo::openFiles, XrdCl::TransportHandler::RequestClose, XrdCl::XRootDChannelInfo::sentCloses, XrdCl::XRootDChannelInfo::sentOpens, XrdCl::XRootDChannelInfo::sidManager, ServerResponseHeader::status, ServerResponseHeader::streamid, XrdCl::XRootDChannelInfo::streamName, XrdCl::XRootDChannelInfo::strmSelector, XrdCl::XRootDChannelInfo::waitBarrier, XrdCl::XRootDMsg, and XrdCl::XRootDTransportMsg.

+ Here is the call graph for this function:

◆ MessageSent()

void XrdCl::XRootDTransport::MessageSent ( Message * msg,
uint16_t subStream,
uint32_t bytesSent,
AnyObject & channelData )
virtual

Notify the transport about a message having been sent.

Implements XrdCl::TransportHandler.

Definition at line 1669 of file XrdClXRootDTransport.cc.

1673 {
1674 XRootDChannelInfo *info = 0;
1675 channelData.Get( info );
1676 XrdSysMutexHelper scopedLock( info->mutex );
1677 ClientRequest *req = (ClientRequest*)msg->GetBuffer();
1678 uint16_t reqid = ntohs( req->header.requestid );
1679
1680
1681 //--------------------------------------------------------------------------
1682 // We need to track opens to know if we can close streams due to idleness
1683 //--------------------------------------------------------------------------
1684 uint16_t sid;
1685 memcpy( &sid, req->header.streamid, 2 );
1686
1687 if( reqid == kXR_open )
1688 info->sentOpens.insert( sid );
1689 else if( reqid == kXR_close )
1690 info->sentCloses.insert( sid );
1691 }
kXR_char streamid[2]
Definition XProtocol.hh:156

References XrdCl::AnyObject::Get(), XrdCl::Buffer::GetBuffer(), ClientRequest::header, kXR_close, kXR_open, XrdCl::XRootDChannelInfo::mutex, ClientRequestHdr::requestid, XrdCl::XRootDChannelInfo::sentCloses, XrdCl::XRootDChannelInfo::sentOpens, and ClientRequestHdr::streamid.

+ Here is the call graph for this function:

◆ Multiplex()

PathID XrdCl::XRootDTransport::Multiplex ( Message * msg,
AnyObject & channelData,
PathID * hint = 0 )
virtual

Return the ID for the up stream this message should be sent by and the down stream which the answer should be expected at. Modify the message itself if necessary. If hint is non-zero then the message should be modified such that the answer will be returned via the hinted stream.

Implements XrdCl::TransportHandler.

Definition at line 822 of file XrdClXRootDTransport.cc.

823 {
824 return PathID( 0, 0 );
825 }

◆ MultiplexSubStream()

PathID XrdCl::XRootDTransport::MultiplexSubStream ( Message * msg,
AnyObject & channelData,
PathID * hint = 0 )
virtual

Return the ID for the up substream this message should be sent by and the down substream which the answer should be expected at. Modify the message itself if necessary. If hint is non-zero then the message should be modified such that the answer will be returned via the hinted stream.

Implements XrdCl::TransportHandler.

Definition at line 830 of file XrdClXRootDTransport.cc.

833 {
834 XRootDChannelInfo *info = 0;
835 channelData.Get( info );
836 XrdSysMutexHelper scopedLock( info->mutex );
837
838 //--------------------------------------------------------------------------
839 // If we're not connected to a data server or we don't know that yet
840 // we stream through 0
841 //--------------------------------------------------------------------------
842 if( !(info->serverFlags & kXR_isServer) || info->stream.size() == 0 )
843 return PathID( 0, 0 );
844
845 //--------------------------------------------------------------------------
846 // Select the streams
847 //--------------------------------------------------------------------------
848 Log *log = DefaultEnv::GetLog();
849 uint16_t upStream = 0;
850 uint16_t downStream = 0;
851
852 if( hint )
853 {
854 upStream = hint->up;
855 downStream = hint->down;
856 }
857 else
858 {
859 upStream = 0;
860 std::vector<bool> connected;
861 connected.reserve( info->stream.size() - 1 );
862 size_t nbConnected = 0;
863 for( size_t i = 1; i < info->stream.size(); ++i )
864 if( info->stream[i].status == XRootDStreamInfo::Connected )
865 {
866 connected.push_back( true );
867 ++nbConnected;
868 }
869 else
870 connected.push_back( false );
871
872 if( nbConnected == 0 )
873 downStream = 0;
874 else
875 downStream = info->strmSelector->Select( connected );
876 }
877
878 if( upStream >= info->stream.size() )
879 {
880 log->Debug( XRootDTransportMsg,
881 "[%s] Up link stream %d does not exist, using 0",
882 info->streamName.c_str(), upStream );
883 upStream = 0;
884 }
885
886 if( downStream >= info->stream.size() )
887 {
888 log->Debug( XRootDTransportMsg,
889 "[%s] Down link stream %d does not exist, using 0",
890 info->streamName.c_str(), downStream );
891 downStream = 0;
892 }
893
894 //--------------------------------------------------------------------------
895 // Modify the message
896 //--------------------------------------------------------------------------
897 UnMarshallRequest( msg );
898 ClientRequestHdr *hdr = (ClientRequestHdr*)msg->GetBuffer();
899 switch( hdr->requestid )
900 {
901 //------------------------------------------------------------------------
902 // Read - we update the path id to tell the server where we want to
903 // get the response, but we still send the request through stream 0
904 // We need to allocate space for read_args if we don't have it
905 // included yet
906 //------------------------------------------------------------------------
907 case kXR_read:
908 {
909 if( msg->GetSize() < sizeof(ClientReadRequest) + 8 )
910 {
911 msg->ReAllocate( sizeof(ClientReadRequest) + 8 );
912 void *newBuf = msg->GetBuffer(sizeof(ClientReadRequest));
913 memset( newBuf, 0, 8 );
914 ClientReadRequest *req = (ClientReadRequest*)msg->GetBuffer();
915 req->dlen += 8;
916 }
917 read_args *args = (read_args*)msg->GetBuffer(sizeof(ClientReadRequest));
918 args->pathid = info->stream[downStream].pathId;
919 break;
920 }
921
922
923 //------------------------------------------------------------------------
924 // PgRead - we update the path id to tell the server where we want to
925 // get the response, but we still send the request through stream 0
926 // We need to allocate space for ClientPgReadReqArgs if we don't have it
927 // included yet
928 //------------------------------------------------------------------------
929 case kXR_pgread:
930 {
931 if( msg->GetSize() < sizeof( ClientPgReadRequest ) + sizeof( ClientPgReadReqArgs ) )
932 {
933 msg->ReAllocate( sizeof( ClientPgReadRequest ) + sizeof( ClientPgReadReqArgs ) );
934 void *newBuf = msg->GetBuffer( sizeof( ClientPgReadRequest ) );
935 memset( newBuf, 0, sizeof( ClientPgReadReqArgs ) );
936 ClientPgReadRequest *req = (ClientPgReadRequest*)msg->GetBuffer();
937 req->dlen += sizeof( ClientPgReadReqArgs );
938 }
939 ClientPgReadReqArgs *args = reinterpret_cast<ClientPgReadReqArgs*>(
940 msg->GetBuffer( sizeof( ClientPgReadRequest ) ) );
941 args->pathid = info->stream[downStream].pathId;
942 break;
943 }
944
945 //------------------------------------------------------------------------
946 // ReadV - the situation is identical to read but we don't need any
947 // additional structures to specify the return path
948 //------------------------------------------------------------------------
949 case kXR_readv:
950 {
951 ClientReadVRequest *req = (ClientReadVRequest*)msg->GetBuffer();
952 req->pathid = info->stream[downStream].pathId;
953 break;
954 }
955
956 //------------------------------------------------------------------------
957 // Write - multiplexing writes doesn't work properly in the server
958 //------------------------------------------------------------------------
959 case kXR_write:
960 {
961// ClientWriteRequest *req = (ClientWriteRequest*)msg->GetBuffer();
962// req->pathid = info->stream[downStream].pathId;
963 break;
964 }
965
966 //------------------------------------------------------------------------
967 // WriteV - multiplexing writes doesn't work properly in the server
968 //------------------------------------------------------------------------
969 case kXR_writev:
970 {
971// ClientWriteVRequest *req = (ClientWriteVRequest*)msg->GetBuffer();
972// req->pathid = info->stream[downStream].pathId;
973 break;
974 }
975
976 //------------------------------------------------------------------------
977 // PgWrite - multiplexing writes doesn't work properly in the server
978 //------------------------------------------------------------------------
979 case kXR_pgwrite:
980 {
981// ClientWriteVRequest *req = (ClientWriteVRequest*)msg->GetBuffer();
982// req->pathid = info->stream[downStream].pathId;
983 break;
984 }
985 };
986 MarshallRequest( msg );
987 return PathID( upStream, downStream );
988 }
kXR_char pathid
Definition XProtocol.hh:653
static XRootDStatus UnMarshallRequest(Message *msg)

References XrdCl::XRootDStreamInfo::Connected, XrdCl::Log::Debug(), ClientPgReadRequest::dlen, ClientReadRequest::dlen, XrdCl::PathID::down, XrdCl::AnyObject::Get(), XrdCl::Buffer::GetBuffer(), XrdCl::DefaultEnv::GetLog(), XrdCl::Buffer::GetSize(), kXR_isServer, kXR_pgread, kXR_pgwrite, kXR_read, kXR_readv, kXR_write, kXR_writev, MarshallRequest(), XrdCl::XRootDChannelInfo::mutex, ClientPgReadReqArgs::pathid, ClientReadVRequest::pathid, read_args::pathid, XrdCl::Buffer::ReAllocate(), ClientRequestHdr::requestid, XrdCl::XRootDChannelInfo::serverFlags, XrdCl::XRootDChannelInfo::stream, XrdCl::XRootDChannelInfo::streamName, XrdCl::XRootDChannelInfo::strmSelector, UnMarshallRequest(), XrdCl::PathID::up, and XrdCl::XRootDTransportMsg.

+ Here is the call graph for this function:

◆ NbConnectedStrm()

uint16_t XrdCl::XRootDTransport::NbConnectedStrm ( AnyObject & channelData)
static

Number of currently connected data streams.

Definition at line 1468 of file XrdClXRootDTransport.cc.

1469 {
1470 XRootDChannelInfo *info = 0;
1471 channelData.Get( info );
1472 XrdSysMutexHelper scopedLock( info->mutex );
1473
1474 uint16_t nbConnected = 0;
1475 for( size_t i = 1; i < info->stream.size(); ++i )
1476 if( info->stream[i].status == XRootDStreamInfo::Connected )
1477 ++nbConnected;
1478
1479 return nbConnected;
1480 }

References XrdCl::XRootDStreamInfo::Connected, XrdCl::AnyObject::Get(), XrdCl::XRootDChannelInfo::mutex, and XrdCl::XRootDChannelInfo::stream.

Referenced by XrdCl::Channel::NbConnectedStrm().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ NeedControlConnection()

virtual bool XrdCl::XRootDTransport::NeedControlConnection ( )
inlinevirtual

Return the information whether a control connection needs to be valid before establishing other connections

Definition at line 167 of file XrdClXRootDTransport.hh.

168 {
169 return true;
170 }

◆ NeedEncryption()

bool XrdCl::XRootDTransport::NeedEncryption ( HandShakeData * handShakeData,
AnyObject & channelData )
virtual
Returns
: true if encryption should be turned on, false otherwise

Implements XrdCl::TransportHandler.

Definition at line 1757 of file XrdClXRootDTransport.cc.

1759 {
1760 XRootDChannelInfo *info = 0;
1761 channelData.Get( info );
1762
1764 int notlsok = DefaultNoTlsOK;
1765 env->GetInt( "NoTlsOK", notlsok );
1766
1767 if( notlsok )
1768 return info->encrypted;
1769
1770 // Did the server instructed us to switch to TLS right away?
1771 if( info->serverFlags & kXR_gotoTLS )
1772 {
1773 info->encrypted = true;
1774 return true ;
1775 }
1776
1777 XRootDStreamInfo &sInfo = info->stream[handShakeData->subStreamId];
1778
1779 //--------------------------------------------------------------------------
1780 // The control stream (sub-stream 0) might need to switch to TLS before
1781 // login or after login
1782 //--------------------------------------------------------------------------
1783 if( handShakeData->subStreamId == 0 )
1784 {
1785 //------------------------------------------------------------------------
1786 // We are about to login and the server asked to start encrypting
1787 // before login
1788 //------------------------------------------------------------------------
1789 if( ( sInfo.status == XRootDStreamInfo::LoginSent ) &&
1790 ( info->serverFlags & kXR_tlsLogin ) )
1791 {
1792 info->encrypted = true;
1793 return true;
1794 }
1795
1796 //--------------------------------------------------------------------
1797 // The hand-shake is done and the server requested to encrypt the session
1798 //--------------------------------------------------------------------
1799 if( (sInfo.status == XRootDStreamInfo::Connected ||
1800 //--------------------------------------------------------------------
1801 // we really need to turn on TLS before we sent kXR_endsess and we
1802 // are about to do so (1st enable encryption, then send kXR_endsess)
1803 //--------------------------------------------------------------------
1804 sInfo.status == XRootDStreamInfo::EndSessionSent ) &&
1805 ( info->serverFlags & kXR_tlsSess ) )
1806 {
1807 info->encrypted = true;
1808 return true;
1809 }
1810 }
1811 //--------------------------------------------------------------------------
1812 // A data stream (sub-stream > 0) if need be will be switched to TLS before
1813 // bind.
1814 //--------------------------------------------------------------------------
1815 else
1816 {
1817 //------------------------------------------------------------------------
1818 // We are about to bind a data stream and the server asked to start
1819 // encrypting before bind
1820 //------------------------------------------------------------------------
1821 if( ( sInfo.status == XRootDStreamInfo::BindSent ) &&
1822 ( info->serverFlags & kXR_tlsData ) )
1823 {
1824 info->encrypted = true;
1825 return true;
1826 }
1827 }
1828
1829 return false;
1830 }
#define kXR_tlsLogin
#define kXR_gotoTLS
#define kXR_tlsSess
#define kXR_tlsData
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
const int DefaultNoTlsOK

References XrdCl::XRootDStreamInfo::BindSent, XrdCl::XRootDStreamInfo::Connected, XrdCl::DefaultNoTlsOK, XrdCl::XRootDChannelInfo::encrypted, XrdCl::XRootDStreamInfo::EndSessionSent, XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), kXR_gotoTLS, kXR_tlsData, kXR_tlsLogin, kXR_tlsSess, XrdCl::XRootDStreamInfo::LoginSent, XrdCl::XRootDChannelInfo::serverFlags, XrdCl::XRootDStreamInfo::status, XrdCl::XRootDChannelInfo::stream, and XrdCl::HandShakeData::subStreamId.

+ Here is the call graph for this function:

◆ Query()

Status XrdCl::XRootDTransport::Query ( uint16_t query,
AnyObject & result,
AnyObject & channelData )
virtual

Query the channel.

Implements XrdCl::TransportHandler.

Definition at line 1513 of file XrdClXRootDTransport.cc.

1516 {
1517 XRootDChannelInfo *info = 0;
1518 channelData.Get( info );
1519 XrdSysMutexHelper scopedLock( info->mutex );
1520
1521 switch( query )
1522 {
1523 //------------------------------------------------------------------------
1524 // Protocol name
1525 //------------------------------------------------------------------------
1527 result.Set( (const char*)"XRootD", false );
1528 return Status();
1529
1530 //------------------------------------------------------------------------
1531 // Authentication
1532 //------------------------------------------------------------------------
1534 result.Set( new std::string( info->authProtocolName ), false );
1535 return Status();
1536
1537 //------------------------------------------------------------------------
1538 // Server flags
1539 //------------------------------------------------------------------------
1541 result.Set( new int( info->serverFlags ), false );
1542 return Status();
1543
1544 //------------------------------------------------------------------------
1545 // Protocol version
1546 //------------------------------------------------------------------------
1548 result.Set( new int( info->protocolVersion ), false );
1549 return Status();
1550
1552 result.Set( new bool( info->encrypted ), false );
1553 return Status();
1554 };
1555 return Status( stError, errQueryNotSupported );
1556 }
const uint16_t errQueryNotSupported
static const uint16_t Name
Transport name, returns const char *.
static const uint16_t Auth
Transport name, returns std::string *.
static const uint16_t ServerFlags
returns server flags
static const uint16_t ProtocolVersion
returns the protocol version
static const uint16_t IsEncrypted
returns true if the channel is encrypted

References XrdCl::TransportQuery::Auth, XrdCl::XRootDChannelInfo::authProtocolName, XrdCl::XRootDChannelInfo::encrypted, XrdCl::errQueryNotSupported, XrdCl::AnyObject::Get(), XrdCl::XRootDQuery::IsEncrypted, XrdCl::XRootDChannelInfo::mutex, XrdCl::TransportQuery::Name, XrdCl::XRootDQuery::ProtocolVersion, XrdCl::XRootDChannelInfo::protocolVersion, XrdCl::XRootDQuery::ServerFlags, XrdCl::XRootDChannelInfo::serverFlags, XrdCl::AnyObject::Set(), and XrdCl::stError.

+ Here is the call graph for this function:

◆ SetDescription()

static void XrdCl::XRootDTransport::SetDescription ( Message * msg)
inlinestatic

Get the description of a message.

Definition at line 245 of file XrdClXRootDTransport.hh.

246 {
247 std::ostringstream o;
248 GenerateDescription( msg->GetBuffer(), o );
249 msg->SetDescription( o.str() );
250 }

References GenerateDescription(), XrdCl::Buffer::GetBuffer(), and XrdCl::Message::SetDescription().

Referenced by XrdCl::FileStateHandler::Checkpoint(), XrdCl::FileStateHandler::ChkptWrt(), XrdCl::FileStateHandler::ChkptWrtV(), XrdCl::FileSystem::ChMod(), XrdCl::FileStateHandler::Close(), XrdCl::FileSystem::DirList(), XrdCl::FileStateHandler::Fcntl(), XrdCl::FileSystem::Locate(), XrdCl::FileSystem::MkDir(), XrdCl::FileSystem::Mv(), XrdCl::FileStateHandler::Open(), XrdCl::FileStateHandler::PgReadImpl(), XrdCl::FileStateHandler::PgWriteImpl(), XrdCl::FileSystem::Ping(), XrdCl::FileSystem::Prepare(), XrdCl::FileSystem::Protocol(), XrdCl::FileSystem::Query(), XrdCl::FileStateHandler::Read(), XrdCl::FileStateHandler::ReadV(), XrdCl::MessageUtils::RewriteCGIAndPath(), XrdCl::FileSystem::Rm(), XrdCl::FileSystem::RmDir(), XrdCl::FileStateHandler::Stat(), XrdCl::FileSystem::Stat(), XrdCl::FileSystem::StatVFS(), XrdCl::FileStateHandler::Sync(), XrdCl::FileStateHandler::Truncate(), XrdCl::FileSystem::Truncate(), XrdCl::FileStateHandler::VectorRead(), XrdCl::FileStateHandler::VectorWrite(), XrdCl::FileStateHandler::Visa(), XrdCl::FileStateHandler::Write(), and XrdCl::FileStateHandler::WriteV().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ SubStreamNumber()

uint16_t XrdCl::XRootDTransport::SubStreamNumber ( AnyObject & channelData)
virtual

Return a number of substreams per stream that should be created.

Implements XrdCl::TransportHandler.

Definition at line 995 of file XrdClXRootDTransport.cc.

996 {
997 XRootDChannelInfo *info = 0;
998 channelData.Get( info );
999 XrdSysMutexHelper scopedLock( info->mutex );
1000
1001 //--------------------------------------------------------------------------
1002 // If the connection has been opened in order to orchestrate a TPC or
1003 // the remote server is a Manager or Metamanager we will need only one
1004 // (control) stream.
1005 //--------------------------------------------------------------------------
1006 if( info->istpc || !(info->serverFlags & kXR_isServer ) ) return 1;
1007
1008 //--------------------------------------------------------------------------
1009 // Number of streams requested by user
1010 //--------------------------------------------------------------------------
1011 uint16_t ret = info->stream.size();
1012
1014 int nodata = DefaultTlsNoData;
1015 env->GetInt( "TlsNoData", nodata );
1016
1017 // Does the server require the stream 0 to be encrypted?
1018 bool srvTlsStrm0 = ( info->serverFlags & kXR_gotoTLS ) ||
1019 ( info->serverFlags & kXR_tlsLogin ) ||
1020 ( info->serverFlags & kXR_tlsSess );
1021 // Does the server NOT require the data streams to be encrypted?
1022 bool srvNoTlsData = !( info->serverFlags & kXR_tlsData );
1023 // Does the user require the stream 0 to be encrypted?
1024 bool usrTlsStrm0 = info->encrypted;
1025 // Does the user NOT require the data streams to be encrypted?
1026 bool usrNoTlsData = !info->encrypted || ( info->encrypted && nodata );
1027
1028 if( ( usrTlsStrm0 && usrNoTlsData && srvNoTlsData ) ||
1029 ( srvTlsStrm0 && srvNoTlsData && usrNoTlsData ) )
1030 {
1031 //------------------------------------------------------------------------
1032 // The server or user asked us to encrypt stream 0, but to send the data
1033 // (read/write) using a plain TCP connection
1034 //------------------------------------------------------------------------
1035 if( ret == 1 ) ++ret;
1036 }
1037
1038 if( ret > info->stream.size() )
1039 {
1040 info->stream.resize( ret );
1041 info->strmSelector->AdjustQueues( ret );
1042 }
1043
1044 return ret;
1045 }
const int DefaultTlsNoData

References XrdCl::DefaultTlsNoData, XrdCl::XRootDChannelInfo::encrypted, XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), XrdCl::XRootDChannelInfo::istpc, kXR_gotoTLS, kXR_isServer, kXR_tlsData, kXR_tlsLogin, kXR_tlsSess, XrdCl::XRootDChannelInfo::mutex, XrdCl::XRootDChannelInfo::serverFlags, XrdCl::XRootDChannelInfo::stream, and XrdCl::XRootDChannelInfo::strmSelector.

+ Here is the call graph for this function:

◆ UnMarchalStatusMore()

XRootDStatus XrdCl::XRootDTransport::UnMarchalStatusMore ( Message & msg)
static

Unmarshall the correction-segment of the status response for pgwrite.

Definition at line 1381 of file XrdClXRootDTransport.cc.

1382 {
1383 ServerResponseV2 *rsp = (ServerResponseV2*)msg.GetBuffer();
1384 uint16_t reqType = rsp->status.bdy.requestid + kXR_1stRequest;
1385
1386 switch( reqType )
1387 {
1388 case kXR_pgwrite:
1389 {
1390 //--------------------------------------------------------------------------
1391 // If there's no additional data there's nothing to unmarshal
1392 //--------------------------------------------------------------------------
1393 if( rsp->status.bdy.dlen == 0 ) return XRootDStatus();
1394 //--------------------------------------------------------------------------
1395 // If there's not enough data to form correction-segment report an error
1396 //--------------------------------------------------------------------------
1397 if( size_t( rsp->status.bdy.dlen ) < sizeof( ServerResponseBody_pgWrCSE ) )
1398 return XRootDStatus( stError, errInvalidMessage, 0,
1399 "kXR_status: invalid message size." );
1400
1401 //--------------------------------------------------------------------------
1402 // Calculate the crc32c for the additional data
1403 //--------------------------------------------------------------------------
1405 cse->cseCRC = ntohl( cse->cseCRC );
1406 size_t length = rsp->status.bdy.dlen - sizeof( uint32_t );
1407 void* buffer = msg.GetBuffer( sizeof( ServerResponseV2 ) + sizeof( uint32_t ) );
1408 uint32_t crcval = XrdOucCRC::Calc32C( buffer, length );
1409
1410 //--------------------------------------------------------------------------
1411 // Do the integrity checks
1412 //--------------------------------------------------------------------------
1413 if( crcval != cse->cseCRC )
1414 {
1415 return XRootDStatus( stError, errDataError, 0, "kXR_status response header "
1416 "corrupted (crc32c integrity check failed)." );
1417 }
1418
1419 cse->dlFirst = ntohs( cse->dlFirst );
1420 cse->dlLast = ntohs( cse->dlLast );
1421
1422 size_t pgcnt = ( rsp->status.bdy.dlen - sizeof( ServerResponseBody_pgWrCSE ) ) /
1423 sizeof( kXR_int64 );
1424 kXR_int64 *pgoffs = (kXR_int64*)msg.GetBuffer( sizeof( ServerResponseV2 ) +
1425 sizeof( ServerResponseBody_pgWrCSE ) );
1426
1427 for( size_t i = 0; i < pgcnt; ++i )
1428 pgoffs[i] = ntohll( pgoffs[i] );
1429
1430 return XRootDStatus();
1431 break;
1432 }
1433
1434 default:
1435 break;
1436 }
1437
1438 return XRootDStatus( stError, errNotSupported );
1439 }
ServerResponseStatus status
@ kXR_1stRequest
Definition XProtocol.hh:111
long long kXR_int64
Definition XPtypes.hh:98
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
Definition XrdOucCRC.cc:190
const uint16_t errNotSupported

References ServerResponseStatus::bdy, XrdOucCRC::Calc32C(), ServerResponseBody_pgWrCSE::cseCRC, ServerResponseBody_Status::dlen, ServerResponseBody_pgWrCSE::dlFirst, ServerResponseBody_pgWrCSE::dlLast, XrdCl::errDataError, XrdCl::errInvalidMessage, XrdCl::errNotSupported, XrdCl::Buffer::GetBuffer(), kXR_1stRequest, kXR_pgwrite, ServerResponseBody_Status::requestid, ServerResponseV2::status, and XrdCl::stError.

Referenced by GetMore().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ UnMarshallBody()

XRootDStatus XrdCl::XRootDTransport::UnMarshallBody ( Message * msg,
uint16_t reqType )
static

Unmarshall the body of the incoming message.

Definition at line 1227 of file XrdClXRootDTransport.cc.

1228 {
1229 ServerResponse *m = (ServerResponse *)msg->GetBuffer();
1230
1231 //--------------------------------------------------------------------------
1232 // kXR_ok
1233 //--------------------------------------------------------------------------
1234 if( m->hdr.status == kXR_ok )
1235 {
1236 switch( reqType )
1237 {
1238 //----------------------------------------------------------------------
1239 // kXR_protocol
1240 //----------------------------------------------------------------------
1241 case kXR_protocol:
1242 if( m->hdr.dlen < 8 )
1243 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_protocol: body too short." );
1244 m->body.protocol.pval = ntohl( m->body.protocol.pval );
1245 m->body.protocol.flags = ntohl( m->body.protocol.flags );
1246 break;
1247 }
1248 }
1249 //--------------------------------------------------------------------------
1250 // kXR_error
1251 //--------------------------------------------------------------------------
1252 else if( m->hdr.status == kXR_error )
1253 {
1254 if( m->hdr.dlen < 4 )
1255 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_error: body too short." );
1256 m->body.error.errnum = ntohl( m->body.error.errnum );
1257 }
1258
1259 //--------------------------------------------------------------------------
1260 // kXR_wait
1261 //--------------------------------------------------------------------------
1262 else if( m->hdr.status == kXR_wait )
1263 {
1264 if( m->hdr.dlen < 4 )
1265 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_wait: body too short." );
1266 m->body.wait.seconds = htonl( m->body.wait.seconds );
1267 }
1268
1269 //--------------------------------------------------------------------------
1270 // kXR_redirect
1271 //--------------------------------------------------------------------------
1272 else if( m->hdr.status == kXR_redirect )
1273 {
1274 if( m->hdr.dlen < 4 )
1275 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_redirect: body too short." );
1276 m->body.redirect.port = htonl( m->body.redirect.port );
1277 }
1278
1279 //--------------------------------------------------------------------------
1280 // kXR_waitresp
1281 //--------------------------------------------------------------------------
1282 else if( m->hdr.status == kXR_waitresp )
1283 {
1284 if( m->hdr.dlen < 4 )
1285 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_waitresp: body too short." );
1286 m->body.waitresp.seconds = htonl( m->body.waitresp.seconds );
1287 }
1288
1289 //--------------------------------------------------------------------------
1290 // kXR_attn
1291 //--------------------------------------------------------------------------
1292 else if( m->hdr.status == kXR_attn )
1293 {
1294 if( m->hdr.dlen < 4 )
1295 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_attn: body too short." );
1296 m->body.attn.actnum = htonl( m->body.attn.actnum );
1297 }
1298
1299 return XRootDStatus();
1300 }
@ kXR_redirect
Definition XProtocol.hh:904
@ kXR_error
Definition XProtocol.hh:903

References ServerResponse::body, ServerResponseHeader::dlen, XrdCl::errInvalidMessage, XrdCl::Buffer::GetBuffer(), ServerResponse::hdr, kXR_attn, kXR_error, kXR_ok, kXR_protocol, kXR_redirect, kXR_wait, kXR_waitresp, ServerResponseHeader::status, and XrdCl::stError.

Referenced by XrdCl::XRootDMsgHandler::Process().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ UnMarshallHeader()

void XrdCl::XRootDTransport::UnMarshallHeader ( Message & msg)
static

Unmarshall the header incoming message.

Definition at line 1444 of file XrdClXRootDTransport.cc.

1445 {
1446 ServerResponseHeader *header = (ServerResponseHeader *)msg.GetBuffer();
1447 header->status = ntohs( header->status );
1448 header->dlen = ntohl( header->dlen );
1449 }

References ServerResponseHeader::dlen, XrdCl::Buffer::GetBuffer(), and ServerResponseHeader::status.

Referenced by GetHeader().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ UnMarshallRequest()

XRootDStatus XrdCl::XRootDTransport::UnMarshallRequest ( Message * msg)
static

Unmarshall the request - sometimes the requests need to be rewritten, so we need to unmarshall them

Definition at line 1206 of file XrdClXRootDTransport.cc.

1207 {
1208 if( !msg->IsMarshalled() ) return XRootDStatus( stOK, suAlreadyDone );
1209 // We rely on the marshaling process to be symmetric!
1210 // First we unmarshall the request ID and the length because
1211 // MarshallRequest() relies on these, and then we need to unmarshall these
1212 // two again, because they get marshalled in MarshallRequest().
1213 // All this is pretty damn ugly and should be rewritten.
1214 ClientRequest *req = (ClientRequest*)msg->GetBuffer();
1215 req->header.requestid = htons( req->header.requestid );
1216 req->header.dlen = htonl( req->header.dlen );
1217 XRootDStatus st = MarshallRequest( msg );
1218 req->header.requestid = htons( req->header.requestid );
1219 req->header.dlen = htonl( req->header.dlen );
1220 msg->SetIsMarshalled( false );
1221 return st;
1222 }
const uint16_t suAlreadyDone

References ClientRequestHdr::dlen, XrdCl::Buffer::GetBuffer(), ClientRequest::header, XrdCl::Message::IsMarshalled(), MarshallRequest(), ClientRequestHdr::requestid, XrdCl::Message::SetIsMarshalled(), XrdCl::stOK, and XrdCl::suAlreadyDone.

Referenced by MultiplexSubStream(), XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ UnMarshalStatusBody()

XRootDStatus XrdCl::XRootDTransport::UnMarshalStatusBody ( Message & msg,
uint16_t reqType )
static

Unmarshall the body of the status response.

Definition at line 1305 of file XrdClXRootDTransport.cc.

1306 {
1307 //--------------------------------------------------------------------------
1308 // Calculate the crc32c before the unmarshaling the body!
1309 //--------------------------------------------------------------------------
1310 ServerResponseStatus *rspst = (ServerResponseStatus*)msg.GetBuffer();
1311 char *buffer = msg.GetBuffer( 8 + sizeof( rspst->bdy.crc32c ) );
1312 size_t length = rspst->hdr.dlen - sizeof( rspst->bdy.crc32c );
1313 uint32_t crcval = XrdOucCRC::Calc32C( buffer, length );
1314
1315 size_t stlen = sizeof( ServerResponseStatus );
1316 switch( reqType )
1317 {
1318 case kXR_pgread:
1319 {
1320 stlen += sizeof( ServerResponseBody_pgRead );
1321 break;
1322 }
1323
1324 case kXR_pgwrite:
1325 {
1326 stlen += sizeof( ServerResponseBody_pgWrite );
1327 break;
1328 }
1329 }
1330
1331 if( msg.GetSize() < stlen ) return XRootDStatus( stError, errInvalidMessage, 0,
1332 "kXR_status: invalid message size." );
1333
1334 rspst->bdy.crc32c = ntohl( rspst->bdy.crc32c );
1335 rspst->bdy.dlen = ntohl( rspst->bdy.dlen );
1336
1337 switch( reqType )
1338 {
1339 case kXR_pgread:
1340 {
1341 ServerResponseBody_pgRead *pgrdbdy = (ServerResponseBody_pgRead*)msg.GetBuffer( sizeof( ServerResponseStatus ) );
1342 pgrdbdy->offset = ntohll( pgrdbdy->offset );
1343 break;
1344 }
1345
1346 case kXR_pgwrite:
1347 {
1348 ServerResponseBody_pgWrite *pgwrtbdy = (ServerResponseBody_pgWrite*)msg.GetBuffer( sizeof( ServerResponseStatus ) );
1349 pgwrtbdy->offset = ntohll( pgwrtbdy->offset );
1350 break;
1351 }
1352 }
1353
1354 //--------------------------------------------------------------------------
1355 // Do the integrity checks
1356 //--------------------------------------------------------------------------
1357 if( crcval != rspst->bdy.crc32c )
1358 {
1359 return XRootDStatus( stError, errDataError, 0, "kXR_status response header "
1360 "corrupted (crc32c integrity check failed)." );
1361 }
1362
1363 if( rspst->hdr.streamid[0] != rspst->bdy.streamID[0] ||
1364 rspst->hdr.streamid[1] != rspst->bdy.streamID[1] )
1365 {
1366 return XRootDStatus( stError, errDataError, 0, "response header corrupted "
1367 "(stream ID mismatch)." );
1368 }
1369
1370
1371
1372 if( rspst->bdy.requestid + kXR_1stRequest != reqType )
1373 {
1374 return XRootDStatus( stError, errDataError, 0, "kXR_status response header corrupted "
1375 "(request ID mismatch)." );
1376 }
1377
1378 return XRootDStatus();
1379 }
struct ServerResponseHeader hdr

References ServerResponseStatus::bdy, XrdOucCRC::Calc32C(), ServerResponseBody_Status::crc32c, ServerResponseBody_Status::dlen, ServerResponseHeader::dlen, XrdCl::errDataError, XrdCl::errInvalidMessage, XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::GetSize(), ServerResponseStatus::hdr, kXR_1stRequest, kXR_pgread, kXR_pgwrite, ServerResponseBody_pgRead::offset, ServerResponseBody_pgWrite::offset, ServerResponseBody_Status::requestid, XrdCl::stError, ServerResponseBody_Status::streamID, and ServerResponseHeader::streamid.

Referenced by XrdCl::XRootDMsgHandler::InspectStatusRsp().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ WaitBeforeExit()

void XrdCl::XRootDTransport::WaitBeforeExit ( )
virtual

Wait until the program can safely exit.

Implements XrdCl::TransportHandler.

Definition at line 1748 of file XrdClXRootDTransport.cc.

1749 {
1750 XrdSysRWLockHelper scope( pSecUnloadHandler->lock, false ); // obtain write lock
1751 pSecUnloadHandler->unloaded = true;
1752 }

References XrdCl::PluginUnloadHandler::lock, and XrdCl::PluginUnloadHandler::unloaded.

Friends And Related Symbol Documentation

◆ PluginUnloadHandler

friend struct PluginUnloadHandler
friend

Definition at line 432 of file XrdClXRootDTransport.hh.


The documentation for this class was generated from the following files: