From e15f003639819234e5b6e8021e8dd11588119ae9 Mon Sep 17 00:00:00 2001 From: Andrzej Kapolka Date: Mon, 7 Jul 2014 11:08:43 -0700 Subject: [PATCH] Bring reliable delta streaming code over from test. --- .../src/metavoxels/MetavoxelServer.cpp | 61 ++++++++++++++++--- .../src/metavoxels/MetavoxelServer.h | 8 ++- libraries/metavoxels/src/Endpoint.h | 3 + .../metavoxels/src/MetavoxelClientManager.cpp | 31 ++++++++-- .../metavoxels/src/MetavoxelClientManager.h | 4 ++ tests/metavoxels/src/MetavoxelTests.cpp | 2 - 6 files changed, 91 insertions(+), 18 deletions(-) diff --git a/assignment-client/src/metavoxels/MetavoxelServer.cpp b/assignment-client/src/metavoxels/MetavoxelServer.cpp index e7e06c96d0..a01efddbf4 100644 --- a/assignment-client/src/metavoxels/MetavoxelServer.cpp +++ b/assignment-client/src/metavoxels/MetavoxelServer.cpp @@ -91,24 +91,56 @@ void MetavoxelServer::sendDeltas() { MetavoxelSession::MetavoxelSession(const SharedNodePointer& node, MetavoxelServer* server) : Endpoint(node, new PacketRecord(), NULL), - _server(server) { + _server(server), + _reliableDeltaChannel(NULL) { connect(&_sequencer, SIGNAL(receivedHighPriorityMessage(const QVariant&)), SLOT(handleMessage(const QVariant&))); + connect(&_sequencer, SIGNAL(sendAcknowledged(int)), SLOT(checkReliableDeltaReceived())); connect(_sequencer.getReliableInputChannel(), SIGNAL(receivedMessage(const QVariant&, Bitstream&)), SLOT(handleMessage(const QVariant&, Bitstream&))); } void MetavoxelSession::update() { - // wait until we have a valid lod - if (_lod.isValid()) { - Endpoint::update(); + // wait until we have a valid lod before sending + if (!_lod.isValid()) { + return; } -} - -void MetavoxelSession::writeUpdateMessage(Bitstream& out) { + // if we're sending a reliable delta, wait until it's acknowledged + if (_reliableDeltaChannel) { + Bitstream& out = _sequencer.startPacket(); + out << QVariant::fromValue(MetavoxelDeltaPendingMessage()); + _sequencer.endPacket(); + return; + } + Bitstream& out = _sequencer.startPacket(); out << QVariant::fromValue(MetavoxelDeltaMessage()); PacketRecord* sendRecord = getLastAcknowledgedSendRecord(); - _server->getData().writeDelta(sendRecord->getData(), sendRecord->getLOD(), out, _lod); + out.setBytesRemaining(_sequencer.getMaxPacketSize()); + try { + _server->getData().writeDelta(sendRecord->getData(), sendRecord->getLOD(), out, _lod); + _sequencer.endPacket(); + + } catch (const ByteLimitExceededException& exception) { + _sequencer.cancelPacket(); + + // we need to send the delta on the reliable channel + _reliableDeltaChannel = _sequencer.getReliableOutputChannel(RELIABLE_DELTA_CHANNEL_INDEX); + _reliableDeltaChannel->getBitstream().copyPersistentMappings(_sequencer.getOutputStream()); + _reliableDeltaChannel->startMessage(); + _reliableDeltaChannel->getBitstream() << QVariant::fromValue(MetavoxelDeltaMessage()); + _server->getData().writeDelta(sendRecord->getData(), sendRecord->getLOD(), _reliableDeltaChannel->getBitstream(), _lod); + _reliableDeltaWriteMappings = _reliableDeltaChannel->getBitstream().getAndResetWriteMappings(); + _reliableDeltaChannel->getBitstream().clearPersistentMappings(); + _reliableDeltaChannel->endMessage(); + + _reliableDeltaReceivedOffset = _reliableDeltaChannel->getBytesWritten(); + _reliableDeltaData = _server->getData(); + _reliableDeltaLOD = _lod; + + Bitstream& out = _sequencer.startPacket(); + out << QVariant::fromValue(MetavoxelDeltaPendingMessage()); + _sequencer.endPacket(); + } } void MetavoxelSession::handleMessage(const QVariant& message, Bitstream& in) { @@ -116,7 +148,8 @@ void MetavoxelSession::handleMessage(const QVariant& message, Bitstream& in) { } PacketRecord* MetavoxelSession::maybeCreateSendRecord() const { - return new PacketRecord(_lod, _server->getData()); + return _reliableDeltaChannel ? new PacketRecord(_reliableDeltaLOD, _reliableDeltaData) : + new PacketRecord(_lod, _server->getData()); } void MetavoxelSession::handleMessage(const QVariant& message) { @@ -134,3 +167,13 @@ void MetavoxelSession::handleMessage(const QVariant& message) { } } } + +void MetavoxelSession::checkReliableDeltaReceived() { + if (!_reliableDeltaChannel || _reliableDeltaChannel->getOffset() < _reliableDeltaReceivedOffset) { + return; + } + _sequencer.getOutputStream().persistWriteMappings(_reliableDeltaWriteMappings); + _reliableDeltaWriteMappings = Bitstream::WriteMappings(); + _reliableDeltaData = MetavoxelData(); + _reliableDeltaChannel = NULL; +} diff --git a/assignment-client/src/metavoxels/MetavoxelServer.h b/assignment-client/src/metavoxels/MetavoxelServer.h index d9b010e282..f2769f26f2 100644 --- a/assignment-client/src/metavoxels/MetavoxelServer.h +++ b/assignment-client/src/metavoxels/MetavoxelServer.h @@ -63,7 +63,6 @@ public: protected: - virtual void writeUpdateMessage(Bitstream& out); virtual void handleMessage(const QVariant& message, Bitstream& in); virtual PacketRecord* maybeCreateSendRecord() const; @@ -71,12 +70,19 @@ protected: private slots: void handleMessage(const QVariant& message); + void checkReliableDeltaReceived(); private: MetavoxelServer* _server; MetavoxelLOD _lod; + + ReliableChannel* _reliableDeltaChannel; + int _reliableDeltaReceivedOffset; + MetavoxelData _reliableDeltaData; + MetavoxelLOD _reliableDeltaLOD; + Bitstream::WriteMappings _reliableDeltaWriteMappings; }; #endif // hifi_MetavoxelServer_h diff --git a/libraries/metavoxels/src/Endpoint.h b/libraries/metavoxels/src/Endpoint.h index b1f468531b..3c681a7b98 100644 --- a/libraries/metavoxels/src/Endpoint.h +++ b/libraries/metavoxels/src/Endpoint.h @@ -24,6 +24,9 @@ class Endpoint : public NodeData { Q_OBJECT public: + + /// The index of the input/output channel used to transmit reliable deltas. + static const int RELIABLE_DELTA_CHANNEL_INDEX = 1; Endpoint(const SharedNodePointer& node, PacketRecord* baselineSendRecord = NULL, PacketRecord* baselineReceiveRecord = NULL); diff --git a/libraries/metavoxels/src/MetavoxelClientManager.cpp b/libraries/metavoxels/src/MetavoxelClientManager.cpp index 008a477187..9abb5e9e5d 100644 --- a/libraries/metavoxels/src/MetavoxelClientManager.cpp +++ b/libraries/metavoxels/src/MetavoxelClientManager.cpp @@ -86,7 +86,11 @@ void MetavoxelClientManager::updateClient(MetavoxelClient* client) { MetavoxelClient::MetavoxelClient(const SharedNodePointer& node, MetavoxelClientManager* manager) : Endpoint(node, new PacketRecord(), new PacketRecord()), - _manager(manager) { + _manager(manager), + _reliableDeltaChannel(NULL) { + + connect(_sequencer.getReliableInputChannel(RELIABLE_DELTA_CHANNEL_INDEX), + SIGNAL(receivedMessage(const QVariant&, Bitstream&)), SLOT(handleMessage(const QVariant&, Bitstream&))); } void MetavoxelClient::guide(MetavoxelVisitor& visitor) { @@ -124,19 +128,34 @@ void MetavoxelClient::readMessage(Bitstream& in) { } void MetavoxelClient::handleMessage(const QVariant& message, Bitstream& in) { - if (message.userType() == MetavoxelDeltaMessage::Type) { + int userType = message.userType(); + if (userType == MetavoxelDeltaMessage::Type) { PacketRecord* receiveRecord = getLastAcknowledgedReceiveRecord(); - _data.readDelta(receiveRecord->getData(), receiveRecord->getLOD(), in, getLastAcknowledgedSendRecord()->getLOD()); - + if (_reliableDeltaChannel) { + _data.readDelta(receiveRecord->getData(), receiveRecord->getLOD(), in, _dataLOD = _reliableDeltaLOD); + _sequencer.getInputStream().persistReadMappings(in.getAndResetReadMappings()); + in.clearPersistentMappings(); + _reliableDeltaChannel = NULL; + + } else { + _data.readDelta(receiveRecord->getData(), receiveRecord->getLOD(), in, + _dataLOD = getLastAcknowledgedSendRecord()->getLOD()); + } + } else if (userType == MetavoxelDeltaPendingMessage::Type) { + if (!_reliableDeltaChannel) { + _reliableDeltaChannel = _sequencer.getReliableInputChannel(RELIABLE_DELTA_CHANNEL_INDEX); + _reliableDeltaChannel->getBitstream().copyPersistentMappings(_sequencer.getInputStream()); + _reliableDeltaLOD = getLastAcknowledgedSendRecord()->getLOD(); + } } else { Endpoint::handleMessage(message, in); } } PacketRecord* MetavoxelClient::maybeCreateSendRecord() const { - return new PacketRecord(_manager->getLOD()); + return new PacketRecord(_reliableDeltaChannel ? _reliableDeltaLOD : _manager->getLOD()); } PacketRecord* MetavoxelClient::maybeCreateReceiveRecord() const { - return new PacketRecord(getLastAcknowledgedSendRecord()->getLOD(), _data); + return new PacketRecord(_dataLOD, _data); } diff --git a/libraries/metavoxels/src/MetavoxelClientManager.h b/libraries/metavoxels/src/MetavoxelClientManager.h index dd11e871ec..9ab2e2c8b0 100644 --- a/libraries/metavoxels/src/MetavoxelClientManager.h +++ b/libraries/metavoxels/src/MetavoxelClientManager.h @@ -70,6 +70,10 @@ private: MetavoxelClientManager* _manager; MetavoxelData _data; + MetavoxelLOD _dataLOD; + + ReliableChannel* _reliableDeltaChannel; + MetavoxelLOD _reliableDeltaLOD; }; #endif // hifi_MetavoxelClientManager_h diff --git a/tests/metavoxels/src/MetavoxelTests.cpp b/tests/metavoxels/src/MetavoxelTests.cpp index cc2d34f180..48ce4716b7 100644 --- a/tests/metavoxels/src/MetavoxelTests.cpp +++ b/tests/metavoxels/src/MetavoxelTests.cpp @@ -642,8 +642,6 @@ TestReceiveRecord::TestReceiveRecord(const MetavoxelLOD& lod, _remoteState(remoteState) { } -const int RELIABLE_DELTA_CHANNEL_INDEX = 1; - TestEndpoint::TestEndpoint(Mode mode) : Endpoint(SharedNodePointer(), new TestSendRecord(), new TestReceiveRecord()), _mode(mode),