Basic congestion control using TCP-esque strategy.

This commit is contained in:
Andrzej Kapolka 2014-06-25 15:48:46 -07:00
parent 8999fcef17
commit 705445ce62
3 changed files with 60 additions and 10 deletions

View file

@ -23,6 +23,9 @@ const int MAX_DATAGRAM_SIZE = MAX_PACKET_SIZE;
const int DEFAULT_MAX_PACKET_SIZE = 3000;
// the default slow-start threshold, which will be lowered quickly when we first encounter packet loss
const float DEFAULT_SLOW_START_THRESHOLD = 1000.0f;
DatagramSequencer::DatagramSequencer(const QByteArray& datagramHeader, QObject* parent) :
QObject(parent),
_outgoingPacketStream(&_outgoingPacketData, QIODevice::WriteOnly),
@ -37,7 +40,12 @@ DatagramSequencer::DatagramSequencer(const QByteArray& datagramHeader, QObject*
_incomingPacketStream(&_incomingPacketData, QIODevice::ReadOnly),
_inputStream(_incomingPacketStream),
_receivedHighPriorityMessages(0),
_maxPacketSize(DEFAULT_MAX_PACKET_SIZE) {
_maxPacketSize(DEFAULT_MAX_PACKET_SIZE),
_packetsPerGroup(1.0f),
_packetsToWrite(0.0f),
_slowStartThreshold(DEFAULT_SLOW_START_THRESHOLD),
_packetRateIncreasePacketNumber(0),
_packetRateDecreasePacketNumber(0) {
_outgoingPacketStream.setByteOrder(QDataStream::LittleEndian);
_incomingDatagramStream.setByteOrder(QDataStream::LittleEndian);
@ -71,6 +79,14 @@ ReliableChannel* DatagramSequencer::getReliableInputChannel(int index) {
return channel;
}
int DatagramSequencer::startPacketGroup() {
// increment our packet counter and subtract/return the integer portion
_packetsToWrite += _packetsPerGroup;
int wholePackets = (int)_packetsToWrite;
_packetsToWrite -= wholePackets;
return wholePackets;
}
Bitstream& DatagramSequencer::startPacket() {
// start with the list of acknowledgements
_outgoingPacketStream << (quint32)_receiveRecords.size();
@ -256,6 +272,14 @@ void DatagramSequencer::sendRecordAcknowledged(const SendRecord& record) {
foreach (const ChannelSpan& span, record.spans) {
getReliableOutputChannel(span.channel)->spanAcknowledged(span);
}
// increase the packet rate with every ack until we pass the slow start threshold; then, every round trip
if (record.packetNumber >= _packetRateIncreasePacketNumber) {
if (_packetsPerGroup >= _slowStartThreshold) {
_packetRateIncreasePacketNumber = _outgoingPacketNumber + 1;
}
_packetsPerGroup += 1.0f;
}
}
void DatagramSequencer::sendRecordLost(const SendRecord& record) {
@ -263,6 +287,13 @@ void DatagramSequencer::sendRecordLost(const SendRecord& record) {
foreach (const ChannelSpan& span, record.spans) {
getReliableOutputChannel(span.channel)->spanLost(record.packetNumber, _outgoingPacketNumber + 1);
}
// halve the rate and remember as threshold
if (record.packetNumber >= _packetRateDecreasePacketNumber) {
_packetsPerGroup = qMax(_packetsPerGroup * 0.5f, 1.0f);
_slowStartThreshold = _packetsPerGroup;
_packetRateDecreasePacketNumber = _outgoingPacketNumber + 1;
}
}
void DatagramSequencer::appendReliableData(int bytes, QVector<ChannelSpan>& spans) {

View file

@ -99,6 +99,10 @@ public:
/// Returns the intput channel at the specified index, creating it if necessary.
ReliableChannel* getReliableInputChannel(int index = 0);
/// Starts a packet group.
/// \return the number of packets to write in the group
int startPacketGroup();
/// Starts a new packet for transmission.
/// \return a reference to the Bitstream to use for writing to the packet
Bitstream& startPacket();
@ -203,6 +207,12 @@ private:
int _maxPacketSize;
float _packetsPerGroup;
float _packetsToWrite;
float _slowStartThreshold;
int _packetRateIncreasePacketNumber;
int _packetRateDecreasePacketNumber;
QHash<int, ReliableChannel*> _reliableOutputChannels;
QHash<int, ReliableChannel*> _reliableInputChannels;
};

View file

@ -652,7 +652,7 @@ Endpoint::Endpoint(const QByteArray& datagramHeader, Mode mode) :
output->setMessagesEnabled(false);
QByteArray bytes;
if (mode == CONGESTION_MODE) {
const int HUGE_STREAM_BYTES = 50 * 1024 * 1024;
const int HUGE_STREAM_BYTES = 60 * 1024 * 1024;
bytes = createRandomBytes(HUGE_STREAM_BYTES, HUGE_STREAM_BYTES);
// initialize the pipeline
@ -820,14 +820,23 @@ bool Endpoint::simulate(int iterationNumber) {
bytesReceived += datagram.size();
_remainingPipelineCapacity += datagram.size();
}
Bitstream& out = _sequencer->startPacket();
out << QVariant();
_sequencer->endPacket();
// record the send
SendRecord record = { _sequencer->getOutgoingPacketNumber() };
_sendRecords.append(record);
int packetCount = _sequencer->startPacketGroup();
for (int i = 0; i < packetCount; i++) {
oldDatagramsSent = datagramsSent;
oldBytesSent = bytesSent;
Bitstream& out = _sequencer->startPacket();
out << QVariant();
_sequencer->endPacket();
maxDatagramsPerPacket = qMax(maxDatagramsPerPacket, datagramsSent - oldDatagramsSent);
maxBytesPerPacket = qMax(maxBytesPerPacket, bytesSent - oldBytesSent);
// record the send
SendRecord record = { _sequencer->getOutgoingPacketNumber() };
_sendRecords.append(record);
}
return false;
} else if (_mode == METAVOXEL_CLIENT_MODE) {
Bitstream& out = _sequencer->startPacket();