From f9b0ff0608bbffc91f89f71bff70840ca61bb25e Mon Sep 17 00:00:00 2001
From: Andrzej Kapolka <drzej.k@gmail.com>
Date: Wed, 12 Feb 2014 12:33:40 -0800
Subject: [PATCH] Various bits of streaming work: send delete messages for main
 channel on reliable channel, test messages on main channel.

---
 .../metavoxels/src/DatagramSequencer.cpp      | 56 ++++++++------
 libraries/metavoxels/src/DatagramSequencer.h  | 16 ++--
 libraries/metavoxels/src/MetavoxelMessages.h  | 11 +++
 tests/metavoxels/src/MetavoxelTests.cpp       | 75 ++++++++++---------
 tests/metavoxels/src/MetavoxelTests.h         |  9 ++-
 tools/mtc/src/main.cpp                        |  4 +-
 6 files changed, 96 insertions(+), 75 deletions(-)

diff --git a/libraries/metavoxels/src/DatagramSequencer.cpp b/libraries/metavoxels/src/DatagramSequencer.cpp
index 77f893ccb6..a60d4e9df3 100644
--- a/libraries/metavoxels/src/DatagramSequencer.cpp
+++ b/libraries/metavoxels/src/DatagramSequencer.cpp
@@ -42,6 +42,7 @@ DatagramSequencer::DatagramSequencer(const QByteArray& datagramHeader, QObject*
     _outgoingDatagramStream.setByteOrder(QDataStream::LittleEndian);
 
     connect(&_outputStream, SIGNAL(sharedObjectCleared(int)), SLOT(sendClearSharedObjectMessage(int)));
+    connect(this, SIGNAL(receivedHighPriorityMessage(const QVariant&)), SLOT(handleHighPriorityMessage(const QVariant&)));
 
     memcpy(_outgoingDatagram.data(), datagramHeader.constData(), _datagramHeaderSize);
 }
@@ -182,7 +183,7 @@ void DatagramSequencer::receivedDatagram(const QByteArray& datagram) {
         QVariant data;
         _inputStream >> data;
         if ((int)i >= _receivedHighPriorityMessages) {
-            handleHighPriorityMessage(data);
+            emit receivedHighPriorityMessage(data);
         }
     }
     _receivedHighPriorityMessages = highPriorityMessageCount;
@@ -208,9 +209,22 @@ void DatagramSequencer::receivedDatagram(const QByteArray& datagram) {
 }
 
 void DatagramSequencer::sendClearSharedObjectMessage(int id) {
-    // for now, high priority
-    ClearSharedObjectMessage message = { id };
-    sendHighPriorityMessage(QVariant::fromValue(message));
+    // send it low-priority unless the channel has messages disabled
+    ReliableChannel* channel = getReliableOutputChannel();
+    if (channel->getMessagesEnabled()) {
+        ClearMainChannelSharedObjectMessage message = { id };
+        channel->sendMessage(QVariant::fromValue(message));
+        
+    } else {
+        ClearSharedObjectMessage message = { id };
+        sendHighPriorityMessage(QVariant::fromValue(message));
+    }
+}
+
+void DatagramSequencer::handleHighPriorityMessage(const QVariant& data) {
+    if (data.userType() == ClearSharedObjectMessage::Type) {
+        _inputStream.clearSharedObject(data.value<ClearSharedObjectMessage>().id);
+    }
 }
 
 void DatagramSequencer::sendRecordAcknowledged(const SendRecord& record) {
@@ -303,15 +317,6 @@ void DatagramSequencer::sendPacket(const QByteArray& packet, const QVector<Chann
     } while(offset < packet.size());
 }
 
-void DatagramSequencer::handleHighPriorityMessage(const QVariant& data) {
-    if (data.userType() == ClearSharedObjectMessage::Type) {
-        _inputStream.clearSharedObject(data.value<ClearSharedObjectMessage>().id);
-    
-    } else {
-        emit receivedHighPriorityMessage(data);
-    }
-}
-
 const int INITIAL_CIRCULAR_BUFFER_CAPACITY = 16;
 
 CircularBuffer::CircularBuffer(QObject* parent) :
@@ -592,6 +597,16 @@ void ReliableChannel::sendClearSharedObjectMessage(int id) {
     sendMessage(QVariant::fromValue(message));
 }
 
+void ReliableChannel::handleMessage(const QVariant& message) {
+    if (message.userType() == ClearSharedObjectMessage::Type) {
+        _bitstream.clearSharedObject(message.value<ClearSharedObjectMessage>().id);
+    
+    } else if (message.userType() == ClearMainChannelSharedObjectMessage::Type) {
+        static_cast<DatagramSequencer*>(parent())->_inputStream.clearSharedObject(
+            message.value<ClearMainChannelSharedObjectMessage>().id);
+    }
+}
+
 ReliableChannel::ReliableChannel(DatagramSequencer* sequencer, int index, bool output) :
     QObject(sequencer),
     _index(index),
@@ -600,12 +615,13 @@ ReliableChannel::ReliableChannel(DatagramSequencer* sequencer, int index, bool o
     _priority(1.0f),
     _offset(0),
     _writePosition(0),
-    _expectingMessage(true) {
+    _messagesEnabled(true) {
     
     _buffer.open(output ? QIODevice::WriteOnly : QIODevice::ReadOnly);
     _dataStream.setByteOrder(QDataStream::LittleEndian);
     
     connect(&_bitstream, SIGNAL(sharedObjectCleared(int)), SLOT(sendClearSharedObjectMessage(int)));
+    connect(this, SIGNAL(receivedMessage(const QVariant&)), SLOT(handleMessage(const QVariant&)));
 }
 
 void ReliableChannel::writeData(QDataStream& out, int bytes, QVector<DatagramSequencer::ChannelSpan>& spans) {
@@ -719,7 +735,7 @@ void ReliableChannel::readData(QDataStream& in) {
     forever {
         // if we're expecting a message, peek into the buffer to see if we have the whole thing.
         // if so, read it in, handle it, and loop back around in case there are more
-        if (_expectingMessage) {
+        if (_messagesEnabled) {
             int available = _buffer.bytesAvailable();
             if (available >= sizeof(quint32)) {
                 quint32 length;
@@ -729,7 +745,7 @@ void ReliableChannel::readData(QDataStream& in) {
                     QVariant message;
                     _bitstream >> message;
                     _bitstream.reset();
-                    handleMessage(message);
+                    emit receivedMessage(message);
                     continue;
                 }
             }
@@ -747,11 +763,3 @@ void ReliableChannel::readData(QDataStream& in) {
     }
 }
 
-void ReliableChannel::handleMessage(const QVariant& message) {
-    if (message.userType() == ClearSharedObjectMessage::Type) {
-        _bitstream.clearSharedObject(message.value<ClearSharedObjectMessage>().id);
-    
-    } else {
-        emit receivedMessage(message);
-    }
-}
diff --git a/libraries/metavoxels/src/DatagramSequencer.h b/libraries/metavoxels/src/DatagramSequencer.h
index 9adebfbfa4..7156175f51 100644
--- a/libraries/metavoxels/src/DatagramSequencer.h
+++ b/libraries/metavoxels/src/DatagramSequencer.h
@@ -94,6 +94,7 @@ signals:
 private slots:
 
     void sendClearSharedObjectMessage(int id);
+    void handleHighPriorityMessage(const QVariant& data);
     
 private:
     
@@ -133,8 +134,6 @@ private:
     /// readyToWrite) as necessary.
     void sendPacket(const QByteArray& packet, const QVector<ChannelSpan>& spans);
     
-    void handleHighPriorityMessage(const QVariant& data);
-    
     QList<SendRecord> _sendRecords;
     QList<ReceiveRecord> _receiveRecords;
     
@@ -273,12 +272,13 @@ public:
 
     int getBytesAvailable() const;
 
+    /// Sets whether we expect to write/read framed messages.
+    void setMessagesEnabled(bool enabled) { _messagesEnabled = enabled; }
+    bool getMessagesEnabled() const { return _messagesEnabled; }
+
     /// Sends a framed message on this channel.
     void sendMessage(const QVariant& message);
 
-    /// For input channels, sets whether the channel is expecting a framed message.
-    void setExpectingMessage(bool expectingMessage) { _expectingMessage = expectingMessage; }
-
 signals:
 
     void receivedMessage(const QVariant& message);
@@ -286,7 +286,8 @@ signals:
 private slots:
 
     void sendClearSharedObjectMessage(int id);
-
+    void handleMessage(const QVariant& message);
+    
 private:
     
     friend class DatagramSequencer;
@@ -300,7 +301,6 @@ private:
     void spanAcknowledged(const DatagramSequencer::ChannelSpan& span);
     
     void readData(QDataStream& in);
-    void handleMessage(const QVariant& message);
     
     int _index;
     CircularBuffer _buffer;
@@ -312,7 +312,7 @@ private:
     int _offset;
     int _writePosition;
     SpanList _acknowledged;
-    bool _expectingMessage;
+    bool _messagesEnabled;
 };
 
 #endif /* defined(__interface__DatagramSequencer__) */
diff --git a/libraries/metavoxels/src/MetavoxelMessages.h b/libraries/metavoxels/src/MetavoxelMessages.h
index c3cc78c5bc..1c547809fb 100644
--- a/libraries/metavoxels/src/MetavoxelMessages.h
+++ b/libraries/metavoxels/src/MetavoxelMessages.h
@@ -32,6 +32,17 @@ public:
 
 DECLARE_STREAMABLE_METATYPE(ClearSharedObjectMessage)
 
+/// Clears the mapping for a shared object on the main channel (as opposed to the one on which the message was sent).
+class ClearMainChannelSharedObjectMessage {
+    STREAMABLE
+
+public:
+    
+    STREAM int id; 
+};
+
+DECLARE_STREAMABLE_METATYPE(ClearMainChannelSharedObjectMessage)
+
 /// A message containing the state of a client.
 class ClientStateMessage {
     STREAMABLE
diff --git a/tests/metavoxels/src/MetavoxelTests.cpp b/tests/metavoxels/src/MetavoxelTests.cpp
index 49ff5714c0..530f7e3108 100644
--- a/tests/metavoxels/src/MetavoxelTests.cpp
+++ b/tests/metavoxels/src/MetavoxelTests.cpp
@@ -22,10 +22,10 @@ static int highPriorityMessagesSent = 0;
 static int highPriorityMessagesReceived = 0;
 static int unreliableMessagesSent = 0;
 static int unreliableMessagesReceived = 0;
+static int reliableMessagesSent = 0;
+static int reliableMessagesReceived = 0;
 static int streamedBytesSent = 0;
 static int streamedBytesReceived = 0;
-static int lowPriorityStreamedBytesSent = 0;
-static int lowPriorityStreamedBytesReceived = 0;
 
 bool MetavoxelTests::run() {
     
@@ -51,9 +51,8 @@ bool MetavoxelTests::run() {
     
     qDebug() << "Sent" << highPriorityMessagesSent << "high priority messages, received" << highPriorityMessagesReceived;
     qDebug() << "Sent" << unreliableMessagesSent << "unreliable messages, received" << unreliableMessagesReceived;
+    qDebug() << "Sent" << reliableMessagesSent << "reliable messages, received" << reliableMessagesReceived;
     qDebug() << "Sent" << streamedBytesSent << "streamed bytes, received" << streamedBytesReceived;
-    qDebug() << "Sent" << lowPriorityStreamedBytesSent << "low-priority streamed bytes, received" <<
-        lowPriorityStreamedBytesReceived;
     qDebug() << "Sent" << datagramsSent << "datagrams, received" << datagramsReceived;
     
     qDebug() << "All tests passed!";
@@ -77,30 +76,31 @@ static QByteArray createRandomBytes() {
 
 Endpoint::Endpoint(const QByteArray& datagramHeader) :
     _sequencer(new DatagramSequencer(datagramHeader, this)),
-    _highPriorityMessagesToSend(0.0f) {
+    _highPriorityMessagesToSend(0.0f),
+    _reliableMessagesToSend(0.0f) {
     
     connect(_sequencer, SIGNAL(readyToWrite(const QByteArray&)), SLOT(sendDatagram(const QByteArray&)));
     connect(_sequencer, SIGNAL(readyToRead(Bitstream&)), SLOT(readMessage(Bitstream&)));
     connect(_sequencer, SIGNAL(receivedHighPriorityMessage(const QVariant&)),
         SLOT(handleHighPriorityMessage(const QVariant&)));
     
-    ReliableChannel* firstInput = _sequencer->getReliableInputChannel();
-    firstInput->setExpectingMessage(false);
-    connect(&firstInput->getBuffer(), SIGNAL(readyRead()), SLOT(readReliableChannel()));
+    connect(_sequencer->getReliableInputChannel(), SIGNAL(receivedMessage(const QVariant&)),
+        SLOT(handleReliableMessage(const QVariant&)));
     
     ReliableChannel* secondInput = _sequencer->getReliableInputChannel(1);
-    secondInput->setExpectingMessage(false);
-    connect(&secondInput->getBuffer(), SIGNAL(readyRead()), SLOT(readLowPriorityReliableChannel()));
+    secondInput->setMessagesEnabled(false);
+    connect(&secondInput->getBuffer(), SIGNAL(readyRead()), SLOT(readReliableChannel()));
     
     // enqueue a large amount of data in a low-priority channel
     ReliableChannel* output = _sequencer->getReliableOutputChannel(1);
     output->setPriority(0.25f);
-    const int MIN_LOW_PRIORITY_DATA = 100000;
-    const int MAX_LOW_PRIORITY_DATA = 200000;
-    QByteArray bytes = createRandomBytes(MIN_LOW_PRIORITY_DATA, MAX_LOW_PRIORITY_DATA);
-    _lowPriorityDataStreamed.append(bytes);
+    output->setMessagesEnabled(false);
+    const int MIN_STREAM_BYTES = 100000;
+    const int MAX_STREAM_BYTES = 200000;
+    QByteArray bytes = createRandomBytes(MIN_STREAM_BYTES, MAX_STREAM_BYTES);
+    _dataStreamed.append(bytes);
     output->getBuffer().write(bytes);
-    lowPriorityStreamedBytesSent += bytes.size();
+    streamedBytesSent += bytes.size();
 }
 
 static QVariant createRandomMessage() {
@@ -166,13 +166,17 @@ bool Endpoint::simulate(int iterationNumber) {
         _highPriorityMessagesToSend -= 1.0f;
     }
     
-    // stream some random data
-    const int MIN_BYTES_TO_STREAM = 10;
-    const int MAX_BYTES_TO_STREAM = 100;
-    QByteArray bytes = createRandomBytes(MIN_BYTES_TO_STREAM, MAX_BYTES_TO_STREAM);
-    _dataStreamed.append(bytes);
-    streamedBytesSent += bytes.size();
-    _sequencer->getReliableOutputChannel()->getDataStream().writeRawData(bytes.constData(), bytes.size());
+    // and some number of reliable messages
+    const float MIN_RELIABLE_MESSAGES = 0.0f;
+    const float MAX_RELIABLE_MESSAGES = 4.0f;
+    _reliableMessagesToSend += randFloatInRange(MIN_RELIABLE_MESSAGES, MAX_RELIABLE_MESSAGES);   
+    while (_reliableMessagesToSend >= 1.0f) {
+        QVariant message = createRandomMessage();
+        _reliableMessagesSent.append(message);
+        _sequencer->getReliableOutputChannel()->sendMessage(message);
+        reliableMessagesSent++;
+        _reliableMessagesToSend -= 1.0f;
+    }
     
     // send a packet
     try {
@@ -249,8 +253,19 @@ void Endpoint::readMessage(Bitstream& in) {
     throw QString("Received unsent/already sent unreliable message.");
 }
 
+void Endpoint::handleReliableMessage(const QVariant& message) {
+    if (_other->_reliableMessagesSent.isEmpty()) {
+        throw QString("Received unsent/already sent reliable message.");
+    }
+    QVariant sentMessage = _other->_reliableMessagesSent.takeFirst();
+    if (!messagesEqual(message, sentMessage)) {
+        throw QString("Sent/received reliable message mismatch.");
+    }
+    reliableMessagesReceived++;
+}
+
 void Endpoint::readReliableChannel() {
-    CircularBuffer& buffer = _sequencer->getReliableInputChannel()->getBuffer();
+    CircularBuffer& buffer = _sequencer->getReliableInputChannel(1)->getBuffer();
     QByteArray bytes = buffer.read(buffer.bytesAvailable());
     if (_other->_dataStreamed.size() < bytes.size()) {
         throw QString("Received unsent/already sent streamed data.");
@@ -262,17 +277,3 @@ void Endpoint::readReliableChannel() {
     }
     streamedBytesReceived += bytes.size();
 }
-
-void Endpoint::readLowPriorityReliableChannel() {
-    CircularBuffer& buffer = _sequencer->getReliableInputChannel(1)->getBuffer();
-    QByteArray bytes = buffer.read(buffer.bytesAvailable());
-    if (_other->_lowPriorityDataStreamed.size() < bytes.size()) {
-        throw QString("Received unsent/already sent low-priority streamed data.");
-    }
-    QByteArray compare = _other->_lowPriorityDataStreamed.readBytes(0, bytes.size());
-    _other->_lowPriorityDataStreamed.remove(bytes.size());
-    if (compare != bytes) {
-        throw QString("Sent/received low-priority streamed data mismatch.");
-    }
-    lowPriorityStreamedBytesReceived += bytes.size();
-}
diff --git a/tests/metavoxels/src/MetavoxelTests.h b/tests/metavoxels/src/MetavoxelTests.h
index b73f7eb07e..f19870ac15 100644
--- a/tests/metavoxels/src/MetavoxelTests.h
+++ b/tests/metavoxels/src/MetavoxelTests.h
@@ -48,9 +48,9 @@ private slots:
     void sendDatagram(const QByteArray& datagram);    
     void handleHighPriorityMessage(const QVariant& message);
     void readMessage(Bitstream& in);
+    void handleReliableMessage(const QVariant& message);
     void readReliableChannel();
-    void readLowPriorityReliableChannel();
-    
+
 private:
     
     DatagramSequencer* _sequencer;
@@ -59,8 +59,9 @@ private:
     float _highPriorityMessagesToSend;
     QVariantList _highPriorityMessagesSent;
     QList<SequencedTestMessage> _unreliableMessagesSent;
+    float _reliableMessagesToSend;
+    QVariantList _reliableMessagesSent;
     CircularBuffer _dataStreamed;
-    CircularBuffer _lowPriorityDataStreamed;
 };
 
 /// A simple test message.
@@ -88,7 +89,7 @@ public:
 DECLARE_STREAMABLE_METATYPE(TestMessageB)
 
 // A test message that demonstrates inheritance and composition.
-class TestMessageC : public TestMessageA {
+class TestMessageC : STREAM public TestMessageA {
     STREAMABLE
 
 public:
diff --git a/tools/mtc/src/main.cpp b/tools/mtc/src/main.cpp
index 050fe0e418..248c2ddd2d 100644
--- a/tools/mtc/src/main.cpp
+++ b/tools/mtc/src/main.cpp
@@ -121,7 +121,7 @@ void generateOutput (QTextStream& out, const QList<Streamable>& streamables) {
                     out << " &&\n";
                     out << "        ";
                 }
-                out << "static_cast<" << base << "&>(first) == static_cast<" << base << "&>(second)";
+                out << "static_cast<const " << base << "&>(first) == static_cast<const " << base << "&>(second)";
                 first = false;
             }
             foreach (const QString& field, str.fields) {
@@ -147,7 +147,7 @@ void generateOutput (QTextStream& out, const QList<Streamable>& streamables) {
                     out << " ||\n";
                     out << "        ";
                 }
-                out << "static_cast<" << base << "&>(first) != static_cast<" << base << "&>(second)";
+                out << "static_cast<const " << base << "&>(first) != static_cast<const " << base << "&>(second)";
                 first = false;
             }
             foreach (const QString& field, str.fields) {