I think this covers the basic bits necessary for the reliable streaming. Of

course, I haven't tested it yet.
This commit is contained in:
Andrzej Kapolka 2014-02-06 12:27:46 -08:00
parent 6b4c8b62c6
commit f6320d3a81
2 changed files with 113 additions and 27 deletions

View file

@ -312,7 +312,7 @@ SpanList::SpanList() : _totalSet(0) {
}
int SpanList::set(int offset, int length) {
// see if it intersects the front of the list
// if we intersect the front of the list, consume beginning spans and return advancement
if (offset <= 0) {
int intersection = offset + length;
return (intersection > 0) ? setSpans(_spans.begin(), intersection) : 0;
@ -321,15 +321,36 @@ int SpanList::set(int offset, int length) {
// look for an intersection within the list
int position = 0;
for (QList<Span>::iterator it = _spans.begin(); it != _spans.end(); it++) {
// if we intersect the unset portion, contract it
position += it->unset;
if (offset <= position) {
int remove = position - offset;
it->unset -= remove;
// if we continue into the set portion, expand it and consume following spans
int extra = offset + length - position;
if (extra >= 0) {
int amount = setSpans(it + 1, extra);
it->set += amount;
_totalSet += amount;
// otherwise, insert a new span
} else {
Span span = { it->unset, length + extra };
_spans.insert(it, span);
it->unset = -extra;
_totalSet += span.set;
}
return 0;
}
// if we intersect the set portion, expand it and consume following spans
position += it->set;
if (offset <= position) {
int extra = offset + length - position;
int amount = setSpans(it + 1, extra);
it->set += amount;
_totalSet += amount;
return 0;
}
}
@ -337,6 +358,7 @@ int SpanList::set(int offset, int length) {
// add to end of list
Span span = { offset - position, length };
_spans.append(span);
_totalSet += length;
return 0;
}
@ -354,6 +376,7 @@ int SpanList::setSpans(QList<Span>::iterator it, int length) {
remainingLength = qMax(remainingLength - combined, 0);
totalRemoved += combined;
it = _spans.erase(it);
_totalSet -= it->set;
}
return qMax(length, totalRemoved);
}
@ -377,7 +400,8 @@ ReliableChannel::ReliableChannel(DatagramSequencer* sequencer, int index, bool o
_dataStream(&_buffer),
_bitstream(_dataStream),
_priority(1.0f),
_offset(0) {
_offset(0),
_writePosition(0) {
_buffer.open(output ? QIODevice::WriteOnly : QIODevice::ReadOnly);
_dataStream.setByteOrder(QDataStream::LittleEndian);
@ -386,25 +410,78 @@ ReliableChannel::ReliableChannel(DatagramSequencer* sequencer, int index, bool o
}
void ReliableChannel::writeData(QDataStream& out, int bytes, QVector<DatagramSequencer::ChannelSpan>& spans) {
// nothing for now
out << (quint32)0;
// find out how many spans we want to write
int spanCount = 0;
int remainingBytes = bytes;
bool first = true;
while (remainingBytes > 0) {
int position = 0;
foreach (const SpanList::Span& span, _acknowledged.getSpans()) {
if (remainingBytes <= 0) {
break;
}
spanCount++;
remainingBytes -= getBytesToWrite(first, span.unset);
position += (span.unset + span.set);
}
int leftover = _buffer.pos() - position;
if (remainingBytes > 0 && leftover > 0) {
spanCount++;
remainingBytes -= getBytesToWrite(first, leftover);
}
}
// write the count and the spans
out << (quint32)spanCount;
remainingBytes = bytes;
first = true;
while (remainingBytes > 0) {
int position = 0;
foreach (const SpanList::Span& span, _acknowledged.getSpans()) {
if (remainingBytes <= 0) {
break;
}
remainingBytes -= writeSpan(out, first, position, qMin(remainingBytes, span.unset), spans);
position += (span.unset + span.set);
}
if (remainingBytes > 0 && position < _buffer.pos()) {
remainingBytes -= writeSpan(out, first, position, qMin(remainingBytes, (int)(_buffer.pos() - position)), spans);
}
}
}
void ReliableChannel::writeSpan(QDataStream& out, int position, int length, QVector<DatagramSequencer::ChannelSpan>& spans) {
int ReliableChannel::getBytesToWrite(bool& first, int length) const {
if (first) {
first = false;
return length - (_writePosition % length);
}
return length;
}
int ReliableChannel::writeSpan(QDataStream& out, bool& first, int position, int length, QVector<DatagramSequencer::ChannelSpan>& spans) {
if (first) {
first = false;
position = _writePosition % length;
length -= position;
_writePosition += length;
}
DatagramSequencer::ChannelSpan span = { _index, _offset + position, length };
spans.append(span);
out << (quint32)span.offset;
out << (quint32)length;
out.writeRawData(_buffer.data().constData() + position, length);
return length;
}
void ReliableChannel::spanAcknowledged(const DatagramSequencer::ChannelSpan& span) {
int acknowledged = _acknowledged.set(span.offset - _offset, span.length);
if (acknowledged > 0) {
int advancement = _acknowledged.set(span.offset - _offset, span.length);
if (advancement > 0) {
// TODO: better way of pruning buffer
_buffer.buffer() = _buffer.buffer().right(_buffer.size() - acknowledged);
_buffer.buffer() = _buffer.buffer().right(_buffer.size() - advancement);
_buffer.seek(_buffer.size());
_offset += acknowledged;
_offset += advancement;
_writePosition = qMax(_writePosition - advancement, 0);
}
}
@ -428,13 +505,13 @@ void ReliableChannel::readData(QDataStream& in) {
} else {
in.readRawData(_assemblyBuffer.data() + position, size);
}
int acknowledged = _acknowledged.set(position, size);
if (acknowledged > 0) {
int advancement = _acknowledged.set(position, size);
if (advancement > 0) {
// TODO: better way of pruning buffer
_buffer.buffer().append(_assemblyBuffer.constData(), acknowledged);
_buffer.buffer().append(_assemblyBuffer.constData(), advancement);
emit _buffer.readyRead();
_assemblyBuffer = _assemblyBuffer.right(_assemblyBuffer.size() - acknowledged);
_offset += acknowledged;
_assemblyBuffer = _assemblyBuffer.right(_assemblyBuffer.size() - advancement);
_offset += advancement;
}
}

View file

@ -167,26 +167,33 @@ private:
QHash<int, ReliableChannel*> _reliableInputChannels;
};
/// A list of contiguous spans, alternating between set and unset.
/// A list of contiguous spans, alternating between set and unset. Conceptually, the list is preceeded by a set
/// span of infinite length and followed by an unset span of infinite length. Within those bounds, it alternates
/// between unset and set.
class SpanList {
public:
SpanList();
int getTotalSet() const { return _totalSet; }
/// Sets a region of the list.
/// \return the set length at the beginning of the list
int set(int offset, int length);
private:
class Span {
public:
int unset;
int set;
};
SpanList();
const QList<Span>& getSpans() const { return _spans; }
/// Returns the total length set.
int getTotalSet() const { return _totalSet; }
/// Sets a region of the list.
/// \return the advancement of the set length at the beginning of the list
int set(int offset, int length);
private:
/// Sets the spans starting at the specified iterator, consuming at least the given length.
/// \return the actual amount set, which may be greater if we ran into an existing set span
int setSpans(QList<Span>::iterator it, int length);
QList<Span> _spans;
@ -222,7 +229,8 @@ private:
ReliableChannel(DatagramSequencer* sequencer, int index, bool output);
void writeData(QDataStream& out, int bytes, QVector<DatagramSequencer::ChannelSpan>& spans);
void writeSpan(QDataStream& out, int position, int length, QVector<DatagramSequencer::ChannelSpan>& spans);
int getBytesToWrite(bool& first, int length) const;
int writeSpan(QDataStream& out, bool& first, int position, int length, QVector<DatagramSequencer::ChannelSpan>& spans);
void spanAcknowledged(const DatagramSequencer::ChannelSpan& span);
@ -236,6 +244,7 @@ private:
float _priority;
int _offset;
int _writePosition;
SpanList _acknowledged;
};