renamed folder and added prototype implementation of a UDP server/mixer

with two threads, it works!
master
Bartosz Kostrzewa 2010-11-12 16:16:22 +01:00
parent 58fc1cb2a4
commit 3a95944858
17 changed files with 619 additions and 0 deletions

33
frameserver/Buffer.cpp Normal file
View File

@ -0,0 +1,33 @@
#include "Buffer.h"
Buffer::Buffer( int _id )
{
id = _id;
// allocate memory
}
Buffer::~Buffer()
{
// deallocate memory
}
void Buffer::set(frame_t data)
{
{
Glib::Mutex::Lock lock(mutex_);
frame = data;
}
}
frame_t Buffer::get()
{
Glib::Mutex::Lock lock(mutex_);
return frame;
}
int Buffer::get_id()
{
Glib::Mutex::Lock lock(mutex_);
return id;
}

25
frameserver/Buffer.h Normal file
View File

@ -0,0 +1,25 @@
#ifndef __BUFFER_H_
#define __BUFFER_H_
#include <glibmm.h>
#include "defines.h"
/* This is a threadsafe wrapper around the "frame_t" struct
which automates locking during get and set. */
class Buffer : public sigc::trackable
{
public:
Buffer(int _id);
~Buffer();
void set(frame_t);
frame_t get();
int get_id();
private:
int id;
frame_t frame;
Glib::Mutex mutex_;
};
#endif

43
frameserver/Buffers.cpp Normal file
View File

@ -0,0 +1,43 @@
#include "Buffers.h"
Buffers::Buffers()
{
id = 0;
}
Buffers::Buffers(int _bufnum)
{
id = 0;
for( int i = 0; i < _bufnum; i++)
add();
}
Buffers::~Buffers()
{
Glib::Mutex::Lock lock(mutex_);
buffers.clear();
}
Buffer* Buffers::get(int index)
{
Glib::Mutex::Lock lock(mutex_);
return buffers[index];
}
void Buffers::add()
{
Glib::Mutex::Lock lock(mutex_);
id += 1;
buffers.push_back( new Buffer(id) );
}
void Buffers::remove(int _id)
{
Glib::Mutex::Lock lock(mutex_);
int size = buffers.size();
for( int i = 0; i < size; i++ )
{
if( buffers[i]->get_id() == _id )
buffers.erase( buffers.begin()+i );
}
}

39
frameserver/Buffers.h Normal file
View File

@ -0,0 +1,39 @@
#ifndef __BUFFERS_H_
#define __BUFFERS_H_
/* This is a thread-safe wrapper around a standard library
vector of pointers to "Buffer" objects. It automates locking during
all operations and generates "ID" hashes for the buffers upon creation. */
/* TODO: * create hashes during buffer creation
* throw and handle exceptions */
#include <glibmm.h>
#include <vector>
#include <time.h>
#include "defines.h"
#include "Buffer.h"
using namespace std;
class Buffers : public sigc::trackable
{
public:
Buffers();
Buffers(int);
~Buffers();
void add();
void remove(int);
Buffer* get(int);
private:
vector<Buffer*> buffers;
int id;
Glib::Mutex mutex_;
};
#endif

153
frameserver/Server.cpp Normal file
View File

@ -0,0 +1,153 @@
#include "Server.h"
Server::Server(int _numbufs, int _port )
{
for(int i = 0; i < _numbufs; i++)
buffers.push_back( new Buffer(i) );
port = _port;
}
Server::~Server()
{
Glib::Mutex::Lock lock(mutex_);
int size = buffers.size();
for(int i = 0; i < size; i++)
delete buffers[i];
}
void Server::launch_threads()
{
threads.push_back( Glib::Thread::create( sigc::mem_fun(this, &Server::listen), false ) );
threads.push_back( Glib::Thread::create( sigc::mem_fun(this, &Server::mix), false ) );
}
void Server::listen()
{
int packetcounter = 0;
try
{
boost::asio::io_service io_service;
// next line is NOT thread-safe because we're accessing "port" without lock
udp::socket socket(io_service, udp::endpoint(udp::v4(), port));
cout << "listening" << endl;
for (;;)
{
frame_t frame;
boost::array<char, 200> recv_buf;
udp::endpoint remote_endpoint;
boost::system::error_code error;
socket.receive_from(boost::asio::buffer(recv_buf),
remote_endpoint, 0, error);
packetcounter++;
if( packetcounter % 1000 == 0 )
cout << endl << packetcounter << endl;
for(int i = 0; i < HEIGHT; i++)
{
for(int j = 0; j < WIDTH; j++)
{
frame.windows[i][j] = recv_buf[2+i*(WIDTH+1)+j];
}
}
for(int i = 0; i < SEGWIDTH; i++ )
{
frame.segments[i].r = recv_buf[2+(WIDTH+1)*HEIGHT+i];
frame.segments[i].g = recv_buf[2+(WIDTH+1)*HEIGHT+(SEGWIDTH+1)*1+i];
frame.segments[i].b = recv_buf[2+(WIDTH+1)*HEIGHT+(SEGWIDTH+1)*2+i];
}
// this part needs to be made threadsafe because buffers will be accessed
// by the mixer and the udp listener
{
Glib::Mutex::Lock lock(mutex_);
// convert ascii to integer value
if( recv_buf[0]-49 < buffers.size() )
{
buffers[ recv_buf[0]-49 ]->set(frame);
}
}
if (error && error != boost::asio::error::message_size)
throw boost::system::system_error(error);
std::string message = "received";
boost::system::error_code ignored_error;
//socket.send_to(boost::asio::buffer(message),
// remote_endpoint, 0, ignored_error);
}
}
catch (std::exception& e)
{
std::cerr << e.what() << std::endl;
}
}
void Server::mix()
{
int size = 0;
int counter = 0;
while(1)
{
counter++;
frame_t frame, temp_frame;
{
Glib::Mutex::Lock lock(mutex_);
size = buffers.size();
}
for(int x = 0; x < 6; x++)
{
{
Glib::Mutex::Lock lock(mutex_);
temp_frame = buffers[x]->get();
}
for(int i = 0; i < HEIGHT; i++)
{
for(int j = 0; j < WIDTH; j++)
{
frame.windows[i][j] = (frame.windows[i][j] + temp_frame.windows[i][j])/2;
}
}
for(int i = 0; i < SEGWIDTH; i++)
{
frame.segments[i].r = (temp_frame.segments[i].r + frame.segments[i].r)/2;
frame.segments[i].g = (temp_frame.segments[i].g + frame.segments[i].g)/2;
frame.segments[i].b = (temp_frame.segments[i].b + frame.segments[i].b)/2;
}
}
if( counter % 100 == 0 )
{
cout << counter << endl;
for(int i = 0; i < HEIGHT; i++)
{
for(int j = 0; j < WIDTH; j++)
{
cout << frame.windows[i][j];
}
cout << endl;
}
cout << endl;
for(int i = 0; i < SEGWIDTH; i++)
{
cout << frame.segments[i].r;
}
cout << endl << endl; //*/
}
usleep( 25000 );
}
}

36
frameserver/Server.h Normal file
View File

@ -0,0 +1,36 @@
#ifndef __SERVER_H_
#define __SERVER_H_
#include <iostream>
#include <string>
#include <vector>
#include <boost/array.hpp>
#include <boost/asio.hpp>
#include "Buffers.h"
#include "Buffer.h"
#include "defines.h"
using boost::asio::ip::udp;
using namespace std;
class Server : public sigc::trackable
{
public:
Server(int _numbufs, int _port);
~Server();
void listen();
void mix();
void launch_threads();
private:
Glib::Mutex mutex_;
vector<Glib::Thread*> threads;
vector<Buffer*> buffers;
int port;
};
#endif

53
frameserver/client.c Normal file
View File

@ -0,0 +1,53 @@
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
void error(char *msg)
{
perror(msg);
exit(0);
}
int main(int argc, char *argv[])
{
int sockfd, portno, n;
struct sockaddr_in serv_addr;
struct hostent *server;
char buffer[256];
if (argc < 3) {
fprintf(stderr,"usage %s hostname port\n", argv[0]);
exit(0);
}
portno = atoi(argv[2]);
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0)
error("ERROR opening socket");
server = gethostbyname(argv[1]);
if (server == NULL) {
fprintf(stderr,"ERROR, no such host\n");
exit(0);
}
bzero((char *) &serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
bcopy((char *)server->h_addr,
(char *)&serv_addr.sin_addr.s_addr,
server->h_length);
serv_addr.sin_port = htons(portno);
if (connect(sockfd,&serv_addr,sizeof(serv_addr)) < 0)
error("ERROR connecting");
printf("Please enter the message: ");
bzero(buffer,256);
fgets(buffer,255,stdin);
n = write(sockfd,buffer,strlen(buffer));
if (n < 0)
error("ERROR writing to socket");
bzero(buffer,256);
n = read(sockfd,buffer,255);
if (n < 0)
error("ERROR reading from socket");
printf("%s\n",buffer);
return 0;
}

21
frameserver/defines.h Normal file
View File

@ -0,0 +1,21 @@
#ifndef __DEFINES_H_
#define __DEFINES_H_
#define BUFLEN 1024
#define WIDTH 12
#define HEIGHT 7
#define SEGWIDTH 12
// not used for simplicity
//#define SEGHEIGHT 1
struct segment_t {
char r,g,b;
};
struct frame_t
{
char windows[HEIGHT][WIDTH];
segment_t segments[SEGWIDTH];
};
#endif

10
frameserver/display Normal file
View File

@ -0,0 +1,10 @@
123456789abc
123456789abc
123456789abc
123456789abc
123456789abc
123456789abc
123456789abc
123456789abc
123456789abc
123456789abc

2
frameserver/maketest Executable file
View File

@ -0,0 +1,2 @@
#!/bin/bash
g++ -O2 -pipe -fomit-frame-pointer `pkg-config --libs --cflags glibmm-2.4` `pkg-config --libs --cflags gthread-2.0` -o test Buffer.cpp Buffers.cpp test.cpp

2
frameserver/maketest_server Executable file
View File

@ -0,0 +1,2 @@
#!/bin/bash
g++ -O0 -ggdb `pkg-config --libs --cflags glibmm-2.4` `pkg-config --libs --cflags gthread-2.0` -lboost_system -o test_server Buffer.cpp Buffers.cpp Server.cpp test_server.cpp

49
frameserver/server.c Normal file
View File

@ -0,0 +1,49 @@
/* A simple server in the internet domain using TCP
The port number is passed as an argument */
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
void error(char *msg)
{
perror(msg);
exit(1);
}
int main(int argc, char *argv[])
{
int sockfd, newsockfd, portno, clilen;
char buffer[256];
struct sockaddr_in serv_addr, cli_addr;
int n;
if (argc < 2) {
fprintf(stderr,"ERROR, no port provided\n");
exit(1);
}
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0)
error("ERROR opening socket");
bzero((char *) &serv_addr, sizeof(serv_addr));
portno = atoi(argv[1]);
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = INADDR_ANY;
serv_addr.sin_port = htons(portno);
if (bind(sockfd, (struct sockaddr *) &serv_addr,
sizeof(serv_addr)) < 0)
error("ERROR on binding");
listen(sockfd,5);
clilen = sizeof(cli_addr);
newsockfd = accept(sockfd,
(struct sockaddr *) &cli_addr,
&clilen);
if (newsockfd < 0)
error("ERROR on accept");
bzero(buffer,256);
n = read(newsockfd,buffer,255);
if (n < 0) error("ERROR reading from socket");
printf("Here is the message: %s\n",buffer);
n = write(newsockfd,"I got your message",18);
if (n < 0) error("ERROR writing to socket");
return 0;
}

View File

@ -0,0 +1,8 @@
#!/bin/bash
i="0"
while [ 1 ]
do
echo "$1" | cat - display | netcat -q 0.1 -u 127.0.0.1 4321
done

BIN
frameserver/test Executable file

Binary file not shown.

111
frameserver/test.cpp Normal file
View File

@ -0,0 +1,111 @@
#include <glibmm.h>
#include <vector>
#include <iostream>
#include <time.h>
#include <stdlib.h>
#include "defines.h"
#include "Buffer.h"
#include "Buffers.h"
#define NUMBUFS 100
#define NUMTHREADS 20
#define USECS 1000
using namespace std;
// we use a pointer because we can only use objects from the threading library
// once the threading has been initialised, this is only done in main.
Buffers* buffers;
void reader(void);
void writer(void);
int main(void)
{
srand( time(NULL) );
Glib::thread_init();
// our main loop with support for signals and all that jazz
//Glib::RefPtr<Glib::MainLoop> Main = Glib::MainLoop::create();
buffers = new Buffers(NUMBUFS);
vector<Glib::Thread*> readers;
vector<Glib::Thread*> writers;
for(int i = 0; i < NUMTHREADS; i++)
{
readers.push_back( Glib::Thread::create( sigc::ptr_fun( &reader), false ) );
writers.push_back( Glib::Thread::create( sigc::ptr_fun( &writer), false ) );
}
//Main->run();
int count = 0;
while(1) {
count++;
if( count % 100 == 0 )
{
frame_t frame = buffers->get(rand()%NUMBUFS)->get();
for(int i = 0; i < HEIGHT; i++)
{
for(int j = 0; j < WIDTH; j++)
cout << ((frame.windows[i][j] >= 120) ? 'X' : '.');
cout << endl;
}
cout << endl;
for(int i = 0; i < SEGWIDTH; i++)
cout << frame.segments[i].r;
cout << endl << endl;
}
usleep( 10000 );
}//*/
return 0;
}
void reader(void)
{
bool quit = false;
frame_t frame;
int bufnum = 0;
while( !quit )
{
bufnum = rand()%NUMBUFS;
frame = buffers->get(bufnum)->get();
// cout << "read " << bufnum << endl;
usleep( rand()%USECS );
}
}
void writer(void)
{
frame_t frame;
bool quit = false;
int bufnum = 0;
while( !quit )
{
bufnum = rand()%NUMBUFS;
for(int i = 0; i < 7; i++)
{
for(int j = 0; j < 12; j++)
{
frame.windows[i][j] = rand()%255;
}
}
for(int i = 0; i < 12; i++)
{
frame.segments[i].r = 33+rand()%90;
frame.segments[i].g = 33+rand()%90;
frame.segments[i].b = 33+rand()%90;
}
buffers->get(bufnum)->set( frame );
// cout << "wrote " << bufnum << endl;
usleep( rand()%USECS );
}
}

BIN
frameserver/test_server Executable file

Binary file not shown.

View File

@ -0,0 +1,34 @@
#include <glibmm.h>
#include <vector>
#include <iostream>
#include <time.h>
#include <stdlib.h>
#include "defines.h"
#include "Server.h"
#include "Buffer.h"
#include "Buffers.h"
#define NUMBUFS 10
using namespace std;
Buffers* buffers;
int main(void)
{
srand( time(NULL) );
Glib::thread_init();
// our main loop with support for signals and all that jazz
Glib::RefPtr<Glib::MainLoop> Main = Glib::MainLoop::create();
Server server(10,4321);
server.launch_threads();
Main->run();
return 0;
}