43const int BLOCK_WRITE_MAX_ATTEMPTS = 4;
49const char *File::m_traceID =
"File";
53File::File(
const std::string& path,
long long iOffset,
long long iFileSize) :
57 m_cfi(
Cache::GetInstance().
GetTrace(),
Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks > 0),
60 m_file_size(iFileSize),
61 m_current_io(m_io_set.end()),
65 m_detach_time_logged(false),
70 m_prefetch_state(kOff),
72 m_prefetch_read_cnt(0),
73 m_prefetch_hit_cnt(0),
95 TRACEF(
Debug,
"~File() ended, prefetch score = " << m_prefetch_score);
102 File *file =
new File(path, offset, fileSize);
130 m_in_shutdown =
true;
132 if (m_prefetch_state != kStopped && m_prefetch_state != kComplete)
134 m_prefetch_state = kStopped;
135 cache()->DeRegisterPrefetchFile(
this);
147 Stats delta = m_last_stats;
149 m_last_stats = m_stats.
Clone();
160 TRACEF(Dump,
"BlockRemovedFromWriteQ() block = " << (
void*) b <<
" idx= " << b->
m_offset/m_block_size);
168 TRACEF(Dump,
"BlocksRemovedFromWriteQ() n_blocks = " << blocks.size());
172 for (std::list<Block*>::iterator i = blocks.begin(); i != blocks.end(); ++i)
184 insert_remote_location(loc);
200 IoSet_i mi = m_io_set.find(io);
202 if (mi != m_io_set.end())
207 ", active_reads " << n_active_reads <<
208 ", active_prefetches " << io->m_active_prefetches <<
209 ", allow_prefetching " << io->m_allow_prefetching <<
210 ", ios_in_detach " << m_ios_in_detach);
212 "\tio_map.size() " << m_io_set.size() <<
213 ", block_map.size() " << m_block_map.size() <<
", file");
215 insert_remote_location(loc);
217 io->m_allow_prefetching =
false;
218 io->m_in_detach =
true;
221 if (m_prefetch_state == kOn || m_prefetch_state == kHold)
223 if ( ! select_current_io_or_disable_prefetching(
false) )
225 TRACEF(
Debug,
"ioActive stopping prefetching after io " << io <<
" retreat.");
232 bool io_active_result;
234 if (n_active_reads > 0)
236 io_active_result =
true;
238 else if (m_io_set.size() - m_ios_in_detach == 1)
240 io_active_result = ! m_block_map.empty();
244 io_active_result = io->m_active_prefetches > 0;
247 if ( ! io_active_result)
252 TRACEF(
Info,
"ioActive for io " << io <<
" returning " << io_active_result <<
", file");
254 return io_active_result;
258 TRACEF(
Error,
"ioActive io " << io <<
" not found in IoSet. This should not happen.");
269 m_detach_time_logged =
false;
278 if ( ! m_in_shutdown)
280 if ( ! m_writes_during_sync.empty() || m_non_flushed_cnt > 0 || ! m_detach_time_logged)
284 m_detach_time_logged =
true;
286 TRACEF(
Debug,
"FinalizeSyncBeforeExit requesting sync to write detach stats");
290 TRACEF(
Debug,
"FinalizeSyncBeforeExit sync not required");
302 time_t now = time(0);
307 IoSet_i mi = m_io_set.find(io);
309 if (mi == m_io_set.end())
312 io->m_attach_time = now;
315 insert_remote_location(loc);
317 if (m_prefetch_state == kStopped)
319 m_prefetch_state = kOn;
320 cache()->RegisterPrefetchFile(
this);
325 TRACEF(
Error,
"AddIO() io = " << (
void*)io <<
" already registered.");
339 time_t now = time(0);
343 IoSet_i mi = m_io_set.find(io);
345 if (mi != m_io_set.end())
347 if (mi == m_current_io)
352 m_stats.
IoDetach(now - io->m_attach_time);
356 if (m_io_set.empty() && m_prefetch_state != kStopped && m_prefetch_state != kComplete)
358 TRACEF(
Error,
"RemoveIO() io = " << (
void*)io <<
" Prefetching is not stopped/complete -- it should be by now.");
359 m_prefetch_state = kStopped;
360 cache()->DeRegisterPrefetchFile(
this);
365 TRACEF(
Error,
"RemoveIO() io = " << (
void*)io <<
" is NOT registered.");
377 static const char *tpfx =
"Open() ";
379 TRACEF(Dump, tpfx <<
"open file for disk cache");
386 struct stat data_stat, info_stat;
390 bool data_existed = (myOss.
Stat(m_filename.c_str(), &data_stat) ==
XrdOssOK);
391 bool info_existed = (myOss.
Stat(ifn.c_str(), &info_stat) ==
XrdOssOK);
394 char size_str[32]; sprintf(size_str,
"%lld", m_file_size);
395 myEnv.
Put(
"oss.asize", size_str);
407 m_data_file = myOss.
newFile(myUser);
408 if ((res = m_data_file->
Open(m_filename.c_str(), O_RDWR, 0600, myEnv)) !=
XrdOssOK)
412 delete m_data_file; m_data_file = 0;
416 myEnv.
Put(
"oss.asize",
"64k");
422 m_data_file->
Close();
delete m_data_file; m_data_file = 0;
426 m_info_file = myOss.
newFile(myUser);
427 if ((res = m_info_file->
Open(ifn.c_str(), O_RDWR, 0600, myEnv)) !=
XrdOssOK)
431 delete m_info_file; m_info_file = 0;
432 m_data_file->
Close();
delete m_data_file; m_data_file = 0;
436 bool initialize_info_file =
true;
438 if (info_existed && m_cfi.
Read(m_info_file, ifn.c_str()))
440 TRACEF(
Debug, tpfx <<
"Reading existing info file. (data_existed=" << data_existed <<
441 ", data_size_stat=" << (data_existed ? data_stat.st_size : -1ll) <<
447 initialize_info_file =
false;
449 TRACEF(Warning, tpfx <<
"Basic sanity checks on data file failed, resetting info file, truncating data file.");
460 TRACEF(
Info, tpfx <<
"Cksum state of file insufficient, uvkeep test failed, resetting info file, truncating data file.");
461 initialize_info_file =
true;
470 if (initialize_info_file)
475 m_cfi.
Write(m_info_file, ifn.c_str());
476 m_info_file->
Fsync();
477 cache()->WriteFileSizeXAttr(m_info_file->
getFD(), m_file_size);
478 TRACEF(
Debug, tpfx <<
"Creating new file info, data size = " << m_file_size <<
" num blocks = " << m_cfi.
GetNBlocks());
485 m_prefetch_state = (m_cfi.
IsComplete()) ? kComplete : kStopped;
502 if ((res = m_data_file->
Fstat(&sbuff)))
return res;
504 sbuff.st_size = m_file_size;
506 bool is_cached = cache()->DecideIfConsideredCached(m_file_size, sbuff.st_blocks * 512ll);
517bool File::overlap(
int blk,
526 const long long beg = blk * blk_size;
527 const long long end = beg + blk_size;
528 const long long req_end = req_off + req_size;
530 if (req_off < end && req_end > beg)
532 const long long ovlp_beg = std::max(beg, req_off);
533 const long long ovlp_end = std::min(end, req_end);
535 off = ovlp_beg - req_off;
536 blk_off = ovlp_beg - beg;
537 size = (int) (ovlp_end - ovlp_beg);
539 assert(size <= blk_size);
550Block* File::PrepareBlockRequest(
int i,
IO *io,
void *req_id,
bool prefetch)
558 const long long off = i * m_block_size;
559 const int last_block = m_num_blocks - 1;
560 const bool cs_net = cache()->RefConfiguration().is_cschk_net();
562 int blk_size, req_size;
563 if (i == last_block) {
564 blk_size = req_size = m_file_size - off;
565 if (cs_net && req_size & 0xFFF) req_size = (req_size & ~0xFFF) + 0x1000;
567 blk_size = req_size = m_block_size;
571 char *buf = cache()->RequestRAM(req_size);
575 b =
new (std::nothrow)
Block(
this, io, req_id, buf, off, blk_size, req_size, prefetch, cs_net);
585 m_prefetch_state = kHold;
586 cache()->DeRegisterPrefetchFile(
this);
591 TRACEF(Dump,
"PrepareBlockRequest() " << i <<
" prefetch " << prefetch <<
", allocation failed.");
598void File::ProcessBlockRequest(
Block *b)
606 snprintf(buf, 256,
"idx=%lld, block=%p, prefetch=%d, off=%lld, req_size=%d, buff=%p, resp_handler=%p ",
608 TRACEF(Dump,
"ProcessBlockRequest() " << buf);
624 for (
BlockList_i bi = blks.begin(); bi != blks.end(); ++bi)
626 ProcessBlockRequest(*bi);
632void File::RequestBlocksDirect(
IO *io,
ReadRequest *read_req, std::vector<XrdOucIOVec>& ioVec,
int expected_size)
634 int n_chunks = ioVec.size();
637 TRACEF(DumpXL,
"RequestBlocksDirect() issuing ReadV for n_chunks = " << n_chunks <<
638 ", total_size = " << expected_size <<
", n_vec_reads = " << n_vec_reads);
648 io->
GetInput()->
ReadV( *handler, ioVec.data() + pos, n_chunks);
653int File::ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec,
int expected_size)
655 TRACEF(DumpXL,
"ReadBlocksFromDisk() issuing ReadV for n_chunks = " << (
int) ioVec.size() <<
", total_size = " << expected_size);
657 long long rs = m_data_file->
ReadV(ioVec.data(), (
int) ioVec.size());
661 TRACEF(
Error,
"ReadBlocksFromDisk neg retval = " << rs);
665 if (rs != expected_size)
667 TRACEF(
Error,
"ReadBlocksFromDisk incomplete size = " << rs);
686 if (m_in_shutdown || io->m_in_detach)
689 return m_in_shutdown ? -ENOENT : -EBADF;
697 int ret = m_data_file->
Read(iUserBuff, iUserOff, iUserSize);
702 XrdOucIOVec readV( { iUserOff, iUserSize, 0, iUserBuff } );
704 return ReadOpusCoalescere(io, &readV, 1, rh,
"Read() ");
711 TRACEF(Dump,
"ReadV() for " << readVnum <<
" chunks.");
715 if (m_in_shutdown || io->m_in_detach)
718 return m_in_shutdown ? -ENOENT : -EBADF;
731 return ReadOpusCoalescere(io, readV, readVnum, rh,
"ReadV() ");
736int File::ReadOpusCoalescere(
IO *io,
const XrdOucIOVec *readV,
int readVnum,
748 int prefetch_cnt = 0;
753 std::unordered_map<Block*, std::vector<ChunkRequest>> blks_ready;
755 std::vector<XrdOucIOVec> iovec_disk;
756 std::vector<XrdOucIOVec> iovec_direct;
757 int iovec_disk_total = 0;
758 int iovec_direct_total = 0;
760 for (
int iov_idx = 0; iov_idx < readVnum; ++iov_idx)
767 const int idx_first = iUserOff / m_block_size;
768 const int idx_last = (iUserOff + iUserSize - 1) / m_block_size;
770 TRACEF(DumpXL, tpfx <<
"sid: " <<
Xrd::hex1 << rh->
m_seq_id <<
" idx_first: " << idx_first <<
" idx_last: " << idx_last);
772 enum LastBlock_e { LB_other, LB_disk, LB_direct };
774 LastBlock_e lbe = LB_other;
776 for (
int block_idx = idx_first; block_idx <= idx_last; ++block_idx)
779 BlockMap_i bi = m_block_map.find(block_idx);
786 overlap(block_idx, m_block_size, iUserOff, iUserSize, off, blk_off, size);
789 if (bi != m_block_map.end())
791 inc_ref_count(bi->second);
792 TRACEF(Dump, tpfx << (
void*) iUserBuff <<
" inc_ref_count for existing block " << bi->second <<
" idx = " << block_idx);
794 if (bi->second->is_finished())
798 assert(bi->second->is_ok());
800 blks_ready[bi->second].emplace_back(
ChunkRequest(
nullptr, iUserBuff + off, blk_off, size) );
802 if (bi->second->m_prefetch)
813 bi->second->m_chunk_reqs.emplace_back(
ChunkRequest(read_req, iUserBuff + off, blk_off, size) );
822 TRACEF(DumpXL, tpfx <<
"read from disk " << (
void*)iUserBuff <<
" idx = " << block_idx);
825 iovec_disk.back().size += size;
827 iovec_disk.push_back( { block_idx * m_block_size + blk_off, size, 0, iUserBuff + off } );
828 iovec_disk_total += size;
842 Block *b = PrepareBlockRequest(block_idx, io, read_req,
false);
845 TRACEF(Dump, tpfx <<
"inc_ref_count new " << (
void*)iUserBuff <<
" idx = " << block_idx);
847 blks_to_request.push_back(b);
856 TRACEF(DumpXL, tpfx <<
"direct block " << block_idx <<
", blk_off " << blk_off <<
", size " << size);
858 iovec_direct_total += size;
865 iovec_direct.back().size += size;
867 long long in_offset = block_idx * m_block_size + blk_off;
868 char *out_pos = iUserBuff + off;
875 iovec_direct.push_back( { in_offset, size, 0, out_pos } );
884 inc_prefetch_hit_cnt(prefetch_cnt);
889 if ( ! blks_to_request.empty())
891 ProcessBlockRequests(blks_to_request);
892 blks_to_request.clear();
896 if ( ! iovec_direct.empty())
898 RequestBlocksDirect(io, read_req, iovec_direct, iovec_direct_total);
900 TRACEF(Dump, tpfx <<
"direct read requests sent out, n_chunks = " << (
int) iovec_direct.size() <<
", total_size = " << iovec_direct_total);
905 long long bytes_read = 0;
909 if ( ! blks_ready.empty())
911 for (
auto &bvi : blks_ready)
913 for (
auto &cr : bvi.second)
915 TRACEF(DumpXL, tpfx <<
"ub=" << (
void*)cr.m_buf <<
" from pre-finished block " << bvi.first->m_offset/m_block_size <<
" size " << cr.m_size);
916 memcpy(cr.m_buf, bvi.first->m_buff + cr.m_off, cr.m_size);
917 bytes_read += cr.m_size;
923 if ( ! iovec_disk.empty())
925 int rc = ReadBlocksFromDisk(iovec_disk, iovec_disk_total);
926 TRACEF(DumpXL, tpfx <<
"from disk finished size = " << rc);
943 for (
auto &bvi : blks_ready)
944 dec_ref_count(bvi.first, (
int) bvi.second.size());
978 return error_cond ? error_cond : bytes_read;
990 long long offset = b->
m_offset - m_offset;
1010 TRACEF(
Error,
"WriteToDisk() incomplete block write ret=" << retval <<
" (should be " << size <<
")");
1020 const int blk_idx = (b->
m_offset - m_offset) / m_block_size;
1023 TRACEF(Dump,
"WriteToDisk() success set bit for block " << b->
m_offset <<
" size=" << size);
1025 bool schedule_sync =
false;
1046 m_writes_during_sync.push_back(blk_idx);
1051 ++m_non_flushed_cnt;
1055 schedule_sync =
true;
1057 m_non_flushed_cnt = 0;
1064 cache()->ScheduleFileSync(
this);
1074 int ret = m_data_file->
Fsync();
1075 bool errorp =
false;
1080 m_cfi.
Write(m_info_file, m_filename.c_str());
1081 int cret = m_info_file->
Fsync();
1084 TRACEF(
Error,
"Sync cinfo file sync error " << cret);
1090 TRACEF(
Error,
"Sync data file sync error " << ret <<
", cinfo file has not been updated");
1096 TRACEF(
Error,
"Sync failed, unlinking local files and initiating shutdown of File object");
1103 m_writes_during_sync.clear();
1109 int written_while_in_sync;
1110 bool resync =
false;
1113 for (std::vector<int>::iterator i = m_writes_during_sync.begin(); i != m_writes_during_sync.end(); ++i)
1117 written_while_in_sync = m_non_flushed_cnt = (int) m_writes_during_sync.size();
1118 m_writes_during_sync.clear();
1122 if (written_while_in_sync > 0 && m_cfi.
IsComplete() && ! m_in_shutdown)
1127 TRACEF(Dump,
"Sync "<< written_while_in_sync <<
" blocks written during sync." << (resync ?
" File is now complete - resyncing." :
""));
1138void File::free_block(
Block* b)
1141 int i = b->
m_offset / m_block_size;
1142 TRACEF(Dump,
"free_block block " << b <<
" idx = " << i);
1143 size_t ret = m_block_map.erase(i);
1147 TRACEF(
Error,
"free_block did not erase " << i <<
" from map");
1157 m_prefetch_state = kOn;
1158 cache()->RegisterPrefetchFile(
this);
1164bool File::select_current_io_or_disable_prefetching(
bool skip_current)
1168 int io_size = (int) m_io_set.size();
1173 io_ok = (*m_io_set.begin())->m_allow_prefetching;
1176 m_current_io = m_io_set.begin();
1179 else if (io_size > 1)
1181 IoSet_i mi = m_current_io;
1182 if (skip_current && mi != m_io_set.end()) ++mi;
1184 for (
int i = 0; i < io_size; ++i)
1186 if (mi == m_io_set.end()) mi = m_io_set.begin();
1188 if ((*mi)->m_allow_prefetching)
1200 m_current_io = m_io_set.end();
1201 m_prefetch_state = kStopped;
1202 cache()->DeRegisterPrefetchFile(
this);
1210void File::ProcessDirectReadFinished(
ReadRequest *rreq,
int bytes_read,
int error_cond)
1216 TRACEF(
Error,
"Read(), direct read finished with error " << -error_cond <<
" " <<
XrdSysE2T(-error_cond));
1218 m_state_cond.
Lock();
1234 FinalizeReadRequest(rreq);
1261 TRACEF(Dump,
"ProcessBlockSuccess() ub=" << (
void*)creq.
m_buf <<
" from finished block " << b->
m_offset/m_block_size <<
" size " << creq.
m_size);
1264 m_state_cond.
Lock();
1269 rreq->m_stats.m_BytesMissed += creq.
m_size;
1271 rreq->m_stats.m_BytesHit += creq.
m_size;
1273 --rreq->m_n_chunk_reqs;
1276 inc_prefetch_hit_cnt(1);
1280 bool rreq_complete = rreq->is_complete();
1285 FinalizeReadRequest(rreq);
1299void File::ProcessBlockResponse(
Block *b,
int res)
1301 static const char* tpfx =
"ProcessBlockResponse ";
1303 TRACEF(Dump, tpfx <<
"block=" << b <<
", idx=" << b->
m_offset/m_block_size <<
", off=" << b->
m_offset <<
", res=" << res);
1305 if (res >= 0 && res != b->
get_size())
1309 TRACEF(
Error, tpfx <<
"Wrong number of bytes received, assuming remote/local file size mismatch, unlinking local files and initiating shutdown of File object");
1313 m_state_cond.
Lock();
1319 IoSet_i mi = m_io_set.find(io);
1320 if (mi != m_io_set.end())
1322 --io->m_active_prefetches;
1325 if (res < 0 && io->m_allow_prefetching)
1327 TRACEF(
Debug, tpfx <<
"after failed prefetch on io " << io <<
" disabling prefetching on this io.");
1328 io->m_allow_prefetching =
false;
1331 if (m_prefetch_state == kOn || m_prefetch_state == kHold)
1333 if ( ! select_current_io_or_disable_prefetching(
false) )
1335 TRACEF(
Debug, tpfx <<
"stopping prefetching after io " << b->
get_io() <<
" marked as bad.");
1341 if (b->
m_refcnt == 0 && (res < 0 || m_in_shutdown))
1358 TRACEF(Dump, tpfx <<
"inc_ref_count idx=" << b->
m_offset/m_block_size);
1359 if ( ! m_in_shutdown)
1364 cache()->AddWriteTask(b,
true);
1373 for (
auto &creq : creqs_to_notify)
1375 ProcessBlockSuccess(b, creq);
1384 <<
", io=" << b->
get_io() <<
", error=" << res);
1389 <<
", io=" << b->
get_io() <<
" incomplete, got " << res <<
" expected " << b->
get_size());
1390#if defined(__APPLE__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__)) || defined(__FreeBSD__)
1401 std::list<ReadRequest*> rreqs_to_complete;
1410 ProcessBlockError(b, rreq);
1413 rreqs_to_complete.push_back(rreq);
1418 creqs_to_keep.push_back(creq);
1422 bool reissue =
false;
1423 if ( ! creqs_to_keep.empty())
1425 ReadRequest *rreq = creqs_to_keep.front().m_read_req;
1427 TRACEF(
Debug,
"ProcessBlockResponse() requested block " << (
void*)b <<
" failed with another io " <<
1428 b->
get_io() <<
" - reissuing request with my io " << rreq->
m_io);
1437 for (
auto rreq : rreqs_to_complete)
1438 FinalizeReadRequest(rreq);
1441 ProcessBlockRequest(b);
1449 return m_filename.c_str();
1454int File::offsetIdx(
int iIdx)
const
1456 return iIdx - m_offset/m_block_size;
1470 TRACEF(DumpXL,
"Prefetch() entering.");
1474 if (m_prefetch_state != kOn)
1479 if ( ! select_current_io_or_disable_prefetching(
true) )
1481 TRACEF(
Error,
"Prefetch no available IO object found, prefetching stopped. This should not happen, i.e., prefetching should be stopped before.");
1486 for (
int f = 0; f < m_num_blocks; ++f)
1490 int f_act = f + m_offset / m_block_size;
1492 BlockMap_i bi = m_block_map.find(f_act);
1493 if (bi == m_block_map.end())
1495 Block *b = PrepareBlockRequest(f_act, *m_current_io,
nullptr,
true);
1498 TRACEF(Dump,
"Prefetch take block " << f_act);
1502 inc_prefetch_read_cnt(1);
1507 TRACEF(Warning,
"Prefetch allocation failed for block " << f_act);
1516 TRACEF(
Debug,
"Prefetch file is complete, stopping prefetch.");
1517 m_prefetch_state = kComplete;
1518 cache()->DeRegisterPrefetchFile(
this);
1522 (*m_current_io)->m_active_prefetches += (int) blks.size();
1526 if ( ! blks.empty())
1528 ProcessBlockRequests(blks);
1537 return m_prefetch_score;
1550void File::insert_remote_location(
const std::string &loc)
1554 size_t p = loc.find_first_of(
'@');
1555 m_remote_locations.insert(&loc[p != std::string::npos ? p + 1 : 0]);
1562 if ( ! m_remote_locations.empty())
1566 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++nl)
1570 s.reserve(2 + sl + 2*nl + nl - 1 + 1);
1573 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++j)
1575 s +=
'"'; s += *i; s +=
'"';
1576 if (j < nl) s +=
',';
#define ERRNO_AND_ERRSTR(err_code)
#define TRACEF_INT(act, x)
const char * XrdSysE2T(int errcode)
virtual int Ftruncate(unsigned long long flen)
virtual int Fstat(struct stat *buf)
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
virtual ssize_t Read(off_t offset, size_t size)
virtual ssize_t pgWrite(void *buffer, off_t offset, size_t wrlen, uint32_t *csvec, uint64_t opts)
virtual ssize_t ReadV(XrdOucIOVec *readV, int rdvcnt)
virtual ssize_t Write(const void *buffer, off_t offset, size_t size)
virtual int Create(const char *tid, const char *path, mode_t mode, XrdOucEnv &env, int opts=0)=0
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual void Done(int result)=0
virtual int pgRead(char *buff, long long offs, int rdlen, std::vector< uint32_t > &csvec, uint64_t opts=0, int *csfix=0)
virtual int ReadV(const XrdOucIOVec *readV, int rnum)
void Put(const char *varname, const char *value)
void Done(int result) override
int * ptr_n_cksum_errors()
vCkSum_t & ref_cksum_vec()
long long get_offset() const
vChunkRequest_t m_chunk_reqs
void * get_req_id() const
bool req_cksum_net() const
void reset_error_and_set_io(IO *io, void *rid)
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
static Cache & GetInstance()
Singleton access.
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
void Done(int result) override
bool FinalizeSyncBeforeExit()
Returns true if any of the blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
const char * lPath() const
Log path.
int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
Vector read.
void WriteBlockToDisk(Block *b)
std::string & GetLocalPath()
static File * FileOpen(const std::string &path, long long offset, long long fileSize)
Static constructor that also does Open. Returns null ptr if Open fails.
float GetPrefetchScore() const
friend class BlockResponseHandler
std::string GetRemoteLocations() const
int Fstat(struct stat &sbuff)
void RequestSyncOfDetachStats()
Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
friend class DirectResponseHandler
void initiate_emergency_shutdown()
void Sync()
Sync file cache info and output data with disk.
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
void ioUpdated(IO *io)
Notification from IO that it has been updated (remote open).
void BlockRemovedFromWriteQ(Block *)
Handle removal of a block from Cache's write queue.
Stats DeltaStatsFromLastCall()
bool ioActive(IO *io)
Initiate close. Returns true if the IO is still active. Used in XrdPosixXrootd::Close()
Base cache-io class that implements some XrdOucCacheIO abstract methods.
bool register_incomplete_read()
XrdOucCacheIO * GetInput()
bool register_block_error(int res)
RAtomic_int m_active_read_reqs
number of active read requests
const char * GetLocation()
Status of cached file. Can be read from and written into a binary file.
void SetBitPrefetch(int i)
Mark block as obtained through prefetch.
static const char * s_infoExtension
void SetBitSynced(int i)
Mark block as synced to disk.
time_t GetNoCkSumTimeForUVKeep() const
CkSumCheck_e GetCkSumState() const
void WriteIOStatAttach()
Write open time in the last entry of access statistics.
bool Write(XrdOssDF *fp, const char *dname, const char *fname=0)
void DowngradeCkSumState(CkSumCheck_e css_ref)
void ResetAllAccessStats()
Reset IO Stats.
bool TestBitPrefetch(int i) const
Test if block at the given index has been prefetched.
bool IsComplete() const
Get complete status.
bool IsCkSumCache() const
void SetBitWritten(int i)
Mark block as written to disk.
long long GetBufferSize() const
Get prefetch buffer size.
void WriteIOStat(Stats &s)
Write bytes missed, hits, and disk.
long long GetExpectedDataFileSize() const
Get expected data file size.
bool TestBitWritten(int i) const
Test if block at the given index is written to disk.
bool Read(XrdOssDF *fp, const char *dname, const char *fname=0)
Read content of cinfo file into this object.
void SetCkSumState(CkSumCheck_e css)
void SetBufferSizeFileSizeAndCreationTime(long long bs, long long fs)
void WriteIOStatDetach(Stats &s)
Write close time together with bytes missed, hits, and disk.
int GetNBlocks() const
Get number of blocks represented in download-state bit-vector.
Statistics of cache utilisation by a File object.
void AddReadStats(const Stats &s)
long long m_BytesBypassed
number of bytes served directly through XrdCl
void AddWriteStats(long long bytes_written, int n_cks_errs)
void AddBytesHit(long long bh)
long long m_BytesHit
number of bytes served from disk
void IoDetach(int duration)
void DeltaToReference(const Stats &ref)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
std::list< Block * > BlockList_t
std::vector< ChunkRequest > vChunkRequest_t
std::list< Block * >::iterator BlockList_i
static const int maxRVdsz
static const int maxRvecsz
Contains parameters configurable from the xrootd config file.
long long m_flushCnt
number of unsynced blocks on disk before flush is called
bool does_cschk_have_missing_bits(CkSumCheck_e cks_on_file) const
CkSumCheck_e get_cs_Chk() const
int m_prefetch_max_blocks
maximum number of blocks to prefetch per file
bool should_uvkeep_purge(time_t delta) const
std::string m_data_space
oss space for data files
long long m_bufferSize
prefetch buffer size, default 1MB
std::string m_meta_space
oss space for metadata files (cinfo)
std::string m_username
username passed to oss plugin
void update_error_cond(int ec)