Add new I/O streaming system, with pluggable compression

* This I/O will form the basis of the new serialiser - it will simply
  read to or write from one of these I/O streams. Then that stream can
  come from a file, go to a memory buffer, or go through a compressor
  or decompressor transparently.
* It also allows a unified way of writing over sockets instead of
  needing special socket helper functions.
* With this commit, the code isn't used aside from in tests.
This commit is contained in:
baldurk
2017-09-27 13:08:29 +01:00
parent 3aa3262089
commit 459c5a856e
11 changed files with 2226 additions and 2 deletions
+8
View File
@@ -124,6 +124,14 @@ set(sources
replay/replay_controller.h
serialise/serialiser.cpp
serialise/serialiser.h
serialise/lz4io.cpp
serialise/lz4io.h
serialise/zstdio.cpp
serialise/zstdio.h
serialise/streamio.cpp
serialise/streamio.h
serialise/comp_io_tests.cpp
serialise/streamio_tests.cpp
strings/grisu2.cpp
strings/string_utils.cpp
strings/string_utils.h
+8
View File
@@ -190,7 +190,10 @@
<ClInclude Include="os\win32\win32_specific.h" />
<ClInclude Include="replay\replay_driver.h" />
<ClInclude Include="replay\replay_controller.h" />
<ClInclude Include="serialise\lz4io.h" />
<ClInclude Include="serialise\serialiser.h" />
<ClInclude Include="serialise\streamio.h" />
<ClInclude Include="serialise\zstdio.h" />
<ClInclude Include="strings\string_utils.h" />
</ItemGroup>
<ItemGroup>
@@ -393,7 +396,12 @@
<ClCompile Include="replay\replay_driver.cpp" />
<ClCompile Include="replay\replay_output.cpp" />
<ClCompile Include="replay\replay_controller.cpp" />
<ClCompile Include="serialise\comp_io_tests.cpp" />
<ClCompile Include="serialise\lz4io.cpp" />
<ClCompile Include="serialise\serialiser.cpp" />
<ClCompile Include="serialise\streamio.cpp" />
<ClCompile Include="serialise\streamio_tests.cpp" />
<ClCompile Include="serialise\zstdio.cpp" />
<ClCompile Include="strings\grisu2.cpp" />
<ClCompile Include="strings\string_utils.cpp" />
<ClCompile Include="strings\utf8printf.cpp" />
+32 -2
View File
@@ -97,6 +97,12 @@
<Filter Include="3rdparty\pugixml">
<UniqueIdentifier>{50c142cb-d8b9-44f7-a68a-ab9015975ea5}</UniqueIdentifier>
</Filter>
<Filter Include="Common\Serialise\Compressors">
<UniqueIdentifier>{3cb4fd0e-7cd9-45a3-99d2-1158f27ec438}</UniqueIdentifier>
</Filter>
<Filter Include="Common\Serialise\Stream I/O">
<UniqueIdentifier>{3b8694d9-cb25-45e0-a1c2-8cc8cd03df3f}</UniqueIdentifier>
</Filter>
</ItemGroup>
<ItemGroup>
<ClInclude Include="maths\camera.h">
@@ -342,12 +348,21 @@
<ClInclude Include="strings\string_utils.h">
<Filter>Common\Strings</Filter>
</ClInclude>
<ClInclude Include="api\replay\structured_data.h">
<Filter>API\Replay</Filter>
<ClInclude Include="serialise\lz4io.h">
<Filter>Common\Serialise\Compressors</Filter>
</ClInclude>
<ClInclude Include="serialise\zstdio.h">
<Filter>Common\Serialise\Compressors</Filter>
</ClInclude>
<ClInclude Include="serialise\streamio.h">
<Filter>Common\Serialise\Stream I/O</Filter>
</ClInclude>
<ClInclude Include="api\replay\stringise.h">
<Filter>API\Replay</Filter>
</ClInclude>
<ClInclude Include="api\replay\structured_data.h">
<Filter>API\Replay</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<ClCompile Include="maths\camera.cpp">
@@ -587,6 +602,21 @@
<ClCompile Include="strings\utf8printf.cpp">
<Filter>Common\Strings</Filter>
</ClCompile>
<ClCompile Include="serialise\comp_io_tests.cpp">
<Filter>Common\Serialise\Compressors</Filter>
</ClCompile>
<ClCompile Include="serialise\lz4io.cpp">
<Filter>Common\Serialise\Compressors</Filter>
</ClCompile>
<ClCompile Include="serialise\zstdio.cpp">
<Filter>Common\Serialise\Compressors</Filter>
</ClCompile>
<ClCompile Include="serialise\streamio.cpp">
<Filter>Common\Serialise\Stream I/O</Filter>
</ClCompile>
<ClCompile Include="serialise\streamio_tests.cpp">
<Filter>Common\Serialise\Stream I/O</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<None Include="os\win32\comexport.def">
+193
View File
@@ -0,0 +1,193 @@
/******************************************************************************
* The MIT License (MIT)
*
* Copyright (c) 2017 Baldur Karlsson
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
******************************************************************************/
#include "lz4io.h"
#include "serialiser.h"
#include "zstdio.h"
#if ENABLED(ENABLE_UNIT_TESTS)
#include "3rdparty/catch/catch.hpp"
TEST_CASE("Test LZ4 compression/decompression", "[streamio][lz4]")
{
StreamWriter buf(StreamWriter::DefaultScratchSize);
byte *randomData = new byte[1024 * 1024];
for(int i = 0; i < 1024 * 1024; i++)
randomData[i] = rand() & 0xff;
// write the data
{
StreamWriter writer(new LZ4Compressor(&buf, Ownership::Nothing), Ownership::Stream);
byte *fixedData = new byte[1024 * 1024];
byte *regularData = new byte[1024 * 1024];
memset(fixedData, 0x7c, 1024 * 1024);
for(int i = 0; i < 1024 * 1024; i++)
regularData[i] = i & 0xff;
writer.Write(fixedData, 1024 * 1024);
writer.Write(randomData, 1024 * 1024);
writer.Write(regularData, 1024 * 1024);
writer.Write(fixedData, 1024 * 1024);
// check that the compression got good wins out of the above data. The random data will be
// pretty much untouched but the rest should compress massively.
CHECK(buf.GetOffset() < 1024 * 1024 + 20 * 1024);
CHECK(writer.GetOffset() == 4 * 1024 * 1024);
CHECK_FALSE(writer.IsErrored());
writer.Finish();
CHECK_FALSE(writer.IsErrored());
delete[] fixedData;
delete[] regularData;
}
// we now only have the compressed data, decompress it
{
StreamReader reader(
new LZ4Decompressor(new StreamReader(buf.GetData(), buf.GetOffset()), Ownership::Stream),
4 * 1024 * 1024, Ownership::Stream);
// recreate this for easy memcmp'ing
byte *fixedData = new byte[1024 * 1024];
byte *regularData = new byte[1024 * 1024];
memset(fixedData, 0x7c, 1024 * 1024);
for(int i = 0; i < 1024 * 1024; i++)
regularData[i] = i & 0xff;
byte *readData = new byte[1024 * 1024];
reader.Read(readData, 1024 * 1024);
CHECK_FALSE(memcmp(readData, fixedData, 1024 * 1024));
reader.Read(readData, 1024 * 1024);
CHECK_FALSE(memcmp(readData, randomData, 1024 * 1024));
reader.Read(readData, 1024 * 1024);
CHECK_FALSE(memcmp(readData, regularData, 1024 * 1024));
reader.Read(readData, 1024 * 1024);
CHECK_FALSE(memcmp(readData, fixedData, 1024 * 1024));
CHECK_FALSE(reader.IsErrored());
CHECK(reader.AtEnd());
delete[] readData;
delete[] fixedData;
delete[] regularData;
}
delete[] randomData;
};
TEST_CASE("Test ZSTD compression/decompression", "[streamio][zstd]")
{
StreamWriter buf(StreamWriter::DefaultScratchSize);
byte *randomData = new byte[1024 * 1024];
for(int i = 0; i < 1024 * 1024; i++)
randomData[i] = rand() & 0xff;
// write the data
{
StreamWriter writer(new ZSTDCompressor(&buf, Ownership::Nothing), Ownership::Stream);
byte *fixedData = new byte[1024 * 1024];
byte *regularData = new byte[1024 * 1024];
memset(fixedData, 0x7c, 1024 * 1024);
for(int i = 0; i < 1024 * 1024; i++)
regularData[i] = i & 0xff;
writer.Write(fixedData, 1024 * 1024);
writer.Write(randomData, 1024 * 1024);
writer.Write(regularData, 1024 * 1024);
writer.Write(fixedData, 1024 * 1024);
// check that the compression got good wins out of the above data. The random data will be
// pretty much untouched but the rest should compress massively.
CHECK(buf.GetOffset() < 1024 * 1024 + 4 * 1024);
CHECK(writer.GetOffset() == 4 * 1024 * 1024);
CHECK_FALSE(writer.IsErrored());
writer.Finish();
CHECK_FALSE(writer.IsErrored());
delete[] fixedData;
delete[] regularData;
}
// we now only have the compressed data, decompress it
{
StreamReader reader(
new ZSTDDecompressor(new StreamReader(buf.GetData(), buf.GetOffset()), Ownership::Stream),
4 * 1024 * 1024, Ownership::Stream);
// recreate this for easy memcmp'ing
byte *fixedData = new byte[1024 * 1024];
byte *regularData = new byte[1024 * 1024];
memset(fixedData, 0x7c, 1024 * 1024);
for(int i = 0; i < 1024 * 1024; i++)
regularData[i] = i & 0xff;
byte *readData = new byte[1024 * 1024];
reader.Read(readData, 1024 * 1024);
CHECK_FALSE(memcmp(readData, fixedData, 1024 * 1024));
reader.Read(readData, 1024 * 1024);
CHECK_FALSE(memcmp(readData, randomData, 1024 * 1024));
reader.Read(readData, 1024 * 1024);
CHECK_FALSE(memcmp(readData, regularData, 1024 * 1024));
reader.Read(readData, 1024 * 1024);
CHECK_FALSE(memcmp(readData, fixedData, 1024 * 1024));
CHECK_FALSE(reader.IsErrored());
CHECK(reader.AtEnd());
delete[] readData;
delete[] fixedData;
delete[] regularData;
}
delete[] randomData;
};
#endif // ENABLED(ENABLE_UNIT_TESTS)
+287
View File
@@ -0,0 +1,287 @@
/******************************************************************************
* The MIT License (MIT)
*
* Copyright (c) 2017 Baldur Karlsson
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
******************************************************************************/
#include "lz4io.h"
static const uint64_t lz4BlockSize = 64 * 1024;
LZ4Compressor::LZ4Compressor(StreamWriter *write, Ownership own) : Compressor(write, own)
{
m_Page[0] = AllocAlignedBuffer(lz4BlockSize);
m_Page[1] = AllocAlignedBuffer(lz4BlockSize);
m_CompressBuffer = AllocAlignedBuffer(LZ4_COMPRESSBOUND(lz4BlockSize));
m_PageOffset = 0;
LZ4_resetStream(&m_LZ4Comp);
}
LZ4Compressor::~LZ4Compressor()
{
FreeAlignedBuffer(m_Page[0]);
FreeAlignedBuffer(m_Page[1]);
FreeAlignedBuffer(m_CompressBuffer);
}
bool LZ4Compressor::Write(const void *data, uint64_t numBytes)
{
// if we encountered a stream error this will be NULL
if(!m_CompressBuffer)
return false;
if(numBytes == 0)
return true;
// The basic plan is:
// Write into page N incrementally until it is completely full. When full, flush it out to lz4 and
// swap pages.
// This keeps lz4 happy with 64kb of history each time it compresses.
// If we are writing some data the crosses the boundary between pages, we write the part that will
// fit on one page, flush & swap, write the rest into the next page.
if(m_PageOffset + numBytes <= lz4BlockSize)
{
// simplest path, no page wrapping/spanning at all
memcpy(m_Page[0] + m_PageOffset, data, (size_t)numBytes);
m_PageOffset += numBytes;
return true;
}
else
{
// do partial copies that span pages and flush as necessary
const byte *src = (const byte *)data;
// copy whatever will fit on this page
{
uint64_t firstBytes = lz4BlockSize - m_PageOffset;
memcpy(m_Page[0] + m_PageOffset, src, (size_t)firstBytes);
m_PageOffset += firstBytes;
numBytes -= firstBytes;
src += firstBytes;
}
bool success = true;
while(success && numBytes > 0)
{
// flush and swap pages
success &= FlushPage0();
if(!success)
return success;
// how many bytes can we copy in this page?
uint64_t partialBytes = RDCMIN(lz4BlockSize, numBytes);
memcpy(m_Page[0], src, (size_t)partialBytes);
// advance the source pointer, dest offset, and remove the bytes we read
m_PageOffset += partialBytes;
numBytes -= partialBytes;
src += partialBytes;
}
return success;
}
}
bool LZ4Compressor::Finish()
{
// This function just writes the current page and closes lz4. Since we assume all blocks are
// precisely 64kb in size
// only the last one can be smaller, so we only write a partial page when finishing.
// Calling Write() after Finish() is illegal
return FlushPage0();
}
bool LZ4Compressor::FlushPage0()
{
// if we encountered a stream error this will be NULL
if(!m_CompressBuffer)
return false;
// m_PageOffset is the amount written, usually equal to lz4BlockSize except the last block.
int32_t compSize =
LZ4_compress_fast_continue(&m_LZ4Comp, (const char *)m_Page[0], (char *)m_CompressBuffer,
(int)m_PageOffset, (int)LZ4_COMPRESSBOUND(lz4BlockSize), 1);
if(compSize < 0)
{
RDCERR("Error compressing: %i", compSize);
FreeAlignedBuffer(m_Page[0]);
FreeAlignedBuffer(m_Page[1]);
FreeAlignedBuffer(m_CompressBuffer);
m_Page[0] = m_Page[1] = m_CompressBuffer = NULL;
return false;
}
bool success = true;
success &= m_Write->Write(compSize);
success &= m_Write->Write(m_CompressBuffer, compSize);
// swap pages
std::swap(m_Page[0], m_Page[1]);
// start writing to the start of the page again
m_PageOffset = 0;
return success;
}
LZ4Decompressor::LZ4Decompressor(StreamReader *read, Ownership own) : Decompressor(read, own)
{
m_Page[0] = AllocAlignedBuffer(lz4BlockSize);
m_Page[1] = AllocAlignedBuffer(lz4BlockSize);
m_CompressBuffer = AllocAlignedBuffer(LZ4_COMPRESSBOUND(lz4BlockSize));
m_PageOffset = 0;
m_PageLength = 0;
LZ4_setStreamDecode(&m_LZ4Decomp, NULL, 0);
}
LZ4Decompressor::~LZ4Decompressor()
{
FreeAlignedBuffer(m_Page[0]);
FreeAlignedBuffer(m_Page[1]);
FreeAlignedBuffer(m_CompressBuffer);
}
bool LZ4Decompressor::Recompress(Compressor *comp)
{
bool success = true;
while(success && !m_Read->AtEnd())
{
success &= FillPage0();
if(success)
success &= comp->Write(m_Page[0], m_PageLength);
}
success &= comp->Finish();
return success;
}
bool LZ4Decompressor::Read(void *data, uint64_t numBytes)
{
// if we encountered a stream error this will be NULL
if(!m_CompressBuffer)
return false;
if(numBytes == 0)
return true;
// At any point, m_Page[0] contains the current window with uncompressed bytes.
// If we can satisfy a read from it, then we just memcpy and increment m_PageOffset.
// When we wrap around, we do a partial memcpy from m_Page[0], then swap the pages and
// decompress some more bytes into m_Page[0]. Thus, m_Page[1] contains the history (if
// it exists)
// if we already have all the data in-memory, just copy and return
uint64_t available = m_PageLength - m_PageOffset;
if(numBytes <= available)
{
memcpy(data, m_Page[0] + m_PageOffset, (size_t)numBytes);
m_PageOffset += numBytes;
return true;
}
byte *dst = (byte *)data;
// copy what remains in m_Page[0]
memcpy(dst, m_Page[0] + m_PageOffset, (size_t)available);
// adjust what needs to be copied
dst += available;
numBytes -= available;
bool success = true;
while(success && numBytes > 0)
{
success &= FillPage0();
if(!success)
return success;
// if we can now satisfy the remainder of the read, do so and return
if(numBytes <= m_PageLength)
{
memcpy(dst, m_Page[0], (size_t)numBytes);
m_PageOffset += numBytes;
return success;
}
// otherwise copy this page in and continue
memcpy(dst, m_Page[0], (size_t)m_PageLength);
dst += m_PageLength;
numBytes -= m_PageLength;
}
return success;
}
bool LZ4Decompressor::FillPage0()
{
// swap pages
std::swap(m_Page[0], m_Page[1]);
int32_t compSize = 0;
bool success = true;
success &= m_Read->Read(compSize);
success &= m_Read->Read(m_CompressBuffer, compSize);
if(!success)
{
FreeAlignedBuffer(m_Page[0]);
FreeAlignedBuffer(m_Page[1]);
FreeAlignedBuffer(m_CompressBuffer);
m_Page[0] = m_Page[1] = m_CompressBuffer = NULL;
return false;
}
int32_t decompSize = LZ4_decompress_safe_continue(&m_LZ4Decomp, (const char *)m_CompressBuffer,
(char *)m_Page[0], compSize, lz4BlockSize);
if(decompSize < 0)
{
RDCERR("Error decompressing: %i", decompSize);
FreeAlignedBuffer(m_Page[0]);
FreeAlignedBuffer(m_Page[1]);
FreeAlignedBuffer(m_CompressBuffer);
m_Page[0] = m_Page[1] = m_CompressBuffer = NULL;
return false;
}
m_PageOffset = 0;
m_PageLength = decompSize;
return success;
}
+67
View File
@@ -0,0 +1,67 @@
/******************************************************************************
* The MIT License (MIT)
*
* Copyright (c) 2017 Baldur Karlsson
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
******************************************************************************/
#pragma once
#include "lz4/lz4.h"
#include "streamio.h"
class LZ4Compressor : public Compressor
{
public:
LZ4Compressor(StreamWriter *write, Ownership own);
~LZ4Compressor();
bool Write(const void *data, uint64_t numBytes);
bool Finish();
private:
bool FlushPage0();
byte *m_Page[2];
byte *m_CompressBuffer;
uint64_t m_PageOffset;
LZ4_stream_t m_LZ4Comp;
};
class LZ4Decompressor : public Decompressor
{
public:
LZ4Decompressor(StreamReader *read, Ownership own);
~LZ4Decompressor();
bool Recompress(Compressor *comp);
bool Read(void *data, uint64_t numBytes);
private:
bool FillPage0();
byte *m_Page[2];
byte *m_CompressBuffer;
uint64_t m_PageOffset;
uint64_t m_PageLength;
LZ4_streamDecode_t m_LZ4Decomp;
};
+452
View File
@@ -0,0 +1,452 @@
/******************************************************************************
* The MIT License (MIT)
*
* Copyright (c) 2017 Baldur Karlsson
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
******************************************************************************/
#include "streamio.h"
Compressor::~Compressor()
{
if(m_Ownership == Ownership::Stream && m_Write)
delete m_Write;
}
Decompressor::~Decompressor()
{
if(m_Ownership == Ownership::Stream && m_Read)
delete m_Read;
}
static const uint64_t initialBufferSize = 64 * 1024;
const byte StreamWriter::empty[128] = {};
StreamReader::StreamReader(const byte *buffer, uint64_t bufferSize)
{
m_InputSize = m_BufferSize = bufferSize;
m_BufferHead = m_BufferBase = AllocAlignedBuffer(m_BufferSize);
memcpy(m_BufferHead, buffer, (size_t)m_BufferSize);
m_Ownership = Ownership::Nothing;
}
StreamReader::StreamReader(const std::vector<byte> &buffer)
{
m_InputSize = m_BufferSize = buffer.size();
m_BufferHead = m_BufferBase = AllocAlignedBuffer(m_BufferSize);
memcpy(m_BufferHead, buffer.data(), (size_t)m_BufferSize);
m_Ownership = Ownership::Nothing;
}
StreamReader::StreamReader(StreamInvalidType)
{
m_InputSize = 0;
m_BufferSize = 0;
m_BufferHead = m_BufferBase = NULL;
m_Ownership = Ownership::Nothing;
}
StreamReader::StreamReader(Network::Socket *sock, Ownership own)
{
m_Sock = sock;
m_InputSize = m_BufferSize = initialBufferSize;
m_BufferBase = AllocAlignedBuffer(m_BufferSize);
// place head at the *end* of the buffer, because for sockets we pretend the buffer is constantly
// exhausted, and just do a read of the minimum number of bytes each time to satisfy each read
m_BufferHead = m_BufferBase + m_BufferSize;
m_Ownership = own;
}
StreamReader::StreamReader(FILE *file, uint64_t fileSize, Ownership own)
{
if(file == NULL)
{
m_InputSize = 0;
m_BufferSize = 0;
m_BufferHead = m_BufferBase = NULL;
m_Ownership = Ownership::Nothing;
return;
}
m_File = file;
m_InputSize = fileSize;
m_BufferSize = initialBufferSize;
m_BufferHead = m_BufferBase = AllocAlignedBuffer(m_BufferSize);
ReadFromExternal(0, RDCMIN(m_InputSize, m_BufferSize));
m_Ownership = own;
}
StreamReader::StreamReader(FILE *file)
{
if(file == NULL)
{
m_InputSize = 0;
m_BufferSize = 0;
m_BufferHead = m_BufferBase = NULL;
m_Ownership = Ownership::Nothing;
return;
}
FileIO::fseek64(file, 0, SEEK_END);
m_InputSize = FileIO::ftell64(file);
FileIO::fseek64(file, 0, SEEK_SET);
m_File = file;
m_BufferSize = initialBufferSize;
m_BufferHead = m_BufferBase = AllocAlignedBuffer(m_BufferSize);
ReadFromExternal(0, RDCMIN(m_InputSize, m_BufferSize));
m_Ownership = Ownership::Stream;
}
StreamReader::StreamReader(StreamReader *reader, uint64_t bufferSize)
{
m_InputSize = m_BufferSize = bufferSize;
m_BufferHead = m_BufferBase = AllocAlignedBuffer(m_BufferSize);
reader->Read(m_BufferBase, bufferSize);
m_Ownership = Ownership::Nothing;
}
StreamReader::StreamReader(Decompressor *decompressor, uint64_t uncompressedSize, Ownership own)
{
m_Decompressor = decompressor;
m_InputSize = uncompressedSize;
m_BufferSize = initialBufferSize;
m_BufferHead = m_BufferBase = AllocAlignedBuffer(m_BufferSize);
m_Ownership = own;
ReadFromExternal(0, RDCMIN(uncompressedSize, m_BufferSize));
}
StreamReader::~StreamReader()
{
for(StreamCloseCallback cb : m_Callbacks)
cb();
FreeAlignedBuffer(m_BufferBase);
if(m_Ownership == Ownership::Stream)
{
if(m_File)
FileIO::fclose(m_File);
if(m_Decompressor)
delete m_Decompressor;
}
}
void StreamReader::SetOffset(uint64_t offs)
{
if(m_File || m_Decompressor)
{
RDCERR("File and decompress stream readers do not support seeking");
return;
}
m_BufferHead = m_BufferBase + offs;
}
bool StreamReader::Reserve(uint64_t numBytes)
{
if(m_Sock)
{
// if we're reading more bytes than our buffer size, resize up
if(numBytes > m_BufferSize)
{
FreeAlignedBuffer(m_BufferBase);
m_InputSize = m_BufferSize = AlignUp<uint64_t>(numBytes, 256ULL);
m_BufferBase = AllocAlignedBuffer(m_BufferSize);
m_BufferHead = m_BufferBase + m_BufferSize;
}
m_ReadOffset += numBytes;
// read into the end of the buffer, so that the subsequent read re-exhausts the buffer
m_BufferHead = m_BufferBase + m_BufferSize - numBytes;
return ReadFromExternal(m_BufferSize - numBytes, numBytes);
}
RDCASSERT(m_File || m_Decompressor);
// store old buffer and the read data, so we can move it into the new buffer
byte *oldBuffer = m_BufferBase;
// always keep at least a certain window behind what we read.
uint64_t backwardsWindow = RDCMIN<uint64_t>(64ULL, m_BufferHead - m_BufferBase);
byte *currentData = m_BufferHead - backwardsWindow;
uint64_t currentDataSize = m_BufferSize - (m_BufferHead - m_BufferBase) + backwardsWindow;
uint64_t BufferOffset = m_BufferHead - m_BufferBase;
// if we are reading more than our current buffer size, expand the buffer size
if(numBytes + backwardsWindow > m_BufferSize)
{
// very conservative resizing - don't do "double and add" - to avoid
// a 1GB buffer being read and needing to allocate 2GB. The cost is we
// will reallocate a bit more often
m_BufferSize = numBytes + backwardsWindow;
m_BufferBase = AllocAlignedBuffer(m_BufferSize);
}
// move the unread data into the buffer
memmove(m_BufferBase, currentData, (size_t)currentDataSize);
if(BufferOffset > backwardsWindow)
{
m_ReadOffset += BufferOffset - backwardsWindow;
m_BufferHead = m_BufferBase + backwardsWindow;
}
else
{
m_BufferHead = m_BufferBase + BufferOffset;
}
// if there's anything left of the file to read in, do so now
bool ret = false;
ret = ReadFromExternal(currentDataSize, RDCMIN(m_BufferSize - currentDataSize,
m_InputSize - m_ReadOffset - currentDataSize));
if(oldBuffer != m_BufferBase && m_BufferBase)
FreeAlignedBuffer(oldBuffer);
return ret;
}
bool StreamReader::ReadFromExternal(uint64_t bufferOffs, uint64_t length)
{
bool success = true;
if(m_Decompressor)
{
success = m_Decompressor->Read(m_BufferBase + bufferOffs, length);
}
else if(m_File)
{
uint64_t numRead = FileIO::fread(m_BufferBase + bufferOffs, 1, (size_t)length, m_File);
success = (numRead == length);
}
else if(m_Sock)
{
if(!m_Sock->Connected())
{
success = false;
}
else
{
success = m_Sock->RecvDataBlocking(m_BufferBase + bufferOffs, (uint32_t)length);
}
}
else
{
// we're in an error-state, nothing to read from
return false;
}
if(!success)
{
if(m_File)
RDCERR("Error reading from file, errno %d", errno);
else if(m_Sock)
RDCWARN("Error reading from socket");
m_HasError = true;
// move to error state
FreeAlignedBuffer(m_BufferBase);
if(m_Ownership == Ownership::Stream)
{
if(m_File)
FileIO::fclose(m_File);
if(m_Sock)
delete m_Sock;
if(m_Decompressor)
delete m_Decompressor;
}
m_File = NULL;
m_Sock = NULL;
m_Decompressor = NULL;
m_ReadOffset = 0;
m_InputSize = 0;
m_BufferSize = 0;
m_BufferHead = m_BufferBase = NULL;
m_Ownership = Ownership::Nothing;
}
return success;
}
StreamWriter::StreamWriter(uint64_t initialBufSize)
{
m_BufferBase = m_BufferHead = AllocAlignedBuffer(initialBufSize);
m_BufferEnd = m_BufferBase + initialBufSize;
m_Ownership = Ownership::Nothing;
}
StreamWriter::StreamWriter(StreamInvalidType)
{
m_BufferBase = m_BufferHead = m_BufferEnd = NULL;
m_Ownership = Ownership::Nothing;
m_InMemory = false;
}
StreamWriter::StreamWriter(Network::Socket *sock, Ownership own)
{
m_BufferBase = m_BufferHead = m_BufferEnd = NULL;
m_Sock = sock;
m_Ownership = own;
m_InMemory = false;
}
StreamWriter::StreamWriter(FILE *file, Ownership own)
{
m_BufferBase = m_BufferHead = m_BufferEnd = NULL;
m_File = file;
m_Ownership = own;
m_InMemory = false;
}
StreamWriter::StreamWriter(Compressor *compressor, Ownership own)
{
m_BufferBase = m_BufferHead = m_BufferEnd = NULL;
m_Compressor = compressor;
m_Ownership = own;
m_InMemory = false;
}
StreamWriter::~StreamWriter()
{
for(StreamCloseCallback cb : m_Callbacks)
cb();
FreeAlignedBuffer(m_BufferBase);
if(m_Ownership == Ownership::Stream)
{
if(m_File)
FileIO::fclose(m_File);
if(m_Compressor)
delete m_Compressor;
}
}
void StreamWriter::HandleError()
{
if(m_File)
RDCERR("Error writing to file, errno %d", errno);
else if(m_Sock)
RDCWARN("Error writing to socket");
m_HasError = true;
FreeAlignedBuffer(m_BufferBase);
if(m_Ownership == Ownership::Stream)
{
if(m_File)
FileIO::fclose(m_File);
if(m_Sock)
delete m_Sock;
if(m_Compressor)
delete m_Compressor;
}
m_BufferBase = m_BufferHead = m_BufferEnd = NULL;
m_WriteSize = 0;
m_File = NULL;
m_Sock = NULL;
m_Compressor = NULL;
m_Ownership = Ownership::Nothing;
m_InMemory = false;
}
void StreamTransfer(StreamWriter *writer, StreamReader *reader, float *progress)
{
uint64_t totalSize = reader->GetSize();
// copy 1MB at a time
const uint64_t StreamIOChunkSize = 1024 * 1024;
const uint64_t bufSize = RDCMIN(StreamIOChunkSize, totalSize);
uint64_t numBufs = totalSize / bufSize;
// last remaining partial buffer
if(totalSize % (uint64_t)bufSize > 0)
numBufs++;
byte *buf = new byte[(size_t)bufSize];
if(progress)
*progress = 0.0001f;
for(uint64_t i = 0; i < numBufs; i++)
{
uint64_t payloadLength = RDCMIN(bufSize, totalSize);
reader->Read(buf, payloadLength);
writer->Write(buf, payloadLength);
totalSize -= payloadLength;
if(progress)
*progress = float(i + 1) / float(numBufs);
}
delete[] buf;
}
+459
View File
@@ -0,0 +1,459 @@
/******************************************************************************
* The MIT License (MIT)
*
* Copyright (c) 2017 Baldur Karlsson
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
******************************************************************************/
#pragma once
#include <stdio.h>
#include <functional>
#include <vector>
#include "common/common.h"
enum class Ownership
{
Nothing,
Stream,
};
class StreamWriter;
class StreamReader;
typedef std::function<void()> StreamCloseCallback;
class Compressor
{
public:
Compressor(StreamWriter *write, Ownership own) : m_Write(write), m_Ownership(own) {}
virtual ~Compressor();
virtual bool Write(const void *data, uint64_t numBytes) = 0;
virtual bool Finish() = 0;
protected:
StreamWriter *m_Write;
Ownership m_Ownership;
};
class Decompressor
{
public:
Decompressor(StreamReader *read, Ownership own) : m_Read(read), m_Ownership(own) {}
virtual ~Decompressor();
virtual bool Recompress(Compressor *comp) = 0;
virtual bool Read(void *data, uint64_t numBytes) = 0;
protected:
StreamReader *m_Read;
Ownership m_Ownership;
};
class StreamReader
{
public:
enum StreamInvalidType
{
InvalidStream
};
StreamReader(StreamInvalidType);
StreamReader(const byte *buffer, uint64_t bufferSize);
StreamReader(const std::vector<byte> &buffer);
StreamReader(Network::Socket *sock, Ownership own);
StreamReader(FILE *file, uint64_t fileSize, Ownership own);
StreamReader(FILE *file);
StreamReader(StreamReader *reader, uint64_t bufferSize);
StreamReader(Decompressor *decompressor, uint64_t uncompressedSize, Ownership own);
~StreamReader();
bool IsErrored() { return m_HasError; }
void SetOffset(uint64_t offs);
inline uint64_t GetOffset() { return m_BufferHead - m_BufferBase + m_ReadOffset; }
inline uint64_t GetSize() { return m_InputSize; }
inline bool AtEnd() { return GetOffset() >= GetSize(); }
template <uint64_t alignment>
bool AlignTo()
{
uint64_t offs = GetOffset();
uint64_t alignedOffs = AlignUp(offs, alignment);
uint64_t bytesToAlign = alignedOffs - offs;
if(bytesToAlign > 0)
return Read(NULL, bytesToAlign);
return true;
}
bool Read(void *data, uint64_t numBytes)
{
if(numBytes == 0)
return true;
if(!m_BufferBase)
{
// read 0s if we're in an error state
if(data)
memset(data, 0, (size_t)numBytes);
return false;
}
// if we're reading past the end, error, read nothing (no partial reads) and return
if(m_Sock == NULL && GetOffset() + numBytes > GetSize())
{
RDCERR("Reading off the end of the stream");
m_BufferHead = m_BufferBase + m_BufferSize;
if(data)
memset(data, 0, (size_t)numBytes);
m_HasError = true;
return false;
}
// if we're reading from an external source, reserve enough bytes to do the read
if(m_File || m_Sock || m_Decompressor)
{
// This preserves everything from min(m_BufferBase, m_BufferHead - 64) -> end of buffer
// which will still be in place relative to m_BufferHead.
// In other words - reservation will keep the aleady-read data that's after the head pointer,
// as well as up to 64 bytes *behind* the head if it exists.
if(numBytes > Available())
{
bool success = Reserve(numBytes);
if(!success)
return false;
}
}
// perform the actual copy
if(data)
memcpy(data, m_BufferHead, (size_t)numBytes);
// advance head
m_BufferHead += numBytes;
return true;
}
bool SkipBytes(uint64_t numBytes) { return Read(NULL, numBytes); }
// compile-time constant element to let the compiler inline the memcpy
template <typename T>
bool Read(T &data)
{
return Read(&data, sizeof(T));
}
void AddCloseCallback(StreamCloseCallback callback) { m_Callbacks.push_back(callback); }
private:
inline uint64_t Available()
{
if(m_Sock)
return 0;
return m_BufferSize - (m_BufferHead - m_BufferBase);
}
bool Reserve(uint64_t numBytes);
bool ReadFromExternal(uint64_t bufferOffs, uint64_t length);
// base of the buffer allocation
byte *m_BufferBase;
// where we are currently reading from in the buffer
byte *m_BufferHead;
// the size of the buffer (just a window if reading from external source)
uint64_t m_BufferSize;
// the total size of the total input. This is how many bytes you can read, regardless
// of how many bytes might actually be stored on the other side of the source (i.e.
// this is the uncompressed output size)
uint64_t m_InputSize;
// file pointer, if we're reading from a file
FILE *m_File = NULL;
// socket, if we're reading from a socket
Network::Socket *m_Sock = NULL;
// the decompressor, if reading from it
Decompressor *m_Decompressor = NULL;
// the offset in the file/decompressor that corresponds to the start of m_BufferBase
uint64_t m_ReadOffset = 0;
// flag indicating if an error has been encountered and the stream is now invalid
bool m_HasError = false;
// do we own the file/compressor? are we responsible for
// cleaning it up?
Ownership m_Ownership;
// callbacks that will be invoked when this stream is being destroyed
std::vector<StreamCloseCallback> m_Callbacks;
};
class StreamWriter
{
public:
enum StreamInvalidType
{
InvalidStream
};
StreamWriter(StreamInvalidType);
StreamWriter(uint64_t initialBufSize);
StreamWriter(FILE *file, Ownership own);
StreamWriter(Network::Socket *file, Ownership own);
StreamWriter(Compressor *compressor, Ownership own);
bool IsErrored() { return m_HasError; }
static const int DefaultScratchSize = 32 * 1024;
~StreamWriter();
void Rewind()
{
if(m_InMemory)
{
m_BufferHead = m_BufferBase;
m_WriteSize = 0;
return;
}
RDCERR("Can't rewind a file/compressor stream writer");
}
uint64_t GetOffset() { return m_WriteSize; }
const byte *GetData() { return m_BufferBase; }
template <uint64_t alignment>
bool AlignTo()
{
uint64_t offs;
if(m_InMemory)
offs = m_BufferHead - m_BufferBase;
else
offs = GetOffset();
uint64_t alignedOffs = AlignUp(offs, alignment);
uint64_t bytesToAlign = alignedOffs - offs;
RDCCOMPILE_ASSERT(alignment <= sizeof(empty),
"Empty array is not large enough - increase size to support alignment");
if(bytesToAlign > 0)
return Write(empty, bytesToAlign);
return true;
}
bool Write(const void *data, uint64_t numBytes)
{
if(numBytes == 0)
return true;
m_WriteSize += numBytes;
if(m_InMemory)
{
// in-memory path
// are we about to write outside the buffer? Resize it larger
if(m_BufferHead + numBytes >= m_BufferEnd)
EnsureSized(numBytes);
// perform the actual copy
memcpy(m_BufferHead, data, (size_t)numBytes);
// advance head
m_BufferHead += numBytes;
return true;
}
else if(m_Compressor)
{
return m_Compressor->Write(data, numBytes);
}
else if(m_File)
{
uint64_t written = (uint64_t)FileIO::fwrite(data, 1, (size_t)numBytes, m_File);
if(written != numBytes)
{
HandleError();
return false;
}
return true;
}
else if(m_Sock)
{
bool success = m_Sock->SendDataBlocking(data, (uint32_t)numBytes);
if(!success)
{
HandleError();
return false;
}
return true;
}
else
{
// we're in an error-state, nothing to write to
return false;
}
}
// compile-time constant amount of data to let the compiler inline the memcpy
template <typename T>
bool Write(const T &data)
{
const uint64_t numBytes = sizeof(T);
if(m_InMemory)
{
// we duplicate the implementation here instead of calling the Write(void *, size_t)
// overload above since then the compiler may not be able to optimise out the memcpy
m_WriteSize += numBytes;
// are we about to write outside the buffer? Resize it larger
if(m_BufferHead + numBytes >= m_BufferEnd)
EnsureSized(numBytes);
// perform the actual copy
memcpy(m_BufferHead, &data, (size_t)numBytes);
// advance head
m_BufferHead += numBytes;
return true;
}
else
{
return Write(&data, numBytes);
}
}
// write a particular value at an offset (not necessarily just append).
template <typename T>
bool WriteAt(uint64_t offs, const T &data)
{
if(!m_File && !m_Sock && !m_Compressor)
{
RDCASSERT(ptrdiff_t(offs + sizeof(data)) <= m_BufferHead - m_BufferBase);
byte *oldHead = m_BufferHead;
uint64_t oldWriteSize = m_WriteSize;
m_BufferHead = m_BufferBase + offs;
bool ret = Write(data);
m_WriteSize = oldWriteSize;
m_BufferHead = oldHead;
return ret;
}
RDCERR("Can't seek a file/socket/compressor stream writer");
return false;
}
bool Finish()
{
if(m_Compressor)
return m_Compressor->Finish();
else if(m_File)
return fflush(m_File) == 0;
else if(m_Sock)
return true;
return true;
}
void AddCloseCallback(StreamCloseCallback callback) { m_Callbacks.push_back(callback); }
private:
inline void EnsureSized(const uint64_t numBytes)
{
uint64_t bufferSize = m_BufferEnd - m_BufferBase;
const uint64_t newSize = (m_BufferHead - m_BufferBase) + numBytes;
if(bufferSize < newSize)
{
// reallocate to a conservative size, don't 'double and allocate'
while(bufferSize < newSize)
bufferSize += 128 * 1024;
byte *newBuf = AllocAlignedBuffer(bufferSize);
uint64_t curUsed = m_BufferHead - m_BufferBase;
memcpy(newBuf, m_BufferBase, (size_t)curUsed);
FreeAlignedBuffer(m_BufferBase);
m_BufferBase = newBuf;
m_BufferHead = newBuf + curUsed;
m_BufferEnd = m_BufferBase + bufferSize;
}
}
void HandleError();
// used for aligned writes
static const byte empty[128];
// base of the buffer allocation if we're writing to a buffer
byte *m_BufferBase;
// where we are currently writing to in the buffer
byte *m_BufferHead;
// the end of the buffer
byte *m_BufferEnd;
// the total size of the file/compressor (ie. how much data flushed through it)
uint64_t m_WriteSize = 0;
// file pointer, if we're writing to a file
FILE *m_File = NULL;
// the compressor, if writing to it
Compressor *m_Compressor = NULL;
// the socket, if writing to it
Network::Socket *m_Sock = NULL;
// true if we're not writing to file/compressor, used to optimise checks in Write
bool m_InMemory = true;
// flag indicating if an error has been encountered and the stream is now invalid
bool m_HasError = false;
// do we own the file/compressor? are we responsible for
// cleaning it up?
Ownership m_Ownership;
// callbacks that will be invoked when this stream is being destroyed
std::vector<StreamCloseCallback> m_Callbacks;
};
void StreamTransfer(StreamWriter *writer, StreamReader *reader, float *progress);
+306
View File
@@ -0,0 +1,306 @@
/******************************************************************************
* The MIT License (MIT)
*
* Copyright (c) 2017 Baldur Karlsson
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
******************************************************************************/
#include "streamio.h"
#include "common/timing.h"
#if ENABLED(ENABLE_UNIT_TESTS)
#include "3rdparty/catch/catch.hpp"
TEST_CASE("Test basic stream I/O operations", "[streamio]")
{
StreamWriter writer(1024);
CHECK(writer.GetOffset() == 0);
CHECK_FALSE(writer.IsErrored());
writer.Write<uint32_t>(5);
CHECK(writer.GetOffset() == 4);
CHECK_FALSE(writer.IsErrored());
writer.Write<uint32_t>(6);
CHECK(writer.GetOffset() == 8);
CHECK_FALSE(writer.IsErrored());
writer.Write<uint16_t>(7);
CHECK(writer.GetOffset() == 10);
CHECK_FALSE(writer.IsErrored());
writer.AlignTo<16>();
CHECK(writer.GetOffset() == 16);
CHECK_FALSE(writer.IsErrored());
struct
{
uint32_t a = 5;
uint32_t b = 6;
uint16_t c = 7;
byte padding[6] = {0, 0, 0, 0, 0, 0};
} compare;
RDCCOMPILE_ASSERT(sizeof(compare) == 16, "Compare struct is the wrong size");
CHECK_FALSE(memcmp(writer.GetData(), &compare, sizeof(compare)));
StreamReader reader((const byte *)&compare, sizeof(compare));
uint32_t test;
reader.Read(test);
CHECK(test == 5);
reader.Read(test);
CHECK(test == 6);
uint16_t test2;
reader.Read(test2);
CHECK(test2 == 7);
CHECK_FALSE(reader.IsErrored());
reader.AlignTo<16>();
CHECK_FALSE(reader.IsErrored());
CHECK(reader.AtEnd());
// reading off the end should read 0s and move to error state
reader.Read(test);
CHECK(test == 0);
CHECK(reader.IsErrored());
};
TEST_CASE("Test stream I/O operations over the network", "[streamio][network]")
{
uint16_t port = 8235;
Network::Socket *server = NULL;
for(uint16_t probe = 0; probe < 20; probe++)
{
server = Network::CreateServerSocket("localhost", port, 2);
if(server)
break;
port++;
}
REQUIRE(server);
Network::Socket *sender = Network::CreateClientSocket("localhost", port, 10);
REQUIRE(sender);
Network::Socket *receiver = server->AcceptClient(false);
REQUIRE(receiver);
SECTION("Send/receive single int")
{
StreamWriter writer(sender, Ownership::Nothing);
StreamReader reader(receiver, Ownership::Nothing);
REQUIRE_FALSE(writer.IsErrored());
REQUIRE_FALSE(reader.IsErrored());
// we have to do the send/receive on threads since it is blocking
uint32_t receivedValue = 0;
Threading::ThreadHandle recvThread =
Threading::CreateThread([&reader, &receivedValue]() { reader.Read(receivedValue); });
Threading::ThreadHandle sendThread = Threading::CreateThread([&writer]() {
uint32_t pi = 3141592;
writer.Write(pi);
});
Threading::Sleep(50);
// REQUIRE that the value has propagated here. If not then something has gone wrong and we're
// not getting forward progress, so we don't want to try to join the threads
//
// If this fails while debugging it's because the sleep above wasn't long enough to cover the
// stepping process. We don't wait on threads to prevent holding up the whole process if there's
// some deadlock between them.
REQUIRE(receivedValue == 3141592);
Threading::JoinThread(sendThread);
Threading::CloseThread(sendThread);
Threading::JoinThread(recvThread);
Threading::CloseThread(recvThread);
CHECK_FALSE(writer.IsErrored());
CHECK_FALSE(reader.IsErrored());
};
SECTION("Send/receive multiple values")
{
StreamWriter writer(sender, Ownership::Nothing);
StreamReader reader(receiver, Ownership::Nothing);
REQUIRE_FALSE(writer.IsErrored());
REQUIRE_FALSE(reader.IsErrored());
// we have to do the send/receive on threads since it is blocking
std::vector<uint64_t> receivedValues;
std::vector<uint64_t> list = {1, 1, 2, 3, 5, 8, 13, 21, 34,
55, 89, 144, 233, 377, 610, 987, 1597};
// Tracks the lifetime of each thread.
volatile int32_t threadA = 0, threadB = 0;
Threading::ThreadHandle recvThread =
Threading::CreateThread([&threadA, &reader, &receivedValues]() {
int32_t sz = 0;
reader.Read(sz);
receivedValues.resize(sz);
for(int32_t i = 0; i < sz; i++)
reader.Read(receivedValues[i]);
Atomic::Inc32(&threadA);
});
Threading::ThreadHandle sendThread = Threading::CreateThread([&threadB, &writer, list]() {
int32_t sz = (int32_t)list.size();
writer.Write(sz);
for(int32_t i = 0; i < sz; i++)
writer.Write(list[i]);
Atomic::Inc32(&threadB);
});
// wait up to 2 seconds for the threads to exit
for(int i = 0; i < 2000 / 50; i++)
{
Threading::Sleep(50);
if(threadA && threadB)
break;
}
REQUIRE(threadA);
REQUIRE(threadB);
REQUIRE(receivedValues.size() == 17);
CHECK(receivedValues == list);
CHECK(writer.GetOffset() == reader.GetOffset());
CHECK(writer.GetOffset() > 128);
Threading::JoinThread(sendThread);
Threading::CloseThread(sendThread);
Threading::JoinThread(recvThread);
Threading::CloseThread(recvThread);
threadA = 0;
threadB = 0;
receivedValues.clear();
uint64_t vals[10] = {1, 6, 0, 5, 3, 8, 7, 9, 2, 4};
sendThread = Threading::CreateThread([&threadA, sender, &writer, &vals]() {
PerformanceTimer timer;
for(int32_t i = 0; i < 128; i++)
{
writer.Write(vals);
// add random sleeps
if(timer.GetMilliseconds() < i * 2)
Threading::Sleep(15);
}
// close the socket now
sender->Shutdown();
Atomic::Inc32(&threadA);
});
recvThread = Threading::CreateThread([&threadB, &reader, &receivedValues]() {
uint64_t vals[10];
reader.Read(vals);
// keep reading indefinitely until we hit an error (i.e. socket disconnected)
while(!reader.IsErrored())
{
receivedValues.insert(receivedValues.end(), vals, vals + ARRAY_COUNT(vals));
reader.Read(vals);
}
Atomic::Inc32(&threadB);
});
// wait up to 2 seconds for the threads to exit
for(int i = 0; i < 2000 / 50; i++)
{
Threading::Sleep(50);
if(threadA && threadB)
break;
}
REQUIRE(threadA);
REQUIRE(threadB);
Threading::JoinThread(sendThread);
Threading::CloseThread(sendThread);
Threading::JoinThread(recvThread);
Threading::CloseThread(recvThread);
// should have written 128 sets of 10 uint64s
REQUIRE(receivedValues.size() == 1280);
for(int i = 0; i < 128; i++)
{
for(int x = 0; x < 10; x++)
{
CHECK(receivedValues[i * 10 + x] == vals[x]);
}
}
// reader *should* be errored now
CHECK_FALSE(writer.IsErrored());
CHECK(reader.IsErrored());
// we shouldn't be able to write any more into the socket after it's been closed
int32_t test = 42;
bool success = writer.Write(test);
CHECK_FALSE(success);
CHECK(writer.IsErrored());
};
delete sender;
delete receiver;
delete server;
};
#endif // ENABLED(ENABLE_UNIT_TESTS)
+345
View File
@@ -0,0 +1,345 @@
/******************************************************************************
* The MIT License (MIT)
*
* Copyright (c) 2017 Baldur Karlsson
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
******************************************************************************/
#define ZSTD_STATIC_LINKING_ONLY
#include "zstdio.h"
static const uint64_t zstdBlockSize = 128 * 1024;
static const uint64_t compressBlockSize = ZSTD_compressBound(zstdBlockSize);
ZSTDCompressor::ZSTDCompressor(StreamWriter *write, Ownership own) : Compressor(write, own)
{
m_Page = AllocAlignedBuffer(zstdBlockSize);
m_CompressBuffer = AllocAlignedBuffer(compressBlockSize);
m_PageOffset = 0;
m_Stream = ZSTD_createCStream();
}
ZSTDCompressor::~ZSTDCompressor()
{
ZSTD_freeCStream(m_Stream);
FreeAlignedBuffer(m_Page);
FreeAlignedBuffer(m_CompressBuffer);
}
bool ZSTDCompressor::Write(const void *data, uint64_t numBytes)
{
// if we encountered a stream error this will be NULL
if(!m_CompressBuffer)
return false;
if(numBytes == 0)
return true;
// this is largely similar to LZ4Compressor, so check the comments there for more details.
// The only difference is that the lz4 streaming compression assumes a history of 64kb, where
// here we use a larger block size but no history must be maintained.
if(m_PageOffset + numBytes <= zstdBlockSize)
{
// simplest path, no page wrapping/spanning at all
memcpy(m_Page + m_PageOffset, data, (size_t)numBytes);
m_PageOffset += numBytes;
return true;
}
else
{
const byte *src = (const byte *)data;
// copy whatever will fit on this page
{
uint64_t firstBytes = zstdBlockSize - m_PageOffset;
memcpy(m_Page + m_PageOffset, src, (size_t)firstBytes);
m_PageOffset += firstBytes;
numBytes -= firstBytes;
src += firstBytes;
}
bool success = true;
while(success && numBytes > 0)
{
// flush page
success &= FlushPage();
if(!success)
return success;
// how many bytes can we copy in this page?
uint64_t partialBytes = RDCMIN(zstdBlockSize, numBytes);
memcpy(m_Page, src, (size_t)partialBytes);
// advance the source pointer, dest offset, and remove the bytes we read
m_PageOffset += partialBytes;
numBytes -= partialBytes;
src += partialBytes;
}
return success;
}
}
bool ZSTDCompressor::Finish()
{
// This function just writes the current page and closes zstd. Since we assume all blocks are
// precisely 64kb in size
// only the last one can be smaller, so we only write a partial page when finishing.
// Calling Write() after Finish() is illegal
return FlushPage();
}
bool ZSTDCompressor::FlushPage()
{
// if we encountered a stream error this will be NULL
if(!m_CompressBuffer)
return false;
ZSTD_inBuffer in = {m_Page, (size_t)m_PageOffset, 0};
ZSTD_outBuffer out = {m_CompressBuffer, ZSTD_CStreamOutSize(), 0};
bool success = true;
success &= CompressZSTDFrame(in, out);
// if there was an error, bail
if(!m_CompressBuffer)
return false;
// a bit redundant to write this but it means we can read the entire frame without
// doing multiple reads
success &= m_Write->Write((uint32_t)out.pos);
success &= m_Write->Write(m_CompressBuffer, out.pos);
// start writing to the start of the page again
m_PageOffset = 0;
return success;
}
bool ZSTDCompressor::CompressZSTDFrame(ZSTD_inBuffer &in, ZSTD_outBuffer &out)
{
size_t err = ZSTD_initCStream(m_Stream, 7);
if(ZSTD_isError(err))
{
RDCERR("Error compressing: %s", ZSTD_getErrorName(err));
FreeAlignedBuffer(m_Page);
FreeAlignedBuffer(m_CompressBuffer);
m_Page = m_CompressBuffer = NULL;
return false;
}
// keep calling compressStream until everything is consumed
while(in.pos < in.size)
{
size_t inpos = in.pos;
size_t outpos = out.pos;
err = ZSTD_compressStream(m_Stream, &out, &in);
if(ZSTD_isError(err) || (inpos == in.pos && outpos == out.pos))
{
if(ZSTD_isError(err))
RDCERR("Error compressing: %s", ZSTD_getErrorName(err));
else
RDCERR("Error compressing, no progress made");
FreeAlignedBuffer(m_Page);
FreeAlignedBuffer(m_CompressBuffer);
m_Page = m_CompressBuffer = NULL;
return false;
}
}
err = ZSTD_endStream(m_Stream, &out);
if(ZSTD_isError(err) || err != 0)
{
if(ZSTD_isError(err))
RDCERR("Error compressing: %s", ZSTD_getErrorName(err));
else
RDCERR("Error compressing, couldn't end stream");
FreeAlignedBuffer(m_Page);
FreeAlignedBuffer(m_CompressBuffer);
m_Page = m_CompressBuffer = NULL;
return false;
}
return true;
}
ZSTDDecompressor::ZSTDDecompressor(StreamReader *read, Ownership own) : Decompressor(read, own)
{
m_Page = AllocAlignedBuffer(zstdBlockSize);
m_CompressBuffer = AllocAlignedBuffer(compressBlockSize);
m_PageOffset = 0;
m_PageLength = 0;
m_Stream = ZSTD_createDStream();
}
ZSTDDecompressor::~ZSTDDecompressor()
{
ZSTD_freeDStream(m_Stream);
FreeAlignedBuffer(m_Page);
FreeAlignedBuffer(m_CompressBuffer);
}
bool ZSTDDecompressor::Recompress(Compressor *comp)
{
bool success = true;
while(success && !m_Read->AtEnd())
{
success &= FillPage();
if(success)
success &= comp->Write(m_Page, m_PageLength);
}
success &= comp->Finish();
return success;
}
bool ZSTDDecompressor::Read(void *data, uint64_t numBytes)
{
// if we encountered a stream error this will be NULL
if(!m_CompressBuffer)
return false;
if(numBytes == 0)
return true;
// this is simpler than the ZstdWriter::Write() implementation.
// At any point, m_Page contains the current window with uncompressed bytes.
// If we can satisfy a read from it, then we just memcpy and increment m_PageOffset.
// When we wrap around, we do a partial memcpy from m_Page, then decompress more bytes.
// if we already have all the data in-memory, just copy and return
uint64_t available = m_PageLength - m_PageOffset;
if(numBytes <= available)
{
memcpy(data, m_Page + m_PageOffset, (size_t)numBytes);
m_PageOffset += numBytes;
return true;
}
byte *dst = (byte *)data;
// copy what remains in m_Page
memcpy(dst, m_Page + m_PageOffset, (size_t)available);
// adjust what needs to be copied
dst += available;
numBytes -= available;
bool success = true;
while(success && numBytes > 0)
{
success &= FillPage();
if(!success)
return success;
// if we can now satisfy the remainder of the read, do so and return
if(numBytes <= m_PageLength)
{
memcpy(dst, m_Page, (size_t)numBytes);
m_PageOffset += numBytes;
return success;
}
// otherwise copy this page in and continue
memcpy(dst, m_Page, (size_t)m_PageLength);
dst += m_PageLength;
numBytes -= m_PageLength;
}
return success;
}
bool ZSTDDecompressor::FillPage()
{
uint32_t compSize = 0;
bool success = true;
success &= m_Read->Read(compSize);
success &= m_Read->Read(m_CompressBuffer, compSize);
if(!success)
{
FreeAlignedBuffer(m_Page);
FreeAlignedBuffer(m_CompressBuffer);
m_Page = m_CompressBuffer = NULL;
return false;
}
size_t err = ZSTD_initDStream(m_Stream);
if(ZSTD_isError(err))
{
RDCERR("Error decompressing: %s", ZSTD_getErrorName(err));
FreeAlignedBuffer(m_Page);
FreeAlignedBuffer(m_CompressBuffer);
m_Page = m_CompressBuffer = NULL;
return false;
}
ZSTD_inBuffer in = {m_CompressBuffer, compSize, 0};
ZSTD_outBuffer out = {m_Page, zstdBlockSize, 0};
// keep calling compressStream until everything is consumed
while(in.pos < in.size)
{
size_t inpos = in.pos;
size_t outpos = out.pos;
err = ZSTD_decompressStream(m_Stream, &out, &in);
if(ZSTD_isError(err) || (inpos == in.pos && outpos == out.pos))
{
if(ZSTD_isError(err))
RDCERR("Error decompressing: %s", ZSTD_getErrorName(err));
else
RDCERR("Error decompressing, no progress made");
FreeAlignedBuffer(m_Page);
FreeAlignedBuffer(m_CompressBuffer);
m_Page = m_CompressBuffer = NULL;
return false;
}
}
m_PageOffset = 0;
m_PageLength = out.pos;
return success;
}
+69
View File
@@ -0,0 +1,69 @@
/******************************************************************************
* The MIT License (MIT)
*
* Copyright (c) 2017 Baldur Karlsson
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
******************************************************************************/
#pragma once
#include "zstd/zstd.h"
#include "streamio.h"
class ZSTDCompressor : public Compressor
{
public:
ZSTDCompressor(StreamWriter *write, Ownership own);
~ZSTDCompressor();
bool Write(const void *data, uint64_t numBytes);
bool Finish();
private:
bool FlushPage();
bool CompressZSTDFrame(ZSTD_inBuffer &in, ZSTD_outBuffer &out);
byte *m_Page;
byte *m_CompressBuffer;
uint64_t m_PageOffset;
ZSTD_CStream *m_Stream;
};
class ZSTDDecompressor : public Decompressor
{
public:
ZSTDDecompressor(StreamReader *read, Ownership own);
~ZSTDDecompressor();
bool Recompress(Compressor *comp);
bool Read(void *data, uint64_t numBytes);
private:
bool FillPage();
byte *m_Page;
byte *m_CompressBuffer;
uint64_t m_PageOffset;
uint64_t m_PageLength;
ZSTD_DStream *m_Stream;
};