More work on reliable transfer.

This commit is contained in:
Andrzej Kapolka 2014-02-05 16:19:16 -08:00
parent 5e9916302b
commit e77feb8efd
3 changed files with 166 additions and 34 deletions

View file

@ -69,6 +69,8 @@ public:
int takePersistentID(T value) { return _persistentIDs.take(value); }
void removePersistentValue(int id) { _persistentValues.remove(id); }
RepeatedValueStreamer& operator<<(T value);
RepeatedValueStreamer& operator>>(T& value);
@ -227,6 +229,9 @@ public:
/// Persists a set of read mappings recorded earlier.
void persistReadMappings(const ReadMappings& mappings);
/// Removes a shared object from the read mappings.
void clearSharedObject(int id) { _sharedObjectStreamer.removePersistentValue(id); }
Bitstream& operator<<(bool value);
Bitstream& operator>>(bool& value);

View file

@ -11,6 +11,7 @@
#include <QtDebug>
#include "DatagramSequencer.h"
#include "MetavoxelMessages.h"
const int MAX_DATAGRAM_SIZE = 1500;
@ -41,10 +42,6 @@ DatagramSequencer::DatagramSequencer(const QByteArray& datagramHeader) :
memcpy(_outgoingDatagram.data(), datagramHeader.constData(), _datagramHeaderSize);
}
void DatagramSequencer::sendReliableMessage(const QVariant& data, int channel) {
}
void DatagramSequencer::sendHighPriorityMessage(const QVariant& data) {
HighPriorityMessage message = { data, _outgoingPacketNumber + 1 };
_highPriorityMessages.append(message);
@ -53,7 +50,7 @@ void DatagramSequencer::sendHighPriorityMessage(const QVariant& data) {
ReliableChannel* DatagramSequencer::getReliableOutputChannel(int index) {
ReliableChannel*& channel = _reliableOutputChannels[index];
if (!channel) {
channel = new ReliableChannel(this);
channel = new ReliableChannel(this, index);
}
return channel;
}
@ -61,7 +58,7 @@ ReliableChannel* DatagramSequencer::getReliableOutputChannel(int index) {
ReliableChannel* DatagramSequencer::getReliableInputChannel(int index) {
ReliableChannel*& channel = _reliableInputChannels[index];
if (!channel) {
channel = new ReliableChannel(this);
channel = new ReliableChannel(this, index);
}
return channel;
}
@ -88,14 +85,15 @@ void DatagramSequencer::endPacket() {
// if we have space remaining, send some data from our reliable channels
int remaining = _maxPacketSize - _outgoingPacketStream.device()->pos();
const int MINIMUM_RELIABLE_SIZE = sizeof(quint32) * 4; // count, channel number, offset, size
const int MINIMUM_RELIABLE_SIZE = sizeof(quint32) * 5; // count, channel number, segment count, offset, size
QVector<ChannelSpan> spans;
if (remaining > MINIMUM_RELIABLE_SIZE) {
appendReliableData(remaining);
appendReliableData(remaining, spans);
} else {
_outgoingPacketStream << (quint32)0;
}
sendPacket(QByteArray::fromRawData(_outgoingPacketData.constData(), _outgoingPacketStream.device()->pos()));
sendPacket(QByteArray::fromRawData(_outgoingPacketData.constData(), _outgoingPacketStream.device()->pos()), spans);
_outgoingPacketStream.device()->seek(0);
}
@ -180,7 +178,7 @@ void DatagramSequencer::receivedDatagram(const QByteArray& datagram) {
QVariant data;
_inputStream >> data;
if (i >= _receivedHighPriorityMessages) {
emit receivedHighPriorityMessage(data);
handleHighPriorityMessage(data);
}
}
_receivedHighPriorityMessages = highPriorityMessageCount;
@ -206,7 +204,9 @@ void DatagramSequencer::receivedDatagram(const QByteArray& datagram) {
}
void DatagramSequencer::sendClearSharedObjectMessage(int id) {
qDebug() << "cleared " << id;
// for now, high priority
ClearSharedObjectMessage message = { id };
sendHighPriorityMessage(QVariant::fromValue(message));
}
void DatagramSequencer::sendRecordAcknowledged(const SendRecord& record) {
@ -227,18 +227,46 @@ void DatagramSequencer::sendRecordAcknowledged(const SendRecord& record) {
break;
}
}
}
void DatagramSequencer::appendReliableData(int bytes) {
_outgoingPacketStream << (quint32)0;
for (QHash<int, ReliableChannel*>::const_iterator it = _reliableOutputChannels.constBegin();
it != _reliableOutputChannels.constEnd(); it++) {
// acknowledge the received spans
foreach (const ChannelSpan& span, record.spans) {
getReliableOutputChannel(span.channel)->spanAcknowledged(span);
}
}
void DatagramSequencer::sendPacket(const QByteArray& packet) {
void DatagramSequencer::appendReliableData(int bytes, QVector<ChannelSpan>& spans) {
// gather total number of bytes to write, priority
int totalBytes = 0;
float totalPriority = 0.0f;
int totalChannels = 0;
foreach (ReliableChannel* channel, _reliableOutputChannels) {
int channelBytes = channel->getBytesAvailable();
if (channelBytes > 0) {
totalBytes += channelBytes;
totalPriority += channel->getPriority();
totalChannels++;
}
}
_outgoingPacketStream << (quint32)totalChannels;
if (totalChannels == 0) {
return;
}
totalBytes = qMin(bytes, totalBytes);
foreach (ReliableChannel* channel, _reliableOutputChannels) {
int channelBytes = channel->getBytesAvailable();
if (channelBytes == 0) {
continue;
}
_outgoingPacketStream << (quint32)channel->getIndex();
channelBytes = qMin(channelBytes, (int)(totalBytes * channel->getPriority() / totalPriority));
channel->writeData(_outgoingPacketStream, channelBytes, spans);
totalBytes -= channelBytes;
totalPriority -= channel->getPriority();
}
}
void DatagramSequencer::sendPacket(const QByteArray& packet, const QVector<ChannelSpan>& spans) {
QIODeviceOpener opener(&_outgoingDatagramBuffer, QIODevice::WriteOnly);
// increment the packet number
@ -246,7 +274,7 @@ void DatagramSequencer::sendPacket(const QByteArray& packet) {
// record the send
SendRecord record = { _outgoingPacketNumber, _receiveRecords.isEmpty() ? 0 : _receiveRecords.last().packetNumber,
_outputStream.getAndResetWriteMappings() };
_outputStream.getAndResetWriteMappings(), spans };
_sendRecords.append(record);
// write the sequence number and size, which are the same between all fragments
@ -271,18 +299,36 @@ void DatagramSequencer::sendPacket(const QByteArray& packet) {
} while(offset < packet.size());
}
void DatagramSequencer::handleHighPriorityMessage(const QVariant& data) {
if (data.userType() == ClearSharedObjectMessage::Type) {
_inputStream.clearSharedObject(data.value<ClearSharedObjectMessage>().id);
} else {
emit receivedHighPriorityMessage(data);
}
}
int ReliableChannel::getBytesAvailable() const {
return _buffer.pos() - _sent;
}
void ReliableChannel::sendMessage(const QVariant& message) {
_bitstream << message;
}
void ReliableChannel::sendClearSharedObjectMessage(int id) {
ClearSharedObjectMessage message = { id };
sendMessage(QVariant::fromValue(message));
}
ReliableChannel::ReliableChannel(DatagramSequencer* sequencer) :
ReliableChannel::ReliableChannel(DatagramSequencer* sequencer, int index) :
QObject(sequencer),
_index(index),
_dataStream(&_buffer),
_bitstream(_dataStream),
_priority(1.0f) {
_priority(1.0f),
_offset(0),
_sent(0) {
_buffer.open(QIODevice::WriteOnly);
_dataStream.setByteOrder(QDataStream::LittleEndian);
@ -290,10 +336,61 @@ ReliableChannel::ReliableChannel(DatagramSequencer* sequencer) :
connect(&_bitstream, SIGNAL(sharedObjectCleared(int)), SLOT(sendClearSharedObjectMessage(int)));
}
void ReliableChannel::readData(QDataStream& in) {
quint32 offset, size;
in >> offset >> size;
void ReliableChannel::writeData(QDataStream& out, int bytes, QVector<DatagramSequencer::ChannelSpan>& spans) {
// determine how many spans we can send
int remainingBytes = bytes;
int position = 0;
int spanCount = 0;
foreach (const RemainingSpan& remainingSpan, _remainingSpans) {
if (remainingBytes == 0) {
break;
}
int spanBytes = qMin(remainingSpan.unacknowledged, remainingBytes);
remainingBytes -= spanBytes;
spanCount++;
position += remainingSpan.unacknowledged + remainingSpan.acknowledged;
}
if (remainingBytes > 0 && position < _buffer.pos()) {
spanCount++;
}
out << (quint32)spanCount;
in.skipRawData(size);
remainingBytes = bytes;
position = 0;
foreach (const RemainingSpan& remainingSpan, _remainingSpans) {
if (remainingBytes == 0) {
break;
}
int spanBytes = qMin(remainingSpan.unacknowledged, remainingBytes);
writeSpan(out, position, spanBytes, spans);
remainingBytes -= spanBytes;
position += remainingSpan.unacknowledged + remainingSpan.acknowledged;
}
if (remainingBytes > 0 && position < _buffer.pos()) {
int spanBytes = qMin((int)_buffer.pos() - position, remainingBytes);
writeSpan(out, position, spanBytes, spans);
}
}
void ReliableChannel::writeSpan(QDataStream& out, int position, int length, QVector<DatagramSequencer::ChannelSpan>& spans) {
DatagramSequencer::ChannelSpan span = { _index, _offset + position, length };
spans.append(span);
out << (quint32)span.offset;
out << (quint32)length;
out.writeRawData(_buffer.data().constData() + position, length);
}
void ReliableChannel::spanAcknowledged(const DatagramSequencer::ChannelSpan& span) {
}
void ReliableChannel::readData(QDataStream& in) {
quint32 segments;
in >> segments;
for (int i = 0; i < segments; i++) {
quint32 offset, size;
in >> offset >> size;
in.skipRawData(size);
}
}

View file

@ -14,6 +14,7 @@
#include <QByteArray>
#include <QList>
#include <QSet>
#include <QVector>
#include "Bitstream.h"
@ -42,9 +43,6 @@ public:
/// Returns the packet number of the sent packet at the specified index.
int getSentPacketNumber(int index) const { return _sendRecords.at(index).packetNumber; }
/// Sends a normal-priority reliable message.
void sendReliableMessage(const QVariant& data, int channel = 0);
/// Adds a message to the high priority queue. Will be sent with every outgoing packet until received.
void sendHighPriorityMessage(const QVariant& data);
@ -96,14 +94,24 @@ signals:
private slots:
void sendClearSharedObjectMessage(int id);
private:
friend class ReliableChannel;
class ChannelSpan {
public:
int channel;
int offset;
int length;
};
class SendRecord {
public:
int packetNumber;
int lastReceivedPacketNumber;
Bitstream::WriteMappings mappings;
QVector<ChannelSpan> spans;
};
class ReceiveRecord {
@ -119,11 +127,13 @@ private:
void sendRecordAcknowledged(const SendRecord& record);
/// Appends some reliable data to the outgoing packet.
void appendReliableData(int bytes);
void appendReliableData(int bytes, QVector<ChannelSpan>& spans);
/// Sends a packet to the other party, fragmenting it into multiple datagrams (and emitting
/// readyToWrite) as necessary.
void sendPacket(const QByteArray& packet);
void sendPacket(const QByteArray& packet, const QVector<ChannelSpan>& spans);
void handleHighPriorityMessage(const QVariant& data);
QList<SendRecord> _sendRecords;
QList<ReceiveRecord> _receiveRecords;
@ -163,12 +173,16 @@ class ReliableChannel : public QObject {
public:
int getIndex() const { return _index; }
QDataStream& getDataStream() { return _dataStream; }
Bitstream& getBitstream() { return _bitstream; }
void setPriority(float priority) { _priority = priority; }
float getPriority() const { return _priority; }
int getBytesAvailable() const;
void sendMessage(const QVariant& message);
private slots:
@ -179,14 +193,30 @@ private:
friend class DatagramSequencer;
ReliableChannel(DatagramSequencer* sequencer);
class RemainingSpan {
public:
int unacknowledged;
int acknowledged;
};
ReliableChannel(DatagramSequencer* sequencer, int index);
void writeData(QDataStream& out, int bytes, QVector<DatagramSequencer::ChannelSpan>& spans);
void writeSpan(QDataStream& out, int position, int length, QVector<DatagramSequencer::ChannelSpan>& spans);
void spanAcknowledged(const DatagramSequencer::ChannelSpan& span);
void readData(QDataStream& in);
int _index;
QBuffer _buffer;
QDataStream _dataStream;
Bitstream _bitstream;
float _priority;
int _offset;
int _sent;
QList<RemainingSpan> _remainingSpans;
};
#endif /* defined(__interface__DatagramSequencer__) */