add TODO for EntityEditNack read fix

This commit is contained in:
Stephen Birarda 2015-07-07 15:35:45 -07:00
parent 56880e8858
commit 55a775de38

View file

@ -30,7 +30,7 @@ OctreeEditPacketSender::OctreeEditPacketSender() :
_maxPacketSize(MAX_PACKET_SIZE), _maxPacketSize(MAX_PACKET_SIZE),
_destinationWalletUUID() _destinationWalletUUID()
{ {
} }
OctreeEditPacketSender::~OctreeEditPacketSender() { OctreeEditPacketSender::~OctreeEditPacketSender() {
@ -52,10 +52,10 @@ OctreeEditPacketSender::~OctreeEditPacketSender() {
bool OctreeEditPacketSender::serversExist() const { bool OctreeEditPacketSender::serversExist() const {
bool hasServers = false; bool hasServers = false;
bool atLeastOneJurisdictionMissing = false; // assume the best bool atLeastOneJurisdictionMissing = false; // assume the best
DependencyManager::get<NodeList>()->eachNodeBreakable([&](const SharedNodePointer& node){ DependencyManager::get<NodeList>()->eachNodeBreakable([&](const SharedNodePointer& node){
if (node->getType() == getMyNodeType() && node->getActiveSocket()) { if (node->getType() == getMyNodeType() && node->getActiveSocket()) {
QUuid nodeUUID = node->getUUID(); QUuid nodeUUID = node->getUUID();
// If we've got Jurisdictions set, then check to see if we know the jurisdiction for this server // If we've got Jurisdictions set, then check to see if we know the jurisdiction for this server
if (_serverJurisdictions) { if (_serverJurisdictions) {
@ -69,7 +69,7 @@ bool OctreeEditPacketSender::serversExist() const {
} }
hasServers = true; hasServers = true;
} }
if (atLeastOneJurisdictionMissing) { if (atLeastOneJurisdictionMissing) {
return false; // no point in looking further - return false from anonymous function return false; // no point in looking further - return false from anonymous function
} else { } else {
@ -91,27 +91,27 @@ void OctreeEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, unsigned c
if (node->getType() == getMyNodeType() if (node->getType() == getMyNodeType()
&& ((node->getUUID() == nodeUUID) || (nodeUUID.isNull())) && ((node->getUUID() == nodeUUID) || (nodeUUID.isNull()))
&& node->getActiveSocket()) { && node->getActiveSocket()) {
// pack sequence number // pack sequence number
int numBytesPacketHeader = numBytesForPacketHeader(reinterpret_cast<char*>(buffer)); int numBytesPacketHeader = numBytesForPacketHeader(reinterpret_cast<char*>(buffer));
unsigned char* sequenceAt = buffer + numBytesPacketHeader; unsigned char* sequenceAt = buffer + numBytesPacketHeader;
quint16 sequence = _outgoingSequenceNumbers[nodeUUID]++; quint16 sequence = _outgoingSequenceNumbers[nodeUUID]++;
memcpy(sequenceAt, &sequence, sizeof(quint16)); memcpy(sequenceAt, &sequence, sizeof(quint16));
// send packet // send packet
QByteArray packet(reinterpret_cast<const char*>(buffer), length); QByteArray packet(reinterpret_cast<const char*>(buffer), length);
queuePacketForSending(node, packet); queuePacketForSending(node, packet);
if (hasDestinationWalletUUID() && satoshiCost > 0) { if (hasDestinationWalletUUID() && satoshiCost > 0) {
// if we have a destination wallet UUID and a cost associated with this packet, signal that it // if we have a destination wallet UUID and a cost associated with this packet, signal that it
// needs to be sent // needs to be sent
emit octreePaymentRequired(satoshiCost, nodeUUID, _destinationWalletUUID); emit octreePaymentRequired(satoshiCost, nodeUUID, _destinationWalletUUID);
} }
// add packet to history // add packet to history
_sentPacketHistories[nodeUUID].packetSent(sequence, packet); _sentPacketHistories[nodeUUID].packetSent(sequence, packet);
// debugging output... // debugging output...
if (wantDebug) { if (wantDebug) {
int numBytesPacketHeader = numBytesForPacketHeader(reinterpret_cast<const char*>(buffer)); int numBytesPacketHeader = numBytesForPacketHeader(reinterpret_cast<const char*>(buffer));
@ -119,7 +119,7 @@ void OctreeEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, unsigned c
quint64 createdAt = (*((quint64*)(buffer + numBytesPacketHeader + sizeof(sequence)))); quint64 createdAt = (*((quint64*)(buffer + numBytesPacketHeader + sizeof(sequence))));
quint64 queuedAt = usecTimestampNow(); quint64 queuedAt = usecTimestampNow();
quint64 transitTime = queuedAt - createdAt; quint64 transitTime = queuedAt - createdAt;
qCDebug(octree) << "OctreeEditPacketSender::queuePacketToNode() queued " << buffer[0] << qCDebug(octree) << "OctreeEditPacketSender::queuePacketToNode() queued " << buffer[0] <<
" - command to node bytes=" << length << " - command to node bytes=" << length <<
" satoshiCost=" << satoshiCost << " satoshiCost=" << satoshiCost <<
@ -192,7 +192,7 @@ void OctreeEditPacketSender::queuePacketToNodes(unsigned char* buffer, size_t le
// But we can't really do that with a packed message, since each edit message could be destined // But we can't really do that with a packed message, since each edit message could be destined
// for a different server... So we need to actually manage multiple queued packets... one // for a different server... So we need to actually manage multiple queued packets... one
// for each server // for each server
DependencyManager::get<NodeList>()->eachNode([&](const SharedNodePointer& node){ DependencyManager::get<NodeList>()->eachNode([&](const SharedNodePointer& node){
// only send to the NodeTypes that are getMyNodeType() // only send to the NodeTypes that are getMyNodeType()
if (node->getActiveSocket() && node->getType() == getMyNodeType()) { if (node->getActiveSocket() && node->getType() == getMyNodeType()) {
@ -251,7 +251,7 @@ void OctreeEditPacketSender::queueOctreeEditMessage(PacketType::Value type, unsi
if (node->getActiveSocket() && node->getType() == getMyNodeType()) { if (node->getActiveSocket() && node->getType() == getMyNodeType()) {
QUuid nodeUUID = node->getUUID(); QUuid nodeUUID = node->getUUID();
bool isMyJurisdiction = true; bool isMyJurisdiction = true;
if (type == PacketTypeEntityErase) { if (type == PacketTypeEntityErase) {
isMyJurisdiction = true; // send erase messages to all servers isMyJurisdiction = true; // send erase messages to all servers
} else if (_serverJurisdictions) { } else if (_serverJurisdictions) {
@ -269,19 +269,19 @@ void OctreeEditPacketSender::queueOctreeEditMessage(PacketType::Value type, unsi
if (isMyJurisdiction) { if (isMyJurisdiction) {
EditPacketBuffer& packetBuffer = _pendingEditPackets[nodeUUID]; EditPacketBuffer& packetBuffer = _pendingEditPackets[nodeUUID];
packetBuffer._nodeUUID = nodeUUID; packetBuffer._nodeUUID = nodeUUID;
// If we're switching type, then we send the last one and start over // If we're switching type, then we send the last one and start over
if ((type != packetBuffer._currentType && packetBuffer._currentSize > 0) || if ((type != packetBuffer._currentType && packetBuffer._currentSize > 0) ||
(packetBuffer._currentSize + length >= (size_t)_maxPacketSize)) { (packetBuffer._currentSize + length >= (size_t)_maxPacketSize)) {
releaseQueuedPacket(packetBuffer); releaseQueuedPacket(packetBuffer);
initializePacket(packetBuffer, type, node->getClockSkewUsec()); initializePacket(packetBuffer, type, node->getClockSkewUsec());
} }
// If the buffer is empty and not correctly initialized for our type... // If the buffer is empty and not correctly initialized for our type...
if (type != packetBuffer._currentType && packetBuffer._currentSize == 0) { if (type != packetBuffer._currentType && packetBuffer._currentSize == 0) {
initializePacket(packetBuffer, type, node->getClockSkewUsec()); initializePacket(packetBuffer, type, node->getClockSkewUsec());
} }
// This is really the first time we know which server/node this particular edit message // This is really the first time we know which server/node this particular edit message
// is going to, so we couldn't adjust for clock skew till now. But here's our chance. // is going to, so we couldn't adjust for clock skew till now. But here's our chance.
// We call this virtual function that allows our specific type of EditPacketSender to // We call this virtual function that allows our specific type of EditPacketSender to
@ -289,7 +289,7 @@ void OctreeEditPacketSender::queueOctreeEditMessage(PacketType::Value type, unsi
if (node->getClockSkewUsec() != 0) { if (node->getClockSkewUsec() != 0) {
adjustEditPacketForClockSkew(type, editPacketBuffer, length, node->getClockSkewUsec()); adjustEditPacketForClockSkew(type, editPacketBuffer, length, node->getClockSkewUsec());
} }
memcpy(&packetBuffer._currentBuffer[packetBuffer._currentSize], editPacketBuffer, length); memcpy(&packetBuffer._currentBuffer[packetBuffer._currentSize], editPacketBuffer, length);
packetBuffer._currentSize += length; packetBuffer._currentSize += length;
packetBuffer._satoshiCost += satoshiCost; packetBuffer._satoshiCost += satoshiCost;
@ -341,7 +341,7 @@ void OctreeEditPacketSender::initializePacket(EditPacketBuffer& packetBuffer, Pa
packetBuffer._currentSize += sizeof(quint64); // nudge past timestamp packetBuffer._currentSize += sizeof(quint64); // nudge past timestamp
packetBuffer._currentType = type; packetBuffer._currentType = type;
// reset cost for packet to 0 // reset cost for packet to 0
packetBuffer._satoshiCost = 0; packetBuffer._satoshiCost = 0;
} }
@ -360,20 +360,22 @@ bool OctreeEditPacketSender::process() {
void OctreeEditPacketSender::processNackPacket(const QByteArray& packet) { void OctreeEditPacketSender::processNackPacket(const QByteArray& packet) {
// parse sending node from packet, retrieve packet history for that node // parse sending node from packet, retrieve packet history for that node
QUuid sendingNodeUUID = uuidFromPacketHeader(packet); QUuid sendingNodeUUID = uuidFromPacketHeader(packet);
// if packet history doesn't exist for the sender node (somehow), bail // if packet history doesn't exist for the sender node (somehow), bail
if (!_sentPacketHistories.contains(sendingNodeUUID)) { if (!_sentPacketHistories.contains(sendingNodeUUID)) {
return; return;
} }
const SentPacketHistory& sentPacketHistory = _sentPacketHistories.value(sendingNodeUUID); const SentPacketHistory& sentPacketHistory = _sentPacketHistories.value(sendingNodeUUID);
// TODO: these NAK packets no longer send the number of sequence numbers - just read out sequence numbers in blocks
int numBytesPacketHeader = numBytesForPacketHeader(packet); int numBytesPacketHeader = numBytesForPacketHeader(packet);
const unsigned char* dataAt = reinterpret_cast<const unsigned char*>(packet.data()) + numBytesPacketHeader; const unsigned char* dataAt = reinterpret_cast<const unsigned char*>(packet.data()) + numBytesPacketHeader;
// read number of sequence numbers // read number of sequence numbers
uint16_t numSequenceNumbers = (*(uint16_t*)dataAt); uint16_t numSequenceNumbers = (*(uint16_t*)dataAt);
dataAt += sizeof(uint16_t); dataAt += sizeof(uint16_t);
// read sequence numbers and queue packets for resend // read sequence numbers and queue packets for resend
for (int i = 0; i < numSequenceNumbers; i++) { for (int i = 0; i < numSequenceNumbers; i++) {
unsigned short int sequenceNumber = (*(unsigned short int*)dataAt); unsigned short int sequenceNumber = (*(unsigned short int*)dataAt);