diff --git a/frameserver/Buffer.cpp b/frameserver/Buffer.cpp new file mode 100644 index 0000000..1431a6e --- /dev/null +++ b/frameserver/Buffer.cpp @@ -0,0 +1,33 @@ +#include "Buffer.h" + +Buffer::Buffer( int _id ) +{ + id = _id; + + // allocate memory +} + +Buffer::~Buffer() +{ + // deallocate memory +} + +void Buffer::set(frame_t data) +{ + { + Glib::Mutex::Lock lock(mutex_); + frame = data; + } +} + +frame_t Buffer::get() +{ + Glib::Mutex::Lock lock(mutex_); + return frame; +} + +int Buffer::get_id() +{ + Glib::Mutex::Lock lock(mutex_); + return id; +} diff --git a/frameserver/Buffer.h b/frameserver/Buffer.h new file mode 100644 index 0000000..776fd77 --- /dev/null +++ b/frameserver/Buffer.h @@ -0,0 +1,25 @@ +#ifndef __BUFFER_H_ +#define __BUFFER_H_ +#include +#include "defines.h" + +/* This is a threadsafe wrapper around the "frame_t" struct + which automates locking during get and set. */ + +class Buffer : public sigc::trackable +{ + public: + Buffer(int _id); + ~Buffer(); + + void set(frame_t); + frame_t get(); + int get_id(); + + private: + int id; + frame_t frame; + + Glib::Mutex mutex_; +}; +#endif diff --git a/frameserver/Buffers.cpp b/frameserver/Buffers.cpp new file mode 100644 index 0000000..e66081b --- /dev/null +++ b/frameserver/Buffers.cpp @@ -0,0 +1,43 @@ +#include "Buffers.h" + +Buffers::Buffers() +{ + id = 0; +} + +Buffers::Buffers(int _bufnum) +{ + id = 0; + for( int i = 0; i < _bufnum; i++) + add(); +} + +Buffers::~Buffers() +{ + Glib::Mutex::Lock lock(mutex_); + buffers.clear(); +} + +Buffer* Buffers::get(int index) +{ + Glib::Mutex::Lock lock(mutex_); + return buffers[index]; +} + +void Buffers::add() +{ + Glib::Mutex::Lock lock(mutex_); + id += 1; + buffers.push_back( new Buffer(id) ); +} + +void Buffers::remove(int _id) +{ + Glib::Mutex::Lock lock(mutex_); + int size = buffers.size(); + for( int i = 0; i < size; i++ ) + { + if( buffers[i]->get_id() == _id ) + buffers.erase( buffers.begin()+i ); + } +} diff --git a/frameserver/Buffers.h b/frameserver/Buffers.h new file mode 100644 index 0000000..18f0d9a --- /dev/null +++ b/frameserver/Buffers.h @@ -0,0 +1,39 @@ +#ifndef __BUFFERS_H_ +#define __BUFFERS_H_ + +/* This is a thread-safe wrapper around a standard library + vector of pointers to "Buffer" objects. It automates locking during + all operations and generates "ID" hashes for the buffers upon creation. */ + +/* TODO: * create hashes during buffer creation + * throw and handle exceptions */ + +#include +#include + +#include + +#include "defines.h" +#include "Buffer.h" + +using namespace std; + +class Buffers : public sigc::trackable +{ + public: + Buffers(); + Buffers(int); + + ~Buffers(); + + void add(); + void remove(int); + + Buffer* get(int); + + private: + vector buffers; + int id; + Glib::Mutex mutex_; +}; +#endif diff --git a/frameserver/Server.cpp b/frameserver/Server.cpp new file mode 100644 index 0000000..c4f5633 --- /dev/null +++ b/frameserver/Server.cpp @@ -0,0 +1,153 @@ +#include "Server.h" + +Server::Server(int _numbufs, int _port ) +{ + for(int i = 0; i < _numbufs; i++) + buffers.push_back( new Buffer(i) ); + + port = _port; +} + +Server::~Server() +{ + Glib::Mutex::Lock lock(mutex_); + int size = buffers.size(); + for(int i = 0; i < size; i++) + delete buffers[i]; +} + +void Server::launch_threads() +{ + threads.push_back( Glib::Thread::create( sigc::mem_fun(this, &Server::listen), false ) ); + threads.push_back( Glib::Thread::create( sigc::mem_fun(this, &Server::mix), false ) ); +} + +void Server::listen() +{ + int packetcounter = 0; + try + { + boost::asio::io_service io_service; + + // next line is NOT thread-safe because we're accessing "port" without lock + udp::socket socket(io_service, udp::endpoint(udp::v4(), port)); + + cout << "listening" << endl; + for (;;) + { + frame_t frame; + boost::array recv_buf; + udp::endpoint remote_endpoint; + boost::system::error_code error; + + socket.receive_from(boost::asio::buffer(recv_buf), + remote_endpoint, 0, error); + + packetcounter++; + if( packetcounter % 1000 == 0 ) + cout << endl << packetcounter << endl; + + for(int i = 0; i < HEIGHT; i++) + { + for(int j = 0; j < WIDTH; j++) + { + frame.windows[i][j] = recv_buf[2+i*(WIDTH+1)+j]; + } + } + + for(int i = 0; i < SEGWIDTH; i++ ) + { + frame.segments[i].r = recv_buf[2+(WIDTH+1)*HEIGHT+i]; + frame.segments[i].g = recv_buf[2+(WIDTH+1)*HEIGHT+(SEGWIDTH+1)*1+i]; + frame.segments[i].b = recv_buf[2+(WIDTH+1)*HEIGHT+(SEGWIDTH+1)*2+i]; + } + + // this part needs to be made threadsafe because buffers will be accessed + // by the mixer and the udp listener + { + Glib::Mutex::Lock lock(mutex_); + // convert ascii to integer value + if( recv_buf[0]-49 < buffers.size() ) + { + buffers[ recv_buf[0]-49 ]->set(frame); + } + } + + + if (error && error != boost::asio::error::message_size) + throw boost::system::system_error(error); + + std::string message = "received"; + + boost::system::error_code ignored_error; + //socket.send_to(boost::asio::buffer(message), + // remote_endpoint, 0, ignored_error); + } + } + catch (std::exception& e) + { + std::cerr << e.what() << std::endl; + } +} + +void Server::mix() +{ + int size = 0; + int counter = 0; + + while(1) + { + counter++; + frame_t frame, temp_frame; + { + Glib::Mutex::Lock lock(mutex_); + size = buffers.size(); + } + + for(int x = 0; x < 6; x++) + { + { + Glib::Mutex::Lock lock(mutex_); + temp_frame = buffers[x]->get(); + } + + for(int i = 0; i < HEIGHT; i++) + { + for(int j = 0; j < WIDTH; j++) + { + frame.windows[i][j] = (frame.windows[i][j] + temp_frame.windows[i][j])/2; + } + } + + for(int i = 0; i < SEGWIDTH; i++) + { + frame.segments[i].r = (temp_frame.segments[i].r + frame.segments[i].r)/2; + frame.segments[i].g = (temp_frame.segments[i].g + frame.segments[i].g)/2; + frame.segments[i].b = (temp_frame.segments[i].b + frame.segments[i].b)/2; + } + } + + if( counter % 100 == 0 ) + { + cout << counter << endl; + for(int i = 0; i < HEIGHT; i++) + { + for(int j = 0; j < WIDTH; j++) + { + cout << frame.windows[i][j]; + } + cout << endl; + } + cout << endl; + + for(int i = 0; i < SEGWIDTH; i++) + { + cout << frame.segments[i].r; + } + cout << endl << endl; //*/ + } + + usleep( 25000 ); + } +} + diff --git a/frameserver/Server.h b/frameserver/Server.h new file mode 100644 index 0000000..186fe1c --- /dev/null +++ b/frameserver/Server.h @@ -0,0 +1,36 @@ +#ifndef __SERVER_H_ +#define __SERVER_H_ + +#include +#include +#include +#include +#include + +#include "Buffers.h" +#include "Buffer.h" +#include "defines.h" + + +using boost::asio::ip::udp; +using namespace std; + +class Server : public sigc::trackable +{ +public: + Server(int _numbufs, int _port); + ~Server(); + + void listen(); + void mix(); + void launch_threads(); + +private: + Glib::Mutex mutex_; + vector threads; + vector buffers; + + int port; +}; + +#endif diff --git a/frameserver/client.c b/frameserver/client.c new file mode 100644 index 0000000..d1fcc18 --- /dev/null +++ b/frameserver/client.c @@ -0,0 +1,53 @@ +#include +#include +#include +#include +#include + +void error(char *msg) +{ + perror(msg); + exit(0); +} + +int main(int argc, char *argv[]) +{ + int sockfd, portno, n; + struct sockaddr_in serv_addr; + struct hostent *server; + + char buffer[256]; + if (argc < 3) { + fprintf(stderr,"usage %s hostname port\n", argv[0]); + exit(0); + } + portno = atoi(argv[2]); + sockfd = socket(AF_INET, SOCK_STREAM, 0); + if (sockfd < 0) + error("ERROR opening socket"); + server = gethostbyname(argv[1]); + if (server == NULL) { + fprintf(stderr,"ERROR, no such host\n"); + exit(0); + } + bzero((char *) &serv_addr, sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + bcopy((char *)server->h_addr, + (char *)&serv_addr.sin_addr.s_addr, + server->h_length); + serv_addr.sin_port = htons(portno); + if (connect(sockfd,&serv_addr,sizeof(serv_addr)) < 0) + error("ERROR connecting"); + printf("Please enter the message: "); + bzero(buffer,256); + fgets(buffer,255,stdin); + n = write(sockfd,buffer,strlen(buffer)); + if (n < 0) + error("ERROR writing to socket"); + bzero(buffer,256); + n = read(sockfd,buffer,255); + if (n < 0) + error("ERROR reading from socket"); + printf("%s\n",buffer); + return 0; +} diff --git a/frameserver/defines.h b/frameserver/defines.h new file mode 100644 index 0000000..9a32e05 --- /dev/null +++ b/frameserver/defines.h @@ -0,0 +1,21 @@ +#ifndef __DEFINES_H_ +#define __DEFINES_H_ +#define BUFLEN 1024 +#define WIDTH 12 +#define HEIGHT 7 +#define SEGWIDTH 12 + +// not used for simplicity +//#define SEGHEIGHT 1 + +struct segment_t { + char r,g,b; +}; + +struct frame_t +{ + char windows[HEIGHT][WIDTH]; + segment_t segments[SEGWIDTH]; +}; + +#endif \ No newline at end of file diff --git a/frameserver/display b/frameserver/display new file mode 100644 index 0000000..a402e94 --- /dev/null +++ b/frameserver/display @@ -0,0 +1,10 @@ +123456789abc +123456789abc +123456789abc +123456789abc +123456789abc +123456789abc +123456789abc +123456789abc +123456789abc +123456789abc diff --git a/frameserver/maketest b/frameserver/maketest new file mode 100755 index 0000000..363baeb --- /dev/null +++ b/frameserver/maketest @@ -0,0 +1,2 @@ +#!/bin/bash +g++ -O2 -pipe -fomit-frame-pointer `pkg-config --libs --cflags glibmm-2.4` `pkg-config --libs --cflags gthread-2.0` -o test Buffer.cpp Buffers.cpp test.cpp \ No newline at end of file diff --git a/frameserver/maketest_server b/frameserver/maketest_server new file mode 100755 index 0000000..4bb8dac --- /dev/null +++ b/frameserver/maketest_server @@ -0,0 +1,2 @@ +#!/bin/bash +g++ -O0 -ggdb `pkg-config --libs --cflags glibmm-2.4` `pkg-config --libs --cflags gthread-2.0` -lboost_system -o test_server Buffer.cpp Buffers.cpp Server.cpp test_server.cpp \ No newline at end of file diff --git a/frameserver/server.c b/frameserver/server.c new file mode 100644 index 0000000..f7d4890 --- /dev/null +++ b/frameserver/server.c @@ -0,0 +1,49 @@ +/* A simple server in the internet domain using TCP + The port number is passed as an argument */ +#include +#include +#include +#include + +void error(char *msg) +{ + perror(msg); + exit(1); +} + +int main(int argc, char *argv[]) +{ + int sockfd, newsockfd, portno, clilen; + char buffer[256]; + struct sockaddr_in serv_addr, cli_addr; + int n; + if (argc < 2) { + fprintf(stderr,"ERROR, no port provided\n"); + exit(1); + } + sockfd = socket(AF_INET, SOCK_STREAM, 0); + if (sockfd < 0) + error("ERROR opening socket"); + bzero((char *) &serv_addr, sizeof(serv_addr)); + portno = atoi(argv[1]); + serv_addr.sin_family = AF_INET; + serv_addr.sin_addr.s_addr = INADDR_ANY; + serv_addr.sin_port = htons(portno); + if (bind(sockfd, (struct sockaddr *) &serv_addr, + sizeof(serv_addr)) < 0) + error("ERROR on binding"); + listen(sockfd,5); + clilen = sizeof(cli_addr); + newsockfd = accept(sockfd, + (struct sockaddr *) &cli_addr, + &clilen); + if (newsockfd < 0) + error("ERROR on accept"); + bzero(buffer,256); + n = read(newsockfd,buffer,255); + if (n < 0) error("ERROR reading from socket"); + printf("Here is the message: %s\n",buffer); + n = write(newsockfd,"I got your message",18); + if (n < 0) error("ERROR writing to socket"); + return 0; +} diff --git a/frameserver/servertest.sh b/frameserver/servertest.sh new file mode 100644 index 0000000..7994744 --- /dev/null +++ b/frameserver/servertest.sh @@ -0,0 +1,8 @@ +#!/bin/bash + +i="0" + +while [ 1 ] +do + echo "$1" | cat - display | netcat -q 0.1 -u 127.0.0.1 4321 +done \ No newline at end of file diff --git a/frameserver/test b/frameserver/test new file mode 100755 index 0000000..35086fa Binary files /dev/null and b/frameserver/test differ diff --git a/frameserver/test.cpp b/frameserver/test.cpp new file mode 100644 index 0000000..039190e --- /dev/null +++ b/frameserver/test.cpp @@ -0,0 +1,111 @@ +#include +#include +#include + +#include +#include + +#include "defines.h" +#include "Buffer.h" +#include "Buffers.h" + +#define NUMBUFS 100 +#define NUMTHREADS 20 +#define USECS 1000 + +using namespace std; + +// we use a pointer because we can only use objects from the threading library +// once the threading has been initialised, this is only done in main. +Buffers* buffers; + +void reader(void); +void writer(void); + +int main(void) +{ + srand( time(NULL) ); + Glib::thread_init(); + + // our main loop with support for signals and all that jazz + //Glib::RefPtr Main = Glib::MainLoop::create(); + + buffers = new Buffers(NUMBUFS); + + vector readers; + vector writers; + + for(int i = 0; i < NUMTHREADS; i++) + { + readers.push_back( Glib::Thread::create( sigc::ptr_fun( &reader), false ) ); + writers.push_back( Glib::Thread::create( sigc::ptr_fun( &writer), false ) ); + } + + //Main->run(); + + int count = 0; + while(1) { + count++; + if( count % 100 == 0 ) + { + frame_t frame = buffers->get(rand()%NUMBUFS)->get(); + for(int i = 0; i < HEIGHT; i++) + { + for(int j = 0; j < WIDTH; j++) + cout << ((frame.windows[i][j] >= 120) ? 'X' : '.'); + cout << endl; + } + cout << endl; + + for(int i = 0; i < SEGWIDTH; i++) + cout << frame.segments[i].r; + cout << endl << endl; + } + usleep( 10000 ); + }//*/ + + return 0; +} + +void reader(void) +{ + bool quit = false; + frame_t frame; + int bufnum = 0; + while( !quit ) + { + bufnum = rand()%NUMBUFS; + frame = buffers->get(bufnum)->get(); +// cout << "read " << bufnum << endl; + usleep( rand()%USECS ); + } +} + +void writer(void) +{ + frame_t frame; + bool quit = false; + int bufnum = 0; + while( !quit ) + { + bufnum = rand()%NUMBUFS; + for(int i = 0; i < 7; i++) + { + for(int j = 0; j < 12; j++) + { + frame.windows[i][j] = rand()%255; + } + } + + for(int i = 0; i < 12; i++) + { + frame.segments[i].r = 33+rand()%90; + frame.segments[i].g = 33+rand()%90; + frame.segments[i].b = 33+rand()%90; + } + + buffers->get(bufnum)->set( frame ); +// cout << "wrote " << bufnum << endl; + usleep( rand()%USECS ); + } +} diff --git a/frameserver/test_server b/frameserver/test_server new file mode 100755 index 0000000..b4d5db9 Binary files /dev/null and b/frameserver/test_server differ diff --git a/frameserver/test_server.cpp b/frameserver/test_server.cpp new file mode 100644 index 0000000..4914b3b --- /dev/null +++ b/frameserver/test_server.cpp @@ -0,0 +1,34 @@ +#include +#include +#include + +#include +#include + +#include "defines.h" + +#include "Server.h" + +#include "Buffer.h" +#include "Buffers.h" + +#define NUMBUFS 10 + +using namespace std; +Buffers* buffers; + +int main(void) +{ + srand( time(NULL) ); + Glib::thread_init(); + + // our main loop with support for signals and all that jazz + Glib::RefPtr Main = Glib::MainLoop::create(); + + Server server(10,4321); + server.launch_threads(); + + Main->run(); + + return 0; +}