mirror of
https://github.com/JulianGro/overte.git
synced 2025-04-29 19:42:53 +02:00
changed OctreeEditPacketSender to queue packets for resend as nack is parsed
This commit is contained in:
parent
95b2524784
commit
18a9d74b88
3 changed files with 38 additions and 60 deletions
|
@ -18,7 +18,7 @@
|
||||||
class SentPacketHistory {
|
class SentPacketHistory {
|
||||||
|
|
||||||
public:
|
public:
|
||||||
SentPacketHistory(int size);
|
SentPacketHistory(int size = 1000);
|
||||||
|
|
||||||
void packetSent(uint16_t sequenceNumber, const QByteArray& packet);
|
void packetSent(uint16_t sequenceNumber, const QByteArray& packet);
|
||||||
const QByteArray* getPacket(uint16_t sequenceNumber) const;
|
const QByteArray* getPacket(uint16_t sequenceNumber) const;
|
||||||
|
|
|
@ -17,44 +17,6 @@
|
||||||
#include <PacketHeaders.h>
|
#include <PacketHeaders.h>
|
||||||
#include "OctreeEditPacketSender.h"
|
#include "OctreeEditPacketSender.h"
|
||||||
|
|
||||||
void NackedPacketHistory::packetSent(const QByteArray& packet) {
|
|
||||||
// extract sequence number for the sent packet history
|
|
||||||
int numBytesPacketHeader = numBytesForPacketHeader(packet);
|
|
||||||
const char* dataAt = reinterpret_cast<const char*>(packet.data());
|
|
||||||
unsigned short int sequence = (*((unsigned short int*)(dataAt + numBytesPacketHeader)));
|
|
||||||
|
|
||||||
// add packet to history
|
|
||||||
_sentPacketHistory.packetSent(sequence, packet);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool NackedPacketHistory::hasNextNackedPacket() const {
|
|
||||||
return !_nackedSequenceNumbers.isEmpty();
|
|
||||||
}
|
|
||||||
|
|
||||||
const QByteArray* NackedPacketHistory::getNextNackedPacket() {
|
|
||||||
if (!_nackedSequenceNumbers.isEmpty()) {
|
|
||||||
// could return null if packet is not in the history
|
|
||||||
return _sentPacketHistory.getPacket(_nackedSequenceNumbers.dequeue());
|
|
||||||
}
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
void NackedPacketHistory::parseNackPacket(const QByteArray& packet) {
|
|
||||||
int numBytesPacketHeader = numBytesForPacketHeader(packet);
|
|
||||||
const unsigned char* dataAt = reinterpret_cast<const unsigned char*>(packet.data()) + numBytesPacketHeader;
|
|
||||||
|
|
||||||
// read number of sequence numbers
|
|
||||||
uint16_t numSequenceNumbers = (*(uint16_t*)dataAt);
|
|
||||||
dataAt += sizeof(uint16_t);
|
|
||||||
|
|
||||||
// read sequence numbers
|
|
||||||
for (int i = 0; i < numSequenceNumbers; i++) {
|
|
||||||
unsigned short int sequenceNumber = (*(unsigned short int*)dataAt);
|
|
||||||
_nackedSequenceNumbers.enqueue(sequenceNumber);
|
|
||||||
dataAt += sizeof(unsigned short int);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
EditPacketBuffer::EditPacketBuffer(PacketType type, unsigned char* buffer, ssize_t length, QUuid nodeUUID) :
|
EditPacketBuffer::EditPacketBuffer(PacketType type, unsigned char* buffer, ssize_t length, QUuid nodeUUID) :
|
||||||
_nodeUUID(nodeUUID),
|
_nodeUUID(nodeUUID),
|
||||||
_currentType(type),
|
_currentType(type),
|
||||||
|
@ -126,7 +88,7 @@ bool OctreeEditPacketSender::serversExist() const {
|
||||||
|
|
||||||
// This method is called when the edit packet layer has determined that it has a fully formed packet destined for
|
// This method is called when the edit packet layer has determined that it has a fully formed packet destined for
|
||||||
// a known nodeID.
|
// a known nodeID.
|
||||||
void OctreeEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, unsigned char* buffer, ssize_t length) {
|
void OctreeEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, const unsigned char* buffer, ssize_t length) {
|
||||||
NodeList* nodeList = NodeList::getInstance();
|
NodeList* nodeList = NodeList::getInstance();
|
||||||
|
|
||||||
foreach (const SharedNodePointer& node, nodeList->getNodeHash()) {
|
foreach (const SharedNodePointer& node, nodeList->getNodeHash()) {
|
||||||
|
@ -134,14 +96,19 @@ void OctreeEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, unsigned c
|
||||||
if (node->getType() == getMyNodeType() &&
|
if (node->getType() == getMyNodeType() &&
|
||||||
((node->getUUID() == nodeUUID) || (nodeUUID.isNull()))) {
|
((node->getUUID() == nodeUUID) || (nodeUUID.isNull()))) {
|
||||||
if (node->getActiveSocket()) {
|
if (node->getActiveSocket()) {
|
||||||
QByteArray packet(reinterpret_cast<char*>(buffer), length);
|
QByteArray packet(reinterpret_cast<const char*>(buffer), length);
|
||||||
queuePacketForSending(node, packet);
|
queuePacketForSending(node, packet);
|
||||||
_nackedPacketHistories[nodeUUID].packetSent(packet);
|
|
||||||
|
// extract sequence number and add packet to history
|
||||||
|
int numBytesPacketHeader = numBytesForPacketHeader(packet);
|
||||||
|
const char* dataAt = reinterpret_cast<const char*>(packet.data());
|
||||||
|
unsigned short int sequence = (*((unsigned short int*)(dataAt + numBytesPacketHeader)));
|
||||||
|
_sentPacketHistories[nodeUUID].packetSent(sequence, packet);
|
||||||
|
|
||||||
// debugging output...
|
// debugging output...
|
||||||
bool wantDebugging = false;
|
bool wantDebugging = false;
|
||||||
if (wantDebugging) {
|
if (wantDebugging) {
|
||||||
int numBytesPacketHeader = numBytesForPacketHeader(reinterpret_cast<char*>(buffer));
|
int numBytesPacketHeader = numBytesForPacketHeader(reinterpret_cast<const char*>(buffer));
|
||||||
unsigned short int sequence = (*((unsigned short int*)(buffer + numBytesPacketHeader)));
|
unsigned short int sequence = (*((unsigned short int*)(buffer + numBytesPacketHeader)));
|
||||||
quint64 createdAt = (*((quint64*)(buffer + numBytesPacketHeader + sizeof(sequence))));
|
quint64 createdAt = (*((quint64*)(buffer + numBytesPacketHeader + sizeof(sequence))));
|
||||||
quint64 queuedAt = usecTimestampNow();
|
quint64 queuedAt = usecTimestampNow();
|
||||||
|
@ -377,7 +344,31 @@ bool OctreeEditPacketSender::process() {
|
||||||
}
|
}
|
||||||
|
|
||||||
void OctreeEditPacketSender::parseNackPacket(const QByteArray& packet) {
|
void OctreeEditPacketSender::parseNackPacket(const QByteArray& packet) {
|
||||||
// parse sending node from packet
|
// parse sending node from packet, retrieve packet history for that node
|
||||||
QUuid sendingNodeUUID = uuidFromPacketHeader(packet);
|
QUuid sendingNodeUUID = uuidFromPacketHeader(packet);
|
||||||
_nackedPacketHistories[sendingNodeUUID].parseNackPacket(packet);
|
|
||||||
|
// if packet history doesn't exist for the sender node (somehow), bail
|
||||||
|
if (!_sentPacketHistories.contains(sendingNodeUUID)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const SentPacketHistory& sentPacketHistory = _sentPacketHistories.value(sendingNodeUUID);
|
||||||
|
|
||||||
|
int numBytesPacketHeader = numBytesForPacketHeader(packet);
|
||||||
|
const unsigned char* dataAt = reinterpret_cast<const unsigned char*>(packet.data()) + numBytesPacketHeader;
|
||||||
|
|
||||||
|
// read number of sequence numbers
|
||||||
|
uint16_t numSequenceNumbers = (*(uint16_t*)dataAt);
|
||||||
|
dataAt += sizeof(uint16_t);
|
||||||
|
|
||||||
|
// read sequence numbers and queue packets for resend
|
||||||
|
for (int i = 0; i < numSequenceNumbers; i++) {
|
||||||
|
unsigned short int sequenceNumber = (*(unsigned short int*)dataAt);
|
||||||
|
dataAt += sizeof(unsigned short int);
|
||||||
|
|
||||||
|
// retrieve packet from history
|
||||||
|
const QByteArray* packet = sentPacketHistory.getPacket(sequenceNumber);
|
||||||
|
if (packet) {
|
||||||
|
queuePacketToNode(sendingNodeUUID, (const unsigned char*)packet->constData(), packet->length());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,19 +18,6 @@
|
||||||
#include "JurisdictionMap.h"
|
#include "JurisdictionMap.h"
|
||||||
#include "SentPacketHistory.h"
|
#include "SentPacketHistory.h"
|
||||||
|
|
||||||
class NackedPacketHistory {
|
|
||||||
public:
|
|
||||||
NackedPacketHistory() : _sentPacketHistory(1000), _nackedSequenceNumbers() { }
|
|
||||||
public:
|
|
||||||
void packetSent(const QByteArray& packet);
|
|
||||||
bool hasNextNackedPacket() const;
|
|
||||||
const QByteArray* getNextNackedPacket();
|
|
||||||
void parseNackPacket(const QByteArray& packet);
|
|
||||||
private:
|
|
||||||
SentPacketHistory _sentPacketHistory;
|
|
||||||
QQueue<unsigned short int> _nackedSequenceNumbers;
|
|
||||||
};
|
|
||||||
|
|
||||||
/// Used for construction of edit packets
|
/// Used for construction of edit packets
|
||||||
class EditPacketBuffer {
|
class EditPacketBuffer {
|
||||||
public:
|
public:
|
||||||
|
@ -110,7 +97,7 @@ public:
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
bool _shouldSend;
|
bool _shouldSend;
|
||||||
void queuePacketToNode(const QUuid& nodeID, unsigned char* buffer, ssize_t length);
|
void queuePacketToNode(const QUuid& nodeID, const unsigned char* buffer, ssize_t length);
|
||||||
void queuePendingPacketToNodes(PacketType type, unsigned char* buffer, ssize_t length);
|
void queuePendingPacketToNodes(PacketType type, unsigned char* buffer, ssize_t length);
|
||||||
void queuePacketToNodes(unsigned char* buffer, ssize_t length);
|
void queuePacketToNodes(unsigned char* buffer, ssize_t length);
|
||||||
void initializePacket(EditPacketBuffer& packetBuffer, PacketType type);
|
void initializePacket(EditPacketBuffer& packetBuffer, PacketType type);
|
||||||
|
@ -134,6 +121,6 @@ protected:
|
||||||
int _maxPacketSize;
|
int _maxPacketSize;
|
||||||
|
|
||||||
// TODO: garbage-collect this and _pendingEditPackets
|
// TODO: garbage-collect this and _pendingEditPackets
|
||||||
QHash<QUuid, NackedPacketHistory> _nackedPacketHistories;
|
QHash<QUuid, SentPacketHistory> _sentPacketHistories;
|
||||||
};
|
};
|
||||||
#endif // hifi_OctreeEditPacketSender_h
|
#endif // hifi_OctreeEditPacketSender_h
|
||||||
|
|
Loading…
Reference in a new issue