Circular buffer work.

This commit is contained in:
Andrzej Kapolka 2014-02-09 22:29:23 -08:00
parent 8c9f06ceec
commit f81a9d9fe8
4 changed files with 253 additions and 36 deletions

View file

@ -311,6 +311,174 @@ void DatagramSequencer::handleHighPriorityMessage(const QVariant& data) {
}
}
const int INITIAL_CIRCULAR_BUFFER_CAPACITY = 16;
CircularBuffer::CircularBuffer(QObject* parent) :
QIODevice(parent),
_data(INITIAL_CIRCULAR_BUFFER_CAPACITY, 0),
_position(0),
_size(0),
_offset(0) {
}
void CircularBuffer::append(const char* data, int length) {
// resize to fit
int oldSize = _size;
resize(_size + length);
// write our data in up to two segments: one from the position to the end, one from the beginning
int end = (_position + oldSize) % _data.size();
int firstSegment = qMin(length, _data.size() - end);
memcpy(_data.data() + end, data, firstSegment);
int secondSegment = length - firstSegment;
if (secondSegment > 0) {
memcpy(_data.data(), data + firstSegment, secondSegment);
}
}
void CircularBuffer::remove(int length) {
_position = (_position + length) % _data.size();
_size -= length;
}
QByteArray CircularBuffer::readBytes(int offset, int length) const {
// write in up to two segments
QByteArray array;
int start = (_position + offset) % _data.size();
int firstSegment = qMin(length, _data.size() - start);
array.append(_data.constData() + start, firstSegment);
int secondSegment = length - firstSegment;
if (secondSegment > 0) {
array.append(_data.constData(), secondSegment);
}
return array;
}
void CircularBuffer::writeToStream(int offset, int length, QDataStream& out) const {
// write in up to two segments
int start = (_position + offset) % _data.size();
int firstSegment = qMin(length, _data.size() - start);
out.writeRawData(_data.constData() + start, firstSegment);
int secondSegment = length - firstSegment;
if (secondSegment > 0) {
out.writeRawData(_data.constData(), secondSegment);
}
}
void CircularBuffer::readFromStream(int offset, int length, QDataStream& in) {
// resize to fit
int requiredSize = offset + length;
if (requiredSize > _size) {
resize(requiredSize);
}
// read in up to two segments
int start = (_position + offset) % _data.size();
int firstSegment = qMin(length, _data.size() - start);
in.readRawData(_data.data() + start, firstSegment);
int secondSegment = length - firstSegment;
if (secondSegment > 0) {
in.readRawData(_data.data(), secondSegment);
}
}
void CircularBuffer::appendToBuffer(int offset, int length, CircularBuffer& buffer) const {
// append in up to two segments
int start = (_position + offset) % _data.size();
int firstSegment = qMin(length, _data.size() - start);
buffer.append(_data.constData() + start, firstSegment);
int secondSegment = length - firstSegment;
if (secondSegment > 0) {
buffer.append(_data.constData(), secondSegment);
}
}
bool CircularBuffer::atEnd() const {
return _offset >= _size;
}
bool CircularBuffer::canReadLine() const {
for (int offset = _offset; offset < _size; offset++) {
if (_data.at((_position + offset) % _data.size()) == '\n') {
return true;
}
}
return false;
}
bool CircularBuffer::open(OpenMode flags) {
return QIODevice::open(flags | QIODevice::Unbuffered);
}
qint64 CircularBuffer::pos() const {
return _offset;
}
bool CircularBuffer::seek(qint64 pos) {
if (pos < 0 || pos > _size) {
return false;
}
_offset = pos;
return true;
}
qint64 CircularBuffer::size() const {
return _size;
}
qint64 CircularBuffer::readData(char* data, qint64 length) {
int readable = qMin((int)length, _size - _offset);
// read in up to two segments
int start = (_position + _offset) % _data.size();
int firstSegment = qMin((int)length, _data.size() - start);
memcpy(data, _data.constData() + start, firstSegment);
int secondSegment = length - firstSegment;
if (secondSegment > 0) {
memcpy(data + firstSegment, _data.constData(), secondSegment);
}
_offset += readable;
return readable;
}
qint64 CircularBuffer::writeData(const char* data, qint64 length) {
// resize to fit
int requiredSize = _offset + length;
if (requiredSize > _size) {
resize(requiredSize);
}
// write in up to two segments
int start = (_position + _offset) % _data.size();
int firstSegment = qMin((int)length, _data.size() - start);
memcpy(_data.data() + start, data, firstSegment);
int secondSegment = length - firstSegment;
if (secondSegment > 0) {
memcpy(_data.data(), data + firstSegment, secondSegment);
}
_offset += length;
return length;
}
void CircularBuffer::resize(int size) {
if (size > _data.size()) {
// double our capacity until we can fit the desired length
int newCapacity = _data.size();
do {
newCapacity *= 2;
} while (size > newCapacity);
int oldCapacity = _data.size();
_data.resize(newCapacity);
int trailing = _position + _size - oldCapacity;
if (trailing > 0) {
memcpy(_data.data() + oldCapacity, _data.constData(), trailing);
}
}
_size = size;
}
SpanList::SpanList() : _totalSet(0) {
}
@ -472,16 +640,16 @@ int ReliableChannel::writeSpan(QDataStream& out, bool& first, int position, int
spans.append(span);
out << (quint32)span.offset;
out << (quint32)length;
out.writeRawData(_buffer.data().constData() + position, length);
_buffer.writeToStream(position, length, out);
return length;
}
void ReliableChannel::spanAcknowledged(const DatagramSequencer::ChannelSpan& span) {
int advancement = _acknowledged.set(span.offset - _offset, span.length);
if (advancement > 0) {
// TODO: better way of pruning buffer
_buffer.buffer() = _buffer.buffer().right(_buffer.size() - advancement);
_buffer.remove(advancement);
_buffer.seek(_buffer.size());
_offset += advancement;
_writePosition = qMax(_writePosition - advancement, 0);
}
@ -490,38 +658,40 @@ void ReliableChannel::spanAcknowledged(const DatagramSequencer::ChannelSpan& spa
void ReliableChannel::readData(QDataStream& in) {
quint32 segments;
in >> segments;
bool readSome = false;
for (int i = 0; i < segments; i++) {
quint32 offset, size;
in >> offset >> size;
int position = offset - _offset;
int end = position + size;
if (_assemblyBuffer.size() < end) {
_assemblyBuffer.resize(end);
}
if (end <= 0) {
in.skipRawData(size);
} else if (position < 0) {
in.skipRawData(-position);
in.readRawData(_assemblyBuffer.data(), size + position);
_assemblyBuffer.readFromStream(0, end, in);
} else {
in.readRawData(_assemblyBuffer.data() + position, size);
_assemblyBuffer.readFromStream(position, size, in);
}
int advancement = _acknowledged.set(position, size);
if (advancement > 0) {
// TODO: better way of pruning buffer
_buffer.buffer().append(_assemblyBuffer.constData(), advancement);
emit _buffer.readyRead();
_assemblyBuffer = _assemblyBuffer.right(_assemblyBuffer.size() - advancement);
_assemblyBuffer.appendToBuffer(0, advancement, _buffer);
_assemblyBuffer.remove(advancement);
_offset += advancement;
readSome = true;
}
}
// when the read head is sufficiently advanced into the buffer, prune it off. this along
// with other buffer usages should be replaced with a circular buffer
const int PRUNE_SIZE = 8192;
if (_buffer.pos() > PRUNE_SIZE) {
_buffer.buffer() = _buffer.buffer().right(_buffer.size() - _buffer.pos());
// let listeners know that there's data to read
if (readSome) {
emit _buffer.readyRead();
}
// prune any read data from the buffer
if (_buffer.pos() > 0) {
_buffer.remove((int)_buffer.pos());
_buffer.seek(0);
}
}

View file

@ -167,6 +167,55 @@ private:
QHash<int, ReliableChannel*> _reliableInputChannels;
};
/// A circular buffer, where one may efficiently append data to the end or remove data from the beginning.
class CircularBuffer : public QIODevice {
public:
CircularBuffer(QObject* parent = NULL);
/// Appends data to the end of the buffer.
void append(const QByteArray& data) { append(data.constData(), data.size()); }
/// Appends data to the end of the buffer.
void append(const char* data, int length);
/// Removes data from the beginning of the buffer.
void remove(int length);
/// Reads part of the data from the buffer.
QByteArray readBytes(int offset, int length) const;
/// Writes part of the buffer to the supplied stream.
void writeToStream(int offset, int length, QDataStream& out) const;
/// Reads part of the buffer from the supplied stream.
void readFromStream(int offset, int length, QDataStream& in);
/// Appends part of the buffer to the supplied other buffer.
void appendToBuffer(int offset, int length, CircularBuffer& buffer) const;
virtual bool atEnd() const;
virtual bool canReadLine() const;
virtual bool open(OpenMode flags);
virtual qint64 pos() const;
virtual bool seek(qint64 pos);
virtual qint64 size() const;
protected:
virtual qint64 readData(char* data, qint64 length);
virtual qint64 writeData(const char* data, qint64 length);
private:
void resize(int size);
QByteArray _data;
int _position;
int _size;
int _offset;
};
/// A list of contiguous spans, alternating between set and unset. Conceptually, the list is preceeded by a set
/// span of infinite length and followed by an unset span of infinite length. Within those bounds, it alternates
/// between unset and set.
@ -208,7 +257,7 @@ public:
int getIndex() const { return _index; }
QBuffer& getBuffer() { return _buffer; }
CircularBuffer& getBuffer() { return _buffer; }
QDataStream& getDataStream() { return _dataStream; }
Bitstream& getBitstream() { return _bitstream; }
@ -238,8 +287,8 @@ private:
void readData(QDataStream& in);
int _index;
QBuffer _buffer;
QByteArray _assemblyBuffer;
CircularBuffer _buffer;
CircularBuffer _assemblyBuffer;
QDataStream _dataStream;
Bitstream _bitstream;
float _priority;

View file

@ -8,7 +8,6 @@
#include <stdlib.h>
#include <DatagramSequencer.h>
#include <SharedUtil.h>
#include "MetavoxelTests.h"
@ -88,9 +87,9 @@ Endpoint::Endpoint(const QByteArray& datagramHeader) :
output->setPriority(0.25f);
const int MIN_LOW_PRIORITY_DATA = 100000;
const int MAX_LOW_PRIORITY_DATA = 200000;
_lowPriorityDataStreamed = createRandomBytes(MIN_LOW_PRIORITY_DATA, MAX_LOW_PRIORITY_DATA);
output->getBuffer().write(_lowPriorityDataStreamed);
lowPriorityStreamedBytesSent += _lowPriorityDataStreamed.size();
_lowPriorityDataStreamed.append(createRandomBytes(MIN_LOW_PRIORITY_DATA, MAX_LOW_PRIORITY_DATA));
//output->getBuffer().write(_lowPriorityDataStreamed);
//lowPriorityStreamedBytesSent += _lowPriorityDataStreamed.size();
}
static QVariant createRandomMessage() {
@ -210,26 +209,26 @@ void Endpoint::readMessage(Bitstream& in) {
}
void Endpoint::readReliableChannel() {
QByteArray bytes = _sequencer->getReliableInputChannel()->getBuffer().readAll();
CircularBuffer& buffer = _sequencer->getReliableInputChannel()->getBuffer();
QByteArray bytes = buffer.read(buffer.bytesAvailable());
if (_other->_dataStreamed.size() < bytes.size()) {
throw QString("Received unsent/already sent streamed data.");
}
QByteArray compare = _other->_dataStreamed;
_other->_dataStreamed = _other->_dataStreamed.mid(bytes.size());
compare.truncate(bytes.size());
QByteArray compare = _other->_dataStreamed.readBytes(0, bytes.size());
_other->_dataStreamed.remove(bytes.size());
if (compare != bytes) {
throw QString("Sent/received streamed data mismatch.");
}
}
void Endpoint::readLowPriorityReliableChannel() {
QByteArray bytes = _sequencer->getReliableInputChannel(1)->getBuffer().readAll();
CircularBuffer& buffer = _sequencer->getReliableInputChannel(1)->getBuffer();
QByteArray bytes = buffer.read(buffer.bytesAvailable());
if (_other->_lowPriorityDataStreamed.size() < bytes.size()) {
throw QString("Received unsent/already sent low-priority streamed data.");
}
QByteArray compare = _other->_lowPriorityDataStreamed;
_other->_lowPriorityDataStreamed = _other->_lowPriorityDataStreamed.mid(bytes.size());
compare.truncate(bytes.size());
QByteArray compare = _other->_lowPriorityDataStreamed.readBytes(0, bytes.size());
_other->_lowPriorityDataStreamed.remove(bytes.size());
if (compare != bytes) {
throw QString("Sent/received low-priority streamed data mismatch.");
}

View file

@ -12,9 +12,8 @@
#include <QCoreApplication>
#include <QVariantList>
#include <Bitstream.h>
#include <DatagramSequencer.h>
class DatagramSequencer;
class SequencedTestMessage;
/// Tests various aspects of the metavoxel library.
@ -59,8 +58,8 @@ private:
float _highPriorityMessagesToSend;
QVariantList _highPriorityMessagesSent;
QList<SequencedTestMessage> _unreliableMessagesSent;
QByteArray _dataStreamed;
QByteArray _lowPriorityDataStreamed;
CircularBuffer _dataStreamed;
CircularBuffer _lowPriorityDataStreamed;
};
/// A simple test message.