From e70b9cd15cc62d53077cbc783f3cd257c48f5e97 Mon Sep 17 00:00:00 2001
From: Mokaddem
Date: Fri, 23 Dec 2016 10:31:26 +0100
Subject: [PATCH 1/9] Added basic mixer with configurable behavior. It handles
 multiple feeders and performs some basic stats on them.

---
 bin/Global.py                  |   2 +
 bin/Helper.py                  |  27 ++++--
 bin/LAUNCH.sh                  |   2 +
 bin/Mixer.py                   | 172 +++++++++++++++++++++++++++++++++
 bin/packages/config.cfg.sample |  14 +++
 bin/packages/modules.cfg       |   6 +-
 6 files changed, 213 insertions(+), 10 deletions(-)
 create mode 100755 bin/Mixer.py

diff --git a/bin/Global.py b/bin/Global.py
index 9cacbc88..a29c3b86 100755
--- a/bin/Global.py
+++ b/bin/Global.py
@@ -72,6 +72,8 @@ if __name__ == '__main__':
                 os.makedirs(dirname)

             with open(filename, 'wb') as f:
+                print gzip64encoded
+                print base64.standard_b64decode(gzip64encoded)
                 f.write(base64.standard_b64decode(gzip64encoded))
             p.populate_set_out(filename)
             processed_paste+=1
diff --git a/bin/Helper.py b/bin/Helper.py
index 05b73bf3..d6441b9a 100755
--- a/bin/Helper.py
+++ b/bin/Helper.py
@@ -32,7 +32,7 @@ class PubSub(object):
         self.config.read(configfile)
         self.redis_sub = False
         self.zmq_sub = False
-        self.subscriber = None
+        self.subscribers = None
         self.publishers = {'Redis': [], 'ZMQ': []}

     def setup_subscribe(self, conn_name):
@@ -46,14 +46,19 @@ class PubSub(object):
                 host=self.config.get('RedisPubSub', 'host'),
                 port=self.config.get('RedisPubSub', 'port'),
                 db=self.config.get('RedisPubSub', 'db'))
-            self.subscriber = r.pubsub(ignore_subscribe_messages=True)
-            self.subscriber.psubscribe(channel)
+            self.subscribers = r.pubsub(ignore_subscribe_messages=True)
+            self.subscribers.psubscribe(channel)
         elif conn_name.startswith('ZMQ'):
             self.zmq_sub = True
             context = zmq.Context()
-            self.subscriber = context.socket(zmq.SUB)
-            self.subscriber.connect(self.config.get(conn_name, 'address'))
-            self.subscriber.setsockopt(zmq.SUBSCRIBE, channel)
+
+            self.subscribers = []
+            addresses = self.config.get(conn_name, 'address')
+            for address in addresses.split(','):
+                new_sub = context.socket(zmq.SUB)
+                new_sub.connect(address)
+                new_sub.setsockopt(zmq.SUBSCRIBE, channel)
+                self.subscribers.append(new_sub)

     def setup_publish(self, conn_name):
         if self.config.has_section(conn_name):
@@ -83,13 +88,17 @@ class PubSub(object):

     def subscribe(self):
         if self.redis_sub:
-            for msg in self.subscriber.listen():
+            for msg in self.subscribers.listen():
                 if msg.get('data', None) is not None:
                     yield msg['data']
         elif self.zmq_sub:
             while True:
-                msg = self.subscriber.recv()
-                yield msg.split(' ', 1)[1]
+                for sub in self.subscribers:
+                    try:
+                        msg = sub.recv(zmq.NOBLOCK)
+                        yield msg.split(' ', 1)[1]
+                    except zmq.error.Again as e:
+                        pass
         else:
             raise Exception('No subscribe function defined')

diff --git a/bin/LAUNCH.sh b/bin/LAUNCH.sh
index d7c31472..f020f3a7 100755
--- a/bin/LAUNCH.sh
+++ b/bin/LAUNCH.sh
@@ -114,6 +114,8 @@ function launching_scripts {
     screen -S "Script" -X screen -t "ModuleInformation" bash -c './ModuleInformation.py -k 0 -c 1; read x'
     sleep 0.1
+    screen -S "Script" -X screen -t "Mixer" bash -c './Mixer.py; read x'
+    sleep 0.1
     screen -S "Script" -X screen -t "Global" bash -c './Global.py; read x'
     sleep 0.1
     screen -S "Script" -X screen -t "Duplicates" bash -c './Duplicates.py; read x'
diff --git a/bin/Mixer.py b/bin/Mixer.py
new file mode 100755
index 00000000..e1804263
--- /dev/null
+++ b/bin/Mixer.py
@@ -0,0 +1,172 @@
+#!/usr/bin/env python
+# -*-coding:UTF-8 -*
+"""
+The ZMQ_Feed_Q Module
+=====================
+
+This module is consuming the Redis-list created by the ZMQ_Feed_Q Module.
+
+This module takes all the feeds provided in the config.
+Depending on the configuration, this module will process the feed as follows:
+    operation_mode 1: "Avoid any duplicate from any source"
+        - The module maintains a list of content for each paste
+        - If the content is new, process it
+        - Else, do not process it but keep track of it for statistics on duplicates
+
+    operation_mode 2: "Keep duplicates coming from different sources"
+        - The module maintains a list of the names given to the paste by the feeder
+        - If the name has not yet been seen, process it
+        - Else if the saved content associated with the paste is not the same, process it
+        - Else, do not process it but keep track of it for statistics on duplicates
+
+Note that the hash of the content is defined as the gzip64encoded content itself
+
+Requirements
+------------
+
+*Need running Redis instances.
+*Need the ZMQ_Feed_Q Module running to be able to work properly.
+
+"""
+import base64
+import os
+import time
+from pubsublogger import publisher
+import redis
+import ConfigParser
+
+from Helper import Process
+
+
+# CONFIG #
+refresh_time = 30
+
+
+if __name__ == '__main__':
+    publisher.port = 6380
+    publisher.channel = 'Script'
+
+    config_section = 'Mixer'
+
+    p = Process(config_section)
+
+    configfile = os.path.join(os.environ['AIL_BIN'], 'packages/config.cfg')
+    if not os.path.exists(configfile):
+        raise Exception('Unable to find the configuration file. \
+                        Did you set environment variables? \
+                        Or activate the virtualenv.')
+
+    cfg = ConfigParser.ConfigParser()
+    cfg.read(configfile)
+
+    # REDIS #
+    server = redis.StrictRedis(
+        host=cfg.get("Redis_Mixer", "host"),
+        port=cfg.getint("Redis_Mixer", "port"),
+        db=cfg.getint("Redis_Mixer", "db"))
+
+    # LOGGING #
+    publisher.info("Feed Script started to receive & publish.")
+
+    # OTHER CONFIG #
+    operation_mode = cfg.getint("Module_Mixer", "operation_mode")
+    ttl_key = cfg.getint("Module_Mixer", "ttl_duplicate")
+
+    # STATS #
+    processed_paste = 0
+    processed_paste_per_feeder = {}
+    duplicated_paste_per_feeder = {}
+    time_1 = time.time()
+
+    while True:
+
+        message = p.get_from_set()
+        if message is not None:
+            splitted = message.split()
+            if len(splitted) == 2:
+                paste, gzip64encoded = splitted
+                try:
+                    feeder_name, paste_name = paste.split('>')
+                    feeder_name.replace(" ","")
+                except ValueError as e:
+                    feeder_name = "unnamed_feeder"
+                    paste_name = paste
+
+                # Processed paste
+                processed_paste += 1
+                try:
+                    processed_paste_per_feeder[feeder_name] += 1
+                except KeyError as e:
+                    # new feeder
+                    processed_paste_per_feeder[feeder_name] = 1
+                    duplicated_paste_per_feeder[feeder_name] = 0
+
+                relay_message = "{0} {1}".format(paste_name, gzip64encoded)
+
+                # Avoid any duplicate coming from any sources
+                if operation_mode == 1:
+                    if server.exists(gzip64encoded): # Content already exists
+                        #STATS
+                        duplicated_paste_per_feeder[feeder_name] += 1
+                    else: # New content
+                        p.populate_set_out(relay_message)
+                        server.sadd(gzip64encoded, feeder_name)
+                        server.expire(gzip64encoded, ttl_key)
+
+                # Keep duplicate coming from different sources
+                else:
+                    # Filter to avoid duplicate
+                    content = server.get('HASH_'+paste_name)
+                    if content is None:
+                        # New content
+                        # Store in redis for filtering
+                        server.set('HASH_'+paste_name, content)
+                        server.sadd(paste_name, feeder_name)
+                        server.expire(paste_name, ttl_key)
+                        server.expire('HASH_'+paste_name, ttl_key)
+                        p.populate_set_out(relay_message)
+                    else:
+                        if gzip64encoded != content:
+                            # Same paste name but different content
+                            #STATS
+                            duplicated_paste_per_feeder[feeder_name] += 1
+                            server.sadd(paste_name, feeder_name)
+                            server.expire(paste_name, ttl_key)
+                            p.populate_set_out(relay_message)
+                        else:
+                            # Already processed
+                            # Keep track of processed pastes
+                            #STATS
+                            duplicated_paste_per_feeder[feeder_name] += 1
+                            continue
+
+            else:
+                # TODO Store the name of the empty paste inside a Redis-list.
+                print "Empty Paste: not processed"
+                publisher.debug("Empty Paste: {0} not processed".format(message))
+        else:
+            print "Empty Queues: Waiting..."
+            if int(time.time() - time_1) > refresh_time:
+                to_print = 'Mixer; ; ; ;mixer_all Processed {0} paste(s) in {1}sec'.format(processed_paste, refresh_time)
+                print to_print
+                publisher.info(to_print)
+                processed_paste = 0
+
+                for feeder, count in processed_paste_per_feeder.iteritems():
+                    to_print = 'Mixer; ; ; ;mixer_{0} {0} Processed {1} paste(s) in {2}sec'.format(feeder, count, refresh_time)
+                    print to_print
+                    publisher.info(to_print)
+                    processed_paste_per_feeder[feeder] = 0
+
+                for feeder, count in duplicated_paste_per_feeder.iteritems():
+                    to_print = 'Mixer; ; ; ;mixer_{0} {0} Duplicated {1} paste(s) in {2}sec'.format(feeder, count, refresh_time)
+                    print to_print
+                    publisher.info(to_print)
+                    duplicated_paste_per_feeder[feeder] = 0
+
+            time_1 = time.time()
+            time.sleep(0.5)
+            continue
diff --git a/bin/packages/config.cfg.sample b/bin/packages/config.cfg.sample
index 566cf22c..880bf169 100644
--- a/bin/packages/config.cfg.sample
+++ b/bin/packages/config.cfg.sample
@@ -40,6 +40,12 @@ min_paste_size = 0.3
 #Threshold to deduce if a module is stuck or not, in seconds.
 threshold_stucked_module=600

+[Module_Mixer]
+#Define the configuration of the mixer, possible values: 1 or 2
+operation_mode = 1
+#Define the time that a paste will be considered a duplicate, in seconds (1 day = 86400)
+ttl_duplicate = 86400
+
 ##### Redis #####
 [Redis_Cache]
 host = localhost
 port = 6379
 db = 0
@@ -66,6 +72,12 @@ host = localhost
 port = 6379
 db = 2

+[Redis_Mixer]
+host = localhost
+port = 6381
+db = 1
+channel = 102
+
 ##### LevelDB #####
 [Redis_Level_DB_Curve]
 host = localhost
@@ -111,6 +123,8 @@ path = indexdir

 ###############################################################################

+# For multiple feeds, add them separated by "," without spaces
+# e.g.: tcp://127.0.0.1:5556,tcp://127.0.0.1:5557
 [ZMQ_Global]
 #address = tcp://crf.circl.lu:5556
 address = tcp://127.0.0.1:5556
diff --git a/bin/packages/modules.cfg b/bin/packages/modules.cfg
index 53bbb2a6..dee8baff 100644
--- a/bin/packages/modules.cfg
+++ b/bin/packages/modules.cfg
@@ -1,5 +1,9 @@
-[Global]
+[Mixer]
 subscribe = ZMQ_Global
+publish = Redis_Mixer
+
+[Global]
+subscribe = Redis_Mixer
 publish = Redis_Global,Redis_ModuleStats

 [Duplicates]

From 97292e08994c105a01d30589cb56353950e63ab9 Mon Sep 17 00:00:00 2001
From: Mokaddem
Date: Fri, 23 Dec 2016 15:44:46 +0100
Subject: [PATCH 2/9] Updated web interface to handle the new mixer module and
 fixed one dependency bug.
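The duplicate filter that PATCH 1/9 introduces above comes down to keying a short-lived Redis entry on the gzip64encoded content and dropping anything whose key is still alive. A minimal sketch of that mode-1 check, assuming a local Redis instance matching the [Redis_Mixer] sample settings; the function name and wiring are illustrative, not the module's own code:

import redis

# Assumed to match the [Redis_Mixer] sample settings above (host/port/db and
# the function name are illustrative).
server = redis.StrictRedis(host='localhost', port=6381, db=1)
TTL_DUPLICATE = 86400  # seconds, as in [Module_Mixer] ttl_duplicate


def is_duplicate(feeder_name, gzip64encoded, ttl=TTL_DUPLICATE):
    """Mode-1 style check: the gzip64encoded content itself is the key."""
    if server.exists(gzip64encoded):
        # Seen recently, from any feeder: count it as a duplicate.
        return True
    # New content: remember which feeder sent it and let the key expire.
    server.sadd(gzip64encoded, feeder_name)
    server.expire(gzip64encoded, ttl)
    return False

Mode 2, as the Mixer docstring describes, applies the same expiry pattern but keys on the paste name and stores the content under a 'HASH_' prefix, so the same content under a new name is still forwarded while an exact repeat is only counted.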
--- bin/Global.py | 4 +- bin/Mixer.py | 2 +- var/www/Flasks/Flask_trendingmodules.py | 1 + var/www/static/js/indexjavascript.js | 103 ++++++++++++++++-------- var/www/templates/index.html | 51 +++++++----- 5 files changed, 107 insertions(+), 54 deletions(-) diff --git a/bin/Global.py b/bin/Global.py index a29c3b86..bab45b47 100755 --- a/bin/Global.py +++ b/bin/Global.py @@ -59,7 +59,7 @@ if __name__ == '__main__': if int(time.time() - time_1) > 30: to_print = 'Global; ; ; ;glob Processed {0} paste(s)'.format(processed_paste) print to_print - publisher.info(to_print) + #publisher.info(to_print) time_1 = time.time() processed_paste = 0 time.sleep(1) @@ -72,8 +72,6 @@ if __name__ == '__main__': os.makedirs(dirname) with open(filename, 'wb') as f: - print gzip64encoded - print base64.standard_b64decode(gzip64encoded) f.write(base64.standard_b64decode(gzip64encoded)) p.populate_set_out(filename) processed_paste+=1 diff --git a/bin/Mixer.py b/bin/Mixer.py index e1804263..558cf4ec 100755 --- a/bin/Mixer.py +++ b/bin/Mixer.py @@ -150,7 +150,7 @@ if __name__ == '__main__': else: print "Empty Queues: Waiting..." if int(time.time() - time_1) > refresh_time: - to_print = 'Mixer; ; ; ;mixer_all Processed {0} paste(s) in {1}sec'.format(processed_paste, refresh_time) + to_print = 'Mixer; ; ; ;mixer_all All_feeders Processed {0} paste(s) in {1}sec'.format(processed_paste, refresh_time) print to_print publisher.info(to_print) processed_paste = 0 diff --git a/var/www/Flasks/Flask_trendingmodules.py b/var/www/Flasks/Flask_trendingmodules.py index 73cef7f5..06cb65c3 100644 --- a/var/www/Flasks/Flask_trendingmodules.py +++ b/var/www/Flasks/Flask_trendingmodules.py @@ -6,6 +6,7 @@ ''' import redis import datetime +from Date import Date import flask from flask import Flask, render_template, jsonify, request diff --git a/var/www/static/js/indexjavascript.js b/var/www/static/js/indexjavascript.js index 8d50ea9d..3710e00b 100644 --- a/var/www/static/js/indexjavascript.js +++ b/var/www/static/js/indexjavascript.js @@ -1,10 +1,16 @@ -var time_since_last_pastes_num; +var time_since_last_pastes_num = {}; +var data_for_processed_paste = { "global": [] }; +var list_feeder = ["global"]; +var htmltext_graph_container = "
"; +window.paste_num_tabvar_all = {}; //If we do not received info from global, set pastes_num to 0 function checkIfReceivedData(){ - if ((new Date().getTime() - time_since_last_pastes_num) > 45*1000) - window.paste_num_tabvar = 0; - setTimeout(checkIfReceivedData, 45*1000); + for (i in list_feeder) { + if ((new Date().getTime() - time_since_last_pastes_num[list_feeder[i]]) > 45*1000) + window.paste_num_tabvar_all[list_feeder[i]] = 0; + setTimeout(checkIfReceivedData, 45*1000); + } } setTimeout(checkIfReceivedData, 45*1000); @@ -24,34 +30,41 @@ function update_values() { // Plot and update the number of processed pastes -$(function() { - var data = []; +// BEGIN PROCESSED PASTES var default_minute = (typeof window.default_minute !== "undefined") ? parseInt(window.default_minute) : 10; var totalPoints = 60*parseInt(default_minute); //60s*minute var curr_max = 0; - function getData() { - if (data.length > 0){ - var data_old = data[0]; - data = data.slice(1); - curr_max = curr_max == data_old ? Math.max.apply(null, data) : curr_max; + function getData(dataset) { + var curr_data; + if (data_for_processed_paste[dataset] === undefined) { // create feeder dataset if not exists yet + data_for_processed_paste[dataset] = []; + } + curr_data = data_for_processed_paste[dataset]; + + if (curr_data.length > 0){ + var data_old = curr_data[0]; + curr_data = curr_data.slice(1); + curr_max = curr_max == data_old ? Math.max.apply(null, curr_data) : curr_max; } - while (data.length < totalPoints) { - var y = (typeof window.paste_num_tabvar !== "undefined") ? parseInt(window.paste_num_tabvar) : 0; + while (curr_data.length < totalPoints) { + //var y = (typeof window.paste_num_tabvar_all[dataset] !== "undefined") ? parseInt(window.paste_num_tabvar_all[dataset]) : 0; + var y = (typeof window.paste_num_tabvar_all[dataset] !== "undefined") ? parseInt(window.paste_num_tabvar_all[dataset]) : 0; curr_max = y > curr_max ? 
y : curr_max; - data.push(y); + curr_data.push(y); } // Zip the generated y values with the x values var res = []; - for (var i = 0; i < data.length; ++i) { - res.push([i, data[i]]) - } + for (var i = 0; i < curr_data.length; ++i) { + res.push([i, curr_data[i]]) + } + data_for_processed_paste[dataset] = curr_data; return res; } var updateInterval = 1000; - var options = { + var options_processed_pastes = { series: { shadowSize: 1 }, lines: { fill: true, fillColor: { colors: [ { opacity: 1 }, { opacity: 0.1 } ] }}, yaxis: { min: 0, max: 40 }, @@ -61,17 +74,19 @@ $(function() { borderWidth: 0 }, }; - var plot = $.plot("#realtimechart", [ getData() ], options); - - function update() { - plot.setData([getData()]); - plot.getOptions().yaxes[0].max = curr_max; - plot.setupGrid(); - plot.draw(); - setTimeout(update, updateInterval); + var total_proc = $.plot("#global", [ getData("global") ], options_processed_pastes); + + function update_processed_pastes(graph, dataset) { + graph.setData([getData(dataset)]); + graph.getOptions().yaxes[0].max = curr_max; + graph.setupGrid(); + graph.draw(); + setTimeout(function(){ update_processed_pastes(graph, dataset); }, updateInterval); } - update(); -}); + update_processed_pastes(total_proc, "global"); + + +// END PROCESSED PASTES function initfunc( csvay, scroot) { window.csv = csvay; @@ -114,10 +129,34 @@ function create_log_table(obj_json) { var chansplit = obj_json.channel.split('.'); var parsedmess = obj_json.data.split(';'); - if (parsedmess[0] == "Global"){ - var paste_processed = parsedmess[4].split(" ")[2]; - window.paste_num_tabvar = paste_processed; - time_since_last_pastes_num = new Date().getTime(); + + if (parsedmess[0] == "Mixer"){ + var feeder = parsedmess[4].split(" ")[1]; + var paste_processed = parsedmess[4].split(" ")[3]; + var msg_type = parsedmess[4].split(" ")[2]; + + if (feeder == "All_feeders"){ + window.paste_num_tabvar_all["global"] = paste_processed; + time_since_last_pastes_num["global"] = new Date().getTime(); + } else { + + if (list_feeder.indexOf(feeder) == -1) { + list_feeder.push(feeder); + //ADD HTML CONTAINER + PLOT THE GRAPH, ADD IT TO A LIST CONTAING THE PLOTED GRAPH + $("#panelbody").append(""+feeder+""); + $("#panelbody").append("
" + htmltext_graph_container.replace("$1", feeder+"Proc") + htmltext_graph_container.replace("$1", feeder+"Dup")+"
"); + var new_feederProc = $.plot("#"+feeder+"Proc", [ getData(feeder+"Proc") ], options_processed_pastes); + options_processed_pastes.colors = ["#edc240"]; + var new_feederDup = $.plot("#"+feeder+"Dup", [ getData(feeder+"Dup") ], options_processed_pastes); + options_processed_pastes.colors = ["#a971ff"]; + update_processed_pastes(new_feederProc, feeder+"Proc"); + update_processed_pastes(new_feederDup, feeder+"Dup"); + } + + var feederName = msg_type == "Duplicated" ? feeder+"Dup" : feeder+"Proc"; + window.paste_num_tabvar_all[feederName] = paste_processed; + time_since_last_pastes_num[feederName] = new Date().getTime(); + } return; } diff --git a/var/www/templates/index.html b/var/www/templates/index.html index 74b45c01..278bc873 100644 --- a/var/www/templates/index.html +++ b/var/www/templates/index.html @@ -52,10 +52,10 @@
- Pastes since {{ default_minute }} min + Total pastes since {{ default_minute }} min
-
+
@@ -91,23 +91,38 @@
-
-
-
- Queues Monitor -
-
-
-
-
-
- +
+
+
+ Feeder(s) Monitor: Processed pastes and filtered duplicated
- -
- -
- +
+ + +
+ +
+ +
+ +
+
+
+ Queues Monitor +
+
+
+
+
+
+
+ +
+ +
+ +
+
From 03dddbc359a420b60d5dfb82597c55cc46da254a Mon Sep 17 00:00:00 2001 From: Mokaddem Date: Fri, 23 Dec 2016 16:01:30 +0100 Subject: [PATCH 3/9] Reduced refresh rate of processed_pastes, synchro graphs and adjusted max on each graphs. --- var/www/static/js/indexjavascript.js | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/var/www/static/js/indexjavascript.js b/var/www/static/js/indexjavascript.js index 3710e00b..5e5d477d 100644 --- a/var/www/static/js/indexjavascript.js +++ b/var/www/static/js/indexjavascript.js @@ -33,7 +33,7 @@ function update_values() { // BEGIN PROCESSED PASTES var default_minute = (typeof window.default_minute !== "undefined") ? parseInt(window.default_minute) : 10; var totalPoints = 60*parseInt(default_minute); //60s*minute - var curr_max = 0; + var curr_max = {"global": 0}; function getData(dataset) { var curr_data; @@ -44,14 +44,14 @@ function update_values() { if (curr_data.length > 0){ var data_old = curr_data[0]; - curr_data = curr_data.slice(1); - curr_max = curr_max == data_old ? Math.max.apply(null, curr_data) : curr_max; + curr_data = curr_data.slice(10); + curr_max[dataset] = curr_max[dataset] == data_old ? Math.max.apply(null, curr_data) : curr_max[dataset]; } while (curr_data.length < totalPoints) { //var y = (typeof window.paste_num_tabvar_all[dataset] !== "undefined") ? parseInt(window.paste_num_tabvar_all[dataset]) : 0; var y = (typeof window.paste_num_tabvar_all[dataset] !== "undefined") ? parseInt(window.paste_num_tabvar_all[dataset]) : 0; - curr_max = y > curr_max ? y : curr_max; + curr_max[dataset] = y > curr_max[dataset] ? y : curr_max[dataset]; curr_data.push(y); } // Zip the generated y values with the x values @@ -63,7 +63,7 @@ function update_values() { return res; } - var updateInterval = 1000; + var updateInterval = 10000; var options_processed_pastes = { series: { shadowSize: 1 }, lines: { fill: true, fillColor: { colors: [ { opacity: 1 }, { opacity: 0.1 } ] }}, @@ -74,16 +74,14 @@ function update_values() { borderWidth: 0 }, }; - var total_proc = $.plot("#global", [ getData("global") ], options_processed_pastes); function update_processed_pastes(graph, dataset) { graph.setData([getData(dataset)]); - graph.getOptions().yaxes[0].max = curr_max; + graph.getOptions().yaxes[0].max = curr_max[dataset]; graph.setupGrid(); graph.draw(); setTimeout(function(){ update_processed_pastes(graph, dataset); }, updateInterval); } - update_processed_pastes(total_proc, "global"); // END PROCESSED PASTES @@ -136,6 +134,8 @@ function create_log_table(obj_json) { var msg_type = parsedmess[4].split(" ")[2]; if (feeder == "All_feeders"){ + var total_proc = $.plot("#global", [ getData("global") ], options_processed_pastes); + update_processed_pastes(total_proc, "global"); window.paste_num_tabvar_all["global"] = paste_processed; time_since_last_pastes_num["global"] = new Date().getTime(); } else { From a18c046deac8863d159482e005f977973b04acef Mon Sep 17 00:00:00 2001 From: Mokaddem Date: Fri, 23 Dec 2016 16:15:05 +0100 Subject: [PATCH 4/9] Fixed bug multiple refresh instances and harmonized interface. 
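The dashboard changes in PATCH 2/9 through 5/9 all hang off the format of the Mixer's stat lines, e.g. 'Mixer; ; ; ;mixer_all All_feeders Processed 42 paste(s) in 30sec': indexjavascript.js splits on ';', then splits the fifth field on spaces to pick out the feeder name, the message type and the count. A small Python sketch of the same parsing, for illustration only (the helper name is made up):

def parse_mixer_stat(line):
    """Return (feeder, msg_type, count) from a Mixer stat line, or None.

    The fifth ';'-separated field looks like
    'mixer_all All_feeders Processed 42 paste(s) in 30sec' for the global
    counter and 'mixer_<feeder> <feeder> Processed|Duplicated <n> ...' per feeder.
    """
    fields = line.split(';')
    if fields[0] != 'Mixer':
        return None
    words = fields[4].split(' ')
    # words[1] is the feeder name, words[2] 'Processed' or 'Duplicated',
    # words[3] the count for the last refresh window.
    return words[1], words[2], int(words[3])


print(parse_mixer_stat('Mixer; ; ; ;mixer_all All_feeders Processed 42 paste(s) in 30sec'))
# ('All_feeders', 'Processed', 42)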
--- var/www/static/js/indexjavascript.js | 9 ++++++--- var/www/templates/index.html | 8 ++++---- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/var/www/static/js/indexjavascript.js b/var/www/static/js/indexjavascript.js index 5e5d477d..1f727f9b 100644 --- a/var/www/static/js/indexjavascript.js +++ b/var/www/static/js/indexjavascript.js @@ -1,6 +1,6 @@ var time_since_last_pastes_num = {}; var data_for_processed_paste = { "global": [] }; -var list_feeder = ["global"]; +var list_feeder = []; var htmltext_graph_container = "
"; window.paste_num_tabvar_all = {}; @@ -134,8 +134,11 @@ function create_log_table(obj_json) { var msg_type = parsedmess[4].split(" ")[2]; if (feeder == "All_feeders"){ - var total_proc = $.plot("#global", [ getData("global") ], options_processed_pastes); - update_processed_pastes(total_proc, "global"); + if(list_feeder.indexOf("global") == -1) { + list_feeder.push("global"); + var total_proc = $.plot("#global", [ getData("global") ], options_processed_pastes); + update_processed_pastes(total_proc, "global"); + } window.paste_num_tabvar_all["global"] = paste_processed; time_since_last_pastes_num["global"] = new Date().getTime(); } else { diff --git a/var/www/templates/index.html b/var/www/templates/index.html index 278bc873..90d7134b 100644 --- a/var/www/templates/index.html +++ b/var/www/templates/index.html @@ -91,12 +91,12 @@
-
+
- Feeder(s) Monitor: Processed pastes and filtered duplicated + Feeder(s) Monitor: Processed pastes and filtered duplicated
-
+
@@ -105,7 +105,7 @@
-
+
Queues Monitor From 1c2169d3bcc1687851f0dd0bbc8e1ac0436e2d10 Mon Sep 17 00:00:00 2001 From: Mokaddem Date: Mon, 26 Dec 2016 16:16:44 +0100 Subject: [PATCH 5/9] Improved interface for multiple feeds and refresh_script. --- bin/Mixer.py | 1 + var/www/static/js/indexjavascript.js | 108 ++++++++++++++++++--------- var/www/templates/index.html | 5 +- 3 files changed, 78 insertions(+), 36 deletions(-) diff --git a/bin/Mixer.py b/bin/Mixer.py index 558cf4ec..ddfb1399 100755 --- a/bin/Mixer.py +++ b/bin/Mixer.py @@ -150,6 +150,7 @@ if __name__ == '__main__': else: print "Empty Queues: Waiting..." if int(time.time() - time_1) > refresh_time: + print processed_paste_per_feeder to_print = 'Mixer; ; ; ;mixer_all All_feeders Processed {0} paste(s) in {1}sec'.format(processed_paste, refresh_time) print to_print publisher.info(to_print) diff --git a/var/www/static/js/indexjavascript.js b/var/www/static/js/indexjavascript.js index 1f727f9b..b85f62b0 100644 --- a/var/www/static/js/indexjavascript.js +++ b/var/www/static/js/indexjavascript.js @@ -1,19 +1,25 @@ var time_since_last_pastes_num = {}; -var data_for_processed_paste = { "global": [] }; +var data_for_processed_paste = { }; var list_feeder = []; -var htmltext_graph_container = "
"; window.paste_num_tabvar_all = {}; -//If we do not received info from global, set pastes_num to 0 +//If we do not received info from mixer, set pastes_num to 0 function checkIfReceivedData(){ for (i in list_feeder) { - if ((new Date().getTime() - time_since_last_pastes_num[list_feeder[i]]) > 45*1000) - window.paste_num_tabvar_all[list_feeder[i]] = 0; - setTimeout(checkIfReceivedData, 45*1000); + if(list_feeder[i] == "global"){ + if ((new Date().getTime() - time_since_last_pastes_num[list_feeder[i]]) > 35*1000){ + window.paste_num_tabvar_all[list_feeder[i]] = 0; + } + } else { + if ((new Date().getTime() - time_since_last_pastes_num["Proc"+list_feeder[i]]) > 35*1000){ + window.paste_num_tabvar_all["Proc"+list_feeder[i]] = 0; + window.paste_num_tabvar_all["Dup"+list_feeder[i]] = 0; + } + } } + setTimeout(checkIfReceivedData, 35*1000); } -setTimeout(checkIfReceivedData, 45*1000); function initfunc( csvay, scroot) { window.csv = csvay; @@ -32,24 +38,17 @@ function update_values() { // Plot and update the number of processed pastes // BEGIN PROCESSED PASTES var default_minute = (typeof window.default_minute !== "undefined") ? parseInt(window.default_minute) : 10; - var totalPoints = 60*parseInt(default_minute); //60s*minute + var totalPoints = 2*parseInt(default_minute); //60s*minute var curr_max = {"global": 0}; - - function getData(dataset) { - var curr_data; - if (data_for_processed_paste[dataset] === undefined) { // create feeder dataset if not exists yet - data_for_processed_paste[dataset] = []; - } - curr_data = data_for_processed_paste[dataset]; + function fetch_data(dataset, curr_data, feeder_name) { if (curr_data.length > 0){ var data_old = curr_data[0]; - curr_data = curr_data.slice(10); + curr_data = curr_data.slice(1); curr_max[dataset] = curr_max[dataset] == data_old ? Math.max.apply(null, curr_data) : curr_max[dataset]; } while (curr_data.length < totalPoints) { - //var y = (typeof window.paste_num_tabvar_all[dataset] !== "undefined") ? parseInt(window.paste_num_tabvar_all[dataset]) : 0; var y = (typeof window.paste_num_tabvar_all[dataset] !== "undefined") ? parseInt(window.paste_num_tabvar_all[dataset]) : 0; curr_max[dataset] = y > curr_max[dataset] ? 
y : curr_max[dataset]; curr_data.push(y); @@ -60,27 +59,61 @@ function update_values() { res.push([i, curr_data[i]]) } data_for_processed_paste[dataset] = curr_data; - return res; + return { label: feeder_name, data: res }; + } + + function getData(dataset_group, graph_type) { + var curr_data; + + var all_res = []; + if (dataset_group == "global") { + if (data_for_processed_paste["global"] === undefined) { // create feeder dataset if not exists yet + data_for_processed_paste["global"] = []; + } + curr_data = data_for_processed_paste["global"]; + all_res.push(fetch_data("global", curr_data, "global")); + } else { + + for(d_i in list_feeder) { + if(list_feeder[d_i] == "global") { + continue; + } + + dataset = graph_type+list_feeder[d_i]; + if (data_for_processed_paste[dataset] === undefined) { // create feeder dataset if not exists yet + data_for_processed_paste[dataset] = []; + } + curr_data = data_for_processed_paste[dataset]; + all_res.push(fetch_data(dataset, curr_data, list_feeder[d_i])); + } + + } + return all_res; } var updateInterval = 10000; var options_processed_pastes = { - series: { shadowSize: 1 }, - lines: { fill: true, fillColor: { colors: [ { opacity: 1 }, { opacity: 0.1 } ] }}, + series: { shadowSize: 0 , + lines: { fill: true, fillColor: { colors: [ { opacity: 1 }, { opacity: 0.1 } ] }} + }, yaxis: { min: 0, max: 40 }, - colors: ["#a971ff"], + xaxis: { ticks: [[0, 0], [2, 1], [4, 2], [6, 3], [8, 4], [10, 5], [12, 6], [14, 7], [16, 8], [18, 9], [20, 10]] }, grid: { tickColor: "#dddddd", borderWidth: 0 }, + legend: { + show: true, + position: "nw", + } }; - function update_processed_pastes(graph, dataset) { - graph.setData([getData(dataset)]); + function update_processed_pastes(graph, dataset, graph_type) { + graph.setData(getData(dataset, graph_type)); graph.getOptions().yaxes[0].max = curr_max[dataset]; graph.setupGrid(); graph.draw(); - setTimeout(function(){ update_processed_pastes(graph, dataset); }, updateInterval); + setTimeout(function(){ update_processed_pastes(graph, dataset, graph_type); }, updateInterval); } @@ -136,8 +169,20 @@ function create_log_table(obj_json) { if (feeder == "All_feeders"){ if(list_feeder.indexOf("global") == -1) { list_feeder.push("global"); - var total_proc = $.plot("#global", [ getData("global") ], options_processed_pastes); + + options_processed_pastes.legend.show = false; + var total_proc = $.plot("#global", [ getData("global", null) ], options_processed_pastes); + options_processed_pastes.legend.show = true; + options_processed_pastes.series.lines = { show: true }; + data_for_processed_paste["global"] = Array(totalPoints+1).join(0).split(''); + + var feederProc = $.plot("#Proc_feeder", [ getData(feeder, "Proc") ], options_processed_pastes); + var feederDup = $.plot("#Dup_feeder", [ getData(feeder, "Dup") ], options_processed_pastes); + + update_processed_pastes(feederProc, "feeder", "Proc"); + update_processed_pastes(feederDup, "feeder", "Dup"); update_processed_pastes(total_proc, "global"); + setTimeout(checkIfReceivedData, 45*1000); } window.paste_num_tabvar_all["global"] = paste_processed; time_since_last_pastes_num["global"] = new Date().getTime(); @@ -145,18 +190,11 @@ function create_log_table(obj_json) { if (list_feeder.indexOf(feeder) == -1) { list_feeder.push(feeder); - //ADD HTML CONTAINER + PLOT THE GRAPH, ADD IT TO A LIST CONTAING THE PLOTED GRAPH - $("#panelbody").append(""+feeder+""); - $("#panelbody").append("
" + htmltext_graph_container.replace("$1", feeder+"Proc") + htmltext_graph_container.replace("$1", feeder+"Dup")+"
"); - var new_feederProc = $.plot("#"+feeder+"Proc", [ getData(feeder+"Proc") ], options_processed_pastes); - options_processed_pastes.colors = ["#edc240"]; - var new_feederDup = $.plot("#"+feeder+"Dup", [ getData(feeder+"Dup") ], options_processed_pastes); - options_processed_pastes.colors = ["#a971ff"]; - update_processed_pastes(new_feederProc, feeder+"Proc"); - update_processed_pastes(new_feederDup, feeder+"Dup"); + data_for_processed_paste["Proc"+feeder] = Array(totalPoints+1).join(0).split(''); + data_for_processed_paste["Dup"+feeder] = Array(totalPoints+1).join(0).split(''); } - var feederName = msg_type == "Duplicated" ? feeder+"Dup" : feeder+"Proc"; + var feederName = msg_type == "Duplicated" ? "Dup"+feeder : "Proc"+feeder; window.paste_num_tabvar_all[feederName] = paste_processed; time_since_last_pastes_num[feederName] = new Date().getTime(); } diff --git a/var/www/templates/index.html b/var/www/templates/index.html index 90d7134b..a943369e 100644 --- a/var/www/templates/index.html +++ b/var/www/templates/index.html @@ -17,6 +17,7 @@ +