new: [live] support of few historical data to prefill the live dashboard + some bug fixes

pull/43/merge
Sami Mokaddem 2018-09-27 09:38:39 +02:00
parent dc8a944b23
commit bb2ea839f3
5 changed files with 75 additions and 30 deletions

View File

@ -12,6 +12,7 @@ import phonenumbers, pycountry
from phonenumbers import geocoder from phonenumbers import geocoder
import util import util
from helpers import live_helper
class InvalidCoordinate(Exception): class InvalidCoordinate(Exception):
pass pass
@ -24,6 +25,7 @@ class Geo_helper:
host=cfg.get('RedisGlobal', 'host'), host=cfg.get('RedisGlobal', 'host'),
port=cfg.getint('RedisGlobal', 'port'), port=cfg.getint('RedisGlobal', 'port'),
db=cfg.getint('RedisMap', 'db')) db=cfg.getint('RedisMap', 'db'))
self.live_helper = live_helper.Live_helper(serv_redis_db, cfg)
#logger #logger
logDir = cfg.get('Log', 'directory') logDir = cfg.get('Log', 'directory')
@ -118,7 +120,9 @@ class Geo_helper:
"cityName": rep['full_rep'].city.name, "cityName": rep['full_rep'].city.name,
"regionCode": rep['full_rep'].country.iso_code, "regionCode": rep['full_rep'].country.iso_code,
} }
self.serv_coord.publish(self.CHANNELDISP, json.dumps(to_send)) j_to_send = json.dumps(to_send)
self.serv_coord.publish(self.CHANNELDISP, j_to_send)
self.live_helper.add_to_stream_log_cache('Map', j_to_send)
self.logger.info('Published: {}'.format(json.dumps(to_send))) self.logger.info('Published: {}'.format(json.dumps(to_send)))
except ValueError: except ValueError:
self.logger.warning("can't resolve ip") self.logger.warning("can't resolve ip")
@ -163,7 +167,9 @@ class Geo_helper:
"cityName": "", "cityName": "",
"regionCode": country_code, "regionCode": country_code,
} }
self.serv_coord.publish(self.CHANNELDISP, json.dumps(to_send)) j_to_send = json.dumps(to_send)
self.serv_coord.publish(self.CHANNELDISP, j_to_send)
self.live_helper.add_to_stream_log_cache('Map', j_to_send)
self.logger.info('Published: {}'.format(json.dumps(to_send))) self.logger.info('Published: {}'.format(json.dumps(to_send)))
except phonenumbers.NumberParseException: except phonenumbers.NumberParseException:
self.logger.warning("Can't resolve phone number country") self.logger.warning("Can't resolve phone number country")

View File

@ -15,6 +15,7 @@ from helpers import geo_helper
from helpers import contributor_helper from helpers import contributor_helper
from helpers import users_helper from helpers import users_helper
from helpers import trendings_helper from helpers import trendings_helper
from helpers import live_helper
configfile = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'config/config.cfg') configfile = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'config/config.cfg')
cfg = configparser.ConfigParser() cfg = configparser.ConfigParser()
@ -41,6 +42,10 @@ serv_redis_db = redis.StrictRedis(
port=cfg.getint('RedisGlobal', 'port'), port=cfg.getint('RedisGlobal', 'port'),
db=cfg.getint('RedisDB', 'db')) db=cfg.getint('RedisDB', 'db'))
streamLogCacheKey = cfg.get('RedisLog', 'streamLogCacheKey')
streamMapCacheKey = cfg.get('RedisLog', 'streamMapCacheKey')
live_helper = live_helper.Live_helper(serv_redis_db, cfg)
geo_helper = geo_helper.Geo_helper(serv_redis_db, cfg) geo_helper = geo_helper.Geo_helper(serv_redis_db, cfg)
contributor_helper = contributor_helper.Contributor_helper(serv_redis_db, cfg) contributor_helper = contributor_helper.Contributor_helper(serv_redis_db, cfg)
users_helper = users_helper.Users_helper(serv_redis_db, cfg) users_helper = users_helper.Users_helper(serv_redis_db, cfg)
@ -56,8 +61,6 @@ class LogItem():
FIELDNAME_ORDER = [] FIELDNAME_ORDER = []
FIELDNAME_ORDER_HEADER = [] FIELDNAME_ORDER_HEADER = []
FIELDNAME_ORDER.append("Time")
FIELDNAME_ORDER_HEADER.append("Time")
for item in json.loads(cfg.get('Dashboard', 'fieldname_order')): for item in json.loads(cfg.get('Dashboard', 'fieldname_order')):
if type(item) is list: if type(item) is list:
FIELDNAME_ORDER_HEADER.append(" | ".join(item)) FIELDNAME_ORDER_HEADER.append(" | ".join(item))
@ -66,10 +69,7 @@ class LogItem():
FIELDNAME_ORDER.append(item) FIELDNAME_ORDER.append(item)
def __init__(self, feed): def __init__(self, feed):
self.time = strftime("%H:%M:%S", now())
#FIXME Parse feed message?
self.fields = [] self.fields = []
self.fields.append(self.time)
for f in feed: for f in feed:
self.fields.append(f) self.fields.append(f)
@ -100,15 +100,19 @@ class EventMessage():
logger.error(e) logger.error(e)
jsonMsg = { 'name': "undefined" ,'log': json.loads(msg) } jsonMsg = { 'name': "undefined" ,'log': json.loads(msg) }
self.feedName = jsonMsg['name'] self.name = jsonMsg['name']
self.zmqName = jsonMsg['zmqName'] self.zmqName = jsonMsg['zmqName']
self.feed = json.loads(jsonMsg['log']) self.feed = json.loads(jsonMsg['log'])
self.feed = LogItem(self.feed).get_row() self.feed = LogItem(self.feed).get_row()
def to_json(self): def to_json_ev(self):
to_ret = { 'log': self.feed, 'feedName': self.feedName, 'zmqName': self.zmqName } to_ret = { 'log': self.feed, 'name': self.name, 'zmqName': self.zmqName }
return 'data: {}\n\n'.format(json.dumps(to_ret)) return 'data: {}\n\n'.format(json.dumps(to_ret))
def to_json(self):
to_ret = { 'log': self.feed, 'name': self.name, 'zmqName': self.zmqName }
return json.dumps(to_ret)
########### ###########
## ROUTE ## ## ROUTE ##
########### ###########
@ -219,10 +223,20 @@ def trendings():
@app.route("/_logs") @app.route("/_logs")
def logs(): def logs():
if request.accept_mimetypes.accept_json or request.method == 'POST':
key = 'Attribute'
j = live_helper.get_stream_log_cache(key)
return jsonify(j)
else:
return Response(event_stream_log(), mimetype="text/event-stream") return Response(event_stream_log(), mimetype="text/event-stream")
@app.route("/_maps") @app.route("/_maps")
def maps(): def maps():
if request.accept_mimetypes.accept_json or request.method == 'POST':
key = 'Map'
j = live_helper.get_stream_log_cache(key)
return jsonify(j)
else:
return Response(event_stream_maps(), mimetype="text/event-stream") return Response(event_stream_maps(), mimetype="text/event-stream")
@app.route("/_get_log_head") @app.route("/_get_log_head")
@ -231,11 +245,12 @@ def getLogHead():
def event_stream_log(): def event_stream_log():
subscriber_log = redis_server_log.pubsub(ignore_subscribe_messages=True) subscriber_log = redis_server_log.pubsub(ignore_subscribe_messages=True)
subscriber_log.subscribe(cfg.get('RedisLog', 'channel')) subscriber_log.subscribe(live_helper.CHANNEL)
try: try:
for msg in subscriber_log.listen(): for msg in subscriber_log.listen():
content = msg['data'] content = msg['data']
yield EventMessage(content).to_json() ev = EventMessage(content)
yield ev.to_json_ev()
except GeneratorExit: except GeneratorExit:
subscriber_log.unsubscribe() subscriber_log.unsubscribe()
@ -245,7 +260,8 @@ def event_stream_maps():
try: try:
for msg in subscriber_map.listen(): for msg in subscriber_map.listen():
content = msg['data'].decode('utf8') content = msg['data'].decode('utf8')
yield 'data: {}\n\n'.format(content) to_ret = 'data: {}\n\n'.format(content)
yield to_ret
except GeneratorExit: except GeneratorExit:
subscriber_map.unsubscribe() subscriber_map.unsubscribe()

View File

@ -184,14 +184,19 @@ function connect_source_log() {
source_log.onmessage = function(event) { source_log.onmessage = function(event) {
var json = jQuery.parseJSON( event.data ); var json = jQuery.parseJSON( event.data );
updateLogTable(json.feedName, json.log, json.zmqName); updateLogTable(json.name, json.log, json.zmqName);
}; };
} }
$(document).ready(function () { $(document).ready(function () {
createHead(function() { createHead(function() {
if (!!window.EventSource) { if (!!window.EventSource) {
$.getJSON( urlForLogs, function( data ) {
data.forEach(function(item) {
updateLogTable(item.name, item.log, item.zmqName);
});
connect_source_log(); connect_source_log();
});
} else { } else {
console.log("No event source_log"); console.log("No event source_log");
} }
@ -202,7 +207,7 @@ $(document).ready(function () {
// LOG TABLE // LOG TABLE
function updateLogTable(feedName, log, zmqName) { function updateLogTable(name, log, zmqName) {
if (log.length == 0) if (log.length == 0)
return; return;
@ -213,7 +218,7 @@ function updateLogTable(feedName, log, zmqName) {
tableBody = document.getElementById('table_log_body'); tableBody = document.getElementById('table_log_body');
// only add row for attribute // only add row for attribute
if (feedName == "Attribute" ) { if (name == "Attribute" ) {
var categName = log[toPlotLocationLog]; var categName = log[toPlotLocationLog];
sources.addIfNotPresent(categName); sources.addIfNotPresent(categName);
sources.incCountOnSource(categName); sources.incCountOnSource(categName);
@ -226,7 +231,7 @@ function updateLogTable(feedName, log, zmqName) {
tableBody.deleteRow(0); tableBody.deleteRow(0);
} }
} else if (feedName == "Keepalive") { } else if (name == "Keepalive") {
// do nothing // do nothing
} else { } else {
// do nothing // do nothing

View File

@ -23,7 +23,17 @@ class MapEvent {
this.specifName = json.specifName; this.specifName = json.specifName;
this.cityName = json.cityName; this.cityName = json.cityName;
this.text = this.categ + ": " + this.value; this.text = this.categ + ": " + this.value;
this.textMarker = "<b>{1}</b><br>{2}".replace("{1}", this.country).replace("{2}", this.specifName+", "+this.cityName); let underText = "";
if (this.specifName !== null && this.cityName !== null) {
underText = this.specifName+", "+this.cityName;
} else if (this.specifName !== null) {
underText = this.specifName;
} else if (this.cityName !== null) {
underText = this.cityName;
} else {
underText = "";
}
this.textMarker = "<b>{1}</b><br>{2}".replace("{1}", this.country).replace("{2}", underText);
} }
} }
@ -218,7 +228,6 @@ function connect_source_map() {
setTimeout(function() { connect_source_map(); }, 5000); setTimeout(function() { connect_source_map(); }, 5000);
}; };
} }
connect_source_map()
$(document).ready(function () { $(document).ready(function () {
$( "#rotation_wait_time_selector" ).change(function() { $( "#rotation_wait_time_selector" ).change(function() {
@ -240,4 +249,15 @@ $(document).ready(function () {
ZOOMLEVEL = sel; ZOOMLEVEL = sel;
mapEventManager.directZoom(); mapEventManager.directZoom();
}); });
if (!!window.EventSource) {
$.getJSON( urlForMaps, function( data ) {
data.forEach(function(item) {
var marker = L.marker([item.coord.lat, item.coord.lon]).addTo(myOpenStreetMap);
var mapEvent = new MapEvent(item, marker);
mapEventManager.addMapEvent(mapEvent);
});
connect_source_map()
});
}
}); });

View File

@ -17,6 +17,7 @@ from helpers import geo_helper
from helpers import contributor_helper from helpers import contributor_helper
from helpers import users_helper from helpers import users_helper
from helpers import trendings_helper from helpers import trendings_helper
from helpers import live_helper
configfile = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'config/config.cfg') configfile = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'config/config.cfg')
cfg = configparser.ConfigParser() cfg = configparser.ConfigParser()
@ -30,7 +31,6 @@ if not os.path.exists(logDir):
logging.basicConfig(filename=logPath, filemode='a', level=logging.INFO) logging.basicConfig(filename=logPath, filemode='a', level=logging.INFO)
logger = logging.getLogger('zmq_dispatcher') logger = logging.getLogger('zmq_dispatcher')
CHANNEL = cfg.get('RedisLog', 'channel')
LISTNAME = cfg.get('RedisLIST', 'listName') LISTNAME = cfg.get('RedisLIST', 'listName')
serv_log = redis.StrictRedis( serv_log = redis.StrictRedis(
@ -46,17 +46,13 @@ serv_list = redis.StrictRedis(
port=cfg.getint('RedisGlobal', 'port'), port=cfg.getint('RedisGlobal', 'port'),
db=cfg.getint('RedisLIST', 'db')) db=cfg.getint('RedisLIST', 'db'))
live_helper = live_helper.Live_helper(serv_redis_db, cfg)
geo_helper = geo_helper.Geo_helper(serv_redis_db, cfg) geo_helper = geo_helper.Geo_helper(serv_redis_db, cfg)
contributor_helper = contributor_helper.Contributor_helper(serv_redis_db, cfg) contributor_helper = contributor_helper.Contributor_helper(serv_redis_db, cfg)
users_helper = users_helper.Users_helper(serv_redis_db, cfg) users_helper = users_helper.Users_helper(serv_redis_db, cfg)
trendings_helper = trendings_helper.Trendings_helper(serv_redis_db, cfg) trendings_helper = trendings_helper.Trendings_helper(serv_redis_db, cfg)
def publish_log(zmq_name, name, content, channel=CHANNEL):
to_send = { 'name': name, 'log': json.dumps(content), 'zmqName': zmq_name }
serv_log.publish(channel, json.dumps(to_send))
logger.debug('Published: {}'.format(json.dumps(to_send)))
def getFields(obj, fields): def getFields(obj, fields):
jsonWalker = fields.split('.') jsonWalker = fields.split('.')
itemToExplore = obj itemToExplore = obj
@ -68,6 +64,8 @@ def getFields(obj, fields):
if type(itemToExplore) is list: if type(itemToExplore) is list:
return { 'name': lastName , 'data': itemToExplore } return { 'name': lastName , 'data': itemToExplore }
else: else:
if i == 'timestamp':
itemToExplore = datetime.datetime.utcfromtimestamp(int(itemToExplore)).strftime('%Y-%m-%d %H:%M:%S')
return itemToExplore return itemToExplore
except KeyError as e: except KeyError as e:
return "" return ""
@ -87,7 +85,7 @@ def handler_dispatcher(zmq_name, jsonObj):
def handler_keepalive(zmq_name, jsonevent): def handler_keepalive(zmq_name, jsonevent):
logger.info('Handling keepalive') logger.info('Handling keepalive')
to_push = [ jsonevent['uptime'] ] to_push = [ jsonevent['uptime'] ]
publish_log(zmq_name, 'Keepalive', to_push) live_helper.publish_log(zmq_name, 'Keepalive', to_push)
def handler_user(zmq_name, jsondata): def handler_user(zmq_name, jsondata):
logger.info('Handling user') logger.info('Handling user')
@ -226,7 +224,7 @@ def handler_attribute(zmq_name, jsonobj, hasAlreadyBeenContributed=False):
action, action,
isLabeled=eventLabeled) isLabeled=eventLabeled)
# Push to log # Push to log
publish_log(zmq_name, 'Attribute', to_push) live_helper.publish_log(zmq_name, 'Attribute', to_push)
############### ###############