Add threading on final compression & write to file

This commit is contained in:
baldurk
2023-04-13 15:22:05 +01:00
parent 814a73f386
commit 1ec0606ce8
3 changed files with 326 additions and 33 deletions
+2 -3
View File
@@ -1222,7 +1222,8 @@ StreamWriter *RDCFile::WriteSection(const SectionProperties &props)
}
// create a writer for writing to disk. It shouldn't close the file
StreamWriter *fileWriter = new StreamWriter(m_File, Ownership::Nothing);
StreamWriter *fileWriter =
new StreamWriter(FileWriter::MakeThreaded(m_File, Ownership::Nothing), Ownership::Stream);
StreamWriter *compWriter = NULL;
@@ -1246,8 +1247,6 @@ StreamWriter *RDCFile::WriteSection(const SectionProperties &props)
// register a destroy callback to tidy up the section at the end
fileWriter->AddCloseCallback([this, type, name, headerOffset, dataOffset, fileWriter, compWriter]() {
FileIO::fflush(m_File);
// the offset of the file writer is how many bytes were written to disk - the compressed length.
uint64_t compressedLength = fileWriter->GetOffset();
+251 -6
View File
@@ -470,6 +470,246 @@ bool StreamReader::ReadFromExternal(void *buffer, uint64_t length)
return success;
}
// Creates a FileWriter that writes straight through to `file` on the calling thread. No worker
// thread is started and no staging blocks are allocated. `own` is stored as the writer's ownership
// of the FILE handle.
FileWriter *FileWriter::MakeDefault(FILE *file, Ownership own)
{
  FileWriter *writer = new FileWriter(file, own);

  // mark the writer as unthreaded so Write() takes the direct path
  writer->m_ThreadRunning = 0;

  return writer;
}
// Creates a FileWriter whose file writes happen on a dedicated worker thread. NumBlocks staging
// buffers of BlockSize bytes are allocated up front and all start out owned by the producer.
// `own` is stored as the writer's ownership of the FILE handle.
FileWriter *FileWriter::MakeThreaded(FILE *file, Ownership own)
{
  FileWriter *writer = new FileWriter(file, own);

  // allocate every staging block, each starting with a fill offset of 0
  for(Block &block : writer->m_AllocBlocks)
    block = {AllocAlignedBuffer(BlockSize), 0};

  // the producer initially owns all of the blocks
  writer->m_ProducerOwned.append(writer->m_AllocBlocks, NumBlocks);

  // flag the thread as running before it is launched, then start the consumer
  writer->m_ThreadRunning = 1;
  writer->m_Thread = Threading::CreateThread([writer]() { writer->ThreadEntry(); });

  return writer;
}
// Writes `length` bytes from `data`, dispatching to the threaded path when the worker thread is
// flagged as running and to the direct path otherwise.
RDResult FileWriter::Write(const void *data, uint64_t length)
{
  return m_ThreadRunning == 0 ? WriteUnthreaded(data, length) : WriteThreaded(data, length);
}
// Writes `length` bytes from `data` directly to m_File. Called either straight from Write() or
// from the worker thread for a queued block; it does no synchronisation of its own and is
// internal to the class. Returns a FileIOFailed result if fewer bytes than requested were written,
// otherwise a default-constructed RDResult.
RDResult FileWriter::WriteUnthreaded(const void *data, uint64_t length)
{
  RDResult status;

  const uint64_t numWritten = (uint64_t)FileIO::fwrite(data, 1, (size_t)length, m_File);

  // a short write is reported as an I/O failure
  if(numWritten != length)
  {
    SET_ERROR_RESULT(status, ResultCode::FileIOFailed, "Writing to file failed: %s",
                     FileIO::ErrorString().c_str());
  }

  return status;
}
// Threaded write path, called on the producer side. Copies `length` bytes from `data` into the
// producer-owned staging blocks and hands every block that becomes completely full to the
// consumer thread through m_PendingForConsumer. While no block is free the caller sleeps between
// attempts until the consumer returns one.
//
// Returns success on the fast path (data fits in the current block). On the slow path it returns
// the value of m_Error as read under the lock, or InternalError if a block was found to be
// overrun.
RDResult FileWriter::WriteThreaded(const void *data, uint64_t length)
{
  // if write fits in this block, memcpy and return. We allow this to completely fill a block, it
  // will get flushed on the next write or Flush() call
  if(!m_ProducerOwned.empty() && length <= BlockSize - m_ProducerOwned.back().second)
  {
    Block &curBlock = m_ProducerOwned.back();
    memcpy(curBlock.first + curBlock.second, data, (size_t)length);
    curBlock.second += length;
    return ResultCode::Succeeded;
  }

  RDResult ret;

  // write doesn't fit in the block (or we don't have one free)
  // loop until all bytes are written
  const byte *dataPtr = (const byte *)data;
  while(length > 0)
  {
    // blocks to submit, we'll have at least one
    rdcarray<Block> pending;

    // while we have free blocks that we own, and still bytes to write
    while(length > 0 && !m_ProducerOwned.empty())
    {
      Block &curBlock = m_ProducerOwned.back();

      // write either the rest of what will fit in the block, or the rest of the data, whatever is
      // smaller. This may be 0 if the current block was exactly filled by an earlier write, in
      // which case the block is simply handed off below
      uint64_t writeSize = RDCMIN(length, BlockSize - curBlock.second);
      memcpy(curBlock.first + curBlock.second, dataPtr, (size_t)writeSize);
      curBlock.second += writeSize;
      dataPtr += writeSize;
      length -= writeSize;

      // should not be possible with writeSize above being clamped
      if(curBlock.second > BlockSize)
      {
        RDCERR("Block has been overrun");
        // truncate writes to be safe
        curBlock.second = BlockSize;
        return ResultCode::InternalError;
      }

      // if the block is completely full, push it to the consumer
      if(curBlock.second == BlockSize)
      {
        // copy the block out *before* popping it: curBlock refers to the last element of
        // m_ProducerOwned and must not be read once that element has been removed
        pending.push_back(curBlock);
        m_ProducerOwned.pop_back();
      }
    }

    // we got here, either we ran out of blocks to write to (or more likely) we finished writing.
    // now we push the pending list to the consumer and at the same time grab any blocks that have
    // freed up. Hold the lock while modifying those block-passing lists
    m_Lock.Lock();
    if(!pending.empty())
    {
      m_PendingForConsumer.append(pending);
      pending.clear();
    }
    if(!m_CompletedFromConsumer.empty())
    {
      // returned blocks go to the front so the block currently being filled stays at the back
      m_ProducerOwned.insert(0, m_CompletedFromConsumer);
      m_CompletedFromConsumer.clear();
    }
    // pick up any error the consumer has published, without overwriting one we already hold
    if(ret == ResultCode::Succeeded)
      ret = m_Error;
    m_Lock.Unlock();

    // if we still have bytes to write and are waiting for blocks to free up, sleep here so we
    // don't busy loop trying to get more blocks
    if(length > 0)
      Threading::Sleep(5);
  }

  return ret;
}
void FileWriter::ThreadEntry()
{
rdcarray<Block> completed;
RDResult error;
int busyLoopCounter = 0;
// loop as long as the thread is not being killed
while(Atomic::CmpExch32(&m_ThreadKill, 0, 0) == 0)
{
Block work = {};
// hold the lock, take any new work and return any completed work
m_Lock.Lock();
m_ConsumerOwned.append(m_PendingForConsumer);
m_CompletedFromConsumer.append(completed);
m_PendingForConsumer.clear();
completed.clear();
// don't overwrite an old error, but update if there's a new error
if(m_Error == ResultCode::Succeeded && error != ResultCode::Succeeded)
m_Error = error;
m_Lock.Unlock();
// grab work to do if we can
if(!m_ConsumerOwned.empty())
{
work = m_ConsumerOwned[0];
m_ConsumerOwned.erase(0);
busyLoopCounter = 0;
}
if(work.second)
{
RDResult res = WriteUnthreaded(work.first, work.second);
if(error == ResultCode::Succeeded)
error = res;
// reset the offset/size to 0 when returning it
completed.push_back({work.first, 0});
}
// after a certain number of loops without any work start to do small sleeps to break up the
// busy loop
if(busyLoopCounter++ > 500)
Threading::Sleep(1);
}
Atomic::CmpExch32(&m_ThreadRunning, 1, 0);
}
// Pushes any partially filled block to the consumer, waits until the producer owns every block
// again (threaded mode only), then flushes the underlying FILE. Returns the recorded error state:
// the first error stored in m_Error, which may come from an earlier write or from this flush.
RDResult FileWriter::Flush()
{
  // if we have some writes, push these now even with a partial block
  if(!m_ProducerOwned.empty() && m_ProducerOwned.back().second > 0)
  {
    Block b = m_ProducerOwned.back();
    m_ProducerOwned.pop_back();

    // hold the lock so we can push this incomplete block through
    m_Lock.Lock();
    m_PendingForConsumer.push_back(b);
    m_Lock.Unlock();

    // all other blocks should be empty
    for(Block &owned : m_ProducerOwned)
      RDCASSERTEQUAL(owned.second, 0);
  }

  // loop as long as the thread is alive. Flushing is rare so we don't mind sleeping here
  // if we're unthreaded this loop just won't execute
  while(Atomic::CmpExch32(&m_ThreadRunning, 1, 1) > 0)
  {
    m_Lock.Lock();
    // take ownership of any blocks due to us
    m_ProducerOwned.insert(0, m_CompletedFromConsumer);
    m_CompletedFromConsumer.clear();
    m_Lock.Unlock();

    // if we own all the blocks again, we're done
    if(m_ProducerOwned.size() == NumBlocks)
      break;

    Threading::Sleep(1);
  }

  // flush the underlying file. Build any failure in a local first so the error string is captured
  // immediately and no formatting/logging happens while the lock is held
  bool success = FileIO::fflush(m_File);
  RDResult flushError;
  if(!success)
  {
    SET_ERROR_RESULT(flushError, ResultCode::FileIOFailed, "File flushing failed: %s",
                     FileIO::ErrorString().c_str());
  }

  // m_Error is shared with the worker thread, which keeps inspecting it for as long as it runs,
  // so only read and update it with the lock held. An existing error is never overwritten
  m_Lock.Lock();
  if(m_Error == ResultCode::Succeeded && flushError != ResultCode::Succeeded)
    m_Error = flushError;
  RDResult ret = m_Error;
  m_Lock.Unlock();

  return ret;
}
// Tears the writer down. In threaded mode this drains outstanding writes, stops and joins the
// worker thread and releases the staging blocks. The FILE is closed only when this writer was
// given ownership of it.
FileWriter::~FileWriter()
{
  if(m_Thread)
  {
    // make sure everything buffered has reached the file before stopping the worker
    Flush();

    // signal the worker to exit, then wait for it and release its handle
    Atomic::Inc32(&m_ThreadKill);
    Threading::JoinThread(m_Thread);
    Threading::CloseThread(m_Thread);
    m_Thread = 0;

    // release every staging buffer that was allocated
    for(Block &block : m_AllocBlocks)
      FreeAlignedBuffer(block.first);
  }

  const bool closeFile = (m_Ownership == Ownership::Stream);
  if(closeFile)
    FileIO::fclose(m_File);
}
StreamWriter::StreamWriter(uint64_t initialBufSize)
{
m_BufferBase = m_BufferHead = AllocAlignedBuffer(initialBufSize);
@@ -502,7 +742,7 @@ StreamWriter::StreamWriter(Network::Socket *sock, Ownership own)
m_InMemory = false;
}
StreamWriter::StreamWriter(FILE *file, Ownership own)
StreamWriter::StreamWriter(FileWriter *file, Ownership own)
{
m_BufferBase = m_BufferHead = m_BufferEnd = NULL;
@@ -524,19 +764,24 @@ StreamWriter::StreamWriter(Compressor *compressor, Ownership own)
StreamWriter::~StreamWriter()
{
for(StreamCloseCallback cb : m_Callbacks)
cb();
FreeAlignedBuffer(m_BufferBase);
if(m_Ownership == Ownership::Stream)
{
if(m_File)
FileIO::fclose(m_File);
delete m_File;
if(m_Compressor)
delete m_Compressor;
}
else
{
if(m_File)
m_File->Flush();
}
for(StreamCloseCallback cb : m_Callbacks)
cb();
}
bool StreamWriter::SendSocketData(const void *data, uint64_t numBytes)
@@ -604,7 +849,7 @@ void StreamWriter::HandleError(RDResult result)
if(m_Ownership == Ownership::Stream)
{
if(m_File)
FileIO::fclose(m_File);
delete m_File;
if(m_Sock)
delete m_Sock;
+73 -24
View File
@@ -29,6 +29,7 @@
#include "api/replay/replay_enums.h"
#include "common/common.h"
#include "common/formatting.h"
#include "common/threading.h"
#include "os/os_specific.h"
enum class Ownership
@@ -295,6 +296,58 @@ private:
rdcarray<StreamCloseCallback> m_Callbacks;
};
// Writes data to a FILE, either directly on the calling thread (MakeDefault) or through a set of
// fixed-size staging blocks that a dedicated worker thread writes out (MakeThreaded).
//
// Instances are only created through the Make* factories. In threaded mode, Write()/Flush() are
// the producer side and ThreadEntry() is the consumer side; blocks are passed between the two via
// the lists guarded by m_Lock.
class FileWriter
{
public:
  // create a writer that performs every write synchronously on the calling thread
  static FileWriter *MakeDefault(FILE *file, Ownership own);
  // create a writer that buffers into blocks and writes them to the file on a worker thread
  static FileWriter *MakeThreaded(FILE *file, Ownership own);

  ~FileWriter();

  // the writer owns raw buffers, a thread handle and possibly the FILE, and the worker thread holds
  // a pointer to this object - copying would lead to double frees/closes, so it is disallowed
  FileWriter(const FileWriter &) = delete;
  FileWriter &operator=(const FileWriter &) = delete;

  // write 'length' bytes from 'data'. Returns an error result on failure
  RDResult Write(const void *data, uint64_t length);
  // push out any buffered data, wait for it to be written, and flush the underlying FILE
  RDResult Flush();

private:
  FileWriter(FILE *file, Ownership own) : m_File(file), m_Ownership(own) {}
  RDResult WriteThreaded(const void *data, uint64_t length);
  RDResult WriteUnthreaded(const void *data, uint64_t length);

  // entry point of the worker thread
  void ThreadEntry();

  FILE *m_File;

  // do we own the file? are we responsible for closing it?
  Ownership m_Ownership;

  // size in bytes of each staging block, and how many of them are allocated in threaded mode
  static const uint64_t BlockSize = 4 * 1024 * 1024;
  static const uint64_t NumBlocks = 8;

  // <base_pointer, byte_offset/size>
  using Block = rdcpair<byte *, uint64_t>;

  // non-zero while the worker thread is considered alive; zero for unthreaded writers
  int32_t m_ThreadRunning = 0;
  // set non-zero to ask the worker thread to exit
  int32_t m_ThreadKill = 0;
  Threading::ThreadHandle m_Thread = 0;

  // only touched by the producer, set of blocks allocated for easy cleanup. These blocks are in at
  // most one of the arrays below
  Block m_AllocBlocks[NumBlocks] = {};

  // list of blocks the producer owns. The last in this list is the one we're writing to currently
  rdcarray<Block> m_ProducerOwned;

  // list of blocks the consumer owns. This list is being churned through on the work thread
  rdcarray<Block> m_ConsumerOwned;

  // the lock protects everything below
  Threading::SpinLock m_Lock;

  // work to be pushed onto m_ConsumerOwned from the producer
  rdcarray<Block> m_PendingForConsumer;
  // blocks that can be pulled into m_ProducerOwned by the producer
  rdcarray<Block> m_CompletedFromConsumer;
  // any error that has appeared
  RDResult m_Error;
};
class StreamWriter
{
public:
@@ -305,8 +358,14 @@ public:
StreamWriter(StreamInvalidType, RDResult res);
StreamWriter(uint64_t initialBufSize);
StreamWriter(FILE *file, Ownership own);
StreamWriter(Network::Socket *file, Ownership own);
StreamWriter(FileWriter *file, Ownership own);
// when given a FILE* make a default filewriter and own it ourselves, but the filewriter uses the
// ownership of the FILE that was specified
StreamWriter(FILE *file, Ownership own)
: StreamWriter(FileWriter::MakeDefault(file, own), Ownership::Stream)
{
}
StreamWriter(Network::Socket *sock, Ownership own);
StreamWriter(Compressor *compressor, Ownership own);
bool IsErrored() { return m_Error != ResultCode::Succeeded; }
@@ -385,17 +444,13 @@ public:
}
else if(m_File)
{
uint64_t written = (uint64_t)FileIO::fwrite(data, 1, (size_t)numBytes, m_File);
if(written != numBytes)
{
RDResult result;
SET_ERROR_RESULT(result, ResultCode::FileIOFailed, "Writing to file failed: %s",
FileIO::ErrorString().c_str());
HandleError(result);
return false;
}
RDResult result = m_File->Write(data, numBytes);
return true;
if(result == ResultCode::Succeeded)
return true;
HandleError(result);
return false;
}
else if(m_Sock)
{
@@ -472,14 +527,11 @@ public:
}
else if(m_File)
{
bool success = FileIO::fflush(m_File);
RDResult result = m_File->Flush();
if(success)
if(result == ResultCode::Succeeded)
return true;
RDResult result;
SET_ERROR_RESULT(result, ResultCode::FileIOFailed, "File flushing failed: %s",
FileIO::ErrorString().c_str());
HandleError(result);
return false;
@@ -500,14 +552,11 @@ public:
}
else if(m_File)
{
bool success = FileIO::fflush(m_File);
RDResult result = m_File->Flush();
if(success)
if(result == ResultCode::Succeeded)
return true;
RDResult result;
SET_ERROR_RESULT(result, ResultCode::FileIOFailed, "File flushing failed: %s",
FileIO::ErrorString().c_str());
HandleError(result);
return false;
@@ -567,8 +616,8 @@ private:
// the total size of the file/compressor (ie. how much data flushed through it)
uint64_t m_WriteSize = 0;
// file pointer, if we're writing to a file
FILE *m_File = NULL;
// file writer, if we're writing to a file
FileWriter *m_File = NULL;
// the compressor, if writing to it
Compressor *m_Compressor = NULL;