LNL send cleanup / Added connection hash

This commit is contained in:
Atlante45 2015-07-28 15:32:00 -07:00
parent f53637f19e
commit 24520c5856
9 changed files with 97 additions and 62 deletions

View file

@ -145,7 +145,7 @@ void IceServer::sendPeerInformationPacket(const NetworkPeer& peer, const HifiSoc
peerPacket->write(peer.toByteArray());
// write the current packet
_serverSocket.writeUnreliablePacket(*peerPacket, *destinationSockAddr);
_serverSocket.writePacket(*peerPacket, *destinationSockAddr);
}
void IceServer::clearInactivePeers() {

View file

@ -244,18 +244,13 @@ bool LimitedNodeList::packetSourceAndHashMatch(const udt::Packet& packet) {
return false;
}
qint64 LimitedNodeList::writePacket(const NLPacket& packet, const Node& destinationNode) {
if (!destinationNode.getActiveSocket()) {
return 0;
}
emit dataSent(destinationNode.getType(), packet.getDataSize());
return writePacket(packet, *destinationNode.getActiveSocket(), destinationNode.getConnectionSecret());
void LimitedNodeList::collectPacketStats(const NLPacket& packet) {
// stat collection for packets
++_numCollectedPackets;
_numCollectedBytes += packet.getDataSize();
}
qint64 LimitedNodeList::writePacket(const NLPacket& packet, const HifiSockAddr& destinationSockAddr,
const QUuid& connectionSecret) {
void LimitedNodeList::fillPacketHeader(const NLPacket& packet, const QUuid& connectionSecret) {
if (!NON_SOURCED_PACKETS.contains(packet.getType())) {
const_cast<NLPacket&>(packet).writeSourceID(getSessionUUID());
}
@ -265,55 +260,66 @@ qint64 LimitedNodeList::writePacket(const NLPacket& packet, const HifiSockAddr&
&& !NON_VERIFIED_PACKETS.contains(packet.getType())) {
const_cast<NLPacket&>(packet).writeVerificationHashGivenSecret(connectionSecret);
}
emit dataSent(NodeType::Unassigned, packet.getDataSize());
return writePacketAndCollectStats(packet, destinationSockAddr);
}
qint64 LimitedNodeList::writePacketAndCollectStats(const NLPacket& packet, const HifiSockAddr& destinationSockAddr) {
// XXX can BandwidthRecorder be used for this?
// stat collection for packets
++_numCollectedPackets;
_numCollectedBytes += packet.getDataSize();
qint64 bytesWritten = _nodeSocket.writeUnreliablePacket(packet, destinationSockAddr);
return bytesWritten;
}
qint64 LimitedNodeList::sendUnreliablePacket(const NLPacket& packet, const Node& destinationNode) {
return writePacket(packet, destinationNode);
if (!destinationNode.getActiveSocket()) {
return 0;
}
emit dataSent(destinationNode.getType(), packet.getDataSize());
return sendUnreliablePacket(packet, *destinationNode.getActiveSocket(), destinationNode.getConnectionSecret());
}
qint64 LimitedNodeList::sendUnreliablePacket(const NLPacket& packet, const HifiSockAddr& sockAddr,
const QUuid& connectionSecret) {
return writePacket(packet, sockAddr, connectionSecret);
Q_ASSERT_X(!packet.isReliable(), "LimitedNodeList::sendUnreliablePacket",
"Trying to send a reliable packet unreliably.");
collectPacketStats(packet);
fillPacketHeader(packet, connectionSecret);
return _nodeSocket.writePacket(packet, sockAddr);
}
qint64 LimitedNodeList::sendPacket(std::unique_ptr<NLPacket> packet, const Node& destinationNode) {
// Keep unique_ptr alive during write
auto result = writePacket(*packet, destinationNode);
return result;
if (!destinationNode.getActiveSocket()) {
return 0;
}
emit dataSent(destinationNode.getType(), packet->getDataSize());
return sendPacket(std::move(packet), *destinationNode.getActiveSocket(), destinationNode.getConnectionSecret());
}
qint64 LimitedNodeList::sendPacket(std::unique_ptr<NLPacket> packet, const HifiSockAddr& sockAddr,
const QUuid& connectionSecret) {
// Keep unique_ptr alive during write
auto result = writePacket(*packet, sockAddr, connectionSecret);
return result;
if (packet->isReliable()) {
collectPacketStats(*packet);
fillPacketHeader(*packet, connectionSecret);
auto size = packet->getDataSize();
_nodeSocket.writePacket(std::move(packet), sockAddr);
return size;
} else {
return sendUnreliablePacket(*packet, sockAddr, connectionSecret);
}
}
qint64 LimitedNodeList::sendPacketList(NLPacketList& packetList, const Node& destinationNode) {
auto activeSocket = destinationNode.getActiveSocket();
if (!activeSocket) {
return 0;
}
qint64 bytesSent = 0;
auto connectionSecret = destinationNode.getConnectionSecret();
// close the last packet in the list
packetList.closeCurrentPacket();
while (!packetList._packets.empty()) {
bytesSent += sendPacket(packetList.takeFront<NLPacket>(), destinationNode);
bytesSent += sendPacket(packetList.takeFront<NLPacket>(), *activeSocket, connectionSecret);
}
emit dataSent(destinationNode.getType(), bytesSent);
return bytesSent;
}

View file

@ -247,11 +247,13 @@ protected:
LimitedNodeList(LimitedNodeList const&); // Don't implement, needed to avoid copies of singleton
void operator=(LimitedNodeList const&); // Don't implement, needed to avoid copies of singleton
qint64 writePacket(const NLPacket& packet, const Node& destinationNode);
qint64 sendPacket(std::unique_ptr<NLPacket> packet, const Node& destinationNode,
const HifiSockAddr& overridenSockAddr);
qint64 writePacket(const NLPacket& packet, const HifiSockAddr& destinationSockAddr,
const QUuid& connectionSecret = QUuid());
qint64 writePacketAndCollectStats(const NLPacket& packet, const HifiSockAddr& destinationSockAddr);
void collectPacketStats(const NLPacket& packet);
void fillPacketHeader(const NLPacket& packet, const QUuid& connectionSecret);
bool isPacketVerified(const udt::Packet& packet);
bool packetVersionMatch(const udt::Packet& packet);
bool packetSourceAndHashMatch(const udt::Packet& packet);
@ -264,8 +266,6 @@ protected:
void sendPacketToIceServer(PacketType packetType, const HifiSockAddr& iceServerSockAddr, const QUuid& clientID,
const QUuid& peerRequestID = QUuid());
qint64 sendPacket(std::unique_ptr<NLPacket> packet, const Node& destinationNode,
const HifiSockAddr& overridenSockAddr);
QUuid _sessionUUID;

View file

@ -20,14 +20,21 @@ using namespace udt;
using namespace std;
using namespace std::chrono;
Connection::Connection(Socket* parentSocket, HifiSockAddr destination) {
Connection::Connection(Socket* parentSocket, HifiSockAddr destination) :
_parentSocket(parentSocket),
_destination(destination)
{
}
void Connection::send(unique_ptr<Packet> packet) {
if (_sendQueue) {
_sendQueue->queuePacket(move(packet));
void Connection::sendReliablePacket(unique_ptr<Packet> packet) {
Q_ASSERT_X(packet->isReliable(), "Connection::send", "Trying to send an unreliable packet reliably.");
if (!_sendQueue) {
// Lasily create send queue
_sendQueue = SendQueue::create(_parentSocket, _destination);
}
_sendQueue->queuePacket(move(packet));
}
void Connection::sendACK(bool wasCausedBySyncTimeout) {
@ -163,7 +170,6 @@ void Connection::processControl(unique_ptr<ControlPacket> controlPacket) {
processACK(move(controlPacket));
}
break;
}
case ControlPacket::ACK2:
processACK2(move(controlPacket));
break;

View file

@ -17,8 +17,7 @@
#include "LossList.h"
#include "SendQueue.h"
class HifiSockAddr;
#include "../HifiSockAddr.h"
namespace udt {
@ -34,7 +33,7 @@ public:
Connection(Socket* parentSocket, HifiSockAddr destination);
void send(std::unique_ptr<Packet> packet);
void sendReliablePacket(std::unique_ptr<Packet> packet);
void sendACK(bool wasCausedBySyncTimeout = true);
void sendLightACK() const;
@ -74,6 +73,8 @@ private:
SentACKMap _sentACKs; // Map of ACK sub-sequence numbers to ACKed sequence number and sent time
Socket* _parentSocket { nullptr };
HifiSockAddr _destination;
std::unique_ptr<SendQueue> _sendQueue;
};

View file

@ -80,9 +80,9 @@ void SendQueue::stop() {
_running = false;
}
void SendQueue::sendPacket(const Packet& packet) {
void SendQueue::sendPacket(const BasePacket& packet) {
if (_socket) {
_socket->writePacket(packet, _destination);
_socket->writeDatagram(packet.getData(), packet.getDataSize(), _destination);
}
}

View file

@ -47,6 +47,9 @@ public:
int getPacketSendPeriod() const { return _packetSendPeriod; }
void setPacketSendPeriod(int newPeriod) { _packetSendPeriod = newPeriod; }
// Send a packet through the socket
void sendPacket(const BasePacket& packet);
public slots:
void start();
void stop();
@ -67,9 +70,6 @@ private:
// Increments current sequence number and return it
SequenceNumber getNextSequenceNumber();
// Send a packet through the socket
void sendPacket(const Packet& packet);
mutable QReadWriteLock _packetsLock; // Protects the packets to be sent list.
std::list<std::unique_ptr<Packet>> _packets; // List of packets to be sent

View file

@ -14,6 +14,7 @@
#include <QtCore/QThread>
#include "../NetworkLogging.h"
#include "ControlPacket.h"
#include "Packet.h"
using namespace udt;
@ -66,17 +67,36 @@ void Socket::setBufferSizes(int numBytes) {
}
}
qint64 Socket::writeUnreliablePacket(const BasePacket& packet, const HifiSockAddr& sockAddr) {
qint64 Socket::writePacket(const Packet& packet, const HifiSockAddr& sockAddr) {
Q_ASSERT_X(!packet.isReliable(), "Socket::writePacket", "Cannot send a reliable packet unreliably");
// TODO: write the correct sequence number to the Packet here
// const_cast<NLPacket&>(packet).writeSequenceNumber(sequenceNumber);
return writeDatagram(packet.getData(), packet.getDataSize(), sockAddr);
}
qint64 Socket::writePacket(std::unique_ptr<Packet> packet, const HifiSockAddr& sockAddr) {
if (packet->isReliable()) {
auto it = _connectionsHash.find(sockAddr);
if (it == _connectionsHash.end()) {
it = _connectionsHash.insert(it, std::make_pair(sockAddr, new Connection(this, sockAddr)));
}
it->second->sendReliablePacket(std::move(packet));
return 0;
}
return writePacket(*packet, sockAddr);
}
qint64 Socket::writeDatagram(const char* data, qint64 size, const HifiSockAddr& sockAddr) {
return writeDatagram(QByteArray::fromRawData(data, size), sockAddr);
}
qint64 Socket::writeDatagram(const QByteArray& datagram, const HifiSockAddr& sockAddr) {
qint64 bytesWritten = _udpSocket.writeDatagram(datagram, sockAddr.getAddress(), sockAddr.getPort());
// TODO: write the correct sequence number to the Packet here
// const_cast<NLPacket&>(packet).writeSequenceNumber(sequenceNumber);
if (bytesWritten < 0) {
qCDebug(networking) << "ERROR in writeDatagram:" << _udpSocket.error() << "-" << _udpSocket.errorString();
}

View file

@ -22,6 +22,7 @@
#include <QtNetwork/QUdpSocket>
#include "../HifiSockAddr.h"
#include "Connection.h"
namespace udt {
@ -42,10 +43,10 @@ public:
quint16 localPort() const { return _udpSocket.localPort(); }
qint64 writeUnreliablePacket(const BasePacket& packet, const HifiSockAddr& sockAddr);
qint64 writeDatagram(const char* data, qint64 size, const HifiSockAddr& sockAddr)
{ return writeDatagram(QByteArray::fromRawData(data, size), sockAddr); }
// Simple functions writing to the socket with no processing
qint64 writePacket(const Packet& packet, const HifiSockAddr& sockAddr);
qint64 writePacket(std::unique_ptr<Packet> packet, const HifiSockAddr& sockAddr);
qint64 writeDatagram(const char* data, qint64 size, const HifiSockAddr& sockAddr);
qint64 writeDatagram(const QByteArray& datagram, const HifiSockAddr& sockAddr);
void bind(const QHostAddress& address, quint16 port = 0) { _udpSocket.bind(address, port); }
@ -71,6 +72,7 @@ private:
std::unordered_map<HifiSockAddr, BasePacketHandler> _unfilteredHandlers;
std::unordered_map<HifiSockAddr, SequenceNumber> _packetSequenceNumbers;
std::unordered_map<HifiSockAddr, Connection*> _connectionsHash;
int32_t _synInterval = 10; // 10ms
QTimer _synTimer;