use of new API in OctreeEditPacketSender

This commit is contained in:
Stephen Birarda 2015-07-08 13:43:37 -07:00
parent 1e34a63234
commit d9e10db011
5 changed files with 47 additions and 45 deletions

View file

@ -12,6 +12,7 @@
#ifndef hifi_LimitedNodeList_h
#define hifi_LimitedNodeList_h
#include <assert.h>
#include <stdint.h>
#include <iterator>
#include <memory>
@ -145,14 +146,20 @@ public:
// const HifiSockAddr& overridenSockAddr = HifiSockAddr());
//
qint64 sendUnreliablePacket(std::unique_ptr<NLPacket>& packet, const SharedNodePointer& destinationNode) {};
qint64 sendUnreliablePacket(std::unique_ptr<NLPacket>& packet, const HifiSockAddr& sockAddr) {};
qint64 sendUnreliablePacket(std::unique_ptr<NLPacket>& packet, const SharedNodePointer& destinationNode)
{ assert(false); return 0; }
qint64 sendUnreliablePacket(std::unique_ptr<NLPacket>& packet, const HifiSockAddr& sockAddr)
{ assert(false); return 0; }
qint64 sendPacket(std::unique_ptr<NLPacket> packet, const SharedNodePointer& destinationNode) {};
qint64 sendPacket(std::unique_ptr<NLPacket> packet, const HifiSockAddr& sockAddr) {};
qint64 sendPacket(std::unique_ptr<NLPacket> packet, const SharedNodePointer& destinationNode)
{ assert(false); return 0; }
qint64 sendPacket(std::unique_ptr<NLPacket> packet, const HifiSockAddr& sockAddr)
{ assert(false); return 0; }
qint64 sendPacketList(NLPacketList& packetList, const SharedNodePointer& destinationNode) {};
qint64 sendPacketList(NLPacketList& packetList, const HifiSockAddr& sockAddr) {};
qint64 sendPacketList(NLPacketList& packetList, const SharedNodePointer& destinationNode)
{ assert(false); return 0; }
qint64 sendPacketList(NLPacketList& packetList, const HifiSockAddr& sockAddr)
{ assert(false); return 0; }
void (*linkedDataCreateCallback)(Node *);
@ -177,7 +184,8 @@ public:
int updateNodeWithDataFromPacket(const SharedNodePointer& matchingNode, const QByteArray& packet);
int findNodeAndUpdateWithDataFromPacket(const QByteArray& packet);
unsigned broadcastToNodes(std::unique_ptr<NLPacket> packet, const NodeSet& destinationNodeTypes) {};
unsigned broadcastToNodes(std::unique_ptr<NLPacket> packet, const NodeSet& destinationNodeTypes)
{ assert(false); return 0; }
SharedNodePointer soloNodeOfType(char nodeType);
void getPacketStats(float &packetsPerSecond, float &bytesPerSecond);
@ -293,7 +301,8 @@ protected:
const QUuid& peerRequestID = QUuid());
qint64 sendPacket(std::unique_ptr<NLPacket> packet, const SharedNodePointer& destinationNode,
const HifiSockAddr& overridenSockAddr) {};
const HifiSockAddr& overridenSockAddr)
{ assert(false); return 0; }
QUuid _sessionUUID;

View file

@ -20,6 +20,7 @@
#include <QtCore/QUuid>
#include <QReadWriteLock>
#include <NLPacket.h>
#include <Node.h>
class JurisdictionMap {
@ -29,7 +30,7 @@ public:
WITHIN,
BELOW
};
// standard constructors
JurisdictionMap(NodeType_t type = NodeType::EntityServer); // default constructor
JurisdictionMap(const JurisdictionMap& other); // copy constructor
@ -42,8 +43,8 @@ public:
JurisdictionMap(JurisdictionMap&& other); // move constructor
JurisdictionMap& operator= (JurisdictionMap&& other); // move assignment
#endif
// application constructors
// application constructors
JurisdictionMap(const char* filename);
JurisdictionMap(unsigned char* rootOctalCode, const std::vector<unsigned char*>& endNodes);
JurisdictionMap(const char* rootHextString, const char* endNodesHextString);
@ -62,15 +63,15 @@ public:
int unpackFromMessage(const unsigned char* sourceBuffer, int availableBytes);
std::unique_ptr<NLPacket> packIntoMessage();
/// Available to pack an empty or unknown jurisdiction into a network packet, used when no JurisdictionMap is available
static std::unique_ptr<NLPacket> packEmptyJurisdictionIntoMessage(NodeType_t type);
void displayDebugDetails() const;
NodeType_t getNodeType() const { return _nodeType; }
void setNodeType(NodeType_t type) { _nodeType = type; }
private:
void copyContents(const JurisdictionMap& other); // use assignment instead
void clear();
@ -81,7 +82,7 @@ private:
NodeType_t _nodeType;
};
/// Map between node IDs and their reported JurisdictionMap. Typically used by classes that need to know which nodes are
/// Map between node IDs and their reported JurisdictionMap. Typically used by classes that need to know which nodes are
/// managing which jurisdictions.
class NodeToJurisdictionMap : public QMap<QUuid, JurisdictionMap>, public QReadWriteLock {};
typedef QMap<QUuid, JurisdictionMap>::iterator NodeToJurisdictionMapIterator;

View file

@ -56,7 +56,7 @@ bool JurisdictionSender::process() {
SharedNodePointer node = DependencyManager::get<NodeList>()->nodeWithUUID(nodeUUID);
if (node && node->getActiveSocket()) {
_packetSender.queuePacketForSending(node, packet);
_packetSender.queuePacketForSending(node, std::move(packet));
nodeCount++;
}
}

View file

@ -26,8 +26,7 @@ OctreeEditPacketSender::OctreeEditPacketSender() :
_shouldSend(true),
_maxPendingMessages(DEFAULT_MAX_PENDING_MESSAGES),
_releaseQueuedMessagesPending(false),
_serverJurisdictions(NULL),
_destinationWalletUUID()
_serverJurisdictions(NULL)
{
}
@ -103,17 +102,15 @@ void OctreeEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, std::uniqu
quint64 queuedAt = usecTimestampNow();
quint64 transitTime = queuedAt - createdAt;
qCDebug(octree) << "OctreeEditPacketSender::queuePacketToNode() queued " << buffer[0] <<
" - command to node bytes=" << length <<
" satoshiCost=" << satoshiCost <<
" sequence=" << sequence <<
" transitTimeSoFar=" << transitTime << " usecs";
qCDebug(octree) << "OctreeEditPacketSender::queuePacketToNode() queued " << packet->readType()
<< " - command to node bytes=" << packet->getSizeWithHeader()
<< " sequence=" << sequence << " transitTimeSoFar=" << transitTime << " usecs";
}
// add packet to history
_sentPacketHistories[nodeUUID].packetSent(sequence, packet);
queuePacketForSending(node, packet);
queuePacketForSending(node, std::move(packet));
}
});
}
@ -130,10 +127,10 @@ void OctreeEditPacketSender::processPreServerExistsPackets() {
}
// Then "process" all the packable messages...
while (!_preServerPackets.empty()) {
while (!_preServerEdits.empty()) {
EditMessageTuple editMessage = std::move(_preServerEdits.front());
queueOctreeEditMessage(editMessage.first(), editMessage.second(), editMessage.third());
_preServerPackets.pop_front();
queueOctreeEditMessage(std::get<0>(editMessage), std::get<1>(editMessage), std::get<2>(editMessage));
_preServerEdits.pop_front();
}
_pendingPacketsLock.unlock();
@ -153,7 +150,7 @@ void OctreeEditPacketSender::queuePendingPacketToNodes(std::unique_ptr<NLPacket>
_pendingPacketsLock.lock();
_preServerSingleMessagePackets.push_back(packet);
// if we've saved MORE than our max, then clear out the oldest packet...
int allPendingMessages = _preServerSingleMessagePackets.size() + _preServerPackets.size();
int allPendingMessages = _preServerSingleMessagePackets.size() + _preServerEdits.size();
if (allPendingMessages > _maxPendingMessages) {
_preServerSingleMessagePackets.pop_front();
}
@ -168,7 +165,7 @@ void OctreeEditPacketSender::queuePacketToNodes(std::unique_ptr<NLPacket> packet
assert(serversExist()); // we must have jurisdictions to be here!!
const unsigned char* octCode = reinterpret_cast<unsigned_char*>(packet->getPayload()) + sizeof(short) + sizeof(quint64);
const unsigned char* octCode = reinterpret_cast<unsigned char*>(packet->getPayload()) + sizeof(short) + sizeof(quint64);
// We want to filter out edit messages for servers based on the server's Jurisdiction
// But we can't really do that with a packed message, since each edit message could be destined
@ -190,7 +187,7 @@ void OctreeEditPacketSender::queuePacketToNodes(std::unique_ptr<NLPacket> packet
if (isMyJurisdiction) {
// make a copy of this packet for this node and queue
auto packetCopy = NLPacket::createCopy(packet);
queuePacketToNode(std::move(packetCopy));
queuePacketToNode(nodeUUID, std::move(packetCopy));
}
}
});
@ -213,7 +210,7 @@ void OctreeEditPacketSender::queueOctreeEditMessage(PacketType::Value type, unsi
_preServerEdits.push_back(messageTuple);
// if we've saved MORE than out max, then clear out the oldest packet...
int allPendingMessages = _preServerSingleMessagePackets.size() + _preServerPackets.size();
int allPendingMessages = _preServerSingleMessagePackets.size() + _preServerEdits.size();
if (allPendingMessages > _maxPendingMessages) {
_preServerEdits.pop_front();
}
@ -255,7 +252,7 @@ void OctreeEditPacketSender::queueOctreeEditMessage(PacketType::Value type, unsi
bufferedPacket = NLPacket::create(type);
} else {
// If we're switching type, then we send the last one and start over
if ((type != bufferedPacket->getType() && bufferedPacket->getSizeUsed() > 0) ||
if ((type != bufferedPacket->readType() && bufferedPacket->getSizeUsed() > 0) ||
(length >= bufferedPacket->bytesAvailable())) {
// create the new packet and swap it with the packet in _pendingEditPackets
@ -263,7 +260,7 @@ void OctreeEditPacketSender::queueOctreeEditMessage(PacketType::Value type, unsi
bufferedPacket.swap(packetToRelease);
// release the previously buffered packet
releaseQueuedPacket(packetToRelease);
releaseQueuedPacket(nodeUUID, std::move(packetToRelease));
}
}
@ -306,12 +303,10 @@ void OctreeEditPacketSender::releaseQueuedMessages() {
}
}
void OctreeEditPacketSender::releaseQueuedPacket(std::unique_ptr<NLPacket> packet) {
void OctreeEditPacketSender::releaseQueuedPacket(const QUuid& nodeID, std::unique_ptr<NLPacket> packet) {
_releaseQueuedPacketMutex.lock();
if (packetBuffer._currentSize > 0 && packetBuffer._currentType != PacketType::Unknown) {
queuePacketToNode(std::move(packet));
packetBuffer._currentSize = 0;
packetBuffer._currentType = PacketType::Unknown;
if (packet->getSizeUsed() > 0 && packet->readType() != PacketType::Unknown) {
queuePacketToNode(nodeID, std::move(packet));
}
_releaseQueuedPacketMutex.unlock();
}
@ -363,10 +358,10 @@ void OctreeEditPacketSender::processNackPacket(const QByteArray& packet) {
dataAt += sizeof(unsigned short int);
// retrieve packet from history
const QByteArray* packet = sentPacketHistory.getPacket(sequenceNumber);
const std::unique_ptr<NLPacket>& packet = sentPacketHistory.getPacket(sequenceNumber);
if (packet) {
const SharedNodePointer& node = DependencyManager::get<NodeList>()->nodeWithUUID(sendingNodeUUID);
queuePacketForSending(node, *packet);
queuePacketForSending(node, NLPacket::createCopy(packet));
}
}
}

View file

@ -29,7 +29,7 @@ public:
/// Queues a single edit message. Will potentially send a pending multi-command packet. Determines which server
/// node or nodes the packet should be sent to. Can be called even before servers are known, in which case up to
/// MaxPendingMessages will be buffered and processed when servers are known.
void queueOctreeEditMessage(EditMessageTuple);
void queueOctreeEditMessage(PacketType::Value type, unsigned char* editPacketBuffer, size_t length);
/// Releases all queued messages even if those messages haven't filled an MTU packet. This will move the packed message
/// packets onto the send queue. If running in threaded mode, the caller does not need to do any further processing to
@ -82,15 +82,12 @@ public:
public slots:
void nodeKilled(SharedNodePointer node);
signals:
void octreePaymentRequired(qint64 satoshiAmount, const QUuid& nodeUUID, const QUuid& destinationWalletUUID);
protected:
using EditMessageTuple = std::tuple<PacketType::Value, unsigned char*, int>;
bool _shouldSend;
void queuePacketToNode(const QUuid& nodeID, std::unique_ptr<NLPacket> packet);
void queuePendingPacketToNodes(PacketType::Value type, unsigned char* buffer, size_t length);
void queuePendingPacketToNodes(std::unique_ptr<NLPacket> packet);
void queuePacketToNodes(std::unique_ptr<NLPacket> packet);
std::unique_ptr<NLPacket> initializePacket(PacketType::Value type, int nodeClockSkew);
void releaseQueuedPacket(const QUuid& nodeUUID, std::unique_ptr<NLPacket> packetBuffer); // releases specific queued packet