From e72429171ff86589fc0384bc23420562ff1b7cb8 Mon Sep 17 00:00:00 2001 From: Andrzej Kapolka Date: Mon, 23 Jun 2014 12:01:49 -0700 Subject: [PATCH 1/6] Working on congestion tests. --- tests/metavoxels/src/MetavoxelTests.cpp | 77 +++++++++++++++++++++---- tests/metavoxels/src/MetavoxelTests.h | 2 +- 2 files changed, 67 insertions(+), 12 deletions(-) diff --git a/tests/metavoxels/src/MetavoxelTests.cpp b/tests/metavoxels/src/MetavoxelTests.cpp index 287d3a648c..609d36c085 100644 --- a/tests/metavoxels/src/MetavoxelTests.cpp +++ b/tests/metavoxels/src/MetavoxelTests.cpp @@ -335,7 +335,7 @@ bool MetavoxelTests::run() { QByteArray datagramHeader("testheader"); const int SIMULATION_ITERATIONS = 10000; if (test == 0 || test == 1) { - qDebug() << "Running transmission tests..."; + qDebug() << "Running transmission test..."; qDebug(); // create two endpoints with the same header @@ -365,7 +365,37 @@ bool MetavoxelTests::run() { } if (test == 0 || test == 2) { - qDebug() << "Running serialization tests..."; + qDebug() << "Running congestion control test..."; + qDebug(); + + // clear the stats + streamedBytesSent = streamedBytesReceived = datagramsSent = bytesSent = 0; + datagramsReceived = bytesReceived = maxDatagramsPerPacket = maxBytesPerPacket = 0; + + // create two endpoints with the same header + Endpoint alice(datagramHeader, Endpoint::CONGESTION_MODE), bob(datagramHeader, Endpoint::CONGESTION_MODE); + + alice.setOther(&bob); + bob.setOther(&alice); + + // perform a large number of simulation iterations + for (int i = 0; i < SIMULATION_ITERATIONS; i++) { + if (alice.simulate(i) || bob.simulate(i)) { + return true; + } + } + + qDebug() << "Sent" << streamedBytesSent << "streamed bytes, received" << streamedBytesReceived; + qDebug() << "Sent" << datagramsSent << "datagrams with" << bytesSent << "bytes, received" << + datagramsReceived << "with" << bytesReceived << "bytes"; + qDebug() << "Max" << maxDatagramsPerPacket << "datagrams," << maxBytesPerPacket << "bytes per packet"; + qDebug() << "Average" << (bytesReceived / datagramsReceived) << "bytes per datagram"; + qDebug() << "Speed:" << (bytesReceived / SIMULATION_ITERATIONS) << "bytes per iteration"; + qDebug() << "Efficiency:" << ((float)streamedBytesReceived / bytesReceived); + } + + if (test == 0 || test == 3) { + qDebug() << "Running serialization test..."; qDebug(); if (testSerialization(Bitstream::HASH_METADATA) || testSerialization(Bitstream::FULL_METADATA)) { @@ -373,8 +403,8 @@ bool MetavoxelTests::run() { } } - if (test == 0 || test == 3) { - qDebug() << "Running metavoxel data tests..."; + if (test == 0 || test == 4) { + qDebug() << "Running metavoxel data test..."; qDebug(); // clear the stats @@ -498,9 +528,15 @@ Endpoint::Endpoint(const QByteArray& datagramHeader, Mode mode) : ReliableChannel* output = _sequencer->getReliableOutputChannel(1); output->setPriority(0.25f); output->setMessagesEnabled(false); - const int MIN_STREAM_BYTES = 100000; - const int MAX_STREAM_BYTES = 200000; - QByteArray bytes = createRandomBytes(MIN_STREAM_BYTES, MAX_STREAM_BYTES); + QByteArray bytes; + if (mode == CONGESTION_MODE) { + const int HUGE_STREAM_BYTES = 50 * 1024 * 1024; + bytes = createRandomBytes(HUGE_STREAM_BYTES, HUGE_STREAM_BYTES); + } else { + const int MIN_STREAM_BYTES = 100000; + const int MAX_STREAM_BYTES = 200000; + bytes = createRandomBytes(MIN_STREAM_BYTES, MAX_STREAM_BYTES); + } _dataStreamed.append(bytes); output->getBuffer().write(bytes); streamedBytesSent += bytes.size(); @@ -646,7 +682,16 @@ bool Endpoint::simulate(int iterationNumber) { int oldDatagramsSent = datagramsSent; int oldBytesSent = bytesSent; - if (_mode == METAVOXEL_CLIENT_MODE) { + if (_mode == CONGESTION_MODE) { + Bitstream& out = _sequencer->startPacket(); + out << QVariant(); + _sequencer->endPacket(); + + // record the send + SendRecord record = { _sequencer->getOutgoingPacketNumber() }; + _sendRecords.append(record); + + } else if (_mode == METAVOXEL_CLIENT_MODE) { Bitstream& out = _sequencer->startPacket(); ClientStateMessage state = { _lod }; @@ -748,13 +793,14 @@ void Endpoint::sendDatagram(const QByteArray& datagram) { // some datagrams are dropped const float DROP_PROBABILITY = 0.1f; - if (randFloat() < DROP_PROBABILITY) { + float probabilityMultiplier = (_mode == CONGESTION_MODE) ? 0.01f : 1.0f; + if (randFloat() < DROP_PROBABILITY * probabilityMultiplier) { return; } // some are received out of order const float REORDER_PROBABILITY = 0.1f; - if (randFloat() < REORDER_PROBABILITY) { + if (randFloat() < REORDER_PROBABILITY * probabilityMultiplier) { const int MIN_DELAY = 1; const int MAX_DELAY = 5; // have to copy the datagram; the one we're passed is a reference to a shared buffer @@ -763,7 +809,7 @@ void Endpoint::sendDatagram(const QByteArray& datagram) { // and some are duplicated const float DUPLICATE_PROBABILITY = 0.01f; - if (randFloat() > DUPLICATE_PROBABILITY) { + if (randFloat() > DUPLICATE_PROBABILITY * probabilityMultiplier) { return; } } @@ -788,6 +834,15 @@ void Endpoint::handleHighPriorityMessage(const QVariant& message) { } void Endpoint::readMessage(Bitstream& in) { + if (_mode == CONGESTION_MODE) { + QVariant message; + in >> message; + + // record the receipt + ReceiveRecord record = { _sequencer->getIncomingPacketNumber() }; + _receiveRecords.append(record); + return; + } if (_mode == METAVOXEL_CLIENT_MODE) { QVariant message; in >> message; diff --git a/tests/metavoxels/src/MetavoxelTests.h b/tests/metavoxels/src/MetavoxelTests.h index c340d78963..46bac319fa 100644 --- a/tests/metavoxels/src/MetavoxelTests.h +++ b/tests/metavoxels/src/MetavoxelTests.h @@ -40,7 +40,7 @@ class Endpoint : public QObject { public: - enum Mode { BASIC_PEER_MODE, METAVOXEL_SERVER_MODE, METAVOXEL_CLIENT_MODE }; + enum Mode { BASIC_PEER_MODE, CONGESTION_MODE, METAVOXEL_SERVER_MODE, METAVOXEL_CLIENT_MODE }; Endpoint(const QByteArray& datagramHeader, Mode mode = BASIC_PEER_MODE); From 65e50f32e489249dc5f54146f973da3e9107f072 Mon Sep 17 00:00:00 2001 From: Andrzej Kapolka Date: Tue, 24 Jun 2014 19:10:52 -0700 Subject: [PATCH 2/6] Tests, fixes for SpanList. --- .../metavoxels/src/DatagramSequencer.cpp | 143 ++++++++------- libraries/metavoxels/src/DatagramSequencer.h | 5 +- tests/metavoxels/src/MetavoxelTests.cpp | 172 ++++++++++++++++-- tests/metavoxels/src/MetavoxelTests.h | 11 +- 4 files changed, 253 insertions(+), 78 deletions(-) diff --git a/libraries/metavoxels/src/DatagramSequencer.cpp b/libraries/metavoxels/src/DatagramSequencer.cpp index f1f60e4d87..babbff6f2b 100644 --- a/libraries/metavoxels/src/DatagramSequencer.cpp +++ b/libraries/metavoxels/src/DatagramSequencer.cpp @@ -520,7 +520,9 @@ int SpanList::set(int offset, int length) { // look for an intersection within the list int position = 0; - for (QList::iterator it = _spans.begin(); it != _spans.end(); it++) { + for (int i = 0; i < _spans.size(); i++) { + QList::iterator it = _spans.begin() + i; + // if we intersect the unset portion, contract it position += it->unset; if (offset <= position) { @@ -530,16 +532,20 @@ int SpanList::set(int offset, int length) { // if we continue into the set portion, expand it and consume following spans int extra = offset + length - position; if (extra >= 0) { - int amount = setSpans(it + 1, extra); - it->set += amount; - _totalSet += amount; - + extra -= it->set; + it->set += remove; + _totalSet += remove; + if (extra > 0) { + int amount = setSpans(it + 1, extra); + _spans[i].set += amount; + _totalSet += amount; + } // otherwise, insert a new span } else { - Span span = { it->unset, length + extra }; - _spans.insert(it, span); + Span span = { it->unset, length }; it->unset = -extra; - _totalSet += span.set; + _spans.insert(it, span); + _totalSet += length; } return 0; } @@ -548,9 +554,11 @@ int SpanList::set(int offset, int length) { position += it->set; if (offset <= position) { int extra = offset + length - position; - int amount = setSpans(it + 1, extra); - it->set += amount; - _totalSet += amount; + if (extra > 0) { + int amount = setSpans(it + 1, extra); + _spans[i].set += amount; + _totalSet += amount; + } return 0; } } @@ -629,67 +637,71 @@ ReliableChannel::ReliableChannel(DatagramSequencer* sequencer, int index, bool o } void ReliableChannel::writeData(QDataStream& out, int bytes, QVector& spans) { - // find out how many spans we want to write - int spanCount = 0; - int remainingBytes = bytes; - bool first = true; - while (remainingBytes > 0) { + if (bytes > 0) { + _writePosition %= _buffer.pos(); + int position = 0; - foreach (const SpanList::Span& span, _acknowledged.getSpans()) { - if (remainingBytes <= 0) { - break; + for (int i = 0; i < _acknowledged.getSpans().size(); i++) { + const SpanList::Span& span = _acknowledged.getSpans().at(i); + position += span.unset; + if (_writePosition < position) { + int start = qMax(position - span.unset, _writePosition); + int length = qMin(bytes, position - start); + writeSpan(out, start, length, spans); + writeFullSpans(out, bytes - length, i + 1, position + span.set, spans); + out << (quint32)0; + return; } - spanCount++; - remainingBytes -= getBytesToWrite(first, qMin(remainingBytes, span.unset)); - position += (span.unset + span.set); + position += span.set; } int leftover = _buffer.pos() - position; - if (remainingBytes > 0 && leftover > 0) { - spanCount++; - remainingBytes -= getBytesToWrite(first, qMin(remainingBytes, leftover)); + position = _buffer.pos(); + + if (_writePosition < position && leftover > 0) { + int start = qMax(position - leftover, _writePosition); + int length = qMin(bytes, position - start); + writeSpan(out, start, length, spans); + writeFullSpans(out, bytes - length, 0, 0, spans); } } - - // write the count and the spans - out << (quint32)spanCount; - remainingBytes = bytes; - first = true; - while (remainingBytes > 0) { - int position = 0; - foreach (const SpanList::Span& span, _acknowledged.getSpans()) { - if (remainingBytes <= 0) { - break; - } - remainingBytes -= writeSpan(out, first, position, qMin(remainingBytes, span.unset), spans); - position += (span.unset + span.set); + out << (quint32)0; +} + +void ReliableChannel::writeFullSpans(QDataStream& out, int bytes, int startingIndex, int position, + QVector& spans) { + int expandedSize = _acknowledged.getSpans().size() + 1; + for (int i = 0; i < expandedSize; i++) { + if (bytes == 0) { + return; } - int leftover = _buffer.pos() - position; - if (remainingBytes > 0 && leftover > 0) { - remainingBytes -= writeSpan(out, first, position, qMin(remainingBytes, leftover), spans); + int index = (startingIndex + i) % expandedSize; + if (index == _acknowledged.getSpans().size()) { + int leftover = _buffer.pos() - position; + if (leftover > 0) { + int length = qMin(leftover, bytes); + writeSpan(out, position, length, spans); + bytes -= length; + } + position = 0; + + } else { + const SpanList::Span& span = _acknowledged.getSpans().at(index); + int length = qMin(span.unset, bytes); + writeSpan(out, position, length, spans); + bytes -= length; + position += (span.unset + span.set); } } } -int ReliableChannel::getBytesToWrite(bool& first, int length) const { - if (first) { - first = false; - return length - (_writePosition % length); - } - return length; -} - -int ReliableChannel::writeSpan(QDataStream& out, bool& first, int position, int length, QVector& spans) { - if (first) { - first = false; - position = _writePosition % length; - length -= position; - _writePosition += length; - } +int ReliableChannel::writeSpan(QDataStream& out, int position, int length, QVector& spans) { DatagramSequencer::ChannelSpan span = { _index, _offset + position, length }; spans.append(span); - out << (quint32)span.offset; out << (quint32)length; + out << (quint32)span.offset; _buffer.writeToStream(position, length, out); + _writePosition = position + length; + return length; } @@ -700,17 +712,20 @@ void ReliableChannel::spanAcknowledged(const DatagramSequencer::ChannelSpan& spa _buffer.seek(_buffer.size()); _offset += advancement; - _writePosition = qMax(_writePosition - advancement, 0); - } + _writePosition = qMax(_writePosition - advancement, 0); + } } void ReliableChannel::readData(QDataStream& in) { - quint32 segments; - in >> segments; bool readSome = false; - for (quint32 i = 0; i < segments; i++) { - quint32 offset, size; - in >> offset >> size; + forever { + quint32 size; + in >> size; + if (size == 0) { + break; + } + quint32 offset; + in >> offset; int position = offset - _offset; int end = position + size; diff --git a/libraries/metavoxels/src/DatagramSequencer.h b/libraries/metavoxels/src/DatagramSequencer.h index 5ac88556f0..47fef8e645 100644 --- a/libraries/metavoxels/src/DatagramSequencer.h +++ b/libraries/metavoxels/src/DatagramSequencer.h @@ -343,8 +343,9 @@ private: ReliableChannel(DatagramSequencer* sequencer, int index, bool output); void writeData(QDataStream& out, int bytes, QVector& spans); - int getBytesToWrite(bool& first, int length) const; - int writeSpan(QDataStream& out, bool& first, int position, int length, QVector& spans); + void writeFullSpans(QDataStream& out, int bytes, int startingIndex, int position, + QVector& spans); + int writeSpan(QDataStream& out, int position, int length, QVector& spans); void spanAcknowledged(const DatagramSequencer::ChannelSpan& span); diff --git a/tests/metavoxels/src/MetavoxelTests.cpp b/tests/metavoxels/src/MetavoxelTests.cpp index 609d36c085..688749f39b 100644 --- a/tests/metavoxels/src/MetavoxelTests.cpp +++ b/tests/metavoxels/src/MetavoxelTests.cpp @@ -28,6 +28,119 @@ MetavoxelTests::MetavoxelTests(int& argc, char** argv) : QCoreApplication(argc, argv) { } +static bool testSpanList() { + SpanList list; + + if (list.getTotalSet() != 0 || !list.getSpans().isEmpty()) { + qDebug() << "Failed empty state test."; + return true; + } + + if (list.set(-5, 15) != 10 || list.getTotalSet() != 0 || !list.getSpans().isEmpty()) { + qDebug() << "Failed initial front set."; + return true; + } + + if (list.set(5, 15) != 0 || list.getTotalSet() != 15 || list.getSpans().size() != 1 || + list.getSpans().at(0).unset != 5 || list.getSpans().at(0).set != 15) { + qDebug() << "Failed initial middle set."; + return true; + } + + if (list.set(25, 5) != 0 || list.getTotalSet() != 20 || list.getSpans().size() != 2 || + list.getSpans().at(0).unset != 5 || list.getSpans().at(0).set != 15 || + list.getSpans().at(1).unset != 5 || list.getSpans().at(1).set != 5) { + qDebug() << "Failed initial end set."; + return true; + } + + if (list.set(1, 3) != 0 || list.getTotalSet() != 23 || list.getSpans().size() != 3 || + list.getSpans().at(0).unset != 1 || list.getSpans().at(0).set != 3 || + list.getSpans().at(1).unset != 1 || list.getSpans().at(1).set != 15 || + list.getSpans().at(2).unset != 5 || list.getSpans().at(2).set != 5) { + qDebug() << "Failed second front set."; + return true; + } + SpanList threeSet = list; + + if (list.set(20, 5) != 0 || list.getTotalSet() != 28 || list.getSpans().size() != 2 || + list.getSpans().at(0).unset != 1 || list.getSpans().at(0).set != 3 || + list.getSpans().at(1).unset != 1 || list.getSpans().at(1).set != 25) { + qDebug() << "Failed minimal join last two."; + return true; + } + + list = threeSet; + if (list.set(5, 25) != 0 || list.getTotalSet() != 28 || list.getSpans().size() != 2 || + list.getSpans().at(0).unset != 1 || list.getSpans().at(0).set != 3 || + list.getSpans().at(1).unset != 1 || list.getSpans().at(1).set != 25) { + qDebug() << "Failed maximal join last two."; + return true; + } + + list = threeSet; + if (list.set(10, 18) != 0 || list.getTotalSet() != 28 || list.getSpans().size() != 2 || + list.getSpans().at(0).unset != 1 || list.getSpans().at(0).set != 3 || + list.getSpans().at(1).unset != 1 || list.getSpans().at(1).set != 25) { + qDebug() << "Failed middle join last two."; + return true; + } + + list = threeSet; + if (list.set(10, 18) != 0 || list.getTotalSet() != 28 || list.getSpans().size() != 2 || + list.getSpans().at(0).unset != 1 || list.getSpans().at(0).set != 3 || + list.getSpans().at(1).unset != 1 || list.getSpans().at(1).set != 25) { + qDebug() << "Failed middle join last two."; + return true; + } + + list = threeSet; + if (list.set(2, 26) != 0 || list.getTotalSet() != 29 || list.getSpans().size() != 1 || + list.getSpans().at(0).unset != 1 || list.getSpans().at(0).set != 29) { + qDebug() << "Failed middle join three."; + return true; + } + + list = threeSet; + if (list.set(0, 2) != 4 || list.getTotalSet() != 20 || list.getSpans().size() != 2 || + list.getSpans().at(0).unset != 1 || list.getSpans().at(0).set != 15 || + list.getSpans().at(1).unset != 5 || list.getSpans().at(1).set != 5) { + qDebug() << "Failed front advance."; + return true; + } + + list = threeSet; + if (list.set(-10, 15) != 20 || list.getTotalSet() != 5 || list.getSpans().size() != 1 || + list.getSpans().at(0).unset != 5 || list.getSpans().at(0).set != 5) { + qDebug() << "Failed middle advance."; + return true; + } + + list = threeSet; + if (list.set(-10, 38) != 30 || list.getTotalSet() != 0 || list.getSpans().size() != 0) { + qDebug() << "Failed end advance."; + return true; + } + + list = threeSet; + if (list.set(-10, 100) != 90 || list.getTotalSet() != 0 || list.getSpans().size() != 0) { + qDebug() << "Failed clobber advance."; + return true; + } + + list = threeSet; + if (list.set(21, 3) != 0 || list.getTotalSet() != 26 || list.getSpans().size() != 4 || + list.getSpans().at(0).unset != 1 || list.getSpans().at(0).set != 3 || + list.getSpans().at(1).unset != 1 || list.getSpans().at(1).set != 15 || + list.getSpans().at(2).unset != 1 || list.getSpans().at(2).set != 3 || + list.getSpans().at(3).unset != 1 || list.getSpans().at(3).set != 5) { + qDebug() << "Failed adding fourth."; + return true; + } + + return false; +} + static int datagramsSent = 0; static int datagramsReceived = 0; static int bytesSent = 0; @@ -332,9 +445,18 @@ bool MetavoxelTests::run() { QStringList arguments = this->arguments(); int test = (arguments.size() > 1) ? arguments.at(1).toInt() : 0; + if (test == 0 || test == 1) { + qDebug() << "Running SpanList test..."; + qDebug(); + + if (testSpanList()) { + return true; + } + } + QByteArray datagramHeader("testheader"); const int SIMULATION_ITERATIONS = 10000; - if (test == 0 || test == 1) { + if (test == 0 || test == 2) { qDebug() << "Running transmission test..."; qDebug(); @@ -364,7 +486,7 @@ bool MetavoxelTests::run() { qDebug(); } - if (test == 0 || test == 2) { + if (test == 0 || test == 3) { qDebug() << "Running congestion control test..."; qDebug(); @@ -394,7 +516,7 @@ bool MetavoxelTests::run() { qDebug() << "Efficiency:" << ((float)streamedBytesReceived / bytesReceived); } - if (test == 0 || test == 3) { + if (test == 0 || test == 4) { qDebug() << "Running serialization test..."; qDebug(); @@ -403,7 +525,7 @@ bool MetavoxelTests::run() { } } - if (test == 0 || test == 4) { + if (test == 0 || test == 5) { qDebug() << "Running metavoxel data test..."; qDebug(); @@ -532,6 +654,13 @@ Endpoint::Endpoint(const QByteArray& datagramHeader, Mode mode) : if (mode == CONGESTION_MODE) { const int HUGE_STREAM_BYTES = 50 * 1024 * 1024; bytes = createRandomBytes(HUGE_STREAM_BYTES, HUGE_STREAM_BYTES); + + // initialize the pipeline + for (int i = 0; i < 10; i++) { + _pipeline.append(ByteArrayVector()); + } + _remainingPipelineCapacity = 100 * 1024; + } else { const int MIN_STREAM_BYTES = 100000; const int MAX_STREAM_BYTES = 200000; @@ -669,10 +798,9 @@ int MutateVisitor::visit(MetavoxelInfo& info) { bool Endpoint::simulate(int iterationNumber) { // update/send our delayed datagrams - for (QList >::iterator it = _delayedDatagrams.begin(); it != _delayedDatagrams.end(); ) { + for (QList::iterator it = _delayedDatagrams.begin(); it != _delayedDatagrams.end(); ) { if (it->second-- == 1) { - _other->_sequencer->receivedDatagram(it->first); - datagramsReceived++; + _other->receiveDatagram(it->first); it = _delayedDatagrams.erase(it); } else { @@ -683,6 +811,16 @@ bool Endpoint::simulate(int iterationNumber) { int oldDatagramsSent = datagramsSent; int oldBytesSent = bytesSent; if (_mode == CONGESTION_MODE) { + // cycle our pipeline + ByteArrayVector datagrams = _pipeline.takeLast(); + _pipeline.prepend(ByteArrayVector()); + foreach (const QByteArray& datagram, datagrams) { + _sequencer->receivedDatagram(datagram); + datagramsReceived++; + bytesReceived += datagram.size(); + _remainingPipelineCapacity += datagram.size(); + } + Bitstream& out = _sequencer->startPacket(); out << QVariant(); _sequencer->endPacket(); @@ -804,7 +942,7 @@ void Endpoint::sendDatagram(const QByteArray& datagram) { const int MIN_DELAY = 1; const int MAX_DELAY = 5; // have to copy the datagram; the one we're passed is a reference to a shared buffer - _delayedDatagrams.append(QPair(QByteArray(datagram.constData(), datagram.size()), + _delayedDatagrams.append(ByteArrayIntPair(QByteArray(datagram.constData(), datagram.size()), randIntInRange(MIN_DELAY, MAX_DELAY))); // and some are duplicated @@ -814,9 +952,7 @@ void Endpoint::sendDatagram(const QByteArray& datagram) { } } - _other->_sequencer->receivedDatagram(datagram); - datagramsReceived++; - bytesReceived += datagram.size(); + _other->receiveDatagram(datagram); } void Endpoint::handleHighPriorityMessage(const QVariant& message) { @@ -942,6 +1078,20 @@ void Endpoint::clearReceiveRecordsBefore(int index) { _receiveRecords.erase(_receiveRecords.begin(), _receiveRecords.begin() + index + 1); } +void Endpoint::receiveDatagram(const QByteArray& datagram) { + if (_mode == CONGESTION_MODE) { + if (datagram.size() <= _remainingPipelineCapacity) { + // have to copy the datagram; the one we're passed is a reference to a shared buffer + _pipeline[0].append(QByteArray(datagram.constData(), datagram.size())); + _remainingPipelineCapacity -= datagram.size(); + } + } else { + _sequencer->receivedDatagram(datagram); + datagramsReceived++; + bytesReceived += datagram.size(); + } +} + void Endpoint::handleMessage(const QVariant& message, Bitstream& in) { int userType = message.userType(); if (userType == ClientStateMessage::Type) { diff --git a/tests/metavoxels/src/MetavoxelTests.h b/tests/metavoxels/src/MetavoxelTests.h index 46bac319fa..f9a314dcd7 100644 --- a/tests/metavoxels/src/MetavoxelTests.h +++ b/tests/metavoxels/src/MetavoxelTests.h @@ -63,6 +63,8 @@ private slots: private: + void receiveDatagram(const QByteArray& datagram); + void handleMessage(const QVariant& message, Bitstream& in); class SendRecord { @@ -96,7 +98,14 @@ private: SharedObjectPointer _sphere; Endpoint* _other; - QList > _delayedDatagrams; + + typedef QPair ByteArrayIntPair; + QList _delayedDatagrams; + + typedef QVector ByteArrayVector; + QList _pipeline; + int _remainingPipelineCapacity; + float _highPriorityMessagesToSend; QVariantList _highPriorityMessagesSent; QList _unreliableMessagesSent; From d913ac4486b50a9bd20f36b10f0d01427a1631e0 Mon Sep 17 00:00:00 2001 From: Andrzej Kapolka Date: Tue, 24 Jun 2014 19:34:19 -0700 Subject: [PATCH 3/6] Fix for streams' getting stuck on the final part. --- libraries/metavoxels/src/DatagramSequencer.cpp | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/libraries/metavoxels/src/DatagramSequencer.cpp b/libraries/metavoxels/src/DatagramSequencer.cpp index babbff6f2b..35d0876391 100644 --- a/libraries/metavoxels/src/DatagramSequencer.cpp +++ b/libraries/metavoxels/src/DatagramSequencer.cpp @@ -637,9 +637,12 @@ ReliableChannel::ReliableChannel(DatagramSequencer* sequencer, int index, bool o } void ReliableChannel::writeData(QDataStream& out, int bytes, QVector& spans) { - if (bytes > 0) { - _writePosition %= _buffer.pos(); - + if (bytes == 0) { + out << (quint32)0; + return; + } + _writePosition %= _buffer.pos(); + while (bytes > 0) { int position = 0; for (int i = 0; i < _acknowledged.getSpans().size(); i++) { const SpanList::Span& span = _acknowledged.getSpans().at(i); @@ -662,9 +665,11 @@ void ReliableChannel::writeData(QDataStream& out, int bytes, QVector Date: Wed, 25 Jun 2014 11:42:02 -0700 Subject: [PATCH 4/6] Reset the write position when we hear of packet loss (up to once per round trip time). --- .../metavoxels/src/DatagramSequencer.cpp | 21 ++++++++++++++++++- libraries/metavoxels/src/DatagramSequencer.h | 5 +++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/libraries/metavoxels/src/DatagramSequencer.cpp b/libraries/metavoxels/src/DatagramSequencer.cpp index 35d0876391..bb3e8723ac 100644 --- a/libraries/metavoxels/src/DatagramSequencer.cpp +++ b/libraries/metavoxels/src/DatagramSequencer.cpp @@ -172,7 +172,10 @@ void DatagramSequencer::receivedDatagram(const QByteArray& datagram) { if (index < 0 || index >= _sendRecords.size()) { continue; } - QList::iterator it = _sendRecords.begin() + index; + QList::iterator it = _sendRecords.begin(); + for (int i = 0; i < index; i++) { + sendRecordLost(*it++); + } sendRecordAcknowledged(*it); emit sendAcknowledged(index); _sendRecords.erase(_sendRecords.begin(), it + 1); @@ -255,6 +258,13 @@ void DatagramSequencer::sendRecordAcknowledged(const SendRecord& record) { } } +void DatagramSequencer::sendRecordLost(const SendRecord& record) { + // notify the channels of their lost spans + foreach (const ChannelSpan& span, record.spans) { + getReliableOutputChannel(span.channel)->spanLost(record.packetNumber, _outgoingPacketNumber + 1); + } +} + void DatagramSequencer::appendReliableData(int bytes, QVector& spans) { // gather total number of bytes to write, priority int totalBytes = 0; @@ -627,6 +637,7 @@ ReliableChannel::ReliableChannel(DatagramSequencer* sequencer, int index, bool o _priority(1.0f), _offset(0), _writePosition(0), + _writePositionResetPacketNumber(0), _messagesEnabled(true) { _buffer.open(output ? QIODevice::WriteOnly : QIODevice::ReadOnly); @@ -721,6 +732,14 @@ void ReliableChannel::spanAcknowledged(const DatagramSequencer::ChannelSpan& spa } } +void ReliableChannel::spanLost(int packetNumber, int nextOutgoingPacketNumber) { + // reset the write position up to once each round trip time + if (packetNumber >= _writePositionResetPacketNumber) { + _writePosition = 0; + _writePositionResetPacketNumber = nextOutgoingPacketNumber; + } +} + void ReliableChannel::readData(QDataStream& in) { bool readSome = false; forever { diff --git a/libraries/metavoxels/src/DatagramSequencer.h b/libraries/metavoxels/src/DatagramSequencer.h index 47fef8e645..4ab9f7667b 100644 --- a/libraries/metavoxels/src/DatagramSequencer.h +++ b/libraries/metavoxels/src/DatagramSequencer.h @@ -165,6 +165,9 @@ private: /// Notes that the described send was acknowledged by the other party. void sendRecordAcknowledged(const SendRecord& record); + /// Notes that the described send was lost in transit. + void sendRecordLost(const SendRecord& record); + /// Appends some reliable data to the outgoing packet. void appendReliableData(int bytes, QVector& spans); @@ -348,6 +351,7 @@ private: int writeSpan(QDataStream& out, int position, int length, QVector& spans); void spanAcknowledged(const DatagramSequencer::ChannelSpan& span); + void spanLost(int packetNumber, int nextOutgoingPacketNumber); void readData(QDataStream& in); @@ -360,6 +364,7 @@ private: int _offset; int _writePosition; + int _writePositionResetPacketNumber; SpanList _acknowledged; bool _messagesEnabled; }; From 705445ce622e762481d3c9f71654d1cbd3f441c0 Mon Sep 17 00:00:00 2001 From: Andrzej Kapolka Date: Wed, 25 Jun 2014 15:48:46 -0700 Subject: [PATCH 5/6] Basic congestion control using TCP-esque strategy. --- .../metavoxels/src/DatagramSequencer.cpp | 33 ++++++++++++++++++- libraries/metavoxels/src/DatagramSequencer.h | 10 ++++++ tests/metavoxels/src/MetavoxelTests.cpp | 27 ++++++++++----- 3 files changed, 60 insertions(+), 10 deletions(-) diff --git a/libraries/metavoxels/src/DatagramSequencer.cpp b/libraries/metavoxels/src/DatagramSequencer.cpp index bb3e8723ac..0097c0fd1d 100644 --- a/libraries/metavoxels/src/DatagramSequencer.cpp +++ b/libraries/metavoxels/src/DatagramSequencer.cpp @@ -23,6 +23,9 @@ const int MAX_DATAGRAM_SIZE = MAX_PACKET_SIZE; const int DEFAULT_MAX_PACKET_SIZE = 3000; +// the default slow-start threshold, which will be lowered quickly when we first encounter packet loss +const float DEFAULT_SLOW_START_THRESHOLD = 1000.0f; + DatagramSequencer::DatagramSequencer(const QByteArray& datagramHeader, QObject* parent) : QObject(parent), _outgoingPacketStream(&_outgoingPacketData, QIODevice::WriteOnly), @@ -37,7 +40,12 @@ DatagramSequencer::DatagramSequencer(const QByteArray& datagramHeader, QObject* _incomingPacketStream(&_incomingPacketData, QIODevice::ReadOnly), _inputStream(_incomingPacketStream), _receivedHighPriorityMessages(0), - _maxPacketSize(DEFAULT_MAX_PACKET_SIZE) { + _maxPacketSize(DEFAULT_MAX_PACKET_SIZE), + _packetsPerGroup(1.0f), + _packetsToWrite(0.0f), + _slowStartThreshold(DEFAULT_SLOW_START_THRESHOLD), + _packetRateIncreasePacketNumber(0), + _packetRateDecreasePacketNumber(0) { _outgoingPacketStream.setByteOrder(QDataStream::LittleEndian); _incomingDatagramStream.setByteOrder(QDataStream::LittleEndian); @@ -71,6 +79,14 @@ ReliableChannel* DatagramSequencer::getReliableInputChannel(int index) { return channel; } +int DatagramSequencer::startPacketGroup() { + // increment our packet counter and subtract/return the integer portion + _packetsToWrite += _packetsPerGroup; + int wholePackets = (int)_packetsToWrite; + _packetsToWrite -= wholePackets; + return wholePackets; +} + Bitstream& DatagramSequencer::startPacket() { // start with the list of acknowledgements _outgoingPacketStream << (quint32)_receiveRecords.size(); @@ -256,6 +272,14 @@ void DatagramSequencer::sendRecordAcknowledged(const SendRecord& record) { foreach (const ChannelSpan& span, record.spans) { getReliableOutputChannel(span.channel)->spanAcknowledged(span); } + + // increase the packet rate with every ack until we pass the slow start threshold; then, every round trip + if (record.packetNumber >= _packetRateIncreasePacketNumber) { + if (_packetsPerGroup >= _slowStartThreshold) { + _packetRateIncreasePacketNumber = _outgoingPacketNumber + 1; + } + _packetsPerGroup += 1.0f; + } } void DatagramSequencer::sendRecordLost(const SendRecord& record) { @@ -263,6 +287,13 @@ void DatagramSequencer::sendRecordLost(const SendRecord& record) { foreach (const ChannelSpan& span, record.spans) { getReliableOutputChannel(span.channel)->spanLost(record.packetNumber, _outgoingPacketNumber + 1); } + + // halve the rate and remember as threshold + if (record.packetNumber >= _packetRateDecreasePacketNumber) { + _packetsPerGroup = qMax(_packetsPerGroup * 0.5f, 1.0f); + _slowStartThreshold = _packetsPerGroup; + _packetRateDecreasePacketNumber = _outgoingPacketNumber + 1; + } } void DatagramSequencer::appendReliableData(int bytes, QVector& spans) { diff --git a/libraries/metavoxels/src/DatagramSequencer.h b/libraries/metavoxels/src/DatagramSequencer.h index 4ab9f7667b..e2ea9d00af 100644 --- a/libraries/metavoxels/src/DatagramSequencer.h +++ b/libraries/metavoxels/src/DatagramSequencer.h @@ -99,6 +99,10 @@ public: /// Returns the intput channel at the specified index, creating it if necessary. ReliableChannel* getReliableInputChannel(int index = 0); + /// Starts a packet group. + /// \return the number of packets to write in the group + int startPacketGroup(); + /// Starts a new packet for transmission. /// \return a reference to the Bitstream to use for writing to the packet Bitstream& startPacket(); @@ -203,6 +207,12 @@ private: int _maxPacketSize; + float _packetsPerGroup; + float _packetsToWrite; + float _slowStartThreshold; + int _packetRateIncreasePacketNumber; + int _packetRateDecreasePacketNumber; + QHash _reliableOutputChannels; QHash _reliableInputChannels; }; diff --git a/tests/metavoxels/src/MetavoxelTests.cpp b/tests/metavoxels/src/MetavoxelTests.cpp index 688749f39b..9cb21faf06 100644 --- a/tests/metavoxels/src/MetavoxelTests.cpp +++ b/tests/metavoxels/src/MetavoxelTests.cpp @@ -652,7 +652,7 @@ Endpoint::Endpoint(const QByteArray& datagramHeader, Mode mode) : output->setMessagesEnabled(false); QByteArray bytes; if (mode == CONGESTION_MODE) { - const int HUGE_STREAM_BYTES = 50 * 1024 * 1024; + const int HUGE_STREAM_BYTES = 60 * 1024 * 1024; bytes = createRandomBytes(HUGE_STREAM_BYTES, HUGE_STREAM_BYTES); // initialize the pipeline @@ -820,14 +820,23 @@ bool Endpoint::simulate(int iterationNumber) { bytesReceived += datagram.size(); _remainingPipelineCapacity += datagram.size(); } - - Bitstream& out = _sequencer->startPacket(); - out << QVariant(); - _sequencer->endPacket(); - - // record the send - SendRecord record = { _sequencer->getOutgoingPacketNumber() }; - _sendRecords.append(record); + int packetCount = _sequencer->startPacketGroup(); + for (int i = 0; i < packetCount; i++) { + oldDatagramsSent = datagramsSent; + oldBytesSent = bytesSent; + + Bitstream& out = _sequencer->startPacket(); + out << QVariant(); + _sequencer->endPacket(); + + maxDatagramsPerPacket = qMax(maxDatagramsPerPacket, datagramsSent - oldDatagramsSent); + maxBytesPerPacket = qMax(maxBytesPerPacket, bytesSent - oldBytesSent); + + // record the send + SendRecord record = { _sequencer->getOutgoingPacketNumber() }; + _sendRecords.append(record); + } + return false; } else if (_mode == METAVOXEL_CLIENT_MODE) { Bitstream& out = _sequencer->startPacket(); From 5effcd24ff5e65c7d6551537232d68a6fe2a7cfb Mon Sep 17 00:00:00 2001 From: Andrzej Kapolka Date: Wed, 25 Jun 2014 16:56:02 -0700 Subject: [PATCH 6/6] Only increase/decrease rate when we want to send more/have sent more than the minimum, respectively. --- .../metavoxels/src/DatagramSequencer.cpp | 21 ++++++++++++++++++- libraries/metavoxels/src/DatagramSequencer.h | 3 ++- tests/metavoxels/src/MetavoxelTests.cpp | 12 ++++++++--- 3 files changed, 31 insertions(+), 5 deletions(-) diff --git a/libraries/metavoxels/src/DatagramSequencer.cpp b/libraries/metavoxels/src/DatagramSequencer.cpp index 0097c0fd1d..a75a4bf95b 100644 --- a/libraries/metavoxels/src/DatagramSequencer.cpp +++ b/libraries/metavoxels/src/DatagramSequencer.cpp @@ -79,11 +79,30 @@ ReliableChannel* DatagramSequencer::getReliableInputChannel(int index) { return channel; } -int DatagramSequencer::startPacketGroup() { +int DatagramSequencer::startPacketGroup(int desiredPackets) { + // figure out how much data we have enqueued and increase the number of packets desired + int totalAvailable = 0; + foreach (ReliableChannel* channel, _reliableOutputChannels) { + totalAvailable += channel->getBytesAvailable(); + } + desiredPackets += (totalAvailable / _maxPacketSize); + // increment our packet counter and subtract/return the integer portion _packetsToWrite += _packetsPerGroup; int wholePackets = (int)_packetsToWrite; _packetsToWrite -= wholePackets; + wholePackets = qMin(wholePackets, desiredPackets); + + // if we don't want to send any more, push out the rate increase number past the group + if (desiredPackets <= _packetsPerGroup) { + _packetRateIncreasePacketNumber = _outgoingPacketNumber + wholePackets + 1; + } + + // likewise, if we're only sending one packet, don't let its loss cause rate decrease + if (wholePackets == 1) { + _packetRateDecreasePacketNumber = _outgoingPacketNumber + 2; + } + return wholePackets; } diff --git a/libraries/metavoxels/src/DatagramSequencer.h b/libraries/metavoxels/src/DatagramSequencer.h index e2ea9d00af..9a4cd1334b 100644 --- a/libraries/metavoxels/src/DatagramSequencer.h +++ b/libraries/metavoxels/src/DatagramSequencer.h @@ -100,8 +100,9 @@ public: ReliableChannel* getReliableInputChannel(int index = 0); /// Starts a packet group. + /// \param desiredPackets the number of packets we'd like to write in the group /// \return the number of packets to write in the group - int startPacketGroup(); + int startPacketGroup(int desiredPackets = 1); /// Starts a new packet for transmission. /// \return a reference to the Bitstream to use for writing to the packet diff --git a/tests/metavoxels/src/MetavoxelTests.cpp b/tests/metavoxels/src/MetavoxelTests.cpp index 9cb21faf06..68aaf7ec70 100644 --- a/tests/metavoxels/src/MetavoxelTests.cpp +++ b/tests/metavoxels/src/MetavoxelTests.cpp @@ -147,6 +147,8 @@ static int bytesSent = 0; static int bytesReceived = 0; static int maxDatagramsPerPacket = 0; static int maxBytesPerPacket = 0; +static int groupsSent = 0; +static int maxPacketsPerGroup = 0; static int highPriorityMessagesSent = 0; static int highPriorityMessagesReceived = 0; static int unreliableMessagesSent = 0; @@ -508,10 +510,12 @@ bool MetavoxelTests::run() { } qDebug() << "Sent" << streamedBytesSent << "streamed bytes, received" << streamedBytesReceived; - qDebug() << "Sent" << datagramsSent << "datagrams with" << bytesSent << "bytes, received" << - datagramsReceived << "with" << bytesReceived << "bytes"; + qDebug() << "Sent" << datagramsSent << "datagrams in" << groupsSent << "groups with" << bytesSent << + "bytes, received" << datagramsReceived << "with" << bytesReceived << "bytes"; qDebug() << "Max" << maxDatagramsPerPacket << "datagrams," << maxBytesPerPacket << "bytes per packet"; - qDebug() << "Average" << (bytesReceived / datagramsReceived) << "bytes per datagram"; + qDebug() << "Max" << maxPacketsPerGroup << "packets per group"; + qDebug() << "Average" << (bytesReceived / datagramsReceived) << "bytes per datagram," << + (datagramsSent / groupsSent) << "datagrams per group"; qDebug() << "Speed:" << (bytesReceived / SIMULATION_ITERATIONS) << "bytes per iteration"; qDebug() << "Efficiency:" << ((float)streamedBytesReceived / bytesReceived); } @@ -821,6 +825,8 @@ bool Endpoint::simulate(int iterationNumber) { _remainingPipelineCapacity += datagram.size(); } int packetCount = _sequencer->startPacketGroup(); + groupsSent++; + maxPacketsPerGroup = qMax(maxPacketsPerGroup, packetCount); for (int i = 0; i < packetCount; i++) { oldDatagramsSent = datagramsSent; oldBytesSent = bytesSent;