From da32c6e89bf812b47cb3cf8b9ff7aa79311756f6 Mon Sep 17 00:00:00 2001 From: Andrzej Kapolka Date: Thu, 3 Jul 2014 17:29:10 -0700 Subject: [PATCH] Push persistent mappings into reliable delta channel before sending and pull transient mappings out after sending. --- libraries/metavoxels/src/Bitstream.cpp | 20 ++++++ libraries/metavoxels/src/Bitstream.h | 32 +++++++++ .../metavoxels/src/DatagramSequencer.cpp | 18 ++++- libraries/metavoxels/src/DatagramSequencer.h | 11 ++++ tests/metavoxels/src/MetavoxelTests.cpp | 65 ++++++++++++------- tests/metavoxels/src/MetavoxelTests.h | 4 +- 6 files changed, 123 insertions(+), 27 deletions(-) diff --git a/libraries/metavoxels/src/Bitstream.cpp b/libraries/metavoxels/src/Bitstream.cpp index 4a86344c8c..459a12dc15 100644 --- a/libraries/metavoxels/src/Bitstream.cpp +++ b/libraries/metavoxels/src/Bitstream.cpp @@ -278,6 +278,26 @@ void Bitstream::persistAndResetReadMappings() { persistReadMappings(getAndResetReadMappings()); } +void Bitstream::copyPersistentMappings(const Bitstream& other) { + _objectStreamerStreamer.copyPersistentMappings(other._objectStreamerStreamer); + _typeStreamerStreamer.copyPersistentMappings(other._typeStreamerStreamer); + _attributeStreamer.copyPersistentMappings(other._attributeStreamer); + _scriptStringStreamer.copyPersistentMappings(other._scriptStringStreamer); + _sharedObjectStreamer.copyPersistentMappings(other._sharedObjectStreamer); + _sharedObjectReferences = other._sharedObjectReferences; + _weakSharedObjectHash = other._weakSharedObjectHash; +} + +void Bitstream::clearPersistentMappings() { + _objectStreamerStreamer.clearPersistentMappings(); + _typeStreamerStreamer.clearPersistentMappings(); + _attributeStreamer.clearPersistentMappings(); + _scriptStringStreamer.clearPersistentMappings(); + _sharedObjectStreamer.clearPersistentMappings(); + _sharedObjectReferences.clear(); + _weakSharedObjectHash.clear(); +} + void Bitstream::clearSharedObject(int id) { SharedObjectPointer object = _sharedObjectStreamer.takePersistentValue(id); if (object) { diff --git a/libraries/metavoxels/src/Bitstream.h b/libraries/metavoxels/src/Bitstream.h index d900b34847..97f1b70ff0 100644 --- a/libraries/metavoxels/src/Bitstream.h +++ b/libraries/metavoxels/src/Bitstream.h @@ -102,6 +102,9 @@ public: V takePersistentValue(int id) { V value = _persistentValues.take(id); _valueIDs.remove(value); return value; } + void copyPersistentMappings(const RepeatedValueStreamer& other); + void clearPersistentMappings(); + RepeatedValueStreamer& operator<<(K value); RepeatedValueStreamer& operator>>(V& value); @@ -199,6 +202,29 @@ template inline RepeatedValueStreamer& return *this; } +template inline void RepeatedValueStreamer::copyPersistentMappings( + const RepeatedValueStreamer& other) { + _lastPersistentID = other._lastPersistentID; + _idStreamer.setBitsFromValue(_lastPersistentID); + _persistentIDs = other._persistentIDs; + _transientOffsets.clear(); + _lastTransientOffset = 0; + _persistentValues = other._persistentValues; + _transientValues.clear(); + _valueIDs = other._valueIDs; +} + +template inline void RepeatedValueStreamer::clearPersistentMappings() { + _lastPersistentID = 0; + _idStreamer.setBitsFromValue(_lastPersistentID); + _persistentIDs.clear(); + _transientOffsets.clear(); + _lastTransientOffset = 0; + _persistentValues.clear(); + _transientValues.clear(); + _valueIDs.clear(); +} + /// A stream for bit-aligned data. Through a combination of code generation, reflection, macros, and templates, provides a /// serialization mechanism that may be used for both networking and persistent storage. For unreliable networking, the /// class provides a mapping system that resends mappings for ids until they are acknowledged (and thus persisted). For @@ -353,6 +379,12 @@ public: /// Immediately persists and resets the read mappings. void persistAndResetReadMappings(); + /// Copies the persistent mappings from the specified other stream. + void copyPersistentMappings(const Bitstream& other); + + /// Clears the persistent mappings for this stream. + void clearPersistentMappings(); + /// Returns a reference to the weak hash storing shared objects for this stream. const WeakSharedObjectHash& getWeakSharedObjectHash() const { return _weakSharedObjectHash; } diff --git a/libraries/metavoxels/src/DatagramSequencer.cpp b/libraries/metavoxels/src/DatagramSequencer.cpp index 3b16a829e6..2ef3d0213c 100644 --- a/libraries/metavoxels/src/DatagramSequencer.cpp +++ b/libraries/metavoxels/src/DatagramSequencer.cpp @@ -276,6 +276,11 @@ void DatagramSequencer::handleHighPriorityMessage(const QVariant& data) { } } +void DatagramSequencer::clearReliableChannel(QObject* object) { + ReliableChannel* channel = static_cast(object); + (channel->isOutput() ? _reliableOutputChannels : _reliableInputChannels).remove(channel->getIndex()); +} + void DatagramSequencer::sendRecordAcknowledged(const SendRecord& record) { // stop acknowledging the recorded packets while (!_receiveRecords.isEmpty() && _receiveRecords.first().packetNumber <= record.lastReceivedPacketNumber) { @@ -297,7 +302,10 @@ void DatagramSequencer::sendRecordAcknowledged(const SendRecord& record) { // acknowledge the received spans foreach (const ChannelSpan& span, record.spans) { - getReliableOutputChannel(span.channel)->spanAcknowledged(span); + ReliableChannel* channel = _reliableOutputChannels.value(span.channel); + if (channel) { + channel->spanAcknowledged(span); + } } // increase the packet rate with every ack until we pass the slow start threshold; then, every round trip @@ -312,7 +320,10 @@ void DatagramSequencer::sendRecordAcknowledged(const SendRecord& record) { void DatagramSequencer::sendRecordLost(const SendRecord& record) { // notify the channels of their lost spans foreach (const ChannelSpan& span, record.spans) { - getReliableOutputChannel(span.channel)->spanLost(record.packetNumber, _outgoingPacketNumber + 1); + ReliableChannel* channel = _reliableOutputChannels.value(span.channel); + if (channel) { + channel->spanLost(record.packetNumber, _outgoingPacketNumber + 1); + } } // halve the rate and remember as threshold @@ -700,6 +711,7 @@ void ReliableChannel::handleMessage(const QVariant& message, Bitstream& in) { ReliableChannel::ReliableChannel(DatagramSequencer* sequencer, int index, bool output) : QObject(sequencer), _index(index), + _output(output), _dataStream(&_buffer), _bitstream(_dataStream), _priority(1.0f), @@ -713,6 +725,8 @@ ReliableChannel::ReliableChannel(DatagramSequencer* sequencer, int index, bool o connect(&_bitstream, SIGNAL(sharedObjectCleared(int)), SLOT(sendClearSharedObjectMessage(int))); connect(this, SIGNAL(receivedMessage(const QVariant&, Bitstream&)), SLOT(handleMessage(const QVariant&, Bitstream&))); + + sequencer->connect(this, SIGNAL(destroyed(QObject*)), SLOT(clearReliableChannel(QObject*))); } void ReliableChannel::writeData(QDataStream& out, int bytes, QVector& spans) { diff --git a/libraries/metavoxels/src/DatagramSequencer.h b/libraries/metavoxels/src/DatagramSequencer.h index 09d2f834ef..32f645b13c 100644 --- a/libraries/metavoxels/src/DatagramSequencer.h +++ b/libraries/metavoxels/src/DatagramSequencer.h @@ -78,6 +78,12 @@ public: /// Returns the packet number of the last packet received (or the packet currently being assembled). int getIncomingPacketNumber() const { return _incomingPacketNumber; } + /// Returns a reference to the stream used to read packets. + Bitstream& getInputStream() { return _inputStream; } + + /// Returns a reference to the stream used to write packets. + Bitstream& getOutputStream() { return _outputStream; } + /// Returns the packet number of the sent packet at the specified index. int getSentPacketNumber(int index) const { return _sendRecords.at(index).packetNumber; } @@ -147,6 +153,7 @@ private slots: void sendClearSharedObjectMessage(int id); void handleHighPriorityMessage(const QVariant& data); + void clearReliableChannel(QObject* object); private: @@ -325,6 +332,9 @@ public: /// Returns the channel's index in the sequencer's channel map. int getIndex() const { return _index; } + /// Checks whether this is an output channel. + bool isOutput() const { return _output; } + /// Returns a reference to the buffer used to write/read data to/from this channel. CircularBuffer& getBuffer() { return _buffer; } @@ -390,6 +400,7 @@ private: void readData(QDataStream& in); int _index; + bool _output; CircularBuffer _buffer; CircularBuffer _assemblyBuffer; QDataStream _dataStream; diff --git a/tests/metavoxels/src/MetavoxelTests.cpp b/tests/metavoxels/src/MetavoxelTests.cpp index 7c278f3c2f..cc2d34f180 100644 --- a/tests/metavoxels/src/MetavoxelTests.cpp +++ b/tests/metavoxels/src/MetavoxelTests.cpp @@ -642,13 +642,14 @@ TestReceiveRecord::TestReceiveRecord(const MetavoxelLOD& lod, _remoteState(remoteState) { } +const int RELIABLE_DELTA_CHANNEL_INDEX = 1; + TestEndpoint::TestEndpoint(Mode mode) : Endpoint(SharedNodePointer(), new TestSendRecord(), new TestReceiveRecord()), _mode(mode), _highPriorityMessagesToSend(0.0f), _reliableMessagesToSend(0.0f), - _reliableDeltaReceivedOffset(0), - _reliableDeltaPending(false) { + _reliableDeltaChannel(NULL) { connect(&_sequencer, SIGNAL(receivedHighPriorityMessage(const QVariant&)), SLOT(handleHighPriorityMessage(const QVariant&))); @@ -657,9 +658,13 @@ TestEndpoint::TestEndpoint(Mode mode) : if (mode == METAVOXEL_CLIENT_MODE) { _lod = MetavoxelLOD(glm::vec3(), 0.01f); + connect(_sequencer.getReliableInputChannel(RELIABLE_DELTA_CHANNEL_INDEX), + SIGNAL(receivedMessage(const QVariant&, Bitstream&)), SLOT(handleReliableMessage(const QVariant&, Bitstream&))); return; } if (mode == METAVOXEL_SERVER_MODE) { + connect(&_sequencer, SIGNAL(sendAcknowledged(int)), SLOT(checkReliableDeltaReceived())); + _data.expand(); _data.expand(); @@ -667,11 +672,11 @@ TestEndpoint::TestEndpoint(Mode mode) : _data.guide(visitor); qDebug() << "Created" << visitor.leafCount << "base leaves"; - //_data.insert(AttributeRegistry::getInstance()->getSpannersAttribute(), new Sphere()); + _data.insert(AttributeRegistry::getInstance()->getSpannersAttribute(), new Sphere()); _sphere = new Sphere(); static_cast(_sphere.data())->setScale(0.01f); - //_data.insert(AttributeRegistry::getInstance()->getSpannersAttribute(), _sphere); + _data.insert(AttributeRegistry::getInstance()->getSpannersAttribute(), _sphere); return; } // create the object that represents out delta-encoded state @@ -894,7 +899,7 @@ bool TestEndpoint::simulate(int iterationNumber) { newSphere->setTranslation(newSphere->getTranslation() + glm::vec3(randFloatInRange(-0.01f, 0.01f), randFloatInRange(-0.01f, 0.01f), randFloatInRange(-0.01f, 0.01f))); } - //_data.replace(AttributeRegistry::getInstance()->getSpannersAttribute(), oldSphere, _sphere); + _data.replace(AttributeRegistry::getInstance()->getSpannersAttribute(), oldSphere, _sphere); spannerMutationsPerformed++; } @@ -903,15 +908,11 @@ bool TestEndpoint::simulate(int iterationNumber) { return false; } // if we're sending a reliable delta, wait until it's acknowledged - if (_reliableDeltaReceivedOffset > 0) { - if (_sequencer.getReliableOutputChannel()->getOffset() < _reliableDeltaReceivedOffset) { - Bitstream& out = _sequencer.startPacket(); - out << QVariant::fromValue(MetavoxelDeltaPendingMessage()); - _sequencer.endPacket(); - return false; - } - _reliableDeltaReceivedOffset = 0; - _reliableDeltaData = MetavoxelData(); + if (_reliableDeltaChannel) { + Bitstream& out = _sequencer.startPacket(); + out << QVariant::fromValue(MetavoxelDeltaPendingMessage()); + _sequencer.endPacket(); + return false; } Bitstream& out = _sequencer.startPacket(); out << QVariant::fromValue(MetavoxelDeltaMessage()); @@ -925,13 +926,16 @@ bool TestEndpoint::simulate(int iterationNumber) { _sequencer.cancelPacket(); // we need to send the delta on the reliable channel - ReliableChannel* channel = _sequencer.getReliableOutputChannel(); - channel->startMessage(); - channel->getBitstream() << QVariant::fromValue(MetavoxelDeltaMessage()); - _data.writeDelta(sendRecord->getData(), sendRecord->getLOD(), channel->getBitstream(), _lod); - channel->endMessage(); + _reliableDeltaChannel = _sequencer.getReliableOutputChannel(RELIABLE_DELTA_CHANNEL_INDEX); + _reliableDeltaChannel->getBitstream().copyPersistentMappings(_sequencer.getOutputStream()); + _reliableDeltaChannel->startMessage(); + _reliableDeltaChannel->getBitstream() << QVariant::fromValue(MetavoxelDeltaMessage()); + _data.writeDelta(sendRecord->getData(), sendRecord->getLOD(), _reliableDeltaChannel->getBitstream(), _lod); + _reliableDeltaWriteMappings = _reliableDeltaChannel->getBitstream().getAndResetWriteMappings(); + _reliableDeltaChannel->getBitstream().clearPersistentMappings(); + _reliableDeltaChannel->endMessage(); - _reliableDeltaReceivedOffset = channel->getBytesWritten(); + _reliableDeltaReceivedOffset = _reliableDeltaChannel->getBytesWritten(); _reliableDeltaData = _data; _reliableDeltaLOD = _lod; @@ -1087,9 +1091,10 @@ void TestEndpoint::handleMessage(const QVariant& message, Bitstream& in) { compareMetavoxelData(); } else if (userType == MetavoxelDeltaPendingMessage::Type) { - if (!_reliableDeltaPending) { + if (!_reliableDeltaChannel) { + _reliableDeltaChannel = _sequencer.getReliableInputChannel(RELIABLE_DELTA_CHANNEL_INDEX); + _reliableDeltaChannel->getBitstream().copyPersistentMappings(_sequencer.getInputStream()); _reliableDeltaLOD = getLastAcknowledgedSendRecord()->getLOD(); - _reliableDeltaPending = true; } } else if (userType == QMetaType::QVariantList) { foreach (const QVariant& element, message.toList()) { @@ -1099,7 +1104,7 @@ void TestEndpoint::handleMessage(const QVariant& message, Bitstream& in) { } PacketRecord* TestEndpoint::maybeCreateSendRecord() const { - if (_reliableDeltaReceivedOffset > 0) { + if (_reliableDeltaChannel) { return new TestSendRecord(_reliableDeltaLOD, _reliableDeltaData, _localState, _sequencer.getOutgoingPacketNumber()); } return new TestSendRecord(_lod, (_mode == METAVOXEL_CLIENT_MODE) ? MetavoxelData() : _data, @@ -1128,8 +1133,10 @@ void TestEndpoint::handleReliableMessage(const QVariant& message, Bitstream& in) if (message.userType() == MetavoxelDeltaMessage::Type) { PacketRecord* receiveRecord = getLastAcknowledgedReceiveRecord(); _data.readDelta(receiveRecord->getData(), receiveRecord->getLOD(), in, _dataLOD = _reliableDeltaLOD); + _sequencer.getInputStream().persistReadMappings(in.getAndResetReadMappings()); + in.clearPersistentMappings(); compareMetavoxelData(); - _reliableDeltaPending = false; + _reliableDeltaChannel = NULL; return; } if (message.userType() == ClearSharedObjectMessage::Type || @@ -1160,6 +1167,16 @@ void TestEndpoint::readReliableChannel() { streamedBytesReceived += bytes.size(); } +void TestEndpoint::checkReliableDeltaReceived() { + if (!_reliableDeltaChannel || _reliableDeltaChannel->getOffset() < _reliableDeltaReceivedOffset) { + return; + } + _sequencer.getOutputStream().persistWriteMappings(_reliableDeltaWriteMappings); + _reliableDeltaWriteMappings = Bitstream::WriteMappings(); + _reliableDeltaData = MetavoxelData(); + _reliableDeltaChannel = NULL; +} + void TestEndpoint::compareMetavoxelData() { // deep-compare data to sent version int packetNumber = _sequencer.getIncomingPacketNumber(); diff --git a/tests/metavoxels/src/MetavoxelTests.h b/tests/metavoxels/src/MetavoxelTests.h index f451b5e8b6..5d719ccfdf 100644 --- a/tests/metavoxels/src/MetavoxelTests.h +++ b/tests/metavoxels/src/MetavoxelTests.h @@ -66,6 +66,7 @@ private slots: void handleHighPriorityMessage(const QVariant& message); void handleReliableMessage(const QVariant& message, Bitstream& in); void readReliableChannel(); + void checkReliableDeltaReceived(); private: @@ -98,10 +99,11 @@ private: QVariantList _reliableMessagesSent; CircularBuffer _dataStreamed; + ReliableChannel* _reliableDeltaChannel; int _reliableDeltaReceivedOffset; MetavoxelData _reliableDeltaData; MetavoxelLOD _reliableDeltaLOD; - bool _reliableDeltaPending; + Bitstream::WriteMappings _reliableDeltaWriteMappings; }; /// A simple shared object.