XRootD
Loading...
Searching...
No Matches
XrdEcReader.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Michal Simon <michal.simon@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
25#include "XrdEc/XrdEcReader.hh"
27#include "XrdEc/XrdEcConfig.hh"
28#include "XrdEc/XrdEcObjCfg.hh"
30
31#include "XrdZip/XrdZipLFH.hh"
32#include "XrdZip/XrdZipCDFH.hh"
33#include "XrdZip/XrdZipUtils.hh"
34
36
41
42#include "XrdCl/XrdClLog.hh"
44
45#include <algorithm>
46#include <iterator>
47#include <numeric>
48#include <tuple>
49#include <set>
50
51namespace XrdEc
52{
53 //---------------------------------------------------------------------------
54 // OpenOnly operation (@see ZipOperation) - a private ZIP operation
55 //---------------------------------------------------------------------------
56 template<bool HasHndl>
57 class OpenOnlyImpl: public XrdCl::ZipOperation<OpenOnlyImpl, HasHndl,
58 XrdCl::Resp<void>, XrdCl::Arg<std::string>, XrdCl::Arg<bool>>
59
60 {
61 public:
62
63 //-----------------------------------------------------------------------
64 // Inherit constructors from FileOperation (@see FileOperation)
65 //-----------------------------------------------------------------------
68
69 //-----------------------------------------------------------------------
70 // Argument indexes in the args tuple
71 //-----------------------------------------------------------------------
72 enum { UrlArg, UpdtArg };
73
74 //-----------------------------------------------------------------------
75 // @return : name of the operation (@see Operation)
76 //-----------------------------------------------------------------------
77 std::string ToString()
78 {
79 return "OpenOnly";
80 }
81
82 protected:
83
84 //-----------------------------------------------------------------------
85 // RunImpl operation (@see Operation)
86 //
87 // @param params : container with parameters forwarded from
88 // previous operation
89 // @return : status of the operation
90 //-----------------------------------------------------------------------
92 uint16_t pipelineTimeout )
93 {
94 std::string url = std::get<UrlArg>( this->args ).Get();
95 bool updt = std::get<UpdtArg>( this->args ).Get();
96 uint16_t timeout = pipelineTimeout < this->timeout ?
97 pipelineTimeout : this->timeout;
98 return this->zip->OpenOnly( url, updt, handler, timeout );
99 }
100 };
101
102 //---------------------------------------------------------------------------
103 // Factory for creating OpenOnlyImpl objects
104 //---------------------------------------------------------------------------
107 XrdCl::Arg<bool> updt,
108 uint16_t timeout = 0 )
109 {
110 return OpenOnlyImpl<false>( std::move( zip ), std::move( fn ),
111 std::move( updt ) ).Timeout( timeout );
112 }
113
114 //-------------------------------------------------------------------------
115 // A single data block
116 //-------------------------------------------------------------------------
117 struct block_t
118 {
119 typedef std::tuple<uint64_t, uint32_t, char*, callback_t> args_t;
120 typedef std::vector<args_t> pending_t;
121
122 //-----------------------------------------------------------------------
123 // Stripe state: empty / loading / valid
124 //-----------------------------------------------------------------------
126
127 //-----------------------------------------------------------------------
128 // Constructor
129 //-----------------------------------------------------------------------
131 objcfg( objcfg ),
132 stripes( objcfg.nbchunks ),
133 state( objcfg.nbchunks, Empty ),
134 pending( objcfg.nbchunks ),
135 blkid( blkid ),
136 recovering( 0 )
137 {
138 }
139
140 //-----------------------------------------------------------------------
141 // Read data from stripe
142 //
143 // @param self : the block_t object
144 // @param strpid : stripe ID
145 // @param offset : relative offset within the stripe
146 // @param size : number of bytes to be read from the stripe
147 // @param usrbuff : user buffer for the data
148 // @param usrcb : user callback to be notified when the read operation
149 // has been resolved
150 // @param timeout : operation timeout
151 //-----------------------------------------------------------------------
152 static void read( std::shared_ptr<block_t> &self,
153 size_t strpid,
154 uint64_t offset,
155 uint32_t size,
156 char *usrbuff,
157 callback_t usrcb,
158 uint16_t timeout )
159 {
160 std::unique_lock<std::mutex> lck( self->mtx );
161
162 //---------------------------------------------------------------------
163 // The cache is empty, we need to load the data
164 //---------------------------------------------------------------------
165 if( self->state[strpid] == Empty )
166 {
167 self->reader.Read( self->blkid, strpid, self->stripes[strpid],
168 read_callback( self, strpid ), timeout );
169 self->state[strpid] = Loading;
170 }
171 //---------------------------------------------------------------------
172 // The stripe is either corrupted or unreachable
173 //---------------------------------------------------------------------
174 if( self->state[strpid] == Missing )
175 {
176 if( !error_correction( self ) )
177 {
178 //-----------------------------------------------------------------
179 // Recovery was not possible, notify the user of the error
180 //-----------------------------------------------------------------
182 return;
183 }
184 //-------------------------------------------------------------------
185 // we fall through to the following if-statements that will handle
186 // Recovering / Valid state
187 //-------------------------------------------------------------------
188 }
189 //---------------------------------------------------------------------
190 // The cache is loading or recovering, we don't have the data yet
191 //---------------------------------------------------------------------
192 if( self->state[strpid] == Loading || self->state[strpid] == Recovering )
193 {
194 self->pending[strpid].emplace_back( offset, size, usrbuff, usrcb );
195 return;
196 }
197 //---------------------------------------------------------------------
198 // We do have the data so we can serve the user right away
199 //---------------------------------------------------------------------
200 if( self->state[strpid] == Valid )
201 {
202 if( offset + size > self->stripes[strpid].size() )
203 size = self->stripes[strpid].size() - offset;
204 if(usrbuff)
205 memcpy( usrbuff, self->stripes[strpid].data() + offset, size );
206 usrcb( XrdCl::XRootDStatus(), size );
207 return;
208 }
209 //---------------------------------------------------------------------
210 // In principle we should never end up here, nevertheless if this
211 // happens it is clearly an error ...
212 //---------------------------------------------------------------------
214 }
215
216 //-----------------------------------------------------------------------
217 // If necessary trigger error correction procedure
218 // @param self : the block_t object
219 // @return : false if the block is corrupted and cannot be recovered,
220 // true otherwise
221 //-----------------------------------------------------------------------
222 static bool error_correction( std::shared_ptr<block_t> &self )
223 {
224 //---------------------------------------------------------------------
225 // Do the accounting for our stripes
226 //---------------------------------------------------------------------
227 size_t missingcnt = 0, validcnt = 0, loadingcnt = 0, recoveringcnt = 0;
228 std::for_each( self->state.begin(), self->state.end(), [&]( state_t &s )
229 {
230 switch( s )
231 {
232 case Missing: ++missingcnt; break;
233 case Valid: ++validcnt; break;
234 case Loading: ++loadingcnt; break;
235 case Recovering: ++recoveringcnt; break;
236 default: ;
237 }
238 } );
239 //---------------------------------------------------------------------
240 // If there are no missing stripes all is good ...
241 //---------------------------------------------------------------------
242 if( missingcnt + recoveringcnt == 0 ) return true;
243 //---------------------------------------------------------------------
244 // Check if we can do the recovery at all (if too many stripes are
245 // missing it won't be possible)
246 //---------------------------------------------------------------------
247 if( missingcnt + recoveringcnt > self->objcfg.nbparity )
248 {
249 std::for_each( self->state.begin(), self->state.end(),
250 []( state_t &s ){ if( s == Recovering ) s = Missing; } );
251 return false;
252 }
253 //---------------------------------------------------------------------
254 // Check if we can do the recovery right away
255 //---------------------------------------------------------------------
256 if( validcnt >= self->objcfg.nbdata )
257 {
258 Config &cfg = Config::Instance();
259 stripes_t strps( self->get_stripes() );
260 try
261 {
262 cfg.GetRedundancy( self->objcfg ).compute( strps );
263 }
264 catch( const IOError &ex )
265 {
266 std::for_each( self->state.begin(), self->state.end(),
267 []( state_t &s ){ if( s == Recovering ) s = Missing; } );
268 return false;
269 }
270 //-------------------------------------------------------------------
271 // Now when we recovered the data we need to mark every stripe as
272 // valid and execute the pending reads
273 //-------------------------------------------------------------------
274 for( size_t strpid = 0; strpid < self->objcfg.nbchunks; ++strpid )
275 {
276 if( self->state[strpid] != Recovering ) continue;
277 self->state[strpid] = Valid;
278 self->carryout( self->pending[strpid], self->stripes[strpid] );
279 }
280 return true;
281 }
282 //---------------------------------------------------------------------
283 // Try loading the data and only then attempt recovery
284 //---------------------------------------------------------------------
285 size_t i = 0;
286 while( loadingcnt + validcnt < self->objcfg.nbdata && i < self->objcfg.nbchunks )
287 {
288 size_t strpid = i++;
289 if( self->state[strpid] != Empty ) continue;
290 self->reader.Read( self->blkid, strpid, self->stripes[strpid],
291 read_callback( self, strpid ) );
292 self->state[strpid] = Loading;
293 ++loadingcnt;
294 }
295
296 //-------------------------------------------------------------------
297 // Now that we triggered the recovery procedure mark every missing
298 // stripe as recovering.
299 //-------------------------------------------------------------------
300 std::for_each( self->state.begin(), self->state.end(),
301 []( state_t &s ){ if( s == Missing ) s = Recovering; } );
302 return true;
303 }
304
305 //-----------------------------------------------------------------------
306 // Get a callback for read operation
307 //-----------------------------------------------------------------------
308 inline static
309 callback_t read_callback( std::shared_ptr<block_t> &self, size_t strpid )
310 {
311 return [self, strpid]( const XrdCl::XRootDStatus &st, uint32_t ) mutable
312 {
313 std::unique_lock<std::mutex> lck( self->mtx );
314 self->state[strpid] = st.IsOK() ? Valid : Missing;
315 //------------------------------------------------------------
316 // Check if we need to do any error correction (either for
317 // the current stripe, or any other stripe)
318 //------------------------------------------------------------
319 bool recoverable = error_correction( self );
320 //------------------------------------------------------------
321 // Carry out the pending read requests if we got the data
322 //------------------------------------------------------------
323 if( st.IsOK() )
324 self->carryout( self->pending[strpid], self->stripes[strpid], st );
325 //------------------------------------------------------------
326 // Carry out the pending read requests if there was an error
327 // and we cannot recover
328 //------------------------------------------------------------
329 if( !recoverable )
330 self->fail_missing();
331 };
332 }
333
334 //-----------------------------------------------------------------------
335 // Get stripes_t data structure used for error recovery
336 //-----------------------------------------------------------------------
338 {
339 stripes_t ret;
340 ret.reserve( objcfg.nbchunks );
341 for( size_t i = 0; i < objcfg.nbchunks; ++i )
342 {
343 if( state[i] == Valid )
344 ret.emplace_back( stripes[i].data(), true );
345 else
346 {
347 stripes[i].resize( objcfg.chunksize, 0 );
348 ret.emplace_back( stripes[i].data(), false );
349 }
350 }
351 return ret;
352 }
353
354 //-------------------------------------------------------------------------
355 // Execute the pending read requests
356 //-------------------------------------------------------------------------
357 inline void carryout( pending_t &pending,
358 const buffer_t &stripe,
360 {
361 //-----------------------------------------------------------------------
362 // Iterate over all pending read operations for given stripe
363 //-----------------------------------------------------------------------
364 auto itr = pending.begin();
365 for( ; itr != pending.end() ; ++itr )
366 {
367 auto &args = *itr;
368 callback_t &callback = std::get<3>( args );
369 uint32_t nbrd = 0; // number of bytes read
370 //---------------------------------------------------------------------
371 // If the read was successful, copy the data to user buffer
372 //---------------------------------------------------------------------
373 if( st.IsOK() )
374 {
375 uint64_t offset = std::get<0>( args );
376 uint32_t size = std::get<1>( args );
377 char *usrbuff = std::get<2>( args );
378 // are we reading past the end of file?
379 if( offset > stripe.size() )
380 size = 0;
381 // are we partially reading past the end of the file?
382 else if( offset + size > stripe.size() )
383 size = stripe.size() - offset;
384 if(usrbuff)
385 memcpy( usrbuff, stripe.data() + offset, size );
386 nbrd = size;
387 }
388 //---------------------------------------------------------------------
389 // Call the user callback
390 //---------------------------------------------------------------------
391 callback( st, nbrd );
392 }
393 //-----------------------------------------------------------------------
394 // Now we can clear the pending reads
395 //-----------------------------------------------------------------------
396 pending.clear();
397 }
398
399 //-------------------------------------------------------------------------
400 // Execute pending read requests for missing stripes
401 //-------------------------------------------------------------------------
402 inline void fail_missing()
403 {
404 size_t size = objcfg.nbchunks;
405 for( size_t i = 0; i < size; ++i )
406 {
407 if( state[i] != Missing ) continue;
408 carryout( pending[i], stripes[i],
410 }
411 }
412
415 std::vector<buffer_t> stripes; //< data buffer for every stripe
416 std::vector<state_t> state; //< state of every data buffer (empty/loading/valid)
417 std::vector<pending_t> pending; //< pending reads per stripe
418 size_t blkid; //< block ID
419 bool recovering; //< true if we are in the process of recovering data, false otherwise
420 std::mutex mtx;
421 };
422
423 //---------------------------------------------------------------------------
424 // Destructor (we need it in the source file because block_t is defined in
425 // here)
426 //---------------------------------------------------------------------------
427 Reader::~Reader()
428 {
429 }
430
431 //---------------------------------------------------------------------------
432 // Open the erasure coded / striped object
433 //---------------------------------------------------------------------------
434 void Reader::Open( XrdCl::ResponseHandler *handler, uint16_t timeout )
435 {
436 const size_t size = objcfg.plgr.size();
437 std::vector<XrdCl::Pipeline> opens; opens.reserve( size );
438 for( size_t i = 0; i < size; ++i )
439 {
440 // generate the URL
441 std::string url = objcfg.GetDataUrl( i );
442 archiveIndices.emplace(url, i);
443 // create the file object
444 dataarchs.emplace( url, std::make_shared<XrdCl::ZipArchive>(
445 Config::Instance().enable_plugins ) );
446 // open the archive
447 if( objcfg.nomtfile )
448 {
449 opens.emplace_back( XrdCl::OpenArchive( *dataarchs[url], url, XrdCl::OpenFlags::Read ) );
450 }
451 else
452 opens.emplace_back( OpenOnly( *dataarchs[url], url, false ) );
453 }
454
455 auto pipehndl = [=]( const XrdCl::XRootDStatus &st )
456 { // set the central directories in ZIP archives (if we use metadata files)
457 auto itr = dataarchs.begin();
458 for( ; itr != dataarchs.end() ; ++itr )
459 {
460 const std::string &url = itr->first;
461 auto &zipptr = itr->second;
462 if( zipptr->openstage == XrdCl::ZipArchive::NotParsed )
463 zipptr->SetCD( metadata[url] );
464 else if( zipptr->openstage != XrdCl::ZipArchive::Done && !metadata.empty() )
465 AddMissing( metadata[url] );
466 auto itr = zipptr->cdmap.begin();
467 for( ; itr != zipptr->cdmap.end() ; ++itr )
468 {
469 urlmap.emplace( itr->first, url );
470 size_t blknb = fntoblk( itr->first );
471 if( blknb > lstblk ) lstblk = blknb;
472 }
473 }
474 metadata.clear();
475 // call user handler
476 if( handler )
477 handler->HandleResponse( new XrdCl::XRootDStatus( st ), nullptr );
478 };
479 // in parallel open the data files and read the metadata
480 XrdCl::Pipeline p = objcfg.nomtfile
481 ? XrdCl::Parallel( opens ).AtLeast( objcfg.nbdata ) | ReadSize( 0 ) | XrdCl::Final( pipehndl )
482 : XrdCl::Parallel( ReadMetadata( 0 ),
483 XrdCl::Parallel( opens ).AtLeast( objcfg.nbdata ) ) >> pipehndl;
484 XrdCl::Async( std::move( p ), timeout );
485 }
486
487 //-----------------------------------------------------------------------
488 // Read data from the data object
489 //-----------------------------------------------------------------------
490 void Reader::Read( uint64_t offset,
491 uint32_t length,
492 void *buffer,
493 XrdCl::ResponseHandler *handler,
494 uint16_t timeout )
495 {
496 if( objcfg.nomtfile )
497 {
498 if( offset >= filesize )
499 length = 0;
500 else if( offset + length > filesize )
501 length = filesize - offset;
502 }
503
504 if( length == 0 )
505 {
506 ScheduleHandler( offset, 0, buffer, handler );
507 return;
508 }
509
510 char *usrbuff = reinterpret_cast<char*>( buffer );
511 typedef std::tuple<uint64_t, uint32_t,
512 void*, uint32_t,
514 XrdCl::XRootDStatus> rdctx_t;
515 auto rdctx = std::make_shared<rdctx_t>( offset, 0, buffer,
516 length, handler,
518 auto rdmtx = std::make_shared<std::mutex>();
519
520 while( length > 0 )
521 {
522 size_t blkid = offset / objcfg.datasize; //< ID of the block from which we will be reading
523 size_t strpid = ( offset % objcfg.datasize ) / objcfg.chunksize; //< ID of the stripe from which we will be reading
524 uint64_t rdoff = offset - blkid * objcfg.datasize - strpid * objcfg.chunksize; //< relative read offset within the stripe
525 uint32_t rdsize = objcfg.chunksize - rdoff; //< read size within the stripe
526 if( rdsize > length ) rdsize = length;
527 //-------------------------------------------------------------------
528 // Make sure we operate on a valid block
529 //-------------------------------------------------------------------
530 std::unique_lock<std::mutex> lck( blkmtx );
531 if( !block || block->blkid != blkid )
532 block = std::make_shared<block_t>( blkid, *this, objcfg );
533 //-------------------------------------------------------------------
534 // Prepare the callback for reading from single stripe
535 //-------------------------------------------------------------------
536 auto blk = block;
537 lck.unlock();
538 auto callback = [blk, rdctx, rdsize, rdmtx]( const XrdCl::XRootDStatus &st, uint32_t nbrd )
539 {
540 std::unique_lock<std::mutex> lck( *rdmtx );
541 //---------------------------------------------------------------------
542 // update number of bytes left to be read (bytes requested not actually
543 // read)
544 //---------------------------------------------------------------------
545 std::get<3>( *rdctx ) -= rdsize;
546 //---------------------------------------------------------------------
547 // Handle failure ...
548 //---------------------------------------------------------------------
549 if( !st.IsOK() )
550 std::get<5>( *rdctx ) = st; // the error
551 //---------------------------------------------------------------------
552 // Handle success ...
553 //---------------------------------------------------------------------
554 else
555 std::get<1>( *rdctx ) += nbrd; // number of bytes read
556 //---------------------------------------------------------------------
557 // Are we done?
558 //---------------------------------------------------------------------
559 if( std::get<3>( *rdctx ) == 0 )
560 {
561 //-------------------------------------------------------------------
562 // Check if the read operation was successful ...
563 //-------------------------------------------------------------------
564 XrdCl::XRootDStatus &status = std::get<5>( *rdctx );
565 if( !status.IsOK() )
566 ScheduleHandler( std::get<4>( *rdctx ), status );
567 else
568 ScheduleHandler( std::get<0>( *rdctx ), std::get<1>( *rdctx ),
569 std::get<2>( *rdctx ), std::get<4>( *rdctx ) );
570 }
571 };
572 //-------------------------------------------------------------------
573 // Read data from a stripe
574 //-------------------------------------------------------------------
575 block_t::read( blk, strpid, rdoff, rdsize, usrbuff, callback, timeout );
576 //-------------------------------------------------------------------
577 // Update absolute offset, read length, and user buffer
578 //-------------------------------------------------------------------
579 offset += rdsize;
580 length -= rdsize;
581 usrbuff += rdsize;
582 }
583 }
584
585 //-----------------------------------------------------------------------
586 // Close the data object
587 //-----------------------------------------------------------------------
588 void Reader::Close( XrdCl::ResponseHandler *handler, uint16_t timeout )
589 {
590 //---------------------------------------------------------------------
591 // prepare the pipelines ...
592 //---------------------------------------------------------------------
593 std::vector<XrdCl::Pipeline> closes;
594 closes.reserve( dataarchs.size() );
595 auto itr = dataarchs.begin();
596 for( ; itr != dataarchs.end() ; ++itr )
597 {
598 auto &zipptr = itr->second;
599 if( zipptr->IsOpen() )
600 {
601 zipptr->SetProperty( "BundledClose", "true");
602 closes.emplace_back( XrdCl::CloseArchive( *zipptr ) >>
603 [zipptr]( XrdCl::XRootDStatus& ){ } );
604 }
605 }
606
607 // if there is nothing to close just schedule the handler
608 if( closes.empty() ) ScheduleHandler( handler );
609 // otherwise close the archives
610 else XrdCl::Async( XrdCl::Parallel( closes ) >> handler, timeout );
611 }
612
613 //-------------------------------------------------------------------------
614 // on-definition is not allowed here beforeiven stripes from given block
615 //-------------------------------------------------------------------------
616 void Reader::Read( size_t blknb, size_t strpnb, buffer_t &buffer, callback_t cb, uint16_t timeout )
617 {
618 // generate the file name (blknb/strpnb)
619 std::string fn = objcfg.GetFileName( blknb, strpnb );
620 // if the block/stripe does not exist it means we are reading passed the end of the file
621 auto itr = urlmap.find( fn );
622 if( itr == urlmap.end() )
623 {
624 auto st = !IsMissing( fn ) ? XrdCl::XRootDStatus() :
625 XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errNotFound );
626 ThreadPool::Instance().Execute( cb, st, 0 );
627 return;
628 }
629 // get the URL of the ZIP archive with the respective data
630 const std::string &url = itr->second;
631 // get the ZipArchive object
632 auto &zipptr = dataarchs[url];
633 // check the size of the data to be read
634 XrdCl::StatInfo *info = nullptr;
635 auto st = zipptr->Stat( fn, info );
636 if( !st.IsOK() )
637 {
638 ThreadPool::Instance().Execute( cb, st, 0 );
639 return;
640 }
641 uint32_t rdsize = info->GetSize();
642 delete info;
643 // create a buffer for the data
644 buffer.resize( objcfg.chunksize );
645 // issue the read request
646 XrdCl::Async( XrdCl::ReadFrom( *zipptr, fn, 0, rdsize, buffer.data() ) >>
647 [zipptr, fn, cb, this]( XrdCl::XRootDStatus &st, XrdCl::ChunkInfo &ch )
648 {
649 //---------------------------------------------------
650 // If read failed there's nothing to do, just pass the
651 // status to user callback
652 //---------------------------------------------------
653 if( !st.IsOK() )
654 {
655 cb( st, 0 );
656 return;
657 }
658 //---------------------------------------------------
659 // Get the checksum for the read data
660 //---------------------------------------------------
661 uint32_t orgcksum = 0;
662 auto s = zipptr->GetCRC32( fn, orgcksum );
663 //---------------------------------------------------
664 // If we cannot extract the checksum assume the data
665 // are corrupted
666 //---------------------------------------------------
667 if( !st.IsOK() )
668 {
669 cb( st, 0 );
670 return;
671 }
672 //---------------------------------------------------
673 // Verify data integrity
674 //---------------------------------------------------
675 uint32_t cksum = objcfg.digest( 0, ch.buffer, ch.length );
676 if( orgcksum != cksum )
677 {
678 cb( XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errDataError ), 0 );
679 return;
680 }
681 //---------------------------------------------------
682 // All is good, we can call now the user callback
683 //---------------------------------------------------
684 cb( XrdCl::XRootDStatus(), ch.length );
685 }, timeout );
686 }
687
688 //-----------------------------------------------------------------------
689 // Read metadata for the object
690 //-----------------------------------------------------------------------
691 XrdCl::Pipeline Reader::ReadMetadata( size_t index )
692 {
693 const size_t size = objcfg.plgr.size();
694 // create the File object
695 auto file = std::make_shared<XrdCl::File>( Config::Instance().enable_plugins );
696 // prepare the URL for Open operation
697 std::string url = objcfg.GetMetadataUrl( index );
698 // arguments for the Read operation
700 XrdCl::Fwd<void*> rdbuff;
701
702 return XrdCl::Open( *file, url, XrdCl::OpenFlags::Read ) >>
703 [=]( XrdCl::XRootDStatus &st, XrdCl::StatInfo &info ) mutable
704 {
705 if( !st.IsOK() )
706 {
707 if( index + 1 < size )
708 XrdCl::Pipeline::Replace( ReadMetadata( index + 1 ) );
709 return;
710 }
711 // prepare the args for the subsequent operation
712 rdsize = info.GetSize();
713 rdbuff = new char[info.GetSize()];
714 }
715 | XrdCl::Read( *file, 0, rdsize, rdbuff ) >>
717 {
718 if( !st.IsOK() )
719 {
720 if( index + 1 < size )
721 XrdCl::Pipeline::Replace( ReadMetadata( index + 1 ) );
722 return;
723 }
724 // now parse the metadata
725 if( !ParseMetadata( ch ) )
726 {
727 if( index + 1 < size )
728 XrdCl::Pipeline::Replace( ReadMetadata( index + 1 ) );
729 return;
730 }
731 }
732 | XrdCl::Close( *file ) >>
733 []( XrdCl::XRootDStatus &st )
734 {
735 if( !st.IsOK() )
736 XrdCl::Pipeline::Ignore(); // ignore errors, we don't really care
737 }
738 | XrdCl::Final(
739 [rdbuff, file]( const XrdCl::XRootDStatus& )
740 {
741 // deallocate the buffer if necessary
742 if( rdbuff.Valid() )
743 {
744 char* buffer = reinterpret_cast<char*>( *rdbuff );
745 delete[] buffer;
746 }
747 } );
748 }
749
750 //-----------------------------------------------------------------------
754 //-----------------------------------------------------------------------
755 XrdCl::Pipeline Reader::ReadSize( size_t index )
756 {
757 std::string url = objcfg.GetDataUrl( index );
758 return XrdCl::GetXAttr( dataarchs[url]->GetFile(), "xrdec.filesize" ) >>
759 [index, this]( XrdCl::XRootDStatus &st, std::string &size)
760 {
761 if( !st.IsOK() )
762 {
763 //-------------------------------------------------------------
764 // Check if we can recover the error or a diffrent location
765 //-------------------------------------------------------------
766 if( index + 1 < objcfg.plgr.size() )
767 XrdCl::Pipeline::Replace( ReadSize( index + 1 ) );
768 return;
769 }
770 filesize = std::stoull( size );
771 };
772 }
773
774 //-----------------------------------------------------------------------
775 // Parse metadata from chunk info object
776 //-----------------------------------------------------------------------
777 bool Reader::ParseMetadata( XrdCl::ChunkInfo &ch )
778 {
779 const size_t mincnt = objcfg.nbdata + objcfg.nbparity;
780 const size_t maxcnt = objcfg.plgr.size();
781
782 char *buffer = reinterpret_cast<char*>( ch.buffer );
783 size_t length = ch.length;
784
785 for( size_t i = 0; i < maxcnt; ++i )
786 {
787 uint32_t signature = XrdZip::to<uint32_t>( buffer );
788 if( signature != XrdZip::LFH::lfhSign )
789 {
790 if( i + 1 < mincnt ) return false;
791 break;
792 }
793 XrdZip::LFH lfh( buffer );
794 // check if we are not reading passed the end of the buffer
795 if( lfh.lfhSize + lfh.uncompressedSize > length ) return false;
796 buffer += lfh.lfhSize;
797 length -= lfh.lfhSize;
798 // verify the checksum
799 uint32_t crc32val = objcfg.digest( 0, buffer, lfh.uncompressedSize );
800 if( crc32val != lfh.ZCRC32 ) return false;
801 // keep the metadata
802 std::string url = objcfg.GetDataUrl( std::stoull( lfh.filename ) );
803 metadata.emplace( url, buffer_t( buffer, buffer + lfh.uncompressedSize ) );
804 buffer += lfh.uncompressedSize;
805 length -= lfh.uncompressedSize;
806 }
807
808 return true;
809 }
810
811 //-----------------------------------------------------------------------
812 // Add all the entries from given Central Directory to missing
813 //-----------------------------------------------------------------------
814 void Reader::AddMissing( const buffer_t &cdbuff )
815 {
816 const char *buff = cdbuff.data();
817 size_t size = cdbuff.size();
818 // parse Central Directory records
819 XrdZip::cdvec_t cdvec;
820 XrdZip::cdmap_t cdmap;
821 std::tie(cdvec, cdmap ) = XrdZip::CDFH::Parse( buff, size );
822 auto itr = cdvec.begin();
823 for( ; itr != cdvec.end() ; ++itr )
824 {
825 XrdZip::CDFH &cdfh = **itr;
826 missing.insert( cdfh.filename );
827 }
828 }
829
830 //-----------------------------------------------------------------------
832 //-----------------------------------------------------------------------
833 bool Reader::IsMissing( const std::string &fn )
834 {
835 // if the chunk is in the missing set return true
836 if( missing.count( fn ) ) return true;
837 // if we don't have a metadata file and the chunk exceeds last chunk
838 // also return true
839 if( objcfg.nomtfile && fntoblk( fn ) <= lstblk ) return true;
840 // otherwise return false
841 return false;
842 }
843
844
845 inline callback_t Reader::ErrorCorrected(Reader *reader, std::shared_ptr<block_t> &self, size_t blkid, size_t strpid){
846 return [reader, self, strpid, blkid]( const XrdCl::XRootDStatus &st, uint32_t ) mutable
847 {
848 std::unique_lock<std::mutex> readerLock(reader->missingChunksMutex);
849 reader->missingChunksVectorRead.erase(std::remove(reader->missingChunksVectorRead.begin(), reader->missingChunksVectorRead.end(), std::make_tuple(blkid, strpid)));
850 reader->waitMissing.notify_all();
851 };
852 }
853
854 void Reader::MissingVectorRead(std::shared_ptr<block_t> &currentBlock, size_t blkid, size_t strpid, uint16_t timeout){
855 {
856 std::unique_lock<std::mutex> lk(missingChunksMutex);
857 missingChunksVectorRead.emplace_back(
858 std::make_tuple(blkid,strpid));
859 }
860 currentBlock->state[strpid] = block_t::Missing;
861 currentBlock->read(currentBlock, strpid, 0, objcfg.chunksize,
862 nullptr,
863 ErrorCorrected(this, currentBlock, blkid, strpid),
864 timeout);
865 }
866
867
868 void Reader::VectorRead(const XrdCl::ChunkList &chunks, void *buffer, XrdCl::ResponseHandler *handler, uint16_t timeout){
869 if(chunks.size() > 1024) {
871 return;
872 }
873
874 std::vector<XrdCl::ChunkList> hostLists;
875 for(size_t dataHosts = 0; dataHosts < objcfg.plgr.size(); dataHosts++){
876 hostLists.emplace_back(XrdCl::ChunkList());
877 }
878
879 auto log = XrdCl::DefaultEnv::GetLog();
880
881 //bool useGlobalBuffer = buffer != nullptr;
882 char* globalBuffer = (char*)buffer;
883
884 // host index, blkid, strpid
885 std::set<std::tuple<size_t, size_t, size_t>> requestedChunks;
886 // create block_ts for any requested block index
887 std::map<size_t, std::shared_ptr<block_t>> blockMap;
888
889 // go through the requested lists of chunks and assign them to fitting hosts
890 for(size_t index = 0; index < chunks.size(); index++){
891 uint32_t remainLength = chunks[index].length;
892 uint64_t currentOffset = chunks[index].offset;
893
894 while(remainLength > 0){
895 size_t blkid = currentOffset / objcfg.datasize; //< ID of the block from which we will be reading
896 size_t strpid = ( currentOffset % objcfg.datasize ) / objcfg.chunksize; //< ID of the stripe from which we will be reading
897 uint64_t rdoff = currentOffset - blkid * objcfg.datasize - strpid * objcfg.chunksize ; //< relative read offset within the stripe
898 //uint64_t offsetInFile = rdoff + blkid * objcfg.chunksize; // relative offset within the file
899 uint32_t rdsize = objcfg.chunksize - rdoff; //< read size within the stripe
900 if( rdsize > remainLength ) rdsize = remainLength;
901 if(currentOffset + rdsize >= filesize) {
902 rdsize = filesize - currentOffset;
903 remainLength = rdsize;
904 }
905
906
907 std::string fn = objcfg.GetFileName(blkid, strpid);
908
909 auto itr = urlmap.find( fn );
910 if( itr == urlmap.end() )
911 {
912 log->Dump(XrdCl::XRootDMsg, "EC Vector Read: No mapping of file to host found.");
913 break;
914 }
915 // get the URL of the ZIP archive with the respective data
916 const std::string &url = itr->second;
917 auto itr2 = archiveIndices.find(url);
918 if(itr2 == archiveIndices.end())
919 {
920 log->Dump(XrdCl::XRootDMsg, "EC Vector Read: Couldn't find host for file.");
921 break;
922 }
923 size_t indexOfArchive = archiveIndices[url];
924
925 if (blockMap.find(blkid) == blockMap.end())
926 {
927 blockMap.emplace(blkid,
928 std::make_shared<block_t>(blkid, *this, objcfg));
929 }
930
931 blockMap[blkid]->state[strpid] = block_t::Loading;
932 XrdCl::StatInfo* info = nullptr;
933 if(dataarchs[url]->Stat(objcfg.GetFileName(blkid, strpid), info).IsOK())
934 blockMap[blkid]->stripes[strpid].resize( info ->GetSize() );
935
936 auto requestChunk = std::make_tuple(indexOfArchive, blkid, strpid);
937 if(requestedChunks.find(requestChunk) == requestedChunks.end())
938 {
939 uint64_t off = 0;
940 dataarchs[url]->GetOffset(objcfg.GetFileName(blkid, strpid), off);
941 hostLists[indexOfArchive].emplace_back(XrdCl::ChunkInfo(
942 off,
943 info ->GetSize(),
944 blockMap[blkid]->stripes[strpid].data()));
945
946 // fill list of requested chunks by block and stripe id
947 requestedChunks.emplace(requestChunk);
948
949 }
950 remainLength -= rdsize;
951 currentOffset += rdsize;
952
953 }
954 }
955
956 std::vector<XrdCl::Pipeline> hostPipes;
957 hostPipes.reserve(hostLists.size());
958 for(size_t i = 0; i < hostLists.size(); i++){
959 while(hostLists[i].size() > 0){
960 uint32_t range = hostLists[i].size() > 1024 ? 1024 : hostLists[i].size();
961 XrdCl::ChunkList partList(hostLists[i].begin(), hostLists[i].begin() + range);
962 hostLists[i].erase(hostLists[i].begin(), hostLists[i].begin() + range);
963 hostPipes.emplace_back(
964 XrdCl::VectorRead(XrdCl::Ctx<XrdCl::File>(dataarchs[objcfg.GetDataUrl(i)]->archive),
965 partList, nullptr, timeout)
966 >> [=](const XrdCl::XRootDStatus &st, XrdCl::VectorReadInfo ch) mutable
967 {
968 auto it = requestedChunks.begin();
969 while(it!=requestedChunks.end())
970 {
971 auto &args = *it;
972 size_t host = std::get<0>(args);
973 size_t blkid = std::get<1>(args);
974 size_t strpid = std::get<2>(args);
975 it++;
976 if(host == i)
977 {
978 std::shared_ptr<block_t> currentBlock = blockMap[blkid];
979
980
981 if(!st.IsOK())
982 {
983 log->Dump(XrdCl::XRootDMsg, "EC Vector Read of host %llu failed entirely.", i);
984 MissingVectorRead(currentBlock, blkid, strpid, timeout);
985 }
986 else{
987 uint32_t orgcksum = 0;
988 auto s = dataarchs[objcfg.GetDataUrl(i)]->GetCRC32( objcfg.GetFileName(blkid, strpid), orgcksum );
989 //---------------------------------------------------
990 // If we cannot extract the checksum assume the data
991 // are corrupted
992 //---------------------------------------------------
993 if( !st.IsOK() )
994 {
995 log->Dump(XrdCl::XRootDMsg, "EC Vector Read: Couldn't read CRC32 from CD.");
996 MissingVectorRead(currentBlock, blkid, strpid, timeout);
997 continue;
998 }
999 //---------------------------------------------------
1000 // Verify data integrity
1001 //---------------------------------------------------
1002 uint32_t cksum = objcfg.digest( 0, currentBlock->stripes[strpid].data(), currentBlock->stripes[strpid].size() );
1003 if( orgcksum != cksum )
1004 {
1005 log->Dump(XrdCl::XRootDMsg, "EC Vector Read: Wrong checksum for block %llu stripe %llu.", blkid, strpid);
1006 MissingVectorRead(currentBlock, blkid, strpid, timeout);
1007 continue;
1008 }
1009 else{
1010 currentBlock->state[strpid] = block_t::Valid;
1011 bool recoverable = currentBlock->error_correction( currentBlock );
1012 if(!recoverable)
1013 log->Dump(XrdCl::XRootDMsg, "EC Vector Read: Couldn't recover block %llu.", blkid);
1014 }
1015 }
1016 }
1017 }
1018 }
1019 );
1020 }
1021 }
1022
1023 auto finalPipehndl = [=] (const XrdCl::XRootDStatus &st) mutable {
1024 // wait until all missing chunks are corrected (uses single reads to get parity stripes)
1025 std::unique_lock<std::mutex> lk(missingChunksMutex);
1026 waitMissing.wait(lk, [=] { return missingChunksVectorRead.size() == 0;});
1027
1028 bool failed = false;
1029 for(size_t index = 0; index < chunks.size(); index++){
1030 uint32_t remainLength = chunks[index].length;
1031 uint64_t currentOffset = chunks[index].offset;
1032
1033 char *localBuffer;
1034 if (globalBuffer)
1035 localBuffer = globalBuffer;
1036 else
1037 localBuffer = (char*)(chunks[index].buffer);
1038
1039 while(remainLength > 0){
1040 size_t blkid = currentOffset / objcfg.datasize; //< ID of the block from which we will be reading
1041 size_t strpid = ( currentOffset % objcfg.datasize ) / objcfg.chunksize; //< ID of the stripe from which we will be reading
1042 uint64_t rdoff = currentOffset - blkid * objcfg.datasize - strpid * objcfg.chunksize ; //< relative read offset within the stripe
1043 uint32_t rdsize = objcfg.chunksize - rdoff; //< read size within the stripe
1044 if( rdsize > remainLength ) rdsize = remainLength;
1045
1046 // put received data into given buffers
1047 if(blockMap.find(blkid) == blockMap.end() || blockMap[blkid] == nullptr){
1048 log->Dump(XrdCl::XRootDMsg, "EC Vector Read: Missing block %llu.", blkid);
1049 failed = true;
1050 break;
1051 }
1052 if(blockMap[blkid]->state[strpid] != block_t::Valid){
1053 log->Dump(XrdCl::XRootDMsg, "EC Vector Read: Invalid stripe in block %llu stripe %llu.", blkid, strpid);
1054 failed = true;
1055 break;
1056 }
1057
1058 memcpy(localBuffer, blockMap[blkid]->stripes[strpid].data() + rdoff, rdsize);
1059
1060 remainLength -= rdsize;
1061 currentOffset += rdsize;
1062 localBuffer += rdsize;
1063 }
1064 if(globalBuffer) globalBuffer = localBuffer;
1065 }
1066 if(handler){
1067 if(failed) log->Dump(XrdCl::XRootDMsg, "EC Vector Read failed (at least in part).");
1068 if(failed) handler->HandleResponse(new XrdCl::XRootDStatus(XrdCl::stError, "Couldn't read all segments"), nullptr);
1069 else handler->HandleResponse(new XrdCl::XRootDStatus(), nullptr);
1070 }
1071 };
1072
1073 XrdCl::Pipeline p = XrdCl::Parallel(hostPipes) |
1074 XrdCl::Final(finalPipehndl);
1075
1076 XrdCl::Async(std::move(p), timeout);
1077
1078 }
1079
1080} /* namespace XrdEc */
struct stat Stat
Definition XrdCks.cc:49
XrdOucString Valid
Derived< HasHndl > Timeout(uint16_t timeout)
Set operation timeout.
static Log * GetLog()
Get default log.
std::unique_ptr< PipelineHandler > handler
Operation handler.
static void Replace(Operation< false > &&opr)
Replace current operation.
static void Ignore()
Ignore error and proceed with the pipeline.
Handle an async response.
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
Object stat info.
uint64_t GetSize() const
Get size (in bytes)
static Config & Instance()
Singleton access.
XrdCl::XRootDStatus RunImpl(XrdCl::PipelineHandler *handler, uint16_t pipelineTimeout)
std::string ToString()
Name of the operation.
GetXAttrImpl< false > GetXAttr(Ctx< File > file, Arg< std::string > name)
ReadImpl< false > Read(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating ReadImpl objects.
CloseArchiveImpl< false > CloseArchive(Ctx< ZipArchive > zip, uint16_t timeout=0)
Factory for creating CloseFileImpl objects.
const uint16_t stError
An error occurred that could potentially be retried.
CloseImpl< false > Close(Ctx< File > file, uint16_t timeout=0)
Factory for creating CloseImpl objects.
const uint64_t XRootDMsg
const uint16_t errDataError
data is corrupted
const uint16_t errInvalidOp
ZipReadFromImpl< false > ReadFrom(Ctx< ZipArchive > zip, Arg< std::string > fn, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating ArchiveReadImpl objects.
const uint16_t errInvalidArgs
std::vector< ChunkInfo > ChunkList
List of chunks.
VectorReadImpl< false > VectorRead(Ctx< File > file, Arg< ChunkList > chunks, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating VectorReadImpl objects.
FinalOperation Final
OpenImpl< false > Open(Ctx< File > file, Arg< std::string > url, Arg< OpenFlags::Flags > flags, Arg< Access::Mode > mode=Access::None, uint16_t timeout=0)
Factory for creating ReadImpl objects.
std::future< XRootDStatus > Async(Pipeline pipeline, uint16_t timeout=0)
OpenArchiveImpl< false > OpenArchive(Ctx< ZipArchive > zip, Arg< std::string > fn, Arg< OpenFlags::Flags > flags, uint16_t timeout=0)
Factory for creating OpenArchiveImpl objects.
ParallelOperation< false > Parallel(Container &&container)
Factory function for creating parallel operation from a vector.
std::vector< stripe_t > stripes_t
All stripes in a block.
std::function< void(const XrdCl::XRootDStatus &, uint32_t)> callback_t
void ScheduleHandler(uint64_t offset, uint32_t size, void *buffer, XrdCl::ResponseHandler *handler)
OpenOnlyImpl< false > OpenOnly(XrdCl::Ctx< XrdCl::ZipArchive > zip, XrdCl::Arg< std::string > fn, XrdCl::Arg< bool > updt, uint16_t timeout=0)
std::vector< char > buffer_t
a buffer type
static INT to(const char *buffer)
std::vector< std::unique_ptr< CDFH > > cdvec_t
Definition XrdZipCDFH.hh:51
std::unordered_map< std::string, size_t > cdmap_t
Definition XrdZipCDFH.hh:56
Describe a data chunk for vector read.
void * buffer
length of the chunk
uint32_t length
offset in the file
Utility class for storing a pointer to operation context.
Definition XrdClCtx.hh:39
bool Valid() const
Check if it contains a valid value.
Definition XrdClFwd.hh:235
@ Read
Open only for reading.
bool IsOK() const
We're fine.
const uint8_t nbdata
const uint8_t nbchunks
static bool error_correction(std::shared_ptr< block_t > &self)
std::vector< state_t > state
std::tuple< uint64_t, uint32_t, char *, callback_t > args_t
block_t(size_t blkid, Reader &reader, ObjCfg &objcfg)
static void read(std::shared_ptr< block_t > &self, size_t strpid, uint64_t offset, uint32_t size, char *usrbuff, callback_t usrcb, uint16_t timeout)
void carryout(pending_t &pending, const buffer_t &stripe, const XrdCl::XRootDStatus &st=XrdCl::XRootDStatus())
stripes_t get_stripes()
std::vector< buffer_t > stripes
std::mutex mtx
std::vector< args_t > pending_t
std::vector< pending_t > pending
static callback_t read_callback(std::shared_ptr< block_t > &self, size_t strpid)
std::string filename
static std::tuple< cdvec_t, cdmap_t > Parse(const char *buffer, uint32_t bufferSize, uint16_t nbCdRecords)
Definition XrdZipCDFH.hh:75
A data structure representing ZIP Local File Header.
Definition XrdZipLFH.hh:42
static const uint32_t lfhSign
Local File Header signature.
Definition XrdZipLFH.hh:173