diff --git a/libraries/metavoxels/src/DatagramSequencer.cpp b/libraries/metavoxels/src/DatagramSequencer.cpp index 77f893ccb6..a60d4e9df3 100644 --- a/libraries/metavoxels/src/DatagramSequencer.cpp +++ b/libraries/metavoxels/src/DatagramSequencer.cpp @@ -42,6 +42,7 @@ DatagramSequencer::DatagramSequencer(const QByteArray& datagramHeader, QObject* _outgoingDatagramStream.setByteOrder(QDataStream::LittleEndian); connect(&_outputStream, SIGNAL(sharedObjectCleared(int)), SLOT(sendClearSharedObjectMessage(int))); + connect(this, SIGNAL(receivedHighPriorityMessage(const QVariant&)), SLOT(handleHighPriorityMessage(const QVariant&))); memcpy(_outgoingDatagram.data(), datagramHeader.constData(), _datagramHeaderSize); } @@ -182,7 +183,7 @@ void DatagramSequencer::receivedDatagram(const QByteArray& datagram) { QVariant data; _inputStream >> data; if ((int)i >= _receivedHighPriorityMessages) { - handleHighPriorityMessage(data); + emit receivedHighPriorityMessage(data); } } _receivedHighPriorityMessages = highPriorityMessageCount; @@ -208,9 +209,22 @@ void DatagramSequencer::receivedDatagram(const QByteArray& datagram) { } void DatagramSequencer::sendClearSharedObjectMessage(int id) { - // for now, high priority - ClearSharedObjectMessage message = { id }; - sendHighPriorityMessage(QVariant::fromValue(message)); + // send it low-priority unless the channel has messages disabled + ReliableChannel* channel = getReliableOutputChannel(); + if (channel->getMessagesEnabled()) { + ClearMainChannelSharedObjectMessage message = { id }; + channel->sendMessage(QVariant::fromValue(message)); + + } else { + ClearSharedObjectMessage message = { id }; + sendHighPriorityMessage(QVariant::fromValue(message)); + } +} + +void DatagramSequencer::handleHighPriorityMessage(const QVariant& data) { + if (data.userType() == ClearSharedObjectMessage::Type) { + _inputStream.clearSharedObject(data.value().id); + } } void DatagramSequencer::sendRecordAcknowledged(const SendRecord& record) { @@ -303,15 +317,6 @@ void DatagramSequencer::sendPacket(const QByteArray& packet, const QVector().id); - - } else { - emit receivedHighPriorityMessage(data); - } -} - const int INITIAL_CIRCULAR_BUFFER_CAPACITY = 16; CircularBuffer::CircularBuffer(QObject* parent) : @@ -592,6 +597,16 @@ void ReliableChannel::sendClearSharedObjectMessage(int id) { sendMessage(QVariant::fromValue(message)); } +void ReliableChannel::handleMessage(const QVariant& message) { + if (message.userType() == ClearSharedObjectMessage::Type) { + _bitstream.clearSharedObject(message.value().id); + + } else if (message.userType() == ClearMainChannelSharedObjectMessage::Type) { + static_cast(parent())->_inputStream.clearSharedObject( + message.value().id); + } +} + ReliableChannel::ReliableChannel(DatagramSequencer* sequencer, int index, bool output) : QObject(sequencer), _index(index), @@ -600,12 +615,13 @@ ReliableChannel::ReliableChannel(DatagramSequencer* sequencer, int index, bool o _priority(1.0f), _offset(0), _writePosition(0), - _expectingMessage(true) { + _messagesEnabled(true) { _buffer.open(output ? QIODevice::WriteOnly : QIODevice::ReadOnly); _dataStream.setByteOrder(QDataStream::LittleEndian); connect(&_bitstream, SIGNAL(sharedObjectCleared(int)), SLOT(sendClearSharedObjectMessage(int))); + connect(this, SIGNAL(receivedMessage(const QVariant&)), SLOT(handleMessage(const QVariant&))); } void ReliableChannel::writeData(QDataStream& out, int bytes, QVector& spans) { @@ -719,7 +735,7 @@ void ReliableChannel::readData(QDataStream& in) { forever { // if we're expecting a message, peek into the buffer to see if we have the whole thing. // if so, read it in, handle it, and loop back around in case there are more - if (_expectingMessage) { + if (_messagesEnabled) { int available = _buffer.bytesAvailable(); if (available >= sizeof(quint32)) { quint32 length; @@ -729,7 +745,7 @@ void ReliableChannel::readData(QDataStream& in) { QVariant message; _bitstream >> message; _bitstream.reset(); - handleMessage(message); + emit receivedMessage(message); continue; } } @@ -747,11 +763,3 @@ void ReliableChannel::readData(QDataStream& in) { } } -void ReliableChannel::handleMessage(const QVariant& message) { - if (message.userType() == ClearSharedObjectMessage::Type) { - _bitstream.clearSharedObject(message.value().id); - - } else { - emit receivedMessage(message); - } -} diff --git a/libraries/metavoxels/src/DatagramSequencer.h b/libraries/metavoxels/src/DatagramSequencer.h index 9adebfbfa4..7156175f51 100644 --- a/libraries/metavoxels/src/DatagramSequencer.h +++ b/libraries/metavoxels/src/DatagramSequencer.h @@ -94,6 +94,7 @@ signals: private slots: void sendClearSharedObjectMessage(int id); + void handleHighPriorityMessage(const QVariant& data); private: @@ -133,8 +134,6 @@ private: /// readyToWrite) as necessary. void sendPacket(const QByteArray& packet, const QVector& spans); - void handleHighPriorityMessage(const QVariant& data); - QList _sendRecords; QList _receiveRecords; @@ -273,12 +272,13 @@ public: int getBytesAvailable() const; + /// Sets whether we expect to write/read framed messages. + void setMessagesEnabled(bool enabled) { _messagesEnabled = enabled; } + bool getMessagesEnabled() const { return _messagesEnabled; } + /// Sends a framed message on this channel. void sendMessage(const QVariant& message); - /// For input channels, sets whether the channel is expecting a framed message. - void setExpectingMessage(bool expectingMessage) { _expectingMessage = expectingMessage; } - signals: void receivedMessage(const QVariant& message); @@ -286,7 +286,8 @@ signals: private slots: void sendClearSharedObjectMessage(int id); - + void handleMessage(const QVariant& message); + private: friend class DatagramSequencer; @@ -300,7 +301,6 @@ private: void spanAcknowledged(const DatagramSequencer::ChannelSpan& span); void readData(QDataStream& in); - void handleMessage(const QVariant& message); int _index; CircularBuffer _buffer; @@ -312,7 +312,7 @@ private: int _offset; int _writePosition; SpanList _acknowledged; - bool _expectingMessage; + bool _messagesEnabled; }; #endif /* defined(__interface__DatagramSequencer__) */ diff --git a/libraries/metavoxels/src/MetavoxelMessages.h b/libraries/metavoxels/src/MetavoxelMessages.h index c3cc78c5bc..1c547809fb 100644 --- a/libraries/metavoxels/src/MetavoxelMessages.h +++ b/libraries/metavoxels/src/MetavoxelMessages.h @@ -32,6 +32,17 @@ public: DECLARE_STREAMABLE_METATYPE(ClearSharedObjectMessage) +/// Clears the mapping for a shared object on the main channel (as opposed to the one on which the message was sent). +class ClearMainChannelSharedObjectMessage { + STREAMABLE + +public: + + STREAM int id; +}; + +DECLARE_STREAMABLE_METATYPE(ClearMainChannelSharedObjectMessage) + /// A message containing the state of a client. class ClientStateMessage { STREAMABLE diff --git a/tests/metavoxels/src/MetavoxelTests.cpp b/tests/metavoxels/src/MetavoxelTests.cpp index 49ff5714c0..530f7e3108 100644 --- a/tests/metavoxels/src/MetavoxelTests.cpp +++ b/tests/metavoxels/src/MetavoxelTests.cpp @@ -22,10 +22,10 @@ static int highPriorityMessagesSent = 0; static int highPriorityMessagesReceived = 0; static int unreliableMessagesSent = 0; static int unreliableMessagesReceived = 0; +static int reliableMessagesSent = 0; +static int reliableMessagesReceived = 0; static int streamedBytesSent = 0; static int streamedBytesReceived = 0; -static int lowPriorityStreamedBytesSent = 0; -static int lowPriorityStreamedBytesReceived = 0; bool MetavoxelTests::run() { @@ -51,9 +51,8 @@ bool MetavoxelTests::run() { qDebug() << "Sent" << highPriorityMessagesSent << "high priority messages, received" << highPriorityMessagesReceived; qDebug() << "Sent" << unreliableMessagesSent << "unreliable messages, received" << unreliableMessagesReceived; + qDebug() << "Sent" << reliableMessagesSent << "reliable messages, received" << reliableMessagesReceived; qDebug() << "Sent" << streamedBytesSent << "streamed bytes, received" << streamedBytesReceived; - qDebug() << "Sent" << lowPriorityStreamedBytesSent << "low-priority streamed bytes, received" << - lowPriorityStreamedBytesReceived; qDebug() << "Sent" << datagramsSent << "datagrams, received" << datagramsReceived; qDebug() << "All tests passed!"; @@ -77,30 +76,31 @@ static QByteArray createRandomBytes() { Endpoint::Endpoint(const QByteArray& datagramHeader) : _sequencer(new DatagramSequencer(datagramHeader, this)), - _highPriorityMessagesToSend(0.0f) { + _highPriorityMessagesToSend(0.0f), + _reliableMessagesToSend(0.0f) { connect(_sequencer, SIGNAL(readyToWrite(const QByteArray&)), SLOT(sendDatagram(const QByteArray&))); connect(_sequencer, SIGNAL(readyToRead(Bitstream&)), SLOT(readMessage(Bitstream&))); connect(_sequencer, SIGNAL(receivedHighPriorityMessage(const QVariant&)), SLOT(handleHighPriorityMessage(const QVariant&))); - ReliableChannel* firstInput = _sequencer->getReliableInputChannel(); - firstInput->setExpectingMessage(false); - connect(&firstInput->getBuffer(), SIGNAL(readyRead()), SLOT(readReliableChannel())); + connect(_sequencer->getReliableInputChannel(), SIGNAL(receivedMessage(const QVariant&)), + SLOT(handleReliableMessage(const QVariant&))); ReliableChannel* secondInput = _sequencer->getReliableInputChannel(1); - secondInput->setExpectingMessage(false); - connect(&secondInput->getBuffer(), SIGNAL(readyRead()), SLOT(readLowPriorityReliableChannel())); + secondInput->setMessagesEnabled(false); + connect(&secondInput->getBuffer(), SIGNAL(readyRead()), SLOT(readReliableChannel())); // enqueue a large amount of data in a low-priority channel ReliableChannel* output = _sequencer->getReliableOutputChannel(1); output->setPriority(0.25f); - const int MIN_LOW_PRIORITY_DATA = 100000; - const int MAX_LOW_PRIORITY_DATA = 200000; - QByteArray bytes = createRandomBytes(MIN_LOW_PRIORITY_DATA, MAX_LOW_PRIORITY_DATA); - _lowPriorityDataStreamed.append(bytes); + output->setMessagesEnabled(false); + const int MIN_STREAM_BYTES = 100000; + const int MAX_STREAM_BYTES = 200000; + QByteArray bytes = createRandomBytes(MIN_STREAM_BYTES, MAX_STREAM_BYTES); + _dataStreamed.append(bytes); output->getBuffer().write(bytes); - lowPriorityStreamedBytesSent += bytes.size(); + streamedBytesSent += bytes.size(); } static QVariant createRandomMessage() { @@ -166,13 +166,17 @@ bool Endpoint::simulate(int iterationNumber) { _highPriorityMessagesToSend -= 1.0f; } - // stream some random data - const int MIN_BYTES_TO_STREAM = 10; - const int MAX_BYTES_TO_STREAM = 100; - QByteArray bytes = createRandomBytes(MIN_BYTES_TO_STREAM, MAX_BYTES_TO_STREAM); - _dataStreamed.append(bytes); - streamedBytesSent += bytes.size(); - _sequencer->getReliableOutputChannel()->getDataStream().writeRawData(bytes.constData(), bytes.size()); + // and some number of reliable messages + const float MIN_RELIABLE_MESSAGES = 0.0f; + const float MAX_RELIABLE_MESSAGES = 4.0f; + _reliableMessagesToSend += randFloatInRange(MIN_RELIABLE_MESSAGES, MAX_RELIABLE_MESSAGES); + while (_reliableMessagesToSend >= 1.0f) { + QVariant message = createRandomMessage(); + _reliableMessagesSent.append(message); + _sequencer->getReliableOutputChannel()->sendMessage(message); + reliableMessagesSent++; + _reliableMessagesToSend -= 1.0f; + } // send a packet try { @@ -249,8 +253,19 @@ void Endpoint::readMessage(Bitstream& in) { throw QString("Received unsent/already sent unreliable message."); } +void Endpoint::handleReliableMessage(const QVariant& message) { + if (_other->_reliableMessagesSent.isEmpty()) { + throw QString("Received unsent/already sent reliable message."); + } + QVariant sentMessage = _other->_reliableMessagesSent.takeFirst(); + if (!messagesEqual(message, sentMessage)) { + throw QString("Sent/received reliable message mismatch."); + } + reliableMessagesReceived++; +} + void Endpoint::readReliableChannel() { - CircularBuffer& buffer = _sequencer->getReliableInputChannel()->getBuffer(); + CircularBuffer& buffer = _sequencer->getReliableInputChannel(1)->getBuffer(); QByteArray bytes = buffer.read(buffer.bytesAvailable()); if (_other->_dataStreamed.size() < bytes.size()) { throw QString("Received unsent/already sent streamed data."); @@ -262,17 +277,3 @@ void Endpoint::readReliableChannel() { } streamedBytesReceived += bytes.size(); } - -void Endpoint::readLowPriorityReliableChannel() { - CircularBuffer& buffer = _sequencer->getReliableInputChannel(1)->getBuffer(); - QByteArray bytes = buffer.read(buffer.bytesAvailable()); - if (_other->_lowPriorityDataStreamed.size() < bytes.size()) { - throw QString("Received unsent/already sent low-priority streamed data."); - } - QByteArray compare = _other->_lowPriorityDataStreamed.readBytes(0, bytes.size()); - _other->_lowPriorityDataStreamed.remove(bytes.size()); - if (compare != bytes) { - throw QString("Sent/received low-priority streamed data mismatch."); - } - lowPriorityStreamedBytesReceived += bytes.size(); -} diff --git a/tests/metavoxels/src/MetavoxelTests.h b/tests/metavoxels/src/MetavoxelTests.h index b73f7eb07e..f19870ac15 100644 --- a/tests/metavoxels/src/MetavoxelTests.h +++ b/tests/metavoxels/src/MetavoxelTests.h @@ -48,9 +48,9 @@ private slots: void sendDatagram(const QByteArray& datagram); void handleHighPriorityMessage(const QVariant& message); void readMessage(Bitstream& in); + void handleReliableMessage(const QVariant& message); void readReliableChannel(); - void readLowPriorityReliableChannel(); - + private: DatagramSequencer* _sequencer; @@ -59,8 +59,9 @@ private: float _highPriorityMessagesToSend; QVariantList _highPriorityMessagesSent; QList _unreliableMessagesSent; + float _reliableMessagesToSend; + QVariantList _reliableMessagesSent; CircularBuffer _dataStreamed; - CircularBuffer _lowPriorityDataStreamed; }; /// A simple test message. @@ -88,7 +89,7 @@ public: DECLARE_STREAMABLE_METATYPE(TestMessageB) // A test message that demonstrates inheritance and composition. -class TestMessageC : public TestMessageA { +class TestMessageC : STREAM public TestMessageA { STREAMABLE public: diff --git a/tools/mtc/src/main.cpp b/tools/mtc/src/main.cpp index 050fe0e418..248c2ddd2d 100644 --- a/tools/mtc/src/main.cpp +++ b/tools/mtc/src/main.cpp @@ -121,7 +121,7 @@ void generateOutput (QTextStream& out, const QList& streamables) { out << " &&\n"; out << " "; } - out << "static_cast<" << base << "&>(first) == static_cast<" << base << "&>(second)"; + out << "static_cast(first) == static_cast(second)"; first = false; } foreach (const QString& field, str.fields) { @@ -147,7 +147,7 @@ void generateOutput (QTextStream& out, const QList& streamables) { out << " ||\n"; out << " "; } - out << "static_cast<" << base << "&>(first) != static_cast<" << base << "&>(second)"; + out << "static_cast(first) != static_cast(second)"; first = false; } foreach (const QString& field, str.fields) {