repairs for other ThreadedAssignment subclasses

This commit is contained in:
Stephen Birarda 2015-07-13 15:16:55 -07:00
parent 26be492475
commit f06636d45e
12 changed files with 75 additions and 72 deletions

View file

@ -32,7 +32,7 @@
static const int RECEIVED_AUDIO_STREAM_CAPACITY_FRAMES = 10;
Agent::Agent(const QByteArray& packet) :
Agent::Agent(NLPacket& packet) :
ThreadedAssignment(packet),
_entityEditSender(),
_receivedAudioStream(AudioConstants::NETWORK_FRAME_SAMPLES_STEREO, RECEIVED_AUDIO_STREAM_CAPACITY_FRAMES,
@ -49,15 +49,17 @@ Agent::Agent(const QByteArray& packet) :
DependencyManager::set<SoundCache>();
auto& packetReceiver = DependencyManager::get<NodeList>()->getPacketReceiver();
packetReceiver.registerPacketListener(PacketType::MixedAudio, this, "handleAudioPacket");
packetReceiver.registerPacketListener(PacketType::SilentAudioFrame, this, "handleAudioPacket");
packetReceiver.registerPacketListener(PacketType::OctreeStats, this, "handleOctreePacket");
packetReceiver.registerPacketListener(PacketType::EntityData, this, "handleOctreePacket");
packetReceiver.registerPacketListener(PacketType::EntityErase, this, "handleOctreePacket");
packetReceiver.registerPacketListenerForTypes(
QSet<PacketType::Value>({ PacketType::MixedAudio, PacketType::SilentAudioFrame }),
this, "handleAudioPacket");
packetReceiver.registerPacketListenerForTypes(
QSet<PacketType::Value>({ PacketType::OctreeStats, PacketType::EntityData, PacketType::EntityErase }),
this, "handleOctreePacket");
packetReceiver.registerPacketListener(PacketType::Jurisdiction, this, "handleJurisdictionPacket");
}
void Agent::handleOctreePacket(QSharedPointer<NLPacket> packet, SharedNodePointer senderNode, HifiSockAddr senderSockAddr) {
void Agent::handleOctreePacket(QSharedPointer<NLPacket> packet, SharedNodePointer senderNode) {
QByteArray mutablePacket = QByteArray(packet->getData(), packet->getSizeWithHeader());
int messageLength = mutablePacket.size();

View file

@ -35,7 +35,7 @@ class Agent : public ThreadedAssignment {
Q_PROPERTY(bool isListeningToAudioStream READ isListeningToAudioStream WRITE setIsListeningToAudioStream)
Q_PROPERTY(float lastReceivedAudioLoudness READ getLastReceivedAudioLoudness)
public:
Agent(const QByteArray& packet);
Agent(NLPacket& packet);
void setIsAvatar(bool isAvatar) { QMetaObject::invokeMethod(&_scriptEngine, "setIsAvatar", Q_ARG(bool, isAvatar)); }
bool isAvatar() const { return _scriptEngine.isAvatar(); }
@ -55,9 +55,9 @@ public slots:
void playAvatarSound(Sound* avatarSound) { _scriptEngine.setAvatarSound(avatarSound); }
private slots:
void handleAudioPacket(QSharedPointer<NLPacket> packet, SharedNodePointer senderNode, HifiSockAddr senderSockAddr);
void handleOctreePacket(QSharedPointer<NLPacket> packet, SharedNodePointer senderNode, HifiSockAddr senderSockAddr);
void handleJurisdictionPacket(QSharedPointer<NLPacket> packet, SharedNodePointer senderNode, HifiSockAddr senderSockAddr);
void handleAudioPacket(QSharedPointer<NLPacket> packet, SharedNodePointer senderNode);
void handleOctreePacket(QSharedPointer<NLPacket> packet, SharedNodePointer senderNode);
void handleJurisdictionPacket(QSharedPointer<NLPacket> packet, SharedNodePointer senderNode);
private:
ScriptEngine _scriptEngine;

View file

@ -104,7 +104,7 @@ AudioMixer::AudioMixer(NLPacket& packet) :
PacketType::AudioStreamStats
};
packetReceiver.registerPacketListenerForSet(nodeAudioPackets, this, "handleNodeAudioPacket");
packetReceiver.registerPacketListenerForTypes(nodeAudioPackets, this, "handleNodeAudioPacket");
packetReceiver.registerPacketListener(PacketType::MuteEnvironment, this, "handleMuteEnvironmentPacket");
}
@ -475,7 +475,6 @@ int AudioMixer::prepareMixForListeningNode(Node* node) {
}
void AudioMixer::sendAudioEnvironmentPacket(SharedNodePointer node) {
static char clientEnvBuffer[MAX_PACKET_SIZE];
// Send stream properties
bool hasReverb = false;
@ -501,6 +500,7 @@ void AudioMixer::sendAudioEnvironmentPacket(SharedNodePointer node) {
break;
}
}
AudioMixerClientData* nodeData = static_cast<AudioMixerClientData*>(node->getLinkedData());
AvatarAudioStream* stream = nodeData->getAvatarAudioStream();
bool dataChanged = (stream->hasReverb() != hasReverb) ||
@ -550,13 +550,13 @@ void AudioMixer::handleNodeAudioPacket(QSharedPointer<NLPacket> packet, SharedNo
DependencyManager::get<NodeList>()->updateNodeWithDataFromPacket(packet, sendingNode);
}
void AudioMixer::handleMuteEnvironmentPacket(QSharedPointer<NLPacket> packet, HifiSockAddr senderSockAddr) {
void AudioMixer::handleMuteEnvironmentPacket(QSharedPointer<NLPacket> packet, SharedNodePointer sendingNode) {
auto nodeList = DependencyManager::get<NodeList>();
SharedNodePointer sendingNode = nodeList->nodeWithUUID(packet->getSourceID());
if (sendingNode->getCanAdjustLocks()) {
auto newPacket = NLPacket::create(PacketType::MuteEnvironment);
auto newPacket = NLPacket::create(PacketType::MuteEnvironment, packet->getSizeUsed());
// Copy payload
newPacket->write(newPacket->getPayload());
newPacket->write(packet->getPayload(), packet->getSizeUsed());
nodeList->eachNode([&](const SharedNodePointer& node){
if (node->getType() == NodeType::Agent && node->getActiveSocket() &&

View file

@ -33,7 +33,7 @@ const QString AVATAR_MIXER_LOGGING_NAME = "avatar-mixer";
const int AVATAR_MIXER_BROADCAST_FRAMES_PER_SECOND = 60;
const unsigned int AVATAR_DATA_SEND_INTERVAL_MSECS = (1.0f / (float) AVATAR_MIXER_BROADCAST_FRAMES_PER_SECOND) * 1000;
AvatarMixer::AvatarMixer(const QByteArray& packet) :
AvatarMixer::AvatarMixer(NLPacket& packet) :
ThreadedAssignment(packet),
_broadcastThread(),
_lastFrameTimestamp(QDateTime::currentMSecsSinceEpoch()),
@ -402,7 +402,7 @@ void AvatarMixer::nodeKilled(SharedNodePointer killedNode) {
void AvatarMixer::handleAvatarDataPacket(QSharedPointer<NLPacket> packet, SharedNodePointer senderNode) {
auto nodeList = DependencyManager::get<NodeList>();
nodeList->findNodeAndUpdateWithDataFromPacket(packet);
nodeList->updateNodeWithDataFromPacket(packet, senderNode);
}
void AvatarMixer::handleAvatarIdentityPacket(QSharedPointer<NLPacket> packet, SharedNodePointer senderNode) {

View file

@ -20,7 +20,7 @@
/// Handles assignments of type AvatarMixer - distribution of avatar data to various clients
class AvatarMixer : public ThreadedAssignment {
public:
AvatarMixer(const QByteArray& packet);
AvatarMixer(NLPacket& packet);
~AvatarMixer();
public slots:
/// runs the avatar mixer

View file

@ -74,7 +74,7 @@ void OctreeInboundPacketProcessor::midProcess() {
}
}
void OctreeInboundPacketProcessor::processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet) {
void OctreeInboundPacketProcessor::processPacket(QSharedPointer<NLPacket> packet, SharedNodePointer sendingNode) {
if (_shuttingDown) {
qDebug() << "OctreeInboundPacketProcessor::processPacket() while shutting down... ignoring incoming packet";
return;
@ -83,22 +83,24 @@ void OctreeInboundPacketProcessor::processPacket(const SharedNodePointer& sendin
bool debugProcessPacket = _myServer->wantsVerboseDebug();
if (debugProcessPacket) {
qDebug("OctreeInboundPacketProcessor::processPacket() packetData=%p packetLength=%d", &packet, packet.size());
qDebug("OctreeInboundPacketProcessor::processPacket() payload=%p payloadLength=%lld",
packet->getPayload(),
packet->getSizeUsed());
}
int numBytesPacketHeader = numBytesForPacketHeader(packet);
// Ask our tree subclass if it can handle the incoming packet...
PacketType::Value packetType = packetTypeForPacket(packet);
PacketType::Value packetType = packet->getType();
if (_myServer->getOctree()->handlesEditPacketType(packetType)) {
PerformanceWarning warn(debugProcessPacket, "processPacket KNOWN TYPE",debugProcessPacket);
PerformanceWarning warn(debugProcessPacket, "processPacket KNOWN TYPE", debugProcessPacket);
_receivedPacketCount++;
const unsigned char* packetData = reinterpret_cast<const unsigned char*>(packet.data());
unsigned short int sequence;
packet->readPrimitive(&sequence);
unsigned short int sequence = (*((unsigned short int*)(packetData + numBytesPacketHeader)));
quint64 sentAt = (*((quint64*)(packetData + numBytesPacketHeader + sizeof(sequence))));
quint64 sentAt;
packet->readPrimitive(&sentAt);
quint64 arrivedAt = usecTimestampNow();
if (sentAt > arrivedAt) {
if (debugProcessPacket || _myServer->wantsDebugReceiving()) {
@ -107,6 +109,7 @@ void OctreeInboundPacketProcessor::processPacket(const SharedNodePointer& sendin
}
sentAt = arrivedAt;
}
quint64 transitTime = arrivedAt - sentAt;
int editsInPacket = 0;
quint64 processTime = 0;
@ -114,7 +117,7 @@ void OctreeInboundPacketProcessor::processPacket(const SharedNodePointer& sendin
if (debugProcessPacket || _myServer->wantsDebugReceiving()) {
qDebug() << "PROCESSING THREAD: got '" << packetType << "' packet - " << _receivedPacketCount << " command from client";
qDebug() << " receivedBytes=" << packet.size();
qDebug() << " receivedBytes=" << packet->getSizeWithHeader();
qDebug() << " sequence=" << sequence;
qDebug() << " sentAt=" << sentAt << " usecs";
qDebug() << " arrivedAt=" << arrivedAt << " usecs";
@ -125,42 +128,41 @@ void OctreeInboundPacketProcessor::processPacket(const SharedNodePointer& sendin
}
if (debugProcessPacket) {
qDebug() << " numBytesPacketHeader=" << numBytesPacketHeader;
qDebug() << " numBytesPacketHeader=" << packet->localHeaderSize();
qDebug() << " sizeof(sequence)=" << sizeof(sequence);
qDebug() << " sizeof(sentAt)=" << sizeof(sentAt);
}
int atByte = numBytesPacketHeader + sizeof(sequence) + sizeof(sentAt);
if (debugProcessPacket) {
qDebug() << " atByte=" << atByte;
qDebug() << " packet.size()=" << packet.size();
if (atByte >= packet.size()) {
qDebug() << " atByte (in payload)=" << packet->pos();
qDebug() << " payload size=" << packet->getSizeUsed();
if (!packet->bytesAvailable()) {
qDebug() << " ----- UNEXPECTED ---- got a packet without any edit details!!!! --------";
}
}
const unsigned char* editData = nullptr;
while (packet->bytesAvailable() > 0) {
unsigned char* editData = (unsigned char*)&packetData[atByte];
while (atByte < packet.size()) {
editData = reinterpret_cast<const unsigned char*>(packet->getPayload() + packet->pos());
int maxSize = packet.size() - atByte;
int maxSize = packet->bytesAvailable();
if (debugProcessPacket) {
qDebug() << " --- inside while loop ---";
qDebug() << " maxSize=" << maxSize;
qDebug("OctreeInboundPacketProcessor::processPacket() %c "
"packetData=%p packetLength=%d editData=%p atByte=%d maxSize=%d",
packetType, packetData, packet.size(), editData, atByte, maxSize);
"payload=%p payloadLength=%lld editData=%p payloadPosition=%lld maxSize=%d",
packetType, packet->getPayload(), packet->getSizeUsed(), editData,
packet->pos(), maxSize);
}
quint64 startLock = usecTimestampNow();
_myServer->getOctree()->lockForWrite();
quint64 startProcess = usecTimestampNow();
int editDataBytesRead = _myServer->getOctree()->processEditPacketData(packetType,
reinterpret_cast<const unsigned char*>(packet.data()),
packet.size(),
editData, maxSize, sendingNode);
int editDataBytesRead =
_myServer->getOctree()->processEditPacketData(*packet, editData, maxSize, sendingNode);
if (debugProcessPacket) {
qDebug() << "OctreeInboundPacketProcessor::processPacket() after processEditPacketData()..."
@ -177,27 +179,25 @@ void OctreeInboundPacketProcessor::processPacket(const SharedNodePointer& sendin
lockWaitTime += thisLockWaitTime;
// skip to next edit record in the packet
editData += editDataBytesRead;
atByte += editDataBytesRead;
packet->seek(packet->pos() + editDataBytesRead);
if (debugProcessPacket) {
qDebug() << " editDataBytesRead=" << editDataBytesRead;
qDebug() << " AFTER processEditPacketData atByte=" << atByte;
qDebug() << " AFTER processEditPacketData packet.size()=" << packet.size();
qDebug() << " AFTER processEditPacketData payload position=" << packet->pos();
qDebug() << " AFTER processEditPacketData payload size=" << packet->getSizeUsed();
}
}
if (debugProcessPacket) {
qDebug("OctreeInboundPacketProcessor::processPacket() DONE LOOPING FOR %c "
"packetData=%p packetLength=%d editData=%p atByte=%d",
packetType, packetData, packet.size(), editData, atByte);
"payload=%p payloadLength=%lld editData=%p payloadPosition=%lld",
packetType, packet->getPayload(), packet->getSizeUsed(), editData, packet->pos());
}
// Make sure our Node and NodeList knows we've heard from this node.
QUuid& nodeUUID = DEFAULT_NODE_ID_REF;
if (sendingNode) {
sendingNode->setLastHeardMicrostamp(usecTimestampNow());
nodeUUID = sendingNode->getUUID();
if (debugProcessPacket) {
qDebug() << "sender has uuid=" << nodeUUID;

View file

@ -29,9 +29,9 @@ public:
quint64 getAverageLockWaitTimePerPacket() const { return _totalPackets == 0 ? 0 : _totalLockWaitTime / _totalPackets; }
quint64 getTotalElementsProcessed() const { return _totalElementsInPacket; }
quint64 getTotalPacketsProcessed() const { return _totalPackets; }
quint64 getAverageProcessTimePerElement() const
quint64 getAverageProcessTimePerElement() const
{ return _totalElementsInPacket == 0 ? 0 : _totalProcessTime / _totalElementsInPacket; }
quint64 getAverageLockWaitTimePerElement() const
quint64 getAverageLockWaitTimePerElement() const
{ return _totalElementsInPacket == 0 ? 0 : _totalLockWaitTime / _totalElementsInPacket; }
const SequenceNumberStats& getIncomingEditSequenceNumberStats() const { return _incomingEditSequenceNumberStats; }
@ -40,7 +40,7 @@ public:
void trackInboundPacket(unsigned short int incomingSequence, quint64 transitTime,
int editsInPacket, quint64 processTime, quint64 lockWaitTime);
quint64 _totalTransitTime;
quint64 _totalTransitTime;
quint64 _totalProcessTime;
quint64 _totalLockWaitTime;
quint64 _totalElementsInPacket;
@ -53,7 +53,7 @@ typedef QHash<QUuid, SingleSenderStats>::iterator NodeToSenderStatsMapIterator;
typedef QHash<QUuid, SingleSenderStats>::const_iterator NodeToSenderStatsMapConstIterator;
/// Handles processing of incoming network packets for the octee servers. As with other ReceivedPacketProcessor classes
/// Handles processing of incoming network packets for the octee servers. As with other ReceivedPacketProcessor classes
/// the user is responsible for reading inbound packets and adding them to the processing queue by calling queueReceivedPacket()
class OctreeInboundPacketProcessor : public ReceivedPacketProcessor {
Q_OBJECT
@ -65,9 +65,9 @@ public:
quint64 getAverageLockWaitTimePerPacket() const { return _totalPackets == 0 ? 0 : _totalLockWaitTime / _totalPackets; }
quint64 getTotalElementsProcessed() const { return _totalElementsInPacket; }
quint64 getTotalPacketsProcessed() const { return _totalPackets; }
quint64 getAverageProcessTimePerElement() const
quint64 getAverageProcessTimePerElement() const
{ return _totalElementsInPacket == 0 ? 0 : _totalProcessTime / _totalElementsInPacket; }
quint64 getAverageLockWaitTimePerElement() const
quint64 getAverageLockWaitTimePerElement() const
{ return _totalElementsInPacket == 0 ? 0 : _totalLockWaitTime / _totalElementsInPacket; }
void resetStats();
@ -78,7 +78,7 @@ public:
protected:
virtual void processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet);
virtual void processPacket(QSharedPointer<NLPacket> packet, SharedNodePointer sendingNode);
virtual unsigned long getMaxWait() const;
virtual void preProcess();
@ -88,13 +88,13 @@ private:
int sendNackPackets();
private:
void trackInboundPacket(const QUuid& nodeUUID, unsigned short int sequence, quint64 transitTime,
void trackInboundPacket(const QUuid& nodeUUID, unsigned short int sequence, quint64 transitTime,
int elementsInPacket, quint64 processTime, quint64 lockWaitTime);
OctreeServer* _myServer;
int _receivedPacketCount;
quint64 _totalTransitTime;
quint64 _totalTransitTime;
quint64 _totalProcessTime;
quint64 _totalLockWaitTime;
quint64 _totalElementsInPacket;

View file

@ -213,7 +213,7 @@ void OctreeServer::trackProcessWaitTime(float time) {
_averageProcessWaitTime.updateAverage(time);
}
OctreeServer::OctreeServer(const QByteArray& packet) :
OctreeServer::OctreeServer(NLPacket& packet) :
ThreadedAssignment(packet),
_argc(0),
_argv(NULL),
@ -247,11 +247,6 @@ OctreeServer::OctreeServer(const QByteArray& packet) :
// make sure the AccountManager has an Auth URL for payment redemptions
AccountManager::getInstance().setAuthURL(NetworkingConstants::METAVERSE_SERVER_URL);
auto& packetReceiver = DependencyManager::get<NodeList>()->getPacketReceiver();
packetReceiver.registerPacketListener(getMyQueryMessageType(), this, "handleOctreeQueryPacket");
packetReceiver.registerPacketListener(PacketType::OctreeDataNack, this, "handleOctreeDataNackPacket");
packetReceiver.registerPacketListener(PacketType::JurisdictionRequest, this, "handleJurisdictionRequestPacket");
}
OctreeServer::~OctreeServer() {
@ -1108,6 +1103,12 @@ void OctreeServer::readConfiguration() {
}
void OctreeServer::run() {
auto& packetReceiver = DependencyManager::get<NodeList>()->getPacketReceiver();
packetReceiver.registerPacketListener(getMyQueryMessageType(), this, "handleOctreeQueryPacket");
packetReceiver.registerPacketListener(PacketType::OctreeDataNack, this, "handleOctreeDataNackPacket");
packetReceiver.registerPacketListener(PacketType::JurisdictionRequest, this, "handleJurisdictionRequestPacket");
_safeServerName = getMyServerName();
// Before we do anything else, create our tree...

View file

@ -32,7 +32,7 @@ const int DEFAULT_PACKETS_PER_INTERVAL = 2000; // some 120,000 packets per secon
class OctreeServer : public ThreadedAssignment, public HTTPRequestHandler {
Q_OBJECT
public:
OctreeServer(const QByteArray& packet);
OctreeServer(NLPacket& packet);
~OctreeServer();
/// allows setting of run arguments

View file

@ -24,7 +24,7 @@ PacketReceiver::PacketReceiver(QObject* parent) :
}
void PacketReceiver::registerPacketListenerForSet(const QSet<PacketType::Value>& types, QObject* object, const char* slot) {
void PacketReceiver::registerPacketListenerForTypes(const QSet<PacketType::Value>& types, QObject* object, const char* slot) {
QSet<PacketType::Value> nonSourcedTypes;
QSet<PacketType::Value> sourcedTypes;

View file

@ -39,7 +39,7 @@ public:
void shutdown() { _isShuttingDown = true; }
void registerPacketListenerForSet(const QSet<PacketType::Value>& types, QObject* listener, const char* slot);
void registerPacketListenerForTypes(const QSet<PacketType::Value>& types, QObject* listener, const char* slot);
void registerPacketListener(PacketType::Value type, QObject* listener, const char* slot);
public slots:

View file

@ -110,7 +110,7 @@ void OctreeRenderer::processDatagram(NLPacket& packet, SharedNodePointer sourceN
while (packet.bytesAvailable() && !error) {
if (packetIsCompressed) {
if (packet.bytesAvailable() > sizeof(OCTREE_PACKET_INTERNAL_SECTION_SIZE)) {
if (packet.bytesAvailable() > (qint64) sizeof(OCTREE_PACKET_INTERNAL_SECTION_SIZE)) {
packet.readPrimitive(&sectionLength);
} else {
sectionLength = 0;