From 640de64f1dadde8ffd9e88c9b2f36da15709a62a Mon Sep 17 00:00:00 2001 From: Bartosz Kostrzewa Date: Mon, 15 Nov 2010 18:18:28 +0100 Subject: [PATCH] significant improvements to the server, now has timeouts and buffer cleanup --- frameserver/Server.cpp | 288 +++++++++++++++++++++++------------------ frameserver/Server.h | 11 +- frameserver/defines.h | 8 +- 3 files changed, 179 insertions(+), 128 deletions(-) diff --git a/frameserver/Server.cpp b/frameserver/Server.cpp index 0bccc2c..7dbc235 100644 --- a/frameserver/Server.cpp +++ b/frameserver/Server.cpp @@ -17,6 +17,8 @@ void Server::launch_threads() { threads.push_back( Glib::Thread::create( sigc::mem_fun(this, &Server::listen), false ) ); threads.push_back( Glib::Thread::create( sigc::mem_fun(this, &Server::mix), false ) ); + threads.push_back( Glib::Thread::create( sigc::mem_fun(this, &Server::console), false) ); + threads.push_back( Glib::Thread::create( sigc::mem_fun(this, &Server::expire), false) ); } @@ -42,10 +44,9 @@ void Server::listen() udp::socket socket(io_service, udp::endpoint(udp::v4(), port)); cout << "listening" << endl; + frame_t frame; while (1) { - frame_t frame; - // creating the buffer each time is faster than zeroing it out boost::array recv_buf; udp::endpoint remote_endpoint; @@ -54,85 +55,86 @@ void Server::listen() socket.receive_from(boost::asio::buffer(recv_buf), remote_endpoint, 0, error); - // bufnum is used further down - int bufnum = 0; + /* the buffer is locked for a long long time, however, we need + to make sure that none of the buffers expires while we're about + to write to it */ { - Glib::Mutex::Lock lock(mutex_); - int size = endpoints.size(); - bool known = false; - for(bufnum = 0; bufnum < size; bufnum++) - { + Glib::Mutex::Lock lock(mutex_); + int size = buffers.size(); + time(¤ttime); + int bufnum = 0; + bool known = false; + for(bufnum = 0; bufnum < size; bufnum++) + { // have we encountered this source before? - if(endpoints[bufnum] == remote_endpoint) - { - known = true; - break; - } - } - - if( !known && size < NUMBUFS ) - { - // create a new buffer make a note of the endpoint - std::stringstream endpointstring; - endpointstring << remote_endpoint; - cout << "adding new buffer for " << remote_endpoint << endl; - buffers.push_back( new Buffer( endpointstring.str() ) ); - endpoints.push_back( remote_endpoint ); - } - - // discard packet, we're not accepting any more sources! - else if( !known && size >= NUMBUFS ) - { - cout << "no more buffers left! " << bufnum << endl; - continue; - } - } - - if( packetcounter % 10000 == 0 ) - { - cout << endl << "packets received " << packetcounter << endl; - /*cout << remote_endpoint << endl; - for(int i = 0; i < BUFLEN; i++) - cout << recv_buf[i]; - cout << endl;//*/ - } - packetcounter++; - - frame.z = recv_buf[0]; - for(int i = 0; i < HEIGHT; i++) - { - for(int j = 0; j < WIDTH; j++) - { - for(int a = 0; a < CHANNELS; a++) - { - frame.windows[i][j][a] = recv_buf[HEADEROFFSET+ i*(CHANNELS*WIDTH+1) + j*CHANNELS + a]; + if(endpoints[bufnum] == remote_endpoint) + { + known = true; + break; } } - } - - for(int w = 0; w < SEGWIDTH; w++ ) - { - for(int n = 0;n < SEGNUM; n++) - { - for(int a = 0; a < SEGCHANNELS; a++) - { - frame.segments[w][n][a] = recv_buf[HEADEROFFSET+WINDOWOFFSET+ w*(SEGCHANNELS*SEGNUM+1) + n*SEGCHANNELS + a]; - } - } - } - - // this part needs to be made threadsafe because buffers will be accessed - // by the mixer and the udp listener + + if( !known && size < NUMBUFS ) { - Glib::Mutex::Lock lock(mutex_); - // convert ascii to integer value - if( bufnum < buffers.size() ) + // create a new buffer make a note of the endpoint + std::stringstream endpointstring; + endpointstring << remote_endpoint; + cout << "adding new buffer for " << remote_endpoint << endl; + buffers.push_back( new Buffer( endpointstring.str() ) ); + endpoints.push_back( remote_endpoint ); + times.push_back( currenttime ); + } + + // discard packet, we're not accepting any more sources! + else if( !known && size >= NUMBUFS ) + { + cout << "no more buffers left! " << bufnum << endl; + continue; + } + + if( packetcounter % 10000 == 0 ) { - buffers[ bufnum ]->set(frame); + cout << endl << "packets received " << packetcounter << endl; + /*cout << remote_endpoint << endl; + for(int i = 0; i < BUFLEN; i++) + cout << recv_buf[i]; + cout << endl;//*/ + } + packetcounter++; + + frame.z = recv_buf[0]; + for(int i = 0; i < HEIGHT; i++) + { + for(int j = 0; j < WIDTH; j++) + { + for(int a = 0; a < CHANNELS; a++) + { + frame.windows[i][j][a] = recv_buf[HEADEROFFSET+ i*(CHANNELS*WIDTH+1) + j*CHANNELS + a]; + } } } + for(int w = 0; w < SEGWIDTH; w++ ) + { + for(int n = 0;n < SEGNUM; n++) + { + for(int a = 0; a < SEGCHANNELS; a++) + { + frame.segments[w][n][a] = recv_buf[HEADEROFFSET+WINDOWOFFSET+ w*(SEGCHANNELS*SEGNUM+1) + n*SEGCHANNELS + a]; + } + } + } + + // be extra certain that we're not writing into wild memory + if( bufnum < buffers.size() ) + { + buffers[ bufnum ]->set(frame); + + // this is accurate enough for the purpose of expiring unused buffers + times[bufnum] = currenttime; + } + } // lock is released if (error && error != boost::asio::error::message_size) throw boost::system::system_error(error); @@ -160,75 +162,113 @@ void Server::mix() { int size = 0; int counter = 0; - + while(1) { counter++; - frame_t frame, temp_frame; + frame_t frame, temp_frame; + + // we lock the buffers for a long time, but we need to make sure + // that none of the buffers is allowed to expire while we're working on it! { Glib::Mutex::Lock lock(mutex_); size = buffers.size(); - } - - for(int x = 0; x < size; x++) - { + for(int x = 0; x < size; x++) { - Glib::Mutex::Lock lock(mutex_); - temp_frame = buffers[x]->get(); - } - - for(int i = 0; i < HEIGHT; i++) - { - for(int j = 0; j < WIDTH; j++) + temp_frame = buffers[x]->get(); + + for(int i = 0; i < HEIGHT; i++) { - for(int a = 0; a < CHANNELS; a++) - { - // do something interesting here - frame.windows[i][j][a] = temp_frame.windows[i][j][a]; + for(int j = 0; j < WIDTH; j++) + { + for(int a = 0; a < CHANNELS; a++) + { + // do something interesting here + frame.windows[i][j][a] = temp_frame.windows[i][j][a]; + } } } - } - - for(int w = 0; w < SEGWIDTH; w++ ) - { - for(int n = 0;n < SEGNUM; n++) - { - for(int a = 0; a < SEGCHANNELS; a++) - { - frame.segments[w][n][a] = temp_frame.segments[w][n][a]; - } + + for(int w = 0; w < SEGWIDTH; w++ ) + { + for(int n = 0;n < SEGNUM; n++) + { + for(int a = 0; a < SEGCHANNELS; a++) + { + frame.segments[w][n][a] = temp_frame.segments[w][n][a]; + } + } } - } - if( counter % 100 == 0 ) - { - cout << counter << endl; - for(int i = 0; i < HEIGHT; i++) - { - for(int j = 0; j < WIDTH; j++) - { - cout << frame.windows[i][j][0]; - } - cout << endl; - } - cout << endl; - - for(int w = 0; w < SEGWIDTH; w++) - { - for(int n = 0; n < SEGNUM; n++) - { - cout << frame.segments[w][n][0]; - } - cout << endl; - } - cout << endl << endl; - } //*/ + /*if( counter % 100 == 0 ) + { + cout << counter << endl; + for(int i = 0; i < HEIGHT; i++) + { + for(int j = 0; j < WIDTH; j++) + { + cout << frame.windows[i][j][0]; + } + cout << endl; + } + cout << endl; + + for(int w = 0; w < SEGWIDTH; w++) + { + for(int n = 0; n < SEGNUM; n++) + { + cout << frame.segments[w][n][0]; + } + cout << endl; + } + cout << endl << endl; + } //*/ + } } - usleep( 25000 ); } } -void Server::control() +void Server::console() { + while(1) + { + usleep( 100000 ); + } +} + +int Server::get_size() +{ + Glib::Mutex::Lock lock(mutex_); + return buffers.size(); +} + + +/* this expires buffers if they haven't been updated in a long time, + * therefore allowing a new source to be added */ + +void Server::expire() +{ + while(1) + { + { + Glib::Mutex::Lock lock(mutex_); + time(¤ttime); + for(int i = 0; i < buffers.size(); i++) + { + if( difftime( currenttime, times[i] ) > BUFTIMEOUT ) + { + cout << "buffer " << i << " will now expire\n"; + delete buffers[i]; + buffers.erase(buffers.begin()+i); + times.erase(times.begin()+i); + endpoints.erase(endpoints.begin()+i); + + // element i has been deleted, i-- is required + i--; + } + } + } + usleep( 1000000 ); + } } diff --git a/frameserver/Server.h b/frameserver/Server.h index 2271388..eb2bfac 100644 --- a/frameserver/Server.h +++ b/frameserver/Server.h @@ -5,6 +5,9 @@ #include #include #include + +#include + #include #include @@ -24,17 +27,21 @@ public: void listen(); void mix(); + void expire(); void launch_threads(); - void control(); + void console(); + int get_size(); private: Glib::Mutex mutex_; vector threads; + vector buffers; vector endpoints; + vector times; - int numbufs; + time_t currenttime; int port; }; diff --git a/frameserver/defines.h b/frameserver/defines.h index 8d57239..5caa81c 100644 --- a/frameserver/defines.h +++ b/frameserver/defines.h @@ -1,8 +1,10 @@ #ifndef __DEFINES_H_ #define __DEFINES_H_ -#define BUFLEN 572 -#define NUMBUFS 100 +// four minutes should be enough +#define BUFTIMEOUT 240 + +#define NUMBUFS 200 // one number + newline #define HEADEROFFSET 2 @@ -19,6 +21,8 @@ #define SEGWIDTH 12 #define SEGCHANNELS 4 +#define BUFLEN HEADEROFFSET+WINDOWOFFSET+ (SEGNUM*SEGCHANNELS+1)*SEGWIDTH + // not used for simplicity //#define SEGHEIGHT 1