initial changes to OctreeEditPacketSender for new API

This commit is contained in:
Stephen Birarda 2015-07-08 12:18:38 -07:00
parent bb6065e7dd
commit e13360b1b6
7 changed files with 110 additions and 214 deletions

View file

@ -24,7 +24,7 @@ SentPacketHistory::SentPacketHistory(int size)
} }
void SentPacketHistory::packetSent(uint16_t sequenceNumber, const NLPacket& packet) { void SentPacketHistory::packetSent(uint16_t sequenceNumber, const std::unique_ptr<NLPacket>& packet) {
// check if given seq number has the expected value. if not, something's wrong with // check if given seq number has the expected value. if not, something's wrong with
// the code calling this function // the code calling this function
@ -34,10 +34,8 @@ void SentPacketHistory::packetSent(uint16_t sequenceNumber, const NLPacket& pack
<< "Expected:" << expectedSequenceNumber << "Actual:" << sequenceNumber; << "Expected:" << expectedSequenceNumber << "Actual:" << sequenceNumber;
} }
_newestSequenceNumber = sequenceNumber; _newestSequenceNumber = sequenceNumber;
auto temp = std::unique_ptr<NLPacket>(const_cast<NLPacket*>(&packet)); _sentPackets.insert(NLPacket::createCopy(packet));
_sentPackets.insert(NLPacket::createCopy(temp));
temp.release();
} }
const std::unique_ptr<NLPacket>& SentPacketHistory::getPacket(uint16_t sequenceNumber) const { const std::unique_ptr<NLPacket>& SentPacketHistory::getPacket(uint16_t sequenceNumber) const {

View file

@ -24,7 +24,7 @@ class SentPacketHistory {
public: public:
SentPacketHistory(int size = MAX_REASONABLE_SEQUENCE_GAP); SentPacketHistory(int size = MAX_REASONABLE_SEQUENCE_GAP);
void packetSent(uint16_t sequenceNumber, const NLPacket& packet); void packetSent(uint16_t sequenceNumber, const std::unique_ptr<NLPacket>& packet);
const std::unique_ptr<NLPacket>& getPacket(uint16_t sequenceNumber) const; const std::unique_ptr<NLPacket>& getPacket(uint16_t sequenceNumber) const;
private: private:

View file

@ -1,30 +0,0 @@
//
// EditPacketBuffer.cpp
// libraries/octree/src
//
// Created by Stephen Birarda on 2014-07-30.
// Copyright 2014 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#include "EditPacketBuffer.h"
EditPacketBuffer::EditPacketBuffer() :
_nodeUUID(),
_currentType(PacketType::Unknown),
_currentSize(0),
_satoshiCost(0)
{
}
EditPacketBuffer::EditPacketBuffer(PacketType::Value type, unsigned char* buffer, size_t length, qint64 satoshiCost, QUuid nodeUUID) :
_nodeUUID(nodeUUID),
_currentType(type),
_currentSize(length),
_satoshiCost(satoshiCost)
{
memcpy(_currentBuffer, buffer, length);
}

View file

@ -1,34 +0,0 @@
//
// EditPacketBuffer.h
// libraries/octree/src
//
// Created by Stephen Birarda on 2014-07-30.
// Copyright 2014 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#ifndef hifi_EditPacketBuffer_h
#define hifi_EditPacketBuffer_h
#include <QtCore/QUuid>
#include <LimitedNodeList.h>
#include <PacketHeaders.h>
/// Used for construction of edit packets
class EditPacketBuffer {
public:
EditPacketBuffer();
EditPacketBuffer(PacketType::Value type, unsigned char* codeColorBuffer, size_t length,
qint64 satoshiCost = 0, const QUuid nodeUUID = QUuid());
QUuid _nodeUUID;
PacketType::Value _currentType;
unsigned char _currentBuffer[MAX_PACKET_SIZE];
size_t _currentSize;
qint64 _satoshiCost;
};
#endif // hifi_EditPacketBuffer_h

View file

@ -27,7 +27,6 @@ OctreeEditPacketSender::OctreeEditPacketSender() :
_maxPendingMessages(DEFAULT_MAX_PENDING_MESSAGES), _maxPendingMessages(DEFAULT_MAX_PENDING_MESSAGES),
_releaseQueuedMessagesPending(false), _releaseQueuedMessagesPending(false),
_serverJurisdictions(NULL), _serverJurisdictions(NULL),
_maxPacketSize(MAX_PACKET_SIZE),
_destinationWalletUUID() _destinationWalletUUID()
{ {
@ -35,16 +34,8 @@ OctreeEditPacketSender::OctreeEditPacketSender() :
OctreeEditPacketSender::~OctreeEditPacketSender() { OctreeEditPacketSender::~OctreeEditPacketSender() {
_pendingPacketsLock.lock(); _pendingPacketsLock.lock();
while (!_preServerSingleMessagePackets.empty()) { _preServerSingleMessagePackets.clear();
EditPacketBuffer* packet = _preServerSingleMessagePackets.front(); _preServerEdits.clear();
delete packet;
_preServerSingleMessagePackets.erase(_preServerSingleMessagePackets.begin());
}
while (!_preServerPackets.empty()) {
EditPacketBuffer* packet = _preServerPackets.front();
delete packet;
_preServerPackets.erase(_preServerPackets.begin());
}
_pendingPacketsLock.unlock(); _pendingPacketsLock.unlock();
} }
@ -82,8 +73,7 @@ bool OctreeEditPacketSender::serversExist() const {
// This method is called when the edit packet layer has determined that it has a fully formed packet destined for // This method is called when the edit packet layer has determined that it has a fully formed packet destined for
// a known nodeID. // a known nodeID.
void OctreeEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, unsigned char* buffer, void OctreeEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, std::unique_ptr<NLPacket> packet) {
size_t length, qint64 satoshiCost) {
bool wantDebug = false; bool wantDebug = false;
DependencyManager::get<NodeList>()->eachNode([&](const SharedNodePointer& node){ DependencyManager::get<NodeList>()->eachNode([&](const SharedNodePointer& node){
@ -92,40 +82,38 @@ void OctreeEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, unsigned c
&& ((node->getUUID() == nodeUUID) || (nodeUUID.isNull())) && ((node->getUUID() == nodeUUID) || (nodeUUID.isNull()))
&& node->getActiveSocket()) { && node->getActiveSocket()) {
// jump to the beginning of the payload
packet->seek(0);
// pack sequence number // pack sequence number
int numBytesPacketHeader = numBytesForPacketHeader(reinterpret_cast<char*>(buffer));
unsigned char* sequenceAt = buffer + numBytesPacketHeader;
quint16 sequence = _outgoingSequenceNumbers[nodeUUID]++; quint16 sequence = _outgoingSequenceNumbers[nodeUUID]++;
memcpy(sequenceAt, &sequence, sizeof(quint16)); packet->write(reinterpret_cast<char*>(&sequence), sizeof(sequence));
// send packet // debugging output...
QByteArray packet(reinterpret_cast<const char*>(buffer), length); if (wantDebug) {
unsigned short int sequence;
quint64 createdAt;
queuePacketForSending(node, packet); packet->seek(0);
if (hasDestinationWalletUUID() && satoshiCost > 0) { // read the sequence number and createdAt
// if we have a destination wallet UUID and a cost associated with this packet, signal that it packet->read(&sequence);
// needs to be sent packet->read(&createdAt);
emit octreePaymentRequired(satoshiCost, nodeUUID, _destinationWalletUUID);
quint64 queuedAt = usecTimestampNow();
quint64 transitTime = queuedAt - createdAt;
qCDebug(octree) << "OctreeEditPacketSender::queuePacketToNode() queued " << buffer[0] <<
" - command to node bytes=" << length <<
" satoshiCost=" << satoshiCost <<
" sequence=" << sequence <<
" transitTimeSoFar=" << transitTime << " usecs";
} }
// add packet to history // add packet to history
_sentPacketHistories[nodeUUID].packetSent(sequence, packet); _sentPacketHistories[nodeUUID].packetSent(sequence, packet);
// debugging output... queuePacketForSending(node, packet);
if (wantDebug) {
int numBytesPacketHeader = numBytesForPacketHeader(reinterpret_cast<const char*>(buffer));
unsigned short int sequence = (*((unsigned short int*)(buffer + numBytesPacketHeader)));
quint64 createdAt = (*((quint64*)(buffer + numBytesPacketHeader + sizeof(sequence))));
quint64 queuedAt = usecTimestampNow();
quint64 transitTime = queuedAt - createdAt;
qCDebug(octree) << "OctreeEditPacketSender::queuePacketToNode() queued " << buffer[0] <<
" - command to node bytes=" << length <<
" satoshiCost=" << satoshiCost <<
" sequence=" << sequence <<
" transitTimeSoFar=" << transitTime << " usecs";
}
} }
}); });
} }
@ -136,19 +124,18 @@ void OctreeEditPacketSender::processPreServerExistsPackets() {
// First send out all the single message packets... // First send out all the single message packets...
_pendingPacketsLock.lock(); _pendingPacketsLock.lock();
while (!_preServerSingleMessagePackets.empty()) { while (!_preServerSingleMessagePackets.empty()) {
EditPacketBuffer* packet = _preServerSingleMessagePackets.front(); std::unique_ptr<NLPacket> packet = std::move(_preServerSingleMessagePackets.front());
queuePacketToNodes(&packet->_currentBuffer[0], packet->_currentSize, packet->_satoshiCost); queuePacketToNodes(std::move(packet));
delete packet; _preServerSingleMessagePackets.pop_front();
_preServerSingleMessagePackets.erase(_preServerSingleMessagePackets.begin());
} }
// Then "process" all the packable messages... // Then "process" all the packable messages...
while (!_preServerPackets.empty()) { while (!_preServerPackets.empty()) {
EditPacketBuffer* packet = _preServerPackets.front(); EditMessageTuple editMessage = std::move(_preServerEdits.front());
queueOctreeEditMessage(packet->_currentType, &packet->_currentBuffer[0], packet->_currentSize); queueOctreeEditMessage(editMessage.first(), editMessage.second(), editMessage.third());
delete packet; _preServerPackets.pop_front();
_preServerPackets.erase(_preServerPackets.begin());
} }
_pendingPacketsLock.unlock(); _pendingPacketsLock.unlock();
// if while waiting for the jurisdictions the caller called releaseQueuedMessages() // if while waiting for the jurisdictions the caller called releaseQueuedMessages()
@ -159,34 +146,29 @@ void OctreeEditPacketSender::processPreServerExistsPackets() {
} }
} }
void OctreeEditPacketSender::queuePendingPacketToNodes(PacketType::Value type, unsigned char* buffer, void OctreeEditPacketSender::queuePendingPacketToNodes(std::unique_ptr<NLPacket> packet) {
size_t length, qint64 satoshiCost) {
// If we're asked to save messages while waiting for voxel servers to arrive, then do so... // If we're asked to save messages while waiting for voxel servers to arrive, then do so...
if (_maxPendingMessages > 0) { if (_maxPendingMessages > 0) {
EditPacketBuffer* packet = new EditPacketBuffer(type, buffer, length, satoshiCost);
_pendingPacketsLock.lock(); _pendingPacketsLock.lock();
_preServerSingleMessagePackets.push_back(packet); _preServerSingleMessagePackets.push_back(packet);
// if we've saved MORE than our max, then clear out the oldest packet... // if we've saved MORE than our max, then clear out the oldest packet...
int allPendingMessages = _preServerSingleMessagePackets.size() + _preServerPackets.size(); int allPendingMessages = _preServerSingleMessagePackets.size() + _preServerPackets.size();
if (allPendingMessages > _maxPendingMessages) { if (allPendingMessages > _maxPendingMessages) {
EditPacketBuffer* packet = _preServerSingleMessagePackets.front(); _preServerSingleMessagePackets.pop_front();
delete packet;
_preServerSingleMessagePackets.erase(_preServerSingleMessagePackets.begin());
} }
_pendingPacketsLock.unlock(); _pendingPacketsLock.unlock();
} }
} }
void OctreeEditPacketSender::queuePacketToNodes(unsigned char* buffer, size_t length, qint64 satoshiCost) { void OctreeEditPacketSender::queuePacketToNodes(std::unique_ptr<NLPacket> packet) {
if (!_shouldSend) { if (!_shouldSend) {
return; // bail early return; // bail early
} }
assert(serversExist()); // we must have jurisdictions to be here!! assert(serversExist()); // we must have jurisdictions to be here!!
int headerBytes = numBytesForPacketHeader(reinterpret_cast<char*>(buffer)) + sizeof(short) + sizeof(quint64); const unsigned char* octCode = reinterpret_cast<unsigned_char*>(packet->getPayload()) + sizeof(short) + sizeof(quint64);
unsigned char* octCode = buffer + headerBytes; // skip the packet header to get to the octcode
// We want to filter out edit messages for servers based on the server's Jurisdiction // We want to filter out edit messages for servers based on the server's Jurisdiction
// But we can't really do that with a packed message, since each edit message could be destined // But we can't really do that with a packed message, since each edit message could be destined
@ -204,8 +186,11 @@ void OctreeEditPacketSender::queuePacketToNodes(unsigned char* buffer, size_t le
const JurisdictionMap& map = (*_serverJurisdictions)[nodeUUID]; const JurisdictionMap& map = (*_serverJurisdictions)[nodeUUID];
isMyJurisdiction = (map.isMyJurisdiction(octCode, CHECK_NODE_ONLY) == JurisdictionMap::WITHIN); isMyJurisdiction = (map.isMyJurisdiction(octCode, CHECK_NODE_ONLY) == JurisdictionMap::WITHIN);
_serverJurisdictions->unlock(); _serverJurisdictions->unlock();
if (isMyJurisdiction) { if (isMyJurisdiction) {
queuePacketToNode(nodeUUID, buffer, length, satoshiCost); // make a copy of this packet for this node and queue
auto packetCopy = NLPacket::createCopy(packet);
queuePacketToNode(std::move(packetCopy));
} }
} }
}); });
@ -213,8 +198,7 @@ void OctreeEditPacketSender::queuePacketToNodes(unsigned char* buffer, size_t le
// NOTE: editPacketBuffer - is JUST the octcode/color and does not contain the packet header! // NOTE: editPacketBuffer - is JUST the octcode/color and does not contain the packet header!
void OctreeEditPacketSender::queueOctreeEditMessage(PacketType::Value type, unsigned char* editPacketBuffer, void OctreeEditPacketSender::queueOctreeEditMessage(PacketType::Value type, unsigned char* editPacketBuffer, size_t length) {
size_t length, qint64 satoshiCost) {
if (!_shouldSend) { if (!_shouldSend) {
return; // bail early return; // bail early
@ -224,16 +208,14 @@ void OctreeEditPacketSender::queueOctreeEditMessage(PacketType::Value type, unsi
// jurisdictions for processing // jurisdictions for processing
if (!serversExist()) { if (!serversExist()) {
if (_maxPendingMessages > 0) { if (_maxPendingMessages > 0) {
EditPacketBuffer* packet = new EditPacketBuffer(type, editPacketBuffer, length); EditMessageTuple messageTuple { type, editPacketBuffer, length };
_pendingPacketsLock.lock(); _pendingPacketsLock.lock();
_preServerPackets.push_back(packet); _preServerEdits.push_back(messageTuple);
// if we've saved MORE than out max, then clear out the oldest packet... // if we've saved MORE than out max, then clear out the oldest packet...
int allPendingMessages = _preServerSingleMessagePackets.size() + _preServerPackets.size(); int allPendingMessages = _preServerSingleMessagePackets.size() + _preServerPackets.size();
if (allPendingMessages > _maxPendingMessages) { if (allPendingMessages > _maxPendingMessages) {
EditPacketBuffer* packet = _preServerPackets.front(); _preServerEdits.pop_front();
delete packet;
_preServerPackets.erase(_preServerPackets.begin());
} }
_pendingPacketsLock.unlock(); _pendingPacketsLock.unlock();
} }
@ -267,19 +249,22 @@ void OctreeEditPacketSender::queueOctreeEditMessage(PacketType::Value type, unsi
_serverJurisdictions->unlock(); _serverJurisdictions->unlock();
} }
if (isMyJurisdiction) { if (isMyJurisdiction) {
EditPacketBuffer& packetBuffer = _pendingEditPackets[nodeUUID]; std::unique_ptr<NLPacket>& bufferedPacket = _pendingEditPackets[nodeUUID];
packetBuffer._nodeUUID = nodeUUID;
// If we're switching type, then we send the last one and start over if (!bufferedPacket) {
if ((type != packetBuffer._currentType && packetBuffer._currentSize > 0) || bufferedPacket = NLPacket::create(type);
(packetBuffer._currentSize + length >= (size_t)_maxPacketSize)) { } else {
releaseQueuedPacket(packetBuffer); // If we're switching type, then we send the last one and start over
initializePacket(packetBuffer, type, node->getClockSkewUsec()); if ((type != bufferedPacket->getType() && bufferedPacket->getSizeUsed() > 0) ||
} (length >= bufferedPacket->bytesAvailable())) {
// If the buffer is empty and not correctly initialized for our type... // create the new packet and swap it with the packet in _pendingEditPackets
if (type != packetBuffer._currentType && packetBuffer._currentSize == 0) { auto packetToRelease = initializePacket(type, node->getClockSkewUsec());
initializePacket(packetBuffer, type, node->getClockSkewUsec()); bufferedPacket.swap(packetToRelease);
// release the previously buffered packet
releaseQueuedPacket(packetToRelease);
}
} }
// This is really the first time we know which server/node this particular edit message // This is really the first time we know which server/node this particular edit message
@ -290,9 +275,7 @@ void OctreeEditPacketSender::queueOctreeEditMessage(PacketType::Value type, unsi
adjustEditPacketForClockSkew(type, editPacketBuffer, length, node->getClockSkewUsec()); adjustEditPacketForClockSkew(type, editPacketBuffer, length, node->getClockSkewUsec());
} }
memcpy(&packetBuffer._currentBuffer[packetBuffer._currentSize], editPacketBuffer, length); bufferedPacket->write(reinterpret_cast<char*>(editPacketBuffer), length);
packetBuffer._currentSize += length;
packetBuffer._satoshiCost += satoshiCost;
} }
} }
}); });
@ -309,47 +292,45 @@ void OctreeEditPacketSender::releaseQueuedMessages() {
_releaseQueuedMessagesPending = true; _releaseQueuedMessagesPending = true;
} else { } else {
_packetsQueueLock.lock(); _packetsQueueLock.lock();
for (QHash<QUuid, EditPacketBuffer>::iterator i = _pendingEditPackets.begin(); i != _pendingEditPackets.end(); i++) { for (auto i = _pendingEditPackets.begin(); i != _pendingEditPackets.end(); i++) {
releaseQueuedPacket(i.value()); // construct a null unique_ptr to an NL packet
std::unique_ptr<NLPacket> releasedPacket;
// swap the null ptr with the packet we want to release
i.value().swap(releasedPacket);
// move and release the queued packet
releaseQueuedPacket(i.key(), std::move(releasedPacket));
} }
_packetsQueueLock.unlock(); _packetsQueueLock.unlock();
} }
} }
void OctreeEditPacketSender::releaseQueuedPacket(EditPacketBuffer& packetBuffer) { void OctreeEditPacketSender::releaseQueuedPacket(std::unique_ptr<NLPacket> packet) {
_releaseQueuedPacketMutex.lock(); _releaseQueuedPacketMutex.lock();
if (packetBuffer._currentSize > 0 && packetBuffer._currentType != PacketType::Unknown) { if (packetBuffer._currentSize > 0 && packetBuffer._currentType != PacketType::Unknown) {
queuePacketToNode(packetBuffer._nodeUUID, &packetBuffer._currentBuffer[0], queuePacketToNode(std::move(packet));
packetBuffer._currentSize, packetBuffer._satoshiCost);
packetBuffer._currentSize = 0; packetBuffer._currentSize = 0;
packetBuffer._currentType = PacketType::Unknown; packetBuffer._currentType = PacketType::Unknown;
} }
_releaseQueuedPacketMutex.unlock(); _releaseQueuedPacketMutex.unlock();
} }
void OctreeEditPacketSender::initializePacket(EditPacketBuffer& packetBuffer, PacketType::Value type, int nodeClockSkew) { std::unique_ptr<NLPacket> OctreeEditPacketSender::initializePacket(PacketType::Value type, int nodeClockSkew) {
packetBuffer._currentSize = auto newPacket = NLPacket::create(type);
DependencyManager::get<NodeList>()->populatePacketHeader(reinterpret_cast<char*>(&packetBuffer._currentBuffer[0]), type);
// skip over sequence number for now; will be packed when packet is ready to be sent out // skip over sequence number for now; will be packed when packet is ready to be sent out
packetBuffer._currentSize += sizeof(quint16); newPacket->seek(sizeof(quint16));
// pack in timestamp // pack in timestamp
quint64 now = usecTimestampNow() + nodeClockSkew; quint64 now = usecTimestampNow() + nodeClockSkew;
quint64* timeAt = (quint64*)&packetBuffer._currentBuffer[packetBuffer._currentSize]; newPacket->write(reinterpret_cast<char*>(&now), sizeof(now));
*timeAt = now;
packetBuffer._currentSize += sizeof(quint64); // nudge past timestamp
packetBuffer._currentType = type;
// reset cost for packet to 0
packetBuffer._satoshiCost = 0;
} }
bool OctreeEditPacketSender::process() { bool OctreeEditPacketSender::process() {
// if we have server jurisdiction details, and we have pending pre-jurisdiction packets, then process those // if we have server jurisdiction details, and we have pending pre-jurisdiction packets, then process those
// before doing our normal process step. This processPreJurisdictionPackets() // before doing our normal process step. This processPreJurisdictionPackets()
if (serversExist() && (!_preServerPackets.empty() || !_preServerSingleMessagePackets.empty() )) { if (serversExist() && (!_preServerEdits.empty() || !_preServerSingleMessagePackets.empty() )) {
processPreServerExistsPackets(); processPreServerExistsPackets();
} }

View file

@ -16,7 +16,6 @@
#include <PacketSender.h> #include <PacketSender.h>
#include <PacketHeaders.h> #include <PacketHeaders.h>
#include "EditPacketBuffer.h"
#include "JurisdictionMap.h" #include "JurisdictionMap.h"
#include "SentPacketHistory.h" #include "SentPacketHistory.h"
@ -26,17 +25,17 @@ class OctreeEditPacketSender : public PacketSender {
public: public:
OctreeEditPacketSender(); OctreeEditPacketSender();
~OctreeEditPacketSender(); ~OctreeEditPacketSender();
/// Queues a single edit message. Will potentially send a pending multi-command packet. Determines which server
/// node or nodes the packet should be sent to. Can be called even before servers are known, in which case up to
/// MaxPendingMessages will be buffered and processed when servers are known.
void queueOctreeEditMessage(PacketType::Value type, unsigned char* buffer, size_t length, qint64 satoshiCost = 0);
/// Releases all queued messages even if those messages haven't filled an MTU packet. This will move the packed message /// Queues a single edit message. Will potentially send a pending multi-command packet. Determines which server
/// node or nodes the packet should be sent to. Can be called even before servers are known, in which case up to
/// MaxPendingMessages will be buffered and processed when servers are known.
void queueOctreeEditMessage(EditMessageTuple);
/// Releases all queued messages even if those messages haven't filled an MTU packet. This will move the packed message
/// packets onto the send queue. If running in threaded mode, the caller does not need to do any further processing to /// packets onto the send queue. If running in threaded mode, the caller does not need to do any further processing to
/// have these packets get sent. If running in non-threaded mode, the caller must still call process() on a regular /// have these packets get sent. If running in non-threaded mode, the caller must still call process() on a regular
/// interval to ensure that the packets are actually sent. Can be called even before servers are known, in /// interval to ensure that the packets are actually sent. Can be called even before servers are known, in
/// which case up to MaxPendingMessages of the released messages will be buffered and actually released when /// which case up to MaxPendingMessages of the released messages will be buffered and actually released when
/// servers are known. /// servers are known.
void releaseQueuedMessages(); void releaseQueuedMessages();
@ -53,7 +52,7 @@ public:
/// The internal contents of the jurisdiction map may change throughout the lifetime of the OctreeEditPacketSender. This map /// The internal contents of the jurisdiction map may change throughout the lifetime of the OctreeEditPacketSender. This map
/// can be set prior to servers being present, so long as the contents of the map accurately reflect the current /// can be set prior to servers being present, so long as the contents of the map accurately reflect the current
/// known jurisdictions. /// known jurisdictions.
void setServerJurisdictions(NodeToJurisdictionMap* serverJurisdictions) { void setServerJurisdictions(NodeToJurisdictionMap* serverJurisdictions) {
_serverJurisdictions = serverJurisdictions; _serverJurisdictions = serverJurisdictions;
} }
@ -61,33 +60,23 @@ public:
virtual bool process(); virtual bool process();
/// Set the desired number of pending messages that the OctreeEditPacketSender should attempt to queue even if /// Set the desired number of pending messages that the OctreeEditPacketSender should attempt to queue even if
/// servers are not present. This only applies to how the OctreeEditPacketSender will manage messages when no /// servers are not present. This only applies to how the OctreeEditPacketSender will manage messages when no
/// servers are present. By default, this value is the same as the default packets that will be sent in one second. /// servers are present. By default, this value is the same as the default packets that will be sent in one second.
/// Which means the OctreeEditPacketSender will not buffer all messages given to it if no servers are present. /// Which means the OctreeEditPacketSender will not buffer all messages given to it if no servers are present.
/// This is the maximum number of queued messages and single messages. /// This is the maximum number of queued messages and single messages.
void setMaxPendingMessages(int maxPendingMessages) { _maxPendingMessages = maxPendingMessages; } void setMaxPendingMessages(int maxPendingMessages) { _maxPendingMessages = maxPendingMessages; }
// the default number of pending messages we will store if no servers are available // the default number of pending messages we will store if no servers are available
static const int DEFAULT_MAX_PENDING_MESSAGES; static const int DEFAULT_MAX_PENDING_MESSAGES;
// is there an octree server available to send packets to // is there an octree server available to send packets to
bool serversExist() const; bool serversExist() const;
/// Set the desired max packet size in bytes that the OctreeEditPacketSender should create
void setMaxPacketSize(int maxPacketSize) { _maxPacketSize = maxPacketSize; }
/// returns the current desired max packet size in bytes that the OctreeEditPacketSender will create
int getMaxPacketSize() const { return _maxPacketSize; }
// you must override these... // you must override these...
virtual char getMyNodeType() const = 0; virtual char getMyNodeType() const = 0;
virtual void adjustEditPacketForClockSkew(PacketType::Value type, virtual void adjustEditPacketForClockSkew(PacketType::Value type,
unsigned char* editPacketBuffer, size_t length, int clockSkew) { } unsigned char* editPacketBuffer, size_t length, int clockSkew) { }
bool hasDestinationWalletUUID() const { return !_destinationWalletUUID.isNull(); }
void setDestinationWalletUUID(const QUuid& destinationWalletUUID) { _destinationWalletUUID = destinationWalletUUID; }
const QUuid& getDestinationWalletUUID() { return _destinationWalletUUID; }
void processNackPacket(const QByteArray& packet); void processNackPacket(const QByteArray& packet);
public slots: public slots:
@ -95,38 +84,36 @@ public slots:
signals: signals:
void octreePaymentRequired(qint64 satoshiAmount, const QUuid& nodeUUID, const QUuid& destinationWalletUUID); void octreePaymentRequired(qint64 satoshiAmount, const QUuid& nodeUUID, const QUuid& destinationWalletUUID);
protected: protected:
using EditMessageTuple = std::tuple<PacketType::Value, unsigned char*, int>;
bool _shouldSend; bool _shouldSend;
void queuePacketToNode(const QUuid& nodeID, unsigned char* buffer, size_t length, qint64 satoshiCost = 0); void queuePacketToNode(const QUuid& nodeID, std::unique_ptr<NLPacket> packet);
void queuePendingPacketToNodes(PacketType::Value type, unsigned char* buffer, size_t length, qint64 satoshiCost = 0); void queuePendingPacketToNodes(PacketType::Value type, unsigned char* buffer, size_t length);
void queuePacketToNodes(unsigned char* buffer, size_t length, qint64 satoshiCost = 0); void queuePacketToNodes(std::unique_ptr<NLPacket> packet);
void initializePacket(EditPacketBuffer& packetBuffer, PacketType::Value type, int nodeClockSkew); std::unique_ptr<NLPacket> initializePacket(PacketType::Value type, int nodeClockSkew);
void releaseQueuedPacket(EditPacketBuffer& packetBuffer); // releases specific queued packet void releaseQueuedPacket(const QUuid& nodeUUID, std::unique_ptr<NLPacket> packetBuffer); // releases specific queued packet
void processPreServerExistsPackets(); void processPreServerExistsPackets();
// These are packets which are destined from know servers but haven't been released because they're still too small // These are packets which are destined from know servers but haven't been released because they're still too small
QHash<QUuid, EditPacketBuffer> _pendingEditPackets; QHash<QUuid, std::unique_ptr<NLPacket>> _pendingEditPackets;
// These are packets that are waiting to be processed because we don't yet know if there are servers // These are packets that are waiting to be processed because we don't yet know if there are servers
int _maxPendingMessages; int _maxPendingMessages;
bool _releaseQueuedMessagesPending; bool _releaseQueuedMessagesPending;
QMutex _pendingPacketsLock; QMutex _pendingPacketsLock;
QMutex _packetsQueueLock; // don't let different threads release the queue while another thread is writing to it QMutex _packetsQueueLock; // don't let different threads release the queue while another thread is writing to it
QVector<EditPacketBuffer*> _preServerPackets; // these will get packed into other larger packets std::list<EditMessageTuple> _preServerEdits; // these will get packed into other larger packets
QVector<EditPacketBuffer*> _preServerSingleMessagePackets; // these will go out as is std::list<std::unique_ptr<NLPacket>> _preServerSingleMessagePackets; // these will go out as is
NodeToJurisdictionMap* _serverJurisdictions; NodeToJurisdictionMap* _serverJurisdictions;
int _maxPacketSize;
QMutex _releaseQueuedPacketMutex; QMutex _releaseQueuedPacketMutex;
// TODO: add locks for this and _pendingEditPackets // TODO: add locks for this and _pendingEditPackets
QHash<QUuid, SentPacketHistory> _sentPacketHistories; QHash<QUuid, SentPacketHistory> _sentPacketHistories;
QHash<QUuid, quint16> _outgoingSequenceNumbers; QHash<QUuid, quint16> _outgoingSequenceNumbers;
QUuid _destinationWalletUUID;
}; };
#endif // hifi_OctreeEditPacketSender_h #endif // hifi_OctreeEditPacketSender_h

View file

@ -21,7 +21,7 @@
class OctreeScriptingInterface : public QObject { class OctreeScriptingInterface : public QObject {
Q_OBJECT Q_OBJECT
public: public:
OctreeScriptingInterface(OctreeEditPacketSender* packetSender = NULL, OctreeScriptingInterface(OctreeEditPacketSender* packetSender = NULL,
JurisdictionListener* jurisdictionListener = NULL); JurisdictionListener* jurisdictionListener = NULL);
~OctreeScriptingInterface(); ~OctreeScriptingInterface();
@ -31,20 +31,14 @@ public:
void setPacketSender(OctreeEditPacketSender* packetSender); void setPacketSender(OctreeEditPacketSender* packetSender);
void setJurisdictionListener(JurisdictionListener* jurisdictionListener); void setJurisdictionListener(JurisdictionListener* jurisdictionListener);
void init(); void init();
virtual NodeType_t getServerNodeType() const = 0; virtual NodeType_t getServerNodeType() const = 0;
virtual OctreeEditPacketSender* createPacketSender() = 0; virtual OctreeEditPacketSender* createPacketSender() = 0;
private slots: private slots:
void cleanupManagedObjects(); void cleanupManagedObjects();
public slots: public slots:
/// Set the desired max packet size in bytes that should be created
void setMaxPacketSize(int maxPacketSize) { return _packetSender->setMaxPacketSize(maxPacketSize); }
/// returns the current desired max packet size in bytes that will be created
int getMaxPacketSize() const { return _packetSender->getMaxPacketSize(); }
/// set the max packets per second send rate /// set the max packets per second send rate
void setPacketsPerSecond(int packetsPerSecond) { return _packetSender->setPacketsPerSecond(packetsPerSecond); } void setPacketsPerSecond(int packetsPerSecond) { return _packetSender->setPacketsPerSecond(packetsPerSecond); }
@ -65,7 +59,7 @@ public slots:
/// returns the bytes per second send rate of this object over its lifetime /// returns the bytes per second send rate of this object over its lifetime
float getLifetimeBPS() const { return _packetSender->getLifetimeBPS(); } float getLifetimeBPS() const { return _packetSender->getLifetimeBPS(); }
/// returns the packets per second queued rate of this object over its lifetime /// returns the packets per second queued rate of this object over its lifetime
float getLifetimePPSQueued() const { return _packetSender->getLifetimePPSQueued(); } float getLifetimePPSQueued() const { return _packetSender->getLifetimePPSQueued(); }