Working on support for sending messages through reliable channels.

This commit is contained in:
Andrzej Kapolka 2014-02-10 20:26:09 -08:00
parent 0527cf886e
commit 36f1e59201
3 changed files with 86 additions and 10 deletions

View file

@ -343,16 +343,32 @@ void CircularBuffer::remove(int length) {
}
QByteArray CircularBuffer::readBytes(int offset, int length) const {
// write in up to two segments
QByteArray bytes(length, 0);
readBytes(offset, length, bytes.data());
return bytes;
}
void CircularBuffer::readBytes(int offset, int length, char* data) const {
// read in up to two segments
QByteArray array;
int start = (_position + offset) % _data.size();
int firstSegment = qMin(length, _data.size() - start);
array.append(_data.constData() + start, firstSegment);
memcpy(data, _data.constData() + start, firstSegment);
int secondSegment = length - firstSegment;
if (secondSegment > 0) {
array.append(_data.constData(), secondSegment);
memcpy(data + firstSegment, _data.constData(), secondSegment);
}
}
void CircularBuffer::writeBytes(int offset, int length, const char* data) {
// write in up to two segments
int start = (_position + offset) % _data.size();
int firstSegment = qMin(length, _data.size() - start);
memcpy(_data.data() + start, data, firstSegment);
int secondSegment = length - firstSegment;
if (secondSegment > 0) {
memcpy(_data.data(), data + firstSegment, secondSegment);
}
return array;
}
void CircularBuffer::writeToStream(int offset, int length, QDataStream& out) const {
@ -561,7 +577,14 @@ int ReliableChannel::getBytesAvailable() const {
}
void ReliableChannel::sendMessage(const QVariant& message) {
// write a placeholder for the length, then fill it in when we know what it is
int placeholder = _buffer.pos();
_dataStream << (quint32)0;
_bitstream << message;
_bitstream.flush();
quint32 length = _buffer.pos() - placeholder;
_buffer.writeBytes(placeholder, sizeof(quint32), (const char*)&length);
}
void ReliableChannel::sendClearSharedObjectMessage(int id) {
@ -576,7 +599,8 @@ ReliableChannel::ReliableChannel(DatagramSequencer* sequencer, int index, bool o
_bitstream(_dataStream),
_priority(1.0f),
_offset(0),
_writePosition(0) {
_writePosition(0),
_expectingMessage(true) {
_buffer.open(output ? QIODevice::WriteOnly : QIODevice::ReadOnly);
_dataStream.setByteOrder(QDataStream::LittleEndian);
@ -688,10 +712,32 @@ void ReliableChannel::readData(QDataStream& in) {
readSome = true;
}
}
if (!readSome) {
return;
}
// let listeners know that there's data to read
if (readSome) {
emit _buffer.readyRead();
forever {
// if we're expecting a message, peek into the buffer to see if we have the whole thing.
// if so, read it in, handle it, and loop back around in case there are more
if (_expectingMessage) {
int available = _buffer.bytesAvailable();
if (available >= sizeof(quint32)) {
quint32 length;
_buffer.readBytes(_buffer.pos(), sizeof(quint32), (char*)&length);
if (available >= length) {
_dataStream.skipRawData(sizeof(quint32));
QVariant message;
_bitstream >> message;
_bitstream.reset();
handleMessage(message);
continue;
}
}
// otherwise, just let whoever's listening know that data is available
} else {
emit _buffer.readyRead();
}
break;
}
// prune any read data from the buffer
@ -701,3 +747,11 @@ void ReliableChannel::readData(QDataStream& in) {
}
}
void ReliableChannel::handleMessage(const QVariant& message) {
if (message.userType() == ClearSharedObjectMessage::Type) {
_bitstream.clearSharedObject(message.value<ClearSharedObjectMessage>().id);
} else {
emit receivedMessage(message);
}
}

View file

@ -185,6 +185,12 @@ public:
/// Reads part of the data from the buffer.
QByteArray readBytes(int offset, int length) const;
/// Reads part of the data from the buffer.
void readBytes(int offset, int length, char* data) const;
/// Writes to part of the data in the buffer.
void writeBytes(int offset, int length, const char* data);
/// Writes part of the buffer to the supplied stream.
void writeToStream(int offset, int length, QDataStream& out) const;
@ -267,8 +273,16 @@ public:
int getBytesAvailable() const;
/// Sends a framed message on this channel.
void sendMessage(const QVariant& message);
/// For input channels, sets whether the channel is expecting a framed message.
void setExpectingMessage(bool expectingMessage) { _expectingMessage = expectingMessage; }
signals:
void receivedMessage(const QVariant& message);
private slots:
void sendClearSharedObjectMessage(int id);
@ -286,6 +300,7 @@ private:
void spanAcknowledged(const DatagramSequencer::ChannelSpan& span);
void readData(QDataStream& in);
void handleMessage(const QVariant& message);
int _index;
CircularBuffer _buffer;
@ -297,6 +312,7 @@ private:
int _offset;
int _writePosition;
SpanList _acknowledged;
bool _expectingMessage;
};
#endif /* defined(__interface__DatagramSequencer__) */

View file

@ -83,8 +83,14 @@ Endpoint::Endpoint(const QByteArray& datagramHeader) :
connect(_sequencer, SIGNAL(readyToRead(Bitstream&)), SLOT(readMessage(Bitstream&)));
connect(_sequencer, SIGNAL(receivedHighPriorityMessage(const QVariant&)),
SLOT(handleHighPriorityMessage(const QVariant&)));
connect(&_sequencer->getReliableInputChannel()->getBuffer(), SIGNAL(readyRead()), SLOT(readReliableChannel()));
connect(&_sequencer->getReliableInputChannel(1)->getBuffer(), SIGNAL(readyRead()), SLOT(readLowPriorityReliableChannel()));
ReliableChannel* firstInput = _sequencer->getReliableInputChannel();
firstInput->setExpectingMessage(false);
connect(&firstInput->getBuffer(), SIGNAL(readyRead()), SLOT(readReliableChannel()));
ReliableChannel* secondInput = _sequencer->getReliableInputChannel(1);
secondInput->setExpectingMessage(false);
connect(&secondInput->getBuffer(), SIGNAL(readyRead()), SLOT(readLowPriorityReliableChannel()));
// enqueue a large amount of data in a low-priority channel
ReliableChannel* output = _sequencer->getReliableOutputChannel(1);