More work on datagram streaming.

This commit is contained in:
Andrzej Kapolka 2013-12-24 18:19:35 -08:00
parent 71ce244a0a
commit 51d5a37613
4 changed files with 59 additions and 17 deletions

View file

@ -107,6 +107,12 @@ void Bitstream::reset() {
_position = 0;
}
Bitstream::WriteMappings Bitstream::getAndResetWriteMappings() {
WriteMappings mappings = { _classNameStreamer.getAndResetTransientOffsets(),
_attributeStreamer.getAndResetTransientOffsets() };
return mappings;
}
Bitstream& Bitstream::operator<<(bool value) {
if (value) {
_byte |= (1 << _position);

View file

@ -47,7 +47,10 @@ private:
template<class T> class RepeatedValueStreamer {
public:
RepeatedValueStreamer(Bitstream& stream) : _stream(stream), _idStreamer(stream), _lastNewID(0) { }
RepeatedValueStreamer(Bitstream& stream) : _stream(stream), _idStreamer(stream),
_lastPersistentID(0), _lastTransientOffset(0) { }
QHash<T, int> getAndResetTransientOffsets();
RepeatedValueStreamer& operator<<(T value);
RepeatedValueStreamer& operator>>(T& value);
@ -56,17 +59,31 @@ private:
Bitstream& _stream;
IDStreamer _idStreamer;
int _lastNewID;
QHash<T, int> _ids;
int _lastPersistentID;
int _lastTransientOffset;
QHash<T, int> _persistentIDs;
QHash<T, int> _transientOffsets;
QHash<int, T> _values;
};
template<class T> inline QHash<T, int> RepeatedValueStreamer<T>::getAndResetTransientOffsets() {
QHash<T, int> transientOffsets;
_transientOffsets.swap(transientOffsets);
_lastTransientOffset = 0;
return transientOffsets;
}
template<class T> inline RepeatedValueStreamer<T>& RepeatedValueStreamer<T>::operator<<(T value) {
int& id = _ids[value];
int id = _persistentIDs.value(value);
if (id == 0) {
_idStreamer << (id = ++_lastNewID);
_stream << value;
int& offset = _transientOffsets[value];
if (offset == 0) {
_idStreamer << (_lastPersistentID + (offset = ++_lastTransientOffset));
_stream << value;
} else {
_idStreamer << (_lastPersistentID + offset);
}
} else {
_idStreamer << id;
}
@ -91,6 +108,12 @@ template<class T> inline RepeatedValueStreamer<T>& RepeatedValueStreamer<T>::ope
class Bitstream {
public:
class WriteMappings {
public:
QHash<QByteArray, int> classNameOffsets;
QHash<AttributePointer, int> attributeOffsets;
};
/// Registers a metaobject under its name so that instances of it can be streamed.
/// \return zero; the function only returns a value so that it can be used in static initialization
static int registerMetaObject(const char* className, const QMetaObject* metaObject);
@ -121,6 +144,9 @@ public:
/// Returns a reference to the attribute streamer.
RepeatedValueStreamer<AttributePointer>& getAttributeStreamer() { return _attributeStreamer; }
/// Returns the set of transient mappings gathered during writing and resets them.
WriteMappings getAndResetWriteMappings();
Bitstream& operator<<(bool value);
Bitstream& operator>>(bool& value);

View file

@ -100,14 +100,16 @@ void DatagramSequencer::receivedDatagram(const QByteArray& datagram) {
for (int i = 0; i < acknowledgementCount; i++) {
quint32 packetNumber;
_incomingPacketStream >> packetNumber;
for (QList<SendRecord>::iterator it = _sendRecords.begin(); it != _sendRecords.end(); ) {
if (it->packetNumber == packetNumber) {
sendRecordAcknowledged(*it);
it = _sendRecords.erase(_sendRecords.begin(), ++it);
} else {
it++;
}
if (_sendRecords.isEmpty()) {
continue;
}
int index = packetNumber - _sendRecords.first().packetNumber;
if (index >= _sendRecords.size()) {
continue;
}
QList<SendRecord>::iterator it = _sendRecords.begin() + index;
sendRecordAcknowledged(*it);
_sendRecords.erase(_sendRecords.begin(), it + 1);
}
emit readyToRead(_inputStream);
@ -116,7 +118,13 @@ void DatagramSequencer::receivedDatagram(const QByteArray& datagram) {
}
void DatagramSequencer::sendRecordAcknowledged(const SendRecord& record) {
// stop acknowledging the recorded packets
// stop acknowledging the recorded packets (TODO: replace with interpolation search?)
QList<int>::iterator it = qBinaryFind(_receivedPacketNumbers.begin(), _receivedPacketNumbers.end(),
record.lastReceivedPacketNumber);
if (it != _receivedPacketNumbers.end()) {
_receivedPacketNumbers.erase(it + 1);
}
}
@ -128,7 +136,8 @@ void DatagramSequencer::sendPacket(const QByteArray& packet) {
_outgoingPacketNumber++;
// record the send
SendRecord record = { _outgoingPacketNumber, _receivedPacketNumbers };
SendRecord record = { _outgoingPacketNumber, _receivedPacketNumbers.isEmpty() ? 0 : _receivedPacketNumbers.last(),
_outputStream.getAndResetWriteMappings() };
_sendRecords.append(record);
// write the sequence number and size, which are the same between all fragments

View file

@ -49,7 +49,8 @@ private:
class SendRecord {
public:
int packetNumber;
QList<int> acknowledgedPacketNumbers;
int lastReceivedPacketNumber;
Bitstream::WriteMappings mappings;
};
/// Notes that the described send was acknowledged by the other party.