Fix racing busy signals when connecting to remote servers

This commit is contained in:
baldurk
2020-05-15 00:57:38 +01:00
parent 1d579dfb66
commit 08cf66abce
8 changed files with 196 additions and 155 deletions
+6 -8
View File
@@ -125,15 +125,13 @@ void RemoteHost::CheckStatus()
return;
}
// to avoid doing complex work while holding the remote host lock, we check the status here then
// call into the internal function that will propagate that data to the proper storage if needed.
IRemoteServer *rend = NULL;
ReplayStatus status = RENDERDOC_CreateRemoteServerConnection(m_hostname.c_str(), &rend);
UpdateStatus(RENDERDOC_CheckRemoteServerConnection(m_hostname.c_str()));
}
if(rend)
rend->ShutdownConnection();
UpdateStatus(status);
ReplayStatus RemoteHost::Connect(IRemoteServer **server)
{
QMutexLocker autolock(&m_data->mutex);
return RENDERDOC_CreateRemoteServerConnection(m_hostname.c_str(), server);
}
void RemoteHost::SetConnected(bool connected)
+8
View File
@@ -93,6 +93,14 @@ public:
)");
void SetLastCapturePath(const rdcstr &path);
DOCUMENT(R"(Create a connection to the remote server.
:return: The status of opening the capture, whether success or failure, and a :class:`RemoteServer`
instance if it were successful
:rtype: ``pair`` of ReplayStatus and RemoteServer
)");
ReplayStatus Connect(IRemoteServer **server);
DOCUMENT(
"The :class:`DeviceProtocolController` for this host, or ``None`` if no protocol is in use");
IDeviceProtocolController *Protocol() const { return m_protocol; }
+1 -1
View File
@@ -311,7 +311,7 @@ void ReplayManager::CloseThread()
ReplayStatus ReplayManager::ConnectToRemoteServer(RemoteHost host)
{
ReplayStatus status = RENDERDOC_CreateRemoteServerConnection(host.Hostname().c_str(), &m_Remote);
ReplayStatus status = host.Connect(&m_Remote);
if(host.Protocol() && host.Protocol()->GetProtocolName() == "adb")
{
@@ -192,6 +192,11 @@ void RemoteHost::CheckStatus()
{
}
ReplayStatus RemoteHost::Connect(IRemoteServer **server)
{
return ReplayStatus::Succeeded;
}
ReplayStatus RemoteHost::Launch()
{
return ReplayStatus::Succeeded;
+16 -5
View File
@@ -597,11 +597,22 @@ void RemoteManager::on_connect_clicked()
}
else
{
IRemoteServer *server = NULL;
ReplayStatus status =
RENDERDOC_CreateRemoteServerConnection(host.Hostname().c_str(), &server);
if(server)
server->ShutdownServerAndConnection();
ReplayStatus status = ReplayStatus::Succeeded;
LambdaThread *th = new LambdaThread([&host, &status]() {
IRemoteServer *server = NULL;
status = host.Connect(&server);
if(server)
server->ShutdownServerAndConnection();
});
th->start();
th->wait(500);
if(th->isRunning())
{
ShowProgressDialog(this, tr("Shutting down server, please wait..."),
[th]() { return !th->isRunning(); });
}
th->deleteLater();
setRemoteServerLive(node, false, false);
if(status != ReplayStatus::Succeeded)
+24 -5
View File
@@ -534,11 +534,7 @@ struct AndroidRemoteServer : public RemoteServer
m_LogcatThread->Finish();
}
virtual void ShutdownConnection() override
{
ResetAndroidSettings();
RemoteServer::ShutdownConnection();
}
virtual void ShutdownConnection() override;
virtual void ShutdownServerAndConnection() override
{
@@ -804,6 +800,7 @@ struct AndroidController : public IDeviceProtocolHandler
{
std::function<void()> meth;
int32_t done = 0;
bool selfdelete = false;
};
rdcarray<Command *> cmdqueue;
@@ -829,6 +826,9 @@ struct AndroidController : public IDeviceProtocolHandler
cmd->meth();
Atomic::Inc32(&cmd->done);
if(cmd->selfdelete)
delete cmd;
}
}
@@ -846,6 +846,18 @@ struct AndroidController : public IDeviceProtocolHandler
Threading::Sleep(5);
}
void AsyncInvoke(std::function<void()> method)
{
Command *cmd = new Command;
cmd->meth = method;
cmd->selfdelete = true;
{
SCOPED_LOCK(lock);
cmdqueue.push_back(cmd);
}
}
rdcstr GetProtocolName() override { return "adb"; }
rdcarray<rdcstr> GetDevices() override
{
@@ -1065,6 +1077,13 @@ struct AndroidController : public IDeviceProtocolHandler
};
};
void AndroidRemoteServer::ShutdownConnection()
{
rdcstr deviceID = m_deviceID;
AndroidController::m_Inst.AsyncInvoke([deviceID]() { Android::ResetCaptureSettings(deviceID); });
RemoteServer::ShutdownConnection();
}
ExecuteResult AndroidRemoteServer::ExecuteAndInject(const char *a, const char *w, const char *c,
const rdcarray<EnvironmentModification> &env,
const CaptureOptions &opts)
+15 -2
View File
@@ -1735,17 +1735,30 @@ extern "C" RENDERDOC_API uint32_t RENDERDOC_CC RENDERDOC_EnumerateRemoteTargets(
// Remote server
//////////////////////////////////////////////////////////////////////////
DOCUMENT(R"(Create a connection to a remote server running on given hostname and port.
DOCUMENT(R"(Create a connection to a remote server running on given hostname.
:param str URL: The hostname to connect to, if blank then localhost is used. If no protocol is
specified then default TCP enumeration happens.
:return: The status of opening the capture, whether success or failure, and a :class:`RemoteServer`
:return: The status of opening the connection, whether success or failure, and a :class:`RemoteServer`
instance if it were successful
:rtype: ``pair`` of ReplayStatus and RemoteServer
)");
extern "C" RENDERDOC_API ReplayStatus RENDERDOC_CC
RENDERDOC_CreateRemoteServerConnection(const char *URL, IRemoteServer **rend);
DOCUMENT(R"(Check the connection to a remote server running on given hostname.
This should be preferred to :func:`CreateRemoteServerConnection` when no connection is desired, as
the status can be checked without interfering with making connections.
:param str URL: The hostname to connect to, if blank then localhost is used. If no protocol is
specified then default TCP enumeration happens.
:return: The status of the server.
:rtype: ReplayStatus
)");
extern "C" RENDERDOC_API ReplayStatus RENDERDOC_CC
RENDERDOC_CheckRemoteServerConnection(const char *URL);
DOCUMENT(R"(This launches a remote server which will continually run in a loop to server requests
from external sources.
+121 -134
View File
@@ -28,6 +28,7 @@
#include "android/android.h"
#include "api/replay/renderdoc_replay.h"
#include "api/replay/version.h"
#include "common/threading.h"
#include "core/core.h"
#include "core/settings.h"
#include "os/os_specific.h"
@@ -154,60 +155,93 @@ struct ClientThread
Threading::ThreadHandle thread;
};
static void InactiveRemoteClientThread(ClientThread *threadData)
struct ActiveClient
{
Threading::SetCurrentThreadName("InactiveRemoteClientThread");
Threading::CriticalSection lock;
ClientThread *active = NULL;
};
static bool HandleHandshakeClient(ActiveClient &activeClient, ClientThread *threadData)
{
uint32_t ip = threadData->socket->GetRemoteIP();
uint32_t version = 0;
bool activeConnectionDesired = false;
bool activeConnectionEstablished = false;
{
uint32_t version = 0;
ReadSerialiser ser(new StreamReader(threadData->socket, Ownership::Nothing), Ownership::Stream);
// this thread just handles receiving the handshake and sending a busy signal without blocking
// the server thread
RemoteServerPacket type = ser.ReadChunk<RemoteServerPacket>();
if(ser.IsErrored() || type != eRemoteServer_Handshake)
{
ReadSerialiser ser(new StreamReader(threadData->socket, Ownership::Nothing), Ownership::Stream);
// this thread just handles receiving the handshake and sending a busy signal without blocking
// the server thread
RemoteServerPacket type = ser.ReadChunk<RemoteServerPacket>();
if(ser.IsErrored() || type != eRemoteServer_Handshake)
{
RDCWARN("Didn't receive proper handshake");
SAFE_DELETE(threadData->socket);
return;
}
SERIALISE_ELEMENT(version);
ser.EndChunk();
RDCWARN("Didn't receive proper handshake");
return activeConnectionEstablished;
}
SERIALISE_ELEMENT(version);
SERIALISE_ELEMENT(activeConnectionDesired);
ser.EndChunk();
}
{
WriteSerialiser ser(new StreamWriter(threadData->socket, Ownership::Nothing), Ownership::Stream);
ser.SetStreamingMode(true);
if(version != RemoteServerProtocolVersion)
{
WriteSerialiser ser(new StreamWriter(threadData->socket, Ownership::Nothing),
Ownership::Stream);
RDCLOG("Connection using protocol %u, but we are running %u", version,
RemoteServerProtocolVersion);
ser.SetStreamingMode(true);
if(version != RemoteServerProtocolVersion)
{
RDCLOG("Connection using protocol %u, but we are running %u", version,
RemoteServerProtocolVersion);
SCOPED_SERIALISE_CHUNK(eRemoteServer_VersionMismatch);
}
}
else
{
bool busy = false;
{
SCOPED_LOCK(activeClient.lock);
busy = activeClient.active != NULL;
// if we're not busy, and the connection wants to be active, promote it.
if(!busy && activeConnectionDesired)
{
SCOPED_SERIALISE_CHUNK(eRemoteServer_VersionMismatch);
RDCLOG("Promoting connection from %u.%u.%u.%u to active.", Network::GetIPOctet(ip, 0),
Network::GetIPOctet(ip, 1), Network::GetIPOctet(ip, 2), Network::GetIPOctet(ip, 3));
activeConnectionEstablished = true;
}
}
else
// if we were busy, return that status
if(busy)
{
RDCLOG("Returning busy signal for connection from %u.%u.%u.%u.", Network::GetIPOctet(ip, 0),
Network::GetIPOctet(ip, 1), Network::GetIPOctet(ip, 2), Network::GetIPOctet(ip, 3));
SCOPED_SERIALISE_CHUNK(eRemoteServer_Busy);
}
// otherwise we return a successful handshake. For active connections this begins the active
// thread, for passive connection checks this is enough
else
{
RDCLOG("Returning OK signal for connection from %u.%u.%u.%u.", Network::GetIPOctet(ip, 0),
Network::GetIPOctet(ip, 1), Network::GetIPOctet(ip, 2), Network::GetIPOctet(ip, 3));
SCOPED_SERIALISE_CHUNK(eRemoteServer_Handshake);
}
}
SAFE_DELETE(threadData->socket);
RDCLOG("Closed inactive connection from %u.%u.%u.%u.", Network::GetIPOctet(ip, 0),
Network::GetIPOctet(ip, 1), Network::GetIPOctet(ip, 2), Network::GetIPOctet(ip, 3));
}
// return whether or not an active connection was established
return activeConnectionEstablished;
}
static void ActiveRemoteClientThread(ClientThread *threadData,
@@ -221,50 +255,6 @@ static void ActiveRemoteClientThread(ClientThread *threadData,
uint32_t ip = client->GetRemoteIP();
uint32_t version = 0;
{
ReadSerialiser ser(new StreamReader(client, Ownership::Nothing), Ownership::Stream);
// this thread just handles receiving the handshake and sending a busy signal without blocking
// the server thread
RemoteServerPacket type = ser.ReadChunk<RemoteServerPacket>();
if(ser.IsErrored() || type != eRemoteServer_Handshake)
{
RDCWARN("Didn't receive proper handshake");
SAFE_DELETE(client);
return;
}
SERIALISE_ELEMENT(version);
ser.EndChunk();
}
{
WriteSerialiser ser(new StreamWriter(client, Ownership::Nothing), Ownership::Stream);
ser.SetStreamingMode(true);
if(version != RemoteServerProtocolVersion)
{
RDCLOG("Connection using protocol %u, but we are running %u", version,
RemoteServerProtocolVersion);
{
SCOPED_SERIALISE_CHUNK(eRemoteServer_VersionMismatch);
}
SAFE_DELETE(client);
return;
}
else
{
// handshake and continue
SCOPED_SERIALISE_CHUNK(eRemoteServer_Handshake);
}
}
rdcarray<rdcstr> tempFiles;
IRemoteDriver *remoteDriver = NULL;
IReplayDriver *replayDriver = NULL;
@@ -1022,38 +1012,37 @@ void RenderDoc::BecomeRemoteServer(const char *listenhost, uint16_t port,
RDCLOG("Replay host ready for requests...");
ClientThread *activeClientData = NULL;
ActiveClient activeClientData;
rdcarray<ClientThread *> inactives;
rdcarray<ClientThread *> clients;
while(!killReplay())
{
Network::Socket *client = sock->AcceptClient(0);
if(activeClientData && activeClientData->killServer)
break;
// reap any dead inactive threads
for(size_t i = 0; i < inactives.size(); i++)
{
if(inactives[i]->socket == NULL)
{
Threading::JoinThread(inactives[i]->thread);
Threading::CloseThread(inactives[i]->thread);
delete inactives[i];
inactives.erase(i);
SCOPED_LOCK(activeClientData.lock);
if(activeClientData.active && activeClientData.active->killServer)
break;
}
}
// reap our active connection possibly
if(activeClientData && activeClientData->socket == NULL)
// reap any dead client threads
for(size_t i = 0; i < clients.size(); i++)
{
Threading::JoinThread(activeClientData->thread);
Threading::CloseThread(activeClientData->thread);
if(clients[i]->socket == NULL)
{
{
SCOPED_LOCK(activeClientData.lock);
if(activeClientData.active == clients[i])
activeClientData.active = NULL;
}
delete activeClientData;
activeClientData = NULL;
Threading::JoinThread(clients[i]->thread);
Threading::CloseThread(clients[i]->thread);
delete clients[i];
clients.erase(i);
break;
}
}
if(client == NULL)
@@ -1097,49 +1086,39 @@ void RenderDoc::BecomeRemoteServer(const char *listenhost, uint16_t port,
continue;
}
if(activeClientData == NULL)
{
activeClientData = new ClientThread();
activeClientData->socket = client;
activeClientData->allowExecution = allowExecution;
RDCLOG("Processing connection");
activeClientData->thread = Threading::CreateThread([activeClientData, previewWindow]() {
ActiveRemoteClientThread(activeClientData, previewWindow);
});
ClientThread *clientThread = new ClientThread();
clientThread->socket = client;
clientThread->allowExecution = allowExecution;
clientThread->thread =
Threading::CreateThread([&activeClientData, clientThread, previewWindow]() {
if(HandleHandshakeClient(activeClientData, clientThread))
{
ActiveRemoteClientThread(clientThread, previewWindow);
}
else
{
SAFE_DELETE(clientThread->socket);
}
});
RDCLOG("Making active connection");
}
else
{
ClientThread *inactive = new ClientThread();
inactive->socket = client;
inactive->allowExecution = false;
inactive->thread =
Threading::CreateThread([inactive]() { InactiveRemoteClientThread(inactive); });
inactives.push_back(inactive);
RDCLOG("Refusing inactive connection");
}
clients.push_back(clientThread);
}
if(activeClientData && activeClientData->socket != NULL)
{
activeClientData->killThread = true;
Threading::JoinThread(activeClientData->thread);
Threading::CloseThread(activeClientData->thread);
delete activeClientData;
SCOPED_LOCK(activeClientData.lock);
if(activeClientData.active)
activeClientData.active->killThread = true;
activeClientData.active = NULL;
}
// shut down client threads
for(size_t i = 0; i < inactives.size(); i++)
for(size_t i = 0; i < clients.size(); i++)
{
Threading::JoinThread(inactives[i]->thread);
Threading::CloseThread(inactives[i]->thread);
delete inactives[i];
Threading::JoinThread(clients[i]->thread);
Threading::CloseThread(clients[i]->thread);
delete clients[i];
}
SAFE_DELETE(sock);
@@ -1148,9 +1127,6 @@ void RenderDoc::BecomeRemoteServer(const char *listenhost, uint16_t port,
extern "C" RENDERDOC_API ReplayStatus RENDERDOC_CC
RENDERDOC_CreateRemoteServerConnection(const char *URL, IRemoteServer **rend)
{
if(rend == NULL)
return ReplayStatus::InternalError;
rdcstr host = "localhost";
if(URL != NULL && URL[0] != '\0')
host = URL;
@@ -1180,6 +1156,8 @@ RENDERDOC_CreateRemoteServerConnection(const char *URL, IRemoteServer **rend)
sock->SetTimeout(RemoteServer_TimeoutMS());
bool activeConnection = (rend != NULL);
{
WriteSerialiser ser(new StreamWriter(sock, Ownership::Nothing), Ownership::Stream);
@@ -1187,6 +1165,7 @@ RENDERDOC_CreateRemoteServerConnection(const char *URL, IRemoteServer **rend)
SCOPED_SERIALISE_CHUNK(eRemoteServer_Handshake);
SERIALISE_ELEMENT(version);
SERIALISE_ELEMENT(activeConnection);
}
if(!sock->Connected())
@@ -1219,6 +1198,9 @@ RENDERDOC_CreateRemoteServerConnection(const char *URL, IRemoteServer **rend)
}
}
if(rend == NULL)
return ReplayStatus::Succeeded;
if(protocol)
*rend = protocol->CreateRemoteServer(sock, deviceID);
else
@@ -1227,6 +1209,11 @@ RENDERDOC_CreateRemoteServerConnection(const char *URL, IRemoteServer **rend)
return ReplayStatus::Succeeded;
}
extern "C" RENDERDOC_API ReplayStatus RENDERDOC_CC RENDERDOC_CheckRemoteServerConnection(const char *URL)
{
return RENDERDOC_CreateRemoteServerConnection(URL, NULL);
}
#undef WRITE_DATA_SCOPE
#undef READ_DATA_SCOPE
#define WRITE_DATA_SCOPE() WriteSerialiser &ser = *writer;