diff --git a/clients/plasma.py b/clients/plasma.py new file mode 100644 index 0000000..2495afa --- /dev/null +++ b/clients/plasma.py @@ -0,0 +1,64 @@ +# Client program + +from socket import * +import sys +import time +import random +from math import * + +# Set the socket parameters +local_port = 5000 +remote_port = 4321 + +# TODO: autodetect interface address for remote application +outgoing_if = "127.0.0.1" +remote_host = "127.0.0.1" + +# udp is the default for DGRAM +UDPSock = socket(AF_INET, SOCK_DGRAM) +UDPSock.bind((outgoing_if, local_port)) + +# we will not use connections so we can keep working even if the server +# goes down or refuses connection +#UDPSock.connect((remote_host, remote_port)) + +segments = open('segments','r') + +alpha = 1 + +z_buffer = "1" + "\n" +width = 7 +height = 12 + +# Send messages +sleeptime = 0.04 +t = 0 +frequency = 2*pi/40 +while (1): + #zero out the data buffer + data = z_buffer + for i in range(0,width): + for j in range(0,height): + pixel = sin(1.5*pi*(float(i)/width)+t*frequency)*sin(1.5*pi*(float(j)/height)+t*frequency) + if pixel < 0.25: + pixel = '.' + elif pixel < 0.5: + pixel = '-' + elif pixel < 0.75: + pixel = '+' + elif pixel <= 1.0: + pixel = '#' + else: + pixel = '*' + data = data + pixel + str(alpha) + data = data + "\n" + t+=1 + #data += segments.read() + if not data: + break + else: + UDPSock.sendto(data,(remote_host,remote_port)) + #send ~100 packets per second + time.sleep(sleeptime) + +UDPSock.close() diff --git a/clients/segments b/clients/segments new file mode 100644 index 0000000..5c16404 --- /dev/null +++ b/clients/segments @@ -0,0 +1,12 @@ +rgbargbargbargbargbargbargbargba +rgbargbargbargbargbargbargbargba +rgbargbargbargbargbargbargbargba +rgbargbargbargbargbargbargbargba +rgbargbargbargbargbargbargbargba +rgbargbargbargbargbargbargbargba +rgbargbargbargbargbargbargbargba +rgbargbargbargbargbargbargbargba +rgbargbargbargbargbargbargbargba +rgbargbargbargbargbargbargbargba +rgbargbargbargbargbargbargbargba +rgbargbargbargbargbargbargbargba diff --git a/frameserver/Server.cpp b/frameserver/Server.cpp index 7dbc235..c7f2cf7 100644 --- a/frameserver/Server.cpp +++ b/frameserver/Server.cpp @@ -3,6 +3,9 @@ Server::Server(int _port ) { port = _port; + displaycounter = 0; + packetcounter = 0; + consoleinit = false; } Server::~Server() @@ -33,7 +36,6 @@ void Server::launch_threads() void Server::listen() { - long packetcounter = 0; try { boost::asio::io_service io_service; @@ -43,7 +45,7 @@ void Server::listen() // should be safe udp::socket socket(io_service, udp::endpoint(udp::v4(), port)); - cout << "listening" << endl; + //cout << "listening" << endl; frame_t frame; while (1) { @@ -80,7 +82,7 @@ void Server::listen() // create a new buffer make a note of the endpoint std::stringstream endpointstring; endpointstring << remote_endpoint; - cout << "adding new buffer for " << remote_endpoint << endl; + //cout << "adding new buffer for " << remote_endpoint << endl; buffers.push_back( new Buffer( endpointstring.str() ) ); endpoints.push_back( remote_endpoint ); times.push_back( currenttime ); @@ -89,13 +91,13 @@ void Server::listen() // discard packet, we're not accepting any more sources! else if( !known && size >= NUMBUFS ) { - cout << "no more buffers left! " << bufnum << endl; + //cout << "no more buffers left! " << bufnum << endl; continue; } if( packetcounter % 10000 == 0 ) { - cout << endl << "packets received " << packetcounter << endl; + //cout << endl << "packets received " << packetcounter << endl; /*cout << remote_endpoint << endl; for(int i = 0; i < BUFLEN; i++) cout << recv_buf[i]; @@ -134,7 +136,7 @@ void Server::listen() // this is accurate enough for the purpose of expiring unused buffers times[bufnum] = currenttime; } - } // lock is released + } // lock is released here because the block ends if (error && error != boost::asio::error::message_size) throw boost::system::system_error(error); @@ -165,14 +167,39 @@ void Server::mix() while(1) { - counter++; - frame_t frame, temp_frame; + frame_t temp_frame; // we lock the buffers for a long time, but we need to make sure // that none of the buffers is allowed to expire while we're working on it! { Glib::Mutex::Lock lock(mutex_); + displaycounter++; size = buffers.size(); + + // zero out the frame + for(int i = 0; i < HEIGHT; i++) + { + for(int j = 0; j < WIDTH; j++) + { + for(int a = 0; a < CHANNELS; a++) + { + // do something interesting here + frame.windows[i][j][a] = 0; + } + } + } + + for(int w = 0; w < SEGWIDTH; w++ ) + { + for(int n = 0;n < SEGNUM; n++) + { + for(int a = 0; a < SEGCHANNELS; a++) + { + frame.segments[w][n][a] = 0; + } + } + } + for(int x = 0; x < size; x++) { temp_frame = buffers[x]->get(); @@ -200,7 +227,7 @@ void Server::mix() } } - /*if( counter % 100 == 0 ) + /*if( counter % 100 == 0 && x == size-1 ) { cout << counter << endl; for(int i = 0; i < HEIGHT; i++) @@ -231,10 +258,81 @@ void Server::mix() void Server::console() { + initscr(); + keypad(stdscr,TRUE); + cbreak(); + noecho(); + { + Glib::Mutex::Lock lock(mutex_); + consoleinit = true; + threads.push_back( Glib::Thread::create( sigc::mem_fun(this, &Server::input), false) ); + } + while(1) { - usleep( 100000 ); + { + // we'll be accessing some data to provide statistics, lock the Server + Glib::Mutex::Lock lock(mutex_); + mvprintw(0,0,"Clients\t %d \t| F1 Frame | F2 Stats | F3 Clients | input: %d ", buffers.size(),console_input ); + switch(console_input) + { + case KEY_F(1): + console_printframe(); + default: + console_printframe(); + } + refresh(); /* Print it on to the real screen */ + } + usleep( 20000 ); } + endwin(); /* End curses mode */ +} + +void Server::input() +{ + int c; + while(consoleinit) + { + // getch will wait for input, so loop will not lock up cpu + c = getch(); + + // now we need to lock data structure because we're going to use shared objects + { + Glib::Mutex::Lock lock(mutex_); + console_input = c; + } + + } +} + +/* the console functions should only be used in the console thread, they don't + * implement their own locking and they need ncurses to be initialised */ +void Server::console_printframe() +{ + // output the current screen contents + for(int i = 0; i < HEIGHT; i++) + { + for(int j = 0; j < WIDTH; j++) + { + mvprintw(i+2,j,"%c",frame.windows[i][j][0]); + } + } + + for(int w = 0; w < SEGWIDTH; w++) + { + for(int n = 0; n < SEGNUM; n++) + { + for(int a = 0; a < SEGCHANNELS; a++) + { + mvprintw(HEIGHT+3+w,(n*SEGCHANNELS)+a,"%c",frame.segments[w][n][a]); + } + } + } + +} + +void Server::console_printstats() +{ } int Server::get_size() @@ -258,13 +356,13 @@ void Server::expire() { if( difftime( currenttime, times[i] ) > BUFTIMEOUT ) { - cout << "buffer " << i << " will now expire\n"; + //cout << "buffer " << i << " will now expire\n"; delete buffers[i]; buffers.erase(buffers.begin()+i); times.erase(times.begin()+i); endpoints.erase(endpoints.begin()+i); - // element i has been deleted, i-- is required + // element i has been erased, i-- is required i--; } } diff --git a/frameserver/Server.h b/frameserver/Server.h index eb2bfac..f9f316b 100644 --- a/frameserver/Server.h +++ b/frameserver/Server.h @@ -11,6 +11,8 @@ #include #include +#include + #include "Buffers.h" #include "Buffer.h" #include "defines.h" @@ -24,16 +26,26 @@ class Server : public sigc::trackable public: Server(int _port); ~Server(); + + void launch_threads(); + +private: + void console(); + void input(); + void console_printframe(); + void console_printstats(); void listen(); void mix(); - void expire(); - void launch_threads(); - void console(); int get_size(); - -private: + void expire(); + + Glib::Mutex mutex_; + + bool consoleinit; + int console_input; + long displaycounter, packetcounter; vector threads; @@ -42,6 +54,8 @@ private: vector times; time_t currenttime; + + frame_t frame; int port; }; diff --git a/frameserver/defines.h b/frameserver/defines.h index 5caa81c..a5d39ad 100644 --- a/frameserver/defines.h +++ b/frameserver/defines.h @@ -2,9 +2,9 @@ #define __DEFINES_H_ // four minutes should be enough -#define BUFTIMEOUT 240 +#define BUFTIMEOUT 2 -#define NUMBUFS 200 +#define NUMBUFS 1000 // one number + newline #define HEADEROFFSET 2 diff --git a/frameserver/launchpythons.sh b/frameserver/launchpythons.sh index 84454ab..15c78e4 100755 --- a/frameserver/launchpythons.sh +++ b/frameserver/launchpythons.sh @@ -1,6 +1,6 @@ #!/bin/bash -processes=100 +processes=80 i=0 while [ "$i" -lt "$processes" ] @@ -9,7 +9,7 @@ do pids=( ${pids[@]} $! ) echo ${pids["$i"]} i=$((i+1)) - sleep 0.1 + #sleep 0.1 done #wait for input to exit diff --git a/frameserver/maketest_server b/frameserver/maketest_server index 4bb8dac..336f556 100755 --- a/frameserver/maketest_server +++ b/frameserver/maketest_server @@ -1,2 +1,2 @@ #!/bin/bash -g++ -O0 -ggdb `pkg-config --libs --cflags glibmm-2.4` `pkg-config --libs --cflags gthread-2.0` -lboost_system -o test_server Buffer.cpp Buffers.cpp Server.cpp test_server.cpp \ No newline at end of file +g++ -O2 -ggdb -lncurses `pkg-config --libs --cflags glibmm-2.4` `pkg-config --libs --cflags gthread-2.0` -lboost_system -o test_server Buffer.cpp Buffers.cpp Server.cpp test_server.cpp \ No newline at end of file diff --git a/frameserver/servertest.py b/frameserver/servertest.py index 9a8a132..361257c 100644 --- a/frameserver/servertest.py +++ b/frameserver/servertest.py @@ -3,6 +3,7 @@ from socket import * import sys import time +import random # Set the socket parameters local_port = 5000 + int( sys.argv[1] ) @@ -25,13 +26,22 @@ z_buffer = "1" + "\n" data = z_buffer + display.read() +random.seed() + # Send messages +sleeptime = 0.0001 +frequency = 1/sleeptime +i = random.randint(0,frequency-1) while (1): if not data: break else: UDPSock.sendto(data,(remote_host,remote_port)) - #send ~100 packets per second - time.sleep(0.01) + #send ~100 packets per second + i+=1 + time.sleep(sleeptime) + if i >= frequency: + time.sleep(3) + i = random.randint(0,frequency-1) UDPSock.close()