Back down the rabbit hole I go, weee!

This commit is contained in:
Andrzej Kapolka 2014-02-05 21:28:36 -08:00
parent 98a432d3e5
commit 6b4c8b62c6
2 changed files with 121 additions and 12 deletions

View file

@ -50,7 +50,7 @@ void DatagramSequencer::sendHighPriorityMessage(const QVariant& data) {
ReliableChannel* DatagramSequencer::getReliableOutputChannel(int index) {
ReliableChannel*& channel = _reliableOutputChannels[index];
if (!channel) {
channel = new ReliableChannel(this, index);
channel = new ReliableChannel(this, index, true);
}
return channel;
}
@ -58,7 +58,7 @@ ReliableChannel* DatagramSequencer::getReliableOutputChannel(int index) {
ReliableChannel* DatagramSequencer::getReliableInputChannel(int index) {
ReliableChannel*& channel = _reliableInputChannels[index];
if (!channel) {
channel = new ReliableChannel(this, index);
channel = new ReliableChannel(this, index, false);
}
return channel;
}
@ -308,8 +308,58 @@ void DatagramSequencer::handleHighPriorityMessage(const QVariant& data) {
}
}
SpanList::SpanList() : _totalSet(0) {
}
int SpanList::set(int offset, int length) {
// see if it intersects the front of the list
if (offset <= 0) {
int intersection = offset + length;
return (intersection > 0) ? setSpans(_spans.begin(), intersection) : 0;
}
// look for an intersection within the list
int position = 0;
for (QList<Span>::iterator it = _spans.begin(); it != _spans.end(); it++) {
position += it->unset;
if (offset <= position) {
return 0;
}
position += it->set;
if (offset <= position) {
return 0;
}
}
// add to end of list
Span span = { offset - position, length };
_spans.append(span);
return 0;
}
int SpanList::setSpans(QList<Span>::iterator it, int length) {
int remainingLength = length;
int totalRemoved = 0;
for (; it != _spans.end(); it++) {
if (remainingLength < it->unset) {
it->unset -= remainingLength;
totalRemoved += remainingLength;
break;
}
int combined = it->unset + it->set;
remainingLength = qMax(remainingLength - combined, 0);
totalRemoved += combined;
it = _spans.erase(it);
}
return qMax(length, totalRemoved);
}
int ReliableChannel::getBytesAvailable() const {
return _buffer.size() - _acknowledged;
return _buffer.size() - _acknowledged.getTotalSet();
}
void ReliableChannel::sendMessage(const QVariant& message) {
@ -321,16 +371,15 @@ void ReliableChannel::sendClearSharedObjectMessage(int id) {
sendMessage(QVariant::fromValue(message));
}
ReliableChannel::ReliableChannel(DatagramSequencer* sequencer, int index) :
ReliableChannel::ReliableChannel(DatagramSequencer* sequencer, int index, bool output) :
QObject(sequencer),
_index(index),
_dataStream(&_buffer),
_bitstream(_dataStream),
_priority(1.0f),
_offset(0),
_acknowledged(0) {
_offset(0) {
_buffer.open(QIODevice::WriteOnly);
_buffer.open(output ? QIODevice::WriteOnly : QIODevice::ReadOnly);
_dataStream.setByteOrder(QDataStream::LittleEndian);
connect(&_bitstream, SIGNAL(sharedObjectCleared(int)), SLOT(sendClearSharedObjectMessage(int)));
@ -350,17 +399,50 @@ void ReliableChannel::writeSpan(QDataStream& out, int position, int length, QVec
}
void ReliableChannel::spanAcknowledged(const DatagramSequencer::ChannelSpan& span) {
// no-op for now
int acknowledged = _acknowledged.set(span.offset - _offset, span.length);
if (acknowledged > 0) {
// TODO: better way of pruning buffer
_buffer.buffer() = _buffer.buffer().right(_buffer.size() - acknowledged);
_buffer.seek(_buffer.size());
_offset += acknowledged;
}
}
void ReliableChannel::readData(QDataStream& in) {
quint32 segments;
in >> segments;
for (int i = 0; i < segments; i++) {
// ignore for now
quint32 offset, size;
in >> offset >> size;
in.skipRawData(size);
int position = offset - _offset;
int end = position + size;
if (_assemblyBuffer.size() < end) {
_assemblyBuffer.resize(end);
}
if (end <= 0) {
in.skipRawData(size);
} else if (position < 0) {
in.skipRawData(-position);
in.readRawData(_assemblyBuffer.data(), size + position);
} else {
in.readRawData(_assemblyBuffer.data() + position, size);
}
int acknowledged = _acknowledged.set(position, size);
if (acknowledged > 0) {
// TODO: better way of pruning buffer
_buffer.buffer().append(_assemblyBuffer.constData(), acknowledged);
emit _buffer.readyRead();
_assemblyBuffer = _assemblyBuffer.right(_assemblyBuffer.size() - acknowledged);
_offset += acknowledged;
}
}
// TODO: better way of pruning buffer?
const int PRUNE_SIZE = 8192;
if (_buffer.pos() > PRUNE_SIZE) {
_buffer.buffer() = _buffer.buffer().right(_buffer.size() - _buffer.pos());
_buffer.seek(0);
}
}

View file

@ -167,6 +167,32 @@ private:
QHash<int, ReliableChannel*> _reliableInputChannels;
};
/// A list of contiguous spans, alternating between set and unset.
class SpanList {
public:
SpanList();
int getTotalSet() const { return _totalSet; }
/// Sets a region of the list.
/// \return the set length at the beginning of the list
int set(int offset, int length);
private:
class Span {
public:
int unset;
int set;
};
int setSpans(QList<Span>::iterator it, int length);
QList<Span> _spans;
int _totalSet;
};
/// Represents a single reliable channel multiplexed onto the datagram sequence.
class ReliableChannel : public QObject {
Q_OBJECT
@ -193,7 +219,7 @@ private:
friend class DatagramSequencer;
ReliableChannel(DatagramSequencer* sequencer, int index);
ReliableChannel(DatagramSequencer* sequencer, int index, bool output);
void writeData(QDataStream& out, int bytes, QVector<DatagramSequencer::ChannelSpan>& spans);
void writeSpan(QDataStream& out, int position, int length, QVector<DatagramSequencer::ChannelSpan>& spans);
@ -204,12 +230,13 @@ private:
int _index;
QBuffer _buffer;
QByteArray _assemblyBuffer;
QDataStream _dataStream;
Bitstream _bitstream;
float _priority;
int _offset;
int _acknowledged;
SpanList _acknowledged;
};
#endif /* defined(__interface__DatagramSequencer__) */