XRootD
Loading...
Searching...
No Matches
XrdClClassicCopyJob.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
27#include "XrdCl/XrdClLog.hh"
29#include "XrdCl/XrdClFile.hh"
30#include "XrdCl/XrdClMonitor.hh"
31#include "XrdCl/XrdClUtils.hh"
33#include "XrdCks/XrdCksCalc.hh"
40#include "XrdClXCpCtx.hh"
42#include "XrdSys/XrdSysE2T.hh"
44
45#include <memory>
46#include <mutex>
47#include <queue>
48#include <algorithm>
49#include <chrono>
50#include <thread>
51#include <vector>
52
53#include <sys/types.h>
54#include <sys/stat.h>
55#include <fcntl.h>
56#include <cerrno>
57#include <unistd.h>
58
59#if __cplusplus < 201103L
60#include <ctime>
61#endif
62
63namespace
64{
65 //----------------------------------------------------------------------------
67 //----------------------------------------------------------------------------
68 template<typename U = std::ratio<1, 1>>
69 class mytimer_t
70 {
71 public:
72 mytimer_t() : start( clock_t::now() ){ }
73 void reset(){ start = clock_t::now(); }
74 uint64_t elapsed() const
75 {
76 return std::chrono::duration_cast<unit_t>( clock_t::now() - start ).count();
77 }
78 private:
79 typedef std::chrono::high_resolution_clock clock_t;
80 typedef std::chrono::duration<uint64_t, U> unit_t;
81 std::chrono::time_point<clock_t> start;
82 };
83
84 using timer_sec_t = mytimer_t<>;
85 using timer_nsec_t = mytimer_t<std::nano>;
86
87
88 inline XrdCl::XRootDStatus Translate( std::vector<XrdCl::XAttr> &in,
89 std::vector<XrdCl::xattr_t> &out )
90 {
91 std::vector<XrdCl::xattr_t> ret;
92 ret.reserve( in.size() );
93 std::vector<XrdCl::XAttr>::iterator itr = in.begin();
94 for( ; itr != in.end() ; ++itr )
95 {
96 if( !itr->status.IsOK() ) return itr->status;
97 XrdCl::xattr_t xa( itr->name, itr->value );
98 ret.push_back( std::move( xa ) );
99 }
100 out.swap( ret );
101 return XrdCl::XRootDStatus();
102 }
103
104 //----------------------------------------------------------------------------
106 //----------------------------------------------------------------------------
108 std::vector<XrdCl::xattr_t> &xattrs )
109 {
110 std::vector<XrdCl::XAttr> rsp;
111 XrdCl::XRootDStatus st = file.ListXAttr( rsp );
112 if( !st.IsOK() ) return st;
113 return Translate( rsp, xattrs );
114 }
115
116 //----------------------------------------------------------------------------
118 //----------------------------------------------------------------------------
119 inline XrdCl::XRootDStatus GetXAttr( const std::string &url,
120 std::vector<XrdCl::xattr_t> &xattrs )
121 {
122 XrdCl::URL u( url );
123 XrdCl::FileSystem fs( u );
124 std::vector<XrdCl::XAttr> rsp;
125 XrdCl::XRootDStatus st = fs.ListXAttr( u.GetPath(), rsp );
126 if( !st.IsOK() ) return st;
127 return Translate( rsp, xattrs );
128 }
129
131 const std::vector<XrdCl::xattr_t> &xattrs )
132 {
133 std::vector<XrdCl::XAttrStatus> rsp;
134 file.SetXAttr( xattrs, rsp );
135 std::vector<XrdCl::XAttrStatus>::iterator itr = rsp.begin();
136 for( ; itr != rsp.end() ; ++itr )
137 if( !itr->status.IsOK() ) return itr->status;
138 return XrdCl::XRootDStatus();
139 }
140
141 //----------------------------------------------------------------------------
143 //----------------------------------------------------------------------------
144 class Source
145 {
146 public:
147 //------------------------------------------------------------------------
148 // Destructor
149 //------------------------------------------------------------------------
150 Source( const std::string &checkSumType = "",
151 const std::vector<std::string> &addcks = std::vector<std::string>() ) :
152 pCkSumHelper( 0 ),
153 pContinue( false )
154 {
155 if( !checkSumType.empty() )
156 pCkSumHelper = new XrdCl::CheckSumHelper( "source", checkSumType );
157
158 for( auto &type : addcks )
159 pAddCksHelpers.push_back( new XrdCl::CheckSumHelper( "source", type ) );
160 };
161
162 virtual ~Source()
163 {
164 delete pCkSumHelper;
165 for( auto ptr : pAddCksHelpers )
166 delete ptr;
167 }
168
169 //------------------------------------------------------------------------
171 //------------------------------------------------------------------------
172 virtual XrdCl::XRootDStatus Initialize() = 0;
173
174 //------------------------------------------------------------------------
176 //------------------------------------------------------------------------
177 virtual int64_t GetSize() = 0;
178
179 //------------------------------------------------------------------------
181 //------------------------------------------------------------------------
182 virtual XrdCl::XRootDStatus StartAt( uint64_t offset ) = 0;
183
184 //------------------------------------------------------------------------
191 //------------------------------------------------------------------------
192 virtual XrdCl::XRootDStatus GetChunk( XrdCl::PageInfo &ci ) = 0;
193
194 //------------------------------------------------------------------------
196 //------------------------------------------------------------------------
197 virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
198 std::string &checkSumType ) = 0;
199
200 //------------------------------------------------------------------------
202 //------------------------------------------------------------------------
203 virtual std::vector<std::string> GetAddCks() = 0;
204
205 //------------------------------------------------------------------------
207 //------------------------------------------------------------------------
208 virtual XrdCl::XRootDStatus GetXAttr( std::vector<XrdCl::xattr_t> &xattrs ) = 0;
209
210 //------------------------------------------------------------------------
212 //------------------------------------------------------------------------
213 virtual XrdCl::XRootDStatus TryOtherServer()
214 {
216 }
217
218 protected:
219
220 XrdCl::CheckSumHelper *pCkSumHelper;
221 std::vector<XrdCl::CheckSumHelper*> pAddCksHelpers;
222 bool pContinue;
223 };
224
225 //----------------------------------------------------------------------------
227 //----------------------------------------------------------------------------
228 class Destination
229 {
230 public:
231 //------------------------------------------------------------------------
233 //------------------------------------------------------------------------
234 Destination( const std::string &checkSumType = "" ):
235 pPosc( false ), pForce( false ), pCoerce( false ), pMakeDir( false ),
236 pContinue( false ), pCkSumHelper( 0 )
237 {
238 if( !checkSumType.empty() )
239 pCkSumHelper = new XrdCl::CheckSumHelper( "destination", checkSumType );
240 }
241
242 //------------------------------------------------------------------------
244 //------------------------------------------------------------------------
245 virtual ~Destination()
246 {
247 delete pCkSumHelper;
248 }
249
250 //------------------------------------------------------------------------
252 //------------------------------------------------------------------------
253 virtual XrdCl::XRootDStatus Initialize() = 0;
254
255 //------------------------------------------------------------------------
257 //------------------------------------------------------------------------
258 virtual XrdCl::XRootDStatus Finalize() = 0;
259
260 //------------------------------------------------------------------------
265 //------------------------------------------------------------------------
266 virtual XrdCl::XRootDStatus PutChunk( XrdCl::PageInfo &&ci ) = 0;
267
268 //------------------------------------------------------------------------
270 //------------------------------------------------------------------------
271 virtual XrdCl::XRootDStatus Flush() = 0;
272
273 //------------------------------------------------------------------------
275 //------------------------------------------------------------------------
276 virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
277 std::string &checkSumType ) = 0;
278
279 //------------------------------------------------------------------------
281 //------------------------------------------------------------------------
282 virtual XrdCl::XRootDStatus SetXAttr( const std::vector<XrdCl::xattr_t> &xattrs ) = 0;
283
284 //------------------------------------------------------------------------
286 //------------------------------------------------------------------------
287 virtual int64_t GetSize() = 0;
288
289 //------------------------------------------------------------------------
291 //------------------------------------------------------------------------
292 void SetPOSC( bool posc )
293 {
294 pPosc = posc;
295 }
296
297 //------------------------------------------------------------------------
299 //------------------------------------------------------------------------
300 void SetForce( bool force )
301 {
302 pForce = force;
303 }
304
305 //------------------------------------------------------------------------
307 //------------------------------------------------------------------------
308 void SetContinue( bool continue_ )
309 {
310 pContinue = continue_;
311 }
312
313 //------------------------------------------------------------------------
315 //------------------------------------------------------------------------
316 void SetCoerce( bool coerce )
317 {
318 pCoerce = coerce;
319 }
320
321 //------------------------------------------------------------------------
323 //------------------------------------------------------------------------
324 void SetMakeDir( bool makedir )
325 {
326 pMakeDir = makedir;
327 }
328
329 //------------------------------------------------------------------------
331 //------------------------------------------------------------------------
332 virtual const std::string& GetLastURL() const
333 {
334 static const std::string empty;
335 return empty;
336 }
337
338 //------------------------------------------------------------------------
340 //------------------------------------------------------------------------
341 virtual const std::string& GetWrtRecoveryRedir() const
342 {
343 static const std::string empty;
344 return empty;
345 }
346
347 protected:
348 bool pPosc;
349 bool pForce;
350 bool pCoerce;
351 bool pMakeDir;
352 bool pContinue;
353
354 XrdCl::CheckSumHelper *pCkSumHelper;
355 };
356
357 //----------------------------------------------------------------------------
359 //----------------------------------------------------------------------------
360 class StdInSource: public Source
361 {
362 public:
363 //------------------------------------------------------------------------
365 //------------------------------------------------------------------------
366 StdInSource( const std::string &ckSumType, uint32_t chunkSize, const std::vector<std::string> &addcks ):
367 Source( ckSumType, addcks ),
368 pCurrentOffset(0),
369 pChunkSize( chunkSize )
370 {
371
372 }
373
374 //------------------------------------------------------------------------
376 //------------------------------------------------------------------------
377 virtual ~StdInSource()
378 {
379
380 }
381
382 //------------------------------------------------------------------------
384 //------------------------------------------------------------------------
385 virtual XrdCl::XRootDStatus Initialize()
386 {
387 if( pCkSumHelper )
388 {
389 auto st = pCkSumHelper->Initialize();
390 if( !st.IsOK() ) return st;
391 for( auto cksHelper : pAddCksHelpers )
392 {
393 st = cksHelper->Initialize();
394 if( !st.IsOK() ) return st;
395 }
396 }
397 return XrdCl::XRootDStatus();
398 }
399
400 //------------------------------------------------------------------------
402 //------------------------------------------------------------------------
403 virtual int64_t GetSize()
404 {
405 return -1;
406 }
407
408 //------------------------------------------------------------------------
410 //------------------------------------------------------------------------
411 virtual XrdCl::XRootDStatus StartAt( uint64_t )
412 {
414 "Cannot continue from stdin!" );
415 }
416
417 //------------------------------------------------------------------------
419 //------------------------------------------------------------------------
420 virtual XrdCl::XRootDStatus GetChunk( XrdCl::PageInfo &ci )
421 {
422 using namespace XrdCl;
423 Log *log = DefaultEnv::GetLog();
424
425 uint32_t toRead = pChunkSize;
426 char *buffer = new char[toRead];
427
428 int64_t bytesRead = 0;
429 uint32_t offset = 0;
430 while( toRead )
431 {
432 int64_t bRead = read( 0, buffer+offset, toRead );
433 if( bRead == -1 )
434 {
435 log->Debug( UtilityMsg, "Unable to read from stdin: %s",
436 XrdSysE2T( errno ) );
437 delete [] buffer;
438 return XRootDStatus( stError, errOSError, errno );
439 }
440
441 if( bRead == 0 )
442 break;
443
444 bytesRead += bRead;
445 offset += bRead;
446 toRead -= bRead;
447 }
448
449 if( bytesRead == 0 )
450 {
451 delete [] buffer;
452 return XRootDStatus( stOK, suDone );
453 }
454
455 if( pCkSumHelper )
456 pCkSumHelper->Update( buffer, bytesRead );
457
458 for( auto cksHelper : pAddCksHelpers )
459 cksHelper->Update( buffer, bytesRead );
460
461 ci = XrdCl::PageInfo( pCurrentOffset, bytesRead, buffer );
462 pCurrentOffset += bytesRead;
463 return XRootDStatus( stOK, suContinue );
464 }
465
466 //------------------------------------------------------------------------
468 //------------------------------------------------------------------------
469 virtual XrdCl::XRootDStatus GetCheckSumImpl( XrdCl::CheckSumHelper *cksHelper,
470 std::string &checkSum,
471 std::string &checkSumType )
472 {
473 using namespace XrdCl;
474 if( cksHelper )
475 return cksHelper->GetCheckSum( checkSum, checkSumType );
476 return XRootDStatus( stError, errCheckSumError );
477 }
478
479 //------------------------------------------------------------------------
481 //------------------------------------------------------------------------
482 virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
483 std::string &checkSumType )
484 {
485 return GetCheckSumImpl( pCkSumHelper, checkSum, checkSumType );
486 }
487
488 //------------------------------------------------------------------------
490 //------------------------------------------------------------------------
491 std::vector<std::string> GetAddCks()
492 {
493 std::vector<std::string> ret;
494 for( auto cksHelper : pAddCksHelpers )
495 {
496 std::string type = cksHelper->GetType();
497 std::string cks;
498 GetCheckSumImpl( cksHelper, cks, type );
499 ret.push_back( type + ":" + cks );
500 }
501 return ret;
502 }
503
504 //------------------------------------------------------------------------
506 //------------------------------------------------------------------------
507 virtual XrdCl::XRootDStatus GetXAttr( std::vector<XrdCl::xattr_t> &xattrs )
508 {
509 return XrdCl::XRootDStatus();
510 }
511
512 private:
513 StdInSource(const StdInSource &other);
514 StdInSource &operator = (const StdInSource &other);
515
516 uint64_t pCurrentOffset;
517 uint32_t pChunkSize;
518 };
519
520 //----------------------------------------------------------------------------
522 //----------------------------------------------------------------------------
523 class XRootDSource: public Source
524 {
525 struct CancellableJob : public XrdCl::Job
526 {
527 virtual void Cancel() = 0;
528
529 std::mutex mtx;
530 };
531
532 //----------------------------------------------------------------------------
533 // On-connect callback job, a lambda would be more elegant, but we still have
534 // to support SLC6
535 //----------------------------------------------------------------------------
536 template<typename READER>
537 struct OnConnJob : public CancellableJob
538 {
539 OnConnJob( XRootDSource *self, READER *reader ) : self( self ), reader( reader )
540 {
541 }
542
543 void Run( void* )
544 {
545 std::unique_lock<std::mutex> lck( mtx );
546 if( !self || !reader ) return;
547 // add new chunks to the queue
548 if( self->pNbConn < self->pMaxNbConn )
549 self->FillQueue( reader );
550 }
551
552 void Cancel()
553 {
554 std::unique_lock<std::mutex> lck( mtx );
555 self = 0;
556 reader = 0;
557 }
558
559 private:
560 XRootDSource *self;
561 READER *reader;
562
563 };
564
565 public:
566
567 //------------------------------------------------------------------------
569 //------------------------------------------------------------------------
570 XrdCl::XRootDStatus TryOtherServer()
571 {
572 return pFile->TryOtherServer();
573 }
574
575 //------------------------------------------------------------------------
577 //------------------------------------------------------------------------
578 XRootDSource( const XrdCl::URL *url,
579 uint32_t chunkSize,
580 uint8_t parallelChunks,
581 const std::string &ckSumType,
582 const std::vector<std::string> &addcks,
583 bool doserver ):
584 Source( ckSumType, addcks ),
585 pUrl( url ), pFile( new XrdCl::File() ), pSize( -1 ),
586 pCurrentOffset( 0 ), pChunkSize( chunkSize ),
587 pParallel( parallelChunks ),
588 pNbConn( 0 ), pUsePgRead( false ),
589 pDoServer( doserver )
590 {
592 XrdCl::DefaultEnv::GetEnv()->GetInt( "SubStreamsPerChannel", val );
593 pMaxNbConn = val - 1; // account for the control stream
594 }
595
596 //------------------------------------------------------------------------
598 //------------------------------------------------------------------------
599 virtual ~XRootDSource()
600 {
601 if( pDataConnCB )
602 pDataConnCB->Cancel();
603
604 CleanUpChunks();
605 if( pFile->IsOpen() )
606 XrdCl::XRootDStatus status = pFile->Close();
607 delete pFile;
608 }
609
610 //------------------------------------------------------------------------
612 //------------------------------------------------------------------------
613 virtual XrdCl::XRootDStatus Initialize()
614 {
615 using namespace XrdCl;
616 Log *log = DefaultEnv::GetLog();
617 log->Debug( UtilityMsg, "Opening %s for reading",
618 pUrl->GetObfuscatedURL().c_str() );
619
620 std::string value;
621 DefaultEnv::GetEnv()->GetString( "ReadRecovery", value );
622 pFile->SetProperty( "ReadRecovery", value );
623
624 XRootDStatus st = pFile->Open( pUrl->GetURL(), OpenFlags::Read );
625 if( !st.IsOK() )
626 return st;
627
628 StatInfo *statInfo;
629 st = pFile->Stat( false, statInfo );
630 if( !st.IsOK() )
631 return st;
632
633 pSize = statInfo->GetSize();
634 delete statInfo;
635
636 if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && pCkSumHelper && !pContinue )
637 {
638 st = pCkSumHelper->Initialize();
639 if( !st.IsOK() ) return st;
640
641 for( auto cksHelper : pAddCksHelpers )
642 {
643 st = cksHelper->Initialize();
644 if( !st.IsOK() ) return st;
645 }
646 }
647
648 //----------------------------------------------------------------------
649 // Figere out the actual data server we are talking to
650 //----------------------------------------------------------------------
651 if( !pUrl->IsLocalFile() ||
652 ( pUrl->IsLocalFile() && pUrl->IsMetalink() ) )
653 {
654 pFile->GetProperty( "LastURL", pDataServer );
655 }
656
657
658 if( ( !pUrl->IsLocalFile() && !pFile->IsSecure() ) ||
659 ( pUrl->IsLocalFile() && pUrl->IsMetalink() ) )
660 {
661 //--------------------------------------------------------------------
662 // Decide whether we can use PgRead
663 //--------------------------------------------------------------------
665 XrdCl::DefaultEnv::GetEnv()->GetInt( "CpUsePgWrtRd", val );
666 pUsePgRead = XrdCl::Utils::HasPgRW( pDataServer ) && ( val == 1 );
667 }
668
669 //----------------------------------------------------------------------
670 // Print the IPv4/IPv6 stack to the stderr if we are running in server
671 // mode
672 //----------------------------------------------------------------------
673 if( pDoServer && !pUrl->IsLocalFile() )
674 {
675 AnyObject obj;
676 DefaultEnv::GetPostMaster()->QueryTransport( pDataServer, StreamQuery::IpStack, obj );
677 std::string *ipstack = nullptr;
678 obj.Get( ipstack );
679 std::cerr << "!-!" << *ipstack << std::endl;
680 delete ipstack;
681 }
682
683 SetOnDataConnectHandler( pFile );
684
685 return XRootDStatus();
686 }
687
688 //------------------------------------------------------------------------
690 //------------------------------------------------------------------------
691 virtual int64_t GetSize()
692 {
693 return pSize;
694 }
695
696 //------------------------------------------------------------------------
698 //------------------------------------------------------------------------
699 virtual XrdCl::XRootDStatus StartAt( uint64_t offset )
700 {
701 pCurrentOffset = offset;
702 pContinue = true;
703 return XrdCl::XRootDStatus();
704 }
705
706 //------------------------------------------------------------------------
713 //------------------------------------------------------------------------
714 virtual XrdCl::XRootDStatus GetChunk( XrdCl::PageInfo &ci )
715 {
716 return GetChunkImpl( pFile, ci );
717 }
718
719 //------------------------------------------------------------------------
721 //------------------------------------------------------------------------
722 virtual XrdCl::XRootDStatus GetXAttr( std::vector<XrdCl::xattr_t> &xattrs )
723 {
724 return ::GetXAttr( *pFile, xattrs );
725 }
726
727 //------------------------------------------------------------------------
728 // Clean up the chunks that are flying
729 //------------------------------------------------------------------------
730 void CleanUpChunks()
731 {
732 while( !pChunks.empty() )
733 {
734 ChunkHandler *ch = pChunks.front();
735 pChunks.pop();
736 ch->sem->Wait();
737 delete [] (char *)ch->chunk.GetBuffer();
738 delete ch;
739 }
740 }
741
742 //------------------------------------------------------------------------
743 // Get check sum
744 //------------------------------------------------------------------------
745 virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
746 std::string &checkSumType )
747 {
748 return GetCheckSumImpl( pCkSumHelper, checkSum, checkSumType );
749 }
750
751 XrdCl::XRootDStatus GetCheckSumImpl( XrdCl::CheckSumHelper *cksHelper,
752 std::string &checkSum,
753 std::string &checkSumType )
754 {
755 if( pUrl->IsMetalink() )
756 {
758 XrdCl::VirtualRedirector *redirector = registry.Get( *pUrl );
759 checkSum = redirector->GetCheckSum( checkSumType );
760 if( !checkSum.empty() ) return XrdCl::XRootDStatus();
761 }
762
763 if( pUrl->IsLocalFile() )
764 {
765 if( pContinue )
766 // in case of --continue option we have to calculate the checksum from scratch
767 return XrdCl::Utils::GetLocalCheckSum( checkSum, checkSumType, pUrl->GetPath() );
768
769 if( cksHelper )
770 return cksHelper->GetCheckSum( checkSum, checkSumType );
771
773 }
774
775 std::string dataServer; pFile->GetProperty( "DataServer", dataServer );
776 std::string lastUrl; pFile->GetProperty( "LastURL", lastUrl );
777 return XrdCl::Utils::GetRemoteCheckSum( checkSum, checkSumType, XrdCl::URL( lastUrl ) );
778 }
779
780 //------------------------------------------------------------------------
782 //------------------------------------------------------------------------
783 std::vector<std::string> GetAddCks()
784 {
785 std::vector<std::string> ret;
786 for( auto cksHelper : pAddCksHelpers )
787 {
788 std::string type = cksHelper->GetType();
789 std::string cks;
790 GetCheckSumImpl( cksHelper, cks, type );
791 ret.push_back( cks );
792 }
793 return ret;
794 }
795
796 private:
797 XRootDSource(const XRootDSource &other);
798 XRootDSource &operator = (const XRootDSource &other);
799
800 protected:
801
802 //------------------------------------------------------------------------
803 // Fill the queue with in-the-fly read requests
804 //------------------------------------------------------------------------
805 template<typename READER>
806 inline void FillQueue( READER *reader )
807 {
808 //----------------------------------------------------------------------
809 // Get the number of connected streams
810 //----------------------------------------------------------------------
811 uint16_t parallel = pParallel;
812 if( pNbConn < pMaxNbConn )
813 {
815 NbConnectedStrm( pDataServer );
816 }
817 if( pNbConn ) parallel *= pNbConn;
818
819 while( pChunks.size() < parallel && pCurrentOffset < pSize )
820 {
821 uint64_t chunkSize = pChunkSize;
822 if( pCurrentOffset + chunkSize > (uint64_t)pSize )
823 chunkSize = pSize - pCurrentOffset;
824
825 char *buffer = new char[chunkSize];
826 ChunkHandler *ch = new ChunkHandler();
827 ch->status = pUsePgRead
828 ? reader->PgRead( pCurrentOffset, chunkSize, buffer, ch )
829 : reader->Read( pCurrentOffset, chunkSize, buffer, ch );
830 pChunks.push( ch );
831 pCurrentOffset += chunkSize;
832 if( !ch->status.IsOK() )
833 {
834 ch->sem->Post();
835 break;
836 }
837 }
838 }
839
840 //------------------------------------------------------------------------
841 // Set the on-connect handler for data streams
842 //------------------------------------------------------------------------
843 template<typename READER>
844 void SetOnDataConnectHandler( READER *reader )
845 {
846 // we need to create the object anyway as it contains our mutex now
847 pDataConnCB.reset( new OnConnJob<READER>( this, reader ) );
848
849 // check if it is a local file
850 if( pDataServer.empty() ) return;
851
852 XrdCl::DefaultEnv::GetPostMaster()->SetOnDataConnectHandler( pDataServer, pDataConnCB );
853 }
854
855 //------------------------------------------------------------------------
863 //------------------------------------------------------------------------
864 template<typename READER>
865 XrdCl::XRootDStatus GetChunkImpl( READER *reader, XrdCl::PageInfo &ci )
866 {
867 //----------------------------------------------------------------------
868 // Sanity check
869 //----------------------------------------------------------------------
870 using namespace XrdCl;
871 Log *log = DefaultEnv::GetLog();
872
873 //----------------------------------------------------------------------
874 // Fill the queue
875 //----------------------------------------------------------------------
876 std::unique_lock<std::mutex> lck( pDataConnCB->mtx );
877 FillQueue( reader );
878
879 //----------------------------------------------------------------------
880 // Pick up a chunk from the front and wait for status
881 //----------------------------------------------------------------------
882 if( pChunks.empty() )
883 return XRootDStatus( stOK, suDone );
884
885 std::unique_ptr<ChunkHandler> ch( pChunks.front() );
886 pChunks.pop();
887 lck.unlock();
888
889 ch->sem->Wait();
890
891 if( !ch->status.IsOK() )
892 {
893 log->Debug( UtilityMsg, "Unable read %d bytes at %ld from %s: %s",
894 ch->chunk.GetLength(), ch->chunk.GetOffset(),
895 pUrl->GetObfuscatedURL().c_str(), ch->status.ToStr().c_str() );
896 delete [] (char *)ch->chunk.GetBuffer();
897 CleanUpChunks();
898 return ch->status;
899 }
900
901 ci = std::move( ch->chunk );
902 // if it is a local file update the checksum
903 if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && !pContinue )
904 {
905 if( pCkSumHelper )
906 pCkSumHelper->Update( ci.GetBuffer(), ci.GetLength() );
907
908 for( auto cksHelper : pAddCksHelpers )
909 cksHelper->Update( ci.GetBuffer(), ci.GetLength() );
910 }
911
912 return XRootDStatus( stOK, suContinue );
913 }
914
915 //------------------------------------------------------------------------
916 // Asynchronous chunk handler
917 //------------------------------------------------------------------------
919 {
920 public:
921 ChunkHandler(): sem( new XrdSysSemaphore(0) ) {}
922 virtual ~ChunkHandler() { delete sem; }
923 virtual void HandleResponse( XrdCl::XRootDStatus *statusval,
924 XrdCl::AnyObject *response )
925 {
926 this->status = *statusval;
927 delete statusval;
928 if( response )
929 {
930 chunk = ToChunk( response );
931 delete response;
932 }
933 sem->Post();
934 }
935
936 XrdCl::PageInfo ToChunk( XrdCl::AnyObject *response )
937 {
938 if( response->Has<XrdCl::PageInfo>() )
939 {
940 XrdCl::PageInfo *resp = nullptr;
941 response->Get( resp );
942 return std::move( *resp );
943 }
944 else
945 {
946 XrdCl::ChunkInfo *resp = nullptr;
947 response->Get( resp );
948 return XrdCl::PageInfo( resp->GetOffset(), resp->GetLength(),
949 resp->GetBuffer() );
950 }
951 }
952
953 XrdSysSemaphore *sem;
954 XrdCl::PageInfo chunk;
955 XrdCl::XRootDStatus status;
956 };
957
958 const XrdCl::URL *pUrl;
959 XrdCl::File *pFile;
960 int64_t pSize;
961 int64_t pCurrentOffset;
962 uint32_t pChunkSize;
963 uint16_t pParallel;
964 std::queue<ChunkHandler*> pChunks;
965 std::string pDataServer;
966 uint16_t pNbConn;
967 uint16_t pMaxNbConn;
968 bool pUsePgRead;
969 bool pDoServer;
970
971 std::shared_ptr<CancellableJob> pDataConnCB;
972 };
973
974 //----------------------------------------------------------------------------
976 //----------------------------------------------------------------------------
977 class XRootDSourceZip: public XRootDSource
978 {
979 public:
980 //------------------------------------------------------------------------
982 //------------------------------------------------------------------------
983 XRootDSourceZip( const std::string &filename,
984 const XrdCl::URL *archive,
985 uint32_t chunkSize,
986 uint8_t parallelChunks,
987 const std::string &ckSumType,
988 const std::vector<std::string> &addcks,
989 bool doserver ):
990 XRootDSource( archive, chunkSize, parallelChunks, ckSumType,
991 addcks, doserver ),
992 pFilename( filename ),
993 pZipArchive( new XrdCl::ZipArchive() )
994 {
995 }
996
997 //------------------------------------------------------------------------
999 //------------------------------------------------------------------------
1000 virtual ~XRootDSourceZip()
1001 {
1002 CleanUpChunks();
1003
1004 XrdCl::WaitFor( XrdCl::CloseArchive( pZipArchive ) );
1005 delete pZipArchive;
1006 }
1007
1008 //------------------------------------------------------------------------
1010 //------------------------------------------------------------------------
1011 virtual XrdCl::XRootDStatus Initialize()
1012 {
1013 using namespace XrdCl;
1014 Log *log = DefaultEnv::GetLog();
1015 log->Debug( UtilityMsg, "Opening %s for reading",
1016 pUrl->GetObfuscatedURL().c_str() );
1017
1018 std::string value;
1019 DefaultEnv::GetEnv()->GetString( "ReadRecovery", value );
1020 pZipArchive->SetProperty( "ReadRecovery", value );
1021
1022 XRootDStatus st = XrdCl::WaitFor( XrdCl::OpenArchive( pZipArchive, pUrl->GetURL(), XrdCl::OpenFlags::Read ) );
1023 if( !st.IsOK() )
1024 return st;
1025
1026 st = pZipArchive->OpenFile( pFilename );
1027 if( !st.IsOK() )
1028 return st;
1029
1030 XrdCl::StatInfo *info = 0;
1031 st = pZipArchive->Stat( info );
1032 if( st.IsOK() )
1033 {
1034 pSize = info->GetSize();
1035 delete info;
1036 }
1037 else
1038 return st;
1039
1040 if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && pCkSumHelper )
1041 {
1042 auto st = pCkSumHelper->Initialize();
1043 if( !st.IsOK() ) return st;
1044 for( auto cksHelper : pAddCksHelpers )
1045 {
1046 st = cksHelper->Initialize();
1047 if( !st.IsOK() ) return st;
1048 }
1049 }
1050
1051 if( ( !pUrl->IsLocalFile() && !pZipArchive->IsSecure() ) ||
1052 ( pUrl->IsLocalFile() && pUrl->IsMetalink() ) )
1053 {
1054 pZipArchive->GetProperty( "DataServer", pDataServer );
1055 //--------------------------------------------------------------------
1056 // Decide whether we can use PgRead
1057 //--------------------------------------------------------------------
1059 XrdCl::DefaultEnv::GetEnv()->GetInt( "CpUsePgWrtRd", val );
1060 pUsePgRead = XrdCl::Utils::HasPgRW( pDataServer ) && ( val == 1 );
1061 }
1062
1063 SetOnDataConnectHandler( pZipArchive );
1064
1065 return XrdCl::XRootDStatus();
1066 }
1067
1068 //------------------------------------------------------------------------
1076 //------------------------------------------------------------------------
1077 virtual XrdCl::XRootDStatus GetChunk( XrdCl::PageInfo &ci )
1078 {
1079 return GetChunkImpl( pZipArchive, ci );
1080 }
1081
1082 //------------------------------------------------------------------------
1083 // Get check sum
1084 //------------------------------------------------------------------------
1085 virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
1086 std::string &checkSumType )
1087 {
1088 return GetCheckSumImpl( checkSum, checkSumType, pCkSumHelper );
1089 }
1090
1091 //------------------------------------------------------------------------
1092 // Get check sum implementation
1093 //------------------------------------------------------------------------
1094 virtual XrdCl::XRootDStatus GetCheckSumImpl( std::string &checkSum,
1095 std::string &checkSumType,
1096 XrdCl::CheckSumHelper *cksHelper )
1097 {
1098 // The ZIP archive by default contains a ZCRC32 checksum
1099 if( checkSumType == "zcrc32" )
1100 {
1101 uint32_t cksum = 0;
1102 auto st = pZipArchive->GetCRC32( pFilename, cksum );
1103 if( !st.IsOK() ) return st;
1104
1105 XrdCksData ckSum;
1106 ckSum.Set( "zcrc32" );
1107 ckSum.Set( reinterpret_cast<void*>( &cksum ), sizeof( uint32_t ) );
1108 char cksBuffer[265];
1109 ckSum.Get( cksBuffer, 256 );
1110 checkSum = "zcrc32:";
1111 checkSum += XrdCl::Utils::NormalizeChecksum( "zcrc32", cksBuffer );
1112 return st;
1113 }
1114
1115 int useMtlnCksum = XrdCl::DefaultZipMtlnCksum;
1117 env->GetInt( "ZipMtlnCksum", useMtlnCksum );
1118 if( useMtlnCksum && pUrl->IsMetalink() )
1119 {
1121 XrdCl::VirtualRedirector *redirector = registry.Get( *pUrl );
1122 checkSum = redirector->GetCheckSum( checkSumType );
1123 if( !checkSum.empty() ) return XrdCl::XRootDStatus();
1124 }
1125
1126 // if it is a local file we can calculate the checksum ourself
1127 if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && cksHelper && !pContinue )
1128 return cksHelper->GetCheckSum( checkSum, checkSumType );
1129
1130 // if it is a remote file other types of checksum are not supported
1132 }
1133
1134 //------------------------------------------------------------------------
1136 //------------------------------------------------------------------------
1137 std::vector<std::string> GetAddCks()
1138 {
1139 std::vector<std::string> ret;
1140 for( auto cksHelper : pAddCksHelpers )
1141 {
1142 std::string type = cksHelper->GetType();
1143 std::string cks;
1144 GetCheckSumImpl( cks, type, cksHelper );
1145 ret.push_back( cks );
1146 }
1147 return ret;
1148 }
1149
1150 //------------------------------------------------------------------------
1152 //------------------------------------------------------------------------
1153 virtual XrdCl::XRootDStatus GetXAttr( std::vector<XrdCl::xattr_t> &xattrs )
1154 {
1155 return XrdCl::XRootDStatus();
1156 }
1157
1158 private:
1159
1160 XRootDSourceZip(const XRootDSourceZip &other);
1161 XRootDSourceZip &operator = (const XRootDSourceZip &other);
1162
1163 const std::string pFilename;
1164 XrdCl::ZipArchive *pZipArchive;
1165 };
1166
1167 //----------------------------------------------------------------------------
1169 //----------------------------------------------------------------------------
1170 class XRootDSourceDynamic: public Source
1171 {
1172 public:
1173
1174 //------------------------------------------------------------------------
1176 //------------------------------------------------------------------------
1177 XrdCl::XRootDStatus TryOtherServer()
1178 {
1179 return pFile->TryOtherServer();
1180 }
1181
1182 //------------------------------------------------------------------------
1184 //------------------------------------------------------------------------
1185 XRootDSourceDynamic( const XrdCl::URL *url,
1186 uint32_t chunkSize,
1187 const std::string &ckSumType,
1188 const std::vector<std::string> &addcks ):
1189 Source( ckSumType, addcks ),
1190 pUrl( url ), pFile( new XrdCl::File() ), pCurrentOffset( 0 ),
1191 pChunkSize( chunkSize ), pDone( false ), pUsePgRead( false )
1192 {
1193 }
1194
1195 //------------------------------------------------------------------------
1197 //------------------------------------------------------------------------
1198 virtual ~XRootDSourceDynamic()
1199 {
1200 XrdCl::XRootDStatus status = pFile->Close();
1201 delete pFile;
1202 }
1203
1204 //------------------------------------------------------------------------
1206 //------------------------------------------------------------------------
1207 virtual XrdCl::XRootDStatus Initialize()
1208 {
1209 using namespace XrdCl;
1210 Log *log = DefaultEnv::GetLog();
1211 log->Debug( UtilityMsg, "Opening %s for reading",
1212 pUrl->GetObfuscatedURL().c_str() );
1213
1214 std::string value;
1215 DefaultEnv::GetEnv()->GetString( "ReadRecovery", value );
1216 pFile->SetProperty( "ReadRecovery", value );
1217
1218 XRootDStatus st = pFile->Open( pUrl->GetURL(), OpenFlags::Read );
1219 if( !st.IsOK() )
1220 return st;
1221
1222 if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && pCkSumHelper && !pContinue )
1223 {
1224 auto st = pCkSumHelper->Initialize();
1225 if( !st.IsOK() ) return st;
1226 for( auto cksHelper : pAddCksHelpers )
1227 {
1228 st = cksHelper->Initialize();
1229 if( !st.IsOK() ) return st;
1230 }
1231 }
1232
1233 if( ( !pUrl->IsLocalFile() && !pFile->IsSecure() ) ||
1234 ( pUrl->IsLocalFile() && pUrl->IsMetalink() ) )
1235 {
1236 std::string datasrv;
1237 pFile->GetProperty( "DataServer", datasrv );
1238 //--------------------------------------------------------------------
1239 // Decide whether we can use PgRead
1240 //--------------------------------------------------------------------
1242 XrdCl::DefaultEnv::GetEnv()->GetInt( "CpUsePgWrtRd", val );
1243 pUsePgRead = XrdCl::Utils::HasPgRW( datasrv ) && ( val == 1 );
1244 }
1245
1246 return XRootDStatus();
1247 }
1248
1249 //------------------------------------------------------------------------
1251 //------------------------------------------------------------------------
1252 virtual int64_t GetSize()
1253 {
1254 return -1;
1255 }
1256
1257 //------------------------------------------------------------------------
1259 //------------------------------------------------------------------------
1260 virtual XrdCl::XRootDStatus StartAt( uint64_t offset )
1261 {
1262 pCurrentOffset = offset;
1263 pContinue = true;
1264 return XrdCl::XRootDStatus();
1265 }
1266
1267 //------------------------------------------------------------------------
1275 //------------------------------------------------------------------------
1276 virtual XrdCl::XRootDStatus GetChunk( XrdCl::PageInfo &ci )
1277 {
1278 //----------------------------------------------------------------------
1279 // Sanity check
1280 //----------------------------------------------------------------------
1281 using namespace XrdCl;
1282
1283 if( pDone )
1284 return XRootDStatus( stOK, suDone );
1285
1286 //----------------------------------------------------------------------
1287 // Fill the queue
1288 //----------------------------------------------------------------------
1289 char *buffer = new char[pChunkSize];
1290 uint32_t bytesRead = 0;
1291
1292 std::vector<uint32_t> cksums;
1293 XRootDStatus st = pUsePgRead
1294 ? pFile->PgRead( pCurrentOffset, pChunkSize, buffer, cksums, bytesRead )
1295 : pFile->Read( pCurrentOffset, pChunkSize, buffer, bytesRead );
1296
1297 if( !st.IsOK() )
1298 {
1299 delete [] buffer;
1300 return st;
1301 }
1302
1303 if( !bytesRead )
1304 {
1305 delete [] buffer;
1306 return XRootDStatus( stOK, suDone );
1307 }
1308
1309 if( bytesRead < pChunkSize )
1310 pDone = true;
1311
1312 // if it is a local file update the checksum
1313 if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && !pContinue )
1314 {
1315 if( pCkSumHelper )
1316 pCkSumHelper->Update( buffer, bytesRead );
1317
1318 for( auto cksHelper : pAddCksHelpers )
1319 cksHelper->Update( buffer, bytesRead );
1320 }
1321
1322 ci = XrdCl::PageInfo( pCurrentOffset, bytesRead, buffer );
1323 pCurrentOffset += bytesRead;
1324
1325 return XRootDStatus( stOK, suContinue );
1326 }
1327
1328 //------------------------------------------------------------------------
1329 // Get check sum
1330 //------------------------------------------------------------------------
1331 virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
1332 std::string &checkSumType )
1333 {
1334 return GetCheckSumImpl( pCkSumHelper, checkSum, checkSumType );
1335 }
1336
1337 XrdCl::XRootDStatus GetCheckSumImpl( XrdCl::CheckSumHelper *cksHelper,
1338 std::string &checkSum,
1339 std::string &checkSumType )
1340 {
1341 if( pUrl->IsMetalink() )
1342 {
1344 XrdCl::VirtualRedirector *redirector = registry.Get( *pUrl );
1345 checkSum = redirector->GetCheckSum( checkSumType );
1346 if( !checkSum.empty() ) return XrdCl::XRootDStatus();
1347 }
1348
1349 if( pUrl->IsLocalFile() )
1350 {
1351 if( pContinue)
1352 // in case of --continue option we have to calculate the checksum from scratch
1353 return XrdCl::Utils::GetLocalCheckSum( checkSum, checkSumType, pUrl->GetPath() );
1354
1355 if( cksHelper )
1356 return cksHelper->GetCheckSum( checkSum, checkSumType );
1357
1359 }
1360
1361 std::string dataServer; pFile->GetProperty( "DataServer", dataServer );
1362 std::string lastUrl; pFile->GetProperty( "LastURL", lastUrl );
1363 return XrdCl::Utils::GetRemoteCheckSum( checkSum, checkSumType, XrdCl::URL( lastUrl ) );
1364 }
1365
1366 //------------------------------------------------------------------------
1368 //------------------------------------------------------------------------
1369 std::vector<std::string> GetAddCks()
1370 {
1371 std::vector<std::string> ret;
1372 for( auto cksHelper : pAddCksHelpers )
1373 {
1374 std::string type = cksHelper->GetType();
1375 std::string cks;
1376 GetCheckSumImpl( cksHelper, cks, type );
1377 ret.push_back( cks );
1378 }
1379 return ret;
1380 }
1381
1382 //------------------------------------------------------------------------
1384 //------------------------------------------------------------------------
1385 virtual XrdCl::XRootDStatus GetXAttr( std::vector<XrdCl::xattr_t> &xattrs )
1386 {
1387 return ::GetXAttr( *pFile, xattrs );
1388 }
1389
1390 private:
1391 XRootDSourceDynamic(const XRootDSourceDynamic &other);
1392 XRootDSourceDynamic &operator = (const XRootDSourceDynamic &other);
1393 const XrdCl::URL *pUrl;
1394 XrdCl::File *pFile;
1395 int64_t pCurrentOffset;
1396 uint32_t pChunkSize;
1397 bool pDone;
1398 bool pUsePgRead;
1399 };
1400
1401 //----------------------------------------------------------------------------
1403 //----------------------------------------------------------------------------
1404 class XRootDSourceXCp: public Source
1405 {
1406 public:
1407 //------------------------------------------------------------------------
1409 //------------------------------------------------------------------------
1410 XRootDSourceXCp( const XrdCl::URL* url, uint32_t chunkSize, uint16_t parallelChunks, int32_t nbSrc, uint64_t blockSize ):
1411 pXCpCtx( 0 ), pUrl( url ), pChunkSize( chunkSize ), pParallelChunks( parallelChunks ), pNbSrc( nbSrc ), pBlockSize( blockSize )
1412 {
1413 }
1414
1415 ~XRootDSourceXCp()
1416 {
1417 if( pXCpCtx )
1418 pXCpCtx->Delete();
1419 }
1420
1421 //------------------------------------------------------------------------
1423 //------------------------------------------------------------------------
1424 virtual XrdCl::XRootDStatus Initialize()
1425 {
1427 int64_t fileSize = -1;
1428
1429 if( pUrl->IsMetalink() )
1430 {
1432 XrdCl::VirtualRedirector *redirector = registry.Get( *pUrl );
1433 fileSize = redirector->GetSize();
1434 pReplicas = redirector->GetReplicas();
1435 }
1436 else
1437 {
1438 XrdCl::LocationInfo *li = 0;
1439 XrdCl::FileSystem fs( *pUrl );
1440 XrdCl::XRootDStatus st = fs.DeepLocate( pUrl->GetPath(), XrdCl::OpenFlags::Compress | XrdCl::OpenFlags::PrefName, li );
1441 if( !st.IsOK() ) return st;
1442
1444 for( itr = li->Begin(); itr != li->End(); ++itr)
1445 {
1446 std::string url = "root://" + itr->GetAddress() + "/" + pUrl->GetPath();
1447 pReplicas.push_back( url );
1448 }
1449
1450 delete li;
1451 }
1452
1453 std::stringstream ss;
1454 ss << "XCp sources: ";
1455
1456 std::vector<std::string>::iterator itr;
1457 for( itr = pReplicas.begin() ; itr != pReplicas.end() ; ++itr )
1458 {
1459 ss << *itr << ", ";
1460 }
1461 log->Debug( XrdCl::UtilityMsg, ss.str().c_str() );
1462
1463 pXCpCtx = new XrdCl::XCpCtx( pReplicas, pBlockSize, pNbSrc, pChunkSize, pParallelChunks, fileSize );
1464
1465 return pXCpCtx->Initialize();
1466 }
1467
1468 //------------------------------------------------------------------------
1470 //------------------------------------------------------------------------
1471 virtual int64_t GetSize()
1472 {
1473 return pXCpCtx->GetSize();
1474 }
1475
1476 //------------------------------------------------------------------------
1478 //------------------------------------------------------------------------
1479 virtual XrdCl::XRootDStatus StartAt( uint64_t offset )
1480 {
1482 }
1483
1484 //------------------------------------------------------------------------
1492 //------------------------------------------------------------------------
1493 virtual XrdCl::XRootDStatus GetChunk( XrdCl::PageInfo &ci )
1494 {
1496 do
1497 {
1498 st = pXCpCtx->GetChunk( ci );
1499 }
1500 while( st.IsOK() && st.code == XrdCl::suRetry );
1501 return st;
1502 }
1503
1504 //------------------------------------------------------------------------
1505 // Get check sum
1506 //------------------------------------------------------------------------
1507 virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
1508 std::string &checkSumType )
1509 {
1510 if( pUrl->IsMetalink() )
1511 {
1513 XrdCl::VirtualRedirector *redirector = registry.Get( *pUrl );
1514 checkSum = redirector->GetCheckSum( checkSumType );
1515 if( !checkSum.empty() ) return XrdCl::XRootDStatus();
1516 }
1517
1518 std::vector<std::string>::iterator itr;
1519 for( itr = pReplicas.begin() ; itr != pReplicas.end() ; ++itr )
1520 {
1521 XrdCl::URL url( *itr );
1523 checkSumType, url );
1524 if( st.IsOK() ) return st;
1525 }
1526
1528 }
1529
1530 //------------------------------------------------------------------------
1532 //------------------------------------------------------------------------
1533 std::vector<std::string> GetAddCks()
1534 {
1535 return std::vector<std::string>();
1536 }
1537
1538 //------------------------------------------------------------------------
1540 //------------------------------------------------------------------------
1541 virtual XrdCl::XRootDStatus GetXAttr( std::vector<XrdCl::xattr_t> &xattrs )
1542 {
1544 std::vector<std::string>::iterator itr = pReplicas.begin();
1545 for( ; itr < pReplicas.end() ; ++itr )
1546 {
1547 st = ::GetXAttr( *itr, xattrs );
1548 if( st.IsOK() ) return st;
1549 }
1550 return st;
1551 }
1552
1553 private:
1554
1555
1556 XrdCl::XCpCtx *pXCpCtx;
1557 const XrdCl::URL *pUrl;
1558 std::vector<std::string> pReplicas;
1559 uint32_t pChunkSize;
1560 uint16_t pParallelChunks;
1561 int32_t pNbSrc;
1562 uint64_t pBlockSize;
1563 };
1564
1565 //----------------------------------------------------------------------------
1567 //----------------------------------------------------------------------------
1568 class StdOutDestination: public Destination
1569 {
1570 public:
1571 //------------------------------------------------------------------------
1573 //------------------------------------------------------------------------
1574 StdOutDestination( const std::string &ckSumType ):
1575 Destination( ckSumType ), pCurrentOffset(0)
1576 {
1577 }
1578
1579 //------------------------------------------------------------------------
1581 //------------------------------------------------------------------------
1582 virtual ~StdOutDestination()
1583 {
1584 }
1585
1586 //------------------------------------------------------------------------
1588 //------------------------------------------------------------------------
1589 virtual XrdCl::XRootDStatus Initialize()
1590 {
1591 if( pContinue )
1593 ENOTSUP, "Cannot continue to stdout." );
1594
1595 if( pCkSumHelper )
1596 return pCkSumHelper->Initialize();
1597 return XrdCl::XRootDStatus();
1598 }
1599
1600 //------------------------------------------------------------------------
1602 //------------------------------------------------------------------------
1603 virtual XrdCl::XRootDStatus Finalize()
1604 {
1605 return XrdCl::XRootDStatus();
1606 }
1607
1608 //------------------------------------------------------------------------
1613 //------------------------------------------------------------------------
1614 virtual XrdCl::XRootDStatus PutChunk( XrdCl::PageInfo &&ci )
1615 {
1616 using namespace XrdCl;
1617 Log *log = DefaultEnv::GetLog();
1618
1619 if( pCurrentOffset != ci.GetOffset() )
1620 {
1621 log->Error( UtilityMsg, "Got out-of-bounds chunk, expected offset:"
1622 " %ld, got %ld", pCurrentOffset, ci.GetOffset() );
1623 return XRootDStatus( stError, errInternal );
1624 }
1625
1626 int64_t wr = 0;
1627 uint32_t length = ci.GetLength();
1628 char *cursor = (char*)ci.GetBuffer();
1629 do
1630 {
1631 wr = write( 1, cursor, length );
1632 if( wr == -1 )
1633 {
1634 log->Debug( UtilityMsg, "Unable to write to stdout: %s",
1635 XrdSysE2T( errno ) );
1636 delete [] (char*)ci.GetBuffer();
1637 return XRootDStatus( stError, errOSError, errno );
1638 }
1639 pCurrentOffset += wr;
1640 cursor += wr;
1641 length -= wr;
1642 }
1643 while( length );
1644
1645 if( pCkSumHelper )
1646 pCkSumHelper->Update( ci.GetBuffer(), ci.GetLength() );
1647 delete [] (char*)ci.GetBuffer();
1648 return XRootDStatus();
1649 }
1650
1651 //------------------------------------------------------------------------
1653 //------------------------------------------------------------------------
1654 virtual XrdCl::XRootDStatus Flush()
1655 {
1656 return XrdCl::XRootDStatus();
1657 }
1658
1659 //------------------------------------------------------------------------
1661 //------------------------------------------------------------------------
1662 virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
1663 std::string &checkSumType )
1664 {
1665 if( pCkSumHelper )
1666 return pCkSumHelper->GetCheckSum( checkSum, checkSumType );
1668 }
1669
1670 //------------------------------------------------------------------------
1672 //------------------------------------------------------------------------
1673 virtual XrdCl::XRootDStatus SetXAttr( const std::vector<XrdCl::xattr_t> &xattrs )
1674 {
1675 return XrdCl::XRootDStatus();
1676 }
1677
1678 //------------------------------------------------------------------------
1680 //------------------------------------------------------------------------
1681 virtual int64_t GetSize()
1682 {
1683 return -1;
1684 }
1685
1686 private:
1687 StdOutDestination(const StdOutDestination &other);
1688 StdOutDestination &operator = (const StdOutDestination &other);
1689 uint64_t pCurrentOffset;
1690 };
1691
1692 //----------------------------------------------------------------------------
1694 //----------------------------------------------------------------------------
1695 class XRootDDestination: public Destination
1696 {
1697 public:
1698 //------------------------------------------------------------------------
1700 //------------------------------------------------------------------------
1701 XRootDDestination( const XrdCl::URL &url, uint8_t parallelChunks,
1702 const std::string &ckSumType, const XrdCl::ClassicCopyJob &cpjob ):
1703 Destination( ckSumType ),
1704 pUrl( url ), pFile( new XrdCl::File( XrdCl::File::DisableVirtRedirect ) ),
1705 pParallel( parallelChunks ), pSize( -1 ), pUsePgWrt( false ), cpjob( cpjob )
1706 {
1707 }
1708
1709 //------------------------------------------------------------------------
1711 //------------------------------------------------------------------------
1712 virtual ~XRootDDestination()
1713 {
1714 CleanUpChunks();
1715 delete pFile;
1716
1718
1719 //----------------------------------------------------------------------
1720 // Make sure we clean up the cp-target symlink
1721 //----------------------------------------------------------------------
1722 std::string cptarget = XrdCl::DefaultCpTarget;
1723 XrdCl::DefaultEnv::GetEnv()->GetString( "CpTarget", cptarget );
1724 if( !cptarget.empty() )
1725 {
1726 XrdCl::FileSystem fs( "file://localhost" );
1727 XrdCl::XRootDStatus st = fs.Rm( cptarget );
1728 if( !st.IsOK() )
1729 log->Warning( XrdCl::UtilityMsg, "Could not delete cp-target symlink: %s",
1730 st.ToString().c_str() );
1731 }
1732
1733 //----------------------------------------------------------------------
1734 // If the copy failed and user requested posc and we are dealing with
1735 // a local destination, remove the file
1736 //----------------------------------------------------------------------
1737 if( pUrl.IsLocalFile() && pPosc && !cpjob.GetResult().IsOK() )
1738 {
1739 XrdCl::FileSystem fs( pUrl );
1740 XrdCl::XRootDStatus st = fs.Rm( pUrl.GetPath() );
1741 if( !st.IsOK() )
1742 log->Error( XrdCl::UtilityMsg, "Failed to remove local destination"
1743 " on failure: %s", st.ToString().c_str() );
1744 }
1745 }
1746
1747 //------------------------------------------------------------------------
1749 //------------------------------------------------------------------------
1750 virtual XrdCl::XRootDStatus Initialize()
1751 {
1752 using namespace XrdCl;
1753 Log *log = DefaultEnv::GetLog();
1754 log->Debug( UtilityMsg, "Opening %s for writing",
1755 pUrl.GetObfuscatedURL().c_str() );
1756
1757 std::string value;
1758 DefaultEnv::GetEnv()->GetString( "WriteRecovery", value );
1759 pFile->SetProperty( "WriteRecovery", value );
1760
1761 OpenFlags::Flags flags = OpenFlags::Update;
1762 if( pForce )
1763 flags |= OpenFlags::Delete;
1764 else if( !pContinue )
1765 flags |= OpenFlags::New;
1766
1767 if( pPosc )
1768 flags |= OpenFlags::POSC;
1769
1770 if( pCoerce )
1771 flags |= OpenFlags::Force;
1772
1773 if( pMakeDir)
1774 flags |= OpenFlags::MakePath;
1775
1776 Access::Mode mode = Access::UR|Access::UW|Access::GR|Access::OR;
1777
1778 XrdCl::XRootDStatus st = pFile->Open( pUrl.GetURL(), flags, mode );
1779 if( !st.IsOK() )
1780 return st;
1781
1782 if( ( !pUrl.IsLocalFile() && !pFile->IsSecure() ) ||
1783 ( pUrl.IsLocalFile() && pUrl.IsMetalink() ) )
1784 {
1785 std::string datasrv;
1786 pFile->GetProperty( "DataServer", datasrv );
1787 //--------------------------------------------------------------------
1788 // Decide whether we can use PgRead
1789 //--------------------------------------------------------------------
1791 XrdCl::DefaultEnv::GetEnv()->GetInt( "CpUsePgWrtRd", val );
1792 pUsePgWrt = XrdCl::Utils::HasPgRW( datasrv ) && ( val == 1 );
1793 }
1794
1795 std::string cptarget = XrdCl::DefaultCpTarget;
1796 XrdCl::DefaultEnv::GetEnv()->GetString( "CpTarget", cptarget );
1797 if( !cptarget.empty() )
1798 {
1799 std::string targeturl;
1800 pFile->GetProperty( "LastURL", targeturl );
1801 targeturl = URL( targeturl ).GetLocation();
1802 if( symlink( targeturl.c_str(), cptarget.c_str() ) == -1 )
1803 log->Warning( UtilityMsg, "Could not create cp-target symlink: %s",
1804 XrdSysE2T( errno ) );
1805 else
1806 log->Info( UtilityMsg, "Created cp-target symlink: %s -> %s",
1807 cptarget.c_str(), targeturl.c_str() );
1808 }
1809
1810 StatInfo *info = 0;
1811 st = pFile->Stat( false, info );
1812 if( !st.IsOK() )
1813 return st;
1814 pSize = info->GetSize();
1815 delete info;
1816
1817 if( pUrl.IsLocalFile() && pCkSumHelper && !pContinue )
1818 return pCkSumHelper->Initialize();
1819
1820 return XRootDStatus();
1821 }
1822
1823 //------------------------------------------------------------------------
1825 //------------------------------------------------------------------------
1826 virtual XrdCl::XRootDStatus Finalize()
1827 {
1828 return pFile->Close();
1829 }
1830
1831 //------------------------------------------------------------------------
1836 //------------------------------------------------------------------------
1837 virtual XrdCl::XRootDStatus PutChunk( XrdCl::PageInfo &&ci )
1838 {
1839 using namespace XrdCl;
1840 if( !pFile->IsOpen() )
1841 {
1842 delete[] (char*)ci.GetBuffer(); // we took the ownership of the buffer
1843 return XRootDStatus( stError, errUninitialized );
1844 }
1845
1846 //----------------------------------------------------------------------
1847 // If there is still place for this chunk to be sent send it
1848 //----------------------------------------------------------------------
1849 if( pChunks.size() < pParallel )
1850 return QueueChunk( std::move( ci ) );
1851
1852 //----------------------------------------------------------------------
1853 // We wait for a chunk to be sent so that we have space for the current
1854 // one
1855 //----------------------------------------------------------------------
1856 std::unique_ptr<ChunkHandler> ch( pChunks.front() );
1857 pChunks.pop();
1858 ch->sem->Wait();
1859 delete [] (char*)ch->chunk.GetBuffer();
1860 if( !ch->status.IsOK() )
1861 {
1862 Log *log = DefaultEnv::GetLog();
1863 log->Debug( UtilityMsg, "Unable write %d bytes at %ld from %s: %s",
1864 ch->chunk.GetLength(), ch->chunk.GetOffset(),
1865 pUrl.GetObfuscatedURL().c_str(), ch->status.ToStr().c_str() );
1866 delete[] (char*)ci.GetBuffer(); // we took the ownership of the buffer
1867 CleanUpChunks();
1868
1869 //--------------------------------------------------------------------
1870 // Check if we should re-try the transfer from scratch at a different
1871 // data server
1872 //--------------------------------------------------------------------
1873 return CheckIfRetriable( ch->status );
1874 }
1875
1876 return QueueChunk( std::move( ci ) );
1877 }
1878
1879 //------------------------------------------------------------------------
1881 //------------------------------------------------------------------------
1882 virtual int64_t GetSize()
1883 {
1884 return pSize;
1885 }
1886
1887 //------------------------------------------------------------------------
1889 //------------------------------------------------------------------------
1890 void CleanUpChunks()
1891 {
1892 while( !pChunks.empty() )
1893 {
1894 ChunkHandler *ch = pChunks.front();
1895 pChunks.pop();
1896 ch->sem->Wait();
1897 delete [] (char *)ch->chunk.GetBuffer();
1898 delete ch;
1899 }
1900 }
1901
1902 //------------------------------------------------------------------------
1904 //------------------------------------------------------------------------
1905 XrdCl::XRootDStatus QueueChunk( XrdCl::PageInfo &&ci )
1906 {
1907 // we are writing chunks in order so we can calc the checksum
1908 // in case of local files
1909 if( pUrl.IsLocalFile() && pCkSumHelper && !pContinue )
1910 pCkSumHelper->Update( ci.GetBuffer(), ci.GetLength() );
1911
1912 ChunkHandler *ch = new ChunkHandler( std::move( ci ) );
1914 st = pUsePgWrt
1915 ? pFile->PgWrite(ch->chunk.GetOffset(), ch->chunk.GetLength(), ch->chunk.GetBuffer(), ch->chunk.GetCksums(), ch)
1916 : pFile->Write( ch->chunk.GetOffset(), ch->chunk.GetLength(), ch->chunk.GetBuffer(), ch );
1917 if( !st.IsOK() )
1918 {
1919 CleanUpChunks();
1920 delete [] (char*)ch->chunk.GetBuffer();
1921 delete ch;
1922 return st;
1923 }
1924 pChunks.push( ch );
1925 return XrdCl::XRootDStatus();
1926 }
1927
1928 //------------------------------------------------------------------------
1930 //------------------------------------------------------------------------
1931 virtual XrdCl::XRootDStatus Flush()
1932 {
1934 while( !pChunks.empty() )
1935 {
1936 ChunkHandler *ch = pChunks.front();
1937 pChunks.pop();
1938 ch->sem->Wait();
1939 if( !ch->status.IsOK() )
1940 {
1941 //--------------------------------------------------------------------
1942 // Check if we should re-try the transfer from scratch at a different
1943 // data server
1944 //--------------------------------------------------------------------
1945 st = CheckIfRetriable( ch->status );
1946 }
1947 delete [] (char *)ch->chunk.GetBuffer();
1948 delete ch;
1949 }
1950 return st;
1951 }
1952
1953 //------------------------------------------------------------------------
1955 //------------------------------------------------------------------------
1956 virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
1957 std::string &checkSumType )
1958 {
1959 if( pUrl.IsLocalFile() )
1960 {
1961 if( pContinue )
1962 // in case of --continue option we have to calculate the checksum from scratch
1963 return XrdCl::Utils::GetLocalCheckSum( checkSum, checkSumType, pUrl.GetPath() );
1964
1965 if( pCkSumHelper )
1966 return pCkSumHelper->GetCheckSum( checkSum, checkSumType );
1967
1969 }
1970
1971 std::string lastUrl; pFile->GetProperty( "LastURL", lastUrl );
1972 return XrdCl::Utils::GetRemoteCheckSum( checkSum, checkSumType,
1973 XrdCl::URL( lastUrl ) );
1974 }
1975
1976 //------------------------------------------------------------------------
1978 //------------------------------------------------------------------------
1979 virtual XrdCl::XRootDStatus SetXAttr( const std::vector<XrdCl::xattr_t> &xattrs )
1980 {
1981 return ::SetXAttr( *pFile, xattrs );
1982 }
1983
1984 //------------------------------------------------------------------------
1986 //------------------------------------------------------------------------
1987 const std::string& GetLastURL() const
1988 {
1989 return pLastURL;
1990 }
1991
1992 //------------------------------------------------------------------------
1994 //------------------------------------------------------------------------
1995 const std::string& GetWrtRecoveryRedir() const
1996 {
1997 return pWrtRecoveryRedir;
1998 }
1999
2000 private:
2001 XRootDDestination(const XRootDDestination &other);
2002 XRootDDestination &operator = (const XRootDDestination &other);
2003
2004 //------------------------------------------------------------------------
2005 // Asynchronous chunk handler
2006 //------------------------------------------------------------------------
2008 {
2009 public:
2011 sem( new XrdSysSemaphore(0) ),
2012 chunk(std::move( ci ) ) {}
2013 virtual ~ChunkHandler() { delete sem; }
2014 virtual void HandleResponse( XrdCl::XRootDStatus *statusval,
2015 XrdCl::AnyObject */*response*/ )
2016 {
2017 this->status = *statusval;
2018 delete statusval;
2019 sem->Post();
2020 }
2021
2022 XrdSysSemaphore *sem;
2023 XrdCl::PageInfo chunk;
2024 XrdCl::XRootDStatus status;
2025 };
2026
2027 inline XrdCl::XRootDStatus CheckIfRetriable( XrdCl::XRootDStatus &status )
2028 {
2029 if( status.IsOK() ) return status;
2030
2031 //--------------------------------------------------------------------
2032 // Check if we should re-try the transfer from scratch at a different
2033 // data server
2034 //--------------------------------------------------------------------
2035 std::string value;
2036 if( pFile->GetProperty( "WrtRecoveryRedir", value ) )
2037 {
2038 pWrtRecoveryRedir = value;
2039 if( pFile->GetProperty( "LastURL", value ) ) pLastURL = value;
2041 }
2042
2043 return status;
2044 }
2045
2046 const XrdCl::URL pUrl;
2047 XrdCl::File *pFile;
2048 uint8_t pParallel;
2049 std::queue<ChunkHandler *> pChunks;
2050 int64_t pSize;
2051
2052 std::string pWrtRecoveryRedir;
2053 std::string pLastURL;
2054 bool pUsePgWrt;
2055 const XrdCl::ClassicCopyJob &cpjob;
2056 };
2057
2058 //----------------------------------------------------------------------------
2060 //----------------------------------------------------------------------------
2061 class XRootDZipDestination: public Destination
2062 {
2063 public:
2064 //------------------------------------------------------------------------
2066 //------------------------------------------------------------------------
2067 XRootDZipDestination( const XrdCl::URL &url, const std::string &fn,
2068 int64_t size, uint8_t parallelChunks, XrdCl::ClassicCopyJob &cpjob ):
2069 Destination( "zcrc32" ),
2070 pUrl( url ), pFilename( fn ), pZip( new XrdCl::ZipArchive() ),
2071 pParallel( parallelChunks ), pSize( size ), cpjob( cpjob )
2072 {
2073 }
2074
2075 //------------------------------------------------------------------------
2077 //------------------------------------------------------------------------
2078 virtual ~XRootDZipDestination()
2079 {
2080 CleanUpChunks();
2081 delete pZip;
2082
2083 //----------------------------------------------------------------------
2084 // If the copy failed and user requested posc and we are dealing with
2085 // a local destination, remove the file
2086 //----------------------------------------------------------------------
2087 if( pUrl.IsLocalFile() && pPosc && !cpjob.GetResult().IsOK() )
2088 {
2089 XrdCl::FileSystem fs( pUrl );
2090 XrdCl::XRootDStatus st = fs.Rm( pUrl.GetPath() );
2091 if( !st.IsOK() )
2092 {
2094 log->Error( XrdCl::UtilityMsg, "Failed to remove local destination"
2095 " on failure: %s", st.ToString().c_str() );
2096 }
2097 }
2098 }
2099
2100 //------------------------------------------------------------------------
2102 //------------------------------------------------------------------------
2103 virtual XrdCl::XRootDStatus Initialize()
2104 {
2105 using namespace XrdCl;
2106 Log *log = DefaultEnv::GetLog();
2107 log->Debug( UtilityMsg, "Opening %s for writing",
2108 pUrl.GetObfuscatedURL().c_str() );
2109
2110 std::string value;
2111 DefaultEnv::GetEnv()->GetString( "WriteRecovery", value );
2112 pZip->SetProperty( "WriteRecovery", value );
2113
2114 OpenFlags::Flags flags = OpenFlags::Update;
2115
2116 FileSystem fs( pUrl );
2117 StatInfo *info = nullptr;
2118 auto st = fs.Stat( pUrl.GetPath(), info );
2119 if( !st.IsOK() && st.code == errErrorResponse && st.errNo == kXR_NotFound )
2120 flags |= OpenFlags::New;
2121
2122 if( pPosc )
2123 flags |= OpenFlags::POSC;
2124
2125 if( pCoerce )
2126 flags |= OpenFlags::Force;
2127
2128 if( pMakeDir)
2129 flags |= OpenFlags::MakePath;
2130
2131 st = XrdCl::WaitFor( XrdCl::OpenArchive( pZip, pUrl.GetURL(), flags ) );
2132 if( !st.IsOK() )
2133 return st;
2134
2135 std::string cptarget = XrdCl::DefaultCpTarget;
2136 XrdCl::DefaultEnv::GetEnv()->GetString( "CpTarget", cptarget );
2137 if( !cptarget.empty() )
2138 {
2139 std::string targeturl;
2140 pZip->GetProperty( "LastURL", targeturl );
2141 if( symlink( targeturl.c_str(), cptarget.c_str() ) == -1 )
2142 log->Warning( UtilityMsg, "Could not create cp-target symlink: %s",
2143 XrdSysE2T( errno ) );
2144 }
2145
2146 st = pZip->OpenFile( pFilename, XrdCl::OpenFlags::New | XrdCl::OpenFlags::Write, pSize );
2147 if( !st.IsOK() )
2148 return st;
2149
2150 return pCkSumHelper->Initialize();
2151 }
2152
2153 //------------------------------------------------------------------------
2155 //------------------------------------------------------------------------
2156 virtual XrdCl::XRootDStatus Finalize()
2157 {
2158 uint32_t crc32 = 0;
2159 auto st = pCkSumHelper->GetRawCheckSum( "zcrc32", crc32 );
2160 if( !st.IsOK() ) return st;
2161 pZip->UpdateMetadata( crc32 );
2162 pZip->CloseFile();
2163 return XrdCl::WaitFor( XrdCl::CloseArchive( pZip ) );
2164 }
2165
2166 //------------------------------------------------------------------------
2171 //------------------------------------------------------------------------
2172 virtual XrdCl::XRootDStatus PutChunk( XrdCl::PageInfo &&ci )
2173 {
2174 using namespace XrdCl;
2175
2176 //----------------------------------------------------------------------
2177 // If there is still place for this chunk to be sent send it
2178 //----------------------------------------------------------------------
2179 if( pChunks.size() < pParallel )
2180 return QueueChunk( std::move( ci ) );
2181
2182 //----------------------------------------------------------------------
2183 // We wait for a chunk to be sent so that we have space for the current
2184 // one
2185 //----------------------------------------------------------------------
2186 std::unique_ptr<ChunkHandler> ch( pChunks.front() );
2187 pChunks.pop();
2188 ch->sem->Wait();
2189 delete [] (char*)ch->chunk.GetBuffer();
2190 if( !ch->status.IsOK() )
2191 {
2192 Log *log = DefaultEnv::GetLog();
2193 log->Debug( UtilityMsg, "Unable write %d bytes at %ld from %s: %s",
2194 ch->chunk.GetLength(), ch->chunk.GetOffset(),
2195 pUrl.GetObfuscatedURL().c_str(), ch->status.ToStr().c_str() );
2196 CleanUpChunks();
2197
2198 //--------------------------------------------------------------------
2199 // Check if we should re-try the transfer from scratch at a different
2200 // data server
2201 //--------------------------------------------------------------------
2202 return CheckIfRetriable( ch->status );
2203 }
2204
2205 return QueueChunk( std::move( ci ) );
2206 }
2207
2208 //------------------------------------------------------------------------
2210 //------------------------------------------------------------------------
2211 virtual int64_t GetSize()
2212 {
2213 return -1;
2214 }
2215
2216 //------------------------------------------------------------------------
2218 //------------------------------------------------------------------------
2219 void CleanUpChunks()
2220 {
2221 while( !pChunks.empty() )
2222 {
2223 ChunkHandler *ch = pChunks.front();
2224 pChunks.pop();
2225 ch->sem->Wait();
2226 delete [] (char *)ch->chunk.GetBuffer();
2227 delete ch;
2228 }
2229 }
2230
2231 //------------------------------------------------------------------------
2232 // Queue a chunk
2233 //------------------------------------------------------------------------
2234 XrdCl::XRootDStatus QueueChunk( XrdCl::PageInfo &&ci )
2235 {
2236 // we are writing chunks in order so we can calc the checksum
2237 // in case of local files
2238 if( pCkSumHelper ) pCkSumHelper->Update( ci.GetBuffer(), ci.GetLength() );
2239
2240 ChunkHandler *ch = new ChunkHandler( std::move( ci ) );
2242
2243 //----------------------------------------------------------------------
2244 // TODO
2245 // In order to use PgWrite with ZIP append we need first to implement
2246 // PgWriteV!!!
2247 //----------------------------------------------------------------------
2248 st = pZip->Write( ch->chunk.GetLength(), ch->chunk.GetBuffer(), ch );
2249 if( !st.IsOK() )
2250 {
2251 CleanUpChunks();
2252 delete [] (char*)ch->chunk.GetBuffer();
2253 delete ch;
2254 return st;
2255 }
2256 pChunks.push( ch );
2257 return XrdCl::XRootDStatus();
2258 }
2259
2260 //------------------------------------------------------------------------
2262 //------------------------------------------------------------------------
2263 virtual XrdCl::XRootDStatus Flush()
2264 {
2266 while( !pChunks.empty() )
2267 {
2268 ChunkHandler *ch = pChunks.front();
2269 pChunks.pop();
2270 ch->sem->Wait();
2271 if( !ch->status.IsOK() )
2272 {
2273 //--------------------------------------------------------------------
2274 // Check if we should re-try the transfer from scratch at a different
2275 // data server
2276 //--------------------------------------------------------------------
2277 st = CheckIfRetriable( ch->status );
2278 }
2279 delete [] (char *)ch->chunk.GetBuffer();
2280 delete ch;
2281 }
2282 return st;
2283 }
2284
2285 //------------------------------------------------------------------------
2287 //------------------------------------------------------------------------
2288 virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
2289 std::string &checkSumType )
2290 {
2292 }
2293
2294 //------------------------------------------------------------------------
2296 //------------------------------------------------------------------------
2297 virtual XrdCl::XRootDStatus SetXAttr( const std::vector<XrdCl::xattr_t> &xattrs )
2298 {
2300 }
2301
2302 //------------------------------------------------------------------------
2304 //------------------------------------------------------------------------
2305 const std::string& GetLastURL() const
2306 {
2307 return pLastURL;
2308 }
2309
2310 //------------------------------------------------------------------------
2312 //------------------------------------------------------------------------
2313 const std::string& GetWrtRecoveryRedir() const
2314 {
2315 return pWrtRecoveryRedir;
2316 }
2317
2318 private:
2319 XRootDZipDestination(const XRootDDestination &other);
2320 XRootDZipDestination &operator = (const XRootDDestination &other);
2321
2322 //------------------------------------------------------------------------
2323 // Asynchronous chunk handler
2324 //------------------------------------------------------------------------
2326 {
2327 public:
2329 sem( new XrdSysSemaphore(0) ),
2330 chunk( std::move( ci ) ) {}
2331 virtual ~ChunkHandler() { delete sem; }
2332 virtual void HandleResponse( XrdCl::XRootDStatus *statusval,
2333 XrdCl::AnyObject */*response*/ )
2334 {
2335 this->status = *statusval;
2336 delete statusval;
2337 sem->Post();
2338 }
2339
2340 XrdSysSemaphore *sem;
2341 XrdCl::PageInfo chunk;
2342 XrdCl::XRootDStatus status;
2343 };
2344
2345 inline XrdCl::XRootDStatus CheckIfRetriable( XrdCl::XRootDStatus &status )
2346 {
2347 if( status.IsOK() ) return status;
2348
2349 //--------------------------------------------------------------------
2350 // Check if we should re-try the transfer from scratch at a different
2351 // data server
2352 //--------------------------------------------------------------------
2353 std::string value;
2354 if( pZip->GetProperty( "WrtRecoveryRedir", value ) )
2355 {
2356 pWrtRecoveryRedir = value;
2357 if( pZip->GetProperty( "LastURL", value ) ) pLastURL = value;
2359 }
2360
2361 return status;
2362 }
2363
2364 const XrdCl::URL pUrl;
2365 std::string pFilename;
2366 XrdCl::ZipArchive *pZip;
2367 uint8_t pParallel;
2368 std::queue<ChunkHandler *> pChunks;
2369 int64_t pSize;
2370
2371 std::string pWrtRecoveryRedir;
2372 std::string pLastURL;
2373 XrdCl::ClassicCopyJob &cpjob;
2374 };
2375}
2376
2377//------------------------------------------------------------------------------
2378// Get current time in nanoseconds
2379//------------------------------------------------------------------------------
2380inline std::chrono::nanoseconds time_nsec()
2381{
2382 using namespace std::chrono;
2383 auto since_epoch = high_resolution_clock::now().time_since_epoch();
2384 return duration_cast<nanoseconds>( since_epoch );
2385}
2386
2387//------------------------------------------------------------------------------
2388// Convert seconds to nanoseconds
2389//------------------------------------------------------------------------------
2390inline long long to_nsec( long long sec )
2391{
2392 return sec * 1000000000;
2393}
2394
2395//------------------------------------------------------------------------------
2396// Sleep for # nanoseconds
2397//------------------------------------------------------------------------------
2398inline void sleep_nsec( long long nsec )
2399{
2400#if __cplusplus >= 201103L
2401 using namespace std::chrono;
2402 std::this_thread::sleep_for( nanoseconds( nsec ) );
2403#else
2404 timespec req;
2405 req.tv_sec = nsec / to_nsec( 1 );
2406 req.tv_nsec = nsec % to_nsec( 1 );
2407 nanosleep( &req, 0 );
2408#endif
2409}
2410
2411namespace XrdCl
2412{
2413 //----------------------------------------------------------------------------
2414 // Constructor
2415 //----------------------------------------------------------------------------
2417 PropertyList *jobProperties,
2418 PropertyList *jobResults ):
2419 CopyJob( jobId, jobProperties, jobResults )
2420 {
2421 Log *log = DefaultEnv::GetLog();
2422 log->Debug( UtilityMsg, "Creating a classic copy job, from %s to %s",
2423 GetSource().GetObfuscatedURL().c_str(), GetTarget().GetObfuscatedURL().c_str() );
2424 }
2425
2426 //----------------------------------------------------------------------------
2427 // Run the copy job
2428 //----------------------------------------------------------------------------
2430 {
2431 Log *log = DefaultEnv::GetLog();
2432
2433 std::string checkSumMode;
2434 std::string checkSumType;
2435 std::string checkSumPreset;
2436 std::string zipSource;
2437 uint16_t parallelChunks;
2438 uint32_t chunkSize;
2439 uint64_t blockSize;
2440 bool posc, force, coerce, makeDir, dynamicSource, zip, xcp, preserveXAttr,
2441 rmOnBadCksum, continue_, zipappend, doserver;
2442 int32_t nbXcpSources;
2443 long long xRate;
2444 long long xRateThreshold;
2445 uint16_t cpTimeout;
2446 std::vector<std::string> addcksums;
2447
2448 pProperties->Get( "checkSumMode", checkSumMode );
2449 pProperties->Get( "checkSumType", checkSumType );
2450 pProperties->Get( "checkSumPreset", checkSumPreset );
2451 pProperties->Get( "parallelChunks", parallelChunks );
2452 pProperties->Get( "chunkSize", chunkSize );
2453 pProperties->Get( "posc", posc );
2454 pProperties->Get( "force", force );
2455 pProperties->Get( "coerce", coerce );
2456 pProperties->Get( "makeDir", makeDir );
2457 pProperties->Get( "dynamicSource", dynamicSource );
2458 pProperties->Get( "zipArchive", zip );
2459 pProperties->Get( "xcp", xcp );
2460 pProperties->Get( "xcpBlockSize", blockSize );
2461 pProperties->Get( "preserveXAttr", preserveXAttr );
2462 pProperties->Get( "xrate", xRate );
2463 pProperties->Get( "xrateThreshold", xRateThreshold );
2464 pProperties->Get( "rmOnBadCksum", rmOnBadCksum );
2465 pProperties->Get( "continue", continue_ );
2466 pProperties->Get( "cpTimeout", cpTimeout );
2467 pProperties->Get( "zipAppend", zipappend );
2468 pProperties->Get( "addcksums", addcksums );
2469 pProperties->Get( "doServer", doserver );
2470
2471 if( zip )
2472 pProperties->Get( "zipSource", zipSource );
2473
2474 if( xcp )
2475 pProperties->Get( "nbXcpSources", nbXcpSources );
2476
2477 if( force && continue_ )
2478 return SetResult( stError, errInvalidArgs, EINVAL,
2479 "Invalid argument combination: continue + force." );
2480
2481 if( zipappend && ( continue_ || force ) )
2482 return SetResult( stError, errInvalidArgs, EINVAL,
2483 "Invalid argument combination: ( continue | force ) + zip-append." );
2484
2485 //--------------------------------------------------------------------------
2486 // Start the cp t/o timer if necessary
2487 //--------------------------------------------------------------------------
2488 std::unique_ptr<timer_sec_t> cptimer;
2489 if( cpTimeout ) cptimer.reset( new timer_sec_t() );
2490
2491 //--------------------------------------------------------------------------
2492 // Remove on bad checksum implies that POSC semantics has to be enabled
2493 //--------------------------------------------------------------------------
2494 if( rmOnBadCksum ) posc = true;
2495
2496 //--------------------------------------------------------------------------
2497 // Resolve the 'auto' checksum type.
2498 //--------------------------------------------------------------------------
2499 if( checkSumType == "auto" )
2500 {
2501 checkSumType = Utils::InferChecksumType( GetSource(), GetTarget(), zip );
2502 if( checkSumType.empty() )
2503 return SetResult( stError, errCheckSumError, ENOTSUP, "Could not infer checksum type." );
2504 else
2505 log->Info( UtilityMsg, "Using inferred checksum type: %s.", checkSumType.c_str() );
2506 }
2507
2508 if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2509 return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2510
2511 //--------------------------------------------------------------------------
2512 // Initialize the source and the destination
2513 //--------------------------------------------------------------------------
2514 std::unique_ptr<Source> src;
2515 if( xcp )
2516 src.reset( new XRootDSourceXCp( &GetSource(), chunkSize, parallelChunks, nbXcpSources, blockSize ) );
2517 else if( zip ) // TODO make zip work for xcp
2518 src.reset( new XRootDSourceZip( zipSource, &GetSource(), chunkSize, parallelChunks,
2519 checkSumType, addcksums , doserver) );
2520 else if( GetSource().GetProtocol() == "stdio" )
2521 src.reset( new StdInSource( checkSumType, chunkSize, addcksums ) );
2522 else
2523 {
2524 if( dynamicSource )
2525 src.reset( new XRootDSourceDynamic( &GetSource(), chunkSize, checkSumType, addcksums ) );
2526 else
2527 src.reset( new XRootDSource( &GetSource(), chunkSize, parallelChunks, checkSumType, addcksums, doserver ) );
2528 }
2529
2530 XRootDStatus st = src->Initialize();
2531 if( !st.IsOK() ) return SourceError( st );
2532 uint64_t size = src->GetSize() >= 0 ? src->GetSize() : 0;
2533
2534 if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2535 return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2536
2537 std::unique_ptr<Destination> dest;
2538 URL newDestUrl( GetTarget() );
2539
2540 if( GetTarget().GetProtocol() == "stdio" )
2541 dest.reset( new StdOutDestination( checkSumType ) );
2542 else if( zipappend )
2543 {
2544 std::string fn = GetSource().GetPath();
2545 size_t pos = fn.rfind( '/' );
2546 if( pos != std::string::npos )
2547 fn = fn.substr( pos + 1 );
2548 int64_t size = src->GetSize();
2549 dest.reset( new XRootDZipDestination( newDestUrl, fn, size, parallelChunks, *this ) );
2550 }
2551 //--------------------------------------------------------------------------
2552 // For xrootd destination build the oss.asize hint
2553 //--------------------------------------------------------------------------
2554 else
2555 {
2556 if( src->GetSize() >= 0 )
2557 {
2558 URL::ParamsMap params = newDestUrl.GetParams();
2559 std::ostringstream o; o << src->GetSize();
2560 params["oss.asize"] = o.str();
2561 newDestUrl.SetParams( params );
2562 // makeDir = true; // Backward compatibility for xroot destinations!!!
2563 }
2564 dest.reset( new XRootDDestination( newDestUrl, parallelChunks, checkSumType, *this ) );
2565 }
2566
2567 dest->SetForce( force );
2568 dest->SetPOSC( posc );
2569 dest->SetCoerce( coerce );
2570 dest->SetMakeDir( makeDir );
2571 dest->SetContinue( continue_ );
2572 st = dest->Initialize();
2573 if( !st.IsOK() ) return DestinationError( st );
2574
2575 if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2576 return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2577
2578 //--------------------------------------------------------------------------
2579 // Copy the chunks
2580 //--------------------------------------------------------------------------
2581 if( continue_ )
2582 {
2583 size -= dest->GetSize();
2584 XrdCl::XRootDStatus st = src->StartAt( dest->GetSize() );
2585 if( !st.IsOK() ) return SetResult( st );
2586 }
2587
2588 PageInfo pageInfo;
2589 uint64_t total_processed = 0;
2590 uint64_t processed = 0;
2591 auto start = time_nsec();
2592 uint16_t threshold_interval = parallelChunks;
2593 bool threshold_draining = false;
2594 timer_nsec_t threshold_timer;
2595 while( 1 )
2596 {
2597 st = src->GetChunk( pageInfo );
2598 if( !st.IsOK() )
2599 return SourceError( st);
2600
2601 if( st.IsOK() && st.code == suDone )
2602 break;
2603
2604 if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2605 return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2606
2607 if( xRate )
2608 {
2609 auto elapsed = ( time_nsec() - start ).count();
2610 double transferred = total_processed + pageInfo.GetLength();
2611 double expected = double( xRate ) / to_nsec( 1 ) * elapsed;
2612 //----------------------------------------------------------------------
2613 // check if our transfer rate didn't exceeded the limit
2614 // (we are too fast)
2615 //----------------------------------------------------------------------
2616 if( elapsed && // make sure elapsed time is greater than 0
2617 transferred > expected )
2618 {
2619 auto nsec = ( transferred / xRate * to_nsec( 1 ) ) - elapsed;
2620 sleep_nsec( nsec );
2621 }
2622 }
2623
2624 if( xRateThreshold )
2625 {
2626 auto elapsed = threshold_timer.elapsed();
2627 double transferred = processed + pageInfo.GetLength();
2628 double expected = double( xRateThreshold ) / to_nsec( 1 ) * elapsed;
2629 //----------------------------------------------------------------------
2630 // check if our transfer rate dropped below the threshold
2631 // (we are too slow)
2632 //----------------------------------------------------------------------
2633 if( elapsed && // make sure elapsed time is greater than 0
2634 transferred < expected &&
2635 threshold_interval == 0 ) // we check every # parallelChunks
2636 {
2637 if( !threshold_draining )
2638 {
2639 log->Warning( UtilityMsg, "Transfer rate dropped below requested ehreshold,"
2640 " trying different source!" );
2641 XRootDStatus st = src->TryOtherServer();
2642 if( !st.IsOK() ) return SetResult( stError, errThresholdExceeded, 0,
2643 "The transfer rate dropped below "
2644 "requested threshold!" );
2645 threshold_draining = true; // before the next measurement we need to drain
2646 // all the chunks that will come from the old server
2647 }
2648 else // now that all the chunks from the old server have
2649 { // been received we can start another measurement
2650 processed = 0;
2651 threshold_timer.reset();
2652 threshold_interval = parallelChunks;
2653 threshold_draining = false;
2654 }
2655 }
2656
2657 threshold_interval = threshold_interval > 0 ? threshold_interval - 1 : parallelChunks;
2658 }
2659
2660 total_processed += pageInfo.GetLength();
2661 processed += pageInfo.GetLength();
2662
2663 st = dest->PutChunk( std::move( pageInfo ) );
2664 if( !st.IsOK() )
2665 {
2666 if( st.code == errRetry )
2667 {
2668 pResults->Set( "LastURL", dest->GetLastURL() );
2669 pResults->Set( "WrtRecoveryRedir", dest->GetWrtRecoveryRedir() );
2670 return SetResult( st );
2671 }
2672 return DestinationError( st );
2673 }
2674
2675 if( progress )
2676 {
2677 progress->JobProgress( pJobId, total_processed, size );
2678 if( progress->ShouldCancel( pJobId ) )
2679 return SetResult( stError, errOperationInterrupted, kXR_Cancelled, "The copy-job has been cancelled!" );
2680 }
2681 }
2682
2683 st = dest->Flush();
2684 if( !st.IsOK() )
2685 return DestinationError( st );
2686
2687 //--------------------------------------------------------------------------
2688 // Copy extended attributes
2689 //--------------------------------------------------------------------------
2690 if( preserveXAttr && Utils::HasXAttr( GetSource() ) && Utils::HasXAttr( GetTarget() ) )
2691 {
2692 std::vector<xattr_t> xattrs;
2693 st = src->GetXAttr( xattrs );
2694 if( !st.IsOK() ) return SourceError( st );
2695 st = dest->SetXAttr( xattrs );
2696 if( !st.IsOK() ) return DestinationError( st );
2697 }
2698
2699 //--------------------------------------------------------------------------
2700 // The size of the source is known and not enough data has been transferred
2701 // to the destination
2702 //--------------------------------------------------------------------------
2703 if( src->GetSize() >= 0 && size != total_processed )
2704 {
2705 log->Error( UtilityMsg, "The declared source size is %ld bytes, but "
2706 "received %ld bytes.", size, total_processed );
2707 return SetResult( stError, errDataError );
2708 }
2709 pResults->Set( "size", total_processed );
2710
2711 //--------------------------------------------------------------------------
2712 // Finalize the destination
2713 //--------------------------------------------------------------------------
2714 st = dest->Finalize();
2715 if( !st.IsOK() )
2716 return DestinationError( st );
2717
2718 //--------------------------------------------------------------------------
2719 // Verify the checksums if needed
2720 //--------------------------------------------------------------------------
2721 if( checkSumMode != "none" )
2722 {
2723 log->Debug( UtilityMsg, "Attempting checksum calculation, mode: %s.",
2724 checkSumMode.c_str() );
2725 std::string sourceCheckSum;
2726 std::string targetCheckSum;
2727
2728 if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2729 return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2730
2731 //------------------------------------------------------------------------
2732 // Get the check sum at source
2733 //------------------------------------------------------------------------
2734 timeval oStart, oEnd;
2735 XRootDStatus st;
2736
2737 if( checkSumMode == "end2end" || checkSumMode == "source" ||
2738 !checkSumPreset.empty() )
2739 {
2740 gettimeofday( &oStart, 0 );
2741 if( !checkSumPreset.empty() )
2742 {
2743 sourceCheckSum = checkSumType + ":";
2744 sourceCheckSum += Utils::NormalizeChecksum( checkSumType,
2745 checkSumPreset );
2746 }
2747 else
2748 {
2749 st = src->GetCheckSum( sourceCheckSum, checkSumType );
2750 }
2751 gettimeofday( &oEnd, 0 );
2752
2753 if( !st.IsOK() )
2754 return SourceError( st );
2755
2756 pResults->Set( "sourceCheckSum", sourceCheckSum );
2757 }
2758
2759 if( !addcksums.empty() )
2760 pResults->Set( "additionalCkeckSum", src->GetAddCks() );
2761
2762 if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2763 return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2764
2765 //------------------------------------------------------------------------
2766 // Get the check sum at destination
2767 //------------------------------------------------------------------------
2768 timeval tStart, tEnd;
2769
2770 if( checkSumMode == "end2end" || checkSumMode == "target" )
2771 {
2772 gettimeofday( &tStart, 0 );
2773 st = dest->GetCheckSum( targetCheckSum, checkSumType );
2774 if( !st.IsOK() )
2775 return DestinationError( st );
2776 gettimeofday( &tEnd, 0 );
2777 pResults->Set( "targetCheckSum", targetCheckSum );
2778 }
2779
2780 if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2781 return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2782
2783 //------------------------------------------------------------------------
2784 // Make sure the checksums are both lower case
2785 //------------------------------------------------------------------------
2786 auto sanitize_cksum = []( char c )
2787 {
2788 std::locale loc;
2789 if( std::isalpha( c ) ) return std::tolower( c, loc );
2790 return c;
2791 };
2792
2793 std::transform( sourceCheckSum.begin(), sourceCheckSum.end(),
2794 sourceCheckSum.begin(), sanitize_cksum );
2795
2796 std::transform( targetCheckSum.begin(), targetCheckSum.end(),
2797 targetCheckSum.begin(), sanitize_cksum );
2798
2799 //------------------------------------------------------------------------
2800 // Compare and inform monitoring
2801 //------------------------------------------------------------------------
2802 if( !sourceCheckSum.empty() && !targetCheckSum.empty() )
2803 {
2804 bool match = false;
2805 if( sourceCheckSum == targetCheckSum )
2806 match = true;
2807
2809 if( mon )
2810 {
2812 i.transfer.origin = &GetSource();
2813 i.transfer.target = &GetTarget();
2814 i.cksum = sourceCheckSum;
2815 i.oTime = Utils::GetElapsedMicroSecs( oStart, oEnd );
2816 i.tTime = Utils::GetElapsedMicroSecs( tStart, tEnd );
2817 i.isOK = match;
2818 mon->Event( Monitor::EvCheckSum, &i );
2819 }
2820
2821 if( !match )
2822 {
2823 if( rmOnBadCksum )
2824 {
2825 FileSystem fs( newDestUrl );
2826 st = fs.Rm( newDestUrl.GetPath() );
2827 if( !st.IsOK() )
2828 log->Error( UtilityMsg, "Invalid checksum: failed to remove the target file: %s", st.ToString().c_str() );
2829 else
2830 log->Info( UtilityMsg, "Target file removed due to bad checksum!" );
2831 }
2832
2833 st = dest->Finalize();
2834 if( !st.IsOK() )
2835 log->Error( UtilityMsg, "Failed to finalize the destination: %s", st.ToString().c_str() );
2836
2837 return SetResult( stError, errCheckSumError, 0 );
2838 }
2839
2840 log->Info( UtilityMsg, "Checksum verification: succeeded." );
2841 }
2842 }
2843
2844 return SetResult();
2845 }
2846}
@ kXR_NotFound
@ kXR_Cancelled
std::chrono::nanoseconds time_nsec()
long long to_nsec(long long sec)
void sleep_nsec(long long nsec)
#define write(a, b, c)
Definition XrdPosix.hh:110
#define read(a, b, c)
Definition XrdPosix.hh:77
const char * XrdSysE2T(int errcode)
Definition XrdSysE2T.cc:104
int Set(const char *csName)
Definition XrdCksData.hh:81
int Get(char *Buff, int Blen)
Definition XrdCksData.hh:69
void Get(Type &object)
Retrieve the object being held.
Check sum helper for stdio.
XRootDStatus Initialize()
Initialize.
const std::string & GetType()
XRootDStatus GetCheckSum(std::string &checkSum, std::string &checkSumType)
void Update(const void *buffer, uint32_t size)
ClassicCopyJob(uint16_t jobId, PropertyList *jobProperties, PropertyList *jobResults)
virtual XRootDStatus Run(CopyProgressHandler *progress=0)
PropertyList * pResults
const URL & GetSource() const
Get source.
const URL & GetTarget() const
Get target.
PropertyList * pProperties
Interface for copy progress notification.
virtual void JobProgress(uint16_t jobNum, uint64_t bytesProcessed, uint64_t bytesTotal)
virtual bool ShouldCancel(uint16_t jobNum)
Determine whether the job should be canceled.
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
static Env * GetEnv()
Get default client environment.
bool GetString(const std::string &key, std::string &value)
Definition XrdClEnv.cc:31
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
Send file/filesystem queries to an XRootD cluster.
XRootDStatus Rm(const std::string &path, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
A file.
Definition XrdClFile.hh:46
XRootDStatus ListXAttr(ResponseHandler *handler, uint16_t timeout=0)
Definition XrdClFile.cc:764
XRootDStatus SetXAttr(const std::vector< xattr_t > &attrs, ResponseHandler *handler, uint16_t timeout=0)
Definition XrdClFile.cc:665
Interface for a job to be run by the job manager.
Path location info.
LocationList::iterator Iterator
Iterator over locations.
Iterator Begin()
Get the location begin iterator.
Iterator End()
Get the location end iterator.
Handle diagnostics.
Definition XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition XrdClLog.cc:231
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition XrdClLog.cc:248
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition XrdClLog.cc:265
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition XrdClLog.cc:282
An abstract class to describe the client-side monitoring plugin interface.
@ EvCheckSum
CheckSumInfo: File checksummed.
virtual void Event(EventCode evCode, void *evData)=0
void SetOnDataConnectHandler(const URL &url, std::shared_ptr< Job > onConnJob)
Set the on-connect handler for data streams.
A key-value pair map storing both keys and values as strings.
void Set(const std::string &name, const Item &value)
bool Get(const std::string &name, Item &item) const
Singleton access to URL to virtual redirector mapping.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
VirtualRedirector * Get(const URL &url) const
Get a virtual redirector associated with the given URL.
Handle an async response.
Object stat info.
uint64_t GetSize() const
Get size (in bytes)
URL representation.
Definition XrdClURL.hh:31
const std::string & GetPath() const
Get the path.
Definition XrdClURL.hh:217
void SetParams(const std::string &params)
Set params.
Definition XrdClURL.cc:395
std::string GetLocation() const
Get location (protocol://host:port/path)
Definition XrdClURL.cc:337
std::map< std::string, std::string > ParamsMap
Definition XrdClURL.hh:33
const ParamsMap & GetParams() const
Get the URL params.
Definition XrdClURL.hh:244
static std::string NormalizeChecksum(const std::string &name, const std::string &checksum)
Normalize checksum.
static std::string InferChecksumType(const XrdCl::URL &source, const XrdCl::URL &destination, bool zip=false)
Automatically infer the right checksum type.
static uint64_t GetElapsedMicroSecs(timeval start, timeval end)
Get the elapsed microseconds between two timevals.
static XRootDStatus GetLocalCheckSum(std::string &checkSum, const std::string &checkSumType, const std::string &path)
Get a checksum from local file.
static bool HasXAttr(const XrdCl::URL &url)
static XRootDStatus GetRemoteCheckSum(std::string &checkSum, const std::string &checkSumType, const URL &url)
Get a checksum from a remote xrootd server.
static bool HasPgRW(const XrdCl::URL &url)
An interface for metadata redirectors.
virtual long long GetSize() const =0
virtual std::string GetCheckSum(const std::string &type) const =0
virtual const std::vector< std::string > & GetReplicas()=0
Returns a vector with replicas as given in the meatlink file.
const uint16_t suRetry
GetXAttrImpl< false > GetXAttr(Ctx< File > file, Arg< std::string > name)
CloseArchiveImpl< false > CloseArchive(Ctx< ZipArchive > zip, uint16_t timeout=0)
Factory for creating CloseFileImpl objects.
const char *const DefaultCpTarget
const uint16_t errOperationExpired
const uint16_t errNotImplemented
Operation is not implemented.
const uint16_t stError
An error occurred that could potentially be retried.
XRootDStatus WaitFor(Pipeline pipeline, uint16_t timeout=0)
const uint16_t errDataError
data is corrupted
const int DefaultSubStreamsPerChannel
const int DefaultCpUsePgWrtRd
SetXAttrImpl< false > SetXAttr(Ctx< File > file, Arg< std::string > name, Arg< std::string > value)
const uint64_t UtilityMsg
const uint16_t errInvalidArgs
std::tuple< std::string, std::string > xattr_t
Extended attribute key - value pair.
const uint16_t errNotSupported
const uint16_t errRetry
Try again for whatever reason.
const uint16_t errCheckSumError
const uint16_t suDone
const uint16_t errThresholdExceeded
const uint16_t errOperationInterrupted
const uint16_t errNoMoreReplicas
No more replicas to try.
OpenArchiveImpl< false > OpenArchive(Ctx< ZipArchive > zip, Arg< std::string > fn, Arg< OpenFlags::Flags > flags, uint16_t timeout=0)
Factory for creating OpenArchiveImpl objects.
const int DefaultZipMtlnCksum
Describe a data chunk for vector read.
uint64_t GetOffset() const
Get the offset.
uint32_t GetLength() const
Get the data length.
void * GetBuffer()
Get the buffer.
Describe a checksum event.
TransferInfo transfer
The transfer in question.
uint64_t tTime
Microseconds to obtain cksum from target.
bool isOK
True if checksum matched, false otherwise.
std::string cksum
Checksum as "type:value".
uint64_t oTime
Microseconds to obtain cksum from origin.
const URL * target
URL of the target.
const URL * origin
URL of the origin.
Flags
Open flags, may be or'd when appropriate.
@ Read
Open only for reading.
@ Write
Open only for writing.
uint32_t GetLength() const
Get the data length.
uint64_t GetOffset() const
Get the offset.
void * GetBuffer()
Get the buffer.
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.
std::string ToString() const
Create a string representation.
uint32_t errNo
Errno, if any.