From bb2ea839f34eb03fc59625f7041473e801cebcff Mon Sep 17 00:00:00 2001 From: Sami Mokaddem Date: Thu, 27 Sep 2018 09:38:39 +0200 Subject: [PATCH] new: [live] support of few historical data to prefill the live dashboard + some bug fixes --- helpers/geo_helper.py | 10 +++++++-- server.py | 42 +++++++++++++++++++++++++----------- static/js/index/index.js | 15 ++++++++----- static/js/index/index_map.js | 24 +++++++++++++++++++-- zmq_dispatcher.py | 14 ++++++------ 5 files changed, 75 insertions(+), 30 deletions(-) diff --git a/helpers/geo_helper.py b/helpers/geo_helper.py index aa125ee..d45e6f3 100644 --- a/helpers/geo_helper.py +++ b/helpers/geo_helper.py @@ -12,6 +12,7 @@ import phonenumbers, pycountry from phonenumbers import geocoder import util +from helpers import live_helper class InvalidCoordinate(Exception): pass @@ -24,6 +25,7 @@ class Geo_helper: host=cfg.get('RedisGlobal', 'host'), port=cfg.getint('RedisGlobal', 'port'), db=cfg.getint('RedisMap', 'db')) + self.live_helper = live_helper.Live_helper(serv_redis_db, cfg) #logger logDir = cfg.get('Log', 'directory') @@ -118,7 +120,9 @@ class Geo_helper: "cityName": rep['full_rep'].city.name, "regionCode": rep['full_rep'].country.iso_code, } - self.serv_coord.publish(self.CHANNELDISP, json.dumps(to_send)) + j_to_send = json.dumps(to_send) + self.serv_coord.publish(self.CHANNELDISP, j_to_send) + self.live_helper.add_to_stream_log_cache('Map', j_to_send) self.logger.info('Published: {}'.format(json.dumps(to_send))) except ValueError: self.logger.warning("can't resolve ip") @@ -163,7 +167,9 @@ class Geo_helper: "cityName": "", "regionCode": country_code, } - self.serv_coord.publish(self.CHANNELDISP, json.dumps(to_send)) + j_to_send = json.dumps(to_send) + self.serv_coord.publish(self.CHANNELDISP, j_to_send) + self.live_helper.add_to_stream_log_cache('Map', j_to_send) self.logger.info('Published: {}'.format(json.dumps(to_send))) except phonenumbers.NumberParseException: self.logger.warning("Can't resolve phone number country") diff --git a/server.py b/server.py index f3cdb4a..23e9139 100755 --- a/server.py +++ b/server.py @@ -15,6 +15,7 @@ from helpers import geo_helper from helpers import contributor_helper from helpers import users_helper from helpers import trendings_helper +from helpers import live_helper configfile = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'config/config.cfg') cfg = configparser.ConfigParser() @@ -41,6 +42,10 @@ serv_redis_db = redis.StrictRedis( port=cfg.getint('RedisGlobal', 'port'), db=cfg.getint('RedisDB', 'db')) +streamLogCacheKey = cfg.get('RedisLog', 'streamLogCacheKey') +streamMapCacheKey = cfg.get('RedisLog', 'streamMapCacheKey') + +live_helper = live_helper.Live_helper(serv_redis_db, cfg) geo_helper = geo_helper.Geo_helper(serv_redis_db, cfg) contributor_helper = contributor_helper.Contributor_helper(serv_redis_db, cfg) users_helper = users_helper.Users_helper(serv_redis_db, cfg) @@ -56,8 +61,6 @@ class LogItem(): FIELDNAME_ORDER = [] FIELDNAME_ORDER_HEADER = [] - FIELDNAME_ORDER.append("Time") - FIELDNAME_ORDER_HEADER.append("Time") for item in json.loads(cfg.get('Dashboard', 'fieldname_order')): if type(item) is list: FIELDNAME_ORDER_HEADER.append(" | ".join(item)) @@ -66,10 +69,7 @@ class LogItem(): FIELDNAME_ORDER.append(item) def __init__(self, feed): - self.time = strftime("%H:%M:%S", now()) - #FIXME Parse feed message? self.fields = [] - self.fields.append(self.time) for f in feed: self.fields.append(f) @@ -100,15 +100,19 @@ class EventMessage(): logger.error(e) jsonMsg = { 'name': "undefined" ,'log': json.loads(msg) } - self.feedName = jsonMsg['name'] + self.name = jsonMsg['name'] self.zmqName = jsonMsg['zmqName'] self.feed = json.loads(jsonMsg['log']) self.feed = LogItem(self.feed).get_row() - def to_json(self): - to_ret = { 'log': self.feed, 'feedName': self.feedName, 'zmqName': self.zmqName } + def to_json_ev(self): + to_ret = { 'log': self.feed, 'name': self.name, 'zmqName': self.zmqName } return 'data: {}\n\n'.format(json.dumps(to_ret)) + def to_json(self): + to_ret = { 'log': self.feed, 'name': self.name, 'zmqName': self.zmqName } + return json.dumps(to_ret) + ########### ## ROUTE ## ########### @@ -219,11 +223,21 @@ def trendings(): @app.route("/_logs") def logs(): - return Response(event_stream_log(), mimetype="text/event-stream") + if request.accept_mimetypes.accept_json or request.method == 'POST': + key = 'Attribute' + j = live_helper.get_stream_log_cache(key) + return jsonify(j) + else: + return Response(event_stream_log(), mimetype="text/event-stream") @app.route("/_maps") def maps(): - return Response(event_stream_maps(), mimetype="text/event-stream") + if request.accept_mimetypes.accept_json or request.method == 'POST': + key = 'Map' + j = live_helper.get_stream_log_cache(key) + return jsonify(j) + else: + return Response(event_stream_maps(), mimetype="text/event-stream") @app.route("/_get_log_head") def getLogHead(): @@ -231,11 +245,12 @@ def getLogHead(): def event_stream_log(): subscriber_log = redis_server_log.pubsub(ignore_subscribe_messages=True) - subscriber_log.subscribe(cfg.get('RedisLog', 'channel')) + subscriber_log.subscribe(live_helper.CHANNEL) try: for msg in subscriber_log.listen(): content = msg['data'] - yield EventMessage(content).to_json() + ev = EventMessage(content) + yield ev.to_json_ev() except GeneratorExit: subscriber_log.unsubscribe() @@ -245,7 +260,8 @@ def event_stream_maps(): try: for msg in subscriber_map.listen(): content = msg['data'].decode('utf8') - yield 'data: {}\n\n'.format(content) + to_ret = 'data: {}\n\n'.format(content) + yield to_ret except GeneratorExit: subscriber_map.unsubscribe() diff --git a/static/js/index/index.js b/static/js/index/index.js index 21f161c..7c5a752 100644 --- a/static/js/index/index.js +++ b/static/js/index/index.js @@ -184,14 +184,19 @@ function connect_source_log() { source_log.onmessage = function(event) { var json = jQuery.parseJSON( event.data ); - updateLogTable(json.feedName, json.log, json.zmqName); + updateLogTable(json.name, json.log, json.zmqName); }; } $(document).ready(function () { createHead(function() { if (!!window.EventSource) { - connect_source_log(); + $.getJSON( urlForLogs, function( data ) { + data.forEach(function(item) { + updateLogTable(item.name, item.log, item.zmqName); + }); + connect_source_log(); + }); } else { console.log("No event source_log"); } @@ -202,7 +207,7 @@ $(document).ready(function () { // LOG TABLE -function updateLogTable(feedName, log, zmqName) { +function updateLogTable(name, log, zmqName) { if (log.length == 0) return; @@ -213,7 +218,7 @@ function updateLogTable(feedName, log, zmqName) { tableBody = document.getElementById('table_log_body'); // only add row for attribute - if (feedName == "Attribute" ) { + if (name == "Attribute" ) { var categName = log[toPlotLocationLog]; sources.addIfNotPresent(categName); sources.incCountOnSource(categName); @@ -226,7 +231,7 @@ function updateLogTable(feedName, log, zmqName) { tableBody.deleteRow(0); } - } else if (feedName == "Keepalive") { + } else if (name == "Keepalive") { // do nothing } else { // do nothing diff --git a/static/js/index/index_map.js b/static/js/index/index_map.js index c5740bf..12699f2 100644 --- a/static/js/index/index_map.js +++ b/static/js/index/index_map.js @@ -23,7 +23,17 @@ class MapEvent { this.specifName = json.specifName; this.cityName = json.cityName; this.text = this.categ + ": " + this.value; - this.textMarker = "{1}
{2}".replace("{1}", this.country).replace("{2}", this.specifName+", "+this.cityName); + let underText = ""; + if (this.specifName !== null && this.cityName !== null) { + underText = this.specifName+", "+this.cityName; + } else if (this.specifName !== null) { + underText = this.specifName; + } else if (this.cityName !== null) { + underText = this.cityName; + } else { + underText = ""; + } + this.textMarker = "{1}
{2}".replace("{1}", this.country).replace("{2}", underText); } } @@ -218,7 +228,6 @@ function connect_source_map() { setTimeout(function() { connect_source_map(); }, 5000); }; } -connect_source_map() $(document).ready(function () { $( "#rotation_wait_time_selector" ).change(function() { @@ -240,4 +249,15 @@ $(document).ready(function () { ZOOMLEVEL = sel; mapEventManager.directZoom(); }); + + if (!!window.EventSource) { + $.getJSON( urlForMaps, function( data ) { + data.forEach(function(item) { + var marker = L.marker([item.coord.lat, item.coord.lon]).addTo(myOpenStreetMap); + var mapEvent = new MapEvent(item, marker); + mapEventManager.addMapEvent(mapEvent); + }); + connect_source_map() + }); + } }); diff --git a/zmq_dispatcher.py b/zmq_dispatcher.py index e2a9bb7..cfb2dee 100755 --- a/zmq_dispatcher.py +++ b/zmq_dispatcher.py @@ -17,6 +17,7 @@ from helpers import geo_helper from helpers import contributor_helper from helpers import users_helper from helpers import trendings_helper +from helpers import live_helper configfile = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'config/config.cfg') cfg = configparser.ConfigParser() @@ -30,7 +31,6 @@ if not os.path.exists(logDir): logging.basicConfig(filename=logPath, filemode='a', level=logging.INFO) logger = logging.getLogger('zmq_dispatcher') -CHANNEL = cfg.get('RedisLog', 'channel') LISTNAME = cfg.get('RedisLIST', 'listName') serv_log = redis.StrictRedis( @@ -46,17 +46,13 @@ serv_list = redis.StrictRedis( port=cfg.getint('RedisGlobal', 'port'), db=cfg.getint('RedisLIST', 'db')) +live_helper = live_helper.Live_helper(serv_redis_db, cfg) geo_helper = geo_helper.Geo_helper(serv_redis_db, cfg) contributor_helper = contributor_helper.Contributor_helper(serv_redis_db, cfg) users_helper = users_helper.Users_helper(serv_redis_db, cfg) trendings_helper = trendings_helper.Trendings_helper(serv_redis_db, cfg) -def publish_log(zmq_name, name, content, channel=CHANNEL): - to_send = { 'name': name, 'log': json.dumps(content), 'zmqName': zmq_name } - serv_log.publish(channel, json.dumps(to_send)) - logger.debug('Published: {}'.format(json.dumps(to_send))) - def getFields(obj, fields): jsonWalker = fields.split('.') itemToExplore = obj @@ -68,6 +64,8 @@ def getFields(obj, fields): if type(itemToExplore) is list: return { 'name': lastName , 'data': itemToExplore } else: + if i == 'timestamp': + itemToExplore = datetime.datetime.utcfromtimestamp(int(itemToExplore)).strftime('%Y-%m-%d %H:%M:%S') return itemToExplore except KeyError as e: return "" @@ -87,7 +85,7 @@ def handler_dispatcher(zmq_name, jsonObj): def handler_keepalive(zmq_name, jsonevent): logger.info('Handling keepalive') to_push = [ jsonevent['uptime'] ] - publish_log(zmq_name, 'Keepalive', to_push) + live_helper.publish_log(zmq_name, 'Keepalive', to_push) def handler_user(zmq_name, jsondata): logger.info('Handling user') @@ -226,7 +224,7 @@ def handler_attribute(zmq_name, jsonobj, hasAlreadyBeenContributed=False): action, isLabeled=eventLabeled) # Push to log - publish_log(zmq_name, 'Attribute', to_push) + live_helper.publish_log(zmq_name, 'Attribute', to_push) ###############