diff --git a/renderdoc/serialise/rdcfile.cpp b/renderdoc/serialise/rdcfile.cpp index 93139ddf7..ebeecb2bb 100644 --- a/renderdoc/serialise/rdcfile.cpp +++ b/renderdoc/serialise/rdcfile.cpp @@ -1222,7 +1222,8 @@ StreamWriter *RDCFile::WriteSection(const SectionProperties &props) } // create a writer for writing to disk. It shouldn't close the file - StreamWriter *fileWriter = new StreamWriter(m_File, Ownership::Nothing); + StreamWriter *fileWriter = + new StreamWriter(FileWriter::MakeThreaded(m_File, Ownership::Nothing), Ownership::Stream); StreamWriter *compWriter = NULL; @@ -1246,8 +1247,6 @@ StreamWriter *RDCFile::WriteSection(const SectionProperties &props) // register a destroy callback to tidy up the section at the end fileWriter->AddCloseCallback([this, type, name, headerOffset, dataOffset, fileWriter, compWriter]() { - FileIO::fflush(m_File); - // the offset of the file writer is how many bytes were written to disk - the compressed length. uint64_t compressedLength = fileWriter->GetOffset(); diff --git a/renderdoc/serialise/streamio.cpp b/renderdoc/serialise/streamio.cpp index 21e35d18b..ee5a5d6c7 100644 --- a/renderdoc/serialise/streamio.cpp +++ b/renderdoc/serialise/streamio.cpp @@ -470,6 +470,246 @@ bool StreamReader::ReadFromExternal(void *buffer, uint64_t length) return success; } +FileWriter *FileWriter::MakeDefault(FILE *file, Ownership own) +{ + FileWriter *ret = new FileWriter(file, own); + ret->m_ThreadRunning = 0; + return ret; +} + +FileWriter *FileWriter::MakeThreaded(FILE *file, Ownership own) +{ + FileWriter *ret = new FileWriter(file, own); + for(size_t i = 0; i < NumBlocks; i++) + ret->m_AllocBlocks[i] = {AllocAlignedBuffer(BlockSize), 0}; + ret->m_ProducerOwned.append(ret->m_AllocBlocks, NumBlocks); + ret->m_ThreadRunning = 1; + ret->m_Thread = Threading::CreateThread([ret]() { ret->ThreadEntry(); }); + return ret; +} + +RDResult FileWriter::Write(const void *data, uint64_t length) +{ + if(m_ThreadRunning == 0) + return WriteUnthreaded(data, length); + + return WriteThreaded(data, length); +} + +RDResult FileWriter::WriteUnthreaded(const void *data, uint64_t length) +{ + // this may be called directly in Write, or deferred on the thread. It is unsynchronised and + // internal + RDResult result; + uint64_t written = (uint64_t)FileIO::fwrite(data, 1, (size_t)length, m_File); + if(written != length) + { + SET_ERROR_RESULT(result, ResultCode::FileIOFailed, "Writing to file failed: %s", + FileIO::ErrorString().c_str()); + } + return result; +} + +RDResult FileWriter::WriteThreaded(const void *data, uint64_t length) +{ + // if write fits in this block, memcpy and return. We allow this to completely fill a block, it + // will get flushed on the next write or Flush() call + if(!m_ProducerOwned.empty() && length <= BlockSize - m_ProducerOwned.back().second) + { + memcpy(m_ProducerOwned.back().first + m_ProducerOwned.back().second, data, length); + m_ProducerOwned.back().second += length; + return ResultCode::Succeeded; + } + + RDResult ret; + + // write doesn't fit in the block (or we don't have one free) + + // loop until all bytes are written + const byte *dataPtr = (byte *)data; + while(length > 0) + { + // blocks to submit, we'll have at least one + rdcarray pending; + + // while we have free blocks that we own, and still bytes to write + while(length > 0 && !m_ProducerOwned.empty()) + { + Block &curBlock = m_ProducerOwned.back(); + + // write either the rest of what will fit in the block, or the rest of the data, whatever is + // smaller + uint64_t writeSize = RDCMIN(length, BlockSize - curBlock.second); + memcpy(curBlock.first + curBlock.second, dataPtr, writeSize); + curBlock.second += writeSize; + dataPtr += writeSize; + length -= writeSize; + + // should not be possible with writeSize above being clamped + if(curBlock.second > BlockSize) + { + RDCERR("Block has been overrun"); + // truncate writes to be safe + curBlock.second = BlockSize; + return ResultCode::InternalError; + } + + // if the block is completely full, push it to the consumer + if(curBlock.second == BlockSize) + { + m_ProducerOwned.pop_back(); + pending.push_back(curBlock); + } + } + + // we got here, either we ran out of blocks to write to (or more likely) we finished writing. + // now we push the pending list to the consumer and at the same time grab any blocks that have + // freed up. Hold the lock while modifying those block-passing lists + m_Lock.Lock(); + if(!pending.empty()) + { + m_PendingForConsumer.append(pending); + pending.clear(); + } + if(!m_CompletedFromConsumer.empty()) + { + m_ProducerOwned.insert(0, m_CompletedFromConsumer); + m_CompletedFromConsumer.clear(); + } + if(ret == ResultCode::Succeeded) + ret = m_Error; + m_Lock.Unlock(); + + // if we still have bytes to write and are waiting for blocks to free up, sleep here so we + // don't busy loop trying to get more blocks + if(length > 0) + Threading::Sleep(5); + } + + return ret; +} + +void FileWriter::ThreadEntry() +{ + rdcarray completed; + RDResult error; + + int busyLoopCounter = 0; + + // loop as long as the thread is not being killed + while(Atomic::CmpExch32(&m_ThreadKill, 0, 0) == 0) + { + Block work = {}; + + // hold the lock, take any new work and return any completed work + m_Lock.Lock(); + m_ConsumerOwned.append(m_PendingForConsumer); + m_CompletedFromConsumer.append(completed); + m_PendingForConsumer.clear(); + completed.clear(); + // don't overwrite an old error, but update if there's a new error + if(m_Error == ResultCode::Succeeded && error != ResultCode::Succeeded) + m_Error = error; + m_Lock.Unlock(); + + // grab work to do if we can + if(!m_ConsumerOwned.empty()) + { + work = m_ConsumerOwned[0]; + m_ConsumerOwned.erase(0); + busyLoopCounter = 0; + } + + if(work.second) + { + RDResult res = WriteUnthreaded(work.first, work.second); + if(error == ResultCode::Succeeded) + error = res; + + // reset the offset/size to 0 when returning it + completed.push_back({work.first, 0}); + } + + // after a certain number of loops without any work start to do small sleeps to break up the + // busy loop + if(busyLoopCounter++ > 500) + Threading::Sleep(1); + } + + Atomic::CmpExch32(&m_ThreadRunning, 1, 0); +} + +RDResult FileWriter::Flush() +{ + // if we have some writes, push these now even with a partial block + if(!m_ProducerOwned.empty() && m_ProducerOwned.back().second > 0) + { + Block b = m_ProducerOwned.back(); + m_ProducerOwned.pop_back(); + + // hold the lock so we can push this incomplete block through + m_Lock.Lock(); + m_PendingForConsumer.push_back(b); + m_Lock.Unlock(); + + // all other blocks should be empty + for(Block &owned : m_ProducerOwned) + RDCASSERTEQUAL(owned.second, 0); + } + + // loop as long as the thread is alive. Flushing is rare so we don't mind sleeping here + // if we're unthreaded this loop just won't execute + while(Atomic::CmpExch32(&m_ThreadRunning, 1, 1) > 0) + { + m_Lock.Lock(); + // take ownership of any blocks due to us + m_ProducerOwned.insert(0, m_CompletedFromConsumer); + m_CompletedFromConsumer.clear(); + m_Lock.Unlock(); + + // if we own all the blocks again, we're done + if(m_ProducerOwned.size() == NumBlocks) + break; + + Threading::Sleep(1); + } + + // flush the underlying file + bool success = FileIO::fflush(m_File); + if(!success && m_Error == ResultCode::Succeeded) + { + SET_ERROR_RESULT(m_Error, ResultCode::FileIOFailed, "File flushing failed: %s", + FileIO::ErrorString().c_str()); + } + m_Lock.Lock(); + RDResult ret = m_Error; + m_Lock.Unlock(); + + return ret; +} + +FileWriter::~FileWriter() +{ + if(m_Thread) + { + // ensure we've written everything + Flush(); + + // ask the thread to stop + Atomic::Inc32(&m_ThreadKill); + + Threading::JoinThread(m_Thread); + Threading::CloseThread(m_Thread); + m_Thread = 0; + + for(size_t i = 0; i < NumBlocks; i++) + FreeAlignedBuffer(m_AllocBlocks[i].first); + } + + if(m_Ownership == Ownership::Stream) + FileIO::fclose(m_File); +} + StreamWriter::StreamWriter(uint64_t initialBufSize) { m_BufferBase = m_BufferHead = AllocAlignedBuffer(initialBufSize); @@ -502,7 +742,7 @@ StreamWriter::StreamWriter(Network::Socket *sock, Ownership own) m_InMemory = false; } -StreamWriter::StreamWriter(FILE *file, Ownership own) +StreamWriter::StreamWriter(FileWriter *file, Ownership own) { m_BufferBase = m_BufferHead = m_BufferEnd = NULL; @@ -524,19 +764,24 @@ StreamWriter::StreamWriter(Compressor *compressor, Ownership own) StreamWriter::~StreamWriter() { - for(StreamCloseCallback cb : m_Callbacks) - cb(); - FreeAlignedBuffer(m_BufferBase); if(m_Ownership == Ownership::Stream) { if(m_File) - FileIO::fclose(m_File); + delete m_File; if(m_Compressor) delete m_Compressor; } + else + { + if(m_File) + m_File->Flush(); + } + + for(StreamCloseCallback cb : m_Callbacks) + cb(); } bool StreamWriter::SendSocketData(const void *data, uint64_t numBytes) @@ -604,7 +849,7 @@ void StreamWriter::HandleError(RDResult result) if(m_Ownership == Ownership::Stream) { if(m_File) - FileIO::fclose(m_File); + delete m_File; if(m_Sock) delete m_Sock; diff --git a/renderdoc/serialise/streamio.h b/renderdoc/serialise/streamio.h index fa232ae10..7bcb371f4 100644 --- a/renderdoc/serialise/streamio.h +++ b/renderdoc/serialise/streamio.h @@ -29,6 +29,7 @@ #include "api/replay/replay_enums.h" #include "common/common.h" #include "common/formatting.h" +#include "common/threading.h" #include "os/os_specific.h" enum class Ownership @@ -295,6 +296,58 @@ private: rdcarray m_Callbacks; }; +class FileWriter +{ +public: + static FileWriter *MakeDefault(FILE *file, Ownership own); + static FileWriter *MakeThreaded(FILE *file, Ownership own); + + ~FileWriter(); + + RDResult Write(const void *data, uint64_t length); + RDResult Flush(); + +private: + FileWriter(FILE *file, Ownership own) : m_File(file), m_Ownership(own) {} + RDResult WriteThreaded(const void *data, uint64_t length); + RDResult WriteUnthreaded(const void *data, uint64_t length); + void ThreadEntry(); + + FILE *m_File; + + // do we own the file/compressor? are we responsible for + // cleaning it up? + Ownership m_Ownership; + + static const uint64_t BlockSize = 4 * 1024 * 1024; + static const uint64_t NumBlocks = 8; + + // + using Block = rdcpair; + + int32_t m_ThreadRunning = 0; + int32_t m_ThreadKill = 0; + Threading::ThreadHandle m_Thread = 0; + + // only touched by the producer, set of blocks allocated for easy cleanup. These blocks are in at + // most one of the arrays below + Block m_AllocBlocks[NumBlocks] = {}; + + // list of blocks the producer owns. The last in this list is the one we're writing to currently + rdcarray m_ProducerOwned; + // list of blocks the consumer owns. This list is being churned through on the work thread + rdcarray m_ConsumerOwned; + + // the lock protects everything below + Threading::SpinLock m_Lock; + // work to be pushed onto m_ConsumerOwned from the producer + rdcarray m_PendingForConsumer; + // blocks that can be pulled into m_ProducerOwned by the producer + rdcarray m_CompletedFromConsumer; + // any error that has appeared + RDResult m_Error; +}; + class StreamWriter { public: @@ -305,8 +358,14 @@ public: StreamWriter(StreamInvalidType, RDResult res); StreamWriter(uint64_t initialBufSize); - StreamWriter(FILE *file, Ownership own); - StreamWriter(Network::Socket *file, Ownership own); + StreamWriter(FileWriter *file, Ownership own); + // when given a FILE* make a default filewriter and own it ourselves, but the filewriter uses the + // ownership of the FILE that was specified + StreamWriter(FILE *file, Ownership own) + : StreamWriter(FileWriter::MakeDefault(file, own), Ownership::Stream) + { + } + StreamWriter(Network::Socket *sock, Ownership own); StreamWriter(Compressor *compressor, Ownership own); bool IsErrored() { return m_Error != ResultCode::Succeeded; } @@ -385,17 +444,13 @@ public: } else if(m_File) { - uint64_t written = (uint64_t)FileIO::fwrite(data, 1, (size_t)numBytes, m_File); - if(written != numBytes) - { - RDResult result; - SET_ERROR_RESULT(result, ResultCode::FileIOFailed, "Writing to file failed: %s", - FileIO::ErrorString().c_str()); - HandleError(result); - return false; - } + RDResult result = m_File->Write(data, numBytes); - return true; + if(result == ResultCode::Succeeded) + return true; + + HandleError(result); + return false; } else if(m_Sock) { @@ -472,14 +527,11 @@ public: } else if(m_File) { - bool success = FileIO::fflush(m_File); + RDResult result = m_File->Flush(); - if(success) + if(result == ResultCode::Succeeded) return true; - RDResult result; - SET_ERROR_RESULT(result, ResultCode::FileIOFailed, "File flushing failed: %s", - FileIO::ErrorString().c_str()); HandleError(result); return false; @@ -500,14 +552,11 @@ public: } else if(m_File) { - bool success = FileIO::fflush(m_File); + RDResult result = m_File->Flush(); - if(success) + if(result == ResultCode::Succeeded) return true; - RDResult result; - SET_ERROR_RESULT(result, ResultCode::FileIOFailed, "File flushing failed: %s", - FileIO::ErrorString().c_str()); HandleError(result); return false; @@ -567,8 +616,8 @@ private: // the total size of the file/compressor (ie. how much data flushed through it) uint64_t m_WriteSize = 0; - // file pointer, if we're writing to a file - FILE *m_File = NULL; + // file writer, if we're writing to a file + FileWriter *m_File = NULL; // the compressor, if writing to it Compressor *m_Compressor = NULL;