mirror of
https://github.com/overte-org/overte.git
synced 2025-04-21 06:44:06 +02:00
Merge pull request #3367 from ZappoMan/improveClockSkew
improve ping behavior and clockskew in octree servers
This commit is contained in:
commit
de4b423c46
6 changed files with 165 additions and 44 deletions
|
@ -389,7 +389,7 @@ const QByteArray* OctreeQueryNode::getNextNackedPacket() {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
void OctreeQueryNode::parseNackPacket(QByteArray& packet) {
|
||||
void OctreeQueryNode::parseNackPacket(const QByteArray& packet) {
|
||||
|
||||
int numBytesPacketHeader = numBytesForPacketHeader(packet);
|
||||
const unsigned char* dataAt = reinterpret_cast<const unsigned char*>(packet.data()) + numBytesPacketHeader;
|
||||
|
|
|
@ -109,7 +109,7 @@ public:
|
|||
|
||||
OCTREE_PACKET_SEQUENCE getSequenceNumber() const { return _sequenceNumber; }
|
||||
|
||||
void parseNackPacket(QByteArray& packet);
|
||||
void parseNackPacket(const QByteArray& packet);
|
||||
bool hasNextNackedPacket() const;
|
||||
const QByteArray* getNextNackedPacket();
|
||||
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
|
||||
#include "OctreeServer.h"
|
||||
#include "OctreeServerConsts.h"
|
||||
#include "OctreeServerDatagramProcessor.h"
|
||||
|
||||
OctreeServer* OctreeServer::_instance = NULL;
|
||||
int OctreeServer::_clientCount = 0;
|
||||
|
@ -827,55 +828,83 @@ void OctreeServer::parsePayload() {
|
|||
}
|
||||
}
|
||||
|
||||
void OctreeServer::readPendingDatagrams() {
|
||||
QByteArray receivedPacket;
|
||||
HifiSockAddr senderSockAddr;
|
||||
|
||||
void OctreeServer::readPendingDatagram(const QByteArray& receivedPacket, const HifiSockAddr& senderSockAddr) {
|
||||
NodeList* nodeList = NodeList::getInstance();
|
||||
|
||||
while (readAvailableDatagram(receivedPacket, senderSockAddr)) {
|
||||
if (nodeList->packetVersionAndHashMatch(receivedPacket)) {
|
||||
PacketType packetType = packetTypeForPacket(receivedPacket);
|
||||
SharedNodePointer matchingNode = nodeList->sendingNodeForPacket(receivedPacket);
|
||||
if (packetType == getMyQueryMessageType()) {
|
||||
// If we got a query packet, then we're talking to an agent, and we
|
||||
// need to make sure we have it in our nodeList.
|
||||
if (matchingNode) {
|
||||
nodeList->updateNodeWithDataFromPacket(matchingNode, receivedPacket);
|
||||
OctreeQueryNode* nodeData = (OctreeQueryNode*)matchingNode->getLinkedData();
|
||||
if (nodeData && !nodeData->isOctreeSendThreadInitalized()) {
|
||||
|
||||
// NOTE: this is an important aspect of the proper ref counting. The send threads/node data need to
|
||||
// know that the OctreeServer/Assignment will not get deleted on it while it's still active. The
|
||||
// solution is to get the shared pointer for the current assignment. We need to make sure this is the
|
||||
// same SharedAssignmentPointer that was ref counted by the assignment client.
|
||||
SharedAssignmentPointer sharedAssignment = AssignmentClient::getCurrentAssignment();
|
||||
nodeData->initializeOctreeSendThread(sharedAssignment, matchingNode);
|
||||
}
|
||||
if (nodeList->packetVersionAndHashMatch(receivedPacket)) {
|
||||
PacketType packetType = packetTypeForPacket(receivedPacket);
|
||||
SharedNodePointer matchingNode = nodeList->sendingNodeForPacket(receivedPacket);
|
||||
if (packetType == getMyQueryMessageType()) {
|
||||
// If we got a query packet, then we're talking to an agent, and we
|
||||
// need to make sure we have it in our nodeList.
|
||||
if (matchingNode) {
|
||||
nodeList->updateNodeWithDataFromPacket(matchingNode, receivedPacket);
|
||||
OctreeQueryNode* nodeData = (OctreeQueryNode*)matchingNode->getLinkedData();
|
||||
if (nodeData && !nodeData->isOctreeSendThreadInitalized()) {
|
||||
|
||||
// NOTE: this is an important aspect of the proper ref counting. The send threads/node data need to
|
||||
// know that the OctreeServer/Assignment will not get deleted on it while it's still active. The
|
||||
// solution is to get the shared pointer for the current assignment. We need to make sure this is the
|
||||
// same SharedAssignmentPointer that was ref counted by the assignment client.
|
||||
SharedAssignmentPointer sharedAssignment = AssignmentClient::getCurrentAssignment();
|
||||
nodeData->initializeOctreeSendThread(sharedAssignment, matchingNode);
|
||||
}
|
||||
} else if (packetType == PacketTypeOctreeDataNack) {
|
||||
// If we got a nack packet, then we're talking to an agent, and we
|
||||
// need to make sure we have it in our nodeList.
|
||||
if (matchingNode) {
|
||||
OctreeQueryNode* nodeData = (OctreeQueryNode*)matchingNode->getLinkedData();
|
||||
if (nodeData) {
|
||||
nodeData->parseNackPacket(receivedPacket);
|
||||
}
|
||||
}
|
||||
} else if (packetType == PacketTypeJurisdictionRequest) {
|
||||
_jurisdictionSender->queueReceivedPacket(matchingNode, receivedPacket);
|
||||
} else if (packetType == PacketTypeSignedTransactionPayment) {
|
||||
handleSignedTransactionPayment(packetType, receivedPacket);
|
||||
} else if (_octreeInboundPacketProcessor && getOctree()->handlesEditPacketType(packetType)) {
|
||||
_octreeInboundPacketProcessor->queueReceivedPacket(matchingNode, receivedPacket);
|
||||
} else {
|
||||
// let processNodeData handle it.
|
||||
NodeList::getInstance()->processNodeData(senderSockAddr, receivedPacket);
|
||||
}
|
||||
} else if (packetType == PacketTypeOctreeDataNack) {
|
||||
// If we got a nack packet, then we're talking to an agent, and we
|
||||
// need to make sure we have it in our nodeList.
|
||||
if (matchingNode) {
|
||||
OctreeQueryNode* nodeData = (OctreeQueryNode*)matchingNode->getLinkedData();
|
||||
if (nodeData) {
|
||||
nodeData->parseNackPacket(receivedPacket);
|
||||
}
|
||||
}
|
||||
} else if (packetType == PacketTypeJurisdictionRequest) {
|
||||
_jurisdictionSender->queueReceivedPacket(matchingNode, receivedPacket);
|
||||
} else if (packetType == PacketTypeSignedTransactionPayment) {
|
||||
handleSignedTransactionPayment(packetType, receivedPacket);
|
||||
} else if (_octreeInboundPacketProcessor && getOctree()->handlesEditPacketType(packetType)) {
|
||||
_octreeInboundPacketProcessor->queueReceivedPacket(matchingNode, receivedPacket);
|
||||
} else {
|
||||
// let processNodeData handle it.
|
||||
NodeList::getInstance()->processNodeData(senderSockAddr, receivedPacket);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void OctreeServer::setupDatagramProcessingThread() {
|
||||
NodeList* nodeList = NodeList::getInstance();
|
||||
|
||||
// we do not want this event loop to be the handler for UDP datagrams, so disconnect
|
||||
disconnect(&nodeList->getNodeSocket(), 0, this, 0);
|
||||
|
||||
// setup a QThread with us as parent that will house the AudioMixerDatagramProcessor
|
||||
_datagramProcessingThread = new QThread(this);
|
||||
|
||||
// create an AudioMixerDatagramProcessor and move it to that thread
|
||||
OctreeServerDatagramProcessor* datagramProcessor = new OctreeServerDatagramProcessor(nodeList->getNodeSocket(), thread());
|
||||
datagramProcessor->moveToThread(_datagramProcessingThread);
|
||||
|
||||
// remove the NodeList as the parent of the node socket
|
||||
nodeList->getNodeSocket().setParent(NULL);
|
||||
nodeList->getNodeSocket().moveToThread(_datagramProcessingThread);
|
||||
|
||||
// let the datagram processor handle readyRead from node socket
|
||||
connect(&nodeList->getNodeSocket(), &QUdpSocket::readyRead,
|
||||
datagramProcessor, &OctreeServerDatagramProcessor::readPendingDatagrams);
|
||||
|
||||
// connect to the datagram processing thread signal that tells us we have to handle a packet
|
||||
connect(datagramProcessor, &OctreeServerDatagramProcessor::packetRequiresProcessing, this, &OctreeServer::readPendingDatagram);
|
||||
|
||||
// delete the datagram processor and the associated thread when the QThread quits
|
||||
connect(_datagramProcessingThread, &QThread::finished, datagramProcessor, &QObject::deleteLater);
|
||||
connect(datagramProcessor, &QObject::destroyed, _datagramProcessingThread, &QThread::deleteLater);
|
||||
|
||||
// start the datagram processing thread
|
||||
_datagramProcessingThread->start();
|
||||
|
||||
}
|
||||
|
||||
void OctreeServer::run() {
|
||||
_safeServerName = getMyServerName();
|
||||
|
||||
|
@ -887,6 +916,8 @@ void OctreeServer::run() {
|
|||
// use common init to setup common timers and logging
|
||||
commonInit(getMyLoggingServerTargetName(), getMyNodeType());
|
||||
|
||||
setupDatagramProcessingThread();
|
||||
|
||||
// Now would be a good time to parse our arguments, if we got them as assignment
|
||||
if (getPayload().size() > 0) {
|
||||
parsePayload();
|
||||
|
|
|
@ -123,13 +123,15 @@ public:
|
|||
public slots:
|
||||
/// runs the voxel server assignment
|
||||
void run();
|
||||
void readPendingDatagrams();
|
||||
void nodeAdded(SharedNodePointer node);
|
||||
void nodeKilled(SharedNodePointer node);
|
||||
void sendStatsPacket();
|
||||
|
||||
void handleSignedTransactionPaymentResponse(const QJsonObject& jsonObject);
|
||||
|
||||
void readPendingDatagrams() { }; // this will not be called since our datagram processing thread will handle
|
||||
void readPendingDatagram(const QByteArray& receivedPacket, const HifiSockAddr& senderSockAddr);
|
||||
|
||||
protected:
|
||||
void parsePayload();
|
||||
void initHTTPManager(int port);
|
||||
|
@ -140,6 +142,7 @@ protected:
|
|||
QString getStatusLink();
|
||||
|
||||
void handleSignedTransactionPayment(PacketType packetType, const QByteArray& datagram);
|
||||
void setupDatagramProcessingThread();
|
||||
|
||||
int _argc;
|
||||
const char** _argv;
|
||||
|
|
|
@ -0,0 +1,55 @@
|
|||
//
|
||||
// OctreeServerDatagramProcessor.cpp
|
||||
// assignment-client/src
|
||||
//
|
||||
// Created by Brad Hefta-Gaub on 2014-09-05
|
||||
// Copyright 2014 High Fidelity, Inc.
|
||||
//
|
||||
// Distributed under the Apache License, Version 2.0.
|
||||
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
|
||||
//
|
||||
|
||||
#include <QDebug>
|
||||
|
||||
#include <HifiSockAddr.h>
|
||||
#include <NodeList.h>
|
||||
#include <PacketHeaders.h>
|
||||
#include <SharedUtil.h>
|
||||
|
||||
#include "OctreeServerDatagramProcessor.h"
|
||||
|
||||
OctreeServerDatagramProcessor::OctreeServerDatagramProcessor(QUdpSocket& nodeSocket, QThread* previousNodeSocketThread) :
|
||||
_nodeSocket(nodeSocket),
|
||||
_previousNodeSocketThread(previousNodeSocketThread)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
OctreeServerDatagramProcessor::~OctreeServerDatagramProcessor() {
|
||||
// return the node socket to its previous thread
|
||||
_nodeSocket.moveToThread(_previousNodeSocketThread);
|
||||
}
|
||||
|
||||
void OctreeServerDatagramProcessor::readPendingDatagrams() {
|
||||
|
||||
HifiSockAddr senderSockAddr;
|
||||
static QByteArray incomingPacket;
|
||||
|
||||
// read everything that is available
|
||||
while (_nodeSocket.hasPendingDatagrams()) {
|
||||
incomingPacket.resize(_nodeSocket.pendingDatagramSize());
|
||||
|
||||
// just get this packet off the stack
|
||||
_nodeSocket.readDatagram(incomingPacket.data(), incomingPacket.size(),
|
||||
senderSockAddr.getAddressPointer(), senderSockAddr.getPortPointer());
|
||||
|
||||
PacketType packetType = packetTypeForPacket(incomingPacket);
|
||||
if (packetType == PacketTypePing) {
|
||||
NodeList::getInstance()->processNodeData(senderSockAddr, incomingPacket);
|
||||
return; // don't emit
|
||||
}
|
||||
|
||||
// emit the signal to tell AudioMixer it needs to process a packet
|
||||
emit packetRequiresProcessing(incomingPacket, senderSockAddr);
|
||||
}
|
||||
}
|
32
assignment-client/src/octree/OctreeServerDatagramProcessor.h
Normal file
32
assignment-client/src/octree/OctreeServerDatagramProcessor.h
Normal file
|
@ -0,0 +1,32 @@
|
|||
//
|
||||
// OctreeServerDatagramProcessor.h
|
||||
// assignment-client/src
|
||||
//
|
||||
// Created by Brad Hefta-Gaub on 2014-09-05
|
||||
// Copyright 2014 High Fidelity, Inc.
|
||||
//
|
||||
// Distributed under the Apache License, Version 2.0.
|
||||
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
|
||||
//
|
||||
|
||||
#ifndef hifi_OctreeServerDatagramProcessor_h
|
||||
#define hifi_OctreeServerDatagramProcessor_h
|
||||
|
||||
#include <qobject.h>
|
||||
#include <qudpsocket.h>
|
||||
|
||||
class OctreeServerDatagramProcessor : public QObject {
|
||||
Q_OBJECT
|
||||
public:
|
||||
OctreeServerDatagramProcessor(QUdpSocket& nodeSocket, QThread* previousNodeSocketThread);
|
||||
~OctreeServerDatagramProcessor();
|
||||
public slots:
|
||||
void readPendingDatagrams();
|
||||
signals:
|
||||
void packetRequiresProcessing(const QByteArray& receivedPacket, const HifiSockAddr& senderSockAddr);
|
||||
private:
|
||||
QUdpSocket& _nodeSocket;
|
||||
QThread* _previousNodeSocketThread;
|
||||
};
|
||||
|
||||
#endif // hifi_OctreeServerDatagramProcessor_h
|
Loading…
Reference in a new issue