Reliable delta fix.

This commit is contained in:
Andrzej Kapolka 2014-10-21 15:48:53 -07:00
parent 02c6b5a8bf
commit bd4738d2c2
8 changed files with 120 additions and 35 deletions

View file

@ -237,8 +237,9 @@ void MetavoxelSession::update() {
// go back to the beginning with the current packet and note that there's a delta pending // go back to the beginning with the current packet and note that there's a delta pending
_sequencer.getOutputStream().getUnderlying().device()->seek(start); _sequencer.getOutputStream().getUnderlying().device()->seek(start);
MetavoxelDeltaPendingMessage msg = { ++_reliableDeltaID, _lod }; MetavoxelDeltaPendingMessage msg = { ++_reliableDeltaID, sendRecord->getPacketNumber(),
out << QVariant::fromValue(msg); _sequencer.getIncomingPacketNumber() };
out << (_reliableDeltaMessage = QVariant::fromValue(msg));
_sequencer.endPacket(); _sequencer.endPacket();
} else { } else {
@ -254,8 +255,9 @@ void MetavoxelSession::handleMessage(const QVariant& message, Bitstream& in) {
} }
PacketRecord* MetavoxelSession::maybeCreateSendRecord() const { PacketRecord* MetavoxelSession::maybeCreateSendRecord() const {
return _reliableDeltaChannel ? new PacketRecord(_reliableDeltaLOD, _reliableDeltaData) : return _reliableDeltaChannel ? new PacketRecord(_sequencer.getOutgoingPacketNumber(),
new PacketRecord(_lod, _sender->getData()); _reliableDeltaLOD, _reliableDeltaData) : new PacketRecord(_sequencer.getOutgoingPacketNumber(),
_lod, _sender->getData());
} }
void MetavoxelSession::handleMessage(const QVariant& message) { void MetavoxelSession::handleMessage(const QVariant& message) {
@ -290,8 +292,7 @@ void MetavoxelSession::sendPacketGroup(int alreadySent) {
for (int i = 0; i < additionalPackets; i++) { for (int i = 0; i < additionalPackets; i++) {
Bitstream& out = _sequencer.startPacket(); Bitstream& out = _sequencer.startPacket();
if (_reliableDeltaChannel) { if (_reliableDeltaChannel) {
MetavoxelDeltaPendingMessage msg = { _reliableDeltaID, _reliableDeltaLOD }; out << _reliableDeltaMessage;
out << QVariant::fromValue(msg);
} else { } else {
out << QVariant(); out << QVariant();
} }

View file

@ -134,6 +134,7 @@ private:
MetavoxelLOD _reliableDeltaLOD; MetavoxelLOD _reliableDeltaLOD;
Bitstream::WriteMappings _reliableDeltaWriteMappings; Bitstream::WriteMappings _reliableDeltaWriteMappings;
int _reliableDeltaID; int _reliableDeltaID;
QVariant _reliableDeltaMessage;
}; };
/// Handles persistence in a separate thread. /// Handles persistence in a separate thread.

View file

@ -107,7 +107,8 @@ PacketRecord* Endpoint::maybeCreateReceiveRecord() const {
return NULL; return NULL;
} }
PacketRecord::PacketRecord(const MetavoxelLOD& lod, const MetavoxelData& data) : PacketRecord::PacketRecord(int packetNumber, const MetavoxelLOD& lod, const MetavoxelData& data) :
_packetNumber(packetNumber),
_lod(lod), _lod(lod),
_data(data) { _data(data) {
} }

View file

@ -45,10 +45,10 @@ protected slots:
virtual void handleMessage(const QVariant& message, Bitstream& in); virtual void handleMessage(const QVariant& message, Bitstream& in);
void recordSend(); void recordSend();
void recordReceive(); virtual void recordReceive();
void clearSendRecordsBefore(int index); virtual void clearSendRecordsBefore(int index);
void clearReceiveRecordsBefore(int index); virtual void clearReceiveRecordsBefore(int index);
protected: protected:
@ -71,14 +71,16 @@ protected:
class PacketRecord { class PacketRecord {
public: public:
PacketRecord(const MetavoxelLOD& lod = MetavoxelLOD(), const MetavoxelData& data = MetavoxelData()); PacketRecord(int packetNumber = 0, const MetavoxelLOD& lod = MetavoxelLOD(), const MetavoxelData& data = MetavoxelData());
virtual ~PacketRecord(); virtual ~PacketRecord();
int getPacketNumber() const { return _packetNumber; }
const MetavoxelLOD& getLOD() const { return _lod; } const MetavoxelLOD& getLOD() const { return _lod; }
const MetavoxelData& getData() const { return _data; } const MetavoxelData& getData() const { return _data; }
private: private:
int _packetNumber;
MetavoxelLOD _lod; MetavoxelLOD _lod;
MetavoxelData _data; MetavoxelData _data;
}; };

View file

@ -216,12 +216,71 @@ void MetavoxelClient::applyEdit(const MetavoxelEditMessage& edit, bool reliable)
} }
} }
PacketRecord* MetavoxelClient::getAcknowledgedSendRecord(int packetNumber) const {
PacketRecord* lastAcknowledged = getLastAcknowledgedSendRecord();
if (lastAcknowledged->getPacketNumber() == packetNumber) {
return lastAcknowledged;
}
foreach (PacketRecord* record, _clearedSendRecords) {
if (record->getPacketNumber() == packetNumber) {
return record;
}
}
return NULL;
}
PacketRecord* MetavoxelClient::getAcknowledgedReceiveRecord(int packetNumber) const {
PacketRecord* lastAcknowledged = getLastAcknowledgedReceiveRecord();
if (lastAcknowledged->getPacketNumber() == packetNumber) {
return lastAcknowledged;
}
foreach (PacketRecord* record, _clearedReceiveRecords) {
if (record->getPacketNumber() == packetNumber) {
return record;
}
}
return NULL;
}
void MetavoxelClient::dataChanged(const MetavoxelData& oldData) { void MetavoxelClient::dataChanged(const MetavoxelData& oldData) {
// make thread-safe copy // make thread-safe copy
QWriteLocker locker(&_dataCopyLock); QWriteLocker locker(&_dataCopyLock);
_dataCopy = _data; _dataCopy = _data;
} }
void MetavoxelClient::recordReceive() {
Endpoint::recordReceive();
// clear the cleared lists
foreach (PacketRecord* record, _clearedSendRecords) {
delete record;
}
_clearedSendRecords.clear();
foreach (PacketRecord* record, _clearedReceiveRecords) {
delete record;
}
_clearedReceiveRecords.clear();
}
void MetavoxelClient::clearSendRecordsBefore(int index) {
// move to cleared list
QList<PacketRecord*>::iterator end = _sendRecords.begin() + index + 1;
for (QList<PacketRecord*>::const_iterator it = _sendRecords.begin(); it != end; it++) {
_clearedSendRecords.append(*it);
}
_sendRecords.erase(_sendRecords.begin(), end);
}
void MetavoxelClient::clearReceiveRecordsBefore(int index) {
// move to cleared list
QList<PacketRecord*>::iterator end = _receiveRecords.begin() + index + 1;
for (QList<PacketRecord*>::const_iterator it = _receiveRecords.begin(); it != end; it++) {
_clearedReceiveRecords.append(*it);
}
_receiveRecords.erase(_receiveRecords.begin(), end);
}
void MetavoxelClient::writeUpdateMessage(Bitstream& out) { void MetavoxelClient::writeUpdateMessage(Bitstream& out) {
ClientStateMessage state = { _updater->getLOD() }; ClientStateMessage state = { _updater->getLOD() };
out << QVariant::fromValue(state); out << QVariant::fromValue(state);
@ -232,7 +291,9 @@ void MetavoxelClient::handleMessage(const QVariant& message, Bitstream& in) {
if (userType == MetavoxelDeltaMessage::Type) { if (userType == MetavoxelDeltaMessage::Type) {
PacketRecord* receiveRecord = getLastAcknowledgedReceiveRecord(); PacketRecord* receiveRecord = getLastAcknowledgedReceiveRecord();
if (_reliableDeltaChannel) { if (_reliableDeltaChannel) {
_remoteData.readDelta(receiveRecord->getData(), receiveRecord->getLOD(), in, _remoteDataLOD = _reliableDeltaLOD); MetavoxelData reference = _remoteData;
MetavoxelLOD referenceLOD = _remoteDataLOD;
_remoteData.readDelta(reference, referenceLOD, in, _remoteDataLOD = _reliableDeltaLOD);
_sequencer.getInputStream().persistReadMappings(in.getAndResetReadMappings()); _sequencer.getInputStream().persistReadMappings(in.getAndResetReadMappings());
in.clearPersistentMappings(); in.clearPersistentMappings();
_reliableDeltaChannel = NULL; _reliableDeltaChannel = NULL;
@ -260,8 +321,17 @@ void MetavoxelClient::handleMessage(const QVariant& message, Bitstream& in) {
_reliableDeltaID = pending.id; _reliableDeltaID = pending.id;
_reliableDeltaChannel = _sequencer.getReliableInputChannel(RELIABLE_DELTA_CHANNEL_INDEX); _reliableDeltaChannel = _sequencer.getReliableInputChannel(RELIABLE_DELTA_CHANNEL_INDEX);
_reliableDeltaChannel->getBitstream().copyPersistentMappings(_sequencer.getInputStream()); _reliableDeltaChannel->getBitstream().copyPersistentMappings(_sequencer.getInputStream());
_reliableDeltaLOD = pending.lod; PacketRecord* sendRecord = getAcknowledgedSendRecord(pending.receivedPacketNumber);
PacketRecord* receiveRecord = getLastAcknowledgedReceiveRecord(); if (!sendRecord) {
qWarning() << "Missing send record for delta" << pending.receivedPacketNumber;
return;
}
_reliableDeltaLOD = sendRecord->getLOD();
PacketRecord* receiveRecord = getAcknowledgedReceiveRecord(pending.sentPacketNumber);
if (!receiveRecord) {
qWarning() << "Missing receive record for delta" << pending.sentPacketNumber;
return;
}
_remoteDataLOD = receiveRecord->getLOD(); _remoteDataLOD = receiveRecord->getLOD();
_remoteData = receiveRecord->getData(); _remoteData = receiveRecord->getData();
} }
@ -271,10 +341,11 @@ void MetavoxelClient::handleMessage(const QVariant& message, Bitstream& in) {
} }
PacketRecord* MetavoxelClient::maybeCreateSendRecord() const { PacketRecord* MetavoxelClient::maybeCreateSendRecord() const {
return new PacketRecord(_reliableDeltaChannel ? _reliableDeltaLOD : _updater->getLOD()); return new PacketRecord(_sequencer.getOutgoingPacketNumber(),
_reliableDeltaChannel ? _reliableDeltaLOD : _updater->getLOD());
} }
PacketRecord* MetavoxelClient::maybeCreateReceiveRecord() const { PacketRecord* MetavoxelClient::maybeCreateReceiveRecord() const {
return new PacketRecord(_remoteDataLOD, _remoteData); return new PacketRecord(_sequencer.getIncomingPacketNumber(), _remoteDataLOD, _remoteData);
} }

View file

@ -116,8 +116,16 @@ public:
protected: protected:
PacketRecord* getAcknowledgedSendRecord(int packetNumber) const;
PacketRecord* getAcknowledgedReceiveRecord(int packetNumber) const;
virtual void dataChanged(const MetavoxelData& oldData); virtual void dataChanged(const MetavoxelData& oldData);
virtual void recordReceive();
virtual void clearSendRecordsBefore(int index);
virtual void clearReceiveRecordsBefore(int index);
virtual void writeUpdateMessage(Bitstream& out); virtual void writeUpdateMessage(Bitstream& out);
virtual void handleMessage(const QVariant& message, Bitstream& in); virtual void handleMessage(const QVariant& message, Bitstream& in);
@ -132,9 +140,13 @@ protected:
ReliableChannel* _reliableDeltaChannel; ReliableChannel* _reliableDeltaChannel;
MetavoxelLOD _reliableDeltaLOD; MetavoxelLOD _reliableDeltaLOD;
int _reliableDeltaID; int _reliableDeltaID;
QVariant _reliableDeltaMessage;
MetavoxelData _dataCopy; MetavoxelData _dataCopy;
QReadWriteLock _dataCopyLock; QReadWriteLock _dataCopyLock;
QList<PacketRecord*> _clearedSendRecords;
QList<PacketRecord*> _clearedReceiveRecords;
}; };
#endif // hifi_MetavoxelClientManager_h #endif // hifi_MetavoxelClientManager_h

View file

@ -68,7 +68,8 @@ class MetavoxelDeltaPendingMessage {
public: public:
STREAM int id; STREAM int id;
STREAM MetavoxelLOD lod; STREAM int sentPacketNumber;
STREAM int receivedPacketNumber;
}; };
DECLARE_STREAMABLE_METATYPE(MetavoxelDeltaPendingMessage) DECLARE_STREAMABLE_METATYPE(MetavoxelDeltaPendingMessage)

View file

@ -603,31 +603,27 @@ int RandomVisitor::visit(MetavoxelInfo& info) {
class TestSendRecord : public PacketRecord { class TestSendRecord : public PacketRecord {
public: public:
TestSendRecord(const MetavoxelLOD& lod = MetavoxelLOD(), const MetavoxelData& data = MetavoxelData(), TestSendRecord(int packetNumber = 0, const MetavoxelLOD& lod = MetavoxelLOD(), const MetavoxelData& data = MetavoxelData(),
const SharedObjectPointer& localState = SharedObjectPointer(), int packetNumber = 0); const SharedObjectPointer& localState = SharedObjectPointer());
const SharedObjectPointer& getLocalState() const { return _localState; } const SharedObjectPointer& getLocalState() const { return _localState; }
int getPacketNumber() const { return _packetNumber; }
private: private:
SharedObjectPointer _localState; SharedObjectPointer _localState;
int _packetNumber;
}; };
TestSendRecord::TestSendRecord(const MetavoxelLOD& lod, const MetavoxelData& data, TestSendRecord::TestSendRecord(int packetNumber, const MetavoxelLOD& lod, const MetavoxelData& data,
const SharedObjectPointer& localState, int packetNumber) : const SharedObjectPointer& localState) :
PacketRecord(lod, data), PacketRecord(packetNumber, lod, data),
_localState(localState), _localState(localState) {
_packetNumber(packetNumber) {
} }
class TestReceiveRecord : public PacketRecord { class TestReceiveRecord : public PacketRecord {
public: public:
TestReceiveRecord(const MetavoxelLOD& lod = MetavoxelLOD(), const MetavoxelData& data = MetavoxelData(), TestReceiveRecord(int packetNumber = 0, const MetavoxelLOD& lod = MetavoxelLOD(),
const SharedObjectPointer& remoteState = SharedObjectPointer()); const MetavoxelData& data = MetavoxelData(), const SharedObjectPointer& remoteState = SharedObjectPointer());
const SharedObjectPointer& getRemoteState() const { return _remoteState; } const SharedObjectPointer& getRemoteState() const { return _remoteState; }
@ -636,9 +632,9 @@ private:
SharedObjectPointer _remoteState; SharedObjectPointer _remoteState;
}; };
TestReceiveRecord::TestReceiveRecord(const MetavoxelLOD& lod, TestReceiveRecord::TestReceiveRecord(int packetNumber, const MetavoxelLOD& lod,
const MetavoxelData& data, const SharedObjectPointer& remoteState) : const MetavoxelData& data, const SharedObjectPointer& remoteState) :
PacketRecord(lod, data), PacketRecord(packetNumber, lod, data),
_remoteState(remoteState) { _remoteState(remoteState) {
} }
@ -1110,14 +1106,14 @@ void TestEndpoint::handleMessage(const QVariant& message, Bitstream& in) {
PacketRecord* TestEndpoint::maybeCreateSendRecord() const { PacketRecord* TestEndpoint::maybeCreateSendRecord() const {
if (_reliableDeltaChannel) { if (_reliableDeltaChannel) {
return new TestSendRecord(_reliableDeltaLOD, _reliableDeltaData, _localState, _sequencer.getOutgoingPacketNumber()); return new TestSendRecord(_sequencer.getOutgoingPacketNumber(), _reliableDeltaLOD, _reliableDeltaData, _localState);
} }
return new TestSendRecord(_lod, (_mode == METAVOXEL_CLIENT_MODE) ? MetavoxelData() : _data, return new TestSendRecord(_sequencer.getOutgoingPacketNumber(), _lod,
_localState, _sequencer.getOutgoingPacketNumber()); (_mode == METAVOXEL_CLIENT_MODE) ? MetavoxelData() : _data, _localState);
} }
PacketRecord* TestEndpoint::maybeCreateReceiveRecord() const { PacketRecord* TestEndpoint::maybeCreateReceiveRecord() const {
return new TestReceiveRecord(_remoteDataLOD, _remoteData, _remoteState); return new TestReceiveRecord(_sequencer.getIncomingPacketNumber(), _remoteDataLOD, _remoteData, _remoteState);
} }
void TestEndpoint::handleHighPriorityMessage(const QVariant& message) { void TestEndpoint::handleHighPriorityMessage(const QVariant& message) {