From d64833a9270bf5a54d882780ad956aa1ae0b73dc Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Wed, 9 Jan 2013 13:15:22 -0700 Subject: [PATCH 01/25] initial commit using 24 UDP server code --- socket.rb | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 socket.rb diff --git a/socket.rb b/socket.rb new file mode 100644 index 0000000000..b94816e667 --- /dev/null +++ b/socket.rb @@ -0,0 +1,25 @@ +require 'socket' +begin + # create the UDPSocket object + sock = UDPSocket.new + + # bind it to 0.0.0.0 at the given port + port = 55443 + sock.bind "0.0.0.0", port + puts "UDP Socket bound to port #{port} and listening." + + # while true loop to keep listening for new packets + while true do + data, sender = sock.recvfrom 1024 + # puts "#{sender[3]} sent #{data} on port #{sender[1]}" + puts "Recieved #{data.size} from #{sender[3]}" + rand_photo_url = "http://d4gpsb1dbo4rf.cloudfront.net/photo#{rand(11)}.jpg" + puts "Sending #{rand_photo_url.size} back" + sock.send rand_photo_url, 0, sender[3], sender[1] + end + + rescue SystemExit, Interrupt => e + puts + puts "Okay, fine. I'll stop listening." + sock.close +end From 8033440932c813f73f2901e140e19e8f29a207b2 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Wed, 9 Jan 2013 15:20:24 -0700 Subject: [PATCH 02/25] echo back audio packet to sender --- socket.rb | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/socket.rb b/socket.rb index b94816e667..97d82b2a6c 100644 --- a/socket.rb +++ b/socket.rb @@ -11,11 +11,9 @@ begin # while true loop to keep listening for new packets while true do data, sender = sock.recvfrom 1024 - # puts "#{sender[3]} sent #{data} on port #{sender[1]}" - puts "Recieved #{data.size} from #{sender[3]}" - rand_photo_url = "http://d4gpsb1dbo4rf.cloudfront.net/photo#{rand(11)}.jpg" - puts "Sending #{rand_photo_url.size} back" - sock.send rand_photo_url, 0, sender[3], sender[1] + puts "Recieved #{data.size} bytes from #{sender[3]}" + puts "Echoing data back to #{sender[3]}" + sock.send data, 0, sender[3], sender[1] end rescue SystemExit, Interrupt => e From 168d3661202c17c5110758b18d989816bb67e5e9 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Wed, 9 Jan 2013 17:00:58 -0700 Subject: [PATCH 03/25] added 20msec delay in reply of buffered data --- socket.rb | 55 +++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 41 insertions(+), 14 deletions(-) diff --git a/socket.rb b/socket.rb index 97d82b2a6c..e3697548f3 100644 --- a/socket.rb +++ b/socket.rb @@ -1,23 +1,50 @@ require 'socket' +BUFFER_SLEEP_DURATION_MSECS = 20 + begin # create the UDPSocket object - sock = UDPSocket.new - - # bind it to 0.0.0.0 at the given port - port = 55443 - sock.bind "0.0.0.0", port - puts "UDP Socket bound to port #{port} and listening." + socket = UDPSocket.new - # while true loop to keep listening for new packets - while true do - data, sender = sock.recvfrom 1024 - puts "Recieved #{data.size} bytes from #{sender[3]}" - puts "Echoing data back to #{sender[3]}" - sock.send data, 0, sender[3], sender[1] - end + $dataBuffer = Array.new + $receivers = Hash.new + + listenThread = Thread.new(socket) { + # bind it to 0.0.0.0 at the given port + port = 55443 + socket.bind "0.0.0.0", port + puts "UDP Socket bound to port #{port} and listening." + + # while true loop to keep listening for new packets + while true do + data, sender = socket.recvfrom 1024 + puts "Recieved #{data.size} bytes from #{sender[3]}" + $dataBuffer << data + $receivers[sender[3]] = sender[1] + end + } + + sendThread = Thread.new(socket) { + while true do + sleep(BUFFER_SLEEP_DURATION_MSECS/1000) + + if $dataBuffer.size > 0 + $receivers.each do |ip, port| + puts $dataBuffer[0] + puts ip + puts port + socket.send $dataBuffer[0], 0, ip, port + end + + $dataBuffer.clear + end + end + } + + listenThread.join + sendThread.join rescue SystemExit, Interrupt => e puts puts "Okay, fine. I'll stop listening." - sock.close + socket.close end From 3bea53c66b715cce07f80da5f67420d213392ff4 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Fri, 11 Jan 2013 10:43:54 -0700 Subject: [PATCH 04/25] mix together multiple packets from clients --- socket.rb | 38 +++++++++++++++++++++++++++----------- 1 file changed, 27 insertions(+), 11 deletions(-) diff --git a/socket.rb b/socket.rb index e3697548f3..1eba4e3581 100644 --- a/socket.rb +++ b/socket.rb @@ -1,11 +1,12 @@ require 'socket' BUFFER_SLEEP_DURATION_MSECS = 20 +CLIENT_LIST_MEMORY_DURATION_SECS = 1 begin # create the UDPSocket object socket = UDPSocket.new - $dataBuffer = Array.new + $dataBuffer = nil $receivers = Hash.new listenThread = Thread.new(socket) { @@ -17,9 +18,20 @@ begin # while true loop to keep listening for new packets while true do data, sender = socket.recvfrom 1024 - puts "Recieved #{data.size} bytes from #{sender[3]}" - $dataBuffer << data - $receivers[sender[3]] = sender[1] + puts "Recieved #{data.size} bytes from #{sender[3]} on #{sender[1]}" + + # there are 2 bytes per sample + newSamples = data.unpack("s<*") + + if $dataBuffer.nil? + $dataBuffer = newSamples + else + $dataBuffer.each_with_index do |sample, index| + sample = (sample + newSamples[index]) / 2 + end + end + + $receivers["#{sender[3]}:#{sender[1]}"] = Time.now.to_f end } @@ -27,15 +39,19 @@ begin while true do sleep(BUFFER_SLEEP_DURATION_MSECS/1000) - if $dataBuffer.size > 0 - $receivers.each do |ip, port| - puts $dataBuffer[0] - puts ip - puts port - socket.send $dataBuffer[0], 0, ip, port + unless $dataBuffer.nil? + $receivers.each do |ip_port, time| + if (time > Time.now.to_f - CLIENT_LIST_MEMORY_DURATION_SECS) + ip, port = ip_port.split(':') + socket.send $dataBuffer.pack("s<*"), 0, ip, port + puts "Sent mixed frame to #{ip} on #{port}" + else + puts "Nobody to send mixed frame to" + $receivers.delete(ip_port) + end end - $dataBuffer.clear + $dataBuffer = nil end end } From 2fc8bf9fa5d9e663edd3290dc3742879cc456cb3 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Fri, 11 Jan 2013 10:44:38 -0700 Subject: [PATCH 05/25] initial commit of code from Philip's spaceserver --- socket.cpp | 102 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 102 insertions(+) create mode 100644 socket.cpp diff --git a/socket.cpp b/socket.cpp new file mode 100644 index 0000000000..a1cd3b447c --- /dev/null +++ b/socket.cpp @@ -0,0 +1,102 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace std; + +const int UDP_PORT = 55443; + +sockaddr_in address, dest_address; +socklen_t destLength = sizeof( dest_address ); + +double diffclock(timeval clock1,timeval clock2) +{ + double diffms = (clock2.tv_sec - clock1.tv_sec) * 1000.0; + diffms += (clock2.tv_usec - clock1.tv_usec) / 1000.0; // us to ms + return diffms; +} + +int network_init() +{ + // Create socket + int handle = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + + if (handle <= 0) { + printf( "failed to create socket\n" ); + return false; + } + + // Bind socket to port + address.sin_family = AF_INET; + address.sin_addr.s_addr = INADDR_ANY; + address.sin_port = htons( (unsigned short) UDP_PORT ); + + if (bind(handle, (const sockaddr*) &address, sizeof(sockaddr_in)) < 0) { + printf( "failed to bind socket\n" ); + return false; + } + + // Set socket as non-blocking + int nonBlocking = 1; + if (fcntl( handle, F_SETFL, O_NONBLOCK, nonBlocking ) == -1) { + printf( "failed to set non-blocking socket\n" ); + return false; + } + + dest_address.sin_family = AF_INET; + dest_address.sin_addr.s_addr = inet_addr(DESTINATION_IP); + dest_address.sin_port = htons( (unsigned short) UDP_PORT ); + + return handle; +} + +int main(int argc, const char * argv[]) +{ + int received_bytes = 0; + timeval time, last_time; + + int handle = network_init(); + + if (!handle) { + cout << "Failed to create network.\n"; + return 0; + } else { + cout << "Network Started. Waiting for packets.\n"; + } + + gettimeofday(&last_time, NULL); + + while (1) { + received_bytes = recvfrom(handle, (char*)packet_data, MAX_PACKET_SIZE, + 0, (sockaddr*)&dest_address, &destLength ); + if (received_bytes > 0) { + //std::cout << "Packet from: " << inet_ntoa(dest_address.sin_addr) + //<< " " << packet_data << "\n"; + float x,y,z; + sscanf(packet_data, "%f,%f,%f", &x, &y, &z); + if (addAgent(dest_address.sin_addr.s_addr, x, y, z)) { + cout << "Added agent from IP: " << + inet_ntoa(dest_address.sin_addr) << "\n"; + } + // Reply with packet listing nearby active agents + send_agent_list(handle, &dest_address); + } + gettimeofday(&time, NULL); + if (diffclock(last_time, time) > LOGOFF_CHECK_INTERVAL) { + gettimeofday(&last_time, NULL); + update_agent_list(last_time); + } + + usleep(10000); + } + return 0; +} + From 89bcc7a9084d6f4f51663b03dece3e18015a1ce4 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Fri, 11 Jan 2013 11:10:41 -0700 Subject: [PATCH 06/25] stub out a thread to send back buffer mix --- socket.cpp | 66 +++++++++++++++++++++++++++++++++++------------------- 1 file changed, 43 insertions(+), 23 deletions(-) diff --git a/socket.cpp b/socket.cpp index a1cd3b447c..040062460c 100644 --- a/socket.cpp +++ b/socket.cpp @@ -8,14 +8,13 @@ #include #include #include -#include - -using namespace std; const int UDP_PORT = 55443; +const int MAX_PACKET_SIZE = 1024; +const float SAMPLE_RATE = 22050.0; +const int SAMPLES_PER_PACKET = 512; sockaddr_in address, dest_address; -socklen_t destLength = sizeof( dest_address ); double diffclock(timeval clock1,timeval clock2) { @@ -24,7 +23,7 @@ double diffclock(timeval clock1,timeval clock2) return diffms; } -int network_init() +int create_socket() { // Create socket int handle = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); @@ -33,6 +32,13 @@ int network_init() printf( "failed to create socket\n" ); return false; } + + return handle; +} + +int network_init() +{ + int handle = create_socket(); // Bind socket to port address.sin_family = AF_INET; @@ -42,22 +48,33 @@ int network_init() if (bind(handle, (const sockaddr*) &address, sizeof(sockaddr_in)) < 0) { printf( "failed to bind socket\n" ); return false; - } - - // Set socket as non-blocking - int nonBlocking = 1; - if (fcntl( handle, F_SETFL, O_NONBLOCK, nonBlocking ) == -1) { - printf( "failed to set non-blocking socket\n" ); - return false; - } - - dest_address.sin_family = AF_INET; - dest_address.sin_addr.s_addr = inet_addr(DESTINATION_IP); - dest_address.sin_port = htons( (unsigned short) UDP_PORT ); + } return handle; } +void send_buffer_thread() +{ + // create our send socket + int handle = create_socket(); + + if (!handle) { + std::cout << "Failed to create buffer send socket.\n"; + return 0; + } else { + std::cout << "Buffer send socket created.\n"; + } + + while (1) { + // sleep for the length of a packet of audio + sleep((SAMPLES_PER_PACKET/SAMPLE_RATE) * pow(10, 6)); + + // send out whatever we have in the buffer as mixed audio + // to our recent clients + + } +} + int main(int argc, const char * argv[]) { int received_bytes = 0; @@ -66,30 +83,32 @@ int main(int argc, const char * argv[]) int handle = network_init(); if (!handle) { - cout << "Failed to create network.\n"; + std::cout << "Failed to create listening socket.\n"; return 0; } else { - cout << "Network Started. Waiting for packets.\n"; + std::cout << "Network Started. Waiting for packets.\n"; } gettimeofday(&last_time, NULL); while (1) { received_bytes = recvfrom(handle, (char*)packet_data, MAX_PACKET_SIZE, - 0, (sockaddr*)&dest_address, &destLength ); + 0, (sockaddr*)&dest_address, sizeof(dest_address)); if (received_bytes > 0) { - //std::cout << "Packet from: " << inet_ntoa(dest_address.sin_addr) - //<< " " << packet_data << "\n"; + // std::cout << "Packet from: " << inet_ntoa(dest_address.sin_addr) + // << " " << packet_data << "\n"; float x,y,z; sscanf(packet_data, "%f,%f,%f", &x, &y, &z); if (addAgent(dest_address.sin_addr.s_addr, x, y, z)) { - cout << "Added agent from IP: " << + std::cout << "Added agent from IP: " << inet_ntoa(dest_address.sin_addr) << "\n"; } // Reply with packet listing nearby active agents send_agent_list(handle, &dest_address); } + gettimeofday(&time, NULL); + if (diffclock(last_time, time) > LOGOFF_CHECK_INTERVAL) { gettimeofday(&last_time, NULL); update_agent_list(last_time); @@ -97,6 +116,7 @@ int main(int argc, const char * argv[]) usleep(10000); } + return 0; } From 170eaf7be23e99316c88c9a897f535bd2e5371ee Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Fri, 11 Jan 2013 11:25:54 -0700 Subject: [PATCH 07/25] add back Philip's agent code --- socket.cpp | 73 +++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 61 insertions(+), 12 deletions(-) diff --git a/socket.cpp b/socket.cpp index 040062460c..85244e8819 100644 --- a/socket.cpp +++ b/socket.cpp @@ -11,10 +11,25 @@ const int UDP_PORT = 55443; const int MAX_PACKET_SIZE = 1024; + const float SAMPLE_RATE = 22050.0; const int SAMPLES_PER_PACKET = 512; +const int MAX_AGENTS = 1000; +const int LOGOFF_CHECK_INTERVAL = 1000; + sockaddr_in address, dest_address; +socklen_t destLength = sizeof(dest_address); + +struct AgentList { + uint32_t ip; + in_addr sin_addr; + unsigned short port; + bool active; + timeval time; +} agents[MAX_AGENTS]; + +int num_agents = 0; double diffclock(timeval clock1,timeval clock2) { @@ -53,6 +68,42 @@ int network_init() return handle; } +int addAgent(uint32_t ip, unsigned short port) { + // Search for agent in list and add if needed + int i = 0; + int is_new = 0; + + while ((ip != agents[i].ip) && (i < num_agents)) { + i++; + } + + if ((i == num_agents) || (agents[i].active == false)) is_new = 1; + + agents[i].ip = ip; + agents[i].port = port; + agents[i].active = true; + agents[i].sin_addr.s_addr = ip; + gettimeofday(&agents[i].time, NULL); + + if (i == num_agents) { + num_agents++; + } + + return is_new; +} + +void update_agent_list(timeval now) { + int i; + // std::cout << "Checking agent list" << "\n"; + for (i = 0; i < num_agents; i++) { + if ((diffclock(agents[i].time, now) > LOGOFF_CHECK_INTERVAL) && + agents[i].active) { + std::cout << "Expired Agent #" << i << "\n"; + agents[i].active = false; + } + } +} + void send_buffer_thread() { // create our send socket @@ -60,7 +111,7 @@ void send_buffer_thread() if (!handle) { std::cout << "Failed to create buffer send socket.\n"; - return 0; + // return 0; } else { std::cout << "Buffer send socket created.\n"; } @@ -90,31 +141,29 @@ int main(int argc, const char * argv[]) } gettimeofday(&last_time, NULL); + + char packet_data[MAX_PACKET_SIZE]; while (1) { received_bytes = recvfrom(handle, (char*)packet_data, MAX_PACKET_SIZE, - 0, (sockaddr*)&dest_address, sizeof(dest_address)); + 0, (sockaddr*)&dest_address, &destLength); if (received_bytes > 0) { // std::cout << "Packet from: " << inet_ntoa(dest_address.sin_addr) // << " " << packet_data << "\n"; - float x,y,z; - sscanf(packet_data, "%f,%f,%f", &x, &y, &z); - if (addAgent(dest_address.sin_addr.s_addr, x, y, z)) { - std::cout << "Added agent from IP: " << - inet_ntoa(dest_address.sin_addr) << "\n"; + + if (addAgent(dest_address.sin_addr.s_addr, dest_address.sin_port)) { + std::cout << "Added agent: " << + inet_ntoa(dest_address.sin_addr) << " on " << + dest_address.sin_port << "\n"; } - // Reply with packet listing nearby active agents - send_agent_list(handle, &dest_address); } - + gettimeofday(&time, NULL); if (diffclock(last_time, time) > LOGOFF_CHECK_INTERVAL) { gettimeofday(&last_time, NULL); update_agent_list(last_time); } - - usleep(10000); } return 0; From 1754417af1ca0505c0a8f9edffa239b0ffc4848c Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Fri, 11 Jan 2013 13:14:55 -0700 Subject: [PATCH 08/25] add a gitignore for *.out files --- .gitignore | 1 + 1 file changed, 1 insertion(+) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000000..fa929750c2 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*.out \ No newline at end of file From 6cbe461520a48519ea2d12020f01ea6521ac4494 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Fri, 11 Jan 2013 13:15:51 -0700 Subject: [PATCH 09/25] thread client packets, non-blocking receive to remove agents --- socket.cpp | 117 ++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 84 insertions(+), 33 deletions(-) diff --git a/socket.cpp b/socket.cpp index 85244e8819..1aca7203c3 100644 --- a/socket.cpp +++ b/socket.cpp @@ -8,6 +8,7 @@ #include #include #include +#include const int UDP_PORT = 55443; const int MAX_PACKET_SIZE = 1024; @@ -18,13 +19,13 @@ const int SAMPLES_PER_PACKET = 512; const int MAX_AGENTS = 1000; const int LOGOFF_CHECK_INTERVAL = 1000; +char packet_data[MAX_PACKET_SIZE]; + sockaddr_in address, dest_address; socklen_t destLength = sizeof(dest_address); struct AgentList { - uint32_t ip; - in_addr sin_addr; - unsigned short port; + sockaddr_in agent_addr; bool active; timeval time; } agents[MAX_AGENTS]; @@ -68,21 +69,23 @@ int network_init() return handle; } -int addAgent(uint32_t ip, unsigned short port) { +int addAgent(sockaddr_in dest_address) { // Search for agent in list and add if needed - int i = 0; int is_new = 0; - - while ((ip != agents[i].ip) && (i < num_agents)) { - i++; + int i = 0; + + for (i = 0; i < num_agents; i++) { + if (dest_address.sin_addr.s_addr == agents[i].agent_addr.sin_addr.s_addr) { + break; + } } - if ((i == num_agents) || (agents[i].active == false)) is_new = 1; - - agents[i].ip = ip; - agents[i].port = port; + if ((i == num_agents) || (agents[i].active == false)) { + is_new = 1; + } + + agents[i].agent_addr = dest_address; agents[i].active = true; - agents[i].sin_addr.s_addr = ip; gettimeofday(&agents[i].time, NULL); if (i == num_agents) { @@ -97,14 +100,14 @@ void update_agent_list(timeval now) { // std::cout << "Checking agent list" << "\n"; for (i = 0; i < num_agents; i++) { if ((diffclock(agents[i].time, now) > LOGOFF_CHECK_INTERVAL) && - agents[i].active) { + agents[i].active) { std::cout << "Expired Agent #" << i << "\n"; agents[i].active = false; } } } -void send_buffer_thread() +void *send_buffer_thread(void *args) { // create our send socket int handle = create_socket(); @@ -116,20 +119,55 @@ void send_buffer_thread() std::cout << "Buffer send socket created.\n"; } + int sent_bytes; + while (1) { // sleep for the length of a packet of audio - sleep((SAMPLES_PER_PACKET/SAMPLE_RATE) * pow(10, 6)); + usleep((SAMPLES_PER_PACKET/SAMPLE_RATE) * pow(10, 6)); // send out whatever we have in the buffer as mixed audio // to our recent clients + for (int i = 0; i < num_agents; i++) { + if (agents[i].active) { + + sockaddr_in dest_address = agents[i].agent_addr; + sent_bytes = sendto(handle, packet_data, MAX_PACKET_SIZE, + 0, (sockaddr *) &dest_address, sizeof(dest_address)); + + if (sent_bytes < MAX_PACKET_SIZE) { + std::cout << "Error sending mix packet!\n"; + } + } + } } } +struct process_arg_struct { + char packet_data[MAX_PACKET_SIZE]; + sockaddr_in dest_address; +}; + +void *process_client_packet(void *args) +{ + struct process_arg_struct *process_args = (struct process_arg_struct *) args; + + sockaddr_in dest_address = process_args->dest_address; + strcpy(packet_data, process_args->packet_data); + + if (addAgent(dest_address)) { + std::cout << "Added agent: " << + inet_ntoa(dest_address.sin_addr) << " on " << + dest_address.sin_port << "\n"; + } + + pthread_exit(0); +} + int main(int argc, const char * argv[]) { + timeval now, last_agent_update; int received_bytes = 0; - timeval time, last_time; int handle = network_init(); @@ -140,32 +178,45 @@ int main(int argc, const char * argv[]) std::cout << "Network Started. Waiting for packets.\n"; } - gettimeofday(&last_time, NULL); + // Set socket as non-blocking + int nonBlocking = 1; + if ( fcntl( handle, F_SETFL, O_NONBLOCK, nonBlocking ) == -1 ) + { + printf( "failed to set non-blocking socket\n" ); + return false; + } + + gettimeofday(&last_agent_update, NULL); char packet_data[MAX_PACKET_SIZE]; - + + pthread_t buffer_send_thread; + pthread_create(&buffer_send_thread, NULL, send_buffer_thread, NULL); + while (1) { received_bytes = recvfrom(handle, (char*)packet_data, MAX_PACKET_SIZE, 0, (sockaddr*)&dest_address, &destLength); + if (received_bytes > 0) { - // std::cout << "Packet from: " << inet_ntoa(dest_address.sin_addr) - // << " " << packet_data << "\n"; - - if (addAgent(dest_address.sin_addr.s_addr, dest_address.sin_port)) { - std::cout << "Added agent: " << - inet_ntoa(dest_address.sin_addr) << " on " << - dest_address.sin_port << "\n"; - } + struct process_arg_struct args; + strcpy(args.packet_data, packet_data); + args.dest_address = dest_address; + + pthread_t client_process_thread; + pthread_create(&client_process_thread, NULL, process_client_packet, (void *)&args); + pthread_join(client_process_thread, NULL); } - gettimeofday(&time, NULL); - - if (diffclock(last_time, time) > LOGOFF_CHECK_INTERVAL) { - gettimeofday(&last_time, NULL); - update_agent_list(last_time); - } + gettimeofday(&now, NULL); + + if (diffclock(last_agent_update, now) > LOGOFF_CHECK_INTERVAL) { + gettimeofday(&last_agent_update, NULL); + update_agent_list(last_agent_update); + } } + pthread_join(buffer_send_thread, NULL); + return 0; } From 7252598ecfb62bb825862de4d0b31266f7c87a5c Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Fri, 11 Jan 2013 14:07:51 -0700 Subject: [PATCH 10/25] send back last received packet as mixed buffer --- socket.cpp | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/socket.cpp b/socket.cpp index 1aca7203c3..393c02aaa3 100644 --- a/socket.cpp +++ b/socket.cpp @@ -19,7 +19,7 @@ const int SAMPLES_PER_PACKET = 512; const int MAX_AGENTS = 1000; const int LOGOFF_CHECK_INTERVAL = 1000; -char packet_data[MAX_PACKET_SIZE]; +char* packet_buffer; sockaddr_in address, dest_address; socklen_t destLength = sizeof(dest_address); @@ -132,7 +132,7 @@ void *send_buffer_thread(void *args) if (agents[i].active) { sockaddr_in dest_address = agents[i].agent_addr; - sent_bytes = sendto(handle, packet_data, MAX_PACKET_SIZE, + sent_bytes = sendto(handle, packet_buffer, MAX_PACKET_SIZE, 0, (sockaddr *) &dest_address, sizeof(dest_address)); if (sent_bytes < MAX_PACKET_SIZE) { @@ -144,7 +144,7 @@ void *send_buffer_thread(void *args) } struct process_arg_struct { - char packet_data[MAX_PACKET_SIZE]; + char *packet_data; sockaddr_in dest_address; }; @@ -153,7 +153,7 @@ void *process_client_packet(void *args) struct process_arg_struct *process_args = (struct process_arg_struct *) args; sockaddr_in dest_address = process_args->dest_address; - strcpy(packet_data, process_args->packet_data); + packet_buffer = process_args->packet_data; if (addAgent(dest_address)) { std::cout << "Added agent: " << @@ -195,16 +195,16 @@ int main(int argc, const char * argv[]) while (1) { received_bytes = recvfrom(handle, (char*)packet_data, MAX_PACKET_SIZE, - 0, (sockaddr*)&dest_address, &destLength); - + 0, (sockaddr*)&dest_address, &destLength); + if (received_bytes > 0) { struct process_arg_struct args; - strcpy(args.packet_data, packet_data); + args.packet_data = packet_data; args.dest_address = dest_address; pthread_t client_process_thread; pthread_create(&client_process_thread, NULL, process_client_packet, (void *)&args); - pthread_join(client_process_thread, NULL); + pthread_join(client_process_thread, NULL); } gettimeofday(&now, NULL); @@ -215,7 +215,7 @@ int main(int argc, const char * argv[]) } } - pthread_join(buffer_send_thread, NULL); + // pthread_join(buffer_send_thread, NULL); return 0; } From a533e505d3ccdf7418d1d6b8bc44781f189b5bfa Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Wed, 16 Jan 2013 14:25:32 -0800 Subject: [PATCH 11/25] add sftp config file and built program to gitignore --- .gitignore | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index fa929750c2..23fb9577f7 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ -*.out \ No newline at end of file +socket +sftp-config.json \ No newline at end of file From 41c0790ffbcf6bcc07b8427abf9b616013c7c6d5 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Wed, 16 Jan 2013 15:56:31 -0800 Subject: [PATCH 12/25] echo back last received packet on 20ms interval --- socket.cpp | 69 +++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 47 insertions(+), 22 deletions(-) diff --git a/socket.cpp b/socket.cpp index 393c02aaa3..2ffa448b17 100644 --- a/socket.cpp +++ b/socket.cpp @@ -19,6 +19,8 @@ const int SAMPLES_PER_PACKET = 512; const int MAX_AGENTS = 1000; const int LOGOFF_CHECK_INTERVAL = 1000; +const float BUFFER_SEND_INTERVAL = (SAMPLES_PER_PACKET/SAMPLE_RATE) * 1000; + char* packet_buffer; sockaddr_in address, dest_address; @@ -45,7 +47,7 @@ int create_socket() int handle = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); if (handle <= 0) { - printf( "failed to create socket\n" ); + printf("Failed to create socket: %d\n", handle); return false; } @@ -75,7 +77,8 @@ int addAgent(sockaddr_in dest_address) { int i = 0; for (i = 0; i < num_agents; i++) { - if (dest_address.sin_addr.s_addr == agents[i].agent_addr.sin_addr.s_addr) { + if (dest_address.sin_addr.s_addr == agents[i].agent_addr.sin_addr.s_addr + && dest_address.sin_port == agents[i].agent_addr.sin_port) { break; } } @@ -107,40 +110,46 @@ void update_agent_list(timeval now) { } } +struct send_buffer_struct { + int socket_handle; +}; + void *send_buffer_thread(void *args) { - // create our send socket - int handle = create_socket(); - - if (!handle) { - std::cout << "Failed to create buffer send socket.\n"; - // return 0; - } else { - std::cout << "Buffer send socket created.\n"; - } + struct send_buffer_struct *buffer_args = (struct send_buffer_struct *) args; + int handle = buffer_args->socket_handle; int sent_bytes; + timeval last_send, now; - while (1) { - // sleep for the length of a packet of audio - usleep((SAMPLES_PER_PACKET/SAMPLE_RATE) * pow(10, 6)); + gettimeofday(&last_send, NULL); + gettimeofday(&now, NULL); + while (true) { + while (diffclock(last_send, now) < BUFFER_SEND_INTERVAL) { + // loop here until we're allowed to send the buffer + gettimeofday(&now, NULL); + } + // send out whatever we have in the buffer as mixed audio // to our recent clients - for (int i = 0; i < num_agents; i++) { if (agents[i].active) { - sockaddr_in dest_address = agents[i].agent_addr; + sent_bytes = sendto(handle, packet_buffer, MAX_PACKET_SIZE, - 0, (sockaddr *) &dest_address, sizeof(dest_address)); + 0, (sockaddr *) &dest_address, sizeof(sockaddr_in)); if (sent_bytes < MAX_PACKET_SIZE) { std::cout << "Error sending mix packet!\n"; } } } - } + + gettimeofday(&last_send, NULL); + } + + pthread_exit(0); } struct process_arg_struct { @@ -164,6 +173,13 @@ void *process_client_packet(void *args) pthread_exit(0); } +bool different_clients(sockaddr_in addr1, sockaddr_in addr2) +{ + return addr1.sin_addr.s_addr != addr2.sin_addr.s_addr || + (addr1.sin_addr.s_addr == addr2.sin_addr.s_addr && + addr1.sin_port != addr2.sin_port); +} + int main(int argc, const char * argv[]) { timeval now, last_agent_update; @@ -190,8 +206,11 @@ int main(int argc, const char * argv[]) char packet_data[MAX_PACKET_SIZE]; + struct send_buffer_struct send_buffer_args; + send_buffer_args.socket_handle = handle; + pthread_t buffer_send_thread; - pthread_create(&buffer_send_thread, NULL, send_buffer_thread, NULL); + pthread_create(&buffer_send_thread, NULL, send_buffer_thread, (void *)&send_buffer_args); while (1) { received_bytes = recvfrom(handle, (char*)packet_data, MAX_PACKET_SIZE, @@ -204,7 +223,13 @@ int main(int argc, const char * argv[]) pthread_t client_process_thread; pthread_create(&client_process_thread, NULL, process_client_packet, (void *)&args); - pthread_join(client_process_thread, NULL); + pthread_join(client_process_thread, NULL); + + if (addAgent(dest_address)) { + std::cout << "Added agent: " << + inet_ntoa(dest_address.sin_addr) << " on " << + dest_address.sin_port << "\n"; + } } gettimeofday(&now, NULL); @@ -215,8 +240,8 @@ int main(int argc, const char * argv[]) } } - // pthread_join(buffer_send_thread, NULL); - + pthread_join(buffer_send_thread, NULL); + return 0; } From a65c4b50ca2b36085ea99af9834a40d9c7d816b6 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Thu, 17 Jan 2013 15:13:17 -0800 Subject: [PATCH 13/25] mix together packets received in between buffer sends --- socket.cpp | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/socket.cpp b/socket.cpp index 2ffa448b17..d057a1a750 100644 --- a/socket.cpp +++ b/socket.cpp @@ -9,6 +9,7 @@ #include #include #include +#include const int UDP_PORT = 55443; const int MAX_PACKET_SIZE = 1024; @@ -21,7 +22,7 @@ const int LOGOFF_CHECK_INTERVAL = 1000; const float BUFFER_SEND_INTERVAL = (SAMPLES_PER_PACKET/SAMPLE_RATE) * 1000; -char* packet_buffer; +int16_t* packet_buffer; sockaddr_in address, dest_address; socklen_t destLength = sizeof(dest_address); @@ -141,11 +142,12 @@ void *send_buffer_thread(void *args) 0, (sockaddr *) &dest_address, sizeof(sockaddr_in)); if (sent_bytes < MAX_PACKET_SIZE) { - std::cout << "Error sending mix packet!\n"; + std::cout << "Error sending mix packet! " << sent_bytes << strerror(errno) << "\n"; } } } + packet_buffer = NULL; gettimeofday(&last_send, NULL); } @@ -153,7 +155,7 @@ void *send_buffer_thread(void *args) } struct process_arg_struct { - char *packet_data; + int16_t *packet_data; sockaddr_in dest_address; }; @@ -162,7 +164,14 @@ void *process_client_packet(void *args) struct process_arg_struct *process_args = (struct process_arg_struct *) args; sockaddr_in dest_address = process_args->dest_address; - packet_buffer = process_args->packet_data; + + if (packet_buffer == NULL) { + packet_buffer = process_args->packet_data; + } else { + for (int i = 0; i < MAX_PACKET_SIZE; i++) { + packet_buffer[i] = (process_args->packet_data[i] + packet_buffer[i]) / 2; + } + } if (addAgent(dest_address)) { std::cout << "Added agent: " << @@ -184,6 +193,8 @@ int main(int argc, const char * argv[]) { timeval now, last_agent_update; int received_bytes = 0; + + packet_buffer = NULL; int handle = network_init(); @@ -204,7 +215,7 @@ int main(int argc, const char * argv[]) gettimeofday(&last_agent_update, NULL); - char packet_data[MAX_PACKET_SIZE]; + int16_t packet_data[MAX_PACKET_SIZE]; struct send_buffer_struct send_buffer_args; send_buffer_args.socket_handle = handle; @@ -213,7 +224,7 @@ int main(int argc, const char * argv[]) pthread_create(&buffer_send_thread, NULL, send_buffer_thread, (void *)&send_buffer_args); while (1) { - received_bytes = recvfrom(handle, (char*)packet_data, MAX_PACKET_SIZE, + received_bytes = recvfrom(handle, (int16_t*)packet_data, MAX_PACKET_SIZE, 0, (sockaddr*)&dest_address, &destLength); if (received_bytes > 0) { @@ -224,12 +235,6 @@ int main(int argc, const char * argv[]) pthread_t client_process_thread; pthread_create(&client_process_thread, NULL, process_client_packet, (void *)&args); pthread_join(client_process_thread, NULL); - - if (addAgent(dest_address)) { - std::cout << "Added agent: " << - inet_ntoa(dest_address.sin_addr) << " on " << - dest_address.sin_port << "\n"; - } } gettimeofday(&now, NULL); From ea8729acc499782ced26e7ba16f6d35e3abc1d82 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Fri, 18 Jan 2013 11:18:54 -0800 Subject: [PATCH 14/25] better concurrency management for shared buffer --- socket.cpp | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/socket.cpp b/socket.cpp index d057a1a750..b41681b8b3 100644 --- a/socket.cpp +++ b/socket.cpp @@ -22,6 +22,8 @@ const int LOGOFF_CHECK_INTERVAL = 1000; const float BUFFER_SEND_INTERVAL = (SAMPLES_PER_PACKET/SAMPLE_RATE) * 1000; +pthread_mutex_t buffer_mutex = PTHREAD_MUTEX_INITIALIZER; + int16_t* packet_buffer; sockaddr_in address, dest_address; @@ -134,12 +136,19 @@ void *send_buffer_thread(void *args) // send out whatever we have in the buffer as mixed audio // to our recent clients + for (int i = 0; i < num_agents; i++) { if (agents[i].active) { sockaddr_in dest_address = agents[i].agent_addr; - sent_bytes = sendto(handle, packet_buffer, MAX_PACKET_SIZE, - 0, (sockaddr *) &dest_address, sizeof(sockaddr_in)); + pthread_mutex_lock(&buffer_mutex); + + if (packet_buffer != NULL) { + sent_bytes = sendto(handle, packet_buffer, MAX_PACKET_SIZE, + 0, (sockaddr *) &dest_address, sizeof(dest_address)); + } + + pthread_mutex_unlock(&buffer_mutex); if (sent_bytes < MAX_PACKET_SIZE) { std::cout << "Error sending mix packet! " << sent_bytes << strerror(errno) << "\n"; @@ -165,6 +174,8 @@ void *process_client_packet(void *args) sockaddr_in dest_address = process_args->dest_address; + pthread_mutex_lock(&buffer_mutex); + if (packet_buffer == NULL) { packet_buffer = process_args->packet_data; } else { @@ -173,6 +184,8 @@ void *process_client_packet(void *args) } } + pthread_mutex_unlock(&buffer_mutex); + if (addAgent(dest_address)) { std::cout << "Added agent: " << inet_ntoa(dest_address.sin_addr) << " on " << From d9ddaffab5b31716cf9de353a595e671babc80ac Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Wed, 30 Jan 2013 12:39:57 -0800 Subject: [PATCH 15/25] send closer.raw back to active agents for latency test --- socket.cpp | 68 +++++++++++++++++++++++++++--------------------------- 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/socket.cpp b/socket.cpp index b41681b8b3..eb7e80ffba 100644 --- a/socket.cpp +++ b/socket.cpp @@ -10,9 +10,12 @@ #include #include #include +#include const int UDP_PORT = 55443; -const int MAX_PACKET_SIZE = 1024; + +const int BUFFER_LENGTH_BYTES = 1024; +const int BUFFER_LENGTH_SAMPLES = BUFFER_LENGTH_BYTES / sizeof(int16_t); const float SAMPLE_RATE = 22050.0; const int SAMPLES_PER_PACKET = 512; @@ -22,9 +25,7 @@ const int LOGOFF_CHECK_INTERVAL = 1000; const float BUFFER_SEND_INTERVAL = (SAMPLES_PER_PACKET/SAMPLE_RATE) * 1000; -pthread_mutex_t buffer_mutex = PTHREAD_MUTEX_INITIALIZER; - -int16_t* packet_buffer; +int16_t* wc_noise_buffer; sockaddr_in address, dest_address; socklen_t destLength = sizeof(dest_address); @@ -124,9 +125,7 @@ void *send_buffer_thread(void *args) int sent_bytes; timeval last_send, now; - - gettimeofday(&last_send, NULL); - gettimeofday(&now, NULL); + int noise_byte_pointer = 0; while (true) { while (diffclock(last_send, now) < BUFFER_SEND_INTERVAL) { @@ -140,23 +139,17 @@ void *send_buffer_thread(void *args) for (int i = 0; i < num_agents; i++) { if (agents[i].active) { sockaddr_in dest_address = agents[i].agent_addr; - - pthread_mutex_lock(&buffer_mutex); - - if (packet_buffer != NULL) { - sent_bytes = sendto(handle, packet_buffer, MAX_PACKET_SIZE, - 0, (sockaddr *) &dest_address, sizeof(dest_address)); - } - - pthread_mutex_unlock(&buffer_mutex); - - if (sent_bytes < MAX_PACKET_SIZE) { + + sent_bytes = sendto(handle, wc_noise_buffer + noise_byte_pointer, BUFFER_LENGTH_BYTES, + 0, (sockaddr *) &dest_address, sizeof(dest_address)); + noise_byte_pointer += BUFFER_LENGTH_SAMPLES; + + if (sent_bytes < BUFFER_LENGTH_BYTES) { std::cout << "Error sending mix packet! " << sent_bytes << strerror(errno) << "\n"; } } } - packet_buffer = NULL; gettimeofday(&last_send, NULL); } @@ -173,18 +166,7 @@ void *process_client_packet(void *args) struct process_arg_struct *process_args = (struct process_arg_struct *) args; sockaddr_in dest_address = process_args->dest_address; - - pthread_mutex_lock(&buffer_mutex); - - if (packet_buffer == NULL) { - packet_buffer = process_args->packet_data; - } else { - for (int i = 0; i < MAX_PACKET_SIZE; i++) { - packet_buffer[i] = (process_args->packet_data[i] + packet_buffer[i]) / 2; - } - } - - pthread_mutex_unlock(&buffer_mutex); + dest_address.sin_port = htons((uint16_t) 55444); if (addAgent(dest_address)) { std::cout << "Added agent: " << @@ -202,12 +184,30 @@ bool different_clients(sockaddr_in addr1, sockaddr_in addr2) addr1.sin_port != addr2.sin_port); } +void white_noise_buffer_init() { + // open a pointer to the audio file + FILE *workclubFile = fopen("closer.raw", "r"); + + // get length of file + std::fseek(workclubFile, 0, SEEK_END); + int lengthInSamples = std::ftell(workclubFile) / sizeof(int16_t); + std::rewind(workclubFile); + + // read that amount of samples from the file + wc_noise_buffer = new int16_t[lengthInSamples]; + std::fread(wc_noise_buffer, sizeof(int16_t), lengthInSamples, workclubFile); + + // close it + std::fclose(workclubFile); +} + int main(int argc, const char * argv[]) { timeval now, last_agent_update; int received_bytes = 0; - packet_buffer = NULL; + // read in the workclub white noise file as a base layer of audio + white_noise_buffer_init(); int handle = network_init(); @@ -228,7 +228,7 @@ int main(int argc, const char * argv[]) gettimeofday(&last_agent_update, NULL); - int16_t packet_data[MAX_PACKET_SIZE]; + int16_t packet_data[BUFFER_LENGTH_SAMPLES]; struct send_buffer_struct send_buffer_args; send_buffer_args.socket_handle = handle; @@ -237,7 +237,7 @@ int main(int argc, const char * argv[]) pthread_create(&buffer_send_thread, NULL, send_buffer_thread, (void *)&send_buffer_args); while (1) { - received_bytes = recvfrom(handle, (int16_t*)packet_data, MAX_PACKET_SIZE, + received_bytes = recvfrom(handle, (int16_t*)packet_data, BUFFER_LENGTH_BYTES, 0, (sockaddr*)&dest_address, &destLength); if (received_bytes > 0) { From 50b59b2e54f41dd922e2a9a9c991822b2280f505 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Thu, 31 Jan 2013 09:54:59 -0800 Subject: [PATCH 16/25] reply to active agents with mix packets --- socket.cpp | 93 ++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 73 insertions(+), 20 deletions(-) diff --git a/socket.cpp b/socket.cpp index eb7e80ffba..5beb88ce63 100644 --- a/socket.cpp +++ b/socket.cpp @@ -12,19 +12,19 @@ #include #include +const int MAX_AGENTS = 1000; +const int LOGOFF_CHECK_INTERVAL = 1000; + const int UDP_PORT = 55443; const int BUFFER_LENGTH_BYTES = 1024; const int BUFFER_LENGTH_SAMPLES = BUFFER_LENGTH_BYTES / sizeof(int16_t); - const float SAMPLE_RATE = 22050.0; const int SAMPLES_PER_PACKET = 512; - -const int MAX_AGENTS = 1000; -const int LOGOFF_CHECK_INTERVAL = 1000; - const float BUFFER_SEND_INTERVAL = (SAMPLES_PER_PACKET/SAMPLE_RATE) * 1000; +const int NUM_SOURCE_BUFFERS = 10; + int16_t* wc_noise_buffer; sockaddr_in address, dest_address; @@ -38,6 +38,12 @@ struct AgentList { int num_agents = 0; +struct SourceBuffer { + int16_t sourceAudioData[BUFFER_LENGTH_BYTES]; + timeval receiveTime; + bool available; +} sourceBuffers[NUM_SOURCE_BUFFERS]; + double diffclock(timeval clock1,timeval clock2) { double diffms = (clock2.tv_sec - clock1.tv_sec) * 1000.0; @@ -124,33 +130,67 @@ void *send_buffer_thread(void *args) int handle = buffer_args->socket_handle; int sent_bytes; - timeval last_send, now; - int noise_byte_pointer = 0; + + timeval firstSend; + timeval lastSend = {}; + timeval now; + + int16_t *clientMix = new int16_t[BUFFER_LENGTH_SAMPLES]; + + gettimeofday(&firstSend, NULL); + gettimeofday(&now, NULL); while (true) { - while (diffclock(last_send, now) < BUFFER_SEND_INTERVAL) { + while (lastSend.tv_sec != 0 && diffclock(lastSend, now) < BUFFER_SEND_INTERVAL) { // loop here until we're allowed to send the buffer gettimeofday(&now, NULL); } - - // send out whatever we have in the buffer as mixed audio - // to our recent clients + + sent_bytes = 0; for (int i = 0; i < num_agents; i++) { if (agents[i].active) { sockaddr_in dest_address = agents[i].agent_addr; - - sent_bytes = sendto(handle, wc_noise_buffer + noise_byte_pointer, BUFFER_LENGTH_BYTES, - 0, (sockaddr *) &dest_address, sizeof(dest_address)); - noise_byte_pointer += BUFFER_LENGTH_SAMPLES; + int sampleOffset = floor(diffclock(firstSend, now) * (SAMPLE_RATE / 1000) + 0.5); + memcpy(clientMix, wc_noise_buffer + sampleOffset, BUFFER_LENGTH_BYTES); + + for (int b = 0; b < NUM_SOURCE_BUFFERS; b++) { + if (!sourceBuffers[b].available) { + int outputOffset = 0, dataSampleLength; + + int receiveOffset = BUFFER_LENGTH_SAMPLES - floor(diffclock(sourceBuffers[b].receiveTime, now) * (SAMPLE_RATE / 1000)); + + if (receiveOffset >= 0) { + outputOffset = receiveOffset; + dataSampleLength = BUFFER_LENGTH_SAMPLES - receiveOffset; + } else { + dataSampleLength = BUFFER_LENGTH_SAMPLES + receiveOffset; + } + + // std::cout << "SO: " << outputOffset << " DL: " << dataSampleLength << ". \n"; + + for (int s = outputOffset; s < dataSampleLength; s++) { + // we have source buffer data for this sample + clientMix[s] += sourceBuffers[b].sourceAudioData[s - receiveOffset]; + } + + if (outputOffset == 0) { + sourceBuffers[b].available = true; + } + } + } + + sent_bytes = sendto(handle, clientMix, BUFFER_LENGTH_BYTES, + 0, (sockaddr *) &dest_address, sizeof(dest_address)); + if (sent_bytes < BUFFER_LENGTH_BYTES) { std::cout << "Error sending mix packet! " << sent_bytes << strerror(errno) << "\n"; } } } - gettimeofday(&last_send, NULL); + gettimeofday(&lastSend, NULL); } pthread_exit(0); @@ -166,7 +206,6 @@ void *process_client_packet(void *args) struct process_arg_struct *process_args = (struct process_arg_struct *) args; sockaddr_in dest_address = process_args->dest_address; - dest_address.sin_port = htons((uint16_t) 55444); if (addAgent(dest_address)) { std::cout << "Added agent: " << @@ -174,6 +213,16 @@ void *process_client_packet(void *args) dest_address.sin_port << "\n"; } + for (int b = 0; b < NUM_SOURCE_BUFFERS; b++) { + if (sourceBuffers[b].available) { + gettimeofday(&sourceBuffers[b].receiveTime, NULL); + memcpy(sourceBuffers[b].sourceAudioData, process_args->packet_data, BUFFER_LENGTH_BYTES); + sourceBuffers[b].available = false; + + break; + } + } + pthread_exit(0); } @@ -186,7 +235,7 @@ bool different_clients(sockaddr_in addr1, sockaddr_in addr2) void white_noise_buffer_init() { // open a pointer to the audio file - FILE *workclubFile = fopen("closer.raw", "r"); + FILE *workclubFile = fopen("wild.raw", "r"); // get length of file std::fseek(workclubFile, 0, SEEK_END); @@ -220,12 +269,16 @@ int main(int argc, const char * argv[]) // Set socket as non-blocking int nonBlocking = 1; - if ( fcntl( handle, F_SETFL, O_NONBLOCK, nonBlocking ) == -1 ) - { + if (fcntl(handle, F_SETFL, O_NONBLOCK, nonBlocking) == -1) { printf( "failed to set non-blocking socket\n" ); return false; } + // set source audio buffers availability to true + for (int b = 0; b < NUM_SOURCE_BUFFERS; b++) { + sourceBuffers[b].available = true; + } + gettimeofday(&last_agent_update, NULL); int16_t packet_data[BUFFER_LENGTH_SAMPLES]; From 913a206f16df19c8a0fc894d91370c599e4692b0 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Thu, 31 Jan 2013 12:00:04 -0800 Subject: [PATCH 17/25] start to change the timeshift strategy to remove clicks --- socket.cpp | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/socket.cpp b/socket.cpp index 5beb88ce63..708ca81ffd 100644 --- a/socket.cpp +++ b/socket.cpp @@ -129,8 +129,8 @@ void *send_buffer_thread(void *args) struct send_buffer_struct *buffer_args = (struct send_buffer_struct *) args; int handle = buffer_args->socket_handle; - int sent_bytes; - + int sentBytes; + int currentSample = 1; timeval firstSend; timeval lastSend = {}; timeval now; @@ -146,20 +146,22 @@ void *send_buffer_thread(void *args) gettimeofday(&now, NULL); } - sent_bytes = 0; + gettimeofday(&lastSend, NULL); + sentBytes = 0; for (int i = 0; i < num_agents; i++) { if (agents[i].active) { sockaddr_in dest_address = agents[i].agent_addr; - int sampleOffset = floor(diffclock(firstSend, now) * (SAMPLE_RATE / 1000) + 0.5); + int sampleOffset = currentSample * BUFFER_LENGTH_SAMPLES; memcpy(clientMix, wc_noise_buffer + sampleOffset, BUFFER_LENGTH_BYTES); for (int b = 0; b < NUM_SOURCE_BUFFERS; b++) { if (!sourceBuffers[b].available) { int outputOffset = 0, dataSampleLength; - int receiveOffset = BUFFER_LENGTH_SAMPLES - floor(diffclock(sourceBuffers[b].receiveTime, now) * (SAMPLE_RATE / 1000)); + int receiveSample = floor(diffclock(firstSend, sourceBuffers[b].receiveTime) * (SAMPLE_RATE / 1000)); + int receiveOffset = receiveSample - sampleOffset - BUFFER_LENGTH_SAMPLES; if (receiveOffset >= 0) { outputOffset = receiveOffset; @@ -168,7 +170,7 @@ void *send_buffer_thread(void *args) dataSampleLength = BUFFER_LENGTH_SAMPLES + receiveOffset; } - // std::cout << "SO: " << outputOffset << " DL: " << dataSampleLength << ". \n"; + std::cout << "SO: " << outputOffset << " DL: " << dataSampleLength << ". \n"; for (int s = outputOffset; s < dataSampleLength; s++) { // we have source buffer data for this sample @@ -181,16 +183,16 @@ void *send_buffer_thread(void *args) } } - sent_bytes = sendto(handle, clientMix, BUFFER_LENGTH_BYTES, + sentBytes = sendto(handle, clientMix, BUFFER_LENGTH_BYTES, 0, (sockaddr *) &dest_address, sizeof(dest_address)); - if (sent_bytes < BUFFER_LENGTH_BYTES) { - std::cout << "Error sending mix packet! " << sent_bytes << strerror(errno) << "\n"; + if (sentBytes < BUFFER_LENGTH_BYTES) { + std::cout << "Error sending mix packet! " << sentBytes << strerror(errno) << "\n"; } } } - gettimeofday(&lastSend, NULL); + currentSample++; } pthread_exit(0); From bddb09506f8f392646c8f1fd02a85d6b3300de59 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Thu, 31 Jan 2013 12:41:00 -0800 Subject: [PATCH 18/25] add an echo mode, no time shifting --- socket.cpp | 101 ++++++++++++++++++++++++++--------------------------- 1 file changed, 49 insertions(+), 52 deletions(-) diff --git a/socket.cpp b/socket.cpp index 708ca81ffd..0ff44a6f52 100644 --- a/socket.cpp +++ b/socket.cpp @@ -23,10 +23,15 @@ const float SAMPLE_RATE = 22050.0; const int SAMPLES_PER_PACKET = 512; const float BUFFER_SEND_INTERVAL = (SAMPLES_PER_PACKET/SAMPLE_RATE) * 1000; +const int16_t MAX_SAMPLE_VALUE = 32767; +const int16_t MIN_SAMPLE_VALUE = -32767; + const int NUM_SOURCE_BUFFERS = 10; int16_t* wc_noise_buffer; +#define ECHO_DEBUG_MODE 1 + sockaddr_in address, dest_address; socklen_t destLength = sizeof(dest_address); @@ -39,8 +44,7 @@ struct AgentList { int num_agents = 0; struct SourceBuffer { - int16_t sourceAudioData[BUFFER_LENGTH_BYTES]; - timeval receiveTime; + int16_t sourceAudioData[BUFFER_LENGTH_SAMPLES]; bool available; } sourceBuffers[NUM_SOURCE_BUFFERS]; @@ -149,50 +153,39 @@ void *send_buffer_thread(void *args) gettimeofday(&lastSend, NULL); sentBytes = 0; - for (int i = 0; i < num_agents; i++) { - if (agents[i].active) { - sockaddr_in dest_address = agents[i].agent_addr; + int sampleOffset = floor(diffclock(firstSend, now) * (SAMPLE_RATE / 1000) + 0.5); + // memcpy(clientMix, wc_noise_buffer + sampleOffset, BUFFER_LENGTH_BYTES); + memset(clientMix, 0, BUFFER_LENGTH_BYTES); - int sampleOffset = currentSample * BUFFER_LENGTH_SAMPLES; - memcpy(clientMix, wc_noise_buffer + sampleOffset, BUFFER_LENGTH_BYTES); - - for (int b = 0; b < NUM_SOURCE_BUFFERS; b++) { - if (!sourceBuffers[b].available) { - int outputOffset = 0, dataSampleLength; - - int receiveSample = floor(diffclock(firstSend, sourceBuffers[b].receiveTime) * (SAMPLE_RATE / 1000)); - int receiveOffset = receiveSample - sampleOffset - BUFFER_LENGTH_SAMPLES; - - if (receiveOffset >= 0) { - outputOffset = receiveOffset; - dataSampleLength = BUFFER_LENGTH_SAMPLES - receiveOffset; - } else { - dataSampleLength = BUFFER_LENGTH_SAMPLES + receiveOffset; - } - - std::cout << "SO: " << outputOffset << " DL: " << dataSampleLength << ". \n"; - - for (int s = outputOffset; s < dataSampleLength; s++) { - // we have source buffer data for this sample - clientMix[s] += sourceBuffers[b].sourceAudioData[s - receiveOffset]; - } - - if (outputOffset == 0) { - sourceBuffers[b].available = true; - } + for (int b = 0; b < NUM_SOURCE_BUFFERS; b++) { + if (!sourceBuffers[b].available) { + for (int s = 0; s < BUFFER_LENGTH_SAMPLES; s++) { + // we have source buffer data for this sample + int mixSample = clientMix[s] + sourceBuffers[b].sourceAudioData[s]; + + if (mixSample >= MAX_SAMPLE_VALUE || mixSample <= MIN_SAMPLE_VALUE) { + std::cout << "Sample over at " << mixSample << ".\n"; } + + clientMix[s] += sourceBuffers[b].sourceAudioData[s]; } - sentBytes = sendto(handle, clientMix, BUFFER_LENGTH_BYTES, - 0, (sockaddr *) &dest_address, sizeof(dest_address)); - - if (sentBytes < BUFFER_LENGTH_BYTES) { - std::cout << "Error sending mix packet! " << sentBytes << strerror(errno) << "\n"; - } + sourceBuffers[b].available = true; } } - currentSample++; + for (int i = 0; i < num_agents; i++) { + if (agents[i].active) { + sockaddr_in dest_address = agents[i].agent_addr; + } + + sentBytes = sendto(handle, clientMix, BUFFER_LENGTH_BYTES, + 0, (sockaddr *) &dest_address, sizeof(dest_address)); + + if (sentBytes < BUFFER_LENGTH_BYTES) { + std::cout << "Error sending mix packet! " << sentBytes << strerror(errno) << "\n"; + } + } } pthread_exit(0); @@ -217,7 +210,6 @@ void *process_client_packet(void *args) for (int b = 0; b < NUM_SOURCE_BUFFERS; b++) { if (sourceBuffers[b].available) { - gettimeofday(&sourceBuffers[b].receiveTime, NULL); memcpy(sourceBuffers[b].sourceAudioData, process_args->packet_data, BUFFER_LENGTH_BYTES); sourceBuffers[b].available = false; @@ -237,19 +229,19 @@ bool different_clients(sockaddr_in addr1, sockaddr_in addr2) void white_noise_buffer_init() { // open a pointer to the audio file - FILE *workclubFile = fopen("wild.raw", "r"); + FILE *whiteNoiseFile = fopen("workclub.raw", "r"); // get length of file - std::fseek(workclubFile, 0, SEEK_END); - int lengthInSamples = std::ftell(workclubFile) / sizeof(int16_t); - std::rewind(workclubFile); + std::fseek(whiteNoiseFile, 0, SEEK_END); + int lengthInSamples = std::ftell(whiteNoiseFile) / sizeof(int16_t); + std::rewind(whiteNoiseFile); // read that amount of samples from the file wc_noise_buffer = new int16_t[lengthInSamples]; - std::fread(wc_noise_buffer, sizeof(int16_t), lengthInSamples, workclubFile); + std::fread(wc_noise_buffer, sizeof(int16_t), lengthInSamples, whiteNoiseFile); // close it - std::fclose(workclubFile); + std::fclose(whiteNoiseFile); } int main(int argc, const char * argv[]) @@ -296,13 +288,18 @@ int main(int argc, const char * argv[]) 0, (sockaddr*)&dest_address, &destLength); if (received_bytes > 0) { - struct process_arg_struct args; - args.packet_data = packet_data; - args.dest_address = dest_address; + if (ECHO_DEBUG_MODE) { + sendto(handle, packet_data, BUFFER_LENGTH_BYTES, + 0, (sockaddr *) &dest_address, sizeof(dest_address)); + } else { + struct process_arg_struct args; + args.packet_data = packet_data; + args.dest_address = dest_address; - pthread_t client_process_thread; - pthread_create(&client_process_thread, NULL, process_client_packet, (void *)&args); - pthread_join(client_process_thread, NULL); + pthread_t client_process_thread; + pthread_create(&client_process_thread, NULL, process_client_packet, (void *)&args); + pthread_join(client_process_thread, NULL); + } } gettimeofday(&now, NULL); From e5b374c6314ad1fb96bd3fe9618df71332cceb12 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Thu, 31 Jan 2013 15:15:23 -0800 Subject: [PATCH 19/25] only one source buffer per client --- socket.cpp | 66 ++++++++++++++++++++++++------------------------------ 1 file changed, 29 insertions(+), 37 deletions(-) diff --git a/socket.cpp b/socket.cpp index 0ff44a6f52..9f3ca2510c 100644 --- a/socket.cpp +++ b/socket.cpp @@ -23,14 +23,14 @@ const float SAMPLE_RATE = 22050.0; const int SAMPLES_PER_PACKET = 512; const float BUFFER_SEND_INTERVAL = (SAMPLES_PER_PACKET/SAMPLE_RATE) * 1000; -const int16_t MAX_SAMPLE_VALUE = 32767; -const int16_t MIN_SAMPLE_VALUE = -32767; +const int MAX_SAMPLE_VALUE = std::numeric_limits::max(); +const int MIN_SAMPLE_VALUE = std::numeric_limits::min(); -const int NUM_SOURCE_BUFFERS = 10; +const int MAX_SOURCE_BUFFERS = 10; int16_t* wc_noise_buffer; -#define ECHO_DEBUG_MODE 1 +#define ECHO_DEBUG_MODE 0 sockaddr_in address, dest_address; socklen_t destLength = sizeof(dest_address); @@ -45,8 +45,8 @@ int num_agents = 0; struct SourceBuffer { int16_t sourceAudioData[BUFFER_LENGTH_SAMPLES]; - bool available; -} sourceBuffers[NUM_SOURCE_BUFFERS]; + bool transmitted; +} sourceBuffers[MAX_SOURCE_BUFFERS]; double diffclock(timeval clock1,timeval clock2) { @@ -85,7 +85,7 @@ int network_init() return handle; } -int addAgent(sockaddr_in dest_address) { +int addAgent(sockaddr_in dest_address, void *audioData) { // Search for agent in list and add if needed int is_new = 0; int i = 0; @@ -104,6 +104,9 @@ int addAgent(sockaddr_in dest_address) { agents[i].agent_addr = dest_address; agents[i].active = true; gettimeofday(&agents[i].time, NULL); + + memcpy(sourceBuffers[i].sourceAudioData, audioData, BUFFER_LENGTH_BYTES); + sourceBuffers[i].transmitted = false; if (i == num_agents) { num_agents++; @@ -154,36 +157,35 @@ void *send_buffer_thread(void *args) sentBytes = 0; int sampleOffset = floor(diffclock(firstSend, now) * (SAMPLE_RATE / 1000) + 0.5); - // memcpy(clientMix, wc_noise_buffer + sampleOffset, BUFFER_LENGTH_BYTES); - memset(clientMix, 0, BUFFER_LENGTH_BYTES); + memcpy(clientMix, wc_noise_buffer + sampleOffset, BUFFER_LENGTH_BYTES); - for (int b = 0; b < NUM_SOURCE_BUFFERS; b++) { - if (!sourceBuffers[b].available) { + for (int b = 0; b < MAX_SOURCE_BUFFERS; b++) { + if (!sourceBuffers[b].transmitted) { for (int s = 0; s < BUFFER_LENGTH_SAMPLES; s++) { // we have source buffer data for this sample int mixSample = clientMix[s] + sourceBuffers[b].sourceAudioData[s]; - if (mixSample >= MAX_SAMPLE_VALUE || mixSample <= MIN_SAMPLE_VALUE) { - std::cout << "Sample over at " << mixSample << ".\n"; - } + int sampleToAdd = std::max(mixSample, MIN_SAMPLE_VALUE); + sampleToAdd = std::min(sampleToAdd, MAX_SAMPLE_VALUE); - clientMix[s] += sourceBuffers[b].sourceAudioData[s]; + clientMix[s] += sampleToAdd; } - sourceBuffers[b].available = true; + sourceBuffers[b].transmitted = true; } } for (int i = 0; i < num_agents; i++) { if (agents[i].active) { sockaddr_in dest_address = agents[i].agent_addr; - } - sentBytes = sendto(handle, clientMix, BUFFER_LENGTH_BYTES, - 0, (sockaddr *) &dest_address, sizeof(dest_address)); - - if (sentBytes < BUFFER_LENGTH_BYTES) { - std::cout << "Error sending mix packet! " << sentBytes << strerror(errno) << "\n"; + sentBytes = sendto(handle, clientMix, BUFFER_LENGTH_BYTES, + 0, (sockaddr *) &dest_address, sizeof(dest_address)); + + if (sentBytes < BUFFER_LENGTH_BYTES) { + std::cout << "Error sending mix packet! " << sentBytes << strerror(errno) << "\n"; + } + } } } @@ -202,21 +204,12 @@ void *process_client_packet(void *args) sockaddr_in dest_address = process_args->dest_address; - if (addAgent(dest_address)) { + if (addAgent(dest_address, process_args->packet_data)) { std::cout << "Added agent: " << inet_ntoa(dest_address.sin_addr) << " on " << dest_address.sin_port << "\n"; } - for (int b = 0; b < NUM_SOURCE_BUFFERS; b++) { - if (sourceBuffers[b].available) { - memcpy(sourceBuffers[b].sourceAudioData, process_args->packet_data, BUFFER_LENGTH_BYTES); - sourceBuffers[b].available = false; - - break; - } - } - pthread_exit(0); } @@ -268,15 +261,14 @@ int main(int argc, const char * argv[]) return false; } - // set source audio buffers availability to true - for (int b = 0; b < NUM_SOURCE_BUFFERS; b++) { - sourceBuffers[b].available = true; - } - gettimeofday(&last_agent_update, NULL); int16_t packet_data[BUFFER_LENGTH_SAMPLES]; + for (int b = 0; b < MAX_SOURCE_BUFFERS; b++) { + sourceBuffers[b].transmitted = true; + } + struct send_buffer_struct send_buffer_args; send_buffer_args.socket_handle = handle; From 0787692014bbb6c0f28262a72e97c1b1d962d9e7 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 4 Feb 2013 08:35:49 -0800 Subject: [PATCH 20/25] send each client a different mix without themselves --- socket.cpp | 49 ++++++++++++++++++++++++++++--------------------- 1 file changed, 28 insertions(+), 21 deletions(-) diff --git a/socket.cpp b/socket.cpp index 9f3ca2510c..6bfbf1aadb 100644 --- a/socket.cpp +++ b/socket.cpp @@ -11,6 +11,7 @@ #include #include #include +#include const int MAX_AGENTS = 1000; const int LOGOFF_CHECK_INTERVAL = 1000; @@ -28,7 +29,8 @@ const int MIN_SAMPLE_VALUE = std::numeric_limits::min(); const int MAX_SOURCE_BUFFERS = 10; -int16_t* wc_noise_buffer; +int16_t* whiteNoiseBuffer; +int whiteNoiseLength; #define ECHO_DEBUG_MODE 0 @@ -143,6 +145,7 @@ void *send_buffer_thread(void *args) timeval now; int16_t *clientMix = new int16_t[BUFFER_LENGTH_SAMPLES]; + int16_t *masterMix = new int16_t[BUFFER_LENGTH_SAMPLES]; gettimeofday(&firstSend, NULL); gettimeofday(&now, NULL); @@ -157,26 +160,30 @@ void *send_buffer_thread(void *args) sentBytes = 0; int sampleOffset = floor(diffclock(firstSend, now) * (SAMPLE_RATE / 1000) + 0.5); - memcpy(clientMix, wc_noise_buffer + sampleOffset, BUFFER_LENGTH_BYTES); + sampleOffset = sampleOffset % whiteNoiseLength; - for (int b = 0; b < MAX_SOURCE_BUFFERS; b++) { - if (!sourceBuffers[b].transmitted) { - for (int s = 0; s < BUFFER_LENGTH_SAMPLES; s++) { - // we have source buffer data for this sample - int mixSample = clientMix[s] + sourceBuffers[b].sourceAudioData[s]; - - int sampleToAdd = std::max(mixSample, MIN_SAMPLE_VALUE); - sampleToAdd = std::min(sampleToAdd, MAX_SAMPLE_VALUE); - - clientMix[s] += sampleToAdd; - } - - sourceBuffers[b].transmitted = true; - } - } + memcpy(masterMix, whiteNoiseBuffer + sampleOffset, BUFFER_LENGTH_BYTES); for (int i = 0; i < num_agents; i++) { if (agents[i].active) { + memcpy(clientMix, masterMix, BUFFER_LENGTH_BYTES); + + for (int b = 0; b < MAX_SOURCE_BUFFERS; b++) { + if (b != i && !sourceBuffers[b].transmitted) { + for (int s = 0; s < BUFFER_LENGTH_SAMPLES; s++) { + // we have source buffer data for this sample + int mixSample = clientMix[s] + sourceBuffers[b].sourceAudioData[s]; + + int sampleToAdd = std::max(mixSample, MIN_SAMPLE_VALUE); + sampleToAdd = std::min(sampleToAdd, MAX_SAMPLE_VALUE); + + clientMix[s] = sampleToAdd; + } + + sourceBuffers[b].transmitted = true; + } + } + sockaddr_in dest_address = agents[i].agent_addr; sentBytes = sendto(handle, clientMix, BUFFER_LENGTH_BYTES, @@ -222,16 +229,16 @@ bool different_clients(sockaddr_in addr1, sockaddr_in addr2) void white_noise_buffer_init() { // open a pointer to the audio file - FILE *whiteNoiseFile = fopen("workclub.raw", "r"); + FILE *whiteNoiseFile = fopen("wild.raw", "r"); // get length of file std::fseek(whiteNoiseFile, 0, SEEK_END); - int lengthInSamples = std::ftell(whiteNoiseFile) / sizeof(int16_t); + whiteNoiseLength = std::ftell(whiteNoiseFile) / sizeof(int16_t); std::rewind(whiteNoiseFile); // read that amount of samples from the file - wc_noise_buffer = new int16_t[lengthInSamples]; - std::fread(wc_noise_buffer, sizeof(int16_t), lengthInSamples, whiteNoiseFile); + whiteNoiseBuffer = new int16_t[whiteNoiseLength]; + std::fread(whiteNoiseBuffer, sizeof(int16_t), whiteNoiseLength, whiteNoiseFile); // close it std::fclose(whiteNoiseFile); From 2b00a6432484d6b7088ac77021017c34e43771ef Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 4 Feb 2013 11:05:22 -0800 Subject: [PATCH 21/25] add .DS_Store to gitignore --- .gitignore | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 23fb9577f7..1bcb570edf 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ socket -sftp-config.json \ No newline at end of file +sftp-config.json +.DS_Store \ No newline at end of file From 8035471fa4e98c6be89b59464625c7ae99ba3d3f Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 4 Feb 2013 14:46:40 -0800 Subject: [PATCH 22/25] accomodate possibility of random increase in time between send loops --- socket.cpp | 82 +++++++++++++++++++++++++++--------------------------- 1 file changed, 41 insertions(+), 41 deletions(-) diff --git a/socket.cpp b/socket.cpp index 6bfbf1aadb..9f4fafe893 100644 --- a/socket.cpp +++ b/socket.cpp @@ -21,8 +21,7 @@ const int UDP_PORT = 55443; const int BUFFER_LENGTH_BYTES = 1024; const int BUFFER_LENGTH_SAMPLES = BUFFER_LENGTH_BYTES / sizeof(int16_t); const float SAMPLE_RATE = 22050.0; -const int SAMPLES_PER_PACKET = 512; -const float BUFFER_SEND_INTERVAL = (SAMPLES_PER_PACKET/SAMPLE_RATE) * 1000; +const float BUFFER_SEND_INTERVAL_USECS = (BUFFER_LENGTH_SAMPLES/SAMPLE_RATE) * 1000000; const int MAX_SAMPLE_VALUE = std::numeric_limits::max(); const int MIN_SAMPLE_VALUE = std::numeric_limits::min(); @@ -50,13 +49,17 @@ struct SourceBuffer { bool transmitted; } sourceBuffers[MAX_SOURCE_BUFFERS]; -double diffclock(timeval clock1,timeval clock2) +double diffclock(timeval clock1, timeval clock2) { double diffms = (clock2.tv_sec - clock1.tv_sec) * 1000.0; diffms += (clock2.tv_usec - clock1.tv_usec) / 1000.0; // us to ms return diffms; } +double usecTimestamp(timeval *time, double addedUsecs = 0) { + return (time->tv_sec * 1000000.0) + time->tv_usec + addedUsecs; +} + int create_socket() { // Create socket @@ -139,61 +142,58 @@ void *send_buffer_thread(void *args) int handle = buffer_args->socket_handle; int sentBytes; - int currentSample = 1; - timeval firstSend; - timeval lastSend = {}; + int currentFrame = 1; + timeval startTime; timeval now; int16_t *clientMix = new int16_t[BUFFER_LENGTH_SAMPLES]; int16_t *masterMix = new int16_t[BUFFER_LENGTH_SAMPLES]; - gettimeofday(&firstSend, NULL); - gettimeofday(&now, NULL); + gettimeofday(&startTime, NULL); while (true) { - while (lastSend.tv_sec != 0 && diffclock(lastSend, now) < BUFFER_SEND_INTERVAL) { - // loop here until we're allowed to send the buffer - gettimeofday(&now, NULL); - } + gettimeofday(&now, NULL); - gettimeofday(&lastSend, NULL); - sentBytes = 0; + while (usecTimestamp(&startTime, (currentFrame * BUFFER_SEND_INTERVAL_USECS)) <= usecTimestamp(&now)) { + sentBytes = 0; - int sampleOffset = floor(diffclock(firstSend, now) * (SAMPLE_RATE / 1000) + 0.5); - sampleOffset = sampleOffset % whiteNoiseLength; + int sampleOffset = ((currentFrame - 1) * BUFFER_LENGTH_SAMPLES) % whiteNoiseLength; - memcpy(masterMix, whiteNoiseBuffer + sampleOffset, BUFFER_LENGTH_BYTES); + memcpy(masterMix, whiteNoiseBuffer + sampleOffset, BUFFER_LENGTH_BYTES); - for (int i = 0; i < num_agents; i++) { - if (agents[i].active) { - memcpy(clientMix, masterMix, BUFFER_LENGTH_BYTES); + for (int i = 0; i < num_agents; i++) { + if (agents[i].active) { + memcpy(clientMix, masterMix, BUFFER_LENGTH_BYTES); - for (int b = 0; b < MAX_SOURCE_BUFFERS; b++) { - if (b != i && !sourceBuffers[b].transmitted) { - for (int s = 0; s < BUFFER_LENGTH_SAMPLES; s++) { - // we have source buffer data for this sample - int mixSample = clientMix[s] + sourceBuffers[b].sourceAudioData[s]; - - int sampleToAdd = std::max(mixSample, MIN_SAMPLE_VALUE); - sampleToAdd = std::min(sampleToAdd, MAX_SAMPLE_VALUE); + for (int b = 0; b < MAX_SOURCE_BUFFERS; b++) { + if (b != i && !sourceBuffers[b].transmitted) { + for (int s = 0; s < BUFFER_LENGTH_SAMPLES; s++) { + // we have source buffer data for this sample + int mixSample = clientMix[s] + sourceBuffers[b].sourceAudioData[s]; + + int sampleToAdd = std::max(mixSample, MIN_SAMPLE_VALUE); + sampleToAdd = std::min(sampleToAdd, MAX_SAMPLE_VALUE); - clientMix[s] = sampleToAdd; + clientMix[s] = sampleToAdd; + } + + sourceBuffers[b].transmitted = true; } - - sourceBuffers[b].transmitted = true; } + + sockaddr_in dest_address = agents[i].agent_addr; + + sentBytes = sendto(handle, clientMix, BUFFER_LENGTH_BYTES, + 0, (sockaddr *) &dest_address, sizeof(dest_address)); + + if (sentBytes < BUFFER_LENGTH_BYTES) { + std::cout << "Error sending mix packet! " << sentBytes << strerror(errno) << "\n"; + } + } + } - sockaddr_in dest_address = agents[i].agent_addr; - - sentBytes = sendto(handle, clientMix, BUFFER_LENGTH_BYTES, - 0, (sockaddr *) &dest_address, sizeof(dest_address)); - - if (sentBytes < BUFFER_LENGTH_BYTES) { - std::cout << "Error sending mix packet! " << sentBytes << strerror(errno) << "\n"; - } - - } + currentFrame++; } } From 0cfc2dae522aa4bf7b22473d21b019b748b2dbe6 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 4 Feb 2013 17:11:34 -0800 Subject: [PATCH 23/25] output difference in time since last send for debug --- socket.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/socket.cpp b/socket.cpp index 9f4fafe893..66fd716ba9 100644 --- a/socket.cpp +++ b/socket.cpp @@ -144,6 +144,7 @@ void *send_buffer_thread(void *args) int sentBytes; int currentFrame = 1; timeval startTime; + timeval lastSend; timeval now; int16_t *clientMix = new int16_t[BUFFER_LENGTH_SAMPLES]; @@ -157,6 +158,8 @@ void *send_buffer_thread(void *args) while (usecTimestamp(&startTime, (currentFrame * BUFFER_SEND_INTERVAL_USECS)) <= usecTimestamp(&now)) { sentBytes = 0; + std::cout << "The difference was " << diffclock(lastSend, now) << "ms.\n"; + int sampleOffset = ((currentFrame - 1) * BUFFER_LENGTH_SAMPLES) % whiteNoiseLength; memcpy(masterMix, whiteNoiseBuffer + sampleOffset, BUFFER_LENGTH_BYTES); @@ -193,6 +196,7 @@ void *send_buffer_thread(void *args) } } + gettimeofday(&lastSend, NULL); currentFrame++; } } From 6f904204fbf9464328f57d6b90328432afdd9f03 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Thu, 7 Feb 2013 17:45:00 -0800 Subject: [PATCH 24/25] switch to dynamic sleep for send buffer repititions --- .gitignore | 3 +- socket.cpp | 140 +++++++++++++++++++++-------------------------------- 2 files changed, 58 insertions(+), 85 deletions(-) diff --git a/.gitignore b/.gitignore index 1bcb570edf..f69e105ae9 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ socket sftp-config.json -.DS_Store \ No newline at end of file +.DS_Store +*.raw \ No newline at end of file diff --git a/socket.cpp b/socket.cpp index 66fd716ba9..e685984c8a 100644 --- a/socket.cpp +++ b/socket.cpp @@ -49,10 +49,10 @@ struct SourceBuffer { bool transmitted; } sourceBuffers[MAX_SOURCE_BUFFERS]; -double diffclock(timeval clock1, timeval clock2) +double diffclock(timeval *clock1, timeval *clock2) { - double diffms = (clock2.tv_sec - clock1.tv_sec) * 1000.0; - diffms += (clock2.tv_usec - clock1.tv_usec) / 1000.0; // us to ms + double diffms = (clock2->tv_sec - clock1->tv_sec) * 1000.0; + diffms += (clock2->tv_usec - clock1->tv_usec) / 1000.0; // us to ms return diffms; } @@ -120,18 +120,6 @@ int addAgent(sockaddr_in dest_address, void *audioData) { return is_new; } -void update_agent_list(timeval now) { - int i; - // std::cout << "Checking agent list" << "\n"; - for (i = 0; i < num_agents; i++) { - if ((diffclock(agents[i].time, now) > LOGOFF_CHECK_INTERVAL) && - agents[i].active) { - std::cout << "Expired Agent #" << i << "\n"; - agents[i].active = false; - } - } -} - struct send_buffer_struct { int socket_handle; }; @@ -143,9 +131,7 @@ void *send_buffer_thread(void *args) int sentBytes; int currentFrame = 1; - timeval startTime; - timeval lastSend; - timeval now; + timeval startTime, sendTime, now; int16_t *clientMix = new int16_t[BUFFER_LENGTH_SAMPLES]; int16_t *masterMix = new int16_t[BUFFER_LENGTH_SAMPLES]; @@ -153,52 +139,55 @@ void *send_buffer_thread(void *args) gettimeofday(&startTime, NULL); while (true) { - gettimeofday(&now, NULL); + sentBytes = 0; - while (usecTimestamp(&startTime, (currentFrame * BUFFER_SEND_INTERVAL_USECS)) <= usecTimestamp(&now)) { - sentBytes = 0; + int sampleOffset = ((currentFrame - 1) * BUFFER_LENGTH_SAMPLES) % whiteNoiseLength; + memcpy(masterMix, whiteNoiseBuffer + sampleOffset, BUFFER_LENGTH_BYTES); - std::cout << "The difference was " << diffclock(lastSend, now) << "ms.\n"; + gettimeofday(&sendTime, NULL); - int sampleOffset = ((currentFrame - 1) * BUFFER_LENGTH_SAMPLES) % whiteNoiseLength; + for (int a = 0; a < num_agents; a++) { + if (diffclock(&agents[a].time, &sendTime) <= LOGOFF_CHECK_INTERVAL) { + memcpy(clientMix, masterMix, BUFFER_LENGTH_BYTES); - memcpy(masterMix, whiteNoiseBuffer + sampleOffset, BUFFER_LENGTH_BYTES); + for (int b = 0; b < MAX_SOURCE_BUFFERS; b++) { + if (b != a && !sourceBuffers[b].transmitted) { + for (int s = 0; s < BUFFER_LENGTH_SAMPLES; s++) { + // we have source buffer data for this sample + int mixSample = clientMix[s] + sourceBuffers[b].sourceAudioData[s]; + + int sampleToAdd = std::max(mixSample, MIN_SAMPLE_VALUE); + sampleToAdd = std::min(sampleToAdd, MAX_SAMPLE_VALUE); - for (int i = 0; i < num_agents; i++) { - if (agents[i].active) { - memcpy(clientMix, masterMix, BUFFER_LENGTH_BYTES); - - for (int b = 0; b < MAX_SOURCE_BUFFERS; b++) { - if (b != i && !sourceBuffers[b].transmitted) { - for (int s = 0; s < BUFFER_LENGTH_SAMPLES; s++) { - // we have source buffer data for this sample - int mixSample = clientMix[s] + sourceBuffers[b].sourceAudioData[s]; - - int sampleToAdd = std::max(mixSample, MIN_SAMPLE_VALUE); - sampleToAdd = std::min(sampleToAdd, MAX_SAMPLE_VALUE); - - clientMix[s] = sampleToAdd; - } - - sourceBuffers[b].transmitted = true; + clientMix[s] = sampleToAdd; } + + sourceBuffers[b].transmitted = true; } - - sockaddr_in dest_address = agents[i].agent_addr; - - sentBytes = sendto(handle, clientMix, BUFFER_LENGTH_BYTES, - 0, (sockaddr *) &dest_address, sizeof(dest_address)); - - if (sentBytes < BUFFER_LENGTH_BYTES) { - std::cout << "Error sending mix packet! " << sentBytes << strerror(errno) << "\n"; - } - } - } - gettimeofday(&lastSend, NULL); - currentFrame++; + sockaddr_in dest_address = agents[a].agent_addr; + + sentBytes = sendto(handle, clientMix, BUFFER_LENGTH_BYTES, + 0, (sockaddr *) &dest_address, sizeof(dest_address)); + + if (sentBytes < BUFFER_LENGTH_BYTES) { + std::cout << "Error sending mix packet! " << sentBytes << strerror(errno) << "\n"; + } + } + } + + gettimeofday(&now, NULL); + + double usecToSleep = usecTimestamp(&startTime, (currentFrame * BUFFER_SEND_INTERVAL_USECS)) - usecTimestamp(&now); + + if (usecToSleep > 0) { + usleep(usecToSleep); + } else { + std::cout << "NOT SLEEPING!"; } + + currentFrame++; } pthread_exit(0); @@ -233,7 +222,7 @@ bool different_clients(sockaddr_in addr1, sockaddr_in addr2) void white_noise_buffer_init() { // open a pointer to the audio file - FILE *whiteNoiseFile = fopen("wild.raw", "r"); + FILE *whiteNoiseFile = fopen("opera.raw", "r"); // get length of file std::fseek(whiteNoiseFile, 0, SEEK_END); @@ -265,13 +254,6 @@ int main(int argc, const char * argv[]) std::cout << "Network Started. Waiting for packets.\n"; } - // Set socket as non-blocking - int nonBlocking = 1; - if (fcntl(handle, F_SETFL, O_NONBLOCK, nonBlocking) == -1) { - printf( "failed to set non-blocking socket\n" ); - return false; - } - gettimeofday(&last_agent_update, NULL); int16_t packet_data[BUFFER_LENGTH_SAMPLES]; @@ -286,31 +268,21 @@ int main(int argc, const char * argv[]) pthread_t buffer_send_thread; pthread_create(&buffer_send_thread, NULL, send_buffer_thread, (void *)&send_buffer_args); - while (1) { + while (true) { received_bytes = recvfrom(handle, (int16_t*)packet_data, BUFFER_LENGTH_BYTES, - 0, (sockaddr*)&dest_address, &destLength); + 0, (sockaddr*)&dest_address, &destLength); + if (ECHO_DEBUG_MODE) { + sendto(handle, packet_data, BUFFER_LENGTH_BYTES, + 0, (sockaddr *) &dest_address, sizeof(dest_address)); + } else { + struct process_arg_struct args; + args.packet_data = packet_data; + args.dest_address = dest_address; - if (received_bytes > 0) { - if (ECHO_DEBUG_MODE) { - sendto(handle, packet_data, BUFFER_LENGTH_BYTES, - 0, (sockaddr *) &dest_address, sizeof(dest_address)); - } else { - struct process_arg_struct args; - args.packet_data = packet_data; - args.dest_address = dest_address; - - pthread_t client_process_thread; - pthread_create(&client_process_thread, NULL, process_client_packet, (void *)&args); - pthread_join(client_process_thread, NULL); - } + pthread_t client_process_thread; + pthread_create(&client_process_thread, NULL, process_client_packet, (void *)&args); + pthread_join(client_process_thread, NULL); } - - gettimeofday(&now, NULL); - - if (diffclock(last_agent_update, now) > LOGOFF_CHECK_INTERVAL) { - gettimeofday(&last_agent_update, NULL); - update_agent_list(last_agent_update); - } } pthread_join(buffer_send_thread, NULL); From d93ca2f2fcd3e03a2ea5a9131255cdb3c7d75243 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Thu, 7 Feb 2013 17:48:23 -0800 Subject: [PATCH 25/25] prepare project for move --- .gitignore => mixer/.gitignore | 0 socket.cpp => mixer/socket.cpp | 0 socket.rb => mixer/socket.rb | 0 3 files changed, 0 insertions(+), 0 deletions(-) rename .gitignore => mixer/.gitignore (100%) rename socket.cpp => mixer/socket.cpp (100%) rename socket.rb => mixer/socket.rb (100%) diff --git a/.gitignore b/mixer/.gitignore similarity index 100% rename from .gitignore rename to mixer/.gitignore diff --git a/socket.cpp b/mixer/socket.cpp similarity index 100% rename from socket.cpp rename to mixer/socket.cpp diff --git a/socket.rb b/mixer/socket.rb similarity index 100% rename from socket.rb rename to mixer/socket.rb