Working on sending large deltas as reliable messages.

This commit is contained in:
Andrzej Kapolka 2014-07-02 17:13:39 -07:00
parent 3acbaa7ab6
commit 154eb04336
11 changed files with 204 additions and 89 deletions

View file

@ -94,8 +94,8 @@ MetavoxelSession::MetavoxelSession(const SharedNodePointer& node, MetavoxelServe
_server(server) { _server(server) {
connect(&_sequencer, SIGNAL(receivedHighPriorityMessage(const QVariant&)), SLOT(handleMessage(const QVariant&))); connect(&_sequencer, SIGNAL(receivedHighPriorityMessage(const QVariant&)), SLOT(handleMessage(const QVariant&)));
connect(_sequencer.getReliableInputChannel(), SIGNAL(receivedMessage(const QVariant&)), connect(_sequencer.getReliableInputChannel(), SIGNAL(receivedMessage(const QVariant&, Bitstream&)),
SLOT(handleMessage(const QVariant&))); SLOT(handleMessage(const QVariant&, Bitstream&)));
} }
void MetavoxelSession::update() { void MetavoxelSession::update() {

View file

@ -127,6 +127,7 @@ Bitstream::Bitstream(QDataStream& underlying, MetadataType metadataType, Generic
_underlying(underlying), _underlying(underlying),
_byte(0), _byte(0),
_position(0), _position(0),
_bytesRemaining(INT_MAX),
_metadataType(metadataType), _metadataType(metadataType),
_genericsMode(genericsMode), _genericsMode(genericsMode),
_objectStreamerStreamer(*this), _objectStreamerStreamer(*this),
@ -193,13 +194,16 @@ Bitstream& Bitstream::read(void* data, int bits, int offset) {
void Bitstream::flush() { void Bitstream::flush() {
if (_position != 0) { if (_position != 0) {
_underlying << _byte; _underlying << _byte;
reset(); _bytesRemaining--;
_byte = 0;
_position = 0;
} }
} }
void Bitstream::reset() { void Bitstream::reset() {
_byte = 0; _byte = 0;
_position = 0; _position = 0;
_bytesRemaining = INT_MAX;
} }
Bitstream::WriteMappings Bitstream::getAndResetWriteMappings() { Bitstream::WriteMappings Bitstream::getAndResetWriteMappings() {
@ -1122,7 +1126,7 @@ Bitstream& Bitstream::operator>(ObjectStreamerPointer& streamer) {
} }
if (_metadataType == NO_METADATA) { if (_metadataType == NO_METADATA) {
if (!metaObject) { if (!metaObject) {
qWarning() << "Unknown class name:" << className; throw BitstreamException(QString("Unknown class name: ") + className);
} }
return *this; return *this;
} }
@ -1232,7 +1236,7 @@ Bitstream& Bitstream::operator>(TypeStreamerPointer& streamer) {
} }
if (_metadataType == NO_METADATA) { if (_metadataType == NO_METADATA) {
if (!baseStreamer) { if (!baseStreamer) {
qWarning() << "Unknown type name:" << typeName; throw BitstreamException(QString("Unknown type name: ") + typeName);
} }
return *this; return *this;
} }
@ -1240,7 +1244,7 @@ Bitstream& Bitstream::operator>(TypeStreamerPointer& streamer) {
*this >> category; *this >> category;
if (category == TypeStreamer::SIMPLE_CATEGORY) { if (category == TypeStreamer::SIMPLE_CATEGORY) {
if (!streamer) { if (!streamer) {
qWarning() << "Unknown type name:" << typeName; throw BitstreamException(QString("Unknown type name: ") + typeName);
} }
return *this; return *this;
} }
@ -1441,7 +1445,7 @@ Bitstream& Bitstream::operator>(SharedObjectPointer& object) {
_objectStreamerStreamer >> objectStreamer; _objectStreamerStreamer >> objectStreamer;
if (delta) { if (delta) {
if (!reference) { if (!reference) {
qWarning() << "Delta without reference" << id << originID; throw BitstreamException(QString("Delta without reference [id=%1, originID=%2]").arg(id).arg(originID));
} }
objectStreamer->readRawDelta(*this, reference.data(), pointer.data()); objectStreamer->readRawDelta(*this, reference.data(), pointer.data());
} else { } else {
@ -1451,7 +1455,7 @@ Bitstream& Bitstream::operator>(SharedObjectPointer& object) {
QObject* rawObject; QObject* rawObject;
if (delta) { if (delta) {
if (!reference) { if (!reference) {
qWarning() << "Delta without reference" << id << originID; throw BitstreamException(QString("Delta without reference [id=%1, originID=%2]").arg(id).arg(originID));
} }
readRawDelta(rawObject, (const QObject*)reference.data()); readRawDelta(rawObject, (const QObject*)reference.data());
} else { } else {
@ -1682,6 +1686,10 @@ const TypeStreamer* Bitstream::createInvalidTypeStreamer() {
return streamer; return streamer;
} }
BitstreamException::BitstreamException(const QString& description) :
_description(description) {
}
QJsonValue JSONWriter::getData(bool value) { QJsonValue JSONWriter::getData(bool value) {
return value; return value;
} }

View file

@ -329,6 +329,12 @@ public:
/// Resets to the initial state. /// Resets to the initial state.
void reset(); void reset();
/// Sets the number of "bytes remaining," which will be decremented with each byte written.
void setBytesRemaining(int bytesRemaining) { _bytesRemaining = bytesRemaining; }
/// Returns the number of bytes remaining.
int getBytesRemaining() const { return _bytesRemaining; }
/// Returns the set of transient mappings gathered during writing and resets them. /// Returns the set of transient mappings gathered during writing and resets them.
WriteMappings getAndResetWriteMappings(); WriteMappings getAndResetWriteMappings();
@ -508,6 +514,7 @@ private:
QDataStream& _underlying; QDataStream& _underlying;
quint8 _byte; quint8 _byte;
int _position; int _position;
int _bytesRemaining;
MetadataType _metadataType; MetadataType _metadataType;
GenericsMode _genericsMode; GenericsMode _genericsMode;
@ -823,6 +830,19 @@ template<class K, class V> inline Bitstream& Bitstream::operator>>(QHash<K, V>&
return *this; return *this;
} }
/// Thrown for unrecoverable errors.
class BitstreamException {
public:
BitstreamException(const QString& description);
const QString& getDescription() const { return _description; }
private:
QString _description;
};
/// Provides a means of writing Bitstream-able data to JSON rather than the usual binary format in a manner that allows it to /// Provides a means of writing Bitstream-able data to JSON rather than the usual binary format in a manner that allows it to
/// be manipulated and re-read, converted to binary, etc. To use, create a JSONWriter, stream values in using the << operator, /// be manipulated and re-read, converted to binary, etc. To use, create a JSONWriter, stream values in using the << operator,
/// and call getDocument to obtain the JSON data. /// and call getDocument to obtain the JSON data.

View file

@ -253,6 +253,8 @@ void DatagramSequencer::receivedDatagram(const QByteArray& datagram) {
// record the receipt // record the receipt
ReceiveRecord record = { _incomingPacketNumber, _inputStream.getAndResetReadMappings(), newHighPriorityMessages }; ReceiveRecord record = { _incomingPacketNumber, _inputStream.getAndResetReadMappings(), newHighPriorityMessages };
_receiveRecords.append(record); _receiveRecords.append(record);
emit receiveRecorded();
} }
void DatagramSequencer::sendClearSharedObjectMessage(int id) { void DatagramSequencer::sendClearSharedObjectMessage(int id) {
@ -364,6 +366,8 @@ void DatagramSequencer::sendPacket(const QByteArray& packet, const QVector<Chann
_outputStream.getAndResetWriteMappings(), spans }; _outputStream.getAndResetWriteMappings(), spans };
_sendRecords.append(record); _sendRecords.append(record);
emit sendRecorded();
// write the sequence number and size, which are the same between all fragments // write the sequence number and size, which are the same between all fragments
_outgoingDatagramBuffer.seek(_datagramHeaderSize); _outgoingDatagramBuffer.seek(_datagramHeaderSize);
_outgoingDatagramStream << (quint32)_outgoingPacketNumber; _outgoingDatagramStream << (quint32)_outgoingPacketNumber;
@ -658,16 +662,24 @@ int ReliableChannel::getBytesAvailable() const {
return _buffer.size() - _acknowledged.getTotalSet(); return _buffer.size() - _acknowledged.getTotalSet();
} }
void ReliableChannel::sendMessage(const QVariant& message) { void ReliableChannel::startMessage() {
// write a placeholder for the length, then fill it in when we know what it is // write a placeholder for the length; we'll fill it in when we know what it is
int placeholder = _buffer.pos(); _messageLengthPlaceholder = _buffer.pos();
_dataStream << (quint32)0; _dataStream << (quint32)0;
_bitstream << message; }
void ReliableChannel::endMessage() {
_bitstream.flush(); _bitstream.flush();
_bitstream.persistAndResetWriteMappings(); _bitstream.persistAndResetWriteMappings();
quint32 length = _buffer.pos() - placeholder; quint32 length = _buffer.pos() - _messageLengthPlaceholder;
_buffer.writeBytes(placeholder, sizeof(quint32), (const char*)&length); _buffer.writeBytes(_messageLengthPlaceholder, sizeof(quint32), (const char*)&length);
}
void ReliableChannel::sendMessage(const QVariant& message) {
startMessage();
_bitstream << message;
endMessage();
} }
void ReliableChannel::sendClearSharedObjectMessage(int id) { void ReliableChannel::sendClearSharedObjectMessage(int id) {
@ -675,7 +687,7 @@ void ReliableChannel::sendClearSharedObjectMessage(int id) {
sendMessage(QVariant::fromValue(message)); sendMessage(QVariant::fromValue(message));
} }
void ReliableChannel::handleMessage(const QVariant& message) { void ReliableChannel::handleMessage(const QVariant& message, Bitstream& in) {
if (message.userType() == ClearSharedObjectMessage::Type) { if (message.userType() == ClearSharedObjectMessage::Type) {
_bitstream.clearSharedObject(message.value<ClearSharedObjectMessage>().id); _bitstream.clearSharedObject(message.value<ClearSharedObjectMessage>().id);
@ -700,7 +712,7 @@ ReliableChannel::ReliableChannel(DatagramSequencer* sequencer, int index, bool o
_dataStream.setByteOrder(QDataStream::LittleEndian); _dataStream.setByteOrder(QDataStream::LittleEndian);
connect(&_bitstream, SIGNAL(sharedObjectCleared(int)), SLOT(sendClearSharedObjectMessage(int))); connect(&_bitstream, SIGNAL(sharedObjectCleared(int)), SLOT(sendClearSharedObjectMessage(int)));
connect(this, SIGNAL(receivedMessage(const QVariant&)), SLOT(handleMessage(const QVariant&))); connect(this, SIGNAL(receivedMessage(const QVariant&, Bitstream&)), SLOT(handleMessage(const QVariant&, Bitstream&)));
} }
void ReliableChannel::writeData(QDataStream& out, int bytes, QVector<DatagramSequencer::ChannelSpan>& spans) { void ReliableChannel::writeData(QDataStream& out, int bytes, QVector<DatagramSequencer::ChannelSpan>& spans) {
@ -843,9 +855,9 @@ void ReliableChannel::readData(QDataStream& in) {
_dataStream.skipRawData(sizeof(quint32)); _dataStream.skipRawData(sizeof(quint32));
QVariant message; QVariant message;
_bitstream >> message; _bitstream >> message;
emit receivedMessage(message, _bitstream);
_bitstream.reset(); _bitstream.reset();
_bitstream.persistAndResetReadMappings(); _bitstream.persistAndResetReadMappings();
emit receivedMessage(message);
continue; continue;
} }
} }

View file

@ -126,9 +126,15 @@ signals:
/// Emitted when a packet is available to read. /// Emitted when a packet is available to read.
void readyToRead(Bitstream& input); void readyToRead(Bitstream& input);
/// Emitted when we've received a high-priority message /// Emitted when we've received a high-priority message.
void receivedHighPriorityMessage(const QVariant& data); void receivedHighPriorityMessage(const QVariant& data);
/// Emitted when we've recorded the transmission of a packet.
void sendRecorded();
/// Emitted when we've recorded the receipt of a packet (that is, at the end of packet processing).
void receiveRecorded();
/// Emitted when a sent packet has been acknowledged by the remote side. /// Emitted when a sent packet has been acknowledged by the remote side.
/// \param index the index of the packet in our list of send records /// \param index the index of the packet in our list of send records
void sendAcknowledged(int index); void sendAcknowledged(int index);
@ -336,22 +342,36 @@ public:
/// Returns the number of bytes available to read from this channel. /// Returns the number of bytes available to read from this channel.
int getBytesAvailable() const; int getBytesAvailable() const;
/// Returns the offset, which represents the total number of bytes acknowledged
/// (on the write end) or received completely (on the read end).
int getOffset() const { return _offset; }
/// Returns the total number of bytes written to this channel.
int getBytesWritten() const { return _offset + _buffer.pos(); }
/// Sets whether we expect to write/read framed messages. /// Sets whether we expect to write/read framed messages.
void setMessagesEnabled(bool enabled) { _messagesEnabled = enabled; } void setMessagesEnabled(bool enabled) { _messagesEnabled = enabled; }
bool getMessagesEnabled() const { return _messagesEnabled; } bool getMessagesEnabled() const { return _messagesEnabled; }
/// Sends a framed message on this channel. /// Starts a framed message on this channel.
void startMessage();
/// Ends a framed message on this channel.
void endMessage();
/// Sends a framed message on this channel (convenience function that calls startMessage,
/// writes the message to the bitstream, then calls endMessage).
void sendMessage(const QVariant& message); void sendMessage(const QVariant& message);
signals: signals:
/// Fired when a framed message has been received on this channel. /// Fired when a framed message has been received on this channel.
void receivedMessage(const QVariant& message); void receivedMessage(const QVariant& message, Bitstream& in);
private slots: private slots:
void sendClearSharedObjectMessage(int id); void sendClearSharedObjectMessage(int id);
void handleMessage(const QVariant& message); void handleMessage(const QVariant& message, Bitstream& in);
private: private:
@ -381,6 +401,7 @@ private:
int _writePositionResetPacketNumber; int _writePositionResetPacketNumber;
SpanList _acknowledged; SpanList _acknowledged;
bool _messagesEnabled; bool _messagesEnabled;
int _messageLengthPlaceholder;
}; };
#endif // hifi_DatagramSequencer_h #endif // hifi_DatagramSequencer_h

View file

@ -19,6 +19,8 @@ Endpoint::Endpoint(const SharedNodePointer& node, PacketRecord* baselineSendReco
connect(&_sequencer, SIGNAL(readyToWrite(const QByteArray&)), SLOT(sendDatagram(const QByteArray&))); connect(&_sequencer, SIGNAL(readyToWrite(const QByteArray&)), SLOT(sendDatagram(const QByteArray&)));
connect(&_sequencer, SIGNAL(readyToRead(Bitstream&)), SLOT(readMessage(Bitstream&))); connect(&_sequencer, SIGNAL(readyToRead(Bitstream&)), SLOT(readMessage(Bitstream&)));
connect(&_sequencer, SIGNAL(sendRecorded()), SLOT(recordSend()));
connect(&_sequencer, SIGNAL(receiveRecorded()), SLOT(recordReceive()));
connect(&_sequencer, SIGNAL(sendAcknowledged(int)), SLOT(clearSendRecordsBefore(int))); connect(&_sequencer, SIGNAL(sendAcknowledged(int)), SLOT(clearSendRecordsBefore(int)));
connect(&_sequencer, SIGNAL(receiveAcknowledged(int)), SLOT(clearReceiveRecordsBefore(int))); connect(&_sequencer, SIGNAL(receiveAcknowledged(int)), SLOT(clearReceiveRecordsBefore(int)));
@ -40,9 +42,6 @@ void Endpoint::update() {
Bitstream& out = _sequencer.startPacket(); Bitstream& out = _sequencer.startPacket();
writeUpdateMessage(out); writeUpdateMessage(out);
_sequencer.endPacket(); _sequencer.endPacket();
// record the send
_sendRecords.append(maybeCreateSendRecord());
} }
int Endpoint::parseData(const QByteArray& packet) { int Endpoint::parseData(const QByteArray& packet) {
@ -59,8 +58,21 @@ void Endpoint::readMessage(Bitstream& in) {
QVariant message; QVariant message;
in >> message; in >> message;
handleMessage(message, in); handleMessage(message, in);
}
// record the receipt
void Endpoint::handleMessage(const QVariant& message, Bitstream& in) {
if (message.userType() == QMetaType::QVariantList) {
foreach (const QVariant& element, message.toList()) {
handleMessage(element, in);
}
}
}
void Endpoint::recordSend() {
_sendRecords.append(maybeCreateSendRecord());
}
void Endpoint::recordReceive() {
_receiveRecords.append(maybeCreateReceiveRecord()); _receiveRecords.append(maybeCreateReceiveRecord());
} }
@ -84,14 +96,6 @@ void Endpoint::writeUpdateMessage(Bitstream& out) {
out << QVariant(); out << QVariant();
} }
void Endpoint::handleMessage(const QVariant& message, Bitstream& in) {
if (message.userType() == QMetaType::QVariantList) {
foreach (const QVariant& element, message.toList()) {
handleMessage(element, in);
}
}
}
PacketRecord* Endpoint::maybeCreateSendRecord() const { PacketRecord* Endpoint::maybeCreateSendRecord() const {
return NULL; return NULL;
} }

View file

@ -37,6 +37,10 @@ protected slots:
virtual void sendDatagram(const QByteArray& data); virtual void sendDatagram(const QByteArray& data);
virtual void readMessage(Bitstream& in); virtual void readMessage(Bitstream& in);
virtual void handleMessage(const QVariant& message, Bitstream& in);
void recordSend();
void recordReceive();
void clearSendRecordsBefore(int index); void clearSendRecordsBefore(int index);
void clearReceiveRecordsBefore(int index); void clearReceiveRecordsBefore(int index);
@ -44,7 +48,6 @@ protected slots:
protected: protected:
virtual void writeUpdateMessage(Bitstream& out); virtual void writeUpdateMessage(Bitstream& out);
virtual void handleMessage(const QVariant& message, Bitstream& in);
virtual PacketRecord* maybeCreateSendRecord() const; virtual PacketRecord* maybeCreateSendRecord() const;
virtual PacketRecord* maybeCreateReceiveRecord() const; virtual PacketRecord* maybeCreateReceiveRecord() const;

View file

@ -681,6 +681,12 @@ void MetavoxelStreamState::setMinimum(const glm::vec3& lastMinimum, int index) {
minimum = getNextMinimum(lastMinimum, size, index); minimum = getNextMinimum(lastMinimum, size, index);
} }
void MetavoxelStreamState::checkByteLimitExceeded() {
if (stream.getBytesRemaining() < 0) {
throw ByteLimitExceededException();
}
}
MetavoxelNode::MetavoxelNode(const AttributeValue& attributeValue, const MetavoxelNode* copyChildren) : MetavoxelNode::MetavoxelNode(const AttributeValue& attributeValue, const MetavoxelNode* copyChildren) :
_referenceCount(1) { _referenceCount(1) {
@ -772,11 +778,13 @@ void MetavoxelNode::read(MetavoxelStreamState& state) {
void MetavoxelNode::write(MetavoxelStreamState& state) const { void MetavoxelNode::write(MetavoxelStreamState& state) const {
if (!state.shouldSubdivide()) { if (!state.shouldSubdivide()) {
state.attribute->write(state.stream, _attributeValue, true); state.attribute->write(state.stream, _attributeValue, true);
state.checkByteLimitExceeded();
return; return;
} }
bool leaf = isLeaf(); bool leaf = isLeaf();
state.stream << leaf; state.stream << leaf;
state.attribute->write(state.stream, _attributeValue, leaf); state.attribute->write(state.stream, _attributeValue, leaf);
state.checkByteLimitExceeded();
if (!leaf) { if (!leaf) {
MetavoxelStreamState nextState = { glm::vec3(), state.size * 0.5f, state.attribute, MetavoxelStreamState nextState = { glm::vec3(), state.size * 0.5f, state.attribute,
state.stream, state.lod, state.referenceLOD }; state.stream, state.lod, state.referenceLOD };
@ -830,11 +838,13 @@ void MetavoxelNode::readDelta(const MetavoxelNode& reference, MetavoxelStreamSta
void MetavoxelNode::writeDelta(const MetavoxelNode& reference, MetavoxelStreamState& state) const { void MetavoxelNode::writeDelta(const MetavoxelNode& reference, MetavoxelStreamState& state) const {
if (!state.shouldSubdivide()) { if (!state.shouldSubdivide()) {
state.attribute->writeDelta(state.stream, _attributeValue, reference._attributeValue, true); state.attribute->writeDelta(state.stream, _attributeValue, reference._attributeValue, true);
state.checkByteLimitExceeded();
return; return;
} }
bool leaf = isLeaf(); bool leaf = isLeaf();
state.stream << leaf; state.stream << leaf;
state.attribute->writeDelta(state.stream, _attributeValue, reference._attributeValue, leaf); state.attribute->writeDelta(state.stream, _attributeValue, reference._attributeValue, leaf);
state.checkByteLimitExceeded();
if (!leaf) { if (!leaf) {
MetavoxelStreamState nextState = { glm::vec3(), state.size * 0.5f, state.attribute, MetavoxelStreamState nextState = { glm::vec3(), state.size * 0.5f, state.attribute,
state.stream, state.lod, state.referenceLOD }; state.stream, state.lod, state.referenceLOD };
@ -897,6 +907,7 @@ void MetavoxelNode::writeSubdivision(MetavoxelStreamState& state) const {
bool subdivideReference = state.shouldSubdivideReference(); bool subdivideReference = state.shouldSubdivideReference();
if (!subdivideReference) { if (!subdivideReference) {
state.stream << leaf; state.stream << leaf;
state.checkByteLimitExceeded();
} }
if (!leaf) { if (!leaf) {
MetavoxelStreamState nextState = { glm::vec3(), state.size * 0.5f, state.attribute, MetavoxelStreamState nextState = { glm::vec3(), state.size * 0.5f, state.attribute,
@ -921,6 +932,7 @@ void MetavoxelNode::writeSpanners(MetavoxelStreamState& state) const {
foreach (const SharedObjectPointer& object, decodeInline<SharedObjectSet>(_attributeValue)) { foreach (const SharedObjectPointer& object, decodeInline<SharedObjectSet>(_attributeValue)) {
if (static_cast<Spanner*>(object.data())->testAndSetVisited()) { if (static_cast<Spanner*>(object.data())->testAndSetVisited()) {
state.stream << object; state.stream << object;
state.checkByteLimitExceeded();
} }
} }
if (!state.shouldSubdivide() || isLeaf()) { if (!state.shouldSubdivide() || isLeaf()) {
@ -940,11 +952,13 @@ void MetavoxelNode::writeSpannerDelta(const MetavoxelNode& reference, MetavoxelS
foreach (const SharedObjectPointer& object, oldSet) { foreach (const SharedObjectPointer& object, oldSet) {
if (static_cast<Spanner*>(object.data())->testAndSetVisited() && !newSet.contains(object)) { if (static_cast<Spanner*>(object.data())->testAndSetVisited() && !newSet.contains(object)) {
state.stream << object; state.stream << object;
state.checkByteLimitExceeded();
} }
} }
foreach (const SharedObjectPointer& object, newSet) { foreach (const SharedObjectPointer& object, newSet) {
if (static_cast<Spanner*>(object.data())->testAndSetVisited() && !oldSet.contains(object)) { if (static_cast<Spanner*>(object.data())->testAndSetVisited() && !oldSet.contains(object)) {
state.stream << object; state.stream << object;
state.checkByteLimitExceeded();
} }
} }
if (isLeaf() || !state.shouldSubdivide()) { if (isLeaf() || !state.shouldSubdivide()) {

View file

@ -164,6 +164,13 @@ public:
bool becameSubdivided() const; bool becameSubdivided() const;
void setMinimum(const glm::vec3& lastMinimum, int index); void setMinimum(const glm::vec3& lastMinimum, int index);
/// Throws ByteLimitExceededException if the stream has fewer than zero bytes remaining.
void checkByteLimitExceeded();
};
/// Thrown when we have exceeded the byte limit in writing.
class ByteLimitExceededException {
}; };
/// A single node within a metavoxel layer. /// A single node within a metavoxel layer.

View file

@ -646,11 +646,14 @@ TestEndpoint::TestEndpoint(Mode mode) :
Endpoint(SharedNodePointer(), new TestSendRecord(), new TestReceiveRecord()), Endpoint(SharedNodePointer(), new TestSendRecord(), new TestReceiveRecord()),
_mode(mode), _mode(mode),
_highPriorityMessagesToSend(0.0f), _highPriorityMessagesToSend(0.0f),
_reliableMessagesToSend(0.0f) { _reliableMessagesToSend(0.0f),
_reliableDeltaReceivedOffset(0) {
connect(&_sequencer, SIGNAL(receivedHighPriorityMessage(const QVariant&)), connect(&_sequencer, SIGNAL(receivedHighPriorityMessage(const QVariant&)),
SLOT(handleHighPriorityMessage(const QVariant&))); SLOT(handleHighPriorityMessage(const QVariant&)));
connect(_sequencer.getReliableInputChannel(), SIGNAL(receivedMessage(const QVariant&, Bitstream&)),
SLOT(handleReliableMessage(const QVariant&, Bitstream&)));
if (mode == METAVOXEL_CLIENT_MODE) { if (mode == METAVOXEL_CLIENT_MODE) {
_lod = MetavoxelLOD(glm::vec3(), 0.01f); _lod = MetavoxelLOD(glm::vec3(), 0.01f);
return; return;
@ -663,19 +666,16 @@ TestEndpoint::TestEndpoint(Mode mode) :
_data.guide(visitor); _data.guide(visitor);
qDebug() << "Created" << visitor.leafCount << "base leaves"; qDebug() << "Created" << visitor.leafCount << "base leaves";
_data.insert(AttributeRegistry::getInstance()->getSpannersAttribute(), new Sphere()); //_data.insert(AttributeRegistry::getInstance()->getSpannersAttribute(), new Sphere());
_sphere = new Sphere(); _sphere = new Sphere();
static_cast<Transformable*>(_sphere.data())->setScale(0.01f); static_cast<Transformable*>(_sphere.data())->setScale(0.01f);
_data.insert(AttributeRegistry::getInstance()->getSpannersAttribute(), _sphere); //_data.insert(AttributeRegistry::getInstance()->getSpannersAttribute(), _sphere);
return; return;
} }
// create the object that represents out delta-encoded state // create the object that represents out delta-encoded state
_localState = new TestSharedObjectA(); _localState = new TestSharedObjectA();
connect(_sequencer.getReliableInputChannel(), SIGNAL(receivedMessage(const QVariant&)),
SLOT(handleReliableMessage(const QVariant&)));
ReliableChannel* secondInput = _sequencer.getReliableInputChannel(1); ReliableChannel* secondInput = _sequencer.getReliableInputChannel(1);
secondInput->setMessagesEnabled(false); secondInput->setMessagesEnabled(false);
connect(&secondInput->getBuffer(), SIGNAL(readyRead()), SLOT(readReliableChannel())); connect(&secondInput->getBuffer(), SIGNAL(readyRead()), SLOT(readReliableChannel()));
@ -867,9 +867,6 @@ bool TestEndpoint::simulate(int iterationNumber) {
maxDatagramsPerPacket = qMax(maxDatagramsPerPacket, datagramsSent - oldDatagramsSent); maxDatagramsPerPacket = qMax(maxDatagramsPerPacket, datagramsSent - oldDatagramsSent);
maxBytesPerPacket = qMax(maxBytesPerPacket, bytesSent - oldBytesSent); maxBytesPerPacket = qMax(maxBytesPerPacket, bytesSent - oldBytesSent);
// record the send
_sendRecords.append(maybeCreateSendRecord());
} }
return false; return false;
@ -880,9 +877,6 @@ bool TestEndpoint::simulate(int iterationNumber) {
out << QVariant::fromValue(state); out << QVariant::fromValue(state);
_sequencer.endPacket(); _sequencer.endPacket();
// record the send
_sendRecords.append(maybeCreateSendRecord());
} else if (_mode == METAVOXEL_SERVER_MODE) { } else if (_mode == METAVOXEL_SERVER_MODE) {
// make a random change // make a random change
MutateVisitor visitor; MutateVisitor visitor;
@ -899,7 +893,7 @@ bool TestEndpoint::simulate(int iterationNumber) {
newSphere->setTranslation(newSphere->getTranslation() + glm::vec3(randFloatInRange(-0.01f, 0.01f), newSphere->setTranslation(newSphere->getTranslation() + glm::vec3(randFloatInRange(-0.01f, 0.01f),
randFloatInRange(-0.01f, 0.01f), randFloatInRange(-0.01f, 0.01f))); randFloatInRange(-0.01f, 0.01f), randFloatInRange(-0.01f, 0.01f)));
} }
_data.replace(AttributeRegistry::getInstance()->getSpannersAttribute(), oldSphere, _sphere); //_data.replace(AttributeRegistry::getInstance()->getSpannersAttribute(), oldSphere, _sphere);
spannerMutationsPerformed++; spannerMutationsPerformed++;
} }
@ -907,15 +901,43 @@ bool TestEndpoint::simulate(int iterationNumber) {
if (!_lod.isValid()) { if (!_lod.isValid()) {
return false; return false;
} }
// if we're sending a reliable delta, wait until it's acknowledged
if (_reliableDeltaReceivedOffset > 0) {
if (_sequencer.getReliableOutputChannel()->getOffset() < _reliableDeltaReceivedOffset) {
Bitstream& out = _sequencer.startPacket();
out << QVariant();
_sequencer.endPacket();
return false;
}
_reliableDeltaReceivedOffset = 0;
_reliableDeltaData = MetavoxelData();
}
Bitstream& out = _sequencer.startPacket(); Bitstream& out = _sequencer.startPacket();
out << QVariant::fromValue(MetavoxelDeltaMessage()); out << QVariant::fromValue(MetavoxelDeltaMessage());
PacketRecord* sendRecord = getLastAcknowledgedSendRecord(); PacketRecord* sendRecord = getLastAcknowledgedSendRecord();
_data.writeDelta(sendRecord->getData(), sendRecord->getLOD(), out, _lod); out.setBytesRemaining(_sequencer.getMaxPacketSize());
_sequencer.endPacket(); try {
_data.writeDelta(sendRecord->getData(), sendRecord->getLOD(), out, _lod);
// record the send _sequencer.endPacket();
_sendRecords.append(maybeCreateSendRecord());
} catch (const ByteLimitExceededException& exception) {
_sequencer.cancelPacket();
// we need to send the delta on the reliable channel
ReliableChannel* channel = _sequencer.getReliableOutputChannel();
channel->startMessage();
channel->getBitstream() << QVariant::fromValue(MetavoxelDeltaMessage());
_data.writeDelta(sendRecord->getData(), sendRecord->getLOD(), channel->getBitstream(), _lod);
channel->endMessage();
_reliableDeltaReceivedOffset = channel->getBytesWritten();
_reliableDeltaData = _data;
_reliableDeltaLOD = _lod;
Bitstream& out = _sequencer.startPacket();
out << QVariant();
_sequencer.endPacket();
}
} else { } else {
// enqueue some number of high priority messages // enqueue some number of high priority messages
const float MIN_HIGH_PRIORITY_MESSAGES = 0.0f; const float MIN_HIGH_PRIORITY_MESSAGES = 0.0f;
@ -957,9 +979,6 @@ bool TestEndpoint::simulate(int iterationNumber) {
qDebug() << message; qDebug() << message;
return true; return true;
} }
// record the send
_sendRecords.append(maybeCreateSendRecord());
} }
maxDatagramsPerPacket = qMax(maxDatagramsPerPacket, datagramsSent - oldDatagramsSent); maxDatagramsPerPacket = qMax(maxDatagramsPerPacket, datagramsSent - oldDatagramsSent);
maxBytesPerPacket = qMax(maxBytesPerPacket, bytesSent - oldBytesSent); maxBytesPerPacket = qMax(maxBytesPerPacket, bytesSent - oldBytesSent);
@ -995,7 +1014,7 @@ void TestEndpoint::sendDatagram(const QByteArray& datagram) {
// some are received out of order // some are received out of order
const float REORDER_PROBABILITY = 0.1f; const float REORDER_PROBABILITY = 0.1f;
if (randFloat() < REORDER_PROBABILITY * probabilityMultiplier) { if (randFloat() < REORDER_PROBABILITY * probabilityMultiplier) {
const int MIN_DELAY = 1; const int MIN_DELAY = 2;
const int MAX_DELAY = 5; const int MAX_DELAY = 5;
// have to copy the datagram; the one we're passed is a reference to a shared buffer // have to copy the datagram; the one we're passed is a reference to a shared buffer
_delayedDatagrams.append(ByteArrayIntPair(QByteArray(datagram.constData(), datagram.size()), _delayedDatagrams.append(ByteArrayIntPair(QByteArray(datagram.constData(), datagram.size()),
@ -1008,58 +1027,32 @@ void TestEndpoint::sendDatagram(const QByteArray& datagram) {
} }
} }
_other->parseData(datagram); _delayedDatagrams.append(ByteArrayIntPair(QByteArray(datagram.constData(), datagram.size()), 1));
} }
void TestEndpoint::readMessage(Bitstream& in) { void TestEndpoint::readMessage(Bitstream& in) {
if (_mode == CONGESTION_MODE) { if (_mode == CONGESTION_MODE) {
QVariant message; QVariant message;
in >> message; in >> message;
// record the receipt
_receiveRecords.append(maybeCreateReceiveRecord());
return; return;
} }
if (_mode == METAVOXEL_CLIENT_MODE) { if (_mode == METAVOXEL_CLIENT_MODE) {
QVariant message; QVariant message;
in >> message; in >> message;
handleMessage(message, in); handleMessage(message, in);
// deep-compare data to sent version
int packetNumber = _sequencer.getIncomingPacketNumber();
foreach (PacketRecord* record, _other->_sendRecords) {
TestSendRecord* sendRecord = static_cast<TestSendRecord*>(record);
if (sendRecord->getPacketNumber() == packetNumber) {
if (!sendRecord->getData().deepEquals(_data, getLastAcknowledgedSendRecord()->getLOD())) {
qDebug() << "Sent/received metavoxel data mismatch.";
exit(true);
}
break;
}
}
// record the receipt
_receiveRecords.append(maybeCreateReceiveRecord());
return; return;
} }
if (_mode == METAVOXEL_SERVER_MODE) { if (_mode == METAVOXEL_SERVER_MODE) {
QVariant message; QVariant message;
in >> message; in >> message;
handleMessage(message, in); handleMessage(message, in);
// record the receipt
_receiveRecords.append(maybeCreateReceiveRecord());
return; return;
} }
SequencedTestMessage message; SequencedTestMessage message;
in >> message; in >> message;
_remoteState = message.state; _remoteState = message.state;
// record the receipt
_receiveRecords.append(maybeCreateReceiveRecord());
for (QList<SequencedTestMessage>::iterator it = _other->_unreliableMessagesSent.begin(); for (QList<SequencedTestMessage>::iterator it = _other->_unreliableMessagesSent.begin();
it != _other->_unreliableMessagesSent.end(); it++) { it != _other->_unreliableMessagesSent.end(); it++) {
if (it->sequenceNumber == message.sequenceNumber) { if (it->sequenceNumber == message.sequenceNumber) {
@ -1089,6 +1082,7 @@ void TestEndpoint::handleMessage(const QVariant& message, Bitstream& in) {
} else if (userType == MetavoxelDeltaMessage::Type) { } else if (userType == MetavoxelDeltaMessage::Type) {
PacketRecord* receiveRecord = getLastAcknowledgedReceiveRecord(); PacketRecord* receiveRecord = getLastAcknowledgedReceiveRecord();
_data.readDelta(receiveRecord->getData(), receiveRecord->getLOD(), in, getLastAcknowledgedSendRecord()->getLOD()); _data.readDelta(receiveRecord->getData(), receiveRecord->getLOD(), in, getLastAcknowledgedSendRecord()->getLOD());
compareMetavoxelData();
} else if (userType == QMetaType::QVariantList) { } else if (userType == QMetaType::QVariantList) {
foreach (const QVariant& element, message.toList()) { foreach (const QVariant& element, message.toList()) {
@ -1098,6 +1092,9 @@ void TestEndpoint::handleMessage(const QVariant& message, Bitstream& in) {
} }
PacketRecord* TestEndpoint::maybeCreateSendRecord() const { PacketRecord* TestEndpoint::maybeCreateSendRecord() const {
if (_reliableDeltaReceivedOffset > 0) {
return new TestSendRecord(_reliableDeltaLOD, _reliableDeltaData, _localState, _sequencer.getOutgoingPacketNumber());
}
return new TestSendRecord(_lod, (_mode == METAVOXEL_CLIENT_MODE) ? MetavoxelData() : _data, return new TestSendRecord(_lod, (_mode == METAVOXEL_CLIENT_MODE) ? MetavoxelData() : _data,
_localState, _sequencer.getOutgoingPacketNumber()); _localState, _sequencer.getOutgoingPacketNumber());
} }
@ -1121,7 +1118,13 @@ void TestEndpoint::handleHighPriorityMessage(const QVariant& message) {
highPriorityMessagesReceived++; highPriorityMessagesReceived++;
} }
void TestEndpoint::handleReliableMessage(const QVariant& message) { void TestEndpoint::handleReliableMessage(const QVariant& message, Bitstream& in) {
if (message.userType() == MetavoxelDeltaMessage::Type) {
PacketRecord* receiveRecord = getLastAcknowledgedReceiveRecord();
_data.readDelta(receiveRecord->getData(), receiveRecord->getLOD(), in, getLastAcknowledgedSendRecord()->getLOD());
compareMetavoxelData();
return;
}
if (message.userType() == ClearSharedObjectMessage::Type || if (message.userType() == ClearSharedObjectMessage::Type ||
message.userType() == ClearMainChannelSharedObjectMessage::Type) { message.userType() == ClearMainChannelSharedObjectMessage::Type) {
return; return;
@ -1150,6 +1153,23 @@ void TestEndpoint::readReliableChannel() {
streamedBytesReceived += bytes.size(); streamedBytesReceived += bytes.size();
} }
void TestEndpoint::compareMetavoxelData() {
// deep-compare data to sent version
int packetNumber = _sequencer.getIncomingPacketNumber();
foreach (PacketRecord* record, _other->_sendRecords) {
TestSendRecord* sendRecord = static_cast<TestSendRecord*>(record);
if (sendRecord->getPacketNumber() == packetNumber) {
if (!sendRecord->getData().deepEquals(_data, getLastAcknowledgedSendRecord()->getLOD())) {
qDebug() << "Sent/received metavoxel data mismatch.";
exit(true);
}
return;
}
}
qDebug() << "Received metavoxel data with no corresponding send." << packetNumber;
exit(true);
}
TestSharedObjectA::TestSharedObjectA(float foo, TestEnum baz, TestFlags bong) : TestSharedObjectA::TestSharedObjectA(float foo, TestEnum baz, TestFlags bong) :
_foo(foo), _foo(foo),
_baz(baz), _baz(baz),

View file

@ -64,11 +64,13 @@ protected:
private slots: private slots:
void handleHighPriorityMessage(const QVariant& message); void handleHighPriorityMessage(const QVariant& message);
void handleReliableMessage(const QVariant& message); void handleReliableMessage(const QVariant& message, Bitstream& in);
void readReliableChannel(); void readReliableChannel();
private: private:
void compareMetavoxelData();
Mode _mode; Mode _mode;
SharedObjectPointer _localState; SharedObjectPointer _localState;
@ -94,6 +96,10 @@ private:
float _reliableMessagesToSend; float _reliableMessagesToSend;
QVariantList _reliableMessagesSent; QVariantList _reliableMessagesSent;
CircularBuffer _dataStreamed; CircularBuffer _dataStreamed;
int _reliableDeltaReceivedOffset;
MetavoxelData _reliableDeltaData;
MetavoxelLOD _reliableDeltaLOD;
}; };
/// A simple shared object. /// A simple shared object.