Add delta-encoding for fetching texture and buffer data in replay proxy

* Before this commit, whenever we needed to cache a texture or buffer
  locally we'd transfer the whole resource, and re-transfer it every time
  the event changed even if the contents hadn't changed.
* Instead now we track the previous contents on both sides, and just
  send a list of deltas to apply.
This commit is contained in:
baldurk
2017-11-27 20:40:27 +00:00
parent 007fa63c89
commit 025b2a2f54
2 changed files with 371 additions and 11 deletions
+336 -8
View File
@@ -1367,6 +1367,322 @@ void ReplayProxy::FetchStructuredFile()
PROXY_FUNCTION(FetchStructuredFile);
}
// A single contiguous run of bytes to be written over the reference copy of a resource when
// applying a delta-encoded transfer.
struct DeltaSection
{
  // byte offset from the start of the resource data at which 'contents' begins
  uint64_t offs{0};
  // the replacement bytes for the range starting at 'offs'
  bytebuf contents;
};

DECLARE_REFLECTION_STRUCT(DeltaSection);
// Serialises a DeltaSection through the given serialiser: first its offset, then its byte
// contents, in that order.
// NOTE(review): SERIALISE_MEMBER presumably refers to the parameters by the names 'ser' and 'el',
// so those names should not be changed - confirm against the macro definition.
template <typename SerialiserType>
void DoSerialise(SerialiserType &ser, DeltaSection &el)
{
SERIALISE_MEMBER(offs);
SERIALISE_MEMBER(contents);
}
// Transfers the contents of a byte array over the proxy connection as a list of deltas against
// the contents that were transferred previously, which both ends hold in referenceData.
//
// xferser       - the serialiser for the connection. Its IsReading() state selects which half of
//                 this function runs.
// referenceData - in/out. The previous contents shared by both sides. When reading, the received
//                 deltas are applied to it in place. When writing, it is diffed against newData
//                 and then replaced by newData.
// newData       - only consulted when writing: the up-to-date contents. On return from the writing
//                 path it has been swapped with referenceData, so it holds the old reference
//                 contents rather than the data that was passed in.
//
// Wire format: a uint64 "uncompSize", which is 0 if nothing changed. If it is non-zero it is
// followed by an LZ4 compressed stream containing the serialised list of DeltaSection, padded
// with zero bytes up to uncompSize.
template <typename SerialiserType>
void ReplayProxy::DeltaTransferBytes(SerialiserType &xferser, bytebuf &referenceData, bytebuf &newData)
{
  // zero-filled scratch space, used as the source when writing padding and as a dump target when
  // reading it back. The asserts below check the padding always fits inside it.
  char empty[128] = {};

  // we use a list so that we don't have to reserve and pushing new sections will never cause
  // previous ones to be reallocated and move around lots of data.
  std::list<DeltaSection> deltas;

  if(xferser.IsReading())
  {
    // receiving side: read the uncompressed size, then decompress and apply the deltas.
    uint64_t uncompSize = 0;
    xferser.Serialise("uncompSize", uncompSize);

    if(uncompSize == 0)
    {
      // fast path - no changes.
      RDCDEBUG("Unchanged");
      return;
    }
    else
    {
      // scoped so that the decompressing serialiser is destroyed before the deltas are applied
      {
        // NOTE(review): SERIALISE_ELEMENT presumably picks up the local named 'ser' - confirm.
        ReadSerialiser ser(
            new StreamReader(new LZ4Decompressor(xferser.GetReader(), Ownership::Nothing),
                             uncompSize, Ownership::Stream),
            Ownership::Stream);

        SERIALISE_ELEMENT(deltas);

        // consume the padding that the writing side added to reach uncompSize.
        uint64_t offs = ser.GetReader()->GetOffset();
        RDCASSERT(offs <= uncompSize, offs, uncompSize);
        RDCASSERT(uncompSize - offs < sizeof(empty), offs, uncompSize);

        ser.GetReader()->Read(empty, uncompSize - offs);
      }

      if(deltas.empty())
      {
        RDCERR("Unexpected empty delta list");
      }
      else if(referenceData.empty())
      {
        // if we don't have reference data we blat the whole contents.
        // in this case we only expect one delta with the whole range
        if(deltas.size() != 1)
          RDCERR("Got more than one delta with no reference data - taking first delta.");

        referenceData = deltas.front().contents;

        RDCDEBUG("Creating new reference data, %llu bytes", (uint64_t)referenceData.size());
      }
      else
      {
        // NOTE(review): if the writing side re-sent the whole resource because its size changed,
        // the single delta is applied here like any other. The reference data can grow to fit it
        // but is never shrunk, so a resource that got smaller would leave the two sides with
        // different sized reference data.
        uint64_t deltaBytes = 0;

        // apply deltas to refData
        for(const DeltaSection &delta : deltas)
        {
          if(delta.offs + delta.contents.size() > referenceData.size())
          {
            RDCERR("{%llu, %llu} larger than reference data (%llu bytes) - expanding to fit.",
                   delta.offs, (uint64_t)delta.contents.size(), (uint64_t)referenceData.size());

            referenceData.resize(delta.offs + delta.contents.size());
          }

          byte *dst = referenceData.data() + (ptrdiff_t)delta.offs;
          const byte *src = delta.contents.data();

          memcpy(dst, src, delta.contents.size());

          deltaBytes += (uint64_t)delta.contents.size();
        }

        RDCDEBUG("Applied %u deltas data, %llu total delta bytes to %llu resource size",
                 (uint32_t)deltas.size(), deltaBytes, (uint64_t)referenceData.size());
      }
    }
  }
  else
  {
    // sending side: build the delta list, then write the size followed by the compressed deltas.
    uint64_t uncompSize = 0;

    if(referenceData.empty())
    {
      // no previous reference data, need to transfer the whole object.
      deltas.resize(1);
      deltas.back().contents = newData;
    }
    else
    {
      if(referenceData.size() != newData.size())
      {
        // size() is cast to uint64_t to match %llu, the same as every other log call in this
        // function. Passing size_t directly mismatches the format on 32-bit builds.
        RDCERR("Reference data existed at %llu bytes, but new data is now %llu bytes",
               (uint64_t)referenceData.size(), (uint64_t)newData.size());

        // re-transfer the whole block, something went seriously wrong if the resource changed size.
        deltas.resize(1);
        deltas.back().contents = newData;
      }
      else
      {
        // do actual diff.
        const byte *srcBegin = newData.data();
        const byte *src = srcBegin;
        const byte *dst = referenceData.data();
        size_t bytesRemain = newData.size();

        // we only care about large-ish chunks at a time. This prevents us generating lots of tiny
        // deltas where we could batch changes together. This is tuned to not be too large (and
        // thus causing us to miss too many sections we could skip) and not too small (causing us
        // to devolve into lots of byte-wise deltas). The current value as of this comment of 128
        // is definitely on the small end of the range, but consider e.g. an android image of
        // 1440x2560 and a pixel-wide line that goes vertically from top to bottom. Reading
        // horizontally that will mean 2560 different diffs, and only actually one pixel changed.
        // The larger this value gets, the more redundant data we'll send along with.
        const size_t chunkSize = 128;

        // we use a simple state machine. Start in state 1
        //
        // State 1: No active delta. Look at the current chunk, if there's no difference move to the
        //          next chunk and stay in this state. If there is a difference, push a delta onto
        //          the list at the current offset. Copy the current chunk into the contents of the
        //          delta. Move to state 2.
        // State 2. Active delta. Look at the current chunk, if there is a difference then append
        //          the current chunk to the last delta's contents, move to the next chunk, and stay
        //          in this state. If there isn't a difference, move back to state 1 (the delta is
        //          already 'finished' so we have no need to do anything more on it).
        //
        // At any point we can end the loop, both states are 'complete' at all points.
        enum DeltaState
        {
          None,
          Active
        };

        DeltaState state = DeltaState::None;

        // loop over whole chunks. A final chunk of exactly chunkSize bytes is left for the tail
        // handling after the loop.
        while(bytesRemain > chunkSize)
        {
          // check if there's a difference in this chunk.
          bool chunkDiff = memcmp(src, dst, chunkSize) != 0;

          // if we're in state 1
          if(state == DeltaState::None)
          {
            // if there's a difference, append a new delta with the current offset and chunk
            // contents and move to state 2
            if(chunkDiff)
            {
              deltas.push_back(DeltaSection());
              deltas.back().offs = src - srcBegin;
              deltas.back().contents.append(src, chunkSize);

              state = DeltaState::Active;
            }
          }
          // if we're in state 2
          else if(state == DeltaState::Active)
          {
            // continue to append to the delta if there's another difference in this chunk.
            if(chunkDiff)
            {
              deltas.back().contents.append(src, chunkSize);
            }
            else
            {
              state = DeltaState::None;
            }
          }

          // move to the next chunk
          bytesRemain -= chunkSize;
          src += chunkSize;
          dst += chunkSize;
        }

        // if there are still some bytes remaining at the end of the image, smaller than the chunk
        // size, just diff directly and send if needed. We could combine this with the last delta if
        // we ended in the active state.
        if(bytesRemain > 0 && memcmp(src, dst, bytesRemain))
        {
          deltas.push_back(DeltaSection());
          deltas.back().offs = src - srcBegin;
          deltas.back().contents.append(src, bytesRemain);
        }
      }
    }

    // fast path - no changes.
    if(deltas.empty())
    {
      uncompSize = 0;
    }
    else
    {
      // serialise to an invalid writer, to get the size of the data that will be written.
      WriteSerialiser ser(new StreamWriter(StreamWriter::InvalidStream), Ownership::Stream);

      SERIALISE_ELEMENT(deltas);

      // the chunk alignment is added as headroom, the real write below pads up to this size.
      uncompSize = ser.GetWriter()->GetOffset() + ser.GetChunkAlignment();
    }

    xferser.Serialise("uncompSize", uncompSize);

    if(uncompSize > 0)
    {
      // write the deltas for real, LZ4 compressed, onto the transfer serialiser's stream.
      WriteSerialiser ser(new StreamWriter(new LZ4Compressor(xferser.GetWriter(), Ownership::Nothing),
                                           Ownership::Stream),
                          Ownership::Stream);

      SERIALISE_ELEMENT(deltas);

      // add any necessary padding.
      uint64_t offs = ser.GetWriter()->GetOffset();
      RDCASSERT(offs <= uncompSize, offs, uncompSize);
      RDCASSERT(uncompSize - offs < sizeof(empty), offs, uncompSize);

      ser.GetWriter()->Write(empty, uncompSize - offs);
    }

    // This is the proxy side, so we have the complete newest contents in data. Swap the new data
    // into refData for next time.
    referenceData.swap(newData);
  }
}
// Proxied implementation of CacheBufferData. Serialises the buffer ID as the parameter, fetches
// the buffer contents from m_Remote on the side where paramser is reading, then sends the
// contents back over retser delta-encoded against m_ProxyBufferData[buff].
// NOTE(review): BEGIN_PARAMS/END_PARAMS/PACKET_HEADER presumably rely on the local names
// 'paramser', 'retser', 'packet' and 'ser' - confirm before renaming anything here.
template <typename ParamSerialiser, typename ReturnSerialiser>
void ReplayProxy::Proxied_CacheBufferData(ParamSerialiser &paramser, ReturnSerialiser &retser,
ResourceId buff)
{
const ReplayProxyPacket packet = eReplayProxy_CacheBufferData;
{
BEGIN_PARAMS();
SERIALISE_ELEMENT(buff);
END_PARAMS();
}
// only filled in on the side that reads the parameters and has hit no errors; otherwise it is
// passed on empty.
bytebuf data;
// offset 0 and length 0 - presumably this requests the whole buffer, TODO confirm.
if(paramser.IsReading() && !paramser.IsErrored() && !m_IsErrored)
m_Remote->GetBufferData(buff, 0, 0, data);
{
ReturnSerialiser &ser = retser;
PACKET_HEADER(packet);
}
// operator[] creates an empty reference entry the first time this buffer is seen.
DeltaTransferBytes(retser, m_ProxyBufferData[buff], data);
retser.EndChunk();
}
// Entry point for caching a buffer's contents. The body is generated by PROXY_FUNCTION, which
// presumably forwards to Proxied_CacheBufferData with the right serialisers for this side of the
// connection - confirm against the macro definition.
void ReplayProxy::CacheBufferData(ResourceId buff)
{
PROXY_FUNCTION(CacheBufferData, buff);
}
// Proxied implementation of CacheTextureData. Serialises the texture ID, array index, mip and
// fetch parameters, fetches that subresource's contents from m_Remote on the side where paramser
// is reading, then sends the contents back over retser delta-encoded against the entry for
// {tex, arrayIdx, mip} in m_ProxyTextureData.
// NOTE(review): BEGIN_PARAMS/END_PARAMS/PACKET_HEADER presumably rely on the local names
// 'paramser', 'retser', 'packet' and 'ser' - confirm before renaming anything here.
template <typename ParamSerialiser, typename ReturnSerialiser>
void ReplayProxy::Proxied_CacheTextureData(ParamSerialiser &paramser, ReturnSerialiser &retser,
ResourceId tex, uint32_t arrayIdx, uint32_t mip,
const GetTextureDataParams &params)
{
const ReplayProxyPacket packet = eReplayProxy_CacheTextureData;
{
BEGIN_PARAMS();
SERIALISE_ELEMENT(tex);
SERIALISE_ELEMENT(arrayIdx);
SERIALISE_ELEMENT(mip);
SERIALISE_ELEMENT(params);
END_PARAMS();
}
// only filled in on the side that reads the parameters and has hit no errors; otherwise it is
// passed on empty.
bytebuf data;
if(paramser.IsReading() && !paramser.IsErrored() && !m_IsErrored)
m_Remote->GetTextureData(tex, arrayIdx, mip, params, data);
{
ReturnSerialiser &ser = retser;
PACKET_HEADER(packet);
}
// the reference data is keyed per subresource. operator[] creates an empty entry the first time
// this subresource is seen.
TextureCacheEntry entry = {tex, arrayIdx, mip};
DeltaTransferBytes(retser, m_ProxyTextureData[entry], data);
retser.EndChunk();
}
// Entry point for caching a texture subresource's contents. The body is generated by
// PROXY_FUNCTION, which presumably forwards to Proxied_CacheTextureData with the right
// serialisers for this side of the connection - confirm against the macro definition.
void ReplayProxy::CacheTextureData(ResourceId tex, uint32_t arrayIdx, uint32_t mip,
const GetTextureDataParams &params)
{
PROXY_FUNCTION(CacheTextureData, tex, arrayIdx, mip, params);
}
#pragma endregion Proxied Functions
// If a remap is required, modify the params that are used when getting the proxy texture data
@@ -1466,11 +1782,15 @@ void ReplayProxy::EnsureTexCached(ResourceId texid, uint32_t arrayIdx, uint32_t
const ProxyTextureProperties &proxy = m_ProxyTextures[texid];
bytebuf data;
GetTextureData(texid, arrayIdx, mip, proxy.params, data);
#if ENABLED(TRANSFER_RESOURCE_CONTENTS_DELTAS)
CacheTextureData(texid, arrayIdx, mip, proxy.params);
#else
GetTextureData(texid, arrayIdx, mip, proxy.params, m_ProxyTextureData[entry]);
#endif
if(!data.empty())
m_Proxy->SetProxyTextureData(proxy.id, arrayIdx, mip, data.data(), data.size());
auto it = m_ProxyTextureData.find(entry);
if(it != m_ProxyTextureData.end())
m_Proxy->SetProxyTextureData(proxy.id, arrayIdx, mip, it->second.data(), it->second.size());
m_TextureProxyCache.insert(entry);
}
@@ -1491,11 +1811,15 @@ void ReplayProxy::EnsureBufCached(ResourceId bufid)
ResourceId proxyid = m_ProxyBufferIds[bufid];
bytebuf data;
GetBufferData(bufid, 0, 0, data);
#if ENABLED(TRANSFER_RESOURCE_CONTENTS_DELTAS)
CacheBufferData(bufid);
#else
GetBufferData(bufid, 0, 0, m_ProxyBufferData[bufid]);
#endif
if(!data.empty())
m_Proxy->SetProxyBufferData(proxyid, &data[0], data.size());
auto it = m_ProxyBufferData.find(bufid);
if(it != m_ProxyBufferData.end())
m_Proxy->SetProxyBufferData(proxyid, it->second.data(), it->second.size());
m_BufferProxyCache.insert(bufid);
}
@@ -1511,6 +1835,10 @@ bool ReplayProxy::Tick(int type)
switch(type)
{
case eReplayProxy_CacheBufferData: CacheBufferData(ResourceId()); break;
case eReplayProxy_CacheTextureData:
CacheTextureData(ResourceId(), 0, 0, GetTextureDataParams());
break;
case eReplayProxy_ReplayLog: ReplayLog(0, (ReplayLogType)0); break;
case eReplayProxy_FetchStructuredFile: FetchStructuredFile(); break;
case eReplayProxy_GetAPIProperties: GetAPIProperties(); break;
+35 -3
View File
@@ -29,6 +29,10 @@
#include "replay/replay_driver.h"
#include "serialise/serialiser.h"
// turns on/off the feature to transfer resource contents (cached textures and buffers) as a series
// of deltas to a shared view of the previous resource contents.
#define TRANSFER_RESOURCE_CONTENTS_DELTAS OPTION_ON
enum ReplayProxyPacket
{
// we offset these packet numbers so that it can co-exist
@@ -37,6 +41,9 @@ enum ReplayProxyPacket
eReplayProxy_ReplayLog = eReplayProxy_First,
eReplayProxy_CacheBufferData,
eReplayProxy_CacheTextureData,
eReplayProxy_GetAPIProperties,
eReplayProxy_FetchStructuredFile,
@@ -470,6 +477,18 @@ public:
IMPLEMENT_FUNCTION_PROXIED(void, ReplaceResource, ResourceId from, ResourceId to);
IMPLEMENT_FUNCTION_PROXIED(void, RemoveReplacement, ResourceId id);
// these functions are not part of the replay driver interface - they are similar to GetBufferData
// and GetTextureData, but they do extra work to try and optimise transfer by delta-encoding the
// difference in the returned data to the last time the resource was cached
IMPLEMENT_FUNCTION_PROXIED(void, CacheBufferData, ResourceId buff);
IMPLEMENT_FUNCTION_PROXIED(void, CacheTextureData, ResourceId tex, uint32_t arrayIdx,
uint32_t mip, const GetTextureDataParams &params);
// utility function to serialise the contents of a byte array given the previous contents that's
// available on both sides of the communication.
template <typename SerialiserType>
void DeltaTransferBytes(SerialiserType &xferser, bytebuf &referenceData, bytebuf &newData);
void FileChanged() {}
// will never be used
ResourceId CreateProxyTexture(const TextureDescription &templateTex)
@@ -517,8 +536,11 @@ private:
return mip < o.mip;
}
};
// this cache only exists on the client side, with the proxy renderer. This denotes cases where we
// already have up-to-date texture data for the current event so we don't need to check for any
// deltas. It is cleared any time we set event.
set<TextureCacheEntry> m_TextureProxyCache;
set<ResourceId> m_LocalTextures;
set<ResourceId> m_BufferProxyCache;
struct ProxyTextureProperties
{
@@ -531,11 +553,21 @@ private:
operator ResourceId() const { return id; }
bool operator==(const ResourceId &other) const { return id == other; }
};
// this cache only exists on the client side, with the proxy renderer. It contains the created
// proxy textures to stand-in for remote real textures.
map<ResourceId, ProxyTextureProperties> m_ProxyTextures;
set<ResourceId> m_BufferProxyCache;
map<ResourceId, ResourceId> m_ProxyBufferIds;
// this cache exists on *both* sides of the proxy connection, and must be kept in sync. It is used
// on the remote side to determine which deltas are necessary, and then each time on the client
// side the data is uploaded into the proxy textures above.
std::map<TextureCacheEntry, bytebuf> m_ProxyTextureData;
std::map<ResourceId, bytebuf> m_ProxyBufferData;
// this lists any textures which are only created locally (e.g. custom visualisation shaders) and
// should not be treated as proxied.
std::set<ResourceId> m_LocalTextures;
map<ResourceId, ResourceId> m_LiveIDs;
struct ShaderReflKey