first client and improvements to server and servertest

master
Bartek Kostrzewa 2010-11-16 20:27:22 +01:00
parent 8c799c037c
commit dbfb3ecc2c
8 changed files with 222 additions and 24 deletions

64
clients/plasma.py Normal file
View File

@ -0,0 +1,64 @@
# Client program
from socket import *
import sys
import time
import random
from math import *
# Set the socket parameters
local_port = 5000
remote_port = 4321
# TODO: autodetect interface address for remote application
outgoing_if = "127.0.0.1"
remote_host = "127.0.0.1"
# udp is the default for DGRAM
UDPSock = socket(AF_INET, SOCK_DGRAM)
UDPSock.bind((outgoing_if, local_port))
# we will not use connections so we can keep working even if the server
# goes down or refuses connection
#UDPSock.connect((remote_host, remote_port))
segments = open('segments','r')
alpha = 1
z_buffer = "1" + "\n"
width = 7
height = 12
# Send messages
sleeptime = 0.04
t = 0
frequency = 2*pi/40
while (1):
#zero out the data buffer
data = z_buffer
for i in range(0,width):
for j in range(0,height):
pixel = sin(1.5*pi*(float(i)/width)+t*frequency)*sin(1.5*pi*(float(j)/height)+t*frequency)
if pixel < 0.25:
pixel = '.'
elif pixel < 0.5:
pixel = '-'
elif pixel < 0.75:
pixel = '+'
elif pixel <= 1.0:
pixel = '#'
else:
pixel = '*'
data = data + pixel + str(alpha)
data = data + "\n"
t+=1
#data += segments.read()
if not data:
break
else:
UDPSock.sendto(data,(remote_host,remote_port))
#send ~100 packets per second
time.sleep(sleeptime)
UDPSock.close()

12
clients/segments Normal file
View File

@ -0,0 +1,12 @@
rgbargbargbargbargbargbargbargba
rgbargbargbargbargbargbargbargba
rgbargbargbargbargbargbargbargba
rgbargbargbargbargbargbargbargba
rgbargbargbargbargbargbargbargba
rgbargbargbargbargbargbargbargba
rgbargbargbargbargbargbargbargba
rgbargbargbargbargbargbargbargba
rgbargbargbargbargbargbargbargba
rgbargbargbargbargbargbargbargba
rgbargbargbargbargbargbargbargba
rgbargbargbargbargbargbargbargba

View File

@ -3,6 +3,9 @@
Server::Server(int _port )
{
port = _port;
displaycounter = 0;
packetcounter = 0;
consoleinit = false;
}
Server::~Server()
@ -33,7 +36,6 @@ void Server::launch_threads()
void Server::listen()
{
long packetcounter = 0;
try
{
boost::asio::io_service io_service;
@ -43,7 +45,7 @@ void Server::listen()
// should be safe
udp::socket socket(io_service, udp::endpoint(udp::v4(), port));
cout << "listening" << endl;
//cout << "listening" << endl;
frame_t frame;
while (1)
{
@ -80,7 +82,7 @@ void Server::listen()
// create a new buffer make a note of the endpoint
std::stringstream endpointstring;
endpointstring << remote_endpoint;
cout << "adding new buffer for " << remote_endpoint << endl;
//cout << "adding new buffer for " << remote_endpoint << endl;
buffers.push_back( new Buffer( endpointstring.str() ) );
endpoints.push_back( remote_endpoint );
times.push_back( currenttime );
@ -89,13 +91,13 @@ void Server::listen()
// discard packet, we're not accepting any more sources!
else if( !known && size >= NUMBUFS )
{
cout << "no more buffers left! " << bufnum << endl;
//cout << "no more buffers left! " << bufnum << endl;
continue;
}
if( packetcounter % 10000 == 0 )
{
cout << endl << "packets received " << packetcounter << endl;
//cout << endl << "packets received " << packetcounter << endl;
/*cout << remote_endpoint << endl;
for(int i = 0; i < BUFLEN; i++)
cout << recv_buf[i];
@ -134,7 +136,7 @@ void Server::listen()
// this is accurate enough for the purpose of expiring unused buffers
times[bufnum] = currenttime;
}
} // lock is released
} // lock is released here because the block ends
if (error && error != boost::asio::error::message_size)
throw boost::system::system_error(error);
@ -165,14 +167,39 @@ void Server::mix()
while(1)
{
counter++;
frame_t frame, temp_frame;
frame_t temp_frame;
// we lock the buffers for a long time, but we need to make sure
// that none of the buffers is allowed to expire while we're working on it!
{
Glib::Mutex::Lock lock(mutex_);
displaycounter++;
size = buffers.size();
// zero out the frame
for(int i = 0; i < HEIGHT; i++)
{
for(int j = 0; j < WIDTH; j++)
{
for(int a = 0; a < CHANNELS; a++)
{
// do something interesting here
frame.windows[i][j][a] = 0;
}
}
}
for(int w = 0; w < SEGWIDTH; w++ )
{
for(int n = 0;n < SEGNUM; n++)
{
for(int a = 0; a < SEGCHANNELS; a++)
{
frame.segments[w][n][a] = 0;
}
}
}
for(int x = 0; x < size; x++)
{
temp_frame = buffers[x]->get();
@ -200,7 +227,7 @@ void Server::mix()
}
}
/*if( counter % 100 == 0 )
/*if( counter % 100 == 0 && x == size-1 )
{
cout << counter << endl;
for(int i = 0; i < HEIGHT; i++)
@ -231,10 +258,81 @@ void Server::mix()
void Server::console()
{
initscr();
keypad(stdscr,TRUE);
cbreak();
noecho();
{
Glib::Mutex::Lock lock(mutex_);
consoleinit = true;
threads.push_back( Glib::Thread::create( sigc::mem_fun(this, &Server::input), false) );
}
while(1)
{
usleep( 100000 );
{
// we'll be accessing some data to provide statistics, lock the Server
Glib::Mutex::Lock lock(mutex_);
mvprintw(0,0,"Clients\t %d \t| F1 Frame | F2 Stats | F3 Clients | input: %d ", buffers.size(),console_input );
switch(console_input)
{
case KEY_F(1):
console_printframe();
default:
console_printframe();
}
refresh(); /* Print it on to the real screen */
}
usleep( 20000 );
}
endwin(); /* End curses mode */
}
void Server::input()
{
int c;
while(consoleinit)
{
// getch will wait for input, so loop will not lock up cpu
c = getch();
// now we need to lock data structure because we're going to use shared objects
{
Glib::Mutex::Lock lock(mutex_);
console_input = c;
}
}
}
/* the console functions should only be used in the console thread, they don't
* implement their own locking and they need ncurses to be initialised */
void Server::console_printframe()
{
// output the current screen contents
for(int i = 0; i < HEIGHT; i++)
{
for(int j = 0; j < WIDTH; j++)
{
mvprintw(i+2,j,"%c",frame.windows[i][j][0]);
}
}
for(int w = 0; w < SEGWIDTH; w++)
{
for(int n = 0; n < SEGNUM; n++)
{
for(int a = 0; a < SEGCHANNELS; a++)
{
mvprintw(HEIGHT+3+w,(n*SEGCHANNELS)+a,"%c",frame.segments[w][n][a]);
}
}
}
}
void Server::console_printstats()
{
}
int Server::get_size()
@ -258,13 +356,13 @@ void Server::expire()
{
if( difftime( currenttime, times[i] ) > BUFTIMEOUT )
{
cout << "buffer " << i << " will now expire\n";
//cout << "buffer " << i << " will now expire\n";
delete buffers[i];
buffers.erase(buffers.begin()+i);
times.erase(times.begin()+i);
endpoints.erase(endpoints.begin()+i);
// element i has been deleted, i-- is required
// element i has been erased, i-- is required
i--;
}
}

View File

@ -11,6 +11,8 @@
#include <boost/array.hpp>
#include <boost/asio.hpp>
#include <ncurses.h>
#include "Buffers.h"
#include "Buffer.h"
#include "defines.h"
@ -24,16 +26,26 @@ class Server : public sigc::trackable
public:
Server(int _port);
~Server();
void launch_threads();
private:
void console();
void input();
void console_printframe();
void console_printstats();
void listen();
void mix();
void expire();
void launch_threads();
void console();
int get_size();
private:
void expire();
Glib::Mutex mutex_;
bool consoleinit;
int console_input;
long displaycounter, packetcounter;
vector<Glib::Thread*> threads;
@ -42,6 +54,8 @@ private:
vector<time_t> times;
time_t currenttime;
frame_t frame;
int port;
};

View File

@ -2,9 +2,9 @@
#define __DEFINES_H_
// four minutes should be enough
#define BUFTIMEOUT 240
#define BUFTIMEOUT 2
#define NUMBUFS 200
#define NUMBUFS 1000
// one number + newline
#define HEADEROFFSET 2

View File

@ -1,6 +1,6 @@
#!/bin/bash
processes=100
processes=80
i=0
while [ "$i" -lt "$processes" ]
@ -9,7 +9,7 @@ do
pids=( ${pids[@]} $! )
echo ${pids["$i"]}
i=$((i+1))
sleep 0.1
#sleep 0.1
done
#wait for input to exit

View File

@ -1,2 +1,2 @@
#!/bin/bash
g++ -O0 -ggdb `pkg-config --libs --cflags glibmm-2.4` `pkg-config --libs --cflags gthread-2.0` -lboost_system -o test_server Buffer.cpp Buffers.cpp Server.cpp test_server.cpp
g++ -O2 -ggdb -lncurses `pkg-config --libs --cflags glibmm-2.4` `pkg-config --libs --cflags gthread-2.0` -lboost_system -o test_server Buffer.cpp Buffers.cpp Server.cpp test_server.cpp

View File

@ -3,6 +3,7 @@
from socket import *
import sys
import time
import random
# Set the socket parameters
local_port = 5000 + int( sys.argv[1] )
@ -25,13 +26,22 @@ z_buffer = "1" + "\n"
data = z_buffer + display.read()
random.seed()
# Send messages
sleeptime = 0.0001
frequency = 1/sleeptime
i = random.randint(0,frequency-1)
while (1):
if not data:
break
else:
UDPSock.sendto(data,(remote_host,remote_port))
#send ~100 packets per second
time.sleep(0.01)
#send ~100 packets per second
i+=1
time.sleep(sleeptime)
if i >= frequency:
time.sleep(3)
i = random.randint(0,frequency-1)
UDPSock.close()