Various bits of streaming work: send delete messages for main channel on

reliable channel, test messages on main channel.
This commit is contained in:
Andrzej Kapolka 2014-02-12 12:33:40 -08:00
parent 62db3ebb30
commit f9b0ff0608
6 changed files with 96 additions and 75 deletions

View file

@ -42,6 +42,7 @@ DatagramSequencer::DatagramSequencer(const QByteArray& datagramHeader, QObject*
_outgoingDatagramStream.setByteOrder(QDataStream::LittleEndian);
connect(&_outputStream, SIGNAL(sharedObjectCleared(int)), SLOT(sendClearSharedObjectMessage(int)));
connect(this, SIGNAL(receivedHighPriorityMessage(const QVariant&)), SLOT(handleHighPriorityMessage(const QVariant&)));
memcpy(_outgoingDatagram.data(), datagramHeader.constData(), _datagramHeaderSize);
}
@ -182,7 +183,7 @@ void DatagramSequencer::receivedDatagram(const QByteArray& datagram) {
QVariant data;
_inputStream >> data;
if ((int)i >= _receivedHighPriorityMessages) {
handleHighPriorityMessage(data);
emit receivedHighPriorityMessage(data);
}
}
_receivedHighPriorityMessages = highPriorityMessageCount;
@ -208,9 +209,22 @@ void DatagramSequencer::receivedDatagram(const QByteArray& datagram) {
}
void DatagramSequencer::sendClearSharedObjectMessage(int id) {
// for now, high priority
ClearSharedObjectMessage message = { id };
sendHighPriorityMessage(QVariant::fromValue(message));
// send it low-priority unless the channel has messages disabled
ReliableChannel* channel = getReliableOutputChannel();
if (channel->getMessagesEnabled()) {
ClearMainChannelSharedObjectMessage message = { id };
channel->sendMessage(QVariant::fromValue(message));
} else {
ClearSharedObjectMessage message = { id };
sendHighPriorityMessage(QVariant::fromValue(message));
}
}
void DatagramSequencer::handleHighPriorityMessage(const QVariant& data) {
if (data.userType() == ClearSharedObjectMessage::Type) {
_inputStream.clearSharedObject(data.value<ClearSharedObjectMessage>().id);
}
}
void DatagramSequencer::sendRecordAcknowledged(const SendRecord& record) {
@ -303,15 +317,6 @@ void DatagramSequencer::sendPacket(const QByteArray& packet, const QVector<Chann
} while(offset < packet.size());
}
void DatagramSequencer::handleHighPriorityMessage(const QVariant& data) {
if (data.userType() == ClearSharedObjectMessage::Type) {
_inputStream.clearSharedObject(data.value<ClearSharedObjectMessage>().id);
} else {
emit receivedHighPriorityMessage(data);
}
}
const int INITIAL_CIRCULAR_BUFFER_CAPACITY = 16;
CircularBuffer::CircularBuffer(QObject* parent) :
@ -592,6 +597,16 @@ void ReliableChannel::sendClearSharedObjectMessage(int id) {
sendMessage(QVariant::fromValue(message));
}
void ReliableChannel::handleMessage(const QVariant& message) {
if (message.userType() == ClearSharedObjectMessage::Type) {
_bitstream.clearSharedObject(message.value<ClearSharedObjectMessage>().id);
} else if (message.userType() == ClearMainChannelSharedObjectMessage::Type) {
static_cast<DatagramSequencer*>(parent())->_inputStream.clearSharedObject(
message.value<ClearMainChannelSharedObjectMessage>().id);
}
}
ReliableChannel::ReliableChannel(DatagramSequencer* sequencer, int index, bool output) :
QObject(sequencer),
_index(index),
@ -600,12 +615,13 @@ ReliableChannel::ReliableChannel(DatagramSequencer* sequencer, int index, bool o
_priority(1.0f),
_offset(0),
_writePosition(0),
_expectingMessage(true) {
_messagesEnabled(true) {
_buffer.open(output ? QIODevice::WriteOnly : QIODevice::ReadOnly);
_dataStream.setByteOrder(QDataStream::LittleEndian);
connect(&_bitstream, SIGNAL(sharedObjectCleared(int)), SLOT(sendClearSharedObjectMessage(int)));
connect(this, SIGNAL(receivedMessage(const QVariant&)), SLOT(handleMessage(const QVariant&)));
}
void ReliableChannel::writeData(QDataStream& out, int bytes, QVector<DatagramSequencer::ChannelSpan>& spans) {
@ -719,7 +735,7 @@ void ReliableChannel::readData(QDataStream& in) {
forever {
// if we're expecting a message, peek into the buffer to see if we have the whole thing.
// if so, read it in, handle it, and loop back around in case there are more
if (_expectingMessage) {
if (_messagesEnabled) {
int available = _buffer.bytesAvailable();
if (available >= sizeof(quint32)) {
quint32 length;
@ -729,7 +745,7 @@ void ReliableChannel::readData(QDataStream& in) {
QVariant message;
_bitstream >> message;
_bitstream.reset();
handleMessage(message);
emit receivedMessage(message);
continue;
}
}
@ -747,11 +763,3 @@ void ReliableChannel::readData(QDataStream& in) {
}
}
void ReliableChannel::handleMessage(const QVariant& message) {
if (message.userType() == ClearSharedObjectMessage::Type) {
_bitstream.clearSharedObject(message.value<ClearSharedObjectMessage>().id);
} else {
emit receivedMessage(message);
}
}

View file

@ -94,6 +94,7 @@ signals:
private slots:
void sendClearSharedObjectMessage(int id);
void handleHighPriorityMessage(const QVariant& data);
private:
@ -133,8 +134,6 @@ private:
/// readyToWrite) as necessary.
void sendPacket(const QByteArray& packet, const QVector<ChannelSpan>& spans);
void handleHighPriorityMessage(const QVariant& data);
QList<SendRecord> _sendRecords;
QList<ReceiveRecord> _receiveRecords;
@ -273,12 +272,13 @@ public:
int getBytesAvailable() const;
/// Sets whether we expect to write/read framed messages.
void setMessagesEnabled(bool enabled) { _messagesEnabled = enabled; }
bool getMessagesEnabled() const { return _messagesEnabled; }
/// Sends a framed message on this channel.
void sendMessage(const QVariant& message);
/// For input channels, sets whether the channel is expecting a framed message.
void setExpectingMessage(bool expectingMessage) { _expectingMessage = expectingMessage; }
signals:
void receivedMessage(const QVariant& message);
@ -286,7 +286,8 @@ signals:
private slots:
void sendClearSharedObjectMessage(int id);
void handleMessage(const QVariant& message);
private:
friend class DatagramSequencer;
@ -300,7 +301,6 @@ private:
void spanAcknowledged(const DatagramSequencer::ChannelSpan& span);
void readData(QDataStream& in);
void handleMessage(const QVariant& message);
int _index;
CircularBuffer _buffer;
@ -312,7 +312,7 @@ private:
int _offset;
int _writePosition;
SpanList _acknowledged;
bool _expectingMessage;
bool _messagesEnabled;
};
#endif /* defined(__interface__DatagramSequencer__) */

View file

@ -32,6 +32,17 @@ public:
DECLARE_STREAMABLE_METATYPE(ClearSharedObjectMessage)
/// Clears the mapping for a shared object on the main channel (as opposed to the one on which the message was sent).
class ClearMainChannelSharedObjectMessage {
STREAMABLE
public:
STREAM int id;
};
DECLARE_STREAMABLE_METATYPE(ClearMainChannelSharedObjectMessage)
/// A message containing the state of a client.
class ClientStateMessage {
STREAMABLE

View file

@ -22,10 +22,10 @@ static int highPriorityMessagesSent = 0;
static int highPriorityMessagesReceived = 0;
static int unreliableMessagesSent = 0;
static int unreliableMessagesReceived = 0;
static int reliableMessagesSent = 0;
static int reliableMessagesReceived = 0;
static int streamedBytesSent = 0;
static int streamedBytesReceived = 0;
static int lowPriorityStreamedBytesSent = 0;
static int lowPriorityStreamedBytesReceived = 0;
bool MetavoxelTests::run() {
@ -51,9 +51,8 @@ bool MetavoxelTests::run() {
qDebug() << "Sent" << highPriorityMessagesSent << "high priority messages, received" << highPriorityMessagesReceived;
qDebug() << "Sent" << unreliableMessagesSent << "unreliable messages, received" << unreliableMessagesReceived;
qDebug() << "Sent" << reliableMessagesSent << "reliable messages, received" << reliableMessagesReceived;
qDebug() << "Sent" << streamedBytesSent << "streamed bytes, received" << streamedBytesReceived;
qDebug() << "Sent" << lowPriorityStreamedBytesSent << "low-priority streamed bytes, received" <<
lowPriorityStreamedBytesReceived;
qDebug() << "Sent" << datagramsSent << "datagrams, received" << datagramsReceived;
qDebug() << "All tests passed!";
@ -77,30 +76,31 @@ static QByteArray createRandomBytes() {
Endpoint::Endpoint(const QByteArray& datagramHeader) :
_sequencer(new DatagramSequencer(datagramHeader, this)),
_highPriorityMessagesToSend(0.0f) {
_highPriorityMessagesToSend(0.0f),
_reliableMessagesToSend(0.0f) {
connect(_sequencer, SIGNAL(readyToWrite(const QByteArray&)), SLOT(sendDatagram(const QByteArray&)));
connect(_sequencer, SIGNAL(readyToRead(Bitstream&)), SLOT(readMessage(Bitstream&)));
connect(_sequencer, SIGNAL(receivedHighPriorityMessage(const QVariant&)),
SLOT(handleHighPriorityMessage(const QVariant&)));
ReliableChannel* firstInput = _sequencer->getReliableInputChannel();
firstInput->setExpectingMessage(false);
connect(&firstInput->getBuffer(), SIGNAL(readyRead()), SLOT(readReliableChannel()));
connect(_sequencer->getReliableInputChannel(), SIGNAL(receivedMessage(const QVariant&)),
SLOT(handleReliableMessage(const QVariant&)));
ReliableChannel* secondInput = _sequencer->getReliableInputChannel(1);
secondInput->setExpectingMessage(false);
connect(&secondInput->getBuffer(), SIGNAL(readyRead()), SLOT(readLowPriorityReliableChannel()));
secondInput->setMessagesEnabled(false);
connect(&secondInput->getBuffer(), SIGNAL(readyRead()), SLOT(readReliableChannel()));
// enqueue a large amount of data in a low-priority channel
ReliableChannel* output = _sequencer->getReliableOutputChannel(1);
output->setPriority(0.25f);
const int MIN_LOW_PRIORITY_DATA = 100000;
const int MAX_LOW_PRIORITY_DATA = 200000;
QByteArray bytes = createRandomBytes(MIN_LOW_PRIORITY_DATA, MAX_LOW_PRIORITY_DATA);
_lowPriorityDataStreamed.append(bytes);
output->setMessagesEnabled(false);
const int MIN_STREAM_BYTES = 100000;
const int MAX_STREAM_BYTES = 200000;
QByteArray bytes = createRandomBytes(MIN_STREAM_BYTES, MAX_STREAM_BYTES);
_dataStreamed.append(bytes);
output->getBuffer().write(bytes);
lowPriorityStreamedBytesSent += bytes.size();
streamedBytesSent += bytes.size();
}
static QVariant createRandomMessage() {
@ -166,13 +166,17 @@ bool Endpoint::simulate(int iterationNumber) {
_highPriorityMessagesToSend -= 1.0f;
}
// stream some random data
const int MIN_BYTES_TO_STREAM = 10;
const int MAX_BYTES_TO_STREAM = 100;
QByteArray bytes = createRandomBytes(MIN_BYTES_TO_STREAM, MAX_BYTES_TO_STREAM);
_dataStreamed.append(bytes);
streamedBytesSent += bytes.size();
_sequencer->getReliableOutputChannel()->getDataStream().writeRawData(bytes.constData(), bytes.size());
// and some number of reliable messages
const float MIN_RELIABLE_MESSAGES = 0.0f;
const float MAX_RELIABLE_MESSAGES = 4.0f;
_reliableMessagesToSend += randFloatInRange(MIN_RELIABLE_MESSAGES, MAX_RELIABLE_MESSAGES);
while (_reliableMessagesToSend >= 1.0f) {
QVariant message = createRandomMessage();
_reliableMessagesSent.append(message);
_sequencer->getReliableOutputChannel()->sendMessage(message);
reliableMessagesSent++;
_reliableMessagesToSend -= 1.0f;
}
// send a packet
try {
@ -249,8 +253,19 @@ void Endpoint::readMessage(Bitstream& in) {
throw QString("Received unsent/already sent unreliable message.");
}
void Endpoint::handleReliableMessage(const QVariant& message) {
if (_other->_reliableMessagesSent.isEmpty()) {
throw QString("Received unsent/already sent reliable message.");
}
QVariant sentMessage = _other->_reliableMessagesSent.takeFirst();
if (!messagesEqual(message, sentMessage)) {
throw QString("Sent/received reliable message mismatch.");
}
reliableMessagesReceived++;
}
void Endpoint::readReliableChannel() {
CircularBuffer& buffer = _sequencer->getReliableInputChannel()->getBuffer();
CircularBuffer& buffer = _sequencer->getReliableInputChannel(1)->getBuffer();
QByteArray bytes = buffer.read(buffer.bytesAvailable());
if (_other->_dataStreamed.size() < bytes.size()) {
throw QString("Received unsent/already sent streamed data.");
@ -262,17 +277,3 @@ void Endpoint::readReliableChannel() {
}
streamedBytesReceived += bytes.size();
}
void Endpoint::readLowPriorityReliableChannel() {
CircularBuffer& buffer = _sequencer->getReliableInputChannel(1)->getBuffer();
QByteArray bytes = buffer.read(buffer.bytesAvailable());
if (_other->_lowPriorityDataStreamed.size() < bytes.size()) {
throw QString("Received unsent/already sent low-priority streamed data.");
}
QByteArray compare = _other->_lowPriorityDataStreamed.readBytes(0, bytes.size());
_other->_lowPriorityDataStreamed.remove(bytes.size());
if (compare != bytes) {
throw QString("Sent/received low-priority streamed data mismatch.");
}
lowPriorityStreamedBytesReceived += bytes.size();
}

View file

@ -48,9 +48,9 @@ private slots:
void sendDatagram(const QByteArray& datagram);
void handleHighPriorityMessage(const QVariant& message);
void readMessage(Bitstream& in);
void handleReliableMessage(const QVariant& message);
void readReliableChannel();
void readLowPriorityReliableChannel();
private:
DatagramSequencer* _sequencer;
@ -59,8 +59,9 @@ private:
float _highPriorityMessagesToSend;
QVariantList _highPriorityMessagesSent;
QList<SequencedTestMessage> _unreliableMessagesSent;
float _reliableMessagesToSend;
QVariantList _reliableMessagesSent;
CircularBuffer _dataStreamed;
CircularBuffer _lowPriorityDataStreamed;
};
/// A simple test message.
@ -88,7 +89,7 @@ public:
DECLARE_STREAMABLE_METATYPE(TestMessageB)
// A test message that demonstrates inheritance and composition.
class TestMessageC : public TestMessageA {
class TestMessageC : STREAM public TestMessageA {
STREAMABLE
public:

View file

@ -121,7 +121,7 @@ void generateOutput (QTextStream& out, const QList<Streamable>& streamables) {
out << " &&\n";
out << " ";
}
out << "static_cast<" << base << "&>(first) == static_cast<" << base << "&>(second)";
out << "static_cast<const " << base << "&>(first) == static_cast<const " << base << "&>(second)";
first = false;
}
foreach (const QString& field, str.fields) {
@ -147,7 +147,7 @@ void generateOutput (QTextStream& out, const QList<Streamable>& streamables) {
out << " ||\n";
out << " ";
}
out << "static_cast<" << base << "&>(first) != static_cast<" << base << "&>(second)";
out << "static_cast<const " << base << "&>(first) != static_cast<const " << base << "&>(second)";
first = false;
}
foreach (const QString& field, str.fields) {