Merge pull request #3131 from ey6es/metavoxels

Basic incremental streaming of metavoxel data through a reliable channel (first attempts unreliable channel, then moves it over if too big).
This commit is contained in:
Clément Brisset 2014-07-08 10:00:57 -07:00
commit 0a30251411
13 changed files with 397 additions and 130 deletions

View file

@ -21,7 +21,8 @@
const int SEND_INTERVAL = 50; const int SEND_INTERVAL = 50;
MetavoxelServer::MetavoxelServer(const QByteArray& packet) : MetavoxelServer::MetavoxelServer(const QByteArray& packet) :
ThreadedAssignment(packet) { ThreadedAssignment(packet),
_sendTimer(this) {
_sendTimer.setSingleShot(true); _sendTimer.setSingleShot(true);
connect(&_sendTimer, SIGNAL(timeout()), SLOT(sendDeltas())); connect(&_sendTimer, SIGNAL(timeout()), SLOT(sendDeltas()));
@ -91,24 +92,54 @@ void MetavoxelServer::sendDeltas() {
MetavoxelSession::MetavoxelSession(const SharedNodePointer& node, MetavoxelServer* server) : MetavoxelSession::MetavoxelSession(const SharedNodePointer& node, MetavoxelServer* server) :
Endpoint(node, new PacketRecord(), NULL), Endpoint(node, new PacketRecord(), NULL),
_server(server) { _server(server),
_reliableDeltaChannel(NULL) {
connect(&_sequencer, SIGNAL(receivedHighPriorityMessage(const QVariant&)), SLOT(handleMessage(const QVariant&))); connect(&_sequencer, SIGNAL(receivedHighPriorityMessage(const QVariant&)), SLOT(handleMessage(const QVariant&)));
connect(_sequencer.getReliableInputChannel(), SIGNAL(receivedMessage(const QVariant&)), connect(&_sequencer, SIGNAL(sendAcknowledged(int)), SLOT(checkReliableDeltaReceived()));
SLOT(handleMessage(const QVariant&))); connect(_sequencer.getReliableInputChannel(), SIGNAL(receivedMessage(const QVariant&, Bitstream&)),
SLOT(handleMessage(const QVariant&, Bitstream&)));
} }
void MetavoxelSession::update() { void MetavoxelSession::update() {
// wait until we have a valid lod // wait until we have a valid lod before sending
if (_lod.isValid()) { if (!_lod.isValid()) {
Endpoint::update(); return;
} }
} // if we're sending a reliable delta, wait until it's acknowledged
if (_reliableDeltaChannel) {
void MetavoxelSession::writeUpdateMessage(Bitstream& out) { Bitstream& out = _sequencer.startPacket();
out << QVariant::fromValue(MetavoxelDeltaPendingMessage());
_sequencer.endPacket();
return;
}
Bitstream& out = _sequencer.startPacket();
int start = _sequencer.getOutputStream().getUnderlying().device()->pos();
out << QVariant::fromValue(MetavoxelDeltaMessage()); out << QVariant::fromValue(MetavoxelDeltaMessage());
PacketRecord* sendRecord = getLastAcknowledgedSendRecord(); PacketRecord* sendRecord = getLastAcknowledgedSendRecord();
_server->getData().writeDelta(sendRecord->getData(), sendRecord->getLOD(), out, _lod); _server->getData().writeDelta(sendRecord->getData(), sendRecord->getLOD(), out, _lod);
out.flush();
int end = _sequencer.getOutputStream().getUnderlying().device()->pos();
if (end > _sequencer.getMaxPacketSize()) {
// we need to send the delta on the reliable channel
_reliableDeltaChannel = _sequencer.getReliableOutputChannel(RELIABLE_DELTA_CHANNEL_INDEX);
_reliableDeltaChannel->startMessage();
_reliableDeltaChannel->getBuffer().write(_sequencer.getOutgoingPacketData().constData() + start, end - start);
_reliableDeltaChannel->endMessage();
_reliableDeltaWriteMappings = out.getAndResetWriteMappings();
_reliableDeltaReceivedOffset = _reliableDeltaChannel->getBytesWritten();
_reliableDeltaData = _server->getData();
_reliableDeltaLOD = _lod;
// go back to the beginning with the current packet and note that there's a delta pending
_sequencer.getOutputStream().getUnderlying().device()->seek(start);
out << QVariant::fromValue(MetavoxelDeltaPendingMessage());
_sequencer.endPacket();
} else {
_sequencer.endPacket();
}
} }
void MetavoxelSession::handleMessage(const QVariant& message, Bitstream& in) { void MetavoxelSession::handleMessage(const QVariant& message, Bitstream& in) {
@ -116,7 +147,8 @@ void MetavoxelSession::handleMessage(const QVariant& message, Bitstream& in) {
} }
PacketRecord* MetavoxelSession::maybeCreateSendRecord() const { PacketRecord* MetavoxelSession::maybeCreateSendRecord() const {
return new PacketRecord(_lod, _server->getData()); return _reliableDeltaChannel ? new PacketRecord(_reliableDeltaLOD, _reliableDeltaData) :
new PacketRecord(_lod, _server->getData());
} }
void MetavoxelSession::handleMessage(const QVariant& message) { void MetavoxelSession::handleMessage(const QVariant& message) {
@ -134,3 +166,13 @@ void MetavoxelSession::handleMessage(const QVariant& message) {
} }
} }
} }
void MetavoxelSession::checkReliableDeltaReceived() {
if (!_reliableDeltaChannel || _reliableDeltaChannel->getOffset() < _reliableDeltaReceivedOffset) {
return;
}
_sequencer.getOutputStream().persistWriteMappings(_reliableDeltaWriteMappings);
_reliableDeltaWriteMappings = Bitstream::WriteMappings();
_reliableDeltaData = MetavoxelData();
_reliableDeltaChannel = NULL;
}

View file

@ -63,7 +63,6 @@ public:
protected: protected:
virtual void writeUpdateMessage(Bitstream& out);
virtual void handleMessage(const QVariant& message, Bitstream& in); virtual void handleMessage(const QVariant& message, Bitstream& in);
virtual PacketRecord* maybeCreateSendRecord() const; virtual PacketRecord* maybeCreateSendRecord() const;
@ -71,12 +70,19 @@ protected:
private slots: private slots:
void handleMessage(const QVariant& message); void handleMessage(const QVariant& message);
void checkReliableDeltaReceived();
private: private:
MetavoxelServer* _server; MetavoxelServer* _server;
MetavoxelLOD _lod; MetavoxelLOD _lod;
ReliableChannel* _reliableDeltaChannel;
int _reliableDeltaReceivedOffset;
MetavoxelData _reliableDeltaData;
MetavoxelLOD _reliableDeltaLOD;
Bitstream::WriteMappings _reliableDeltaWriteMappings;
}; };
#endif // hifi_MetavoxelServer_h #endif // hifi_MetavoxelServer_h

View file

@ -274,6 +274,26 @@ void Bitstream::persistAndResetReadMappings() {
persistReadMappings(getAndResetReadMappings()); persistReadMappings(getAndResetReadMappings());
} }
void Bitstream::copyPersistentMappings(const Bitstream& other) {
_objectStreamerStreamer.copyPersistentMappings(other._objectStreamerStreamer);
_typeStreamerStreamer.copyPersistentMappings(other._typeStreamerStreamer);
_attributeStreamer.copyPersistentMappings(other._attributeStreamer);
_scriptStringStreamer.copyPersistentMappings(other._scriptStringStreamer);
_sharedObjectStreamer.copyPersistentMappings(other._sharedObjectStreamer);
_sharedObjectReferences = other._sharedObjectReferences;
_weakSharedObjectHash = other._weakSharedObjectHash;
}
void Bitstream::clearPersistentMappings() {
_objectStreamerStreamer.clearPersistentMappings();
_typeStreamerStreamer.clearPersistentMappings();
_attributeStreamer.clearPersistentMappings();
_scriptStringStreamer.clearPersistentMappings();
_sharedObjectStreamer.clearPersistentMappings();
_sharedObjectReferences.clear();
_weakSharedObjectHash.clear();
}
void Bitstream::clearSharedObject(int id) { void Bitstream::clearSharedObject(int id) {
SharedObjectPointer object = _sharedObjectStreamer.takePersistentValue(id); SharedObjectPointer object = _sharedObjectStreamer.takePersistentValue(id);
if (object) { if (object) {
@ -1122,7 +1142,7 @@ Bitstream& Bitstream::operator>(ObjectStreamerPointer& streamer) {
} }
if (_metadataType == NO_METADATA) { if (_metadataType == NO_METADATA) {
if (!metaObject) { if (!metaObject) {
qWarning() << "Unknown class name:" << className; throw BitstreamException(QString("Unknown class name: ") + className);
} }
return *this; return *this;
} }
@ -1232,7 +1252,7 @@ Bitstream& Bitstream::operator>(TypeStreamerPointer& streamer) {
} }
if (_metadataType == NO_METADATA) { if (_metadataType == NO_METADATA) {
if (!baseStreamer) { if (!baseStreamer) {
qWarning() << "Unknown type name:" << typeName; throw BitstreamException(QString("Unknown type name: ") + typeName);
} }
return *this; return *this;
} }
@ -1240,7 +1260,7 @@ Bitstream& Bitstream::operator>(TypeStreamerPointer& streamer) {
*this >> category; *this >> category;
if (category == TypeStreamer::SIMPLE_CATEGORY) { if (category == TypeStreamer::SIMPLE_CATEGORY) {
if (!streamer) { if (!streamer) {
qWarning() << "Unknown type name:" << typeName; throw BitstreamException(QString("Unknown type name: ") + typeName);
} }
return *this; return *this;
} }
@ -1441,7 +1461,7 @@ Bitstream& Bitstream::operator>(SharedObjectPointer& object) {
_objectStreamerStreamer >> objectStreamer; _objectStreamerStreamer >> objectStreamer;
if (delta) { if (delta) {
if (!reference) { if (!reference) {
qWarning() << "Delta without reference" << id << originID; throw BitstreamException(QString("Delta without reference [id=%1, originID=%2]").arg(id).arg(originID));
} }
objectStreamer->readRawDelta(*this, reference.data(), pointer.data()); objectStreamer->readRawDelta(*this, reference.data(), pointer.data());
} else { } else {
@ -1451,7 +1471,7 @@ Bitstream& Bitstream::operator>(SharedObjectPointer& object) {
QObject* rawObject; QObject* rawObject;
if (delta) { if (delta) {
if (!reference) { if (!reference) {
qWarning() << "Delta without reference" << id << originID; throw BitstreamException(QString("Delta without reference [id=%1, originID=%2]").arg(id).arg(originID));
} }
readRawDelta(rawObject, (const QObject*)reference.data()); readRawDelta(rawObject, (const QObject*)reference.data());
} else { } else {
@ -1682,6 +1702,10 @@ const TypeStreamer* Bitstream::createInvalidTypeStreamer() {
return streamer; return streamer;
} }
BitstreamException::BitstreamException(const QString& description) :
_description(description) {
}
QJsonValue JSONWriter::getData(bool value) { QJsonValue JSONWriter::getData(bool value) {
return value; return value;
} }

View file

@ -102,6 +102,9 @@ public:
V takePersistentValue(int id) { V value = _persistentValues.take(id); _valueIDs.remove(value); return value; } V takePersistentValue(int id) { V value = _persistentValues.take(id); _valueIDs.remove(value); return value; }
void copyPersistentMappings(const RepeatedValueStreamer& other);
void clearPersistentMappings();
RepeatedValueStreamer& operator<<(K value); RepeatedValueStreamer& operator<<(K value);
RepeatedValueStreamer& operator>>(V& value); RepeatedValueStreamer& operator>>(V& value);
@ -199,6 +202,29 @@ template<class K, class P, class V> inline RepeatedValueStreamer<K, P, V>&
return *this; return *this;
} }
template<class K, class P, class V> inline void RepeatedValueStreamer<K, P, V>::copyPersistentMappings(
const RepeatedValueStreamer<K, P, V>& other) {
_lastPersistentID = other._lastPersistentID;
_idStreamer.setBitsFromValue(_lastPersistentID);
_persistentIDs = other._persistentIDs;
_transientOffsets.clear();
_lastTransientOffset = 0;
_persistentValues = other._persistentValues;
_transientValues.clear();
_valueIDs = other._valueIDs;
}
template<class K, class P, class V> inline void RepeatedValueStreamer<K, P, V>::clearPersistentMappings() {
_lastPersistentID = 0;
_idStreamer.setBitsFromValue(_lastPersistentID);
_persistentIDs.clear();
_transientOffsets.clear();
_lastTransientOffset = 0;
_persistentValues.clear();
_transientValues.clear();
_valueIDs.clear();
}
/// A stream for bit-aligned data. Through a combination of code generation, reflection, macros, and templates, provides a /// A stream for bit-aligned data. Through a combination of code generation, reflection, macros, and templates, provides a
/// serialization mechanism that may be used for both networking and persistent storage. For unreliable networking, the /// serialization mechanism that may be used for both networking and persistent storage. For unreliable networking, the
/// class provides a mapping system that resends mappings for ids until they are acknowledged (and thus persisted). For /// class provides a mapping system that resends mappings for ids until they are acknowledged (and thus persisted). For
@ -303,6 +329,9 @@ public:
Bitstream(QDataStream& underlying, MetadataType metadataType = NO_METADATA, Bitstream(QDataStream& underlying, MetadataType metadataType = NO_METADATA,
GenericsMode = NO_GENERICS, QObject* parent = NULL); GenericsMode = NO_GENERICS, QObject* parent = NULL);
/// Returns a reference to the underlying data stream.
QDataStream& getUnderlying() { return _underlying; }
/// Substitutes the supplied metaobject for the given class name's default mapping. This is mostly useful for testing the /// Substitutes the supplied metaobject for the given class name's default mapping. This is mostly useful for testing the
/// process of mapping between different types, but may in the future be used for permanently renaming classes. /// process of mapping between different types, but may in the future be used for permanently renaming classes.
void addMetaObjectSubstitution(const QByteArray& className, const QMetaObject* metaObject); void addMetaObjectSubstitution(const QByteArray& className, const QMetaObject* metaObject);
@ -347,6 +376,12 @@ public:
/// Immediately persists and resets the read mappings. /// Immediately persists and resets the read mappings.
void persistAndResetReadMappings(); void persistAndResetReadMappings();
/// Copies the persistent mappings from the specified other stream.
void copyPersistentMappings(const Bitstream& other);
/// Clears the persistent mappings for this stream.
void clearPersistentMappings();
/// Returns a reference to the weak hash storing shared objects for this stream. /// Returns a reference to the weak hash storing shared objects for this stream.
const WeakSharedObjectHash& getWeakSharedObjectHash() const { return _weakSharedObjectHash; } const WeakSharedObjectHash& getWeakSharedObjectHash() const { return _weakSharedObjectHash; }
@ -823,6 +858,19 @@ template<class K, class V> inline Bitstream& Bitstream::operator>>(QHash<K, V>&
return *this; return *this;
} }
/// Thrown for unrecoverable errors.
class BitstreamException {
public:
BitstreamException(const QString& description);
const QString& getDescription() const { return _description; }
private:
QString _description;
};
/// Provides a means of writing Bitstream-able data to JSON rather than the usual binary format in a manner that allows it to /// Provides a means of writing Bitstream-able data to JSON rather than the usual binary format in a manner that allows it to
/// be manipulated and re-read, converted to binary, etc. To use, create a JSONWriter, stream values in using the << operator, /// be manipulated and re-read, converted to binary, etc. To use, create a JSONWriter, stream values in using the << operator,
/// and call getDocument to obtain the JSON data. /// and call getDocument to obtain the JSON data.

View file

@ -113,17 +113,16 @@ Bitstream& DatagramSequencer::startPacket() {
_outgoingPacketStream << (quint32)record.packetNumber; _outgoingPacketStream << (quint32)record.packetNumber;
} }
// write the high-priority messages
_outgoingPacketStream << (quint32)_highPriorityMessages.size();
foreach (const HighPriorityMessage& message, _highPriorityMessages) {
_outputStream << message.data;
}
// return the stream, allowing the caller to write the rest // return the stream, allowing the caller to write the rest
return _outputStream; return _outputStream;
} }
void DatagramSequencer::endPacket() { void DatagramSequencer::endPacket() {
// write the high-priority messages
_outputStream << _highPriorityMessages.size();
foreach (const HighPriorityMessage& message, _highPriorityMessages) {
_outputStream << message.data;
}
_outputStream.flush(); _outputStream.flush();
// if we have space remaining, send some data from our reliable channels // if we have space remaining, send some data from our reliable channels
@ -222,22 +221,22 @@ void DatagramSequencer::receivedDatagram(const QByteArray& datagram) {
_sendRecords.erase(_sendRecords.begin(), it + 1); _sendRecords.erase(_sendRecords.begin(), it + 1);
} }
// alert external parties so that they can read the middle
emit readyToRead(_inputStream);
// read and dispatch the high-priority messages // read and dispatch the high-priority messages
quint32 highPriorityMessageCount; int highPriorityMessageCount;
_incomingPacketStream >> highPriorityMessageCount; _inputStream >> highPriorityMessageCount;
int newHighPriorityMessages = highPriorityMessageCount - _receivedHighPriorityMessages; int newHighPriorityMessages = highPriorityMessageCount - _receivedHighPriorityMessages;
for (quint32 i = 0; i < highPriorityMessageCount; i++) { for (int i = 0; i < highPriorityMessageCount; i++) {
QVariant data; QVariant data;
_inputStream >> data; _inputStream >> data;
if ((int)i >= _receivedHighPriorityMessages) { if (i >= _receivedHighPriorityMessages) {
emit receivedHighPriorityMessage(data); emit receivedHighPriorityMessage(data);
} }
} }
_receivedHighPriorityMessages = highPriorityMessageCount; _receivedHighPriorityMessages = highPriorityMessageCount;
// alert external parties so that they can read the middle
emit readyToRead(_inputStream);
// read the reliable data, if any // read the reliable data, if any
quint32 reliableChannels; quint32 reliableChannels;
_incomingPacketStream >> reliableChannels; _incomingPacketStream >> reliableChannels;
@ -253,6 +252,8 @@ void DatagramSequencer::receivedDatagram(const QByteArray& datagram) {
// record the receipt // record the receipt
ReceiveRecord record = { _incomingPacketNumber, _inputStream.getAndResetReadMappings(), newHighPriorityMessages }; ReceiveRecord record = { _incomingPacketNumber, _inputStream.getAndResetReadMappings(), newHighPriorityMessages };
_receiveRecords.append(record); _receiveRecords.append(record);
emit receiveRecorded();
} }
void DatagramSequencer::sendClearSharedObjectMessage(int id) { void DatagramSequencer::sendClearSharedObjectMessage(int id) {
@ -274,6 +275,11 @@ void DatagramSequencer::handleHighPriorityMessage(const QVariant& data) {
} }
} }
void DatagramSequencer::clearReliableChannel(QObject* object) {
ReliableChannel* channel = static_cast<ReliableChannel*>(object);
(channel->isOutput() ? _reliableOutputChannels : _reliableInputChannels).remove(channel->getIndex());
}
void DatagramSequencer::sendRecordAcknowledged(const SendRecord& record) { void DatagramSequencer::sendRecordAcknowledged(const SendRecord& record) {
// stop acknowledging the recorded packets // stop acknowledging the recorded packets
while (!_receiveRecords.isEmpty() && _receiveRecords.first().packetNumber <= record.lastReceivedPacketNumber) { while (!_receiveRecords.isEmpty() && _receiveRecords.first().packetNumber <= record.lastReceivedPacketNumber) {
@ -295,7 +301,10 @@ void DatagramSequencer::sendRecordAcknowledged(const SendRecord& record) {
// acknowledge the received spans // acknowledge the received spans
foreach (const ChannelSpan& span, record.spans) { foreach (const ChannelSpan& span, record.spans) {
getReliableOutputChannel(span.channel)->spanAcknowledged(span); ReliableChannel* channel = _reliableOutputChannels.value(span.channel);
if (channel) {
channel->spanAcknowledged(span);
}
} }
// increase the packet rate with every ack until we pass the slow start threshold; then, every round trip // increase the packet rate with every ack until we pass the slow start threshold; then, every round trip
@ -310,7 +319,10 @@ void DatagramSequencer::sendRecordAcknowledged(const SendRecord& record) {
void DatagramSequencer::sendRecordLost(const SendRecord& record) { void DatagramSequencer::sendRecordLost(const SendRecord& record) {
// notify the channels of their lost spans // notify the channels of their lost spans
foreach (const ChannelSpan& span, record.spans) { foreach (const ChannelSpan& span, record.spans) {
getReliableOutputChannel(span.channel)->spanLost(record.packetNumber, _outgoingPacketNumber + 1); ReliableChannel* channel = _reliableOutputChannels.value(span.channel);
if (channel) {
channel->spanLost(record.packetNumber, _outgoingPacketNumber + 1);
}
} }
// halve the rate and remember as threshold // halve the rate and remember as threshold
@ -364,6 +376,8 @@ void DatagramSequencer::sendPacket(const QByteArray& packet, const QVector<Chann
_outputStream.getAndResetWriteMappings(), spans }; _outputStream.getAndResetWriteMappings(), spans };
_sendRecords.append(record); _sendRecords.append(record);
emit sendRecorded();
// write the sequence number and size, which are the same between all fragments // write the sequence number and size, which are the same between all fragments
_outgoingDatagramBuffer.seek(_datagramHeaderSize); _outgoingDatagramBuffer.seek(_datagramHeaderSize);
_outgoingDatagramStream << (quint32)_outgoingPacketNumber; _outgoingDatagramStream << (quint32)_outgoingPacketNumber;
@ -658,16 +672,24 @@ int ReliableChannel::getBytesAvailable() const {
return _buffer.size() - _acknowledged.getTotalSet(); return _buffer.size() - _acknowledged.getTotalSet();
} }
void ReliableChannel::sendMessage(const QVariant& message) { void ReliableChannel::startMessage() {
// write a placeholder for the length, then fill it in when we know what it is // write a placeholder for the length; we'll fill it in when we know what it is
int placeholder = _buffer.pos(); _messageLengthPlaceholder = _buffer.pos();
_dataStream << (quint32)0; _dataStream << (quint32)0;
_bitstream << message; }
void ReliableChannel::endMessage() {
_bitstream.flush(); _bitstream.flush();
_bitstream.persistAndResetWriteMappings(); _bitstream.persistAndResetWriteMappings();
quint32 length = _buffer.pos() - placeholder; quint32 length = _buffer.pos() - _messageLengthPlaceholder;
_buffer.writeBytes(placeholder, sizeof(quint32), (const char*)&length); _buffer.writeBytes(_messageLengthPlaceholder, sizeof(quint32), (const char*)&length);
}
void ReliableChannel::sendMessage(const QVariant& message) {
startMessage();
_bitstream << message;
endMessage();
} }
void ReliableChannel::sendClearSharedObjectMessage(int id) { void ReliableChannel::sendClearSharedObjectMessage(int id) {
@ -675,7 +697,7 @@ void ReliableChannel::sendClearSharedObjectMessage(int id) {
sendMessage(QVariant::fromValue(message)); sendMessage(QVariant::fromValue(message));
} }
void ReliableChannel::handleMessage(const QVariant& message) { void ReliableChannel::handleMessage(const QVariant& message, Bitstream& in) {
if (message.userType() == ClearSharedObjectMessage::Type) { if (message.userType() == ClearSharedObjectMessage::Type) {
_bitstream.clearSharedObject(message.value<ClearSharedObjectMessage>().id); _bitstream.clearSharedObject(message.value<ClearSharedObjectMessage>().id);
@ -688,6 +710,7 @@ void ReliableChannel::handleMessage(const QVariant& message) {
ReliableChannel::ReliableChannel(DatagramSequencer* sequencer, int index, bool output) : ReliableChannel::ReliableChannel(DatagramSequencer* sequencer, int index, bool output) :
QObject(sequencer), QObject(sequencer),
_index(index), _index(index),
_output(output),
_dataStream(&_buffer), _dataStream(&_buffer),
_bitstream(_dataStream), _bitstream(_dataStream),
_priority(1.0f), _priority(1.0f),
@ -700,7 +723,9 @@ ReliableChannel::ReliableChannel(DatagramSequencer* sequencer, int index, bool o
_dataStream.setByteOrder(QDataStream::LittleEndian); _dataStream.setByteOrder(QDataStream::LittleEndian);
connect(&_bitstream, SIGNAL(sharedObjectCleared(int)), SLOT(sendClearSharedObjectMessage(int))); connect(&_bitstream, SIGNAL(sharedObjectCleared(int)), SLOT(sendClearSharedObjectMessage(int)));
connect(this, SIGNAL(receivedMessage(const QVariant&)), SLOT(handleMessage(const QVariant&))); connect(this, SIGNAL(receivedMessage(const QVariant&, Bitstream&)), SLOT(handleMessage(const QVariant&, Bitstream&)));
sequencer->connect(this, SIGNAL(destroyed(QObject*)), SLOT(clearReliableChannel(QObject*)));
} }
void ReliableChannel::writeData(QDataStream& out, int bytes, QVector<DatagramSequencer::ChannelSpan>& spans) { void ReliableChannel::writeData(QDataStream& out, int bytes, QVector<DatagramSequencer::ChannelSpan>& spans) {
@ -843,9 +868,9 @@ void ReliableChannel::readData(QDataStream& in) {
_dataStream.skipRawData(sizeof(quint32)); _dataStream.skipRawData(sizeof(quint32));
QVariant message; QVariant message;
_bitstream >> message; _bitstream >> message;
emit receivedMessage(message, _bitstream);
_bitstream.reset(); _bitstream.reset();
_bitstream.persistAndResetReadMappings(); _bitstream.persistAndResetReadMappings();
emit receivedMessage(message);
continue; continue;
} }
} }

View file

@ -78,6 +78,15 @@ public:
/// Returns the packet number of the last packet received (or the packet currently being assembled). /// Returns the packet number of the last packet received (or the packet currently being assembled).
int getIncomingPacketNumber() const { return _incomingPacketNumber; } int getIncomingPacketNumber() const { return _incomingPacketNumber; }
/// Returns a reference to the stream used to read packets.
Bitstream& getInputStream() { return _inputStream; }
/// Returns a reference to the stream used to write packets.
Bitstream& getOutputStream() { return _outputStream; }
/// Returns a reference to the outgoing packet data.
const QByteArray& getOutgoingPacketData() const { return _outgoingPacketData; }
/// Returns the packet number of the sent packet at the specified index. /// Returns the packet number of the sent packet at the specified index.
int getSentPacketNumber(int index) const { return _sendRecords.at(index).packetNumber; } int getSentPacketNumber(int index) const { return _sendRecords.at(index).packetNumber; }
@ -126,9 +135,15 @@ signals:
/// Emitted when a packet is available to read. /// Emitted when a packet is available to read.
void readyToRead(Bitstream& input); void readyToRead(Bitstream& input);
/// Emitted when we've received a high-priority message /// Emitted when we've received a high-priority message.
void receivedHighPriorityMessage(const QVariant& data); void receivedHighPriorityMessage(const QVariant& data);
/// Emitted when we've recorded the transmission of a packet.
void sendRecorded();
/// Emitted when we've recorded the receipt of a packet (that is, at the end of packet processing).
void receiveRecorded();
/// Emitted when a sent packet has been acknowledged by the remote side. /// Emitted when a sent packet has been acknowledged by the remote side.
/// \param index the index of the packet in our list of send records /// \param index the index of the packet in our list of send records
void sendAcknowledged(int index); void sendAcknowledged(int index);
@ -141,6 +156,7 @@ private slots:
void sendClearSharedObjectMessage(int id); void sendClearSharedObjectMessage(int id);
void handleHighPriorityMessage(const QVariant& data); void handleHighPriorityMessage(const QVariant& data);
void clearReliableChannel(QObject* object);
private: private:
@ -319,6 +335,9 @@ public:
/// Returns the channel's index in the sequencer's channel map. /// Returns the channel's index in the sequencer's channel map.
int getIndex() const { return _index; } int getIndex() const { return _index; }
/// Checks whether this is an output channel.
bool isOutput() const { return _output; }
/// Returns a reference to the buffer used to write/read data to/from this channel. /// Returns a reference to the buffer used to write/read data to/from this channel.
CircularBuffer& getBuffer() { return _buffer; } CircularBuffer& getBuffer() { return _buffer; }
@ -336,22 +355,36 @@ public:
/// Returns the number of bytes available to read from this channel. /// Returns the number of bytes available to read from this channel.
int getBytesAvailable() const; int getBytesAvailable() const;
/// Returns the offset, which represents the total number of bytes acknowledged
/// (on the write end) or received completely (on the read end).
int getOffset() const { return _offset; }
/// Returns the total number of bytes written to this channel.
int getBytesWritten() const { return _offset + _buffer.pos(); }
/// Sets whether we expect to write/read framed messages. /// Sets whether we expect to write/read framed messages.
void setMessagesEnabled(bool enabled) { _messagesEnabled = enabled; } void setMessagesEnabled(bool enabled) { _messagesEnabled = enabled; }
bool getMessagesEnabled() const { return _messagesEnabled; } bool getMessagesEnabled() const { return _messagesEnabled; }
/// Sends a framed message on this channel. /// Starts a framed message on this channel.
void startMessage();
/// Ends a framed message on this channel.
void endMessage();
/// Sends a framed message on this channel (convenience function that calls startMessage,
/// writes the message to the bitstream, then calls endMessage).
void sendMessage(const QVariant& message); void sendMessage(const QVariant& message);
signals: signals:
/// Fired when a framed message has been received on this channel. /// Fired when a framed message has been received on this channel.
void receivedMessage(const QVariant& message); void receivedMessage(const QVariant& message, Bitstream& in);
private slots: private slots:
void sendClearSharedObjectMessage(int id); void sendClearSharedObjectMessage(int id);
void handleMessage(const QVariant& message); void handleMessage(const QVariant& message, Bitstream& in);
private: private:
@ -370,6 +403,7 @@ private:
void readData(QDataStream& in); void readData(QDataStream& in);
int _index; int _index;
bool _output;
CircularBuffer _buffer; CircularBuffer _buffer;
CircularBuffer _assemblyBuffer; CircularBuffer _assemblyBuffer;
QDataStream _dataStream; QDataStream _dataStream;
@ -381,6 +415,7 @@ private:
int _writePositionResetPacketNumber; int _writePositionResetPacketNumber;
SpanList _acknowledged; SpanList _acknowledged;
bool _messagesEnabled; bool _messagesEnabled;
int _messageLengthPlaceholder;
}; };
#endif // hifi_DatagramSequencer_h #endif // hifi_DatagramSequencer_h

View file

@ -19,6 +19,8 @@ Endpoint::Endpoint(const SharedNodePointer& node, PacketRecord* baselineSendReco
connect(&_sequencer, SIGNAL(readyToWrite(const QByteArray&)), SLOT(sendDatagram(const QByteArray&))); connect(&_sequencer, SIGNAL(readyToWrite(const QByteArray&)), SLOT(sendDatagram(const QByteArray&)));
connect(&_sequencer, SIGNAL(readyToRead(Bitstream&)), SLOT(readMessage(Bitstream&))); connect(&_sequencer, SIGNAL(readyToRead(Bitstream&)), SLOT(readMessage(Bitstream&)));
connect(&_sequencer, SIGNAL(sendRecorded()), SLOT(recordSend()));
connect(&_sequencer, SIGNAL(receiveRecorded()), SLOT(recordReceive()));
connect(&_sequencer, SIGNAL(sendAcknowledged(int)), SLOT(clearSendRecordsBefore(int))); connect(&_sequencer, SIGNAL(sendAcknowledged(int)), SLOT(clearSendRecordsBefore(int)));
connect(&_sequencer, SIGNAL(receiveAcknowledged(int)), SLOT(clearReceiveRecordsBefore(int))); connect(&_sequencer, SIGNAL(receiveAcknowledged(int)), SLOT(clearReceiveRecordsBefore(int)));
@ -40,9 +42,6 @@ void Endpoint::update() {
Bitstream& out = _sequencer.startPacket(); Bitstream& out = _sequencer.startPacket();
writeUpdateMessage(out); writeUpdateMessage(out);
_sequencer.endPacket(); _sequencer.endPacket();
// record the send
_sendRecords.append(maybeCreateSendRecord());
} }
int Endpoint::parseData(const QByteArray& packet) { int Endpoint::parseData(const QByteArray& packet) {
@ -59,8 +58,21 @@ void Endpoint::readMessage(Bitstream& in) {
QVariant message; QVariant message;
in >> message; in >> message;
handleMessage(message, in); handleMessage(message, in);
}
// record the receipt
void Endpoint::handleMessage(const QVariant& message, Bitstream& in) {
if (message.userType() == QMetaType::QVariantList) {
foreach (const QVariant& element, message.toList()) {
handleMessage(element, in);
}
}
}
void Endpoint::recordSend() {
_sendRecords.append(maybeCreateSendRecord());
}
void Endpoint::recordReceive() {
_receiveRecords.append(maybeCreateReceiveRecord()); _receiveRecords.append(maybeCreateReceiveRecord());
} }
@ -84,14 +96,6 @@ void Endpoint::writeUpdateMessage(Bitstream& out) {
out << QVariant(); out << QVariant();
} }
void Endpoint::handleMessage(const QVariant& message, Bitstream& in) {
if (message.userType() == QMetaType::QVariantList) {
foreach (const QVariant& element, message.toList()) {
handleMessage(element, in);
}
}
}
PacketRecord* Endpoint::maybeCreateSendRecord() const { PacketRecord* Endpoint::maybeCreateSendRecord() const {
return NULL; return NULL;
} }

View file

@ -24,6 +24,9 @@ class Endpoint : public NodeData {
Q_OBJECT Q_OBJECT
public: public:
/// The index of the input/output channel used to transmit reliable deltas.
static const int RELIABLE_DELTA_CHANNEL_INDEX = 1;
Endpoint(const SharedNodePointer& node, PacketRecord* baselineSendRecord = NULL, Endpoint(const SharedNodePointer& node, PacketRecord* baselineSendRecord = NULL,
PacketRecord* baselineReceiveRecord = NULL); PacketRecord* baselineReceiveRecord = NULL);
@ -37,6 +40,10 @@ protected slots:
virtual void sendDatagram(const QByteArray& data); virtual void sendDatagram(const QByteArray& data);
virtual void readMessage(Bitstream& in); virtual void readMessage(Bitstream& in);
virtual void handleMessage(const QVariant& message, Bitstream& in);
void recordSend();
void recordReceive();
void clearSendRecordsBefore(int index); void clearSendRecordsBefore(int index);
void clearReceiveRecordsBefore(int index); void clearReceiveRecordsBefore(int index);
@ -44,7 +51,6 @@ protected slots:
protected: protected:
virtual void writeUpdateMessage(Bitstream& out); virtual void writeUpdateMessage(Bitstream& out);
virtual void handleMessage(const QVariant& message, Bitstream& in);
virtual PacketRecord* maybeCreateSendRecord() const; virtual PacketRecord* maybeCreateSendRecord() const;
virtual PacketRecord* maybeCreateReceiveRecord() const; virtual PacketRecord* maybeCreateReceiveRecord() const;

View file

@ -86,7 +86,11 @@ void MetavoxelClientManager::updateClient(MetavoxelClient* client) {
MetavoxelClient::MetavoxelClient(const SharedNodePointer& node, MetavoxelClientManager* manager) : MetavoxelClient::MetavoxelClient(const SharedNodePointer& node, MetavoxelClientManager* manager) :
Endpoint(node, new PacketRecord(), new PacketRecord()), Endpoint(node, new PacketRecord(), new PacketRecord()),
_manager(manager) { _manager(manager),
_reliableDeltaChannel(NULL) {
connect(_sequencer.getReliableInputChannel(RELIABLE_DELTA_CHANNEL_INDEX),
SIGNAL(receivedMessage(const QVariant&, Bitstream&)), SLOT(handleMessage(const QVariant&, Bitstream&)));
} }
void MetavoxelClient::guide(MetavoxelVisitor& visitor) { void MetavoxelClient::guide(MetavoxelVisitor& visitor) {
@ -112,31 +116,44 @@ void MetavoxelClient::writeUpdateMessage(Bitstream& out) {
out << QVariant::fromValue(state); out << QVariant::fromValue(state);
} }
void MetavoxelClient::readMessage(Bitstream& in) {
Endpoint::readMessage(in);
// reapply local edits
foreach (const DatagramSequencer::HighPriorityMessage& message, _sequencer.getHighPriorityMessages()) {
if (message.data.userType() == MetavoxelEditMessage::Type) {
message.data.value<MetavoxelEditMessage>().apply(_data, _sequencer.getWeakSharedObjectHash());
}
}
}
void MetavoxelClient::handleMessage(const QVariant& message, Bitstream& in) { void MetavoxelClient::handleMessage(const QVariant& message, Bitstream& in) {
if (message.userType() == MetavoxelDeltaMessage::Type) { int userType = message.userType();
if (userType == MetavoxelDeltaMessage::Type) {
PacketRecord* receiveRecord = getLastAcknowledgedReceiveRecord(); PacketRecord* receiveRecord = getLastAcknowledgedReceiveRecord();
_data.readDelta(receiveRecord->getData(), receiveRecord->getLOD(), in, getLastAcknowledgedSendRecord()->getLOD()); if (_reliableDeltaChannel) {
_remoteData.readDelta(receiveRecord->getData(), receiveRecord->getLOD(), in, _remoteDataLOD = _reliableDeltaLOD);
_sequencer.getInputStream().persistReadMappings(in.getAndResetReadMappings());
in.clearPersistentMappings();
_reliableDeltaChannel = NULL;
} else {
_remoteData.readDelta(receiveRecord->getData(), receiveRecord->getLOD(), in,
_remoteDataLOD = getLastAcknowledgedSendRecord()->getLOD());
in.reset();
}
// copy to local and reapply local edits
_data = _remoteData;
foreach (const DatagramSequencer::HighPriorityMessage& message, _sequencer.getHighPriorityMessages()) {
if (message.data.userType() == MetavoxelEditMessage::Type) {
message.data.value<MetavoxelEditMessage>().apply(_data, _sequencer.getWeakSharedObjectHash());
}
}
} else if (userType == MetavoxelDeltaPendingMessage::Type) {
if (!_reliableDeltaChannel) {
_reliableDeltaChannel = _sequencer.getReliableInputChannel(RELIABLE_DELTA_CHANNEL_INDEX);
_reliableDeltaChannel->getBitstream().copyPersistentMappings(_sequencer.getInputStream());
_reliableDeltaLOD = getLastAcknowledgedSendRecord()->getLOD();
}
} else { } else {
Endpoint::handleMessage(message, in); Endpoint::handleMessage(message, in);
} }
} }
PacketRecord* MetavoxelClient::maybeCreateSendRecord() const { PacketRecord* MetavoxelClient::maybeCreateSendRecord() const {
return new PacketRecord(_manager->getLOD()); return new PacketRecord(_reliableDeltaChannel ? _reliableDeltaLOD : _manager->getLOD());
} }
PacketRecord* MetavoxelClient::maybeCreateReceiveRecord() const { PacketRecord* MetavoxelClient::maybeCreateReceiveRecord() const {
return new PacketRecord(getLastAcknowledgedSendRecord()->getLOD(), _data); return new PacketRecord(_remoteDataLOD, _remoteData);
} }

View file

@ -60,7 +60,6 @@ public:
protected: protected:
virtual void writeUpdateMessage(Bitstream& out); virtual void writeUpdateMessage(Bitstream& out);
virtual void readMessage(Bitstream& in);
virtual void handleMessage(const QVariant& message, Bitstream& in); virtual void handleMessage(const QVariant& message, Bitstream& in);
virtual PacketRecord* maybeCreateSendRecord() const; virtual PacketRecord* maybeCreateSendRecord() const;
@ -70,6 +69,11 @@ private:
MetavoxelClientManager* _manager; MetavoxelClientManager* _manager;
MetavoxelData _data; MetavoxelData _data;
MetavoxelData _remoteData;
MetavoxelLOD _remoteDataLOD;
ReliableChannel* _reliableDeltaChannel;
MetavoxelLOD _reliableDeltaLOD;
}; };
#endif // hifi_MetavoxelClientManager_h #endif // hifi_MetavoxelClientManager_h

View file

@ -61,6 +61,13 @@ class MetavoxelDeltaMessage {
DECLARE_STREAMABLE_METATYPE(MetavoxelDeltaMessage) DECLARE_STREAMABLE_METATYPE(MetavoxelDeltaMessage)
/// A message indicating that metavoxel delta information is being sent on a reliable channel.
class MetavoxelDeltaPendingMessage {
STREAMABLE
};
DECLARE_STREAMABLE_METATYPE(MetavoxelDeltaPendingMessage)
/// A simple streamable edit. /// A simple streamable edit.
class MetavoxelEditMessage { class MetavoxelEditMessage {
STREAMABLE STREAMABLE

View file

@ -646,16 +646,23 @@ TestEndpoint::TestEndpoint(Mode mode) :
Endpoint(SharedNodePointer(), new TestSendRecord(), new TestReceiveRecord()), Endpoint(SharedNodePointer(), new TestSendRecord(), new TestReceiveRecord()),
_mode(mode), _mode(mode),
_highPriorityMessagesToSend(0.0f), _highPriorityMessagesToSend(0.0f),
_reliableMessagesToSend(0.0f) { _reliableMessagesToSend(0.0f),
_reliableDeltaChannel(NULL) {
connect(&_sequencer, SIGNAL(receivedHighPriorityMessage(const QVariant&)), connect(&_sequencer, SIGNAL(receivedHighPriorityMessage(const QVariant&)),
SLOT(handleHighPriorityMessage(const QVariant&))); SLOT(handleHighPriorityMessage(const QVariant&)));
connect(_sequencer.getReliableInputChannel(), SIGNAL(receivedMessage(const QVariant&, Bitstream&)),
SLOT(handleReliableMessage(const QVariant&, Bitstream&)));
if (mode == METAVOXEL_CLIENT_MODE) { if (mode == METAVOXEL_CLIENT_MODE) {
_lod = MetavoxelLOD(glm::vec3(), 0.01f); _lod = MetavoxelLOD(glm::vec3(), 0.01f);
connect(_sequencer.getReliableInputChannel(RELIABLE_DELTA_CHANNEL_INDEX),
SIGNAL(receivedMessage(const QVariant&, Bitstream&)), SLOT(handleReliableMessage(const QVariant&, Bitstream&)));
return; return;
} }
if (mode == METAVOXEL_SERVER_MODE) { if (mode == METAVOXEL_SERVER_MODE) {
connect(&_sequencer, SIGNAL(sendAcknowledged(int)), SLOT(checkReliableDeltaReceived()));
_data.expand(); _data.expand();
_data.expand(); _data.expand();
@ -673,9 +680,6 @@ TestEndpoint::TestEndpoint(Mode mode) :
// create the object that represents out delta-encoded state // create the object that represents out delta-encoded state
_localState = new TestSharedObjectA(); _localState = new TestSharedObjectA();
connect(_sequencer.getReliableInputChannel(), SIGNAL(receivedMessage(const QVariant&)),
SLOT(handleReliableMessage(const QVariant&)));
ReliableChannel* secondInput = _sequencer.getReliableInputChannel(1); ReliableChannel* secondInput = _sequencer.getReliableInputChannel(1);
secondInput->setMessagesEnabled(false); secondInput->setMessagesEnabled(false);
connect(&secondInput->getBuffer(), SIGNAL(readyRead()), SLOT(readReliableChannel())); connect(&secondInput->getBuffer(), SIGNAL(readyRead()), SLOT(readReliableChannel()));
@ -867,9 +871,6 @@ bool TestEndpoint::simulate(int iterationNumber) {
maxDatagramsPerPacket = qMax(maxDatagramsPerPacket, datagramsSent - oldDatagramsSent); maxDatagramsPerPacket = qMax(maxDatagramsPerPacket, datagramsSent - oldDatagramsSent);
maxBytesPerPacket = qMax(maxBytesPerPacket, bytesSent - oldBytesSent); maxBytesPerPacket = qMax(maxBytesPerPacket, bytesSent - oldBytesSent);
// record the send
_sendRecords.append(maybeCreateSendRecord());
} }
return false; return false;
@ -880,9 +881,6 @@ bool TestEndpoint::simulate(int iterationNumber) {
out << QVariant::fromValue(state); out << QVariant::fromValue(state);
_sequencer.endPacket(); _sequencer.endPacket();
// record the send
_sendRecords.append(maybeCreateSendRecord());
} else if (_mode == METAVOXEL_SERVER_MODE) { } else if (_mode == METAVOXEL_SERVER_MODE) {
// make a random change // make a random change
MutateVisitor visitor; MutateVisitor visitor;
@ -907,15 +905,39 @@ bool TestEndpoint::simulate(int iterationNumber) {
if (!_lod.isValid()) { if (!_lod.isValid()) {
return false; return false;
} }
// if we're sending a reliable delta, wait until it's acknowledged
if (_reliableDeltaChannel) {
Bitstream& out = _sequencer.startPacket();
out << QVariant::fromValue(MetavoxelDeltaPendingMessage());
_sequencer.endPacket();
return false;
}
Bitstream& out = _sequencer.startPacket(); Bitstream& out = _sequencer.startPacket();
int start = _sequencer.getOutputStream().getUnderlying().device()->pos();
out << QVariant::fromValue(MetavoxelDeltaMessage()); out << QVariant::fromValue(MetavoxelDeltaMessage());
PacketRecord* sendRecord = getLastAcknowledgedSendRecord(); PacketRecord* sendRecord = getLastAcknowledgedSendRecord();
_data.writeDelta(sendRecord->getData(), sendRecord->getLOD(), out, _lod); _data.writeDelta(sendRecord->getData(), sendRecord->getLOD(), out, _lod);
_sequencer.endPacket(); out.flush();
int end = _sequencer.getOutputStream().getUnderlying().device()->pos();
// record the send if (end > _sequencer.getMaxPacketSize()) {
_sendRecords.append(maybeCreateSendRecord()); // we need to send the delta on the reliable channel
_reliableDeltaChannel = _sequencer.getReliableOutputChannel(RELIABLE_DELTA_CHANNEL_INDEX);
_reliableDeltaChannel->startMessage();
_reliableDeltaChannel->getBuffer().write(_sequencer.getOutgoingPacketData().constData() + start, end - start);
_reliableDeltaChannel->endMessage();
_reliableDeltaWriteMappings = out.getAndResetWriteMappings();
_reliableDeltaReceivedOffset = _reliableDeltaChannel->getBytesWritten();
_reliableDeltaData = _data;
_reliableDeltaLOD = _lod;
_sequencer.getOutputStream().getUnderlying().device()->seek(start);
out << QVariant::fromValue(MetavoxelDeltaPendingMessage());
_sequencer.endPacket();
} else {
_sequencer.endPacket();
}
} else { } else {
// enqueue some number of high priority messages // enqueue some number of high priority messages
const float MIN_HIGH_PRIORITY_MESSAGES = 0.0f; const float MIN_HIGH_PRIORITY_MESSAGES = 0.0f;
@ -957,9 +979,6 @@ bool TestEndpoint::simulate(int iterationNumber) {
qDebug() << message; qDebug() << message;
return true; return true;
} }
// record the send
_sendRecords.append(maybeCreateSendRecord());
} }
maxDatagramsPerPacket = qMax(maxDatagramsPerPacket, datagramsSent - oldDatagramsSent); maxDatagramsPerPacket = qMax(maxDatagramsPerPacket, datagramsSent - oldDatagramsSent);
maxBytesPerPacket = qMax(maxBytesPerPacket, bytesSent - oldBytesSent); maxBytesPerPacket = qMax(maxBytesPerPacket, bytesSent - oldBytesSent);
@ -995,7 +1014,7 @@ void TestEndpoint::sendDatagram(const QByteArray& datagram) {
// some are received out of order // some are received out of order
const float REORDER_PROBABILITY = 0.1f; const float REORDER_PROBABILITY = 0.1f;
if (randFloat() < REORDER_PROBABILITY * probabilityMultiplier) { if (randFloat() < REORDER_PROBABILITY * probabilityMultiplier) {
const int MIN_DELAY = 1; const int MIN_DELAY = 2;
const int MAX_DELAY = 5; const int MAX_DELAY = 5;
// have to copy the datagram; the one we're passed is a reference to a shared buffer // have to copy the datagram; the one we're passed is a reference to a shared buffer
_delayedDatagrams.append(ByteArrayIntPair(QByteArray(datagram.constData(), datagram.size()), _delayedDatagrams.append(ByteArrayIntPair(QByteArray(datagram.constData(), datagram.size()),
@ -1008,58 +1027,32 @@ void TestEndpoint::sendDatagram(const QByteArray& datagram) {
} }
} }
_other->parseData(datagram); _delayedDatagrams.append(ByteArrayIntPair(QByteArray(datagram.constData(), datagram.size()), 1));
} }
void TestEndpoint::readMessage(Bitstream& in) { void TestEndpoint::readMessage(Bitstream& in) {
if (_mode == CONGESTION_MODE) { if (_mode == CONGESTION_MODE) {
QVariant message; QVariant message;
in >> message; in >> message;
// record the receipt
_receiveRecords.append(maybeCreateReceiveRecord());
return; return;
} }
if (_mode == METAVOXEL_CLIENT_MODE) { if (_mode == METAVOXEL_CLIENT_MODE) {
QVariant message; QVariant message;
in >> message; in >> message;
handleMessage(message, in); handleMessage(message, in);
// deep-compare data to sent version
int packetNumber = _sequencer.getIncomingPacketNumber();
foreach (PacketRecord* record, _other->_sendRecords) {
TestSendRecord* sendRecord = static_cast<TestSendRecord*>(record);
if (sendRecord->getPacketNumber() == packetNumber) {
if (!sendRecord->getData().deepEquals(_data, getLastAcknowledgedSendRecord()->getLOD())) {
qDebug() << "Sent/received metavoxel data mismatch.";
exit(true);
}
break;
}
}
// record the receipt
_receiveRecords.append(maybeCreateReceiveRecord());
return; return;
} }
if (_mode == METAVOXEL_SERVER_MODE) { if (_mode == METAVOXEL_SERVER_MODE) {
QVariant message; QVariant message;
in >> message; in >> message;
handleMessage(message, in); handleMessage(message, in);
// record the receipt
_receiveRecords.append(maybeCreateReceiveRecord());
return; return;
} }
SequencedTestMessage message; SequencedTestMessage message;
in >> message; in >> message;
_remoteState = message.state; _remoteState = message.state;
// record the receipt
_receiveRecords.append(maybeCreateReceiveRecord());
for (QList<SequencedTestMessage>::iterator it = _other->_unreliableMessagesSent.begin(); for (QList<SequencedTestMessage>::iterator it = _other->_unreliableMessagesSent.begin();
it != _other->_unreliableMessagesSent.end(); it++) { it != _other->_unreliableMessagesSent.end(); it++) {
if (it->sequenceNumber == message.sequenceNumber) { if (it->sequenceNumber == message.sequenceNumber) {
@ -1088,8 +1081,16 @@ void TestEndpoint::handleMessage(const QVariant& message, Bitstream& in) {
} else if (userType == MetavoxelDeltaMessage::Type) { } else if (userType == MetavoxelDeltaMessage::Type) {
PacketRecord* receiveRecord = getLastAcknowledgedReceiveRecord(); PacketRecord* receiveRecord = getLastAcknowledgedReceiveRecord();
_data.readDelta(receiveRecord->getData(), receiveRecord->getLOD(), in, getLastAcknowledgedSendRecord()->getLOD()); _data.readDelta(receiveRecord->getData(), receiveRecord->getLOD(), in,
_dataLOD = getLastAcknowledgedSendRecord()->getLOD());
compareMetavoxelData();
} else if (userType == MetavoxelDeltaPendingMessage::Type) {
if (!_reliableDeltaChannel) {
_reliableDeltaChannel = _sequencer.getReliableInputChannel(RELIABLE_DELTA_CHANNEL_INDEX);
_reliableDeltaChannel->getBitstream().copyPersistentMappings(_sequencer.getInputStream());
_reliableDeltaLOD = getLastAcknowledgedSendRecord()->getLOD();
}
} else if (userType == QMetaType::QVariantList) { } else if (userType == QMetaType::QVariantList) {
foreach (const QVariant& element, message.toList()) { foreach (const QVariant& element, message.toList()) {
handleMessage(element, in); handleMessage(element, in);
@ -1098,13 +1099,15 @@ void TestEndpoint::handleMessage(const QVariant& message, Bitstream& in) {
} }
PacketRecord* TestEndpoint::maybeCreateSendRecord() const { PacketRecord* TestEndpoint::maybeCreateSendRecord() const {
if (_reliableDeltaChannel) {
return new TestSendRecord(_reliableDeltaLOD, _reliableDeltaData, _localState, _sequencer.getOutgoingPacketNumber());
}
return new TestSendRecord(_lod, (_mode == METAVOXEL_CLIENT_MODE) ? MetavoxelData() : _data, return new TestSendRecord(_lod, (_mode == METAVOXEL_CLIENT_MODE) ? MetavoxelData() : _data,
_localState, _sequencer.getOutgoingPacketNumber()); _localState, _sequencer.getOutgoingPacketNumber());
} }
PacketRecord* TestEndpoint::maybeCreateReceiveRecord() const { PacketRecord* TestEndpoint::maybeCreateReceiveRecord() const {
return new TestReceiveRecord(getLastAcknowledgedSendRecord()->getLOD(), return new TestReceiveRecord(_dataLOD, (_mode == METAVOXEL_SERVER_MODE) ? MetavoxelData() : _data, _remoteState);
(_mode == METAVOXEL_SERVER_MODE) ? MetavoxelData() : _data, _remoteState);
} }
void TestEndpoint::handleHighPriorityMessage(const QVariant& message) { void TestEndpoint::handleHighPriorityMessage(const QVariant& message) {
@ -1121,7 +1124,16 @@ void TestEndpoint::handleHighPriorityMessage(const QVariant& message) {
highPriorityMessagesReceived++; highPriorityMessagesReceived++;
} }
void TestEndpoint::handleReliableMessage(const QVariant& message) { void TestEndpoint::handleReliableMessage(const QVariant& message, Bitstream& in) {
if (message.userType() == MetavoxelDeltaMessage::Type) {
PacketRecord* receiveRecord = getLastAcknowledgedReceiveRecord();
_data.readDelta(receiveRecord->getData(), receiveRecord->getLOD(), in, _dataLOD = _reliableDeltaLOD);
_sequencer.getInputStream().persistReadMappings(in.getAndResetReadMappings());
in.clearPersistentMappings();
compareMetavoxelData();
_reliableDeltaChannel = NULL;
return;
}
if (message.userType() == ClearSharedObjectMessage::Type || if (message.userType() == ClearSharedObjectMessage::Type ||
message.userType() == ClearMainChannelSharedObjectMessage::Type) { message.userType() == ClearMainChannelSharedObjectMessage::Type) {
return; return;
@ -1150,6 +1162,33 @@ void TestEndpoint::readReliableChannel() {
streamedBytesReceived += bytes.size(); streamedBytesReceived += bytes.size();
} }
void TestEndpoint::checkReliableDeltaReceived() {
if (!_reliableDeltaChannel || _reliableDeltaChannel->getOffset() < _reliableDeltaReceivedOffset) {
return;
}
_sequencer.getOutputStream().persistWriteMappings(_reliableDeltaWriteMappings);
_reliableDeltaWriteMappings = Bitstream::WriteMappings();
_reliableDeltaData = MetavoxelData();
_reliableDeltaChannel = NULL;
}
void TestEndpoint::compareMetavoxelData() {
// deep-compare data to sent version
int packetNumber = _sequencer.getIncomingPacketNumber();
foreach (PacketRecord* record, _other->_sendRecords) {
TestSendRecord* sendRecord = static_cast<TestSendRecord*>(record);
if (sendRecord->getPacketNumber() == packetNumber) {
if (!sendRecord->getData().deepEquals(_data, getLastAcknowledgedSendRecord()->getLOD())) {
qDebug() << "Sent/received metavoxel data mismatch.";
exit(true);
}
return;
}
}
qDebug() << "Received metavoxel data with no corresponding send." << packetNumber;
exit(true);
}
TestSharedObjectA::TestSharedObjectA(float foo, TestEnum baz, TestFlags bong) : TestSharedObjectA::TestSharedObjectA(float foo, TestEnum baz, TestFlags bong) :
_foo(foo), _foo(foo),
_baz(baz), _baz(baz),

View file

@ -64,17 +64,21 @@ protected:
private slots: private slots:
void handleHighPriorityMessage(const QVariant& message); void handleHighPriorityMessage(const QVariant& message);
void handleReliableMessage(const QVariant& message); void handleReliableMessage(const QVariant& message, Bitstream& in);
void readReliableChannel(); void readReliableChannel();
void checkReliableDeltaReceived();
private: private:
void compareMetavoxelData();
Mode _mode; Mode _mode;
SharedObjectPointer _localState; SharedObjectPointer _localState;
SharedObjectPointer _remoteState; SharedObjectPointer _remoteState;
MetavoxelData _data; MetavoxelData _data;
MetavoxelLOD _dataLOD;
MetavoxelLOD _lod; MetavoxelLOD _lod;
SharedObjectPointer _sphere; SharedObjectPointer _sphere;
@ -94,6 +98,12 @@ private:
float _reliableMessagesToSend; float _reliableMessagesToSend;
QVariantList _reliableMessagesSent; QVariantList _reliableMessagesSent;
CircularBuffer _dataStreamed; CircularBuffer _dataStreamed;
ReliableChannel* _reliableDeltaChannel;
int _reliableDeltaReceivedOffset;
MetavoxelData _reliableDeltaData;
MetavoxelLOD _reliableDeltaLOD;
Bitstream::WriteMappings _reliableDeltaWriteMappings;
}; };
/// A simple shared object. /// A simple shared object.