Support of unknown number of feeders + display log messages

pull/18/head
= 2017-09-11 14:53:06 +02:00
parent 3d30846662
commit 0a172be26c
4 changed files with 229 additions and 81 deletions

102
server.py
View File

@ -1,36 +1,52 @@
#!/usr/bin/env python3.5
from flask import Flask, render_template, Response
import json
import redis
import configparser
from time import gmtime as now
from time import sleep, strftime
import os
configfile = os.path.join(os.environ['VIRTUAL_ENV'], '../config.cfg')
cfg = configparser.ConfigParser()
cfg.read(configfile)
app = Flask(__name__)
FIELDNAME = {
'time': 'Time',
'level': 'Level',
'source': 'Source',
'name': 'Name',
'message': 'Message'
}
redis_server = redis.StrictRedis(
host=cfg.get('Redis', 'host'),
port=cfg.getint('Redis', 'port'),
db=cfg.getint('Redis', 'db'))
FIELDNAME_ORDER = [ 'time', 'level', 'source', 'name', 'message' ]
subscriber = redis_server.pubsub(ignore_subscribe_messages=True)
subscriber.psubscribe(cfg.get('Redis', 'channel'))
eventNumber = 0
class LogRow():
def __init__(self, feed='', time='', level='level', src='source', name='name', message='wonderful meesage'):
class LogItem():
FIELDNAME_ORDER = [ 'time', 'level', 'source', 'name', 'message' ]
#def __init__(self, feed='', time='', level='level', src='source', name='name', message='wonderful meesage'):
def __init__(self, feed):
# Parse feed message
# Assign potential supplied values
self.time = time if time != '' else strftime("%H:%M:%S", now())
self.level = level
self.source = src
self.name = name
self.message = message
## Assign potential supplied values
#self.time = time if time != '' else strftime("%H:%M:%S", now())
#self.level = level
#self.source = src
#self.name = name
#self.message = message
self.time = strftime("%H:%M:%S", now())
self.level = 'level'
self.source = 'src'
self.name = 'name'
self.message = feed
def get_head_row(self):
to_ret = []
for fn in FIELDNAME_ORDER:
to_ret.append(FIELDNAME[fn])
for fn in LogItem.FIELDNAME_ORDER:
to_ret.append(fn[0].upper()+fn[1:])
return to_ret
def get_row(self):
@ -45,26 +61,32 @@ class LogRow():
class EventMessage():
# Suppose the event message is a json with the format {name: 'feedName', log:'logData'}
def __init__(self, msg):
self.feed = None
self.isLog = EventMessage.is_log(msg)
msg = msg.decode('utf8')
try:
jsonMsg = json.loads(msg)
except json.JSONDecodeError:
jsonMsg = { 'name': "undefined" ,'log': msg }
self.feedName = jsonMsg['name']
self.feed = jsonMsg['log']
self.feed = LogItem(jsonMsg['log']).get_row()
#get type of message: log or feed, then create
if self.isLog:
self.feed = msg
#FIXME do parser
self.feed = LogRow(feed=msg).get_row()
else:
self.feed = {feed.name: feed.data}
def is_log(msg):
return True
#if self.isLog:
# self.feed = msg
# #FIXME do parser
# self.feed = LogItem(feed=msg).get_row()
#else:
# #FIXME do parser
# temp = []
# for feed in msg:
# temp.append(feed.name, feed.data)
# self.feed = temp
def to_json(self):
if self.isLog:
to_ret = { 'log': self.feed, 'chart': "" }
else:
to_ret = { 'log': "", 'chart': self.feed }
to_ret = { 'log': self.feed, 'feedName': self.feedName }
return 'data: {}\n\n'.format(json.dumps(to_ret))
@app.route("/")
@ -77,16 +99,12 @@ def logs():
@app.route("/_get_log_head")
def getLogHead():
return json.dumps(LogRow().get_head_row())
return json.dumps(LogItem('').get_head_row())
def event_stream():
#for msg in pubsub:
for i in range(3):
msg = now()
sleep(0.3)
print('sending', EventMessage(msg).to_json())
yield EventMessage(msg).to_json()
for msg in subscriber.listen():
content = msg['data']
yield EventMessage(content).to_json()
if __name__ == '__main__':
app.run(host='localhost', port=8000, debug=True)
app.run(host='localhost', port=8000)

View File

@ -1,19 +1,104 @@
//color: #5f6062
var updateInterval = 1000; // 1s
var numPoint = 60;
var updateInterval = 30; // 30ms
var numPoint = 10;
var emptyArray = [];
var dataNumLog = [];
for(i=0; i<numPoint; i++) {
emptyArray.push(0);
dataNumLog.push([i, 0]);
emptyArray.push([i, 0]);
}
var data = { 'undefined': { label: 'undefined', data: [] } };
var curMax = { 'undefined': [] };
class Sources {
constructor() {
this._sourcesArray = {};
this._sourcesCount = {};
this._sourcesCountMax = {};
this._globalMax = 0;
this._sourceNames = [];
}
addSource(sourceName) {
this._sourcesArray[sourceName] = emptyArray;
this._sourcesCount[sourceName] = 0;
this._sourcesCountMax[sourceName] = 0;
this._sourceNames.push(sourceName);
}
addIfNotPresent(sourceName) {
if (this._sourceNames.indexOf(sourceName) == -1) {
this.addSource(sourceName);
}
}
incCountOnSource(sourceName) {
this._sourcesCount[sourceName] += 1;
}
resetCountOnSource() {
for (var src of this._sourceNames) {
this._sourcesCount[src] = 0;
}
}
slideSource() {
var globMax = 0;
for (var src of this._sourceNames) {
// res[0] = max, res[1] = slidedArray
var res = slide(this._sourcesArray[src], this._sourcesCount[src]);
// max
this._sourcesCountMax[src] = res[0];
globMax = globMax > res[0] ? globMax : res[0];
// data
this._sourcesArray[src] = res[1];
}
this._globalMax = globMax;
}
toArray() {
var to_return = [];
for (var src of this._sourceNames) {
to_return.push({
label: src,
data: this._sourcesArray[src]
});
}
return to_return;
}
getGlobalMax() {
return this._globalMax;
}
getSingleSource(sourceName) {
return this._sourcesArray[sourceName];
}
getEmptyData() {
return [{label: 'no data', data: emptyArray}];
}
}
var sources = new Sources();
sources.addSource('global');
var curNumLog = 0;
var curMaxDataNumLog = 0;
var optionsGraph = {
series: {
shadowSize: 0 ,
lines: { fill: true, fillColor: { colors: [ { opacity: 1 }, { opacity: 0.1 } ] }}
lines: {
fill: true,
fillColor: {
colors: [ { opacity: 1 }, { opacity: 0.1 } ]
}
}
},
//colors: ["#2fa1db"],
yaxis: { min: 0, max: 20 },
xaxis: { ticks: [[0, 0], [1, 1], [2, 2], [3, 3], [4, 4], [5, 5], [6, 6], [7, 7], [8, 8], [9, 9], [10, 10]] },
xaxis: { min: 0, max: numPoint-1 },
ticks: numPoint,
grid: {
tickColor: "#dddddd",
borderWidth: 0
@ -39,8 +124,7 @@ $(document).ready(function () {
source.onmessage = function(event) {
var json = jQuery.parseJSON( event.data );
updateLogTable(json.log);
//updateChartData(json.chart);
updateLogTable(json.feedName, json.log);
};
} else {
@ -50,37 +134,39 @@ $(document).ready(function () {
});
//var plot = $.plot("#placeholder", [ [] ], optionsGraph);
//updateChart()
//var plot1 = $.plot("#feedDiv1", [ { label: "Number of log messages", data: dataNumLog } ], optionsGraph);
var plot1 = $.plot("#feedDiv1", sources.getEmptyData(), optionsGraph);
updateChart()
function updateChart() {
plot.setData(data);
plot.getOptions().yaxes[0].max = curMax[dataset];
plot.setupGrid();
plot.draw();
setTimeout(update, updateInterval);
updateChart1();
updateChart2();
setTimeout(updateChart, updateInterval);
}
function updateChartData(feed) {
if (feed.length == 0)
return;
function updateChart1() {
sources.slideSource();
sources.resetCountOnSource();
plot1.setData(sources.toArray());
plot1.getOptions().yaxes[0].max = sources.getGlobalMax();
plot1.setupGrid();
plot1.draw();
}
for (feedName in feed) {
console.log(feedName.name);
if (data[feedName.name] === undefined) {
data[feedName.name] = {};
}
data[feedName.name].data = slide(data[feedName.name].data, feedName.data)
}
function updateChart2() {
}
function updateLogTable(log) {
function updateLogTable(feedName, log) {
if (log.length == 0)
return;
// Create new row
tableBody = document.getElementById('table_log_body');
//curNumLog++;
sources.addIfNotPresent(feedName);
sources.incCountOnSource(feedName);
sources.incCountOnSource('global');
createRow(tableBody, log);
// Remove old row
@ -94,10 +180,16 @@ function updateLogTable(log) {
}
function slide(orig, newData) {
var slided = orig;
slided.slice(newData.length);
slided.concat(newData);
return slided
var slided = [];
var max = newData;
for (i=1; i<orig.length; i++) {
y = orig[i][1];
slided.push([i-1, y]);
max = y > max ? y : max;
}
slided.push([orig.length-1, newData]);
curMaxDataNumLog = max;
return [curMaxDataNumLog, slided];
}
function createRow(tableBody, log) {

View File

@ -36,14 +36,14 @@
<div id="page-wrapper">
<div class="container-fluid">
<div class="row">
<div class="col-lg-12">
<div class="col-lg-6">
<div class="panel panel-default" style="margin-top: 15px;">
<div class="panel-heading">
<i class="fa fa-bar-chart-o fa-fw"></i> Feed
<i class="fa fa-bar-chart-o fa-fw"></i> Log feed
</div>
<div id="panelbody" class="panel-body">
<div id="feedDiv" style="height: 100%; position: relative;"></div>
<div id="feedDiv1" style="width:100%; height:300px; position: relative;"></div>
</div>
<!-- /.panel-body -->
@ -51,7 +51,24 @@
<!-- /.panel -->
</div>
<!-- /.col-lg-12 -->
<!-- /.col-lg-6 -->
<!-- /.col-lg-6 -->
<div class="col-lg-6">
<div class="panel panel-default" style="margin-top: 15px;">
<div class="panel-heading">
<i class="fa fa-bar-chart-o fa-fw"></i> Feed
</div>
<div id="panelbody" class="panel-body">
<div id="feedDiv2" style="width:100%; height:300px; position: relative;"></div>
</div>
<!-- /.panel-body -->
</div>
<!-- /.panel -->
</div>
<!-- /.col-lg-6 -->
</div>
<!-- /.row CHART -->

View File

@ -4,25 +4,46 @@ import time
import zmq
import redis
import random
import configparser
import os
import sys
import json
configfile = os.path.join(os.environ['VIRTUAL_ENV'], '../config.cfg')
cfg = configparser.ConfigParser()
cfg.read(configfile)
zmq_url = cfg.get('Redis', 'zmq_url')
zmq_url = "tcp://crf.circl.lu:5556"
channel = "102"
channel = cfg.get('Redis', 'channel')
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect(zmq_url)
socket.setsockopt_string(zmq.SUBSCRIBE, channel)
redis_server = redis.StrictRedis(
host='localhost',
port=6250,
db=0)
host=cfg.get('Redis', 'host'),
port=cfg.getint('Redis', 'port'),
db=cfg.getint('Redis', 'db'))
# server side
pubsub = redis_server.pubsub(ignore_subscribe_messages=True)
while True:
rdm = random.randint(1,3)
time.sleep(float(rdm / 10))
content = "rdm "+str(rdm)
to_send = { 'name': 'feeder'+str(rdm), 'log': content }
redis_server.publish(channel, json.dumps(to_send))
sys.exit(1)
while True:
#FIXME check if sock.recv is blocking
time.sleep(0.1)
content = socket.recv()
console.log('sending')
print(content)
redis_server.publish(channel, content)