initial changes to make _nodeSocket a udt::Socket

This commit is contained in:
Stephen Birarda 2015-07-20 17:10:22 -07:00
parent e63d122f2a
commit 9556fecbe2
11 changed files with 333 additions and 210 deletions

View file

@ -652,9 +652,6 @@ void AudioMixer::run() {
auto nodeList = DependencyManager::get<NodeList>(); auto nodeList = DependencyManager::get<NodeList>();
// we do not want this event loop to be the handler for UDP datagrams, so disconnect
disconnect(&nodeList->getNodeSocket(), 0, this, 0);
nodeList->addNodeTypeToInterestSet(NodeType::Agent); nodeList->addNodeTypeToInterestSet(NodeType::Agent);
nodeList->linkedDataCreateCallback = [](Node* node) { nodeList->linkedDataCreateCallback = [](Node* node) {

View file

@ -259,7 +259,7 @@ void DomainServer::setupNodeListAndAssignments(const QUuid& sessionUUID) {
auto nodeList = DependencyManager::set<LimitedNodeList>(domainServerPort, domainServerDTLSPort); auto nodeList = DependencyManager::set<LimitedNodeList>(domainServerPort, domainServerDTLSPort);
// no matter the local port, save it to shared mem so that local assignment clients can ask what it is // no matter the local port, save it to shared mem so that local assignment clients can ask what it is
nodeList->putLocalPortIntoSharedMemory(DOMAIN_SERVER_LOCAL_PORT_SMEM_KEY, this, nodeList->getNodeSocket().localPort()); nodeList->putLocalPortIntoSharedMemory(DOMAIN_SERVER_LOCAL_PORT_SMEM_KEY, this, nodeList->getSocketLocalPort());
// store our local http ports in shared memory // store our local http ports in shared memory
quint16 localHttpPort = DOMAIN_SERVER_HTTP_PORT; quint16 localHttpPort = DOMAIN_SERVER_HTTP_PORT;

View file

@ -82,7 +82,7 @@ LimitedNodeList::LimitedNodeList(unsigned short socketListenPort, unsigned short
} }
const int LARGER_BUFFER_SIZE = 1048576; const int LARGER_BUFFER_SIZE = 1048576;
changeSocketBufferSizes(LARGER_BUFFER_SIZE); _nodeSocket.setBufferSizes(LARGER_BUFFER_SIZE);
// check for local socket updates every so often // check for local socket updates every so often
const int LOCAL_SOCKET_UPDATE_INTERVAL_MSECS = 5 * 1000; const int LOCAL_SOCKET_UPDATE_INTERVAL_MSECS = 5 * 1000;
@ -97,9 +97,9 @@ LimitedNodeList::LimitedNodeList(unsigned short socketListenPort, unsigned short
// check the local socket right now // check the local socket right now
updateLocalSockAddr(); updateLocalSockAddr();
// TODO: Create a new thread, and move PacketReceiver to it // set &PacketReceiver::handleVerifiedPacket as the verified packet function for the udt::Socket
using std::placeholders::_1;
connect(&_nodeSocket, &QUdpSocket::readyRead, _packetReceiver, &PacketReceiver::processDatagrams); _nodeSocket.setVerifiedPacketFunction(std::bind(&PacketReceiver::handleVerifiedPacket, _packetReceiver, _1));
_packetStatTimer.start(); _packetStatTimer.start();
@ -149,32 +149,6 @@ QUdpSocket& LimitedNodeList::getDTLSSocket() {
return *_dtlsSocket; return *_dtlsSocket;
} }
void LimitedNodeList::changeSocketBufferSizes(int numBytes) {
for (int i = 0; i < 2; i++) {
QAbstractSocket::SocketOption bufferOpt;
QString bufferTypeString;
if (i == 0) {
bufferOpt = QAbstractSocket::SendBufferSizeSocketOption;
bufferTypeString = "send";
} else {
bufferOpt = QAbstractSocket::ReceiveBufferSizeSocketOption;
bufferTypeString = "receive";
}
int oldBufferSize = _nodeSocket.socketOption(bufferOpt).toInt();
if (oldBufferSize < numBytes) {
int newBufferSize = _nodeSocket.socketOption(bufferOpt).toInt();
qCDebug(networking) << "Changed socket" << bufferTypeString << "buffer size from" << oldBufferSize << "to"
<< newBufferSize << "bytes";
} else {
// don't make the buffer smaller
qCDebug(networking) << "Did not change socket" << bufferTypeString << "buffer size from" << oldBufferSize
<< "since it is larger than desired size of" << numBytes;
}
}
}
bool LimitedNodeList::packetSourceAndHashMatch(const NLPacket& packet, SharedNodePointer& matchingNode) { bool LimitedNodeList::packetSourceAndHashMatch(const NLPacket& packet, SharedNodePointer& matchingNode) {
if (NON_SOURCED_PACKETS.contains(packet.getType())) { if (NON_SOURCED_PACKETS.contains(packet.getType())) {
@ -255,12 +229,7 @@ qint64 LimitedNodeList::writeDatagram(const QByteArray& datagram, const HifiSock
++_numCollectedPackets; ++_numCollectedPackets;
_numCollectedBytes += datagram.size(); _numCollectedBytes += datagram.size();
qint64 bytesWritten = _nodeSocket.writeDatagram(datagram, qint64 bytesWritten = _nodeSocket.writeDatagram(datagram, destinationSockAddr);
destinationSockAddr.getAddress(), destinationSockAddr.getPort());
if (bytesWritten < 0) {
qCDebug(networking) << "ERROR in writeDatagram:" << _nodeSocket.error() << "-" << _nodeSocket.errorString();
}
return bytesWritten; return bytesWritten;
} }
@ -571,7 +540,7 @@ void LimitedNodeList::sendSTUNRequest() {
++_numInitialSTUNRequests; ++_numInitialSTUNRequests;
} }
unsigned char stunRequestPacket[NUM_BYTES_STUN_HEADER]; char stunRequestPacket[NUM_BYTES_STUN_HEADER];
int packetIndex = 0; int packetIndex = 0;
@ -597,15 +566,7 @@ void LimitedNodeList::sendSTUNRequest() {
flagTimeForConnectionStep(ConnectionStep::SendSTUNRequest); flagTimeForConnectionStep(ConnectionStep::SendSTUNRequest);
_nodeSocket.writeDatagram((char*) stunRequestPacket, sizeof(stunRequestPacket), _nodeSocket.writeDatagram(stunRequestPacket, _stunSockAddr);
_stunSockAddr.getAddress(), _stunSockAddr.getPort());
}
void LimitedNodeList::rebindNodeSocket() {
quint16 oldPort = _nodeSocket.localPort();
_nodeSocket.close();
_nodeSocket.bind(QHostAddress::AnyIPv4, oldPort);
} }
bool LimitedNodeList::processSTUNResponse(QSharedPointer<NLPacket> packet) { bool LimitedNodeList::processSTUNResponse(QSharedPointer<NLPacket> packet) {

View file

@ -37,9 +37,10 @@
#include "DomainHandler.h" #include "DomainHandler.h"
#include "Node.h" #include "Node.h"
#include "NLPacket.h" #include "NLPacket.h"
#include "udt/PacketHeaders.h"
#include "PacketReceiver.h" #include "PacketReceiver.h"
#include "NLPacketList.h" #include "NLPacketList.h"
#include "udt/PacketHeaders.h"
#include "udt/Socket.h"
#include "UUIDHasher.h" #include "UUIDHasher.h"
const quint64 NODE_SILENCE_THRESHOLD_MSECS = 2 * 1000; const quint64 NODE_SILENCE_THRESHOLD_MSECS = 2 * 1000;
@ -110,8 +111,7 @@ public:
bool getThisNodeCanRez() const { return _thisNodeCanRez; } bool getThisNodeCanRez() const { return _thisNodeCanRez; }
void setThisNodeCanRez(bool canRez); void setThisNodeCanRez(bool canRez);
void rebindNodeSocket(); quint16 getSocketLocalPort() const { return _nodeSocket.localPort(); }
QUdpSocket& getNodeSocket() { return _nodeSocket; }
QUdpSocket& getDTLSSocket(); QUdpSocket& getDTLSSocket();
bool packetSourceAndHashMatch(const NLPacket& packet, SharedNodePointer& matchingNode); bool packetSourceAndHashMatch(const NLPacket& packet, SharedNodePointer& matchingNode);
@ -256,8 +256,6 @@ protected:
PacketSequenceNumber getNextSequenceNumberForPacket(const QUuid& nodeUUID, PacketType::Value packetType); PacketSequenceNumber getNextSequenceNumberForPacket(const QUuid& nodeUUID, PacketType::Value packetType);
void changeSocketBufferSizes(int numBytes);
void handleNodeKill(const SharedNodePointer& node); void handleNodeKill(const SharedNodePointer& node);
void stopInitialSTUNUpdate(bool success); void stopInitialSTUNUpdate(bool success);
@ -272,7 +270,7 @@ protected:
QUuid _sessionUUID; QUuid _sessionUUID;
NodeHash _nodeHash; NodeHash _nodeHash;
QReadWriteLock _nodeMutex; QReadWriteLock _nodeMutex;
QUdpSocket _nodeSocket; udt::Socket _nodeSocket;
QUdpSocket* _dtlsSocket; QUdpSocket* _dtlsSocket;
HifiSockAddr _localSockAddr; HifiSockAddr _localSockAddr;
HifiSockAddr _publicSockAddr; HifiSockAddr _publicSockAddr;

View file

@ -63,6 +63,14 @@ std::unique_ptr<NLPacket> NLPacket::fromReceivedPacket(std::unique_ptr<char> dat
} }
std::unique_ptr<NLPacket> NLPacket::fromBase(std::unique_ptr<Packet> packet) {
// Fail with null packet
Q_ASSERT(packet);
// call our constructor to create an NLPacket from this Packet
return std::unique_ptr<NLPacket>(new NLPacket(std::move(packet)));
}
std::unique_ptr<NLPacket> NLPacket::createCopy(const NLPacket& other) { std::unique_ptr<NLPacket> NLPacket::createCopy(const NLPacket& other) {
return std::unique_ptr<NLPacket>(new NLPacket(other)); return std::unique_ptr<NLPacket>(new NLPacket(other));
} }
@ -81,24 +89,59 @@ NLPacket::NLPacket(PacketType::Value type) :
adjustPayloadStartAndCapacity(); adjustPayloadStartAndCapacity();
} }
NLPacket::NLPacket(const NLPacket& other) : Packet(other) { NLPacket::NLPacket(std::unique_ptr<Packet> packet) :
Packet(*packet)
{
adjustPayloadStartAndCapacity(_payloadSize > 0);
readSourceID();
readVerificationHash();
}
NLPacket::NLPacket(const NLPacket& other) : Packet(other) {
*this = other;
}
NLPacket& NLPacket::operator=(const NLPacket& other) {
Packet::operator=(other);
_sourceID = other._sourceID;
_verificationHash = other._verificationHash;
return *this;
} }
NLPacket::NLPacket(std::unique_ptr<char> data, qint64 size, const HifiSockAddr& senderSockAddr) : NLPacket::NLPacket(std::unique_ptr<char> data, qint64 size, const HifiSockAddr& senderSockAddr) :
Packet(std::move(data), size, senderSockAddr) Packet(std::move(data), size, senderSockAddr)
{ {
adjustPayloadStartAndCapacity(); adjustPayloadStartAndCapacity();
_payloadSize = _payloadCapacity;
readSourceID(); readSourceID();
readVerificationHash(); readVerificationHash();
} }
void NLPacket::adjustPayloadStartAndCapacity() { NLPacket::NLPacket(NLPacket&& other) :
Packet(other)
{
*this = std::move(other);
}
NLPacket& NLPacket::operator=(NLPacket&& other) {
_sourceID = std::move(other._sourceID);
_verificationHash = std::move(other._verificationHash);
return *this;
}
void NLPacket::adjustPayloadStartAndCapacity(bool shouldDecreasePayloadSize) {
qint64 headerSize = localHeaderSize(_type); qint64 headerSize = localHeaderSize(_type);
_payloadStart += headerSize; _payloadStart += headerSize;
_payloadCapacity -= headerSize; _payloadCapacity -= headerSize;
if (shouldDecreasePayloadSize) {
_payloadSize -= headerSize;
}
} }
void NLPacket::readSourceID() { void NLPacket::readSourceID() {

View file

@ -22,6 +22,8 @@ public:
static std::unique_ptr<NLPacket> create(PacketType::Value type, qint64 size = -1); static std::unique_ptr<NLPacket> create(PacketType::Value type, qint64 size = -1);
static std::unique_ptr<NLPacket> fromReceivedPacket(std::unique_ptr<char> data, qint64 size, static std::unique_ptr<NLPacket> fromReceivedPacket(std::unique_ptr<char> data, qint64 size,
const HifiSockAddr& senderSockAddr); const HifiSockAddr& senderSockAddr);
static std::unique_ptr<NLPacket> fromBase(std::unique_ptr<Packet> packet);
// Provided for convenience, try to limit use // Provided for convenience, try to limit use
static std::unique_ptr<NLPacket> createCopy(const NLPacket& other); static std::unique_ptr<NLPacket> createCopy(const NLPacket& other);
@ -41,12 +43,19 @@ public:
protected: protected:
void adjustPayloadStartAndCapacity(); void adjustPayloadStartAndCapacity(bool shouldDecreasePayloadSize = false);
NLPacket(PacketType::Value type); NLPacket(PacketType::Value type);
NLPacket(PacketType::Value type, qint64 size); NLPacket(PacketType::Value type, qint64 size);
NLPacket(std::unique_ptr<char> data, qint64 size, const HifiSockAddr& senderSockAddr); NLPacket(std::unique_ptr<char> data, qint64 size, const HifiSockAddr& senderSockAddr);
NLPacket(std::unique_ptr<Packet> packet);
NLPacket(const NLPacket& other); NLPacket(const NLPacket& other);
NLPacket& operator=(const NLPacket& other);
NLPacket(NLPacket&& other);
NLPacket& operator=(NLPacket&& other);
void readSourceID(); void readSourceID();
void readVerificationHash(); void readVerificationHash();

View file

@ -234,45 +234,34 @@ bool PacketReceiver::packetVersionMatch(const NLPacket& packet) {
} }
} }
void PacketReceiver::processDatagrams() { void PacketReceiver::handleVerifiedPacket(std::unique_ptr<Packet> packet) {
//PerformanceWarning warn(Menu::getInstance()->isOptionChecked(MenuOption::PipelineWarnings),
//"PacketReceiver::processDatagrams()");
auto nodeList = DependencyManager::get<LimitedNodeList>();
while (nodeList && nodeList->getNodeSocket().hasPendingDatagrams()) {
// setup a buffer to read the packet into
int packetSizeWithHeader = nodeList->getNodeSocket().pendingDatagramSize();
std::unique_ptr<char> buffer = std::unique_ptr<char>(new char[packetSizeWithHeader]);
// if we're supposed to drop this packet then break out here // if we're supposed to drop this packet then break out here
if (_shouldDropPackets) { if (_shouldDropPackets) {
break; return;
} }
auto nodeList = DependencyManager::get<LimitedNodeList>();
// setup a HifiSockAddr to read into // setup a HifiSockAddr to read into
HifiSockAddr senderSockAddr; HifiSockAddr senderSockAddr;
// pull the datagram
nodeList->getNodeSocket().readDatagram(buffer.get(), packetSizeWithHeader,
senderSockAddr.getAddressPointer(), senderSockAddr.getPortPointer());
// setup an NLPacket from the data we just read // setup an NLPacket from the data we just read
auto packet = NLPacket::fromReceivedPacket(std::move(buffer), packetSizeWithHeader, senderSockAddr); auto nlPacket = NLPacket::fromBase(std::move(packet));
_inPacketCount++; _inPacketCount++;
_inByteCount += packetSizeWithHeader; _inByteCount += nlPacket->getDataSize();
if (packetVersionMatch(*packet)) {
SharedNodePointer matchingNode; SharedNodePointer matchingNode;
if (nodeList->packetSourceAndHashMatch(*packet, matchingNode)) { if (!nlPacket->getSourceID().isNull()) {
matchingNode = nodeList->nodeWithUUID(nlPacket->getSourceID());
if (matchingNode) { if (matchingNode) {
// No matter if this packet is handled or not, we update the timestamp for the last time we heard // No matter if this packet is handled or not, we update the timestamp for the last time we heard
// from this sending node // from this sending node
matchingNode->setLastHeardMicrostamp(usecTimestampNow()); matchingNode->setLastHeardMicrostamp(usecTimestampNow());
} }
}
_packetListenerLock.lock(); _packetListenerLock.lock();
@ -317,21 +306,21 @@ void PacketReceiver::processDatagrams() {
success = metaMethod.invoke(listener.first, success = metaMethod.invoke(listener.first,
connectionType, connectionType,
Q_ARG(QSharedPointer<NLPacket>, Q_ARG(QSharedPointer<NLPacket>,
QSharedPointer<NLPacket>(packet.release())), QSharedPointer<NLPacket>(nlPacket.release())),
Q_ARG(SharedNodePointer, matchingNode)); Q_ARG(SharedNodePointer, matchingNode));
} else if (metaMethod.parameterTypes().contains(QSHAREDPOINTER_NODE_NORMALIZED)) { } else if (metaMethod.parameterTypes().contains(QSHAREDPOINTER_NODE_NORMALIZED)) {
success = metaMethod.invoke(listener.first, success = metaMethod.invoke(listener.first,
connectionType, connectionType,
Q_ARG(QSharedPointer<NLPacket>, Q_ARG(QSharedPointer<NLPacket>,
QSharedPointer<NLPacket>(packet.release())), QSharedPointer<NLPacket>(nlPacket.release())),
Q_ARG(QSharedPointer<Node>, matchingNode)); Q_ARG(QSharedPointer<Node>, matchingNode));
} else { } else {
success = metaMethod.invoke(listener.first, success = metaMethod.invoke(listener.first,
connectionType, connectionType,
Q_ARG(QSharedPointer<NLPacket>, Q_ARG(QSharedPointer<NLPacket>,
QSharedPointer<NLPacket>(packet.release()))); QSharedPointer<NLPacket>(nlPacket.release())));
} }
} else { } else {
listenerIsDead = true; listenerIsDead = true;
@ -341,7 +330,7 @@ void PacketReceiver::processDatagrams() {
emit dataReceived(NodeType::Unassigned, packet->getDataSize()); emit dataReceived(NodeType::Unassigned, packet->getDataSize());
success = listener.second.invoke(listener.first, success = listener.second.invoke(listener.first,
Q_ARG(QSharedPointer<NLPacket>, QSharedPointer<NLPacket>(packet.release()))); Q_ARG(QSharedPointer<NLPacket>, QSharedPointer<NLPacket>(nlPacket.release())));
} }
if (!success) { if (!success) {
@ -374,7 +363,5 @@ void PacketReceiver::processDatagrams() {
} }
_packetListenerLock.unlock(); _packetListenerLock.unlock();
}
}
}
} }

View file

@ -45,8 +45,7 @@ public:
bool registerListener(PacketType::Value type, QObject* listener, const char* slot); bool registerListener(PacketType::Value type, QObject* listener, const char* slot);
void unregisterListener(QObject* listener); void unregisterListener(QObject* listener);
public slots: void handleVerifiedPacket(std::unique_ptr<Packet> packet);
void processDatagrams();
signals: signals:
void dataReceived(quint8 channelType, int bytes); void dataReceived(quint8 channelType, int bytes);

View file

@ -98,12 +98,6 @@ Packet::Packet(const Packet& other) :
QIODevice() QIODevice()
{ {
*this = other; *this = other;
if (other.isOpen()) {
this->open(other.openMode());
}
this->seek(other.pos());
} }
Packet& Packet::operator=(const Packet& other) { Packet& Packet::operator=(const Packet& other) {
@ -118,6 +112,12 @@ Packet& Packet::operator=(const Packet& other) {
_payloadSize = other._payloadSize; _payloadSize = other._payloadSize;
if (other.isOpen() && !isOpen()) {
open(other.openMode());
}
seek(other.pos());
return *this; return *this;
} }
@ -136,6 +136,12 @@ Packet& Packet::operator=(Packet&& other) {
_payloadSize = other._payloadSize; _payloadSize = other._payloadSize;
if (other.isOpen() && !isOpen()) {
open(other.openMode());
}
seek(other.pos());
return *this; return *this;
} }

View file

@ -0,0 +1,68 @@
//
// Socket.cpp
// libraries/networking/src/udt
//
// Created by Stephen Birarda on 2015-07-20.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#include "Socket.h"
#include "../NetworkLogging.h"
using namespace udt;
Socket::Socket(QObject* parent) :
QObject(parent)
{
}
void Socket::rebind() {
quint16 oldPort = _udpSocket.localPort();
_udpSocket.close();
_udpSocket.bind(QHostAddress::AnyIPv4, oldPort);
}
void Socket::setBufferSizes(int numBytes) {
for (int i = 0; i < 2; i++) {
QAbstractSocket::SocketOption bufferOpt;
QString bufferTypeString;
if (i == 0) {
bufferOpt = QAbstractSocket::SendBufferSizeSocketOption;
bufferTypeString = "send";
} else {
bufferOpt = QAbstractSocket::ReceiveBufferSizeSocketOption;
bufferTypeString = "receive";
}
int oldBufferSize = _udpSocket.socketOption(bufferOpt).toInt();
if (oldBufferSize < numBytes) {
int newBufferSize = _udpSocket.socketOption(bufferOpt).toInt();
qCDebug(networking) << "Changed socket" << bufferTypeString << "buffer size from" << oldBufferSize << "to"
<< newBufferSize << "bytes";
} else {
// don't make the buffer smaller
qCDebug(networking) << "Did not change socket" << bufferTypeString << "buffer size from" << oldBufferSize
<< "since it is larger than desired size of" << numBytes;
}
}
}
qint64 Socket::writeDatagram(const QByteArray& datagram, const HifiSockAddr& sockAddr) {
qint64 bytesWritten = _udpSocket.writeDatagram(datagram, sockAddr.getAddress(), sockAddr.getPort());
if (bytesWritten < 0) {
qCDebug(networking) << "ERROR in writeDatagram:" << _udpSocket.error() << "-" << _udpSocket.errorString();
}
return bytesWritten;
}

View file

@ -0,0 +1,55 @@
//
// Socket.h
// libraries/networking/src/udt
//
// Created by Stephen Birarda on 2015-07-20.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#pragma once
#ifndef hifi_Socket_h
#define hifi_Socket_h
#include <functional>
#include <QtCore/QObject>
#include <QtNetwork/QUdpSocket>
#include "../HifiSockAddr.h"
#include "Packet.h"
namespace udt {
using VerifiedPacketFunction = std::function<void(std::unique_ptr<Packet>)>;
class Socket : public QObject {
Q_OBJECT
public:
Socket(QObject* object = 0);
quint16 localPort() const { return _udpSocket.localPort(); }
qint64 writeDatagram(const char* data, qint64 size, const HifiSockAddr& sockAddr)
{ return writeDatagram(QByteArray::fromRawData(data, size), sockAddr); }
qint64 writeDatagram(const QByteArray& datagram, const HifiSockAddr& sockAddr);
void bind(const QHostAddress& address, quint16 port = 0) { _udpSocket.bind(address, port); }
void rebind();
void setVerifiedPacketFunction(VerifiedPacketFunction verifiedPacketFunction)
{ _verifiedPacketFunction = verifiedPacketFunction; }
void setBufferSizes(int numBytes);
private:
QUdpSocket _udpSocket { this };
VerifiedPacketFunction _verifiedPacketFunction;
};
}
#endif // hifi_Socket_h