mirror of
https://github.com/overte-org/overte.git
synced 2025-08-06 18:50:00 +02:00
improve ping behavior and clockskew in octree servers by introducing datagramprocessing thread
This commit is contained in:
parent
7476c61647
commit
9e21d26a6d
6 changed files with 165 additions and 44 deletions
|
@ -389,7 +389,7 @@ const QByteArray* OctreeQueryNode::getNextNackedPacket() {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
void OctreeQueryNode::parseNackPacket(QByteArray& packet) {
|
void OctreeQueryNode::parseNackPacket(const QByteArray& packet) {
|
||||||
|
|
||||||
int numBytesPacketHeader = numBytesForPacketHeader(packet);
|
int numBytesPacketHeader = numBytesForPacketHeader(packet);
|
||||||
const unsigned char* dataAt = reinterpret_cast<const unsigned char*>(packet.data()) + numBytesPacketHeader;
|
const unsigned char* dataAt = reinterpret_cast<const unsigned char*>(packet.data()) + numBytesPacketHeader;
|
||||||
|
|
|
@ -109,7 +109,7 @@ public:
|
||||||
|
|
||||||
OCTREE_PACKET_SEQUENCE getSequenceNumber() const { return _sequenceNumber; }
|
OCTREE_PACKET_SEQUENCE getSequenceNumber() const { return _sequenceNumber; }
|
||||||
|
|
||||||
void parseNackPacket(QByteArray& packet);
|
void parseNackPacket(const QByteArray& packet);
|
||||||
bool hasNextNackedPacket() const;
|
bool hasNextNackedPacket() const;
|
||||||
const QByteArray* getNextNackedPacket();
|
const QByteArray* getNextNackedPacket();
|
||||||
|
|
||||||
|
|
|
@ -25,6 +25,7 @@
|
||||||
|
|
||||||
#include "OctreeServer.h"
|
#include "OctreeServer.h"
|
||||||
#include "OctreeServerConsts.h"
|
#include "OctreeServerConsts.h"
|
||||||
|
#include "OctreeServerDatagramProcessor.h"
|
||||||
|
|
||||||
OctreeServer* OctreeServer::_instance = NULL;
|
OctreeServer* OctreeServer::_instance = NULL;
|
||||||
int OctreeServer::_clientCount = 0;
|
int OctreeServer::_clientCount = 0;
|
||||||
|
@ -827,55 +828,83 @@ void OctreeServer::parsePayload() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void OctreeServer::readPendingDatagrams() {
|
void OctreeServer::readPendingDatagram(const QByteArray& receivedPacket, const HifiSockAddr& senderSockAddr) {
|
||||||
QByteArray receivedPacket;
|
|
||||||
HifiSockAddr senderSockAddr;
|
|
||||||
|
|
||||||
NodeList* nodeList = NodeList::getInstance();
|
NodeList* nodeList = NodeList::getInstance();
|
||||||
|
|
||||||
while (readAvailableDatagram(receivedPacket, senderSockAddr)) {
|
if (nodeList->packetVersionAndHashMatch(receivedPacket)) {
|
||||||
if (nodeList->packetVersionAndHashMatch(receivedPacket)) {
|
PacketType packetType = packetTypeForPacket(receivedPacket);
|
||||||
PacketType packetType = packetTypeForPacket(receivedPacket);
|
SharedNodePointer matchingNode = nodeList->sendingNodeForPacket(receivedPacket);
|
||||||
SharedNodePointer matchingNode = nodeList->sendingNodeForPacket(receivedPacket);
|
if (packetType == getMyQueryMessageType()) {
|
||||||
if (packetType == getMyQueryMessageType()) {
|
// If we got a query packet, then we're talking to an agent, and we
|
||||||
// If we got a query packet, then we're talking to an agent, and we
|
// need to make sure we have it in our nodeList.
|
||||||
// need to make sure we have it in our nodeList.
|
if (matchingNode) {
|
||||||
if (matchingNode) {
|
nodeList->updateNodeWithDataFromPacket(matchingNode, receivedPacket);
|
||||||
nodeList->updateNodeWithDataFromPacket(matchingNode, receivedPacket);
|
OctreeQueryNode* nodeData = (OctreeQueryNode*)matchingNode->getLinkedData();
|
||||||
OctreeQueryNode* nodeData = (OctreeQueryNode*)matchingNode->getLinkedData();
|
if (nodeData && !nodeData->isOctreeSendThreadInitalized()) {
|
||||||
if (nodeData && !nodeData->isOctreeSendThreadInitalized()) {
|
|
||||||
|
// NOTE: this is an important aspect of the proper ref counting. The send threads/node data need to
|
||||||
// NOTE: this is an important aspect of the proper ref counting. The send threads/node data need to
|
// know that the OctreeServer/Assignment will not get deleted on it while it's still active. The
|
||||||
// know that the OctreeServer/Assignment will not get deleted on it while it's still active. The
|
// solution is to get the shared pointer for the current assignment. We need to make sure this is the
|
||||||
// solution is to get the shared pointer for the current assignment. We need to make sure this is the
|
// same SharedAssignmentPointer that was ref counted by the assignment client.
|
||||||
// same SharedAssignmentPointer that was ref counted by the assignment client.
|
SharedAssignmentPointer sharedAssignment = AssignmentClient::getCurrentAssignment();
|
||||||
SharedAssignmentPointer sharedAssignment = AssignmentClient::getCurrentAssignment();
|
nodeData->initializeOctreeSendThread(sharedAssignment, matchingNode);
|
||||||
nodeData->initializeOctreeSendThread(sharedAssignment, matchingNode);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} else if (packetType == PacketTypeOctreeDataNack) {
|
|
||||||
// If we got a nack packet, then we're talking to an agent, and we
|
|
||||||
// need to make sure we have it in our nodeList.
|
|
||||||
if (matchingNode) {
|
|
||||||
OctreeQueryNode* nodeData = (OctreeQueryNode*)matchingNode->getLinkedData();
|
|
||||||
if (nodeData) {
|
|
||||||
nodeData->parseNackPacket(receivedPacket);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else if (packetType == PacketTypeJurisdictionRequest) {
|
|
||||||
_jurisdictionSender->queueReceivedPacket(matchingNode, receivedPacket);
|
|
||||||
} else if (packetType == PacketTypeSignedTransactionPayment) {
|
|
||||||
handleSignedTransactionPayment(packetType, receivedPacket);
|
|
||||||
} else if (_octreeInboundPacketProcessor && getOctree()->handlesEditPacketType(packetType)) {
|
|
||||||
_octreeInboundPacketProcessor->queueReceivedPacket(matchingNode, receivedPacket);
|
|
||||||
} else {
|
|
||||||
// let processNodeData handle it.
|
|
||||||
NodeList::getInstance()->processNodeData(senderSockAddr, receivedPacket);
|
|
||||||
}
|
}
|
||||||
|
} else if (packetType == PacketTypeOctreeDataNack) {
|
||||||
|
// If we got a nack packet, then we're talking to an agent, and we
|
||||||
|
// need to make sure we have it in our nodeList.
|
||||||
|
if (matchingNode) {
|
||||||
|
OctreeQueryNode* nodeData = (OctreeQueryNode*)matchingNode->getLinkedData();
|
||||||
|
if (nodeData) {
|
||||||
|
nodeData->parseNackPacket(receivedPacket);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if (packetType == PacketTypeJurisdictionRequest) {
|
||||||
|
_jurisdictionSender->queueReceivedPacket(matchingNode, receivedPacket);
|
||||||
|
} else if (packetType == PacketTypeSignedTransactionPayment) {
|
||||||
|
handleSignedTransactionPayment(packetType, receivedPacket);
|
||||||
|
} else if (_octreeInboundPacketProcessor && getOctree()->handlesEditPacketType(packetType)) {
|
||||||
|
_octreeInboundPacketProcessor->queueReceivedPacket(matchingNode, receivedPacket);
|
||||||
|
} else {
|
||||||
|
// let processNodeData handle it.
|
||||||
|
NodeList::getInstance()->processNodeData(senderSockAddr, receivedPacket);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void OctreeServer::setupDatagramProcessingThread() {
|
||||||
|
NodeList* nodeList = NodeList::getInstance();
|
||||||
|
|
||||||
|
// we do not want this event loop to be the handler for UDP datagrams, so disconnect
|
||||||
|
disconnect(&nodeList->getNodeSocket(), 0, this, 0);
|
||||||
|
|
||||||
|
// setup a QThread with us as parent that will house the AudioMixerDatagramProcessor
|
||||||
|
_datagramProcessingThread = new QThread(this);
|
||||||
|
|
||||||
|
// create an AudioMixerDatagramProcessor and move it to that thread
|
||||||
|
OctreeServerDatagramProcessor* datagramProcessor = new OctreeServerDatagramProcessor(nodeList->getNodeSocket(), thread());
|
||||||
|
datagramProcessor->moveToThread(_datagramProcessingThread);
|
||||||
|
|
||||||
|
// remove the NodeList as the parent of the node socket
|
||||||
|
nodeList->getNodeSocket().setParent(NULL);
|
||||||
|
nodeList->getNodeSocket().moveToThread(_datagramProcessingThread);
|
||||||
|
|
||||||
|
// let the datagram processor handle readyRead from node socket
|
||||||
|
connect(&nodeList->getNodeSocket(), &QUdpSocket::readyRead,
|
||||||
|
datagramProcessor, &OctreeServerDatagramProcessor::readPendingDatagrams);
|
||||||
|
|
||||||
|
// connect to the datagram processing thread signal that tells us we have to handle a packet
|
||||||
|
connect(datagramProcessor, &OctreeServerDatagramProcessor::packetRequiresProcessing, this, &OctreeServer::readPendingDatagram);
|
||||||
|
|
||||||
|
// delete the datagram processor and the associated thread when the QThread quits
|
||||||
|
connect(_datagramProcessingThread, &QThread::finished, datagramProcessor, &QObject::deleteLater);
|
||||||
|
connect(datagramProcessor, &QObject::destroyed, _datagramProcessingThread, &QThread::deleteLater);
|
||||||
|
|
||||||
|
// start the datagram processing thread
|
||||||
|
_datagramProcessingThread->start();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
void OctreeServer::run() {
|
void OctreeServer::run() {
|
||||||
_safeServerName = getMyServerName();
|
_safeServerName = getMyServerName();
|
||||||
|
|
||||||
|
@ -887,6 +916,8 @@ void OctreeServer::run() {
|
||||||
// use common init to setup common timers and logging
|
// use common init to setup common timers and logging
|
||||||
commonInit(getMyLoggingServerTargetName(), getMyNodeType());
|
commonInit(getMyLoggingServerTargetName(), getMyNodeType());
|
||||||
|
|
||||||
|
setupDatagramProcessingThread();
|
||||||
|
|
||||||
// Now would be a good time to parse our arguments, if we got them as assignment
|
// Now would be a good time to parse our arguments, if we got them as assignment
|
||||||
if (getPayload().size() > 0) {
|
if (getPayload().size() > 0) {
|
||||||
parsePayload();
|
parsePayload();
|
||||||
|
|
|
@ -123,13 +123,15 @@ public:
|
||||||
public slots:
|
public slots:
|
||||||
/// runs the voxel server assignment
|
/// runs the voxel server assignment
|
||||||
void run();
|
void run();
|
||||||
void readPendingDatagrams();
|
|
||||||
void nodeAdded(SharedNodePointer node);
|
void nodeAdded(SharedNodePointer node);
|
||||||
void nodeKilled(SharedNodePointer node);
|
void nodeKilled(SharedNodePointer node);
|
||||||
void sendStatsPacket();
|
void sendStatsPacket();
|
||||||
|
|
||||||
void handleSignedTransactionPaymentResponse(const QJsonObject& jsonObject);
|
void handleSignedTransactionPaymentResponse(const QJsonObject& jsonObject);
|
||||||
|
|
||||||
|
void readPendingDatagrams() { }; // this will not be called since our datagram processing thread will handle
|
||||||
|
void readPendingDatagram(const QByteArray& receivedPacket, const HifiSockAddr& senderSockAddr);
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
void parsePayload();
|
void parsePayload();
|
||||||
void initHTTPManager(int port);
|
void initHTTPManager(int port);
|
||||||
|
@ -140,6 +142,7 @@ protected:
|
||||||
QString getStatusLink();
|
QString getStatusLink();
|
||||||
|
|
||||||
void handleSignedTransactionPayment(PacketType packetType, const QByteArray& datagram);
|
void handleSignedTransactionPayment(PacketType packetType, const QByteArray& datagram);
|
||||||
|
void setupDatagramProcessingThread();
|
||||||
|
|
||||||
int _argc;
|
int _argc;
|
||||||
const char** _argv;
|
const char** _argv;
|
||||||
|
|
|
@ -0,0 +1,55 @@
|
||||||
|
//
|
||||||
|
// OctreeServerDatagramProcessor.cpp
|
||||||
|
// assignment-client/src
|
||||||
|
//
|
||||||
|
// Created by Brad Hefta-Gaub on 2014-09-05
|
||||||
|
// Copyright 2014 High Fidelity, Inc.
|
||||||
|
//
|
||||||
|
// Distributed under the Apache License, Version 2.0.
|
||||||
|
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
|
||||||
|
//
|
||||||
|
|
||||||
|
#include <QDebug>
|
||||||
|
|
||||||
|
#include <HifiSockAddr.h>
|
||||||
|
#include <NodeList.h>
|
||||||
|
#include <PacketHeaders.h>
|
||||||
|
#include <SharedUtil.h>
|
||||||
|
|
||||||
|
#include "OctreeServerDatagramProcessor.h"
|
||||||
|
|
||||||
|
OctreeServerDatagramProcessor::OctreeServerDatagramProcessor(QUdpSocket& nodeSocket, QThread* previousNodeSocketThread) :
|
||||||
|
_nodeSocket(nodeSocket),
|
||||||
|
_previousNodeSocketThread(previousNodeSocketThread)
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
OctreeServerDatagramProcessor::~OctreeServerDatagramProcessor() {
|
||||||
|
// return the node socket to its previous thread
|
||||||
|
_nodeSocket.moveToThread(_previousNodeSocketThread);
|
||||||
|
}
|
||||||
|
|
||||||
|
void OctreeServerDatagramProcessor::readPendingDatagrams() {
|
||||||
|
|
||||||
|
HifiSockAddr senderSockAddr;
|
||||||
|
static QByteArray incomingPacket;
|
||||||
|
|
||||||
|
// read everything that is available
|
||||||
|
while (_nodeSocket.hasPendingDatagrams()) {
|
||||||
|
incomingPacket.resize(_nodeSocket.pendingDatagramSize());
|
||||||
|
|
||||||
|
// just get this packet off the stack
|
||||||
|
_nodeSocket.readDatagram(incomingPacket.data(), incomingPacket.size(),
|
||||||
|
senderSockAddr.getAddressPointer(), senderSockAddr.getPortPointer());
|
||||||
|
|
||||||
|
PacketType packetType = packetTypeForPacket(incomingPacket);
|
||||||
|
if (packetType == PacketTypePing) {
|
||||||
|
NodeList::getInstance()->processNodeData(senderSockAddr, incomingPacket);
|
||||||
|
return; // don't emit
|
||||||
|
}
|
||||||
|
|
||||||
|
// emit the signal to tell AudioMixer it needs to process a packet
|
||||||
|
emit packetRequiresProcessing(incomingPacket, senderSockAddr);
|
||||||
|
}
|
||||||
|
}
|
32
assignment-client/src/octree/OctreeServerDatagramProcessor.h
Normal file
32
assignment-client/src/octree/OctreeServerDatagramProcessor.h
Normal file
|
@ -0,0 +1,32 @@
|
||||||
|
//
|
||||||
|
// OctreeServerDatagramProcessor.h
|
||||||
|
// assignment-client/src
|
||||||
|
//
|
||||||
|
// Created by Stephen Birarda on 2014-08-14.
|
||||||
|
// Copyright 2014 High Fidelity, Inc.
|
||||||
|
//
|
||||||
|
// Distributed under the Apache License, Version 2.0.
|
||||||
|
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
|
||||||
|
//
|
||||||
|
|
||||||
|
#ifndef hifi_OctreeServerDatagramProcessor_h
|
||||||
|
#define hifi_OctreeServerDatagramProcessor_h
|
||||||
|
|
||||||
|
#include <qobject.h>
|
||||||
|
#include <qudpsocket.h>
|
||||||
|
|
||||||
|
class OctreeServerDatagramProcessor : public QObject {
|
||||||
|
Q_OBJECT
|
||||||
|
public:
|
||||||
|
OctreeServerDatagramProcessor(QUdpSocket& nodeSocket, QThread* previousNodeSocketThread);
|
||||||
|
~OctreeServerDatagramProcessor();
|
||||||
|
public slots:
|
||||||
|
void readPendingDatagrams();
|
||||||
|
signals:
|
||||||
|
void packetRequiresProcessing(const QByteArray& receivedPacket, const HifiSockAddr& senderSockAddr);
|
||||||
|
private:
|
||||||
|
QUdpSocket& _nodeSocket;
|
||||||
|
QThread* _previousNodeSocketThread;
|
||||||
|
};
|
||||||
|
|
||||||
|
#endif // hifi_OctreeServerDatagramProcessor_h
|
Loading…
Reference in a new issue