Stream large buffers directly when reading

* This prevents us resizing our read-window to a large size only to read then
  memcpy into place.
This commit is contained in:
baldurk
2020-03-16 18:35:37 +00:00
parent cc64061172
commit 6e106d4c6c
3 changed files with 243 additions and 10 deletions
+110
View File
@@ -475,6 +475,116 @@ TEST_CASE("Read/write via structured of basic types", "[serialiser]")
delete buf;
};
TEST_CASE("Read/writing large buffers", "[serialiser]")
{
rdcstr filename = FileIO::GetTempFolderFilename() + "/scratch.bin";
bytebuf buffer;
buffer.resize(40 * 1024 * 1024);
for(size_t i = 0; i < buffer.size(); i++)
buffer[i] = byte((rand() & 0xff0) >> 4);
{
WriteSerialiser ser(new StreamWriter(StreamWriter::DefaultScratchSize), Ownership::Stream);
WriteSerialiser fileser(
new StreamWriter(FileIO::fopen(filename.c_str(), "wb"), Ownership::Stream),
Ownership::Stream);
uint32_t dummy1 = 99;
uint32_t dummy2 = 123;
ser.WriteChunk(1);
ser.Serialise("dummy"_lit, dummy1);
ser.EndChunk();
Chunk(ser, 1).Write(fileser);
ser.WriteChunk(2);
ser.Serialise("buffer"_lit, buffer);
ser.EndChunk();
Chunk(ser, 1).Write(fileser);
ser.WriteChunk(3);
ser.Serialise("buffer"_lit, buffer);
ser.EndChunk();
Chunk(ser, 1).Write(fileser);
ser.WriteChunk(4);
ser.Serialise("dummy"_lit, dummy2);
ser.EndChunk();
Chunk(ser, 1).Write(fileser);
}
for(size_t pass = 0; pass < 2; pass++)
{
StreamReader reader(FileIO::fopen(filename.c_str(), "rb"));
ReadSerialiser ser(&reader, Ownership::Nothing);
uint32_t c = 0;
c = ser.ReadChunk<uint32_t>();
CHECK(c == 1);
{
uint32_t dummy = 0;
ser.Serialise("dummy"_lit, dummy);
CHECK(dummy == 99);
}
ser.EndChunk();
CHECK(reader.GetOffset() == 64 * 1);
c = ser.ReadChunk<uint32_t>();
if(pass == 0)
{
CHECK(c == 2);
bytebuf readbuf;
ser.Serialise("buffer"_lit, readbuf);
CHECK((readbuf == buffer));
}
else
{
ser.SkipCurrentChunk();
}
ser.EndChunk();
CHECK(reader.GetOffset() == 40 * 1024 * 1024 + 64 * 2);
c = ser.ReadChunk<uint32_t>();
{
CHECK(c == 3);
bytebuf readbuf;
ser.Serialise("buffer"_lit, readbuf);
CHECK((readbuf == buffer));
}
ser.EndChunk();
CHECK(reader.GetOffset() == 80 * 1024 * 1024 + 64 * 3);
c = ser.ReadChunk<uint32_t>();
CHECK(c == 4);
{
uint32_t dummy = 0;
ser.Serialise("dummy"_lit, dummy);
CHECK(dummy == 123);
}
ser.EndChunk();
CHECK(reader.GetOffset() == 80 * 1024 * 1024 + 64 * 4);
}
FileIO::Delete(filename.c_str());
};
TEST_CASE("Read/write chunk metadata", "[serialiser]")
{
StreamWriter *buf = new StreamWriter(StreamWriter::DefaultScratchSize);
+104 -8
View File
@@ -24,6 +24,7 @@
#include "streamio.h"
#include <errno.h>
#include "api/replay/stringise.h"
#include "common/timing.h"
Compressor::~Compressor()
@@ -118,7 +119,7 @@ StreamReader::StreamReader(FILE *file, uint64_t fileSize, Ownership own)
m_BufferSize = initialBufferSize;
m_BufferHead = m_BufferBase = AllocAlignedBuffer(m_BufferSize);
ReadFromExternal(0, RDCMIN(m_InputSize, m_BufferSize));
ReadFromExternal(m_BufferBase, RDCMIN(m_InputSize, m_BufferSize));
m_Ownership = own;
}
@@ -145,7 +146,7 @@ StreamReader::StreamReader(FILE *file)
m_BufferSize = initialBufferSize;
m_BufferHead = m_BufferBase = AllocAlignedBuffer(m_BufferSize);
ReadFromExternal(0, RDCMIN(m_InputSize, m_BufferSize));
ReadFromExternal(m_BufferBase, RDCMIN(m_InputSize, m_BufferSize));
m_Ownership = Ownership::Stream;
}
@@ -170,7 +171,7 @@ StreamReader::StreamReader(Decompressor *decompressor, uint64_t uncompressedSize
m_Ownership = own;
ReadFromExternal(0, RDCMIN(uncompressedSize, m_BufferSize));
ReadFromExternal(m_BufferBase, RDCMIN(uncompressedSize, m_BufferSize));
}
StreamReader::~StreamReader()
@@ -256,7 +257,7 @@ bool StreamReader::Reserve(uint64_t numBytes)
if(m_Sock)
readSize = numBytes - Available();
ret = ReadFromExternal(currentDataSize, readSize);
ret = ReadFromExternal(m_BufferBase + currentDataSize, readSize);
if(oldBuffer != m_BufferBase && m_BufferBase)
FreeAlignedBuffer(oldBuffer);
@@ -264,17 +265,112 @@ bool StreamReader::Reserve(uint64_t numBytes)
return ret;
}
bool StreamReader::ReadFromExternal(uint64_t bufferOffs, uint64_t length)
bool StreamReader::ReadLargeBuffer(void *buffer, uint64_t length)
{
RDCASSERT(m_Sock || m_File || m_Decompressor);
byte *dest = (byte *)buffer;
// first exhaust whatever we have in the current buffer.
{
const uint64_t avail = Available();
// if we don't have 128 bytes left over we shouldn't be in here
RDCASSERT(avail + 128 <= length, avail, length);
// don't actually read if the destination buffer is NULL
if(dest)
{
memcpy(dest, m_BufferHead, avail);
dest += avail;
}
length -= avail;
m_ReadOffset += m_BufferSize;
}
// now read everything but the last 128 bytes directly from the external source
if(length > 128)
{
uint64_t directReadLength = length - 128;
length -= directReadLength;
m_ReadOffset += directReadLength;
if(buffer)
{
bool ret = ReadFromExternal(dest, directReadLength);
dest += directReadLength;
// if we failed, return now
if(!ret)
return ret;
}
else
{
// if we have no buffer to read into, just seek the stream in buffer-sized chunks using the
// existing buffer. Ensure the buffer is big enough to do this at a reasonable rate.
if(m_BufferSize < 1024 * 1024)
{
m_BufferSize = 1024 * 1024;
FreeAlignedBuffer(m_BufferBase);
m_BufferBase = AllocAlignedBuffer(m_BufferSize);
}
while(directReadLength > 0)
{
uint64_t chunkRead = RDCMIN(m_BufferSize, directReadLength);
bool ret = ReadFromExternal(m_BufferBase, chunkRead);
// if we failed, return now
if(!ret)
return ret;
directReadLength -= chunkRead;
}
}
}
// we now have exactly 128 bytes to read, guaranteed by how the function is called.
// we read that into the end of our buffer deliberately so that we can leave the buffer in the
// right state to have a backwards window (though it shouldn't be needed for large serialises like
// this).
if(m_BufferSize < 128)
{
m_BufferSize = 128;
FreeAlignedBuffer(m_BufferBase);
m_BufferBase = AllocAlignedBuffer(m_BufferSize);
}
// set the head to *after* where we're reading, this is where it'll end up after the read.
m_BufferHead = m_BufferBase + m_BufferSize;
// read the 128 bytes
m_ReadOffset += 128;
bool ret = ReadFromExternal(m_BufferHead - 128, 128);
// memcpy it where it's needed
if(dest && ret)
memcpy(dest, m_BufferHead - 128, 128);
// adjust read offset back for the 'fake' buffer we leave behind
m_ReadOffset -= m_BufferSize;
return ret;
}
bool StreamReader::ReadFromExternal(void *buffer, uint64_t length)
{
bool success = true;
if(m_Decompressor)
{
success = m_Decompressor->Read(m_BufferBase + bufferOffs, length);
success = m_Decompressor->Read(buffer, length);
}
else if(m_File)
{
uint64_t numRead = FileIO::fread(m_BufferBase + bufferOffs, 1, (size_t)length, m_File);
uint64_t numRead = FileIO::fread(buffer, 1, (size_t)length, m_File);
success = (numRead == length);
}
else if(m_Sock)
@@ -286,7 +382,7 @@ bool StreamReader::ReadFromExternal(uint64_t bufferOffs, uint64_t length)
else
{
// first get the required data blocking (this will sleep the thread until it comes in).
byte *readDest = m_BufferBase + bufferOffs;
byte *readDest = (byte *)buffer;
success = m_Sock->RecvDataBlocking(readDest, (uint32_t)length);
+29 -2
View File
@@ -158,10 +158,36 @@ public:
// as well as up to 64 bytes *behind* the head if it exists.
if(numBytes > Available())
{
bool success = Reserve(numBytes);
bool success = false;
bool alreadyread = false;
// if we're reading 10MB or more then read directly into the output memory rather than
// resizing up, reading all of that, then memcpy'ing out of our window.
// To simplify the implementation of ReadLargeBuffer if we can *almost* satisfy this with
// what we have without leaving 128 bytes left over, we go through the normal path.
// This does mean that you could do incrementally larger reads and get the window larger
// and larger by just skating over the limit each time, but that's fine because the main
// case we want to catch is a window that's only a few MB and then suddenly we read 100s of
// MB.
if(numBytes >= 10 * 1024 * 1024 && Available() + 128 < numBytes)
{
success = ReadLargeBuffer(data, numBytes);
alreadyread = true;
}
else
{
success = Reserve(numBytes);
}
if(!success)
{
if(data)
memset(data, 0, numBytes);
return false;
}
if(alreadyread)
return true;
}
}
@@ -211,7 +237,8 @@ private:
return m_BufferSize - (m_BufferHead - m_BufferBase);
}
bool Reserve(uint64_t numBytes);
bool ReadFromExternal(uint64_t bufferOffs, uint64_t length);
bool ReadLargeBuffer(void *buffer, uint64_t length);
bool ReadFromExternal(void *buffer, uint64_t length);
// base of the buffer allocation
byte *m_BufferBase;