mirror of
https://github.com/HifiExperiments/overte.git
synced 2025-08-09 05:48:26 +02:00
moved most of VoxelEditPacketSender into new generic base class OctreeEditPacketSender
This commit is contained in:
parent
072ced0185
commit
c067f8ad11
4 changed files with 426 additions and 373 deletions
299
libraries/octree/src/OctreeEditPacketSender.cpp
Normal file
299
libraries/octree/src/OctreeEditPacketSender.cpp
Normal file
|
@ -0,0 +1,299 @@
|
||||||
|
//
|
||||||
|
// OctreeEditPacketSender.cpp
|
||||||
|
// interface
|
||||||
|
//
|
||||||
|
// Created by Brad Hefta-Gaub on 8/12/13.
|
||||||
|
// Copyright (c) 2013 High Fidelity, Inc. All rights reserved.
|
||||||
|
//
|
||||||
|
// Threaded or non-threaded packet Sender for the Application
|
||||||
|
//
|
||||||
|
|
||||||
|
#include <assert.h>
|
||||||
|
|
||||||
|
#include <PerfStat.h>
|
||||||
|
|
||||||
|
#include <OctalCode.h>
|
||||||
|
#include <PacketHeaders.h>
|
||||||
|
#include "OctreeEditPacketSender.h"
|
||||||
|
|
||||||
|
|
||||||
|
EditPacketBuffer::EditPacketBuffer(PACKET_TYPE type, unsigned char* buffer, ssize_t length, QUuid nodeUUID) {
|
||||||
|
_nodeUUID = nodeUUID;
|
||||||
|
_currentType = type;
|
||||||
|
_currentSize = length;
|
||||||
|
memcpy(_currentBuffer, buffer, length);
|
||||||
|
};
|
||||||
|
|
||||||
|
const int OctreeEditPacketSender::DEFAULT_MAX_PENDING_MESSAGES = PacketSender::DEFAULT_PACKETS_PER_SECOND;
|
||||||
|
|
||||||
|
|
||||||
|
OctreeEditPacketSender::OctreeEditPacketSender(PacketSenderNotify* notify) :
|
||||||
|
PacketSender(notify),
|
||||||
|
_shouldSend(true),
|
||||||
|
_maxPendingMessages(DEFAULT_MAX_PENDING_MESSAGES),
|
||||||
|
_releaseQueuedMessagesPending(false),
|
||||||
|
_serverJurisdictions(NULL),
|
||||||
|
_sequenceNumber(0),
|
||||||
|
_maxPacketSize(MAX_PACKET_SIZE) {
|
||||||
|
}
|
||||||
|
|
||||||
|
OctreeEditPacketSender::~OctreeEditPacketSender() {
|
||||||
|
while (!_preServerSingleMessagePackets.empty()) {
|
||||||
|
EditPacketBuffer* packet = _preServerSingleMessagePackets.front();
|
||||||
|
delete packet;
|
||||||
|
_preServerSingleMessagePackets.erase(_preServerSingleMessagePackets.begin());
|
||||||
|
}
|
||||||
|
while (!_preServerPackets.empty()) {
|
||||||
|
EditPacketBuffer* packet = _preServerPackets.front();
|
||||||
|
delete packet;
|
||||||
|
_preServerPackets.erase(_preServerPackets.begin());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
bool OctreeEditPacketSender::serversExist() const {
|
||||||
|
bool hasServers = false;
|
||||||
|
bool atLeastOnJurisdictionMissing = false; // assume the best
|
||||||
|
NodeList* nodeList = NodeList::getInstance();
|
||||||
|
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
|
||||||
|
// only send to the NodeTypes that are getMyNodeType()
|
||||||
|
if (node->getType() == getMyNodeType()) {
|
||||||
|
if (nodeList->getNodeActiveSocketOrPing(&(*node))) {
|
||||||
|
QUuid nodeUUID = node->getUUID();
|
||||||
|
// If we've got Jurisdictions set, then check to see if we know the jurisdiction for this server
|
||||||
|
if (_serverJurisdictions) {
|
||||||
|
// lookup our nodeUUID in the jurisdiction map, if it's missing then we're
|
||||||
|
// missing at least one jurisdiction
|
||||||
|
if ((*_serverJurisdictions).find(nodeUUID) == (*_serverJurisdictions).end()) {
|
||||||
|
atLeastOnJurisdictionMissing = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
hasServers = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (atLeastOnJurisdictionMissing) {
|
||||||
|
break; // no point in looking further...
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return (hasServers && !atLeastOnJurisdictionMissing);
|
||||||
|
}
|
||||||
|
|
||||||
|
// This method is called when the edit packet layer has determined that it has a fully formed packet destined for
|
||||||
|
// a known nodeID. However, we also want to handle the case where the
|
||||||
|
void OctreeEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, unsigned char* buffer, ssize_t length) {
|
||||||
|
NodeList* nodeList = NodeList::getInstance();
|
||||||
|
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
|
||||||
|
// only send to the NodeTypes that are getMyNodeType()
|
||||||
|
if (node->getType() == getMyNodeType() &&
|
||||||
|
((node->getUUID() == nodeUUID) || (nodeUUID.isNull()))) {
|
||||||
|
if (nodeList->getNodeActiveSocketOrPing(&(*node))) {
|
||||||
|
const HifiSockAddr* nodeAddress = node->getActiveSocket();
|
||||||
|
queuePacketForSending(*nodeAddress, buffer, length);
|
||||||
|
|
||||||
|
// debugging output...
|
||||||
|
bool wantDebugging = false;
|
||||||
|
if (wantDebugging) {
|
||||||
|
int numBytesPacketHeader = numBytesForPacketHeader(buffer);
|
||||||
|
unsigned short int sequence = (*((unsigned short int*)(buffer + numBytesPacketHeader)));
|
||||||
|
uint64_t createdAt = (*((uint64_t*)(buffer + numBytesPacketHeader + sizeof(sequence))));
|
||||||
|
uint64_t queuedAt = usecTimestampNow();
|
||||||
|
uint64_t transitTime = queuedAt - createdAt;
|
||||||
|
printf("OctreeEditPacketSender::queuePacketToNode() queued %c - command to node bytes=%ld sequence=%d transitTimeSoFar=%llu usecs\n",
|
||||||
|
buffer[0], length, sequence, transitTime);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void OctreeEditPacketSender::processPreServerExistsPackets() {
|
||||||
|
assert(serversExist()); // we should only be here if we have jurisdictions
|
||||||
|
|
||||||
|
// First send out all the single message packets...
|
||||||
|
while (!_preServerSingleMessagePackets.empty()) {
|
||||||
|
EditPacketBuffer* packet = _preServerSingleMessagePackets.front();
|
||||||
|
queuePacketToNodes(&packet->_currentBuffer[0], packet->_currentSize);
|
||||||
|
delete packet;
|
||||||
|
_preServerSingleMessagePackets.erase(_preServerSingleMessagePackets.begin());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Then "process" all the packable messages...
|
||||||
|
while (!_preServerPackets.empty()) {
|
||||||
|
EditPacketBuffer* packet = _preServerPackets.front();
|
||||||
|
queueOctreeEditMessage(packet->_currentType, &packet->_currentBuffer[0], packet->_currentSize);
|
||||||
|
delete packet;
|
||||||
|
_preServerPackets.erase(_preServerPackets.begin());
|
||||||
|
}
|
||||||
|
|
||||||
|
// if while waiting for the jurisdictions the caller called releaseQueuedMessages()
|
||||||
|
// then we want to honor that request now.
|
||||||
|
if (_releaseQueuedMessagesPending) {
|
||||||
|
releaseQueuedMessages();
|
||||||
|
_releaseQueuedMessagesPending = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void OctreeEditPacketSender::queuePendingPacketToNodes(PACKET_TYPE type, unsigned char* buffer, ssize_t length) {
|
||||||
|
// If we're asked to save messages while waiting for voxel servers to arrive, then do so...
|
||||||
|
if (_maxPendingMessages > 0) {
|
||||||
|
EditPacketBuffer* packet = new EditPacketBuffer(type, buffer, length);
|
||||||
|
_preServerSingleMessagePackets.push_back(packet);
|
||||||
|
// if we've saved MORE than our max, then clear out the oldest packet...
|
||||||
|
int allPendingMessages = _preServerSingleMessagePackets.size() + _preServerPackets.size();
|
||||||
|
if (allPendingMessages > _maxPendingMessages) {
|
||||||
|
EditPacketBuffer* packet = _preServerSingleMessagePackets.front();
|
||||||
|
delete packet;
|
||||||
|
_preServerSingleMessagePackets.erase(_preServerSingleMessagePackets.begin());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void OctreeEditPacketSender::queuePacketToNodes(unsigned char* buffer, ssize_t length) {
|
||||||
|
if (!_shouldSend) {
|
||||||
|
return; // bail early
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(serversExist()); // we must have jurisdictions to be here!!
|
||||||
|
|
||||||
|
int headerBytes = numBytesForPacketHeader(buffer) + sizeof(short) + sizeof(uint64_t);
|
||||||
|
unsigned char* octCode = buffer + headerBytes; // skip the packet header to get to the octcode
|
||||||
|
|
||||||
|
// We want to filter out edit messages for servers based on the server's Jurisdiction
|
||||||
|
// But we can't really do that with a packed message, since each edit message could be destined
|
||||||
|
// for a different server... So we need to actually manage multiple queued packets... one
|
||||||
|
// for each server
|
||||||
|
NodeList* nodeList = NodeList::getInstance();
|
||||||
|
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
|
||||||
|
// only send to the NodeTypes that are getMyNodeType()
|
||||||
|
if (node->getActiveSocket() != NULL && node->getType() == getMyNodeType()) {
|
||||||
|
QUuid nodeUUID = node->getUUID();
|
||||||
|
bool isMyJurisdiction = true;
|
||||||
|
// we need to get the jurisdiction for this
|
||||||
|
// here we need to get the "pending packet" for this server
|
||||||
|
const JurisdictionMap& map = (*_serverJurisdictions)[nodeUUID];
|
||||||
|
isMyJurisdiction = (map.isMyJurisdiction(octCode, CHECK_NODE_ONLY) == JurisdictionMap::WITHIN);
|
||||||
|
if (isMyJurisdiction) {
|
||||||
|
queuePacketToNode(nodeUUID, buffer, length);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// NOTE: codeColorBuffer - is JUST the octcode/color and does not contain the packet header!
|
||||||
|
void OctreeEditPacketSender::queueOctreeEditMessage(PACKET_TYPE type, unsigned char* codeColorBuffer, ssize_t length) {
|
||||||
|
if (!_shouldSend) {
|
||||||
|
return; // bail early
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we don't have jurisdictions, then we will simply queue up all of these packets and wait till we have
|
||||||
|
// jurisdictions for processing
|
||||||
|
if (!serversExist()) {
|
||||||
|
if (_maxPendingMessages > 0) {
|
||||||
|
EditPacketBuffer* packet = new EditPacketBuffer(type, codeColorBuffer, length);
|
||||||
|
_preServerPackets.push_back(packet);
|
||||||
|
|
||||||
|
// if we've saved MORE than out max, then clear out the oldest packet...
|
||||||
|
int allPendingMessages = _preServerSingleMessagePackets.size() + _preServerPackets.size();
|
||||||
|
if (allPendingMessages > _maxPendingMessages) {
|
||||||
|
EditPacketBuffer* packet = _preServerPackets.front();
|
||||||
|
delete packet;
|
||||||
|
_preServerPackets.erase(_preServerPackets.begin());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return; // bail early
|
||||||
|
}
|
||||||
|
|
||||||
|
// We want to filter out edit messages for servers based on the server's Jurisdiction
|
||||||
|
// But we can't really do that with a packed message, since each edit message could be destined
|
||||||
|
// for a different server... So we need to actually manage multiple queued packets... one
|
||||||
|
// for each server
|
||||||
|
NodeList* nodeList = NodeList::getInstance();
|
||||||
|
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
|
||||||
|
// only send to the NodeTypes that are getMyNodeType()
|
||||||
|
if (node->getActiveSocket() != NULL && node->getType() == getMyNodeType()) {
|
||||||
|
QUuid nodeUUID = node->getUUID();
|
||||||
|
bool isMyJurisdiction = true;
|
||||||
|
|
||||||
|
if (_serverJurisdictions) {
|
||||||
|
// we need to get the jurisdiction for this
|
||||||
|
// here we need to get the "pending packet" for this server
|
||||||
|
if ((*_serverJurisdictions).find(nodeUUID) != (*_serverJurisdictions).end()) {
|
||||||
|
const JurisdictionMap& map = (*_serverJurisdictions)[nodeUUID];
|
||||||
|
isMyJurisdiction = (map.isMyJurisdiction(codeColorBuffer, CHECK_NODE_ONLY) == JurisdictionMap::WITHIN);
|
||||||
|
} else {
|
||||||
|
isMyJurisdiction = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (isMyJurisdiction) {
|
||||||
|
EditPacketBuffer& packetBuffer = _pendingEditPackets[nodeUUID];
|
||||||
|
packetBuffer._nodeUUID = nodeUUID;
|
||||||
|
|
||||||
|
// If we're switching type, then we send the last one and start over
|
||||||
|
if ((type != packetBuffer._currentType && packetBuffer._currentSize > 0) ||
|
||||||
|
(packetBuffer._currentSize + length >= _maxPacketSize)) {
|
||||||
|
releaseQueuedPacket(packetBuffer);
|
||||||
|
initializePacket(packetBuffer, type);
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the buffer is empty and not correctly initialized for our type...
|
||||||
|
if (type != packetBuffer._currentType && packetBuffer._currentSize == 0) {
|
||||||
|
initializePacket(packetBuffer, type);
|
||||||
|
}
|
||||||
|
|
||||||
|
memcpy(&packetBuffer._currentBuffer[packetBuffer._currentSize], codeColorBuffer, length);
|
||||||
|
packetBuffer._currentSize += length;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void OctreeEditPacketSender::releaseQueuedMessages() {
|
||||||
|
// if we don't yet have jurisdictions then we can't actually release messages yet because we don't
|
||||||
|
// know where to send them to. Instead, just remember this request and when we eventually get jurisdictions
|
||||||
|
// call release again at that time.
|
||||||
|
if (!serversExist()) {
|
||||||
|
_releaseQueuedMessagesPending = true;
|
||||||
|
} else {
|
||||||
|
for (std::map<QUuid, EditPacketBuffer>::iterator i = _pendingEditPackets.begin(); i != _pendingEditPackets.end(); i++) {
|
||||||
|
releaseQueuedPacket(i->second);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void OctreeEditPacketSender::releaseQueuedPacket(EditPacketBuffer& packetBuffer) {
|
||||||
|
if (packetBuffer._currentSize > 0 && packetBuffer._currentType != PACKET_TYPE_UNKNOWN) {
|
||||||
|
queuePacketToNode(packetBuffer._nodeUUID, &packetBuffer._currentBuffer[0], packetBuffer._currentSize);
|
||||||
|
}
|
||||||
|
packetBuffer._currentSize = 0;
|
||||||
|
packetBuffer._currentType = PACKET_TYPE_UNKNOWN;
|
||||||
|
}
|
||||||
|
|
||||||
|
void OctreeEditPacketSender::initializePacket(EditPacketBuffer& packetBuffer, PACKET_TYPE type) {
|
||||||
|
packetBuffer._currentSize = populateTypeAndVersion(&packetBuffer._currentBuffer[0], type);
|
||||||
|
|
||||||
|
// pack in sequence number
|
||||||
|
unsigned short int* sequenceAt = (unsigned short int*)&packetBuffer._currentBuffer[packetBuffer._currentSize];
|
||||||
|
*sequenceAt = _sequenceNumber;
|
||||||
|
packetBuffer._currentSize += sizeof(unsigned short int); // nudge past sequence
|
||||||
|
_sequenceNumber++;
|
||||||
|
|
||||||
|
// pack in timestamp
|
||||||
|
uint64_t now = usecTimestampNow();
|
||||||
|
uint64_t* timeAt = (uint64_t*)&packetBuffer._currentBuffer[packetBuffer._currentSize];
|
||||||
|
*timeAt = now;
|
||||||
|
packetBuffer._currentSize += sizeof(uint64_t); // nudge past timestamp
|
||||||
|
|
||||||
|
packetBuffer._currentType = type;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool OctreeEditPacketSender::process() {
|
||||||
|
// if we have server jurisdiction details, and we have pending pre-jurisdiction packets, then process those
|
||||||
|
// before doing our normal process step. This processPreJurisdictionPackets()
|
||||||
|
if (serversExist() && (!_preServerPackets.empty() || !_preServerSingleMessagePackets.empty() )) {
|
||||||
|
processPreServerExistsPackets();
|
||||||
|
}
|
||||||
|
|
||||||
|
// base class does most of the work.
|
||||||
|
return PacketSender::process();
|
||||||
|
}
|
114
libraries/octree/src/OctreeEditPacketSender.h
Normal file
114
libraries/octree/src/OctreeEditPacketSender.h
Normal file
|
@ -0,0 +1,114 @@
|
||||||
|
//
|
||||||
|
// OctreeEditPacketSender.h
|
||||||
|
// shared
|
||||||
|
//
|
||||||
|
// Created by Brad Hefta-Gaub on 8/12/13.
|
||||||
|
// Copyright (c) 2013 High Fidelity, Inc. All rights reserved.
|
||||||
|
//
|
||||||
|
// Octree Edit Packet Sender
|
||||||
|
//
|
||||||
|
|
||||||
|
#ifndef __shared__OctreeEditPacketSender__
|
||||||
|
#define __shared__OctreeEditPacketSender__
|
||||||
|
|
||||||
|
#include <PacketSender.h>
|
||||||
|
#include <PacketHeaders.h>
|
||||||
|
#include "JurisdictionMap.h"
|
||||||
|
|
||||||
|
/// Used for construction of edit packets
|
||||||
|
class EditPacketBuffer {
|
||||||
|
public:
|
||||||
|
EditPacketBuffer() : _nodeUUID(), _currentType(PACKET_TYPE_UNKNOWN), _currentSize(0) { }
|
||||||
|
EditPacketBuffer(PACKET_TYPE type, unsigned char* codeColorBuffer, ssize_t length, const QUuid nodeUUID = QUuid());
|
||||||
|
QUuid _nodeUUID;
|
||||||
|
PACKET_TYPE _currentType;
|
||||||
|
unsigned char _currentBuffer[MAX_PACKET_SIZE];
|
||||||
|
ssize_t _currentSize;
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Utility for processing, packing, queueing and sending of outbound edit messages.
|
||||||
|
class OctreeEditPacketSender : public virtual PacketSender {
|
||||||
|
public:
|
||||||
|
OctreeEditPacketSender(PacketSenderNotify* notify = NULL);
|
||||||
|
~OctreeEditPacketSender();
|
||||||
|
|
||||||
|
/// Queues a single edit message. Will potentially send a pending multi-command packet. Determines which server
|
||||||
|
/// node or nodes the packet should be sent to. Can be called even before servers are known, in which case up to
|
||||||
|
/// MaxPendingMessages will be buffered and processed when servers are known.
|
||||||
|
void queueOctreeEditMessage(PACKET_TYPE type, unsigned char* buffer, ssize_t length);
|
||||||
|
|
||||||
|
/// Releases all queued messages even if those messages haven't filled an MTU packet. This will move the packed message
|
||||||
|
/// packets onto the send queue. If running in threaded mode, the caller does not need to do any further processing to
|
||||||
|
/// have these packets get sent. If running in non-threaded mode, the caller must still call process() on a regular
|
||||||
|
/// interval to ensure that the packets are actually sent. Can be called even before servers are known, in
|
||||||
|
/// which case up to MaxPendingMessages of the released messages will be buffered and actually released when
|
||||||
|
/// servers are known.
|
||||||
|
void releaseQueuedMessages();
|
||||||
|
|
||||||
|
/// are we in sending mode. If we're not in sending mode then all packets and messages will be ignored and
|
||||||
|
/// not queued and not sent
|
||||||
|
bool getShouldSend() const { return _shouldSend; }
|
||||||
|
|
||||||
|
/// set sending mode. By default we are set to shouldSend=TRUE and packets will be sent. If shouldSend=FALSE, then we'll
|
||||||
|
/// switch to not sending mode, and all packets and messages will be ignored, not queued, and not sent. This might be used
|
||||||
|
/// in an application like interface when all octree features are disabled.
|
||||||
|
void setShouldSend(bool shouldSend) { _shouldSend = shouldSend; }
|
||||||
|
|
||||||
|
/// call this to inform the OctreeEditPacketSender of the server jurisdictions. This is required for normal operation.
|
||||||
|
/// The internal contents of the jurisdiction map may change throughout the lifetime of the OctreeEditPacketSender. This map
|
||||||
|
/// can be set prior to servers being present, so long as the contents of the map accurately reflect the current
|
||||||
|
/// known jurisdictions.
|
||||||
|
void setServerJurisdictions(NodeToJurisdictionMap* serverJurisdictions) {
|
||||||
|
_serverJurisdictions = serverJurisdictions;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// if you're running in non-threaded mode, you must call this method regularly
|
||||||
|
virtual bool process();
|
||||||
|
|
||||||
|
/// Set the desired number of pending messages that the OctreeEditPacketSender should attempt to queue even if
|
||||||
|
/// servers are not present. This only applies to how the OctreeEditPacketSender will manage messages when no
|
||||||
|
/// servers are present. By default, this value is the same as the default packets that will be sent in one second.
|
||||||
|
/// Which means the OctreeEditPacketSender will not buffer all messages given to it if no servers are present.
|
||||||
|
/// This is the maximum number of queued messages and single messages.
|
||||||
|
void setMaxPendingMessages(int maxPendingMessages) { _maxPendingMessages = maxPendingMessages; }
|
||||||
|
|
||||||
|
// the default number of pending messages we will store if no servers are available
|
||||||
|
static const int DEFAULT_MAX_PENDING_MESSAGES;
|
||||||
|
|
||||||
|
// is there an octree server available to send packets to
|
||||||
|
bool serversExist() const;
|
||||||
|
|
||||||
|
/// Set the desired max packet size in bytes that the OctreeEditPacketSender should create
|
||||||
|
void setMaxPacketSize(int maxPacketSize) { _maxPacketSize = maxPacketSize; }
|
||||||
|
|
||||||
|
/// returns the current desired max packet size in bytes that the OctreeEditPacketSender will create
|
||||||
|
int getMaxPacketSize() const { return _maxPacketSize; }
|
||||||
|
|
||||||
|
// you must override these...
|
||||||
|
virtual unsigned char getMyNodeType() const = 0;
|
||||||
|
|
||||||
|
protected:
|
||||||
|
bool _shouldSend;
|
||||||
|
void queuePacketToNode(const QUuid& nodeID, unsigned char* buffer, ssize_t length);
|
||||||
|
void queuePendingPacketToNodes(PACKET_TYPE type, unsigned char* buffer, ssize_t length);
|
||||||
|
void queuePacketToNodes(unsigned char* buffer, ssize_t length);
|
||||||
|
void initializePacket(EditPacketBuffer& packetBuffer, PACKET_TYPE type);
|
||||||
|
void releaseQueuedPacket(EditPacketBuffer& packetBuffer); // releases specific queued packet
|
||||||
|
|
||||||
|
void processPreServerExistsPackets();
|
||||||
|
|
||||||
|
// These are packets which are destined from know servers but haven't been released because they're still too small
|
||||||
|
std::map<QUuid, EditPacketBuffer> _pendingEditPackets;
|
||||||
|
|
||||||
|
// These are packets that are waiting to be processed because we don't yet know if there are servers
|
||||||
|
int _maxPendingMessages;
|
||||||
|
bool _releaseQueuedMessagesPending;
|
||||||
|
std::vector<EditPacketBuffer*> _preServerPackets; // these will get packed into other larger packets
|
||||||
|
std::vector<EditPacketBuffer*> _preServerSingleMessagePackets; // these will go out as is
|
||||||
|
|
||||||
|
NodeToJurisdictionMap* _serverJurisdictions;
|
||||||
|
|
||||||
|
unsigned short int _sequenceNumber;
|
||||||
|
int _maxPacketSize;
|
||||||
|
};
|
||||||
|
#endif // __shared__OctreeEditPacketSender__
|
|
@ -9,48 +9,12 @@
|
||||||
//
|
//
|
||||||
|
|
||||||
#include <assert.h>
|
#include <assert.h>
|
||||||
|
|
||||||
#include <PerfStat.h>
|
#include <PerfStat.h>
|
||||||
|
|
||||||
#include <OctalCode.h>
|
#include <OctalCode.h>
|
||||||
#include <PacketHeaders.h>
|
#include <PacketHeaders.h>
|
||||||
#include "VoxelEditPacketSender.h"
|
#include "VoxelEditPacketSender.h"
|
||||||
|
|
||||||
|
|
||||||
EditPacketBuffer::EditPacketBuffer(PACKET_TYPE type, unsigned char* buffer, ssize_t length, QUuid nodeUUID) {
|
|
||||||
_nodeUUID = nodeUUID;
|
|
||||||
_currentType = type;
|
|
||||||
_currentSize = length;
|
|
||||||
memcpy(_currentBuffer, buffer, length);
|
|
||||||
};
|
|
||||||
|
|
||||||
const int VoxelEditPacketSender::DEFAULT_MAX_PENDING_MESSAGES = PacketSender::DEFAULT_PACKETS_PER_SECOND;
|
|
||||||
|
|
||||||
|
|
||||||
VoxelEditPacketSender::VoxelEditPacketSender(PacketSenderNotify* notify) :
|
|
||||||
PacketSender(notify),
|
|
||||||
_shouldSend(true),
|
|
||||||
_maxPendingMessages(DEFAULT_MAX_PENDING_MESSAGES),
|
|
||||||
_releaseQueuedMessagesPending(false),
|
|
||||||
_voxelServerJurisdictions(NULL),
|
|
||||||
_sequenceNumber(0),
|
|
||||||
_maxPacketSize(MAX_PACKET_SIZE) {
|
|
||||||
}
|
|
||||||
|
|
||||||
VoxelEditPacketSender::~VoxelEditPacketSender() {
|
|
||||||
while (!_preServerSingleMessagePackets.empty()) {
|
|
||||||
EditPacketBuffer* packet = _preServerSingleMessagePackets.front();
|
|
||||||
delete packet;
|
|
||||||
_preServerSingleMessagePackets.erase(_preServerSingleMessagePackets.begin());
|
|
||||||
}
|
|
||||||
while (!_preServerPackets.empty()) {
|
|
||||||
EditPacketBuffer* packet = _preServerPackets.front();
|
|
||||||
delete packet;
|
|
||||||
_preServerPackets.erase(_preServerPackets.begin());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void VoxelEditPacketSender::sendVoxelEditMessage(PACKET_TYPE type, VoxelDetail& detail) {
|
void VoxelEditPacketSender::sendVoxelEditMessage(PACKET_TYPE type, VoxelDetail& detail) {
|
||||||
// allows app to disable sending if for example voxels have been disabled
|
// allows app to disable sending if for example voxels have been disabled
|
||||||
if (!_shouldSend) {
|
if (!_shouldSend) {
|
||||||
|
@ -65,19 +29,7 @@ void VoxelEditPacketSender::sendVoxelEditMessage(PACKET_TYPE type, VoxelDetail&
|
||||||
// If we don't have voxel jurisdictions, then we will simply queue up these packets and wait till we have
|
// If we don't have voxel jurisdictions, then we will simply queue up these packets and wait till we have
|
||||||
// jurisdictions for processing
|
// jurisdictions for processing
|
||||||
if (!voxelServersExist()) {
|
if (!voxelServersExist()) {
|
||||||
// If we're asked to save messages while waiting for voxel servers to arrive, then do so...
|
queuePendingPacketToNodes(type, bufferOut, sizeOut);
|
||||||
if (_maxPendingMessages > 0) {
|
|
||||||
EditPacketBuffer* packet = new EditPacketBuffer(type, bufferOut, sizeOut);
|
|
||||||
_preServerSingleMessagePackets.push_back(packet);
|
|
||||||
// if we've saved MORE than out max, then clear out the oldest packet...
|
|
||||||
int allPendingMessages = _preServerSingleMessagePackets.size() + _preServerPackets.size();
|
|
||||||
if (allPendingMessages > _maxPendingMessages) {
|
|
||||||
EditPacketBuffer* packet = _preServerSingleMessagePackets.front();
|
|
||||||
delete packet;
|
|
||||||
_preServerSingleMessagePackets.erase(_preServerSingleMessagePackets.begin());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return; // bail early
|
|
||||||
} else {
|
} else {
|
||||||
queuePacketToNodes(bufferOut, sizeOut);
|
queuePacketToNodes(bufferOut, sizeOut);
|
||||||
}
|
}
|
||||||
|
@ -87,74 +39,6 @@ void VoxelEditPacketSender::sendVoxelEditMessage(PACKET_TYPE type, VoxelDetail&
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool VoxelEditPacketSender::voxelServersExist() const {
|
|
||||||
bool hasVoxelServers = false;
|
|
||||||
bool atLeastOnJurisdictionMissing = false; // assume the best
|
|
||||||
NodeList* nodeList = NodeList::getInstance();
|
|
||||||
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
|
|
||||||
// only send to the NodeTypes that are NODE_TYPE_VOXEL_SERVER
|
|
||||||
if (node->getType() == NODE_TYPE_VOXEL_SERVER) {
|
|
||||||
if (nodeList->getNodeActiveSocketOrPing(&(*node))) {
|
|
||||||
QUuid nodeUUID = node->getUUID();
|
|
||||||
// If we've got Jurisdictions set, then check to see if we know the jurisdiction for this server
|
|
||||||
if (_voxelServerJurisdictions) {
|
|
||||||
// lookup our nodeUUID in the jurisdiction map, if it's missing then we're
|
|
||||||
// missing at least one jurisdiction
|
|
||||||
if ((*_voxelServerJurisdictions).find(nodeUUID) == (*_voxelServerJurisdictions).end()) {
|
|
||||||
atLeastOnJurisdictionMissing = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
hasVoxelServers = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (atLeastOnJurisdictionMissing) {
|
|
||||||
break; // no point in looking further...
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return (hasVoxelServers && !atLeastOnJurisdictionMissing);
|
|
||||||
}
|
|
||||||
|
|
||||||
// This method is called when the edit packet layer has determined that it has a fully formed packet destined for
|
|
||||||
// a known nodeID. However, we also want to handle the case where the
|
|
||||||
void VoxelEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, unsigned char* buffer, ssize_t length) {
|
|
||||||
NodeList* nodeList = NodeList::getInstance();
|
|
||||||
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
|
|
||||||
// only send to the NodeTypes that are NODE_TYPE_VOXEL_SERVER
|
|
||||||
if (node->getType() == NODE_TYPE_VOXEL_SERVER &&
|
|
||||||
((node->getUUID() == nodeUUID) || (nodeUUID.isNull()))) {
|
|
||||||
if (nodeList->getNodeActiveSocketOrPing(&(*node))) {
|
|
||||||
const HifiSockAddr* nodeAddress = node->getActiveSocket();
|
|
||||||
queuePacketForSending(*nodeAddress, buffer, length);
|
|
||||||
|
|
||||||
// debugging output...
|
|
||||||
bool wantDebugging = false;
|
|
||||||
if (wantDebugging) {
|
|
||||||
int numBytesPacketHeader = numBytesForPacketHeader(buffer);
|
|
||||||
unsigned short int sequence = (*((unsigned short int*)(buffer + numBytesPacketHeader)));
|
|
||||||
uint64_t createdAt = (*((uint64_t*)(buffer + numBytesPacketHeader + sizeof(sequence))));
|
|
||||||
uint64_t queuedAt = usecTimestampNow();
|
|
||||||
uint64_t transitTime = queuedAt - createdAt;
|
|
||||||
|
|
||||||
const char* messageName;
|
|
||||||
switch (buffer[0]) {
|
|
||||||
case PACKET_TYPE_VOXEL_SET:
|
|
||||||
messageName = "PACKET_TYPE_VOXEL_SET";
|
|
||||||
break;
|
|
||||||
case PACKET_TYPE_VOXEL_SET_DESTRUCTIVE:
|
|
||||||
messageName = "PACKET_TYPE_VOXEL_SET_DESTRUCTIVE";
|
|
||||||
break;
|
|
||||||
case PACKET_TYPE_VOXEL_ERASE:
|
|
||||||
messageName = "PACKET_TYPE_VOXEL_ERASE";
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
printf("VoxelEditPacketSender::queuePacketToNode() queued %s - command to node bytes=%ld sequence=%d transitTimeSoFar=%llu usecs\n",
|
|
||||||
messageName, length, sequence, transitTime);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void VoxelEditPacketSender::queueVoxelEditMessages(PACKET_TYPE type, int numberOfDetails, VoxelDetail* details) {
|
void VoxelEditPacketSender::queueVoxelEditMessages(PACKET_TYPE type, int numberOfDetails, VoxelDetail* details) {
|
||||||
if (!_shouldSend) {
|
if (!_shouldSend) {
|
||||||
return; // bail early
|
return; // bail early
|
||||||
|
@ -166,184 +50,8 @@ void VoxelEditPacketSender::queueVoxelEditMessages(PACKET_TYPE type, int numberO
|
||||||
int sizeOut = 0;
|
int sizeOut = 0;
|
||||||
|
|
||||||
if (encodeVoxelEditMessageDetails(type, 1, &details[i], &bufferOut[0], _maxPacketSize, sizeOut)) {
|
if (encodeVoxelEditMessageDetails(type, 1, &details[i], &bufferOut[0], _maxPacketSize, sizeOut)) {
|
||||||
queueVoxelEditMessage(type, bufferOut, sizeOut);
|
queueOctreeEditMessage(type, bufferOut, sizeOut);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void VoxelEditPacketSender::processPreServerExistsPackets() {
|
|
||||||
assert(voxelServersExist()); // we should only be here if we have jurisdictions
|
|
||||||
|
|
||||||
// First send out all the single message packets...
|
|
||||||
while (!_preServerSingleMessagePackets.empty()) {
|
|
||||||
EditPacketBuffer* packet = _preServerSingleMessagePackets.front();
|
|
||||||
queuePacketToNodes(&packet->_currentBuffer[0], packet->_currentSize);
|
|
||||||
delete packet;
|
|
||||||
_preServerSingleMessagePackets.erase(_preServerSingleMessagePackets.begin());
|
|
||||||
}
|
|
||||||
|
|
||||||
// Then "process" all the packable messages...
|
|
||||||
while (!_preServerPackets.empty()) {
|
|
||||||
EditPacketBuffer* packet = _preServerPackets.front();
|
|
||||||
queueVoxelEditMessage(packet->_currentType, &packet->_currentBuffer[0], packet->_currentSize);
|
|
||||||
delete packet;
|
|
||||||
_preServerPackets.erase(_preServerPackets.begin());
|
|
||||||
}
|
|
||||||
|
|
||||||
// if while waiting for the jurisdictions the caller called releaseQueuedMessages()
|
|
||||||
// then we want to honor that request now.
|
|
||||||
if (_releaseQueuedMessagesPending) {
|
|
||||||
releaseQueuedMessages();
|
|
||||||
_releaseQueuedMessagesPending = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void VoxelEditPacketSender::queuePacketToNodes(unsigned char* buffer, ssize_t length) {
|
|
||||||
if (!_shouldSend) {
|
|
||||||
return; // bail early
|
|
||||||
}
|
|
||||||
|
|
||||||
assert(voxelServersExist()); // we must have jurisdictions to be here!!
|
|
||||||
|
|
||||||
int headerBytes = numBytesForPacketHeader(buffer) + sizeof(short) + sizeof(uint64_t);
|
|
||||||
unsigned char* octCode = buffer + headerBytes; // skip the packet header to get to the octcode
|
|
||||||
|
|
||||||
// We want to filter out edit messages for voxel servers based on the server's Jurisdiction
|
|
||||||
// But we can't really do that with a packed message, since each edit message could be destined
|
|
||||||
// for a different voxel server... So we need to actually manage multiple queued packets... one
|
|
||||||
// for each voxel server
|
|
||||||
NodeList* nodeList = NodeList::getInstance();
|
|
||||||
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
|
|
||||||
// only send to the NodeTypes that are NODE_TYPE_VOXEL_SERVER
|
|
||||||
if (node->getActiveSocket() != NULL && node->getType() == NODE_TYPE_VOXEL_SERVER) {
|
|
||||||
QUuid nodeUUID = node->getUUID();
|
|
||||||
bool isMyJurisdiction = true;
|
|
||||||
// we need to get the jurisdiction for this
|
|
||||||
// here we need to get the "pending packet" for this server
|
|
||||||
const JurisdictionMap& map = (*_voxelServerJurisdictions)[nodeUUID];
|
|
||||||
isMyJurisdiction = (map.isMyJurisdiction(octCode, CHECK_NODE_ONLY) == JurisdictionMap::WITHIN);
|
|
||||||
if (isMyJurisdiction) {
|
|
||||||
queuePacketToNode(nodeUUID, buffer, length);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// NOTE: codeColorBuffer - is JUST the octcode/color and does not contain the packet header!
|
|
||||||
void VoxelEditPacketSender::queueVoxelEditMessage(PACKET_TYPE type, unsigned char* codeColorBuffer, ssize_t length) {
|
|
||||||
if (!_shouldSend) {
|
|
||||||
return; // bail early
|
|
||||||
}
|
|
||||||
|
|
||||||
// If we don't have voxel jurisdictions, then we will simply queue up all of these packets and wait till we have
|
|
||||||
// jurisdictions for processing
|
|
||||||
if (!voxelServersExist()) {
|
|
||||||
if (_maxPendingMessages > 0) {
|
|
||||||
EditPacketBuffer* packet = new EditPacketBuffer(type, codeColorBuffer, length);
|
|
||||||
_preServerPackets.push_back(packet);
|
|
||||||
|
|
||||||
// if we've saved MORE than out max, then clear out the oldest packet...
|
|
||||||
int allPendingMessages = _preServerSingleMessagePackets.size() + _preServerPackets.size();
|
|
||||||
if (allPendingMessages > _maxPendingMessages) {
|
|
||||||
EditPacketBuffer* packet = _preServerPackets.front();
|
|
||||||
delete packet;
|
|
||||||
_preServerPackets.erase(_preServerPackets.begin());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return; // bail early
|
|
||||||
}
|
|
||||||
|
|
||||||
// We want to filter out edit messages for voxel servers based on the server's Jurisdiction
|
|
||||||
// But we can't really do that with a packed message, since each edit message could be destined
|
|
||||||
// for a different voxel server... So we need to actually manage multiple queued packets... one
|
|
||||||
// for each voxel server
|
|
||||||
NodeList* nodeList = NodeList::getInstance();
|
|
||||||
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
|
|
||||||
// only send to the NodeTypes that are NODE_TYPE_VOXEL_SERVER
|
|
||||||
if (node->getActiveSocket() != NULL && node->getType() == NODE_TYPE_VOXEL_SERVER) {
|
|
||||||
QUuid nodeUUID = node->getUUID();
|
|
||||||
bool isMyJurisdiction = true;
|
|
||||||
|
|
||||||
if (_voxelServerJurisdictions) {
|
|
||||||
// we need to get the jurisdiction for this
|
|
||||||
// here we need to get the "pending packet" for this server
|
|
||||||
if ((*_voxelServerJurisdictions).find(nodeUUID) != (*_voxelServerJurisdictions).end()) {
|
|
||||||
const JurisdictionMap& map = (*_voxelServerJurisdictions)[nodeUUID];
|
|
||||||
isMyJurisdiction = (map.isMyJurisdiction(codeColorBuffer, CHECK_NODE_ONLY) == JurisdictionMap::WITHIN);
|
|
||||||
} else {
|
|
||||||
isMyJurisdiction = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (isMyJurisdiction) {
|
|
||||||
EditPacketBuffer& packetBuffer = _pendingEditPackets[nodeUUID];
|
|
||||||
packetBuffer._nodeUUID = nodeUUID;
|
|
||||||
|
|
||||||
// If we're switching type, then we send the last one and start over
|
|
||||||
if ((type != packetBuffer._currentType && packetBuffer._currentSize > 0) ||
|
|
||||||
(packetBuffer._currentSize + length >= _maxPacketSize)) {
|
|
||||||
releaseQueuedPacket(packetBuffer);
|
|
||||||
initializePacket(packetBuffer, type);
|
|
||||||
}
|
|
||||||
|
|
||||||
// If the buffer is empty and not correctly initialized for our type...
|
|
||||||
if (type != packetBuffer._currentType && packetBuffer._currentSize == 0) {
|
|
||||||
initializePacket(packetBuffer, type);
|
|
||||||
}
|
|
||||||
|
|
||||||
memcpy(&packetBuffer._currentBuffer[packetBuffer._currentSize], codeColorBuffer, length);
|
|
||||||
packetBuffer._currentSize += length;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void VoxelEditPacketSender::releaseQueuedMessages() {
|
|
||||||
// if we don't yet have jurisdictions then we can't actually release messages yet because we don't
|
|
||||||
// know where to send them to. Instead, just remember this request and when we eventually get jurisdictions
|
|
||||||
// call release again at that time.
|
|
||||||
if (!voxelServersExist()) {
|
|
||||||
_releaseQueuedMessagesPending = true;
|
|
||||||
} else {
|
|
||||||
for (std::map<QUuid, EditPacketBuffer>::iterator i = _pendingEditPackets.begin(); i != _pendingEditPackets.end(); i++) {
|
|
||||||
releaseQueuedPacket(i->second);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void VoxelEditPacketSender::releaseQueuedPacket(EditPacketBuffer& packetBuffer) {
|
|
||||||
if (packetBuffer._currentSize > 0 && packetBuffer._currentType != PACKET_TYPE_UNKNOWN) {
|
|
||||||
queuePacketToNode(packetBuffer._nodeUUID, &packetBuffer._currentBuffer[0], packetBuffer._currentSize);
|
|
||||||
}
|
|
||||||
packetBuffer._currentSize = 0;
|
|
||||||
packetBuffer._currentType = PACKET_TYPE_UNKNOWN;
|
|
||||||
}
|
|
||||||
|
|
||||||
void VoxelEditPacketSender::initializePacket(EditPacketBuffer& packetBuffer, PACKET_TYPE type) {
|
|
||||||
packetBuffer._currentSize = populateTypeAndVersion(&packetBuffer._currentBuffer[0], type);
|
|
||||||
|
|
||||||
// pack in sequence number
|
|
||||||
unsigned short int* sequenceAt = (unsigned short int*)&packetBuffer._currentBuffer[packetBuffer._currentSize];
|
|
||||||
*sequenceAt = _sequenceNumber;
|
|
||||||
packetBuffer._currentSize += sizeof(unsigned short int); // nudge past sequence
|
|
||||||
_sequenceNumber++;
|
|
||||||
|
|
||||||
// pack in timestamp
|
|
||||||
uint64_t now = usecTimestampNow();
|
|
||||||
uint64_t* timeAt = (uint64_t*)&packetBuffer._currentBuffer[packetBuffer._currentSize];
|
|
||||||
*timeAt = now;
|
|
||||||
packetBuffer._currentSize += sizeof(uint64_t); // nudge past timestamp
|
|
||||||
|
|
||||||
packetBuffer._currentType = type;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool VoxelEditPacketSender::process() {
|
|
||||||
// if we have server jurisdiction details, and we have pending pre-jurisdiction packets, then process those
|
|
||||||
// before doing our normal process step. This processPreJurisdictionPackets()
|
|
||||||
if (voxelServersExist() && (!_preServerPackets.empty() || !_preServerSingleMessagePackets.empty() )) {
|
|
||||||
processPreServerExistsPackets();
|
|
||||||
}
|
|
||||||
|
|
||||||
// base class does most of the work.
|
|
||||||
return PacketSender::process();
|
|
||||||
}
|
|
||||||
|
|
|
@ -11,27 +11,13 @@
|
||||||
#ifndef __shared__VoxelEditPacketSender__
|
#ifndef __shared__VoxelEditPacketSender__
|
||||||
#define __shared__VoxelEditPacketSender__
|
#define __shared__VoxelEditPacketSender__
|
||||||
|
|
||||||
#include <PacketSender.h>
|
#include <OctreeEditPacketSender.h>
|
||||||
#include <PacketHeaders.h>
|
|
||||||
#include <SharedUtil.h> // for VoxelDetail
|
|
||||||
#include "JurisdictionMap.h"
|
|
||||||
|
|
||||||
/// Used for construction of edit voxel packets
|
|
||||||
class EditPacketBuffer {
|
|
||||||
public:
|
|
||||||
EditPacketBuffer() : _nodeUUID(), _currentType(PACKET_TYPE_UNKNOWN), _currentSize(0) { }
|
|
||||||
EditPacketBuffer(PACKET_TYPE type, unsigned char* codeColorBuffer, ssize_t length, const QUuid nodeUUID = QUuid());
|
|
||||||
QUuid _nodeUUID;
|
|
||||||
PACKET_TYPE _currentType;
|
|
||||||
unsigned char _currentBuffer[MAX_PACKET_SIZE];
|
|
||||||
ssize_t _currentSize;
|
|
||||||
};
|
|
||||||
|
|
||||||
/// Utility for processing, packing, queueing and sending of outbound edit voxel messages.
|
/// Utility for processing, packing, queueing and sending of outbound edit voxel messages.
|
||||||
class VoxelEditPacketSender : public virtual PacketSender {
|
class VoxelEditPacketSender : public virtual OctreeEditPacketSender {
|
||||||
public:
|
public:
|
||||||
VoxelEditPacketSender(PacketSenderNotify* notify = NULL);
|
VoxelEditPacketSender(PacketSenderNotify* notify = NULL) : OctreeEditPacketSender(notify) { }
|
||||||
~VoxelEditPacketSender();
|
~VoxelEditPacketSender() { }
|
||||||
|
|
||||||
/// Send voxel edit message immediately
|
/// Send voxel edit message immediately
|
||||||
void sendVoxelEditMessage(PACKET_TYPE type, VoxelDetail& detail);
|
void sendVoxelEditMessage(PACKET_TYPE type, VoxelDetail& detail);
|
||||||
|
@ -39,81 +25,27 @@ public:
|
||||||
/// Queues a single voxel edit message. Will potentially send a pending multi-command packet. Determines which voxel-server
|
/// Queues a single voxel edit message. Will potentially send a pending multi-command packet. Determines which voxel-server
|
||||||
/// node or nodes the packet should be sent to. Can be called even before voxel servers are known, in which case up to
|
/// node or nodes the packet should be sent to. Can be called even before voxel servers are known, in which case up to
|
||||||
/// MaxPendingMessages will be buffered and processed when voxel servers are known.
|
/// MaxPendingMessages will be buffered and processed when voxel servers are known.
|
||||||
void queueVoxelEditMessage(PACKET_TYPE type, unsigned char* codeColorBuffer, ssize_t length);
|
void queueVoxelEditMessage(PACKET_TYPE type, unsigned char* codeColorBuffer, ssize_t length) {
|
||||||
|
queueOctreeEditMessage(type, codeColorBuffer, length);
|
||||||
|
}
|
||||||
|
|
||||||
/// Queues an array of several voxel edit messages. Will potentially send a pending multi-command packet. Determines
|
/// Queues an array of several voxel edit messages. Will potentially send a pending multi-command packet. Determines
|
||||||
/// which voxel-server node or nodes the packet should be sent to. Can be called even before voxel servers are known, in
|
/// which voxel-server node or nodes the packet should be sent to. Can be called even before voxel servers are known, in
|
||||||
/// which case up to MaxPendingMessages will be buffered and processed when voxel servers are known.
|
/// which case up to MaxPendingMessages will be buffered and processed when voxel servers are known.
|
||||||
void queueVoxelEditMessages(PACKET_TYPE type, int numberOfDetails, VoxelDetail* details);
|
void queueVoxelEditMessages(PACKET_TYPE type, int numberOfDetails, VoxelDetail* details);
|
||||||
|
|
||||||
/// Releases all queued messages even if those messages haven't filled an MTU packet. This will move the packed message
|
|
||||||
/// packets onto the send queue. If running in threaded mode, the caller does not need to do any further processing to
|
|
||||||
/// have these packets get sent. If running in non-threaded mode, the caller must still call process() on a regular
|
|
||||||
/// interval to ensure that the packets are actually sent. Can be called even before voxel servers are known, in
|
|
||||||
/// which case up to MaxPendingMessages of the released messages will be buffered and actually released when
|
|
||||||
/// voxel servers are known.
|
|
||||||
void releaseQueuedMessages();
|
|
||||||
|
|
||||||
/// are we in sending mode. If we're not in sending mode then all packets and messages will be ignored and
|
|
||||||
/// not queued and not sent
|
|
||||||
bool getShouldSend() const { return _shouldSend; }
|
|
||||||
|
|
||||||
/// set sending mode. By default we are set to shouldSend=TRUE and packets will be sent. If shouldSend=FALSE, then we'll
|
|
||||||
/// switch to not sending mode, and all packets and messages will be ignored, not queued, and not sent. This might be used
|
|
||||||
/// in an application like interface when all voxel features are disabled.
|
|
||||||
void setShouldSend(bool shouldSend) { _shouldSend = shouldSend; }
|
|
||||||
|
|
||||||
/// call this to inform the VoxelEditPacketSender of the voxel server jurisdictions. This is required for normal operation.
|
/// call this to inform the VoxelEditPacketSender of the voxel server jurisdictions. This is required for normal operation.
|
||||||
/// The internal contents of the jurisdiction map may change throughout the lifetime of the VoxelEditPacketSender. This map
|
/// The internal contents of the jurisdiction map may change throughout the lifetime of the VoxelEditPacketSender. This map
|
||||||
/// can be set prior to voxel servers being present, so long as the contents of the map accurately reflect the current
|
/// can be set prior to voxel servers being present, so long as the contents of the map accurately reflect the current
|
||||||
/// known jurisdictions.
|
/// known jurisdictions.
|
||||||
void setVoxelServerJurisdictions(NodeToJurisdictionMap* voxelServerJurisdictions) {
|
void setVoxelServerJurisdictions(NodeToJurisdictionMap* voxelServerJurisdictions) {
|
||||||
_voxelServerJurisdictions = voxelServerJurisdictions;
|
setServerJurisdictions(voxelServerJurisdictions);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// if you're running in non-threaded mode, you must call this method regularly
|
|
||||||
virtual bool process();
|
|
||||||
|
|
||||||
/// Set the desired number of pending messages that the VoxelEditPacketSender should attempt to queue even if voxel
|
|
||||||
/// servers are not present. This only applies to how the VoxelEditPacketSender will manage messages when no voxel
|
|
||||||
/// servers are present. By default, this value is the same as the default packets that will be sent in one second.
|
|
||||||
/// Which means the VoxelEditPacketSender will not buffer all messages given to it if no voxel servers are present.
|
|
||||||
/// This is the maximum number of queued messages and single messages.
|
|
||||||
void setMaxPendingMessages(int maxPendingMessages) { _maxPendingMessages = maxPendingMessages; }
|
|
||||||
|
|
||||||
// the default number of pending messages we will store if no voxel servers are available
|
|
||||||
static const int DEFAULT_MAX_PENDING_MESSAGES;
|
|
||||||
|
|
||||||
// is there a voxel server available to send packets to
|
// is there a voxel server available to send packets to
|
||||||
bool voxelServersExist() const;
|
bool voxelServersExist() const { return serversExist(); }
|
||||||
|
|
||||||
/// Set the desired max packet size in bytes that the VoxelEditPacketSender should create
|
// My server type is the voxel server
|
||||||
void setMaxPacketSize(int maxPacketSize) { _maxPacketSize = maxPacketSize; }
|
virtual unsigned char getMyNodeType() const { return NODE_TYPE_VOXEL_SERVER; }
|
||||||
|
|
||||||
/// returns the current desired max packet size in bytes that the VoxelEditPacketSender will create
|
|
||||||
int getMaxPacketSize() const { return _maxPacketSize; }
|
|
||||||
|
|
||||||
private:
|
|
||||||
bool _shouldSend;
|
|
||||||
void queuePacketToNode(const QUuid& nodeID, unsigned char* buffer, ssize_t length);
|
|
||||||
void queuePacketToNodes(unsigned char* buffer, ssize_t length);
|
|
||||||
void initializePacket(EditPacketBuffer& packetBuffer, PACKET_TYPE type);
|
|
||||||
void releaseQueuedPacket(EditPacketBuffer& packetBuffer); // releases specific queued packet
|
|
||||||
|
|
||||||
void processPreServerExistsPackets();
|
|
||||||
|
|
||||||
// These are packets which are destined from know servers but haven't been released because they're still too small
|
|
||||||
std::map<QUuid, EditPacketBuffer> _pendingEditPackets;
|
|
||||||
|
|
||||||
// These are packets that are waiting to be processed because we don't yet know if there are voxel servers
|
|
||||||
int _maxPendingMessages;
|
|
||||||
bool _releaseQueuedMessagesPending;
|
|
||||||
std::vector<EditPacketBuffer*> _preServerPackets; // these will get packed into other larger packets
|
|
||||||
std::vector<EditPacketBuffer*> _preServerSingleMessagePackets; // these will go out as is
|
|
||||||
|
|
||||||
NodeToJurisdictionMap* _voxelServerJurisdictions;
|
|
||||||
|
|
||||||
unsigned short int _sequenceNumber;
|
|
||||||
int _maxPacketSize;
|
|
||||||
};
|
};
|
||||||
#endif // __shared__VoxelEditPacketSender__
|
#endif // __shared__VoxelEditPacketSender__
|
||||||
|
|
Loading…
Reference in a new issue