diff --git a/renderdoc/CMakeLists.txt b/renderdoc/CMakeLists.txt index af73bfce2..058cf0487 100644 --- a/renderdoc/CMakeLists.txt +++ b/renderdoc/CMakeLists.txt @@ -124,6 +124,14 @@ set(sources replay/replay_controller.h serialise/serialiser.cpp serialise/serialiser.h + serialise/lz4io.cpp + serialise/lz4io.h + serialise/zstdio.cpp + serialise/zstdio.h + serialise/streamio.cpp + serialise/streamio.h + serialise/comp_io_tests.cpp + serialise/streamio_tests.cpp strings/grisu2.cpp strings/string_utils.cpp strings/string_utils.h diff --git a/renderdoc/renderdoc.vcxproj b/renderdoc/renderdoc.vcxproj index 5e519510a..1b42546b8 100644 --- a/renderdoc/renderdoc.vcxproj +++ b/renderdoc/renderdoc.vcxproj @@ -190,7 +190,10 @@ + + + @@ -393,7 +396,12 @@ + + + + + diff --git a/renderdoc/renderdoc.vcxproj.filters b/renderdoc/renderdoc.vcxproj.filters index e8aa9d19a..05b8dbd9d 100644 --- a/renderdoc/renderdoc.vcxproj.filters +++ b/renderdoc/renderdoc.vcxproj.filters @@ -97,6 +97,12 @@ {50c142cb-d8b9-44f7-a68a-ab9015975ea5} + + {3cb4fd0e-7cd9-45a3-99d2-1158f27ec438} + + + {3b8694d9-cb25-45e0-a1c2-8cc8cd03df3f} + @@ -342,12 +348,21 @@ Common\Strings - - API\Replay + + Common\Serialise\Compressors + + + Common\Serialise\Compressors + + + Common\Serialise\Stream I/O API\Replay + + API\Replay + @@ -587,6 +602,21 @@ Common\Strings + + Common\Serialise\Compressors + + + Common\Serialise\Compressors + + + Common\Serialise\Compressors + + + Common\Serialise\Stream I/O + + + Common\Serialise\Stream I/O + diff --git a/renderdoc/serialise/comp_io_tests.cpp b/renderdoc/serialise/comp_io_tests.cpp new file mode 100644 index 000000000..1afbe1d18 --- /dev/null +++ b/renderdoc/serialise/comp_io_tests.cpp @@ -0,0 +1,193 @@ +/****************************************************************************** + * The MIT License (MIT) + * + * Copyright (c) 2017 Baldur Karlsson + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + ******************************************************************************/ + +#include "lz4io.h" +#include "serialiser.h" +#include "zstdio.h" + +#if ENABLED(ENABLE_UNIT_TESTS) + +#include "3rdparty/catch/catch.hpp" + +TEST_CASE("Test LZ4 compression/decompression", "[streamio][lz4]") +{ + StreamWriter buf(StreamWriter::DefaultScratchSize); + + byte *randomData = new byte[1024 * 1024]; + + for(int i = 0; i < 1024 * 1024; i++) + randomData[i] = rand() & 0xff; + + // write the data + { + StreamWriter writer(new LZ4Compressor(&buf, Ownership::Nothing), Ownership::Stream); + + byte *fixedData = new byte[1024 * 1024]; + byte *regularData = new byte[1024 * 1024]; + + memset(fixedData, 0x7c, 1024 * 1024); + + for(int i = 0; i < 1024 * 1024; i++) + regularData[i] = i & 0xff; + + writer.Write(fixedData, 1024 * 1024); + writer.Write(randomData, 1024 * 1024); + writer.Write(regularData, 1024 * 1024); + writer.Write(fixedData, 1024 * 1024); + + // check that the compression got good wins out of the above data. The random data will be + // pretty much untouched but the rest should compress massively. + CHECK(buf.GetOffset() < 1024 * 1024 + 20 * 1024); + CHECK(writer.GetOffset() == 4 * 1024 * 1024); + + CHECK_FALSE(writer.IsErrored()); + + writer.Finish(); + + CHECK_FALSE(writer.IsErrored()); + + delete[] fixedData; + delete[] regularData; + } + + // we now only have the compressed data, decompress it + { + StreamReader reader( + new LZ4Decompressor(new StreamReader(buf.GetData(), buf.GetOffset()), Ownership::Stream), + 4 * 1024 * 1024, Ownership::Stream); + // recreate this for easy memcmp'ing + byte *fixedData = new byte[1024 * 1024]; + byte *regularData = new byte[1024 * 1024]; + + memset(fixedData, 0x7c, 1024 * 1024); + + for(int i = 0; i < 1024 * 1024; i++) + regularData[i] = i & 0xff; + + byte *readData = new byte[1024 * 1024]; + + reader.Read(readData, 1024 * 1024); + CHECK_FALSE(memcmp(readData, fixedData, 1024 * 1024)); + + reader.Read(readData, 1024 * 1024); + CHECK_FALSE(memcmp(readData, randomData, 1024 * 1024)); + + reader.Read(readData, 1024 * 1024); + CHECK_FALSE(memcmp(readData, regularData, 1024 * 1024)); + + reader.Read(readData, 1024 * 1024); + CHECK_FALSE(memcmp(readData, fixedData, 1024 * 1024)); + + CHECK_FALSE(reader.IsErrored()); + CHECK(reader.AtEnd()); + + delete[] readData; + delete[] fixedData; + delete[] regularData; + } + + delete[] randomData; +}; + +TEST_CASE("Test ZSTD compression/decompression", "[streamio][zstd]") +{ + StreamWriter buf(StreamWriter::DefaultScratchSize); + + byte *randomData = new byte[1024 * 1024]; + + for(int i = 0; i < 1024 * 1024; i++) + randomData[i] = rand() & 0xff; + + // write the data + { + StreamWriter writer(new ZSTDCompressor(&buf, Ownership::Nothing), Ownership::Stream); + + byte *fixedData = new byte[1024 * 1024]; + byte *regularData = new byte[1024 * 1024]; + + memset(fixedData, 0x7c, 1024 * 1024); + + for(int i = 0; i < 1024 * 1024; i++) + regularData[i] = i & 0xff; + + writer.Write(fixedData, 1024 * 1024); + writer.Write(randomData, 1024 * 1024); + writer.Write(regularData, 1024 * 1024); + writer.Write(fixedData, 1024 * 1024); + + // check that the compression got good wins out of the above data. The random data will be + // pretty much untouched but the rest should compress massively. + CHECK(buf.GetOffset() < 1024 * 1024 + 4 * 1024); + CHECK(writer.GetOffset() == 4 * 1024 * 1024); + + CHECK_FALSE(writer.IsErrored()); + + writer.Finish(); + + CHECK_FALSE(writer.IsErrored()); + + delete[] fixedData; + delete[] regularData; + } + + // we now only have the compressed data, decompress it + { + StreamReader reader( + new ZSTDDecompressor(new StreamReader(buf.GetData(), buf.GetOffset()), Ownership::Stream), + 4 * 1024 * 1024, Ownership::Stream); + // recreate this for easy memcmp'ing + byte *fixedData = new byte[1024 * 1024]; + byte *regularData = new byte[1024 * 1024]; + + memset(fixedData, 0x7c, 1024 * 1024); + + for(int i = 0; i < 1024 * 1024; i++) + regularData[i] = i & 0xff; + + byte *readData = new byte[1024 * 1024]; + + reader.Read(readData, 1024 * 1024); + CHECK_FALSE(memcmp(readData, fixedData, 1024 * 1024)); + + reader.Read(readData, 1024 * 1024); + CHECK_FALSE(memcmp(readData, randomData, 1024 * 1024)); + + reader.Read(readData, 1024 * 1024); + CHECK_FALSE(memcmp(readData, regularData, 1024 * 1024)); + + reader.Read(readData, 1024 * 1024); + CHECK_FALSE(memcmp(readData, fixedData, 1024 * 1024)); + + CHECK_FALSE(reader.IsErrored()); + CHECK(reader.AtEnd()); + + delete[] readData; + delete[] fixedData; + delete[] regularData; + } + + delete[] randomData; +}; + +#endif // ENABLED(ENABLE_UNIT_TESTS) diff --git a/renderdoc/serialise/lz4io.cpp b/renderdoc/serialise/lz4io.cpp new file mode 100644 index 000000000..7b878028a --- /dev/null +++ b/renderdoc/serialise/lz4io.cpp @@ -0,0 +1,287 @@ +/****************************************************************************** + * The MIT License (MIT) + * + * Copyright (c) 2017 Baldur Karlsson + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + ******************************************************************************/ + +#include "lz4io.h" + +static const uint64_t lz4BlockSize = 64 * 1024; + +LZ4Compressor::LZ4Compressor(StreamWriter *write, Ownership own) : Compressor(write, own) +{ + m_Page[0] = AllocAlignedBuffer(lz4BlockSize); + m_Page[1] = AllocAlignedBuffer(lz4BlockSize); + m_CompressBuffer = AllocAlignedBuffer(LZ4_COMPRESSBOUND(lz4BlockSize)); + + m_PageOffset = 0; + + LZ4_resetStream(&m_LZ4Comp); +} + +LZ4Compressor::~LZ4Compressor() +{ + FreeAlignedBuffer(m_Page[0]); + FreeAlignedBuffer(m_Page[1]); + FreeAlignedBuffer(m_CompressBuffer); +} + +bool LZ4Compressor::Write(const void *data, uint64_t numBytes) +{ + // if we encountered a stream error this will be NULL + if(!m_CompressBuffer) + return false; + + if(numBytes == 0) + return true; + + // The basic plan is: + // Write into page N incrementally until it is completely full. When full, flush it out to lz4 and + // swap pages. + // This keeps lz4 happy with 64kb of history each time it compresses. + // If we are writing some data the crosses the boundary between pages, we write the part that will + // fit on one page, flush & swap, write the rest into the next page. + + if(m_PageOffset + numBytes <= lz4BlockSize) + { + // simplest path, no page wrapping/spanning at all + memcpy(m_Page[0] + m_PageOffset, data, (size_t)numBytes); + m_PageOffset += numBytes; + + return true; + } + else + { + // do partial copies that span pages and flush as necessary + + const byte *src = (const byte *)data; + + // copy whatever will fit on this page + { + uint64_t firstBytes = lz4BlockSize - m_PageOffset; + memcpy(m_Page[0] + m_PageOffset, src, (size_t)firstBytes); + + m_PageOffset += firstBytes; + numBytes -= firstBytes; + src += firstBytes; + } + + bool success = true; + + while(success && numBytes > 0) + { + // flush and swap pages + success &= FlushPage0(); + + if(!success) + return success; + + // how many bytes can we copy in this page? + uint64_t partialBytes = RDCMIN(lz4BlockSize, numBytes); + memcpy(m_Page[0], src, (size_t)partialBytes); + + // advance the source pointer, dest offset, and remove the bytes we read + m_PageOffset += partialBytes; + numBytes -= partialBytes; + src += partialBytes; + } + + return success; + } +} + +bool LZ4Compressor::Finish() +{ + // This function just writes the current page and closes lz4. Since we assume all blocks are + // precisely 64kb in size + // only the last one can be smaller, so we only write a partial page when finishing. + // Calling Write() after Finish() is illegal + return FlushPage0(); +} + +bool LZ4Compressor::FlushPage0() +{ + // if we encountered a stream error this will be NULL + if(!m_CompressBuffer) + return false; + + // m_PageOffset is the amount written, usually equal to lz4BlockSize except the last block. + int32_t compSize = + LZ4_compress_fast_continue(&m_LZ4Comp, (const char *)m_Page[0], (char *)m_CompressBuffer, + (int)m_PageOffset, (int)LZ4_COMPRESSBOUND(lz4BlockSize), 1); + + if(compSize < 0) + { + RDCERR("Error compressing: %i", compSize); + FreeAlignedBuffer(m_Page[0]); + FreeAlignedBuffer(m_Page[1]); + FreeAlignedBuffer(m_CompressBuffer); + m_Page[0] = m_Page[1] = m_CompressBuffer = NULL; + return false; + } + + bool success = true; + + success &= m_Write->Write(compSize); + success &= m_Write->Write(m_CompressBuffer, compSize); + + // swap pages + std::swap(m_Page[0], m_Page[1]); + + // start writing to the start of the page again + m_PageOffset = 0; + + return success; +} + +LZ4Decompressor::LZ4Decompressor(StreamReader *read, Ownership own) : Decompressor(read, own) +{ + m_Page[0] = AllocAlignedBuffer(lz4BlockSize); + m_Page[1] = AllocAlignedBuffer(lz4BlockSize); + m_CompressBuffer = AllocAlignedBuffer(LZ4_COMPRESSBOUND(lz4BlockSize)); + + m_PageOffset = 0; + m_PageLength = 0; + + LZ4_setStreamDecode(&m_LZ4Decomp, NULL, 0); +} + +LZ4Decompressor::~LZ4Decompressor() +{ + FreeAlignedBuffer(m_Page[0]); + FreeAlignedBuffer(m_Page[1]); + FreeAlignedBuffer(m_CompressBuffer); +} + +bool LZ4Decompressor::Recompress(Compressor *comp) +{ + bool success = true; + + while(success && !m_Read->AtEnd()) + { + success &= FillPage0(); + if(success) + success &= comp->Write(m_Page[0], m_PageLength); + } + success &= comp->Finish(); + + return success; +} + +bool LZ4Decompressor::Read(void *data, uint64_t numBytes) +{ + // if we encountered a stream error this will be NULL + if(!m_CompressBuffer) + return false; + + if(numBytes == 0) + return true; + + // At any point, m_Page[0] contains the current window with uncompressed bytes. + // If we can satisfy a read from it, then we just memcpy and increment m_PageOffset. + // When we wrap around, we do a partial memcpy from m_Page[0], then swap the pages and + // decompress some more bytes into m_Page[0]. Thus, m_Page[1] contains the history (if + // it exists) + + // if we already have all the data in-memory, just copy and return + uint64_t available = m_PageLength - m_PageOffset; + + if(numBytes <= available) + { + memcpy(data, m_Page[0] + m_PageOffset, (size_t)numBytes); + m_PageOffset += numBytes; + return true; + } + + byte *dst = (byte *)data; + + // copy what remains in m_Page[0] + memcpy(dst, m_Page[0] + m_PageOffset, (size_t)available); + + // adjust what needs to be copied + dst += available; + numBytes -= available; + + bool success = true; + + while(success && numBytes > 0) + { + success &= FillPage0(); + + if(!success) + return success; + + // if we can now satisfy the remainder of the read, do so and return + if(numBytes <= m_PageLength) + { + memcpy(dst, m_Page[0], (size_t)numBytes); + m_PageOffset += numBytes; + return success; + } + + // otherwise copy this page in and continue + memcpy(dst, m_Page[0], (size_t)m_PageLength); + dst += m_PageLength; + numBytes -= m_PageLength; + } + + return success; +} + +bool LZ4Decompressor::FillPage0() +{ + // swap pages + std::swap(m_Page[0], m_Page[1]); + + int32_t compSize = 0; + + bool success = true; + + success &= m_Read->Read(compSize); + success &= m_Read->Read(m_CompressBuffer, compSize); + + if(!success) + { + FreeAlignedBuffer(m_Page[0]); + FreeAlignedBuffer(m_Page[1]); + FreeAlignedBuffer(m_CompressBuffer); + m_Page[0] = m_Page[1] = m_CompressBuffer = NULL; + return false; + } + + int32_t decompSize = LZ4_decompress_safe_continue(&m_LZ4Decomp, (const char *)m_CompressBuffer, + (char *)m_Page[0], compSize, lz4BlockSize); + + if(decompSize < 0) + { + RDCERR("Error decompressing: %i", decompSize); + FreeAlignedBuffer(m_Page[0]); + FreeAlignedBuffer(m_Page[1]); + FreeAlignedBuffer(m_CompressBuffer); + m_Page[0] = m_Page[1] = m_CompressBuffer = NULL; + return false; + } + + m_PageOffset = 0; + m_PageLength = decompSize; + + return success; +} diff --git a/renderdoc/serialise/lz4io.h b/renderdoc/serialise/lz4io.h new file mode 100644 index 000000000..75efd8793 --- /dev/null +++ b/renderdoc/serialise/lz4io.h @@ -0,0 +1,67 @@ +/****************************************************************************** + * The MIT License (MIT) + * + * Copyright (c) 2017 Baldur Karlsson + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + ******************************************************************************/ + +#pragma once + +#include "lz4/lz4.h" +#include "streamio.h" + +class LZ4Compressor : public Compressor +{ +public: + LZ4Compressor(StreamWriter *write, Ownership own); + ~LZ4Compressor(); + + bool Write(const void *data, uint64_t numBytes); + bool Finish(); + +private: + bool FlushPage0(); + + byte *m_Page[2]; + byte *m_CompressBuffer; + uint64_t m_PageOffset; + + LZ4_stream_t m_LZ4Comp; +}; + +class LZ4Decompressor : public Decompressor +{ +public: + LZ4Decompressor(StreamReader *read, Ownership own); + ~LZ4Decompressor(); + + bool Recompress(Compressor *comp); + bool Read(void *data, uint64_t numBytes); + +private: + bool FillPage0(); + + byte *m_Page[2]; + byte *m_CompressBuffer; + uint64_t m_PageOffset; + uint64_t m_PageLength; + + LZ4_streamDecode_t m_LZ4Decomp; +}; diff --git a/renderdoc/serialise/streamio.cpp b/renderdoc/serialise/streamio.cpp new file mode 100644 index 000000000..489631268 --- /dev/null +++ b/renderdoc/serialise/streamio.cpp @@ -0,0 +1,452 @@ +/****************************************************************************** + * The MIT License (MIT) + * + * Copyright (c) 2017 Baldur Karlsson + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + ******************************************************************************/ + +#include "streamio.h" + +Compressor::~Compressor() +{ + if(m_Ownership == Ownership::Stream && m_Write) + delete m_Write; +} + +Decompressor::~Decompressor() +{ + if(m_Ownership == Ownership::Stream && m_Read) + delete m_Read; +} + +static const uint64_t initialBufferSize = 64 * 1024; +const byte StreamWriter::empty[128] = {}; + +StreamReader::StreamReader(const byte *buffer, uint64_t bufferSize) +{ + m_InputSize = m_BufferSize = bufferSize; + m_BufferHead = m_BufferBase = AllocAlignedBuffer(m_BufferSize); + + memcpy(m_BufferHead, buffer, (size_t)m_BufferSize); + + m_Ownership = Ownership::Nothing; +} + +StreamReader::StreamReader(const std::vector &buffer) +{ + m_InputSize = m_BufferSize = buffer.size(); + m_BufferHead = m_BufferBase = AllocAlignedBuffer(m_BufferSize); + + memcpy(m_BufferHead, buffer.data(), (size_t)m_BufferSize); + + m_Ownership = Ownership::Nothing; +} + +StreamReader::StreamReader(StreamInvalidType) +{ + m_InputSize = 0; + + m_BufferSize = 0; + m_BufferHead = m_BufferBase = NULL; + + m_Ownership = Ownership::Nothing; +} + +StreamReader::StreamReader(Network::Socket *sock, Ownership own) +{ + m_Sock = sock; + + m_InputSize = m_BufferSize = initialBufferSize; + m_BufferBase = AllocAlignedBuffer(m_BufferSize); + // place head at the *end* of the buffer, because for sockets we pretend the buffer is constantly + // exhausted, and just do a read of the minimum number of bytes each time to satisfy each read + m_BufferHead = m_BufferBase + m_BufferSize; + + m_Ownership = own; +} + +StreamReader::StreamReader(FILE *file, uint64_t fileSize, Ownership own) +{ + if(file == NULL) + { + m_InputSize = 0; + + m_BufferSize = 0; + m_BufferHead = m_BufferBase = NULL; + + m_Ownership = Ownership::Nothing; + return; + } + + m_File = file; + m_InputSize = fileSize; + + m_BufferSize = initialBufferSize; + m_BufferHead = m_BufferBase = AllocAlignedBuffer(m_BufferSize); + + ReadFromExternal(0, RDCMIN(m_InputSize, m_BufferSize)); + + m_Ownership = own; +} + +StreamReader::StreamReader(FILE *file) +{ + if(file == NULL) + { + m_InputSize = 0; + + m_BufferSize = 0; + m_BufferHead = m_BufferBase = NULL; + + m_Ownership = Ownership::Nothing; + return; + } + + FileIO::fseek64(file, 0, SEEK_END); + m_InputSize = FileIO::ftell64(file); + FileIO::fseek64(file, 0, SEEK_SET); + + m_File = file; + + m_BufferSize = initialBufferSize; + m_BufferHead = m_BufferBase = AllocAlignedBuffer(m_BufferSize); + + ReadFromExternal(0, RDCMIN(m_InputSize, m_BufferSize)); + + m_Ownership = Ownership::Stream; +} + +StreamReader::StreamReader(StreamReader *reader, uint64_t bufferSize) +{ + m_InputSize = m_BufferSize = bufferSize; + m_BufferHead = m_BufferBase = AllocAlignedBuffer(m_BufferSize); + + reader->Read(m_BufferBase, bufferSize); + + m_Ownership = Ownership::Nothing; +} + +StreamReader::StreamReader(Decompressor *decompressor, uint64_t uncompressedSize, Ownership own) +{ + m_Decompressor = decompressor; + m_InputSize = uncompressedSize; + + m_BufferSize = initialBufferSize; + m_BufferHead = m_BufferBase = AllocAlignedBuffer(m_BufferSize); + + m_Ownership = own; + + ReadFromExternal(0, RDCMIN(uncompressedSize, m_BufferSize)); +} + +StreamReader::~StreamReader() +{ + for(StreamCloseCallback cb : m_Callbacks) + cb(); + + FreeAlignedBuffer(m_BufferBase); + + if(m_Ownership == Ownership::Stream) + { + if(m_File) + FileIO::fclose(m_File); + + if(m_Decompressor) + delete m_Decompressor; + } +} + +void StreamReader::SetOffset(uint64_t offs) +{ + if(m_File || m_Decompressor) + { + RDCERR("File and decompress stream readers do not support seeking"); + return; + } + + m_BufferHead = m_BufferBase + offs; +} + +bool StreamReader::Reserve(uint64_t numBytes) +{ + if(m_Sock) + { + // if we're reading more bytes than our buffer size, resize up + if(numBytes > m_BufferSize) + { + FreeAlignedBuffer(m_BufferBase); + m_InputSize = m_BufferSize = AlignUp(numBytes, 256ULL); + m_BufferBase = AllocAlignedBuffer(m_BufferSize); + m_BufferHead = m_BufferBase + m_BufferSize; + } + + m_ReadOffset += numBytes; + + // read into the end of the buffer, so that the subsequent read re-exhausts the buffer + m_BufferHead = m_BufferBase + m_BufferSize - numBytes; + return ReadFromExternal(m_BufferSize - numBytes, numBytes); + } + + RDCASSERT(m_File || m_Decompressor); + + // store old buffer and the read data, so we can move it into the new buffer + byte *oldBuffer = m_BufferBase; + + // always keep at least a certain window behind what we read. + uint64_t backwardsWindow = RDCMIN(64ULL, m_BufferHead - m_BufferBase); + + byte *currentData = m_BufferHead - backwardsWindow; + uint64_t currentDataSize = m_BufferSize - (m_BufferHead - m_BufferBase) + backwardsWindow; + + uint64_t BufferOffset = m_BufferHead - m_BufferBase; + + // if we are reading more than our current buffer size, expand the buffer size + if(numBytes + backwardsWindow > m_BufferSize) + { + // very conservative resizing - don't do "double and add" - to avoid + // a 1GB buffer being read and needing to allocate 2GB. The cost is we + // will reallocate a bit more often + m_BufferSize = numBytes + backwardsWindow; + m_BufferBase = AllocAlignedBuffer(m_BufferSize); + } + + // move the unread data into the buffer + memmove(m_BufferBase, currentData, (size_t)currentDataSize); + + if(BufferOffset > backwardsWindow) + { + m_ReadOffset += BufferOffset - backwardsWindow; + m_BufferHead = m_BufferBase + backwardsWindow; + } + else + { + m_BufferHead = m_BufferBase + BufferOffset; + } + + // if there's anything left of the file to read in, do so now + bool ret = false; + + ret = ReadFromExternal(currentDataSize, RDCMIN(m_BufferSize - currentDataSize, + m_InputSize - m_ReadOffset - currentDataSize)); + + if(oldBuffer != m_BufferBase && m_BufferBase) + FreeAlignedBuffer(oldBuffer); + + return ret; +} + +bool StreamReader::ReadFromExternal(uint64_t bufferOffs, uint64_t length) +{ + bool success = true; + + if(m_Decompressor) + { + success = m_Decompressor->Read(m_BufferBase + bufferOffs, length); + } + else if(m_File) + { + uint64_t numRead = FileIO::fread(m_BufferBase + bufferOffs, 1, (size_t)length, m_File); + success = (numRead == length); + } + else if(m_Sock) + { + if(!m_Sock->Connected()) + { + success = false; + } + else + { + success = m_Sock->RecvDataBlocking(m_BufferBase + bufferOffs, (uint32_t)length); + } + } + else + { + // we're in an error-state, nothing to read from + return false; + } + + if(!success) + { + if(m_File) + RDCERR("Error reading from file, errno %d", errno); + else if(m_Sock) + RDCWARN("Error reading from socket"); + + m_HasError = true; + + // move to error state + FreeAlignedBuffer(m_BufferBase); + + if(m_Ownership == Ownership::Stream) + { + if(m_File) + FileIO::fclose(m_File); + + if(m_Sock) + delete m_Sock; + + if(m_Decompressor) + delete m_Decompressor; + } + + m_File = NULL; + m_Sock = NULL; + m_Decompressor = NULL; + m_ReadOffset = 0; + m_InputSize = 0; + + m_BufferSize = 0; + m_BufferHead = m_BufferBase = NULL; + + m_Ownership = Ownership::Nothing; + } + + return success; +} + +StreamWriter::StreamWriter(uint64_t initialBufSize) +{ + m_BufferBase = m_BufferHead = AllocAlignedBuffer(initialBufSize); + m_BufferEnd = m_BufferBase + initialBufSize; + + m_Ownership = Ownership::Nothing; +} + +StreamWriter::StreamWriter(StreamInvalidType) +{ + m_BufferBase = m_BufferHead = m_BufferEnd = NULL; + + m_Ownership = Ownership::Nothing; + m_InMemory = false; +} + +StreamWriter::StreamWriter(Network::Socket *sock, Ownership own) +{ + m_BufferBase = m_BufferHead = m_BufferEnd = NULL; + + m_Sock = sock; + + m_Ownership = own; + m_InMemory = false; +} + +StreamWriter::StreamWriter(FILE *file, Ownership own) +{ + m_BufferBase = m_BufferHead = m_BufferEnd = NULL; + + m_File = file; + + m_Ownership = own; + m_InMemory = false; +} + +StreamWriter::StreamWriter(Compressor *compressor, Ownership own) +{ + m_BufferBase = m_BufferHead = m_BufferEnd = NULL; + + m_Compressor = compressor; + + m_Ownership = own; + m_InMemory = false; +} + +StreamWriter::~StreamWriter() +{ + for(StreamCloseCallback cb : m_Callbacks) + cb(); + + FreeAlignedBuffer(m_BufferBase); + + if(m_Ownership == Ownership::Stream) + { + if(m_File) + FileIO::fclose(m_File); + + if(m_Compressor) + delete m_Compressor; + } +} + +void StreamWriter::HandleError() +{ + if(m_File) + RDCERR("Error writing to file, errno %d", errno); + else if(m_Sock) + RDCWARN("Error writing to socket"); + + m_HasError = true; + + FreeAlignedBuffer(m_BufferBase); + + if(m_Ownership == Ownership::Stream) + { + if(m_File) + FileIO::fclose(m_File); + + if(m_Sock) + delete m_Sock; + + if(m_Compressor) + delete m_Compressor; + } + + m_BufferBase = m_BufferHead = m_BufferEnd = NULL; + + m_WriteSize = 0; + m_File = NULL; + m_Sock = NULL; + m_Compressor = NULL; + + m_Ownership = Ownership::Nothing; + m_InMemory = false; +} + +void StreamTransfer(StreamWriter *writer, StreamReader *reader, float *progress) +{ + uint64_t totalSize = reader->GetSize(); + + // copy 1MB at a time + const uint64_t StreamIOChunkSize = 1024 * 1024; + + const uint64_t bufSize = RDCMIN(StreamIOChunkSize, totalSize); + uint64_t numBufs = totalSize / bufSize; + // last remaining partial buffer + if(totalSize % (uint64_t)bufSize > 0) + numBufs++; + + byte *buf = new byte[(size_t)bufSize]; + + if(progress) + *progress = 0.0001f; + + for(uint64_t i = 0; i < numBufs; i++) + { + uint64_t payloadLength = RDCMIN(bufSize, totalSize); + + reader->Read(buf, payloadLength); + writer->Write(buf, payloadLength); + + totalSize -= payloadLength; + if(progress) + *progress = float(i + 1) / float(numBufs); + } + + delete[] buf; +} \ No newline at end of file diff --git a/renderdoc/serialise/streamio.h b/renderdoc/serialise/streamio.h new file mode 100644 index 000000000..4143cd607 --- /dev/null +++ b/renderdoc/serialise/streamio.h @@ -0,0 +1,459 @@ +/****************************************************************************** + * The MIT License (MIT) + * + * Copyright (c) 2017 Baldur Karlsson + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + ******************************************************************************/ + +#pragma once + +#include +#include +#include +#include "common/common.h" + +enum class Ownership +{ + Nothing, + Stream, +}; + +class StreamWriter; +class StreamReader; + +typedef std::function StreamCloseCallback; + +class Compressor +{ +public: + Compressor(StreamWriter *write, Ownership own) : m_Write(write), m_Ownership(own) {} + virtual ~Compressor(); + virtual bool Write(const void *data, uint64_t numBytes) = 0; + virtual bool Finish() = 0; + +protected: + StreamWriter *m_Write; + Ownership m_Ownership; +}; + +class Decompressor +{ +public: + Decompressor(StreamReader *read, Ownership own) : m_Read(read), m_Ownership(own) {} + virtual ~Decompressor(); + virtual bool Recompress(Compressor *comp) = 0; + virtual bool Read(void *data, uint64_t numBytes) = 0; + +protected: + StreamReader *m_Read; + Ownership m_Ownership; +}; + +class StreamReader +{ +public: + enum StreamInvalidType + { + InvalidStream + }; + + StreamReader(StreamInvalidType); + StreamReader(const byte *buffer, uint64_t bufferSize); + StreamReader(const std::vector &buffer); + + StreamReader(Network::Socket *sock, Ownership own); + StreamReader(FILE *file, uint64_t fileSize, Ownership own); + StreamReader(FILE *file); + StreamReader(StreamReader *reader, uint64_t bufferSize); + StreamReader(Decompressor *decompressor, uint64_t uncompressedSize, Ownership own); + + ~StreamReader(); + + bool IsErrored() { return m_HasError; } + void SetOffset(uint64_t offs); + + inline uint64_t GetOffset() { return m_BufferHead - m_BufferBase + m_ReadOffset; } + inline uint64_t GetSize() { return m_InputSize; } + inline bool AtEnd() { return GetOffset() >= GetSize(); } + template + bool AlignTo() + { + uint64_t offs = GetOffset(); + uint64_t alignedOffs = AlignUp(offs, alignment); + + uint64_t bytesToAlign = alignedOffs - offs; + + if(bytesToAlign > 0) + return Read(NULL, bytesToAlign); + + return true; + } + + bool Read(void *data, uint64_t numBytes) + { + if(numBytes == 0) + return true; + + if(!m_BufferBase) + { + // read 0s if we're in an error state + if(data) + memset(data, 0, (size_t)numBytes); + + return false; + } + + // if we're reading past the end, error, read nothing (no partial reads) and return + if(m_Sock == NULL && GetOffset() + numBytes > GetSize()) + { + RDCERR("Reading off the end of the stream"); + m_BufferHead = m_BufferBase + m_BufferSize; + if(data) + memset(data, 0, (size_t)numBytes); + m_HasError = true; + return false; + } + + // if we're reading from an external source, reserve enough bytes to do the read + if(m_File || m_Sock || m_Decompressor) + { + // This preserves everything from min(m_BufferBase, m_BufferHead - 64) -> end of buffer + // which will still be in place relative to m_BufferHead. + // In other words - reservation will keep the aleady-read data that's after the head pointer, + // as well as up to 64 bytes *behind* the head if it exists. + if(numBytes > Available()) + { + bool success = Reserve(numBytes); + + if(!success) + return false; + } + } + + // perform the actual copy + if(data) + memcpy(data, m_BufferHead, (size_t)numBytes); + + // advance head + m_BufferHead += numBytes; + + return true; + } + + bool SkipBytes(uint64_t numBytes) { return Read(NULL, numBytes); } + // compile-time constant element to let the compiler inline the memcpy + template + bool Read(T &data) + { + return Read(&data, sizeof(T)); + } + + void AddCloseCallback(StreamCloseCallback callback) { m_Callbacks.push_back(callback); } +private: + inline uint64_t Available() + { + if(m_Sock) + return 0; + return m_BufferSize - (m_BufferHead - m_BufferBase); + } + bool Reserve(uint64_t numBytes); + bool ReadFromExternal(uint64_t bufferOffs, uint64_t length); + + // base of the buffer allocation + byte *m_BufferBase; + + // where we are currently reading from in the buffer + byte *m_BufferHead; + + // the size of the buffer (just a window if reading from external source) + uint64_t m_BufferSize; + + // the total size of the total input. This is how many bytes you can read, regardless + // of how many bytes might actually be stored on the other side of the source (i.e. + // this is the uncompressed output size) + uint64_t m_InputSize; + + // file pointer, if we're reading from a file + FILE *m_File = NULL; + + // socket, if we're reading from a socket + Network::Socket *m_Sock = NULL; + + // the decompressor, if reading from it + Decompressor *m_Decompressor = NULL; + + // the offset in the file/decompressor that corresponds to the start of m_BufferBase + uint64_t m_ReadOffset = 0; + + // flag indicating if an error has been encountered and the stream is now invalid + bool m_HasError = false; + + // do we own the file/compressor? are we responsible for + // cleaning it up? + Ownership m_Ownership; + + // callbacks that will be invoked when this stream is being destroyed + std::vector m_Callbacks; +}; + +class StreamWriter +{ +public: + enum StreamInvalidType + { + InvalidStream + }; + + StreamWriter(StreamInvalidType); + StreamWriter(uint64_t initialBufSize); + StreamWriter(FILE *file, Ownership own); + StreamWriter(Network::Socket *file, Ownership own); + StreamWriter(Compressor *compressor, Ownership own); + + bool IsErrored() { return m_HasError; } + static const int DefaultScratchSize = 32 * 1024; + + ~StreamWriter(); + + void Rewind() + { + if(m_InMemory) + { + m_BufferHead = m_BufferBase; + m_WriteSize = 0; + return; + } + + RDCERR("Can't rewind a file/compressor stream writer"); + } + + uint64_t GetOffset() { return m_WriteSize; } + const byte *GetData() { return m_BufferBase; } + template + bool AlignTo() + { + uint64_t offs; + if(m_InMemory) + offs = m_BufferHead - m_BufferBase; + else + offs = GetOffset(); + + uint64_t alignedOffs = AlignUp(offs, alignment); + + uint64_t bytesToAlign = alignedOffs - offs; + + RDCCOMPILE_ASSERT(alignment <= sizeof(empty), + "Empty array is not large enough - increase size to support alignment"); + + if(bytesToAlign > 0) + return Write(empty, bytesToAlign); + + return true; + } + + bool Write(const void *data, uint64_t numBytes) + { + if(numBytes == 0) + return true; + + m_WriteSize += numBytes; + + if(m_InMemory) + { + // in-memory path + + // are we about to write outside the buffer? Resize it larger + if(m_BufferHead + numBytes >= m_BufferEnd) + EnsureSized(numBytes); + + // perform the actual copy + memcpy(m_BufferHead, data, (size_t)numBytes); + + // advance head + m_BufferHead += numBytes; + + return true; + } + else if(m_Compressor) + { + return m_Compressor->Write(data, numBytes); + } + else if(m_File) + { + uint64_t written = (uint64_t)FileIO::fwrite(data, 1, (size_t)numBytes, m_File); + if(written != numBytes) + { + HandleError(); + return false; + } + + return true; + } + else if(m_Sock) + { + bool success = m_Sock->SendDataBlocking(data, (uint32_t)numBytes); + if(!success) + { + HandleError(); + return false; + } + + return true; + } + else + { + // we're in an error-state, nothing to write to + return false; + } + } + + // compile-time constant amount of data to let the compiler inline the memcpy + template + bool Write(const T &data) + { + const uint64_t numBytes = sizeof(T); + + if(m_InMemory) + { + // we duplicate the implementation here instead of calling the Write(void *, size_t) + // overload above since then the compiler may not be able to optimise out the memcpy + m_WriteSize += numBytes; + + // are we about to write outside the buffer? Resize it larger + if(m_BufferHead + numBytes >= m_BufferEnd) + EnsureSized(numBytes); + + // perform the actual copy + memcpy(m_BufferHead, &data, (size_t)numBytes); + + // advance head + m_BufferHead += numBytes; + + return true; + } + else + { + return Write(&data, numBytes); + } + } + + // write a particular value at an offset (not necessarily just append). + template + bool WriteAt(uint64_t offs, const T &data) + { + if(!m_File && !m_Sock && !m_Compressor) + { + RDCASSERT(ptrdiff_t(offs + sizeof(data)) <= m_BufferHead - m_BufferBase); + byte *oldHead = m_BufferHead; + uint64_t oldWriteSize = m_WriteSize; + + m_BufferHead = m_BufferBase + offs; + bool ret = Write(data); + + m_WriteSize = oldWriteSize; + m_BufferHead = oldHead; + return ret; + } + + RDCERR("Can't seek a file/socket/compressor stream writer"); + + return false; + } + + bool Finish() + { + if(m_Compressor) + return m_Compressor->Finish(); + else if(m_File) + return fflush(m_File) == 0; + else if(m_Sock) + return true; + + return true; + } + + void AddCloseCallback(StreamCloseCallback callback) { m_Callbacks.push_back(callback); } +private: + inline void EnsureSized(const uint64_t numBytes) + { + uint64_t bufferSize = m_BufferEnd - m_BufferBase; + const uint64_t newSize = (m_BufferHead - m_BufferBase) + numBytes; + + if(bufferSize < newSize) + { + // reallocate to a conservative size, don't 'double and allocate' + while(bufferSize < newSize) + bufferSize += 128 * 1024; + + byte *newBuf = AllocAlignedBuffer(bufferSize); + + uint64_t curUsed = m_BufferHead - m_BufferBase; + + memcpy(newBuf, m_BufferBase, (size_t)curUsed); + + FreeAlignedBuffer(m_BufferBase); + + m_BufferBase = newBuf; + m_BufferHead = newBuf + curUsed; + m_BufferEnd = m_BufferBase + bufferSize; + } + } + + void HandleError(); + + // used for aligned writes + static const byte empty[128]; + + // base of the buffer allocation if we're writing to a buffer + byte *m_BufferBase; + + // where we are currently writing to in the buffer + byte *m_BufferHead; + + // the end of the buffer + byte *m_BufferEnd; + + // the total size of the file/compressor (ie. how much data flushed through it) + uint64_t m_WriteSize = 0; + + // file pointer, if we're writing to a file + FILE *m_File = NULL; + + // the compressor, if writing to it + Compressor *m_Compressor = NULL; + + // the socket, if writing to it + Network::Socket *m_Sock = NULL; + + // true if we're not writing to file/compressor, used to optimise checks in Write + bool m_InMemory = true; + + // flag indicating if an error has been encountered and the stream is now invalid + bool m_HasError = false; + + // do we own the file/compressor? are we responsible for + // cleaning it up? + Ownership m_Ownership; + + // callbacks that will be invoked when this stream is being destroyed + std::vector m_Callbacks; +}; + +void StreamTransfer(StreamWriter *writer, StreamReader *reader, float *progress); \ No newline at end of file diff --git a/renderdoc/serialise/streamio_tests.cpp b/renderdoc/serialise/streamio_tests.cpp new file mode 100644 index 000000000..27c695123 --- /dev/null +++ b/renderdoc/serialise/streamio_tests.cpp @@ -0,0 +1,306 @@ +/****************************************************************************** + * The MIT License (MIT) + * + * Copyright (c) 2017 Baldur Karlsson + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + ******************************************************************************/ + +#include "streamio.h" +#include "common/timing.h" + +#if ENABLED(ENABLE_UNIT_TESTS) + +#include "3rdparty/catch/catch.hpp" + +TEST_CASE("Test basic stream I/O operations", "[streamio]") +{ + StreamWriter writer(1024); + + CHECK(writer.GetOffset() == 0); + CHECK_FALSE(writer.IsErrored()); + + writer.Write(5); + + CHECK(writer.GetOffset() == 4); + CHECK_FALSE(writer.IsErrored()); + + writer.Write(6); + + CHECK(writer.GetOffset() == 8); + CHECK_FALSE(writer.IsErrored()); + + writer.Write(7); + + CHECK(writer.GetOffset() == 10); + CHECK_FALSE(writer.IsErrored()); + + writer.AlignTo<16>(); + + CHECK(writer.GetOffset() == 16); + CHECK_FALSE(writer.IsErrored()); + + struct + { + uint32_t a = 5; + uint32_t b = 6; + uint16_t c = 7; + byte padding[6] = {0, 0, 0, 0, 0, 0}; + } compare; + + RDCCOMPILE_ASSERT(sizeof(compare) == 16, "Compare struct is the wrong size"); + + CHECK_FALSE(memcmp(writer.GetData(), &compare, sizeof(compare))); + + StreamReader reader((const byte *)&compare, sizeof(compare)); + + uint32_t test; + reader.Read(test); + CHECK(test == 5); + reader.Read(test); + CHECK(test == 6); + + uint16_t test2; + reader.Read(test2); + CHECK(test2 == 7); + + CHECK_FALSE(reader.IsErrored()); + + reader.AlignTo<16>(); + + CHECK_FALSE(reader.IsErrored()); + CHECK(reader.AtEnd()); + + // reading off the end should read 0s and move to error state + reader.Read(test); + CHECK(test == 0); + + CHECK(reader.IsErrored()); +}; + +TEST_CASE("Test stream I/O operations over the network", "[streamio][network]") +{ + uint16_t port = 8235; + Network::Socket *server = NULL; + + for(uint16_t probe = 0; probe < 20; probe++) + { + server = Network::CreateServerSocket("localhost", port, 2); + + if(server) + break; + + port++; + } + + REQUIRE(server); + + Network::Socket *sender = Network::CreateClientSocket("localhost", port, 10); + + REQUIRE(sender); + + Network::Socket *receiver = server->AcceptClient(false); + + REQUIRE(receiver); + + SECTION("Send/receive single int") + { + StreamWriter writer(sender, Ownership::Nothing); + StreamReader reader(receiver, Ownership::Nothing); + + REQUIRE_FALSE(writer.IsErrored()); + REQUIRE_FALSE(reader.IsErrored()); + + // we have to do the send/receive on threads since it is blocking + + uint32_t receivedValue = 0; + + Threading::ThreadHandle recvThread = + Threading::CreateThread([&reader, &receivedValue]() { reader.Read(receivedValue); }); + + Threading::ThreadHandle sendThread = Threading::CreateThread([&writer]() { + uint32_t pi = 3141592; + writer.Write(pi); + }); + + Threading::Sleep(50); + + // REQUIRE that the value has propagated here. If not then something has gone wrong and we're + // not getting forward progress, so we don't want to try to join the threads + // + // If this fails while debugging it's because the sleep above wasn't long enough to cover the + // stepping process. We don't wait on threads to prevent holding up the whole process if there's + // some deadlock between them. + REQUIRE(receivedValue == 3141592); + + Threading::JoinThread(sendThread); + Threading::CloseThread(sendThread); + + Threading::JoinThread(recvThread); + Threading::CloseThread(recvThread); + + CHECK_FALSE(writer.IsErrored()); + CHECK_FALSE(reader.IsErrored()); + }; + + SECTION("Send/receive multiple values") + { + StreamWriter writer(sender, Ownership::Nothing); + StreamReader reader(receiver, Ownership::Nothing); + + REQUIRE_FALSE(writer.IsErrored()); + REQUIRE_FALSE(reader.IsErrored()); + + // we have to do the send/receive on threads since it is blocking + + std::vector receivedValues; + std::vector list = {1, 1, 2, 3, 5, 8, 13, 21, 34, + 55, 89, 144, 233, 377, 610, 987, 1597}; + + // Tracks the lifetime of each thread. + volatile int32_t threadA = 0, threadB = 0; + + Threading::ThreadHandle recvThread = + Threading::CreateThread([&threadA, &reader, &receivedValues]() { + int32_t sz = 0; + reader.Read(sz); + receivedValues.resize(sz); + for(int32_t i = 0; i < sz; i++) + reader.Read(receivedValues[i]); + + Atomic::Inc32(&threadA); + }); + + Threading::ThreadHandle sendThread = Threading::CreateThread([&threadB, &writer, list]() { + int32_t sz = (int32_t)list.size(); + writer.Write(sz); + for(int32_t i = 0; i < sz; i++) + writer.Write(list[i]); + + Atomic::Inc32(&threadB); + }); + + // wait up to 2 seconds for the threads to exit + for(int i = 0; i < 2000 / 50; i++) + { + Threading::Sleep(50); + if(threadA && threadB) + break; + } + + REQUIRE(threadA); + REQUIRE(threadB); + + REQUIRE(receivedValues.size() == 17); + CHECK(receivedValues == list); + CHECK(writer.GetOffset() == reader.GetOffset()); + CHECK(writer.GetOffset() > 128); + + Threading::JoinThread(sendThread); + Threading::CloseThread(sendThread); + + Threading::JoinThread(recvThread); + Threading::CloseThread(recvThread); + + threadA = 0; + threadB = 0; + + receivedValues.clear(); + + uint64_t vals[10] = {1, 6, 0, 5, 3, 8, 7, 9, 2, 4}; + + sendThread = Threading::CreateThread([&threadA, sender, &writer, &vals]() { + + PerformanceTimer timer; + + for(int32_t i = 0; i < 128; i++) + { + writer.Write(vals); + + // add random sleeps + if(timer.GetMilliseconds() < i * 2) + Threading::Sleep(15); + } + + // close the socket now + sender->Shutdown(); + + Atomic::Inc32(&threadA); + }); + + recvThread = Threading::CreateThread([&threadB, &reader, &receivedValues]() { + uint64_t vals[10]; + + reader.Read(vals); + + // keep reading indefinitely until we hit an error (i.e. socket disconnected) + while(!reader.IsErrored()) + { + receivedValues.insert(receivedValues.end(), vals, vals + ARRAY_COUNT(vals)); + reader.Read(vals); + } + + Atomic::Inc32(&threadB); + }); + + // wait up to 2 seconds for the threads to exit + for(int i = 0; i < 2000 / 50; i++) + { + Threading::Sleep(50); + if(threadA && threadB) + break; + } + + REQUIRE(threadA); + REQUIRE(threadB); + + Threading::JoinThread(sendThread); + Threading::CloseThread(sendThread); + + Threading::JoinThread(recvThread); + Threading::CloseThread(recvThread); + + // should have written 128 sets of 10 uint64s + REQUIRE(receivedValues.size() == 1280); + for(int i = 0; i < 128; i++) + { + for(int x = 0; x < 10; x++) + { + CHECK(receivedValues[i * 10 + x] == vals[x]); + } + } + + // reader *should* be errored now + CHECK_FALSE(writer.IsErrored()); + CHECK(reader.IsErrored()); + + // we shouldn't be able to write any more into the socket after it's been closed + + int32_t test = 42; + bool success = writer.Write(test); + CHECK_FALSE(success); + CHECK(writer.IsErrored()); + }; + + delete sender; + delete receiver; + delete server; +}; + +#endif // ENABLED(ENABLE_UNIT_TESTS) \ No newline at end of file diff --git a/renderdoc/serialise/zstdio.cpp b/renderdoc/serialise/zstdio.cpp new file mode 100644 index 000000000..5977cb6bd --- /dev/null +++ b/renderdoc/serialise/zstdio.cpp @@ -0,0 +1,345 @@ +/****************************************************************************** + * The MIT License (MIT) + * + * Copyright (c) 2017 Baldur Karlsson + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + ******************************************************************************/ + +#define ZSTD_STATIC_LINKING_ONLY +#include "zstdio.h" + +static const uint64_t zstdBlockSize = 128 * 1024; +static const uint64_t compressBlockSize = ZSTD_compressBound(zstdBlockSize); + +ZSTDCompressor::ZSTDCompressor(StreamWriter *write, Ownership own) : Compressor(write, own) +{ + m_Page = AllocAlignedBuffer(zstdBlockSize); + m_CompressBuffer = AllocAlignedBuffer(compressBlockSize); + + m_PageOffset = 0; + + m_Stream = ZSTD_createCStream(); +} + +ZSTDCompressor::~ZSTDCompressor() +{ + ZSTD_freeCStream(m_Stream); + + FreeAlignedBuffer(m_Page); + FreeAlignedBuffer(m_CompressBuffer); +} + +bool ZSTDCompressor::Write(const void *data, uint64_t numBytes) +{ + // if we encountered a stream error this will be NULL + if(!m_CompressBuffer) + return false; + + if(numBytes == 0) + return true; + + // this is largely similar to LZ4Compressor, so check the comments there for more details. + // The only difference is that the lz4 streaming compression assumes a history of 64kb, where + // here we use a larger block size but no history must be maintained. + + if(m_PageOffset + numBytes <= zstdBlockSize) + { + // simplest path, no page wrapping/spanning at all + memcpy(m_Page + m_PageOffset, data, (size_t)numBytes); + m_PageOffset += numBytes; + + return true; + } + else + { + const byte *src = (const byte *)data; + + // copy whatever will fit on this page + { + uint64_t firstBytes = zstdBlockSize - m_PageOffset; + memcpy(m_Page + m_PageOffset, src, (size_t)firstBytes); + + m_PageOffset += firstBytes; + numBytes -= firstBytes; + src += firstBytes; + } + + bool success = true; + + while(success && numBytes > 0) + { + // flush page + success &= FlushPage(); + + if(!success) + return success; + + // how many bytes can we copy in this page? + uint64_t partialBytes = RDCMIN(zstdBlockSize, numBytes); + memcpy(m_Page, src, (size_t)partialBytes); + + // advance the source pointer, dest offset, and remove the bytes we read + m_PageOffset += partialBytes; + numBytes -= partialBytes; + src += partialBytes; + } + + return success; + } +} + +bool ZSTDCompressor::Finish() +{ + // This function just writes the current page and closes zstd. Since we assume all blocks are + // precisely 64kb in size + // only the last one can be smaller, so we only write a partial page when finishing. + // Calling Write() after Finish() is illegal + + return FlushPage(); +} + +bool ZSTDCompressor::FlushPage() +{ + // if we encountered a stream error this will be NULL + if(!m_CompressBuffer) + return false; + + ZSTD_inBuffer in = {m_Page, (size_t)m_PageOffset, 0}; + ZSTD_outBuffer out = {m_CompressBuffer, ZSTD_CStreamOutSize(), 0}; + + bool success = true; + + success &= CompressZSTDFrame(in, out); + + // if there was an error, bail + if(!m_CompressBuffer) + return false; + + // a bit redundant to write this but it means we can read the entire frame without + // doing multiple reads + success &= m_Write->Write((uint32_t)out.pos); + success &= m_Write->Write(m_CompressBuffer, out.pos); + + // start writing to the start of the page again + m_PageOffset = 0; + + return success; +} + +bool ZSTDCompressor::CompressZSTDFrame(ZSTD_inBuffer &in, ZSTD_outBuffer &out) +{ + size_t err = ZSTD_initCStream(m_Stream, 7); + + if(ZSTD_isError(err)) + { + RDCERR("Error compressing: %s", ZSTD_getErrorName(err)); + FreeAlignedBuffer(m_Page); + FreeAlignedBuffer(m_CompressBuffer); + m_Page = m_CompressBuffer = NULL; + return false; + } + + // keep calling compressStream until everything is consumed + while(in.pos < in.size) + { + size_t inpos = in.pos; + size_t outpos = out.pos; + + err = ZSTD_compressStream(m_Stream, &out, &in); + + if(ZSTD_isError(err) || (inpos == in.pos && outpos == out.pos)) + { + if(ZSTD_isError(err)) + RDCERR("Error compressing: %s", ZSTD_getErrorName(err)); + else + RDCERR("Error compressing, no progress made"); + FreeAlignedBuffer(m_Page); + FreeAlignedBuffer(m_CompressBuffer); + m_Page = m_CompressBuffer = NULL; + return false; + } + } + + err = ZSTD_endStream(m_Stream, &out); + + if(ZSTD_isError(err) || err != 0) + { + if(ZSTD_isError(err)) + RDCERR("Error compressing: %s", ZSTD_getErrorName(err)); + else + RDCERR("Error compressing, couldn't end stream"); + FreeAlignedBuffer(m_Page); + FreeAlignedBuffer(m_CompressBuffer); + m_Page = m_CompressBuffer = NULL; + return false; + } + + return true; +} + +ZSTDDecompressor::ZSTDDecompressor(StreamReader *read, Ownership own) : Decompressor(read, own) +{ + m_Page = AllocAlignedBuffer(zstdBlockSize); + m_CompressBuffer = AllocAlignedBuffer(compressBlockSize); + + m_PageOffset = 0; + m_PageLength = 0; + + m_Stream = ZSTD_createDStream(); +} + +ZSTDDecompressor::~ZSTDDecompressor() +{ + ZSTD_freeDStream(m_Stream); + FreeAlignedBuffer(m_Page); + FreeAlignedBuffer(m_CompressBuffer); +} + +bool ZSTDDecompressor::Recompress(Compressor *comp) +{ + bool success = true; + + while(success && !m_Read->AtEnd()) + { + success &= FillPage(); + if(success) + success &= comp->Write(m_Page, m_PageLength); + } + success &= comp->Finish(); + + return success; +} + +bool ZSTDDecompressor::Read(void *data, uint64_t numBytes) +{ + // if we encountered a stream error this will be NULL + if(!m_CompressBuffer) + return false; + + if(numBytes == 0) + return true; + + // this is simpler than the ZstdWriter::Write() implementation. + // At any point, m_Page contains the current window with uncompressed bytes. + // If we can satisfy a read from it, then we just memcpy and increment m_PageOffset. + // When we wrap around, we do a partial memcpy from m_Page, then decompress more bytes. + + // if we already have all the data in-memory, just copy and return + uint64_t available = m_PageLength - m_PageOffset; + + if(numBytes <= available) + { + memcpy(data, m_Page + m_PageOffset, (size_t)numBytes); + m_PageOffset += numBytes; + return true; + } + + byte *dst = (byte *)data; + + // copy what remains in m_Page + memcpy(dst, m_Page + m_PageOffset, (size_t)available); + + // adjust what needs to be copied + dst += available; + numBytes -= available; + + bool success = true; + + while(success && numBytes > 0) + { + success &= FillPage(); + + if(!success) + return success; + + // if we can now satisfy the remainder of the read, do so and return + if(numBytes <= m_PageLength) + { + memcpy(dst, m_Page, (size_t)numBytes); + m_PageOffset += numBytes; + return success; + } + + // otherwise copy this page in and continue + memcpy(dst, m_Page, (size_t)m_PageLength); + dst += m_PageLength; + numBytes -= m_PageLength; + } + + return success; +} + +bool ZSTDDecompressor::FillPage() +{ + uint32_t compSize = 0; + + bool success = true; + + success &= m_Read->Read(compSize); + success &= m_Read->Read(m_CompressBuffer, compSize); + + if(!success) + { + FreeAlignedBuffer(m_Page); + FreeAlignedBuffer(m_CompressBuffer); + m_Page = m_CompressBuffer = NULL; + return false; + } + + size_t err = ZSTD_initDStream(m_Stream); + + if(ZSTD_isError(err)) + { + RDCERR("Error decompressing: %s", ZSTD_getErrorName(err)); + FreeAlignedBuffer(m_Page); + FreeAlignedBuffer(m_CompressBuffer); + m_Page = m_CompressBuffer = NULL; + return false; + } + + ZSTD_inBuffer in = {m_CompressBuffer, compSize, 0}; + ZSTD_outBuffer out = {m_Page, zstdBlockSize, 0}; + + // keep calling compressStream until everything is consumed + while(in.pos < in.size) + { + size_t inpos = in.pos; + size_t outpos = out.pos; + + err = ZSTD_decompressStream(m_Stream, &out, &in); + + if(ZSTD_isError(err) || (inpos == in.pos && outpos == out.pos)) + { + if(ZSTD_isError(err)) + RDCERR("Error decompressing: %s", ZSTD_getErrorName(err)); + else + RDCERR("Error decompressing, no progress made"); + FreeAlignedBuffer(m_Page); + FreeAlignedBuffer(m_CompressBuffer); + m_Page = m_CompressBuffer = NULL; + return false; + } + } + + m_PageOffset = 0; + m_PageLength = out.pos; + + return success; +} diff --git a/renderdoc/serialise/zstdio.h b/renderdoc/serialise/zstdio.h new file mode 100644 index 000000000..90068e60a --- /dev/null +++ b/renderdoc/serialise/zstdio.h @@ -0,0 +1,69 @@ +/****************************************************************************** + * The MIT License (MIT) + * + * Copyright (c) 2017 Baldur Karlsson + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + ******************************************************************************/ + +#pragma once + +#include "zstd/zstd.h" +#include "streamio.h" + +class ZSTDCompressor : public Compressor +{ +public: + ZSTDCompressor(StreamWriter *write, Ownership own); + ~ZSTDCompressor(); + + bool Write(const void *data, uint64_t numBytes); + bool Finish(); + +private: + bool FlushPage(); + + bool CompressZSTDFrame(ZSTD_inBuffer &in, ZSTD_outBuffer &out); + + byte *m_Page; + byte *m_CompressBuffer; + uint64_t m_PageOffset; + + ZSTD_CStream *m_Stream; +}; + +class ZSTDDecompressor : public Decompressor +{ +public: + ZSTDDecompressor(StreamReader *read, Ownership own); + ~ZSTDDecompressor(); + + bool Recompress(Compressor *comp); + bool Read(void *data, uint64_t numBytes); + +private: + bool FillPage(); + + byte *m_Page; + byte *m_CompressBuffer; + uint64_t m_PageOffset; + uint64_t m_PageLength; + + ZSTD_DStream *m_Stream; +};