Fix/tests for streaming.

This commit is contained in:
Andrzej Kapolka 2014-02-09 17:43:18 -08:00
parent 524ceb4ed2
commit 8c9f06ceec
4 changed files with 79 additions and 17 deletions

View file

@ -195,7 +195,7 @@ void DatagramSequencer::receivedDatagram(const QByteArray& datagram) {
for (int i = 0; i < reliableChannels; i++) { for (int i = 0; i < reliableChannels; i++) {
quint32 channelIndex; quint32 channelIndex;
_incomingPacketStream >> channelIndex; _incomingPacketStream >> channelIndex;
getReliableOutputChannel(channelIndex)->readData(_incomingPacketStream); getReliableInputChannel(channelIndex)->readData(_incomingPacketStream);
} }
_incomingPacketStream.device()->seek(0); _incomingPacketStream.device()->seek(0);
@ -369,7 +369,7 @@ int SpanList::set(int offset, int length) {
int SpanList::setSpans(QList<Span>::iterator it, int length) { int SpanList::setSpans(QList<Span>::iterator it, int length) {
int remainingLength = length; int remainingLength = length;
int totalRemoved = 0; int totalRemoved = 0;
for (; it != _spans.end(); it++) { for (; it != _spans.end(); it = _spans.erase(it)) {
if (remainingLength < it->unset) { if (remainingLength < it->unset) {
it->unset -= remainingLength; it->unset -= remainingLength;
totalRemoved += remainingLength; totalRemoved += remainingLength;
@ -378,7 +378,6 @@ int SpanList::setSpans(QList<Span>::iterator it, int length) {
int combined = it->unset + it->set; int combined = it->unset + it->set;
remainingLength = qMax(remainingLength - combined, 0); remainingLength = qMax(remainingLength - combined, 0);
totalRemoved += combined; totalRemoved += combined;
it = _spans.erase(it);
_totalSet -= it->set; _totalSet -= it->set;
} }
return qMax(length, totalRemoved); return qMax(length, totalRemoved);

View file

@ -58,7 +58,7 @@ public:
/// Returns the output channel at the specified index, creating it if necessary. /// Returns the output channel at the specified index, creating it if necessary.
ReliableChannel* getReliableOutputChannel(int index = 0); ReliableChannel* getReliableOutputChannel(int index = 0);
/// Returns the intput channel at the /// Returns the intput channel at the specified index, creating it if necessary.
ReliableChannel* getReliableInputChannel(int index = 0); ReliableChannel* getReliableInputChannel(int index = 0);
/// Starts a new packet for transmission. /// Starts a new packet for transmission.
@ -208,6 +208,7 @@ public:
int getIndex() const { return _index; } int getIndex() const { return _index; }
QBuffer& getBuffer() { return _buffer; }
QDataStream& getDataStream() { return _dataStream; } QDataStream& getDataStream() { return _dataStream; }
Bitstream& getBitstream() { return _bitstream; } Bitstream& getBitstream() { return _bitstream; }

View file

@ -17,9 +17,13 @@ MetavoxelTests::MetavoxelTests(int& argc, char** argv) :
QCoreApplication(argc, argv) { QCoreApplication(argc, argv) {
} }
static int datagramsSent = 0;
static int datagramsReceived = 0;
static int highPriorityMessagesSent = 0; static int highPriorityMessagesSent = 0;
static int unreliableMessagesSent = 0; static int unreliableMessagesSent = 0;
static int unreliableMessagesReceived = 0; static int unreliableMessagesReceived = 0;
static int streamedBytesSent = 0;
static int lowPriorityStreamedBytesSent = 0;
bool MetavoxelTests::run() { bool MetavoxelTests::run() {
@ -43,15 +47,31 @@ bool MetavoxelTests::run() {
} }
} }
qDebug() << "Sent " << highPriorityMessagesSent << " high priority messages"; qDebug() << "Sent" << highPriorityMessagesSent << "high priority messages";
qDebug() << "Sent" << unreliableMessagesSent << "unreliable messages, received" << unreliableMessagesReceived;
qDebug() << "Sent " << unreliableMessagesSent << " unreliable messages, received " << unreliableMessagesReceived; qDebug() << "Sent" << streamedBytesSent << "streamed bytes";
qDebug() << "Sent" << lowPriorityStreamedBytesSent << "low-priority streamed bytes";
qDebug() << "Sent" << datagramsSent << "datagrams, received" << datagramsReceived;
qDebug() << "All tests passed!"; qDebug() << "All tests passed!";
return false; return false;
} }
static QByteArray createRandomBytes(int minimumSize, int maximumSize) {
QByteArray bytes(randIntInRange(minimumSize, maximumSize), 0);
for (int i = 0; i < bytes.size(); i++) {
bytes[i] = rand();
}
return bytes;
}
static QByteArray createRandomBytes() {
const int MIN_BYTES = 4;
const int MAX_BYTES = 16;
return createRandomBytes(MIN_BYTES, MAX_BYTES);
}
Endpoint::Endpoint(const QByteArray& datagramHeader) : Endpoint::Endpoint(const QByteArray& datagramHeader) :
_sequencer(new DatagramSequencer(datagramHeader)), _sequencer(new DatagramSequencer(datagramHeader)),
_highPriorityMessagesToSend(0.0f) { _highPriorityMessagesToSend(0.0f) {
@ -60,16 +80,17 @@ Endpoint::Endpoint(const QByteArray& datagramHeader) :
connect(_sequencer, SIGNAL(readyToRead(Bitstream&)), SLOT(readMessage(Bitstream&))); connect(_sequencer, SIGNAL(readyToRead(Bitstream&)), SLOT(readMessage(Bitstream&)));
connect(_sequencer, SIGNAL(receivedHighPriorityMessage(const QVariant&)), connect(_sequencer, SIGNAL(receivedHighPriorityMessage(const QVariant&)),
SLOT(handleHighPriorityMessage(const QVariant&))); SLOT(handleHighPriorityMessage(const QVariant&)));
} connect(&_sequencer->getReliableInputChannel()->getBuffer(), SIGNAL(readyRead()), SLOT(readReliableChannel()));
connect(&_sequencer->getReliableInputChannel(1)->getBuffer(), SIGNAL(readyRead()), SLOT(readLowPriorityReliableChannel()));
static QByteArray createRandomBytes() {
const int MIN_BYTES = 4; // enqueue a large amount of data in a low-priority channel
const int MAX_BYTES = 16; ReliableChannel* output = _sequencer->getReliableOutputChannel(1);
QByteArray bytes(randIntInRange(MIN_BYTES, MAX_BYTES), 0); output->setPriority(0.25f);
for (int i = 0; i < bytes.size(); i++) { const int MIN_LOW_PRIORITY_DATA = 100000;
bytes[i] = rand(); const int MAX_LOW_PRIORITY_DATA = 200000;
} _lowPriorityDataStreamed = createRandomBytes(MIN_LOW_PRIORITY_DATA, MAX_LOW_PRIORITY_DATA);
return bytes; output->getBuffer().write(_lowPriorityDataStreamed);
lowPriorityStreamedBytesSent += _lowPriorityDataStreamed.size();
} }
static QVariant createRandomMessage() { static QVariant createRandomMessage() {
@ -123,6 +144,14 @@ bool Endpoint::simulate(int iterationNumber) {
_highPriorityMessagesToSend -= 1.0f; _highPriorityMessagesToSend -= 1.0f;
} }
// stream some random data
const int MIN_BYTES_TO_STREAM = 10;
const int MAX_BYTES_TO_STREAM = 100;
QByteArray bytes = createRandomBytes(MIN_BYTES_TO_STREAM, MAX_BYTES_TO_STREAM);
_dataStreamed.append(bytes);
streamedBytesSent += bytes.size();
_sequencer->getReliableOutputChannel()->getDataStream().writeRawData(bytes.constData(), bytes.size());
// send a packet // send a packet
try { try {
Bitstream& out = _sequencer->startPacket(); Bitstream& out = _sequencer->startPacket();
@ -141,12 +170,15 @@ bool Endpoint::simulate(int iterationNumber) {
} }
void Endpoint::sendDatagram(const QByteArray& datagram) { void Endpoint::sendDatagram(const QByteArray& datagram) {
datagramsSent++;
// some datagrams are dropped // some datagrams are dropped
const float DROP_PROBABILITY = 0.1f; const float DROP_PROBABILITY = 0.1f;
if (randFloat() < DROP_PROBABILITY) { if (randFloat() < DROP_PROBABILITY) {
return; return;
} }
_other->_sequencer->receivedDatagram(datagram); _other->_sequencer->receivedDatagram(datagram);
datagramsReceived++;
} }
void Endpoint::handleHighPriorityMessage(const QVariant& message) { void Endpoint::handleHighPriorityMessage(const QVariant& message) {
@ -176,3 +208,29 @@ void Endpoint::readMessage(Bitstream& in) {
} }
throw QString("Received unsent/already sent unreliable message."); throw QString("Received unsent/already sent unreliable message.");
} }
void Endpoint::readReliableChannel() {
QByteArray bytes = _sequencer->getReliableInputChannel()->getBuffer().readAll();
if (_other->_dataStreamed.size() < bytes.size()) {
throw QString("Received unsent/already sent streamed data.");
}
QByteArray compare = _other->_dataStreamed;
_other->_dataStreamed = _other->_dataStreamed.mid(bytes.size());
compare.truncate(bytes.size());
if (compare != bytes) {
throw QString("Sent/received streamed data mismatch.");
}
}
void Endpoint::readLowPriorityReliableChannel() {
QByteArray bytes = _sequencer->getReliableInputChannel(1)->getBuffer().readAll();
if (_other->_lowPriorityDataStreamed.size() < bytes.size()) {
throw QString("Received unsent/already sent low-priority streamed data.");
}
QByteArray compare = _other->_lowPriorityDataStreamed;
_other->_lowPriorityDataStreamed = _other->_lowPriorityDataStreamed.mid(bytes.size());
compare.truncate(bytes.size());
if (compare != bytes) {
throw QString("Sent/received low-priority streamed data mismatch.");
}
}

View file

@ -49,6 +49,8 @@ private slots:
void sendDatagram(const QByteArray& datagram); void sendDatagram(const QByteArray& datagram);
void handleHighPriorityMessage(const QVariant& message); void handleHighPriorityMessage(const QVariant& message);
void readMessage(Bitstream& in); void readMessage(Bitstream& in);
void readReliableChannel();
void readLowPriorityReliableChannel();
private: private:
@ -57,6 +59,8 @@ private:
float _highPriorityMessagesToSend; float _highPriorityMessagesToSend;
QVariantList _highPriorityMessagesSent; QVariantList _highPriorityMessagesSent;
QList<SequencedTestMessage> _unreliableMessagesSent; QList<SequencedTestMessage> _unreliableMessagesSent;
QByteArray _dataStreamed;
QByteArray _lowPriorityDataStreamed;
}; };
/// A simple test message. /// A simple test message.