From 0a172be26c925be44acde94c8b0beaf5c2d5f813 Mon Sep 17 00:00:00 2001 From: = Date: Mon, 11 Sep 2017 14:53:06 +0200 Subject: [PATCH] Support of unknown number of feeders + display log messages --- server.py | 102 ++++++++++++++++------------ static/js/index.js | 154 ++++++++++++++++++++++++++++++++++--------- templates/index.html | 25 +++++-- zmq_subscriber.py | 29 ++++++-- 4 files changed, 229 insertions(+), 81 deletions(-) diff --git a/server.py b/server.py index 7b31b90..c2f20ce 100755 --- a/server.py +++ b/server.py @@ -1,36 +1,52 @@ #!/usr/bin/env python3.5 from flask import Flask, render_template, Response import json +import redis +import configparser from time import gmtime as now from time import sleep, strftime +import os + +configfile = os.path.join(os.environ['VIRTUAL_ENV'], '../config.cfg') +cfg = configparser.ConfigParser() +cfg.read(configfile) app = Flask(__name__) -FIELDNAME = { - 'time': 'Time', - 'level': 'Level', - 'source': 'Source', - 'name': 'Name', - 'message': 'Message' - } +redis_server = redis.StrictRedis( + host=cfg.get('Redis', 'host'), + port=cfg.getint('Redis', 'port'), + db=cfg.getint('Redis', 'db')) -FIELDNAME_ORDER = [ 'time', 'level', 'source', 'name', 'message' ] +subscriber = redis_server.pubsub(ignore_subscribe_messages=True) +subscriber.psubscribe(cfg.get('Redis', 'channel')) +eventNumber = 0 -class LogRow(): - def __init__(self, feed='', time='', level='level', src='source', name='name', message='wonderful meesage'): +class LogItem(): + + FIELDNAME_ORDER = [ 'time', 'level', 'source', 'name', 'message' ] + + #def __init__(self, feed='', time='', level='level', src='source', name='name', message='wonderful meesage'): + def __init__(self, feed): # Parse feed message - # Assign potential supplied values - self.time = time if time != '' else strftime("%H:%M:%S", now()) - self.level = level - self.source = src - self.name = name - self.message = message + ## Assign potential supplied values + #self.time = time if time != '' else strftime("%H:%M:%S", now()) + #self.level = level + #self.source = src + #self.name = name + #self.message = message + + self.time = strftime("%H:%M:%S", now()) + self.level = 'level' + self.source = 'src' + self.name = 'name' + self.message = feed def get_head_row(self): to_ret = [] - for fn in FIELDNAME_ORDER: - to_ret.append(FIELDNAME[fn]) + for fn in LogItem.FIELDNAME_ORDER: + to_ret.append(fn[0].upper()+fn[1:]) return to_ret def get_row(self): @@ -45,26 +61,32 @@ class LogRow(): class EventMessage(): + # Suppose the event message is a json with the format {name: 'feedName', log:'logData'} def __init__(self, msg): - self.feed = None - self.isLog = EventMessage.is_log(msg) + msg = msg.decode('utf8') + try: + jsonMsg = json.loads(msg) + except json.JSONDecodeError: + jsonMsg = { 'name': "undefined" ,'log': msg } + + self.feedName = jsonMsg['name'] + self.feed = jsonMsg['log'] + self.feed = LogItem(jsonMsg['log']).get_row() #get type of message: log or feed, then create - if self.isLog: - self.feed = msg - #FIXME do parser - self.feed = LogRow(feed=msg).get_row() - else: - self.feed = {feed.name: feed.data} - - def is_log(msg): - return True + #if self.isLog: + # self.feed = msg + # #FIXME do parser + # self.feed = LogItem(feed=msg).get_row() + #else: + # #FIXME do parser + # temp = [] + # for feed in msg: + # temp.append(feed.name, feed.data) + # self.feed = temp def to_json(self): - if self.isLog: - to_ret = { 'log': self.feed, 'chart': "" } - else: - to_ret = { 'log': "", 'chart': self.feed } + to_ret = { 'log': self.feed, 'feedName': self.feedName } return 'data: {}\n\n'.format(json.dumps(to_ret)) @app.route("/") @@ -77,16 +99,12 @@ def logs(): @app.route("/_get_log_head") def getLogHead(): - return json.dumps(LogRow().get_head_row()) + return json.dumps(LogItem('').get_head_row()) def event_stream(): - #for msg in pubsub: - for i in range(3): - msg = now() - sleep(0.3) - print('sending', EventMessage(msg).to_json()) - yield EventMessage(msg).to_json() - + for msg in subscriber.listen(): + content = msg['data'] + yield EventMessage(content).to_json() if __name__ == '__main__': - app.run(host='localhost', port=8000, debug=True) + app.run(host='localhost', port=8000) diff --git a/static/js/index.js b/static/js/index.js index 1bd35b1..27321eb 100644 --- a/static/js/index.js +++ b/static/js/index.js @@ -1,19 +1,104 @@ +//color: #5f6062 +var updateInterval = 1000; // 1s +var numPoint = 60; -var updateInterval = 30; // 30ms -var numPoint = 10; var emptyArray = []; +var dataNumLog = []; for(i=0; i res[0] ? globMax : res[0]; + // data + this._sourcesArray[src] = res[1]; + } + this._globalMax = globMax; + } + + toArray() { + var to_return = []; + for (var src of this._sourceNames) { + to_return.push({ + label: src, + data: this._sourcesArray[src] + }); + } + return to_return; + } + + getGlobalMax() { + return this._globalMax; + } + + getSingleSource(sourceName) { + return this._sourcesArray[sourceName]; + } + + getEmptyData() { + return [{label: 'no data', data: emptyArray}]; + } +} + +var sources = new Sources(); +sources.addSource('global'); + +var curNumLog = 0; +var curMaxDataNumLog = 0; + var optionsGraph = { series: { shadowSize: 0 , - lines: { fill: true, fillColor: { colors: [ { opacity: 1 }, { opacity: 0.1 } ] }} + lines: { + fill: true, + fillColor: { + colors: [ { opacity: 1 }, { opacity: 0.1 } ] + } + } }, + //colors: ["#2fa1db"], yaxis: { min: 0, max: 20 }, - xaxis: { ticks: [[0, 0], [1, 1], [2, 2], [3, 3], [4, 4], [5, 5], [6, 6], [7, 7], [8, 8], [9, 9], [10, 10]] }, + xaxis: { min: 0, max: numPoint-1 }, + ticks: numPoint, grid: { tickColor: "#dddddd", borderWidth: 0 @@ -39,8 +124,7 @@ $(document).ready(function () { source.onmessage = function(event) { var json = jQuery.parseJSON( event.data ); - updateLogTable(json.log); - //updateChartData(json.chart); + updateLogTable(json.feedName, json.log); }; } else { @@ -50,37 +134,39 @@ $(document).ready(function () { }); -//var plot = $.plot("#placeholder", [ [] ], optionsGraph); -//updateChart() +//var plot1 = $.plot("#feedDiv1", [ { label: "Number of log messages", data: dataNumLog } ], optionsGraph); +var plot1 = $.plot("#feedDiv1", sources.getEmptyData(), optionsGraph); +updateChart() function updateChart() { - plot.setData(data); - plot.getOptions().yaxes[0].max = curMax[dataset]; - plot.setupGrid(); - plot.draw(); - setTimeout(update, updateInterval); + updateChart1(); + updateChart2(); + setTimeout(updateChart, updateInterval); } -function updateChartData(feed) { - if (feed.length == 0) - return; +function updateChart1() { + sources.slideSource(); + sources.resetCountOnSource(); + plot1.setData(sources.toArray()); + plot1.getOptions().yaxes[0].max = sources.getGlobalMax(); + plot1.setupGrid(); + plot1.draw(); +} - for (feedName in feed) { - console.log(feedName.name); - if (data[feedName.name] === undefined) { - data[feedName.name] = {}; - } - data[feedName.name].data = slide(data[feedName.name].data, feedName.data) - } +function updateChart2() { } -function updateLogTable(log) { +function updateLogTable(feedName, log) { if (log.length == 0) return; // Create new row tableBody = document.getElementById('table_log_body'); + //curNumLog++; + sources.addIfNotPresent(feedName); + sources.incCountOnSource(feedName); + sources.incCountOnSource('global'); createRow(tableBody, log); // Remove old row @@ -94,10 +180,16 @@ function updateLogTable(log) { } function slide(orig, newData) { - var slided = orig; - slided.slice(newData.length); - slided.concat(newData); - return slided + var slided = []; + var max = newData; + for (i=1; i max ? y : max; + } + slided.push([orig.length-1, newData]); + curMaxDataNumLog = max; + return [curMaxDataNumLog, slided]; } function createRow(tableBody, log) { diff --git a/templates/index.html b/templates/index.html index 19bf58e..fc487c3 100644 --- a/templates/index.html +++ b/templates/index.html @@ -36,14 +36,14 @@
-
+
- Feed + Log feed
-
+
@@ -51,7 +51,24 @@
- + + +
+ +
+
+ Feed +
+
+
+
+ + +
+ + +
+
diff --git a/zmq_subscriber.py b/zmq_subscriber.py index cf80432..aeac103 100755 --- a/zmq_subscriber.py +++ b/zmq_subscriber.py @@ -4,25 +4,46 @@ import time import zmq import redis import random +import configparser +import os +import sys +import json +configfile = os.path.join(os.environ['VIRTUAL_ENV'], '../config.cfg') +cfg = configparser.ConfigParser() +cfg.read(configfile) + +zmq_url = cfg.get('Redis', 'zmq_url') zmq_url = "tcp://crf.circl.lu:5556" -channel = "102" +channel = cfg.get('Redis', 'channel') context = zmq.Context() socket = context.socket(zmq.SUB) socket.connect(zmq_url) socket.setsockopt_string(zmq.SUBSCRIBE, channel) redis_server = redis.StrictRedis( - host='localhost', - port=6250, - db=0) + host=cfg.get('Redis', 'host'), + port=cfg.getint('Redis', 'port'), + db=cfg.getint('Redis', 'db')) # server side pubsub = redis_server.pubsub(ignore_subscribe_messages=True) while True: + rdm = random.randint(1,3) + time.sleep(float(rdm / 10)) + content = "rdm "+str(rdm) + to_send = { 'name': 'feeder'+str(rdm), 'log': content } + redis_server.publish(channel, json.dumps(to_send)) + +sys.exit(1) + + +while True: + #FIXME check if sock.recv is blocking time.sleep(0.1) content = socket.recv() + console.log('sending') print(content) redis_server.publish(channel, content)