significant improvements to the server, now has timeouts and buffer cleanup

master
Bartosz Kostrzewa 2010-11-15 18:18:28 +01:00
parent 75e29a6f66
commit 640de64f1d
3 changed files with 179 additions and 128 deletions

View File

@ -17,6 +17,8 @@ void Server::launch_threads()
{ {
threads.push_back( Glib::Thread::create( sigc::mem_fun(this, &Server::listen), false ) ); threads.push_back( Glib::Thread::create( sigc::mem_fun(this, &Server::listen), false ) );
threads.push_back( Glib::Thread::create( sigc::mem_fun(this, &Server::mix), false ) ); threads.push_back( Glib::Thread::create( sigc::mem_fun(this, &Server::mix), false ) );
threads.push_back( Glib::Thread::create( sigc::mem_fun(this, &Server::console), false) );
threads.push_back( Glib::Thread::create( sigc::mem_fun(this, &Server::expire), false) );
} }
@ -42,10 +44,9 @@ void Server::listen()
udp::socket socket(io_service, udp::endpoint(udp::v4(), port)); udp::socket socket(io_service, udp::endpoint(udp::v4(), port));
cout << "listening" << endl; cout << "listening" << endl;
frame_t frame;
while (1) while (1)
{ {
frame_t frame;
// creating the buffer each time is faster than zeroing it out // creating the buffer each time is faster than zeroing it out
boost::array<char, BUFLEN> recv_buf; boost::array<char, BUFLEN> recv_buf;
udp::endpoint remote_endpoint; udp::endpoint remote_endpoint;
@ -54,85 +55,86 @@ void Server::listen()
socket.receive_from(boost::asio::buffer(recv_buf), socket.receive_from(boost::asio::buffer(recv_buf),
remote_endpoint, 0, error); remote_endpoint, 0, error);
// bufnum is used further down // bufnum is used further down
int bufnum = 0; /* the buffer is locked for a long long time, however, we need
to make sure that none of the buffers expires while we're about
to write to it */
{ {
Glib::Mutex::Lock lock(mutex_); Glib::Mutex::Lock lock(mutex_);
int size = endpoints.size(); int size = buffers.size();
bool known = false; time(&currenttime);
for(bufnum = 0; bufnum < size; bufnum++) int bufnum = 0;
{ bool known = false;
for(bufnum = 0; bufnum < size; bufnum++)
{
// have we encountered this source before? // have we encountered this source before?
if(endpoints[bufnum] == remote_endpoint) if(endpoints[bufnum] == remote_endpoint)
{ {
known = true; known = true;
break; break;
}
}
if( !known && size < NUMBUFS )
{
// create a new buffer make a note of the endpoint
std::stringstream endpointstring;
endpointstring << remote_endpoint;
cout << "adding new buffer for " << remote_endpoint << endl;
buffers.push_back( new Buffer( endpointstring.str() ) );
endpoints.push_back( remote_endpoint );
}
// discard packet, we're not accepting any more sources!
else if( !known && size >= NUMBUFS )
{
cout << "no more buffers left! " << bufnum << endl;
continue;
}
}
if( packetcounter % 10000 == 0 )
{
cout << endl << "packets received " << packetcounter << endl;
/*cout << remote_endpoint << endl;
for(int i = 0; i < BUFLEN; i++)
cout << recv_buf[i];
cout << endl;//*/
}
packetcounter++;
frame.z = recv_buf[0];
for(int i = 0; i < HEIGHT; i++)
{
for(int j = 0; j < WIDTH; j++)
{
for(int a = 0; a < CHANNELS; a++)
{
frame.windows[i][j][a] = recv_buf[HEADEROFFSET+ i*(CHANNELS*WIDTH+1) + j*CHANNELS + a];
} }
} }
}
if( !known && size < NUMBUFS )
for(int w = 0; w < SEGWIDTH; w++ )
{
for(int n = 0;n < SEGNUM; n++)
{
for(int a = 0; a < SEGCHANNELS; a++)
{
frame.segments[w][n][a] = recv_buf[HEADEROFFSET+WINDOWOFFSET+ w*(SEGCHANNELS*SEGNUM+1) + n*SEGCHANNELS + a];
}
}
}
// this part needs to be made threadsafe because buffers will be accessed
// by the mixer and the udp listener
{ {
Glib::Mutex::Lock lock(mutex_); // create a new buffer make a note of the endpoint
// convert ascii to integer value std::stringstream endpointstring;
if( bufnum < buffers.size() ) endpointstring << remote_endpoint;
cout << "adding new buffer for " << remote_endpoint << endl;
buffers.push_back( new Buffer( endpointstring.str() ) );
endpoints.push_back( remote_endpoint );
times.push_back( currenttime );
}
// discard packet, we're not accepting any more sources!
else if( !known && size >= NUMBUFS )
{
cout << "no more buffers left! " << bufnum << endl;
continue;
}
if( packetcounter % 10000 == 0 )
{ {
buffers[ bufnum ]->set(frame); cout << endl << "packets received " << packetcounter << endl;
/*cout << remote_endpoint << endl;
for(int i = 0; i < BUFLEN; i++)
cout << recv_buf[i];
cout << endl;//*/
}
packetcounter++;
frame.z = recv_buf[0];
for(int i = 0; i < HEIGHT; i++)
{
for(int j = 0; j < WIDTH; j++)
{
for(int a = 0; a < CHANNELS; a++)
{
frame.windows[i][j][a] = recv_buf[HEADEROFFSET+ i*(CHANNELS*WIDTH+1) + j*CHANNELS + a];
}
} }
} }
for(int w = 0; w < SEGWIDTH; w++ )
{
for(int n = 0;n < SEGNUM; n++)
{
for(int a = 0; a < SEGCHANNELS; a++)
{
frame.segments[w][n][a] = recv_buf[HEADEROFFSET+WINDOWOFFSET+ w*(SEGCHANNELS*SEGNUM+1) + n*SEGCHANNELS + a];
}
}
}
// be extra certain that we're not writing into wild memory
if( bufnum < buffers.size() )
{
buffers[ bufnum ]->set(frame);
// this is accurate enough for the purpose of expiring unused buffers
times[bufnum] = currenttime;
}
} // lock is released
if (error && error != boost::asio::error::message_size) if (error && error != boost::asio::error::message_size)
throw boost::system::system_error(error); throw boost::system::system_error(error);
@ -160,75 +162,113 @@ void Server::mix()
{ {
int size = 0; int size = 0;
int counter = 0; int counter = 0;
while(1) while(1)
{ {
counter++; counter++;
frame_t frame, temp_frame; frame_t frame, temp_frame;
// we lock the buffers for a long time, but we need to make sure
// that none of the buffers is allowed to expire while we're working on it!
{ {
Glib::Mutex::Lock lock(mutex_); Glib::Mutex::Lock lock(mutex_);
size = buffers.size(); size = buffers.size();
} for(int x = 0; x < size; x++)
for(int x = 0; x < size; x++)
{
{ {
Glib::Mutex::Lock lock(mutex_); temp_frame = buffers[x]->get();
temp_frame = buffers[x]->get();
} for(int i = 0; i < HEIGHT; i++)
for(int i = 0; i < HEIGHT; i++)
{
for(int j = 0; j < WIDTH; j++)
{ {
for(int a = 0; a < CHANNELS; a++) for(int j = 0; j < WIDTH; j++)
{ {
// do something interesting here for(int a = 0; a < CHANNELS; a++)
frame.windows[i][j][a] = temp_frame.windows[i][j][a]; {
// do something interesting here
frame.windows[i][j][a] = temp_frame.windows[i][j][a];
}
} }
} }
}
for(int w = 0; w < SEGWIDTH; w++ )
for(int w = 0; w < SEGWIDTH; w++ ) {
{ for(int n = 0;n < SEGNUM; n++)
for(int n = 0;n < SEGNUM; n++) {
{ for(int a = 0; a < SEGCHANNELS; a++)
for(int a = 0; a < SEGCHANNELS; a++) {
{ frame.segments[w][n][a] = temp_frame.segments[w][n][a];
frame.segments[w][n][a] = temp_frame.segments[w][n][a]; }
} }
} }
}
if( counter % 100 == 0 ) /*if( counter % 100 == 0 )
{ {
cout << counter << endl; cout << counter << endl;
for(int i = 0; i < HEIGHT; i++) for(int i = 0; i < HEIGHT; i++)
{ {
for(int j = 0; j < WIDTH; j++) for(int j = 0; j < WIDTH; j++)
{ {
cout << frame.windows[i][j][0]; cout << frame.windows[i][j][0];
} }
cout << endl; cout << endl;
} }
cout << endl; cout << endl;
for(int w = 0; w < SEGWIDTH; w++) for(int w = 0; w < SEGWIDTH; w++)
{ {
for(int n = 0; n < SEGNUM; n++) for(int n = 0; n < SEGNUM; n++)
{ {
cout << frame.segments[w][n][0]; cout << frame.segments[w][n][0];
} }
cout << endl; cout << endl;
} }
cout << endl << endl; cout << endl << endl;
} //*/ } //*/
}
} }
usleep( 25000 ); usleep( 25000 );
} }
} }
void Server::control() void Server::console()
{ {
while(1)
{
usleep( 100000 );
}
}
int Server::get_size()
{
Glib::Mutex::Lock lock(mutex_);
return buffers.size();
}
/* this expires buffers if they haven't been updated in a long time,
* therefore allowing a new source to be added */
void Server::expire()
{
while(1)
{
{
Glib::Mutex::Lock lock(mutex_);
time(&currenttime);
for(int i = 0; i < buffers.size(); i++)
{
if( difftime( currenttime, times[i] ) > BUFTIMEOUT )
{
cout << "buffer " << i << " will now expire\n";
delete buffers[i];
buffers.erase(buffers.begin()+i);
times.erase(times.begin()+i);
endpoints.erase(endpoints.begin()+i);
// element i has been deleted, i-- is required
i--;
}
}
}
usleep( 1000000 );
}
} }

View File

@ -5,6 +5,9 @@
#include <string> #include <string>
#include <vector> #include <vector>
#include <list> #include <list>
#include <glibmm/timeval.h>
#include <boost/array.hpp> #include <boost/array.hpp>
#include <boost/asio.hpp> #include <boost/asio.hpp>
@ -24,17 +27,21 @@ public:
void listen(); void listen();
void mix(); void mix();
void expire();
void launch_threads(); void launch_threads();
void control(); void console();
int get_size();
private: private:
Glib::Mutex mutex_; Glib::Mutex mutex_;
vector<Glib::Thread*> threads; vector<Glib::Thread*> threads;
vector<Buffer*> buffers; vector<Buffer*> buffers;
vector<udp::endpoint> endpoints; vector<udp::endpoint> endpoints;
vector<time_t> times;
int numbufs; time_t currenttime;
int port; int port;
}; };

View File

@ -1,8 +1,10 @@
#ifndef __DEFINES_H_ #ifndef __DEFINES_H_
#define __DEFINES_H_ #define __DEFINES_H_
#define BUFLEN 572 // four minutes should be enough
#define NUMBUFS 100 #define BUFTIMEOUT 240
#define NUMBUFS 200
// one number + newline // one number + newline
#define HEADEROFFSET 2 #define HEADEROFFSET 2
@ -19,6 +21,8 @@
#define SEGWIDTH 12 #define SEGWIDTH 12
#define SEGCHANNELS 4 #define SEGCHANNELS 4
#define BUFLEN HEADEROFFSET+WINDOWOFFSET+ (SEGNUM*SEGCHANNELS+1)*SEGWIDTH
// not used for simplicity // not used for simplicity
//#define SEGHEIGHT 1 //#define SEGHEIGHT 1