Merge pull request #14181 from Atlante45/feat/timebox-packet-processing

Timebox packet processing
This commit is contained in:
Stephen Birarda 2018-10-22 09:43:30 -07:00 committed by GitHub
commit ea3bdeb26e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 37 additions and 75 deletions

View file

@ -37,7 +37,6 @@ Socket::Socket(QObject* parent, bool shouldChangeSocketOptions) :
_shouldChangeSocketOptions(shouldChangeSocketOptions)
{
connect(&_udpSocket, &QUdpSocket::readyRead, this, &Socket::readPendingDatagrams);
connect(this, &Socket::pendingDatagrams, this, &Socket::processPendingDatagrams, Qt::QueuedConnection);
// make sure we hear about errors and state changes from the underlying socket
connect(&_udpSocket, SIGNAL(error(QAbstractSocket::SocketError)),
@ -316,85 +315,64 @@ void Socket::checkForReadyReadBackup() {
}
void Socket::readPendingDatagrams() {
int packetsRead = 0;
using namespace std::chrono;
static const auto MAX_PROCESS_TIME { 100ms };
const auto abortTime = system_clock::now() + MAX_PROCESS_TIME;
int packetSizeWithHeader = -1;
// Max datagrams to read before processing:
static const int MAX_DATAGRAMS_CONSECUTIVELY = 10000;
while (_udpSocket.hasPendingDatagrams()
&& (packetSizeWithHeader = _udpSocket.pendingDatagramSize()) != -1
&& packetsRead <= MAX_DATAGRAMS_CONSECUTIVELY) {
// grab a time point we can mark as the receive time of this packet
auto receiveTime = p_high_resolution_clock::now();
// setup a buffer to read the packet into
auto buffer = std::unique_ptr<char[]>(new char[packetSizeWithHeader]);
QHostAddress senderAddress;
quint16 senderPort;
// pull the datagram
auto sizeRead = _udpSocket.readDatagram(buffer.get(), packetSizeWithHeader,
&senderAddress, &senderPort);
// we either didn't pull anything for this packet or there was an error reading (this seems to trigger
// on windows even if there's not a packet available)
if (sizeRead < 0) {
continue;
while (_udpSocket.hasPendingDatagrams() &&
(packetSizeWithHeader = _udpSocket.pendingDatagramSize()) != -1) {
if (system_clock::now() > abortTime) {
// We've been running for too long, stop processing packets for now
// Once we've processed the event queue, we'll come back to packet processing
break;
}
_incomingDatagrams.push_back({ senderAddress, senderPort, packetSizeWithHeader,
std::move(buffer), receiveTime });
++packetsRead;
}
if (packetsRead > _maxDatagramsRead) {
_maxDatagramsRead = packetsRead;
qCDebug(networking) << "readPendingDatagrams: Datagrams read:" << packetsRead;
}
emit pendingDatagrams(packetsRead);
}
void Socket::processPendingDatagrams(int) {
// setup a HifiSockAddr to read into
HifiSockAddr senderSockAddr;
while (!_incomingDatagrams.empty()) {
auto& datagram = _incomingDatagrams.front();
senderSockAddr.setAddress(datagram._senderAddress);
senderSockAddr.setPort(datagram._senderPort);
int datagramSize = datagram._datagramLength;
auto receiveTime = datagram._receiveTime;
// we're reading a packet so re-start the readyRead backup timer
_readyReadBackupTimer->start();
// grab a time point we can mark as the receive time of this packet
auto receiveTime = p_high_resolution_clock::now();
// setup a HifiSockAddr to read into
HifiSockAddr senderSockAddr;
// setup a buffer to read the packet into
auto buffer = std::unique_ptr<char[]>(new char[packetSizeWithHeader]);
// pull the datagram
auto sizeRead = _udpSocket.readDatagram(buffer.get(), packetSizeWithHeader,
senderSockAddr.getAddressPointer(), senderSockAddr.getPortPointer());
// save information for this packet, in case it is the one that sticks readyRead
_lastPacketSizeRead = datagramSize;
_lastPacketSizeRead = sizeRead;
_lastPacketSockAddr = senderSockAddr;
// Process unfiltered packets first.
if (sizeRead <= 0) {
// we either didn't pull anything for this packet or there was an error reading (this seems to trigger
// on windows even if there's not a packet available)
continue;
}
auto it = _unfilteredHandlers.find(senderSockAddr);
if (it != _unfilteredHandlers.end()) {
// we have a registered unfiltered handler for this HifiSockAddr (eg. STUN packet) - call that and return
// we have a registered unfiltered handler for this HifiSockAddr - call that and return
if (it->second) {
auto basePacket = BasePacket::fromReceivedPacket(std::move(datagram._datagram),
datagramSize, senderSockAddr);
auto basePacket = BasePacket::fromReceivedPacket(std::move(buffer), packetSizeWithHeader, senderSockAddr);
basePacket->setReceiveTime(receiveTime);
it->second(std::move(basePacket));
}
_incomingDatagrams.pop_front();
continue;
}
// check if this was a control packet or a data packet
bool isControlPacket = *reinterpret_cast<uint32_t*>(datagram._datagram.get()) & CONTROL_BIT_MASK;
bool isControlPacket = *reinterpret_cast<uint32_t*>(buffer.get()) & CONTROL_BIT_MASK;
if (isControlPacket) {
// setup a control packet from the data we just read
auto controlPacket = ControlPacket::fromReceivedPacket(std::move(datagram._datagram), datagramSize, senderSockAddr);
auto controlPacket = ControlPacket::fromReceivedPacket(std::move(buffer), packetSizeWithHeader, senderSockAddr);
controlPacket->setReceiveTime(receiveTime);
// move this control packet to the matching connection, if there is one
@ -406,13 +384,13 @@ void Socket::processPendingDatagrams(int) {
} else {
// setup a Packet from the data we just read
auto packet = Packet::fromReceivedPacket(std::move(datagram._datagram), datagramSize, senderSockAddr);
auto packet = Packet::fromReceivedPacket(std::move(buffer), packetSizeWithHeader, senderSockAddr);
packet->setReceiveTime(receiveTime);
// save the sequence number in case this is the packet that sticks readyRead
_lastReceivedSequenceNumber = packet->getSequenceNumber();
// call our hash verification operator to see if this packet is verified
// call our verification operator to see if this packet is verified
if (!_packetFilterOperator || _packetFilterOperator(*packet)) {
if (packet->isReliable()) {
// if this was a reliable packet then signal the matching connection with the sequence number
@ -426,7 +404,6 @@ void Socket::processPendingDatagrams(int) {
qCDebug(networking) << "Can't process packet: version" << (unsigned int)NLPacket::versionInHeader(*packet)
<< ", type" << NLPacket::typeInHeader(*packet);
#endif
_incomingDatagrams.pop_front();
continue;
}
}
@ -442,8 +419,6 @@ void Socket::processPendingDatagrams(int) {
}
}
}
_incomingDatagrams.pop_front();
}
}

View file

@ -95,7 +95,6 @@ public:
signals:
void clientHandshakeRequestComplete(const HifiSockAddr& sockAddr);
void pendingDatagrams(int datagramCount);
public slots:
void cleanupConnection(HifiSockAddr sockAddr);
@ -103,7 +102,6 @@ public slots:
private slots:
void readPendingDatagrams();
void processPendingDatagrams(int datagramCount);
void checkForReadyReadBackup();
void handleSocketError(QAbstractSocket::SocketError socketError);
@ -147,17 +145,6 @@ private:
int _lastPacketSizeRead { 0 };
SequenceNumber _lastReceivedSequenceNumber;
HifiSockAddr _lastPacketSockAddr;
struct Datagram {
QHostAddress _senderAddress;
int _senderPort;
int _datagramLength;
std::unique_ptr<char[]> _datagram;
p_high_resolution_clock::time_point _receiveTime;
};
std::list<Datagram> _incomingDatagrams;
int _maxDatagramsRead { 0 };
friend UDTTest;
};