Merge pull request #3079 from ey6es/metavoxels

Basic congestion control on TCP model, with tests; fixed errors in, added tests for SpanList.
This commit is contained in:
Clément Brisset 2014-06-25 18:34:07 -07:00
commit 96e81d5596
4 changed files with 427 additions and 92 deletions

View file

@ -23,6 +23,9 @@ const int MAX_DATAGRAM_SIZE = MAX_PACKET_SIZE;
const int DEFAULT_MAX_PACKET_SIZE = 3000;
// the default slow-start threshold, which will be lowered quickly when we first encounter packet loss
const float DEFAULT_SLOW_START_THRESHOLD = 1000.0f;
DatagramSequencer::DatagramSequencer(const QByteArray& datagramHeader, QObject* parent) :
QObject(parent),
_outgoingPacketStream(&_outgoingPacketData, QIODevice::WriteOnly),
@ -37,7 +40,12 @@ DatagramSequencer::DatagramSequencer(const QByteArray& datagramHeader, QObject*
_incomingPacketStream(&_incomingPacketData, QIODevice::ReadOnly),
_inputStream(_incomingPacketStream),
_receivedHighPriorityMessages(0),
_maxPacketSize(DEFAULT_MAX_PACKET_SIZE) {
_maxPacketSize(DEFAULT_MAX_PACKET_SIZE),
_packetsPerGroup(1.0f),
_packetsToWrite(0.0f),
_slowStartThreshold(DEFAULT_SLOW_START_THRESHOLD),
_packetRateIncreasePacketNumber(0),
_packetRateDecreasePacketNumber(0) {
_outgoingPacketStream.setByteOrder(QDataStream::LittleEndian);
_incomingDatagramStream.setByteOrder(QDataStream::LittleEndian);
@ -71,6 +79,33 @@ ReliableChannel* DatagramSequencer::getReliableInputChannel(int index) {
return channel;
}
int DatagramSequencer::startPacketGroup(int desiredPackets) {
// figure out how much data we have enqueued and increase the number of packets desired
int totalAvailable = 0;
foreach (ReliableChannel* channel, _reliableOutputChannels) {
totalAvailable += channel->getBytesAvailable();
}
desiredPackets += (totalAvailable / _maxPacketSize);
// increment our packet counter and subtract/return the integer portion
_packetsToWrite += _packetsPerGroup;
int wholePackets = (int)_packetsToWrite;
_packetsToWrite -= wholePackets;
wholePackets = qMin(wholePackets, desiredPackets);
// if we don't want to send any more, push out the rate increase number past the group
if (desiredPackets <= _packetsPerGroup) {
_packetRateIncreasePacketNumber = _outgoingPacketNumber + wholePackets + 1;
}
// likewise, if we're only sending one packet, don't let its loss cause rate decrease
if (wholePackets == 1) {
_packetRateDecreasePacketNumber = _outgoingPacketNumber + 2;
}
return wholePackets;
}
Bitstream& DatagramSequencer::startPacket() {
// start with the list of acknowledgements
_outgoingPacketStream << (quint32)_receiveRecords.size();
@ -172,7 +207,10 @@ void DatagramSequencer::receivedDatagram(const QByteArray& datagram) {
if (index < 0 || index >= _sendRecords.size()) {
continue;
}
QList<SendRecord>::iterator it = _sendRecords.begin() + index;
QList<SendRecord>::iterator it = _sendRecords.begin();
for (int i = 0; i < index; i++) {
sendRecordLost(*it++);
}
sendRecordAcknowledged(*it);
emit sendAcknowledged(index);
_sendRecords.erase(_sendRecords.begin(), it + 1);
@ -253,6 +291,28 @@ void DatagramSequencer::sendRecordAcknowledged(const SendRecord& record) {
foreach (const ChannelSpan& span, record.spans) {
getReliableOutputChannel(span.channel)->spanAcknowledged(span);
}
// increase the packet rate with every ack until we pass the slow start threshold; then, every round trip
if (record.packetNumber >= _packetRateIncreasePacketNumber) {
if (_packetsPerGroup >= _slowStartThreshold) {
_packetRateIncreasePacketNumber = _outgoingPacketNumber + 1;
}
_packetsPerGroup += 1.0f;
}
}
void DatagramSequencer::sendRecordLost(const SendRecord& record) {
// notify the channels of their lost spans
foreach (const ChannelSpan& span, record.spans) {
getReliableOutputChannel(span.channel)->spanLost(record.packetNumber, _outgoingPacketNumber + 1);
}
// halve the rate and remember as threshold
if (record.packetNumber >= _packetRateDecreasePacketNumber) {
_packetsPerGroup = qMax(_packetsPerGroup * 0.5f, 1.0f);
_slowStartThreshold = _packetsPerGroup;
_packetRateDecreasePacketNumber = _outgoingPacketNumber + 1;
}
}
void DatagramSequencer::appendReliableData(int bytes, QVector<ChannelSpan>& spans) {
@ -520,7 +580,9 @@ int SpanList::set(int offset, int length) {
// look for an intersection within the list
int position = 0;
for (QList<Span>::iterator it = _spans.begin(); it != _spans.end(); it++) {
for (int i = 0; i < _spans.size(); i++) {
QList<Span>::iterator it = _spans.begin() + i;
// if we intersect the unset portion, contract it
position += it->unset;
if (offset <= position) {
@ -530,16 +592,20 @@ int SpanList::set(int offset, int length) {
// if we continue into the set portion, expand it and consume following spans
int extra = offset + length - position;
if (extra >= 0) {
int amount = setSpans(it + 1, extra);
it->set += amount;
_totalSet += amount;
extra -= it->set;
it->set += remove;
_totalSet += remove;
if (extra > 0) {
int amount = setSpans(it + 1, extra);
_spans[i].set += amount;
_totalSet += amount;
}
// otherwise, insert a new span
} else {
Span span = { it->unset, length + extra };
_spans.insert(it, span);
Span span = { it->unset, length };
it->unset = -extra;
_totalSet += span.set;
_spans.insert(it, span);
_totalSet += length;
}
return 0;
}
@ -548,9 +614,11 @@ int SpanList::set(int offset, int length) {
position += it->set;
if (offset <= position) {
int extra = offset + length - position;
int amount = setSpans(it + 1, extra);
it->set += amount;
_totalSet += amount;
if (extra > 0) {
int amount = setSpans(it + 1, extra);
_spans[i].set += amount;
_totalSet += amount;
}
return 0;
}
}
@ -619,6 +687,7 @@ ReliableChannel::ReliableChannel(DatagramSequencer* sequencer, int index, bool o
_priority(1.0f),
_offset(0),
_writePosition(0),
_writePositionResetPacketNumber(0),
_messagesEnabled(true) {
_buffer.open(output ? QIODevice::WriteOnly : QIODevice::ReadOnly);
@ -629,67 +698,76 @@ ReliableChannel::ReliableChannel(DatagramSequencer* sequencer, int index, bool o
}
void ReliableChannel::writeData(QDataStream& out, int bytes, QVector<DatagramSequencer::ChannelSpan>& spans) {
// find out how many spans we want to write
int spanCount = 0;
int remainingBytes = bytes;
bool first = true;
while (remainingBytes > 0) {
int position = 0;
foreach (const SpanList::Span& span, _acknowledged.getSpans()) {
if (remainingBytes <= 0) {
break;
}
spanCount++;
remainingBytes -= getBytesToWrite(first, qMin(remainingBytes, span.unset));
position += (span.unset + span.set);
}
int leftover = _buffer.pos() - position;
if (remainingBytes > 0 && leftover > 0) {
spanCount++;
remainingBytes -= getBytesToWrite(first, qMin(remainingBytes, leftover));
}
if (bytes == 0) {
out << (quint32)0;
return;
}
// write the count and the spans
out << (quint32)spanCount;
remainingBytes = bytes;
first = true;
while (remainingBytes > 0) {
_writePosition %= _buffer.pos();
while (bytes > 0) {
int position = 0;
foreach (const SpanList::Span& span, _acknowledged.getSpans()) {
if (remainingBytes <= 0) {
break;
for (int i = 0; i < _acknowledged.getSpans().size(); i++) {
const SpanList::Span& span = _acknowledged.getSpans().at(i);
position += span.unset;
if (_writePosition < position) {
int start = qMax(position - span.unset, _writePosition);
int length = qMin(bytes, position - start);
writeSpan(out, start, length, spans);
writeFullSpans(out, bytes - length, i + 1, position + span.set, spans);
out << (quint32)0;
return;
}
remainingBytes -= writeSpan(out, first, position, qMin(remainingBytes, span.unset), spans);
position += (span.unset + span.set);
position += span.set;
}
int leftover = _buffer.pos() - position;
if (remainingBytes > 0 && leftover > 0) {
remainingBytes -= writeSpan(out, first, position, qMin(remainingBytes, leftover), spans);
position = _buffer.pos();
if (_writePosition < position && leftover > 0) {
int start = qMax(position - leftover, _writePosition);
int length = qMin(bytes, position - start);
writeSpan(out, start, length, spans);
writeFullSpans(out, bytes - length, 0, 0, spans);
out << (quint32)0;
return;
}
_writePosition = 0;
}
}
void ReliableChannel::writeFullSpans(QDataStream& out, int bytes, int startingIndex, int position,
QVector<DatagramSequencer::ChannelSpan>& spans) {
int expandedSize = _acknowledged.getSpans().size() + 1;
for (int i = 0; i < expandedSize; i++) {
if (bytes == 0) {
return;
}
int index = (startingIndex + i) % expandedSize;
if (index == _acknowledged.getSpans().size()) {
int leftover = _buffer.pos() - position;
if (leftover > 0) {
int length = qMin(leftover, bytes);
writeSpan(out, position, length, spans);
bytes -= length;
}
position = 0;
} else {
const SpanList::Span& span = _acknowledged.getSpans().at(index);
int length = qMin(span.unset, bytes);
writeSpan(out, position, length, spans);
bytes -= length;
position += (span.unset + span.set);
}
}
}
int ReliableChannel::getBytesToWrite(bool& first, int length) const {
if (first) {
first = false;
return length - (_writePosition % length);
}
return length;
}
int ReliableChannel::writeSpan(QDataStream& out, bool& first, int position, int length, QVector<DatagramSequencer::ChannelSpan>& spans) {
if (first) {
first = false;
position = _writePosition % length;
length -= position;
_writePosition += length;
}
int ReliableChannel::writeSpan(QDataStream& out, int position, int length, QVector<DatagramSequencer::ChannelSpan>& spans) {
DatagramSequencer::ChannelSpan span = { _index, _offset + position, length };
spans.append(span);
out << (quint32)span.offset;
out << (quint32)length;
out << (quint32)span.offset;
_buffer.writeToStream(position, length, out);
_writePosition = position + length;
return length;
}
@ -700,17 +778,28 @@ void ReliableChannel::spanAcknowledged(const DatagramSequencer::ChannelSpan& spa
_buffer.seek(_buffer.size());
_offset += advancement;
_writePosition = qMax(_writePosition - advancement, 0);
}
_writePosition = qMax(_writePosition - advancement, 0);
}
}
void ReliableChannel::spanLost(int packetNumber, int nextOutgoingPacketNumber) {
// reset the write position up to once each round trip time
if (packetNumber >= _writePositionResetPacketNumber) {
_writePosition = 0;
_writePositionResetPacketNumber = nextOutgoingPacketNumber;
}
}
void ReliableChannel::readData(QDataStream& in) {
quint32 segments;
in >> segments;
bool readSome = false;
for (quint32 i = 0; i < segments; i++) {
quint32 offset, size;
in >> offset >> size;
forever {
quint32 size;
in >> size;
if (size == 0) {
break;
}
quint32 offset;
in >> offset;
int position = offset - _offset;
int end = position + size;

View file

@ -99,6 +99,11 @@ public:
/// Returns the intput channel at the specified index, creating it if necessary.
ReliableChannel* getReliableInputChannel(int index = 0);
/// Starts a packet group.
/// \param desiredPackets the number of packets we'd like to write in the group
/// \return the number of packets to write in the group
int startPacketGroup(int desiredPackets = 1);
/// Starts a new packet for transmission.
/// \return a reference to the Bitstream to use for writing to the packet
Bitstream& startPacket();
@ -165,6 +170,9 @@ private:
/// Notes that the described send was acknowledged by the other party.
void sendRecordAcknowledged(const SendRecord& record);
/// Notes that the described send was lost in transit.
void sendRecordLost(const SendRecord& record);
/// Appends some reliable data to the outgoing packet.
void appendReliableData(int bytes, QVector<ChannelSpan>& spans);
@ -200,6 +208,12 @@ private:
int _maxPacketSize;
float _packetsPerGroup;
float _packetsToWrite;
float _slowStartThreshold;
int _packetRateIncreasePacketNumber;
int _packetRateDecreasePacketNumber;
QHash<int, ReliableChannel*> _reliableOutputChannels;
QHash<int, ReliableChannel*> _reliableInputChannels;
};
@ -343,10 +357,12 @@ private:
ReliableChannel(DatagramSequencer* sequencer, int index, bool output);
void writeData(QDataStream& out, int bytes, QVector<DatagramSequencer::ChannelSpan>& spans);
int getBytesToWrite(bool& first, int length) const;
int writeSpan(QDataStream& out, bool& first, int position, int length, QVector<DatagramSequencer::ChannelSpan>& spans);
void writeFullSpans(QDataStream& out, int bytes, int startingIndex, int position,
QVector<DatagramSequencer::ChannelSpan>& spans);
int writeSpan(QDataStream& out, int position, int length, QVector<DatagramSequencer::ChannelSpan>& spans);
void spanAcknowledged(const DatagramSequencer::ChannelSpan& span);
void spanLost(int packetNumber, int nextOutgoingPacketNumber);
void readData(QDataStream& in);
@ -359,6 +375,7 @@ private:
int _offset;
int _writePosition;
int _writePositionResetPacketNumber;
SpanList _acknowledged;
bool _messagesEnabled;
};

View file

@ -28,12 +28,127 @@ MetavoxelTests::MetavoxelTests(int& argc, char** argv) :
QCoreApplication(argc, argv) {
}
static bool testSpanList() {
SpanList list;
if (list.getTotalSet() != 0 || !list.getSpans().isEmpty()) {
qDebug() << "Failed empty state test.";
return true;
}
if (list.set(-5, 15) != 10 || list.getTotalSet() != 0 || !list.getSpans().isEmpty()) {
qDebug() << "Failed initial front set.";
return true;
}
if (list.set(5, 15) != 0 || list.getTotalSet() != 15 || list.getSpans().size() != 1 ||
list.getSpans().at(0).unset != 5 || list.getSpans().at(0).set != 15) {
qDebug() << "Failed initial middle set.";
return true;
}
if (list.set(25, 5) != 0 || list.getTotalSet() != 20 || list.getSpans().size() != 2 ||
list.getSpans().at(0).unset != 5 || list.getSpans().at(0).set != 15 ||
list.getSpans().at(1).unset != 5 || list.getSpans().at(1).set != 5) {
qDebug() << "Failed initial end set.";
return true;
}
if (list.set(1, 3) != 0 || list.getTotalSet() != 23 || list.getSpans().size() != 3 ||
list.getSpans().at(0).unset != 1 || list.getSpans().at(0).set != 3 ||
list.getSpans().at(1).unset != 1 || list.getSpans().at(1).set != 15 ||
list.getSpans().at(2).unset != 5 || list.getSpans().at(2).set != 5) {
qDebug() << "Failed second front set.";
return true;
}
SpanList threeSet = list;
if (list.set(20, 5) != 0 || list.getTotalSet() != 28 || list.getSpans().size() != 2 ||
list.getSpans().at(0).unset != 1 || list.getSpans().at(0).set != 3 ||
list.getSpans().at(1).unset != 1 || list.getSpans().at(1).set != 25) {
qDebug() << "Failed minimal join last two.";
return true;
}
list = threeSet;
if (list.set(5, 25) != 0 || list.getTotalSet() != 28 || list.getSpans().size() != 2 ||
list.getSpans().at(0).unset != 1 || list.getSpans().at(0).set != 3 ||
list.getSpans().at(1).unset != 1 || list.getSpans().at(1).set != 25) {
qDebug() << "Failed maximal join last two.";
return true;
}
list = threeSet;
if (list.set(10, 18) != 0 || list.getTotalSet() != 28 || list.getSpans().size() != 2 ||
list.getSpans().at(0).unset != 1 || list.getSpans().at(0).set != 3 ||
list.getSpans().at(1).unset != 1 || list.getSpans().at(1).set != 25) {
qDebug() << "Failed middle join last two.";
return true;
}
list = threeSet;
if (list.set(10, 18) != 0 || list.getTotalSet() != 28 || list.getSpans().size() != 2 ||
list.getSpans().at(0).unset != 1 || list.getSpans().at(0).set != 3 ||
list.getSpans().at(1).unset != 1 || list.getSpans().at(1).set != 25) {
qDebug() << "Failed middle join last two.";
return true;
}
list = threeSet;
if (list.set(2, 26) != 0 || list.getTotalSet() != 29 || list.getSpans().size() != 1 ||
list.getSpans().at(0).unset != 1 || list.getSpans().at(0).set != 29) {
qDebug() << "Failed middle join three.";
return true;
}
list = threeSet;
if (list.set(0, 2) != 4 || list.getTotalSet() != 20 || list.getSpans().size() != 2 ||
list.getSpans().at(0).unset != 1 || list.getSpans().at(0).set != 15 ||
list.getSpans().at(1).unset != 5 || list.getSpans().at(1).set != 5) {
qDebug() << "Failed front advance.";
return true;
}
list = threeSet;
if (list.set(-10, 15) != 20 || list.getTotalSet() != 5 || list.getSpans().size() != 1 ||
list.getSpans().at(0).unset != 5 || list.getSpans().at(0).set != 5) {
qDebug() << "Failed middle advance.";
return true;
}
list = threeSet;
if (list.set(-10, 38) != 30 || list.getTotalSet() != 0 || list.getSpans().size() != 0) {
qDebug() << "Failed end advance.";
return true;
}
list = threeSet;
if (list.set(-10, 100) != 90 || list.getTotalSet() != 0 || list.getSpans().size() != 0) {
qDebug() << "Failed clobber advance.";
return true;
}
list = threeSet;
if (list.set(21, 3) != 0 || list.getTotalSet() != 26 || list.getSpans().size() != 4 ||
list.getSpans().at(0).unset != 1 || list.getSpans().at(0).set != 3 ||
list.getSpans().at(1).unset != 1 || list.getSpans().at(1).set != 15 ||
list.getSpans().at(2).unset != 1 || list.getSpans().at(2).set != 3 ||
list.getSpans().at(3).unset != 1 || list.getSpans().at(3).set != 5) {
qDebug() << "Failed adding fourth.";
return true;
}
return false;
}
static int datagramsSent = 0;
static int datagramsReceived = 0;
static int bytesSent = 0;
static int bytesReceived = 0;
static int maxDatagramsPerPacket = 0;
static int maxBytesPerPacket = 0;
static int groupsSent = 0;
static int maxPacketsPerGroup = 0;
static int highPriorityMessagesSent = 0;
static int highPriorityMessagesReceived = 0;
static int unreliableMessagesSent = 0;
@ -332,10 +447,19 @@ bool MetavoxelTests::run() {
QStringList arguments = this->arguments();
int test = (arguments.size() > 1) ? arguments.at(1).toInt() : 0;
if (test == 0 || test == 1) {
qDebug() << "Running SpanList test...";
qDebug();
if (testSpanList()) {
return true;
}
}
QByteArray datagramHeader("testheader");
const int SIMULATION_ITERATIONS = 10000;
if (test == 0 || test == 1) {
qDebug() << "Running transmission tests...";
if (test == 0 || test == 2) {
qDebug() << "Running transmission test...";
qDebug();
// create two endpoints with the same header
@ -364,8 +488,40 @@ bool MetavoxelTests::run() {
qDebug();
}
if (test == 0 || test == 2) {
qDebug() << "Running serialization tests...";
if (test == 0 || test == 3) {
qDebug() << "Running congestion control test...";
qDebug();
// clear the stats
streamedBytesSent = streamedBytesReceived = datagramsSent = bytesSent = 0;
datagramsReceived = bytesReceived = maxDatagramsPerPacket = maxBytesPerPacket = 0;
// create two endpoints with the same header
Endpoint alice(datagramHeader, Endpoint::CONGESTION_MODE), bob(datagramHeader, Endpoint::CONGESTION_MODE);
alice.setOther(&bob);
bob.setOther(&alice);
// perform a large number of simulation iterations
for (int i = 0; i < SIMULATION_ITERATIONS; i++) {
if (alice.simulate(i) || bob.simulate(i)) {
return true;
}
}
qDebug() << "Sent" << streamedBytesSent << "streamed bytes, received" << streamedBytesReceived;
qDebug() << "Sent" << datagramsSent << "datagrams in" << groupsSent << "groups with" << bytesSent <<
"bytes, received" << datagramsReceived << "with" << bytesReceived << "bytes";
qDebug() << "Max" << maxDatagramsPerPacket << "datagrams," << maxBytesPerPacket << "bytes per packet";
qDebug() << "Max" << maxPacketsPerGroup << "packets per group";
qDebug() << "Average" << (bytesReceived / datagramsReceived) << "bytes per datagram," <<
(datagramsSent / groupsSent) << "datagrams per group";
qDebug() << "Speed:" << (bytesReceived / SIMULATION_ITERATIONS) << "bytes per iteration";
qDebug() << "Efficiency:" << ((float)streamedBytesReceived / bytesReceived);
}
if (test == 0 || test == 4) {
qDebug() << "Running serialization test...";
qDebug();
if (testSerialization(Bitstream::HASH_METADATA) || testSerialization(Bitstream::FULL_METADATA)) {
@ -373,8 +529,8 @@ bool MetavoxelTests::run() {
}
}
if (test == 0 || test == 3) {
qDebug() << "Running metavoxel data tests...";
if (test == 0 || test == 5) {
qDebug() << "Running metavoxel data test...";
qDebug();
// clear the stats
@ -498,9 +654,22 @@ Endpoint::Endpoint(const QByteArray& datagramHeader, Mode mode) :
ReliableChannel* output = _sequencer->getReliableOutputChannel(1);
output->setPriority(0.25f);
output->setMessagesEnabled(false);
const int MIN_STREAM_BYTES = 100000;
const int MAX_STREAM_BYTES = 200000;
QByteArray bytes = createRandomBytes(MIN_STREAM_BYTES, MAX_STREAM_BYTES);
QByteArray bytes;
if (mode == CONGESTION_MODE) {
const int HUGE_STREAM_BYTES = 60 * 1024 * 1024;
bytes = createRandomBytes(HUGE_STREAM_BYTES, HUGE_STREAM_BYTES);
// initialize the pipeline
for (int i = 0; i < 10; i++) {
_pipeline.append(ByteArrayVector());
}
_remainingPipelineCapacity = 100 * 1024;
} else {
const int MIN_STREAM_BYTES = 100000;
const int MAX_STREAM_BYTES = 200000;
bytes = createRandomBytes(MIN_STREAM_BYTES, MAX_STREAM_BYTES);
}
_dataStreamed.append(bytes);
output->getBuffer().write(bytes);
streamedBytesSent += bytes.size();
@ -633,10 +802,9 @@ int MutateVisitor::visit(MetavoxelInfo& info) {
bool Endpoint::simulate(int iterationNumber) {
// update/send our delayed datagrams
for (QList<QPair<QByteArray, int> >::iterator it = _delayedDatagrams.begin(); it != _delayedDatagrams.end(); ) {
for (QList<ByteArrayIntPair>::iterator it = _delayedDatagrams.begin(); it != _delayedDatagrams.end(); ) {
if (it->second-- == 1) {
_other->_sequencer->receivedDatagram(it->first);
datagramsReceived++;
_other->receiveDatagram(it->first);
it = _delayedDatagrams.erase(it);
} else {
@ -646,7 +814,37 @@ bool Endpoint::simulate(int iterationNumber) {
int oldDatagramsSent = datagramsSent;
int oldBytesSent = bytesSent;
if (_mode == METAVOXEL_CLIENT_MODE) {
if (_mode == CONGESTION_MODE) {
// cycle our pipeline
ByteArrayVector datagrams = _pipeline.takeLast();
_pipeline.prepend(ByteArrayVector());
foreach (const QByteArray& datagram, datagrams) {
_sequencer->receivedDatagram(datagram);
datagramsReceived++;
bytesReceived += datagram.size();
_remainingPipelineCapacity += datagram.size();
}
int packetCount = _sequencer->startPacketGroup();
groupsSent++;
maxPacketsPerGroup = qMax(maxPacketsPerGroup, packetCount);
for (int i = 0; i < packetCount; i++) {
oldDatagramsSent = datagramsSent;
oldBytesSent = bytesSent;
Bitstream& out = _sequencer->startPacket();
out << QVariant();
_sequencer->endPacket();
maxDatagramsPerPacket = qMax(maxDatagramsPerPacket, datagramsSent - oldDatagramsSent);
maxBytesPerPacket = qMax(maxBytesPerPacket, bytesSent - oldBytesSent);
// record the send
SendRecord record = { _sequencer->getOutgoingPacketNumber() };
_sendRecords.append(record);
}
return false;
} else if (_mode == METAVOXEL_CLIENT_MODE) {
Bitstream& out = _sequencer->startPacket();
ClientStateMessage state = { _lod };
@ -748,29 +946,28 @@ void Endpoint::sendDatagram(const QByteArray& datagram) {
// some datagrams are dropped
const float DROP_PROBABILITY = 0.1f;
if (randFloat() < DROP_PROBABILITY) {
float probabilityMultiplier = (_mode == CONGESTION_MODE) ? 0.01f : 1.0f;
if (randFloat() < DROP_PROBABILITY * probabilityMultiplier) {
return;
}
// some are received out of order
const float REORDER_PROBABILITY = 0.1f;
if (randFloat() < REORDER_PROBABILITY) {
if (randFloat() < REORDER_PROBABILITY * probabilityMultiplier) {
const int MIN_DELAY = 1;
const int MAX_DELAY = 5;
// have to copy the datagram; the one we're passed is a reference to a shared buffer
_delayedDatagrams.append(QPair<QByteArray, int>(QByteArray(datagram.constData(), datagram.size()),
_delayedDatagrams.append(ByteArrayIntPair(QByteArray(datagram.constData(), datagram.size()),
randIntInRange(MIN_DELAY, MAX_DELAY)));
// and some are duplicated
const float DUPLICATE_PROBABILITY = 0.01f;
if (randFloat() > DUPLICATE_PROBABILITY) {
if (randFloat() > DUPLICATE_PROBABILITY * probabilityMultiplier) {
return;
}
}
_other->_sequencer->receivedDatagram(datagram);
datagramsReceived++;
bytesReceived += datagram.size();
_other->receiveDatagram(datagram);
}
void Endpoint::handleHighPriorityMessage(const QVariant& message) {
@ -788,6 +985,15 @@ void Endpoint::handleHighPriorityMessage(const QVariant& message) {
}
void Endpoint::readMessage(Bitstream& in) {
if (_mode == CONGESTION_MODE) {
QVariant message;
in >> message;
// record the receipt
ReceiveRecord record = { _sequencer->getIncomingPacketNumber() };
_receiveRecords.append(record);
return;
}
if (_mode == METAVOXEL_CLIENT_MODE) {
QVariant message;
in >> message;
@ -887,6 +1093,20 @@ void Endpoint::clearReceiveRecordsBefore(int index) {
_receiveRecords.erase(_receiveRecords.begin(), _receiveRecords.begin() + index + 1);
}
void Endpoint::receiveDatagram(const QByteArray& datagram) {
if (_mode == CONGESTION_MODE) {
if (datagram.size() <= _remainingPipelineCapacity) {
// have to copy the datagram; the one we're passed is a reference to a shared buffer
_pipeline[0].append(QByteArray(datagram.constData(), datagram.size()));
_remainingPipelineCapacity -= datagram.size();
}
} else {
_sequencer->receivedDatagram(datagram);
datagramsReceived++;
bytesReceived += datagram.size();
}
}
void Endpoint::handleMessage(const QVariant& message, Bitstream& in) {
int userType = message.userType();
if (userType == ClientStateMessage::Type) {

View file

@ -40,7 +40,7 @@ class Endpoint : public QObject {
public:
enum Mode { BASIC_PEER_MODE, METAVOXEL_SERVER_MODE, METAVOXEL_CLIENT_MODE };
enum Mode { BASIC_PEER_MODE, CONGESTION_MODE, METAVOXEL_SERVER_MODE, METAVOXEL_CLIENT_MODE };
Endpoint(const QByteArray& datagramHeader, Mode mode = BASIC_PEER_MODE);
@ -63,6 +63,8 @@ private slots:
private:
void receiveDatagram(const QByteArray& datagram);
void handleMessage(const QVariant& message, Bitstream& in);
class SendRecord {
@ -96,7 +98,14 @@ private:
SharedObjectPointer _sphere;
Endpoint* _other;
QList<QPair<QByteArray, int> > _delayedDatagrams;
typedef QPair<QByteArray, int> ByteArrayIntPair;
QList<ByteArrayIntPair> _delayedDatagrams;
typedef QVector<QByteArray> ByteArrayVector;
QList<ByteArrayVector> _pipeline;
int _remainingPipelineCapacity;
float _highPriorityMessagesToSend;
QVariantList _highPriorityMessagesSent;
QList<SequencedTestMessage> _unreliableMessagesSent;