From 850b686e76439e838212667ed6d01d090b64f259 Mon Sep 17 00:00:00 2001 From: Sami Mokaddem Date: Thu, 30 Nov 2017 08:17:53 +0100 Subject: [PATCH 01/11] Added draft of redis buffer --- config/config.cfg.default | 3 + zmq_dispatcher.py | 276 ++++++++++++++++++++++++++++++++++++++ zmq_subscriber.py | 227 +------------------------------ 3 files changed, 286 insertions(+), 220 deletions(-) create mode 100755 zmq_dispatcher.py diff --git a/config/config.cfg.default b/config/config.cfg.default index eabe996..cda6938 100644 --- a/config/config.cfg.default +++ b/config/config.cfg.default @@ -37,6 +37,9 @@ misp_web_url = http://localhost #zmq_url=tcp://192.168.56.50:50000 zmq_url=tcp://localhost:50000 +[RedisLIST] +db=3 + [RedisLog] db=0 channel=1 diff --git a/zmq_dispatcher.py b/zmq_dispatcher.py new file mode 100755 index 0000000..a4699c5 --- /dev/null +++ b/zmq_dispatcher.py @@ -0,0 +1,276 @@ +#!/usr/bin/env python3.5 + +import time, datetime +import copy +from pprint import pprint +import zmq +import redis +import random +import configparser +import argparse +import os +import sys +import json + +import util +import geo_helper +import contributor_helper +import users_helper +import trendings_helper + +configfile = os.path.join(os.environ['DASH_CONFIG'], 'config.cfg') +cfg = configparser.ConfigParser() +cfg.read(configfile) + +ZMQ_URL = cfg.get('RedisGlobal', 'zmq_url') +CHANNEL = cfg.get('RedisLog', 'channel') +LISTNAME = cfg.get('RedisLIST', 'listName') + +serv_log = redis.StrictRedis( + host=cfg.get('RedisGlobal', 'host'), + port=cfg.getint('RedisGlobal', 'port'), + db=cfg.getint('RedisLog', 'db')) +serv_redis_db = redis.StrictRedis( + host=cfg.get('RedisGlobal', 'host'), + port=cfg.getint('RedisGlobal', 'port'), + db=cfg.getint('RedisDB', 'db')) +serv_list = redis.StrictRedis( + host=cfg.get('RedisGlobal', 'host'), + port=cfg.getint('RedisGlobal', 'port'), + db=cfg.getint('RedisLIST', 'db')) + +geo_helper = geo_helper.Geo_helper(serv_redis_db, cfg) +contributor_helper 
= contributor_helper.Contributor_helper(serv_redis_db, cfg) +users_helper = users_helper.Users_helper(serv_redis_db, cfg) +trendings_helper = trendings_helper.Trendings_helper(serv_redis_db, cfg) + + +def publish_log(zmq_name, name, content, channel=CHANNEL): + to_send = { 'name': name, 'log': json.dumps(content), 'zmqName': zmq_name } + serv_log.publish(channel, json.dumps(to_send)) + +def getFields(obj, fields): + jsonWalker = fields.split('.') + itemToExplore = obj + lastName = "" + try: + for i in jsonWalker: + itemToExplore = itemToExplore[i] + lastName = i + if type(itemToExplore) is list: + return { 'name': lastName , 'data': itemToExplore } + else: + return itemToExplore + except KeyError as e: + return "" + +def noSpaceLower(text): + return text.lower().replace(' ', '_') + +############## +## HANDLERS ## +############## + +def handler_log(zmq_name, jsonevent): + print('sending', 'log') + return + +def handler_dispatcher(zmq_name, jsonObj): + if "Event" in jsonObj: + handler_event(zmq_name, jsonObj) + +def handler_keepalive(zmq_name, jsonevent): + print('sending', 'keepalive') + to_push = [ jsonevent['uptime'] ] + publish_log(zmq_name, 'Keepalive', to_push) + +def handler_user(zmq_name, jsondata): + action = jsondata['action'] + json_user = jsondata['User'] + json_org = jsondata['Organisation'] + org = json_org['name'] + if action == 'login': #only consider user login + timestamp = int(time.time()) + users_helper.add_user_login(timestamp, org) + else: + pass + +def handler_conversation(zmq_name, jsonevent): + try: #only consider POST, not THREAD + jsonpost = jsonevent['Post'] + except KeyError: + return + print('sending' ,'Post') + org = jsonpost['org_name'] + categ = None + action = 'add' + eventName = 'no name or id yet...' 
+ contributor_helper.handleContribution(zmq_name, org, + 'Discussion', + None, + action, + isLabeled=False) + # add Discussion + nowSec = int(time.time()) + trendings_helper.addTrendingDisc(eventName, nowSec) + +def handler_object(zmq_name, jsondata): + print('obj') + return + +def handler_sighting(zmq_name, jsondata): + print('sending' ,'sighting') + jsonsight = jsondata['Sighting'] + org = jsonsight['Event']['Orgc']['name'] + categ = jsonsight['Attribute']['category'] + action = jsondata.get('action', None) + contributor_helper.handleContribution(zmq_name, org, 'Sighting', categ, action, pntMultiplier=2) + handler_attribute(zmq_name, jsonsight, hasAlreadyBeenContributed=True) + + timestamp = jsonsight.get('date_sighting', None) + + if jsonsight['type'] == "0": # sightings + trendings_helper.addSightings(timestamp) + elif jsonsight['type'] == "1": # false positive + trendings_helper.addFalsePositive(timestamp) + +def handler_event(zmq_name, jsonobj): + #fields: threat_level_id, id, info + jsonevent = jsonobj['Event'] + + #Add trending + eventName = jsonevent['info'] + timestamp = jsonevent['timestamp'] + trendings_helper.addTrendingEvent(eventName, timestamp) + tags = [] + for tag in jsonobj.get('EventTag', []): + try: + tags.append(tag['Tag']) + except KeyError: + pass + trendings_helper.addTrendingTags(tags, timestamp) + + #redirect to handler_attribute + if 'Attribute' in jsonevent: + attributes = jsonevent['Attribute'] + if type(attributes) is list: + for attr in attributes: + jsoncopy = copy.deepcopy(jsonobj) + jsoncopy['Attribute'] = attr + handler_attribute(zmq_name, jsoncopy) + else: + handler_attribute(zmq_name, attributes) + + action = jsonobj.get('action', None) + eventLabeled = len(jsonobj.get('EventTag', [])) > 0 + org = jsonobj.get('Orgc', {}).get('name', None) + + if org is not None: + contributor_helper.handleContribution(zmq_name, org, + 'Event', + None, + action, + isLabeled=eventLabeled) + +def handler_attribute(zmq_name, jsonobj, 
hasAlreadyBeenContributed=False): + # check if jsonattr is an attribute object + if 'Attribute' in jsonobj: + jsonattr = jsonobj['Attribute'] + + #Add trending + categName = jsonattr['category'] + timestamp = jsonattr.get('timestamp', int(time.time())) + trendings_helper.addTrendingCateg(categName, timestamp) + tags = [] + for tag in jsonattr.get('Tag', []): + try: + tags.append(tag['Tag']) + except KeyError: + pass + trendings_helper.addTrendingTags(tags, timestamp) + + to_push = [] + for field in json.loads(cfg.get('Log', 'fieldname_order')): + if type(field) is list: + to_join = [] + for subField in field: + to_join.append(getFields(jsonobj, subField)) + to_add = cfg.get('Log', 'char_separator').join(to_join) + else: + to_add = getFields(jsonobj, field) + to_push.append(to_add) + + #try to get coord from ip + if jsonattr['category'] == "Network activity": + geo_helper.getCoordFromIpAndPublish(jsonattr['value'], jsonattr['category']) + + #try to get coord from ip + if jsonattr['type'] == "phone-number": + geo_helper.getCoordFromPhoneAndPublish(jsonattr['value'], jsonattr['category']) + + if not hasAlreadyBeenContributed: + eventLabeled = len(jsonobj.get('EventTag', [])) > 0 + action = jsonobj.get('action', None) + contributor_helper.handleContribution(zmq_name, jsonobj['Event']['Orgc']['name'], + 'Attribute', + jsonattr['category'], + action, + isLabeled=eventLabeled) + # Push to log + publish_log(zmq_name, 'Attribute', to_push) + + +############### +## MAIN LOOP ## +############### + +def process_log(zmq_name, event): + event = event.decode('utf8') + topic, eventdata = event.split(' ', maxsplit=1) + jsonevent = json.loads(eventdata) + print(event) + try: + dico_action[topic](zmq_name, jsonevent) + except KeyError as e: + print(e) + + +def main(zmqName): + context = zmq.Context() + socket = context.socket(zmq.SUB) + socket.connect(ZMQ_URL) + socket.setsockopt_string(zmq.SUBSCRIBE, '') + + while True: + try: + content = serv_list.rpop(LISTNAME) + the_json = 
json.loads(content) + zmqName - the_json['zmq_name'] + content = the_json['content'] + process_log(zmqName, content) + except KeyboardInterrupt: + return + + +dico_action = { + "misp_json": handler_dispatcher, + "misp_json_event": handler_event, + "misp_json_self": handler_keepalive, + "misp_json_attribute": handler_attribute, + "misp_json_object": handler_object, + "misp_json_sighting": handler_sighting, + "misp_json_organisation": handler_log, + "misp_json_user": handler_user, + "misp_json_conversation": handler_conversation, + "misp_json_object_reference": handler_log, + } + + +if __name__ == "__main__": + + parser = argparse.ArgumentParser(description='A zmq subscriber. It subscribe to a ZNQ then redispatch it to the misp-dashboard') + parser.add_argument('-n', '--name', required=False, dest='zmqname', help='The ZMQ feed name', default="MISP Standard ZMQ") + parser.add_argument('-u', '--url', required=False, dest='zmqurl', help='The URL to connect to', default=ZMQ_URL) + args = parser.parse_args() + + main(args.zmqname) diff --git a/zmq_subscriber.py b/zmq_subscriber.py index 598eec7..ee7922c 100755 --- a/zmq_subscriber.py +++ b/zmq_subscriber.py @@ -1,234 +1,36 @@ #!/usr/bin/env python3.5 import time, datetime -import copy from pprint import pprint import zmq import redis -import random import configparser import argparse import os import sys import json -import util -import geo_helper -import contributor_helper -import users_helper -import trendings_helper - configfile = os.path.join(os.environ['DASH_CONFIG'], 'config.cfg') cfg = configparser.ConfigParser() cfg.read(configfile) ZMQ_URL = cfg.get('RedisGlobal', 'zmq_url') CHANNEL = cfg.get('RedisLog', 'channel') +LISTNAME = cfg.get('RedisLIST', 'listName') -serv_log = redis.StrictRedis( +serv_list = redis.StrictRedis( host=cfg.get('RedisGlobal', 'host'), port=cfg.getint('RedisGlobal', 'port'), - db=cfg.getint('RedisLog', 'db')) -serv_redis_db = redis.StrictRedis( - host=cfg.get('RedisGlobal', 'host'), - 
port=cfg.getint('RedisGlobal', 'port'), - db=cfg.getint('RedisDB', 'db')) - -geo_helper = geo_helper.Geo_helper(serv_redis_db, cfg) -contributor_helper = contributor_helper.Contributor_helper(serv_redis_db, cfg) -users_helper = users_helper.Users_helper(serv_redis_db, cfg) -trendings_helper = trendings_helper.Trendings_helper(serv_redis_db, cfg) - - -def publish_log(zmq_name, name, content, channel=CHANNEL): - to_send = { 'name': name, 'log': json.dumps(content), 'zmqName': zmq_name } - serv_log.publish(channel, json.dumps(to_send)) - -def getFields(obj, fields): - jsonWalker = fields.split('.') - itemToExplore = obj - lastName = "" - try: - for i in jsonWalker: - itemToExplore = itemToExplore[i] - lastName = i - if type(itemToExplore) is list: - return { 'name': lastName , 'data': itemToExplore } - else: - return itemToExplore - except KeyError as e: - return "" - -def noSpaceLower(text): - return text.lower().replace(' ', '_') - -############## -## HANDLERS ## -############## - -def handler_log(zmq_name, jsonevent): - print('sending', 'log') - return - -def handler_dispatcher(zmq_name, jsonObj): - if "Event" in jsonObj: - handler_event(zmq_name, jsonObj) - -def handler_keepalive(zmq_name, jsonevent): - print('sending', 'keepalive') - to_push = [ jsonevent['uptime'] ] - publish_log(zmq_name, 'Keepalive', to_push) - -def handler_user(zmq_name, jsondata): - action = jsondata['action'] - json_user = jsondata['User'] - json_org = jsondata['Organisation'] - org = json_org['name'] - if action == 'login': #only consider user login - timestamp = int(time.time()) - users_helper.add_user_login(timestamp, org) - else: - pass - -def handler_conversation(zmq_name, jsonevent): - try: #only consider POST, not THREAD - jsonpost = jsonevent['Post'] - except KeyError: - return - print('sending' ,'Post') - org = jsonpost['org_name'] - categ = None - action = 'add' - eventName = 'no name or id yet...' 
- contributor_helper.handleContribution(zmq_name, org, - 'Discussion', - None, - action, - isLabeled=False) - # add Discussion - nowSec = int(time.time()) - trendings_helper.addTrendingDisc(eventName, nowSec) - -def handler_object(zmq_name, jsondata): - print('obj') - return - -def handler_sighting(zmq_name, jsondata): - print('sending' ,'sighting') - jsonsight = jsondata['Sighting'] - org = jsonsight['Event']['Orgc']['name'] - categ = jsonsight['Attribute']['category'] - action = jsondata.get('action', None) - contributor_helper.handleContribution(zmq_name, org, 'Sighting', categ, action, pntMultiplier=2) - handler_attribute(zmq_name, jsonsight, hasAlreadyBeenContributed=True) - - timestamp = jsonsight.get('date_sighting', None) - - if jsonsight['type'] == "0": # sightings - trendings_helper.addSightings(timestamp) - elif jsonsight['type'] == "1": # false positive - trendings_helper.addFalsePositive(timestamp) - -def handler_event(zmq_name, jsonobj): - #fields: threat_level_id, id, info - jsonevent = jsonobj['Event'] - - #Add trending - eventName = jsonevent['info'] - timestamp = jsonevent['timestamp'] - trendings_helper.addTrendingEvent(eventName, timestamp) - tags = [] - for tag in jsonobj.get('EventTag', []): - try: - tags.append(tag['Tag']) - except KeyError: - pass - trendings_helper.addTrendingTags(tags, timestamp) - - #redirect to handler_attribute - if 'Attribute' in jsonevent: - attributes = jsonevent['Attribute'] - if type(attributes) is list: - for attr in attributes: - jsoncopy = copy.deepcopy(jsonobj) - jsoncopy['Attribute'] = attr - handler_attribute(zmq_name, jsoncopy) - else: - handler_attribute(zmq_name, attributes) - - action = jsonobj.get('action', None) - eventLabeled = len(jsonobj.get('EventTag', [])) > 0 - org = jsonobj.get('Orgc', {}).get('name', None) - - if org is not None: - contributor_helper.handleContribution(zmq_name, org, - 'Event', - None, - action, - isLabeled=eventLabeled) - -def handler_attribute(zmq_name, jsonobj, 
hasAlreadyBeenContributed=False): - # check if jsonattr is an attribute object - if 'Attribute' in jsonobj: - jsonattr = jsonobj['Attribute'] - - #Add trending - categName = jsonattr['category'] - timestamp = jsonattr.get('timestamp', int(time.time())) - trendings_helper.addTrendingCateg(categName, timestamp) - tags = [] - for tag in jsonattr.get('Tag', []): - try: - tags.append(tag['Tag']) - except KeyError: - pass - trendings_helper.addTrendingTags(tags, timestamp) - - to_push = [] - for field in json.loads(cfg.get('Log', 'fieldname_order')): - if type(field) is list: - to_join = [] - for subField in field: - to_join.append(getFields(jsonobj, subField)) - to_add = cfg.get('Log', 'char_separator').join(to_join) - else: - to_add = getFields(jsonobj, field) - to_push.append(to_add) - - #try to get coord from ip - if jsonattr['category'] == "Network activity": - geo_helper.getCoordFromIpAndPublish(jsonattr['value'], jsonattr['category']) - - #try to get coord from ip - if jsonattr['type'] == "phone-number": - geo_helper.getCoordFromPhoneAndPublish(jsonattr['value'], jsonattr['category']) - - if not hasAlreadyBeenContributed: - eventLabeled = len(jsonobj.get('EventTag', [])) > 0 - action = jsonobj.get('action', None) - contributor_helper.handleContribution(zmq_name, jsonobj['Event']['Orgc']['name'], - 'Attribute', - jsonattr['category'], - action, - isLabeled=eventLabeled) - # Push to log - publish_log(zmq_name, 'Attribute', to_push) + db=cfg.getint('RedisLIST', 'db')) ############### ## MAIN LOOP ## ############### -def process_log(zmq_name, event): - event = event.decode('utf8') - topic, eventdata = event.split(' ', maxsplit=1) - jsonevent = json.loads(eventdata) - print(event) - try: - dico_action[topic](zmq_name, jsonevent) - except KeyError as e: - print(e) - +def put_in_redis_list(zmq_name, content): + to_add = {'zmq_name': zmq_name, 'content': content} + serv_list.lpush(LISTNAME, json.dumps(content)) def main(zmqName): context = zmq.Context() @@ -240,26 +42,11 
@@ def main(zmqName): try: content = socket.recv() content.replace(b'\n', b'') # remove \n... - zmq_name = zmqName - process_log(zmq_name, content) + put_in_redis_list(zmqName, content) except KeyboardInterrupt: return -dico_action = { - "misp_json": handler_dispatcher, - "misp_json_event": handler_event, - "misp_json_self": handler_keepalive, - "misp_json_attribute": handler_attribute, - "misp_json_object": handler_object, - "misp_json_sighting": handler_sighting, - "misp_json_organisation": handler_log, - "misp_json_user": handler_user, - "misp_json_conversation": handler_conversation, - "misp_json_object_reference": handler_log, - } - - if __name__ == "__main__": parser = argparse.ArgumentParser(description='A zmq subscriber. It subscribe to a ZNQ then redispatch it to the misp-dashboard') From 868ba9f80b1c595c0822edd4f1270cf3a4c610e6 Mon Sep 17 00:00:00 2001 From: Sami Mokaddem Date: Thu, 30 Nov 2017 16:04:03 +0100 Subject: [PATCH 02/11] fix: redis buffer seems to work --- users_helper.py | 6 +++--- zmq_dispatcher.py | 31 ++++++++++++++++++------------- zmq_subscriber.py | 3 ++- 3 files changed, 23 insertions(+), 17 deletions(-) diff --git a/users_helper.py b/users_helper.py index 80ad79f..ec01615 100644 --- a/users_helper.py +++ b/users_helper.py @@ -73,7 +73,7 @@ class Users_helper: for curDate in util.getXPrevDaysSpan(date, prev_days): log = self.serv_redis_db.zscore(keyname_log.format(self.keyOrgLog, util.getDateStrFormat(curDate)), org) log = 0 if log is None else 1 - contrib = self.serv_redis_db.zscore(keyname_contrib.format(keyContribDay, util.getDateStrFormat(curDate)), org) + contrib = self.serv_redis_db.zscore(keyname_contrib.format(self.keyContribDay, util.getDateStrFormat(curDate)), org) contrib = 0 if contrib is None else 1 data.append([log, contrib]) return data @@ -100,7 +100,7 @@ class Users_helper: def getLoginVSCOntribution(self, date): - keyname = "{}:{}".format(keyContribDay, util.getDateStrFormat(date)) + keyname = 
"{}:{}".format(self.keyContribDay, util.getDateStrFormat(date)) orgs_contri = self.serv_redis_db.zrange(keyname, 0, -1, desc=True, withscores=False) orgs_contri = [ org.decode('utf8') for org in orgs_contri ] orgs_login = [ org[0] for org in self.getOrgslogin(date, topNum=0) ] @@ -152,7 +152,7 @@ class Users_helper: for curDate in util.getXPrevDaysSpan(date, prev_days): timestamps = self.getUserLogins(curDate) - keyname = "{}:{}".format(keyContribDay, util.getDateStrFormat(curDate)) + keyname = "{}:{}".format(self.keyContribDay, util.getDateStrFormat(curDate)) orgs_contri = self.serv_redis_db.zrange(keyname, 0, -1, desc=True, withscores=False) orgs_contri_num = len(orgs_contri) diff --git a/zmq_dispatcher.py b/zmq_dispatcher.py index a4699c5..48a6f53 100755 --- a/zmq_dispatcher.py +++ b/zmq_dispatcher.py @@ -85,6 +85,7 @@ def handler_keepalive(zmq_name, jsonevent): publish_log(zmq_name, 'Keepalive', to_push) def handler_user(zmq_name, jsondata): + print('sending', 'user') action = jsondata['action'] json_user = jsondata['User'] json_org = jsondata['Organisation'] @@ -225,31 +226,34 @@ def handler_attribute(zmq_name, jsonobj, hasAlreadyBeenContributed=False): ############### def process_log(zmq_name, event): - event = event.decode('utf8') topic, eventdata = event.split(' ', maxsplit=1) jsonevent = json.loads(eventdata) - print(event) try: dico_action[topic](zmq_name, jsonevent) except KeyError as e: print(e) -def main(zmqName): +def main(zmqName, zmqurl, sleeptime): context = zmq.Context() socket = context.socket(zmq.SUB) - socket.connect(ZMQ_URL) + socket.connect(zmqurl) socket.setsockopt_string(zmq.SUBSCRIBE, '') + numMsg = 0 while True: - try: - content = serv_list.rpop(LISTNAME) - the_json = json.loads(content) - zmqName - the_json['zmq_name'] - content = the_json['content'] - process_log(zmqName, content) - except KeyboardInterrupt: - return + content = serv_list.rpop(LISTNAME) + if content is None: + print('Processed', numMsg, 'message(s) since last sleep.') + 
numMsg = 0 + time.sleep(sleeptime) + continue + content = content.decode('utf8') + the_json = json.loads(content) + zmqName = the_json['zmq_name'] + content = the_json['content'] + process_log(zmqName, content) + numMsg += 1 dico_action = { @@ -271,6 +275,7 @@ if __name__ == "__main__": parser = argparse.ArgumentParser(description='A zmq subscriber. It subscribe to a ZNQ then redispatch it to the misp-dashboard') parser.add_argument('-n', '--name', required=False, dest='zmqname', help='The ZMQ feed name', default="MISP Standard ZMQ") parser.add_argument('-u', '--url', required=False, dest='zmqurl', help='The URL to connect to', default=ZMQ_URL) + parser.add_argument('-s', '--sleep', required=False, dest='sleeptime', type=int, help='The number of second to wait before checking redis list size', default=5) args = parser.parse_args() - main(args.zmqname) + main(args.zmqname, args.zmqurl, args.sleeptime) diff --git a/zmq_subscriber.py b/zmq_subscriber.py index ee7922c..bb44f84 100755 --- a/zmq_subscriber.py +++ b/zmq_subscriber.py @@ -29,8 +29,9 @@ serv_list = redis.StrictRedis( ############### def put_in_redis_list(zmq_name, content): + content = content.decode('utf8') to_add = {'zmq_name': zmq_name, 'content': content} - serv_list.lpush(LISTNAME, json.dumps(content)) + serv_list.lpush(LISTNAME, json.dumps(to_add)) def main(zmqName): context = zmq.Context() From 17315f6c74f3eba8505ef219f2334f32f648a67e Mon Sep 17 00:00:00 2001 From: Sami Mokaddem Date: Fri, 1 Dec 2017 15:09:26 +0100 Subject: [PATCH 03/11] fix: Added keyname in configuration --- config/config.cfg.default | 1 + 1 file changed, 1 insertion(+) diff --git a/config/config.cfg.default b/config/config.cfg.default index cda6938..6b3b46b 100644 --- a/config/config.cfg.default +++ b/config/config.cfg.default @@ -39,6 +39,7 @@ zmq_url=tcp://localhost:50000 [RedisLIST] db=3 +listName=bufferList [RedisLog] db=0 From 849d8cc8eb5861d0f634c65fe7d8f688ea8d000c Mon Sep 17 00:00:00 2001 From: Sami Mokaddem Date: Fri, 1 
Dec 2017 15:39:17 +0100 Subject: [PATCH 04/11] fix: Fixed tons of bugs related to migration of handle_contribution to contributor_helper --- contributor_helper.py | 36 ++++++++++++++++++++++++------------ users_helper.py | 3 +-- util.py | 5 ++++- zmq_dispatcher.py | 3 --- 4 files changed, 29 insertions(+), 18 deletions(-) diff --git a/contributor_helper.py b/contributor_helper.py index 0f47d43..2b0cd24 100644 --- a/contributor_helper.py +++ b/contributor_helper.py @@ -1,18 +1,30 @@ import util from util import getZrange import math, random +import time import os import configparser import json import datetime +import redis + +import util +import users_helper +KEYDAY = "CONTRIB_DAY" # To be used by other module class Contributor_helper: def __init__(self, serv_redis_db, cfg): self.serv_redis_db = serv_redis_db + self.serv_log = redis.StrictRedis( + host=cfg.get('RedisGlobal', 'host'), + port=cfg.getint('RedisGlobal', 'port'), + db=cfg.getint('RedisLog', 'db')) self.cfg = cfg self.cfg_org_rank = configparser.ConfigParser() self.cfg_org_rank.read(os.path.join(os.environ['DASH_CONFIG'], 'ranking.cfg')) self.CHANNEL_LASTAWARDS = cfg.get('RedisLog', 'channelLastAwards') + self.CHANNEL_LASTCONTRIB = cfg.get('RedisLog', 'channelLastContributor') + self.users_helper = users_helper.Users_helper(serv_redis_db, cfg) #honorBadge self.honorBadgeNum = len(self.cfg_org_rank.options('HonorBadge')) @@ -66,7 +78,7 @@ class Contributor_helper: self.levelMax = self.cfg_org_rank.getfloat('monthlyRanking' ,'levelMax') # REDIS KEYS - self.keyDay = "CONTRIB_DAY" + self.keyDay = KEYDAY self.keyCateg = "CONTRIB_CATEG" self.keyLastContrib = "CONTRIB_LAST" self.keyAllOrg = "CONTRIB_ALL_ORG" @@ -86,7 +98,7 @@ class Contributor_helper: def publish_log(self, zmq_name, name, content, channel=""): to_send = { 'name': name, 'log': json.dumps(content), 'zmqName': zmq_name } - serv_log.publish(channel, json.dumps(to_send)) + self.serv_log.publish(channel, json.dumps(to_send)) ''' HANDLER ''' 
#pntMultiplier if one contribution rewards more than others. (e.g. shighting may gives more points than editing) @@ -100,36 +112,36 @@ class Contributor_helper: pnts_to_add = self.default_pnts_per_contribution # if there is a contribution, there is a login (even if ti comes from the API) - users_helper.add_user_login(nowSec, org) + self.users_helper.add_user_login(nowSec, org) # is a valid contribution if categ is not None: try: - pnts_to_add = self.DICO_PNTS_REWARD[noSpaceLower(categ)] + pnts_to_add = self.DICO_PNTS_REWARD[util.noSpaceLower(categ)] except KeyError: pnts_to_add = self.default_pnts_per_contribution pnts_to_add *= pntMultiplier util.push_to_redis_zset(self.serv_redis_db, self.keyDay, org, count=pnts_to_add) #CONTRIB_CATEG retain the contribution per category, not the point earned in this categ - util.push_to_redis_zset(self.serv_redis_db, self.keyCateg, org, count=1, endSubkey=':'+noSpaceLower(categ)) - self.publish_log(zmq_name, 'CONTRIBUTION', {'org': org, 'categ': categ, 'action': action, 'epoch': nowSec }, channel=CHANNEL_LASTCONTRIB) + util.push_to_redis_zset(self.serv_redis_db, self.keyCateg, org, count=1, endSubkey=':'+util.noSpaceLower(categ)) + self.publish_log(zmq_name, 'CONTRIBUTION', {'org': org, 'categ': categ, 'action': action, 'epoch': nowSec }, channel=self.CHANNEL_LASTCONTRIB) else: categ = "" - serv_redis_db.sadd(self.keyAllOrg, org) + self.serv_redis_db.sadd(self.keyAllOrg, org) keyname = "{}:{}".format(self.keyLastContrib, util.getDateStrFormat(now)) - serv_redis_db.zadd(keyname, nowSec, org) - serv_redis_db.expire(keyname, ONE_DAY*7) #expire after 7 day + self.serv_redis_db.zadd(keyname, nowSec, org) + self.serv_redis_db.expire(keyname, util.ONE_DAY*7) #expire after 7 day - awards_given = self.updateOrgContributionRank(org, pnts_to_add, action, contribType, eventTime=datetime.datetime.now(), isLabeled=isLabeled, categ=noSpaceLower(categ)) + awards_given = self.updateOrgContributionRank(org, pnts_to_add, action, contribType, 
eventTime=datetime.datetime.now(), isLabeled=isLabeled, categ=util.noSpaceLower(categ)) for award in awards_given: # update awards given keyname = "{}:{}".format(self.keyLastAward, util.getDateStrFormat(now)) - serv_redis_db.zadd(keyname, nowSec, json.dumps({'org': org, 'award': award, 'epoch': nowSec })) - serv_redis_db.expire(keyname, ONE_DAY*7) #expire after 7 day + self.serv_redis_db.zadd(keyname, nowSec, json.dumps({'org': org, 'award': award, 'epoch': nowSec })) + self.serv_redis_db.expire(keyname, util.ONE_DAY*7) #expire after 7 day # publish self.publish_log(zmq_name, 'CONTRIBUTION', {'org': org, 'award': award, 'epoch': nowSec }, channel=self.CHANNEL_LASTAWARDS) diff --git a/users_helper.py b/users_helper.py index ec01615..12576a3 100644 --- a/users_helper.py +++ b/users_helper.py @@ -14,8 +14,7 @@ class Users_helper: self.keyTimestamp = "LOGIN_TIMESTAMP" self.keyTimestampSet = "LOGIN_TIMESTAMPSET" self.keyOrgLog = "LOGIN_ORG" - contrib_helper = contributor_helper.Contributor_helper(serv_redis_db, cfg) - self.keyContribDay = contrib_helper.keyDay # Key to get monthly contribution + self.keyContribDay = contributor_helper.KEYDAY # Key to get monthly contribution def addTemporary(self, org, timestamp): timestampDate = datetime.datetime.fromtimestamp(float(timestamp)) diff --git a/util.py b/util.py index 50364ba..e37b391 100644 --- a/util.py +++ b/util.py @@ -9,9 +9,12 @@ def getZrange(serv_redis_db, keyCateg, date, topNum, endSubkey=""): data = [ [record[0].decode('utf8'), record[1]] for record in data ] return data +def noSpaceLower(text): + return text.lower().replace(' ', '_') + def push_to_redis_zset(serv_redis_db, mainKey, toAdd, endSubkey="", count=1): now = datetime.datetime.now() - today_str = util.getDateStrFormat(now) + today_str = getDateStrFormat(now) keyname = "{}:{}{}".format(mainKey, today_str, endSubkey) serv_redis_db.zincrby(keyname, toAdd, count) diff --git a/zmq_dispatcher.py b/zmq_dispatcher.py index 48a6f53..7a17842 100755 --- 
a/zmq_dispatcher.py +++ b/zmq_dispatcher.py @@ -64,9 +64,6 @@ def getFields(obj, fields): except KeyError as e: return "" -def noSpaceLower(text): - return text.lower().replace(' ', '_') - ############## ## HANDLERS ## ############## From b1bbb8efc31fecf82b24580c38a7d8b00f93840f Mon Sep 17 00:00:00 2001 From: Sami Mokaddem Date: Fri, 1 Dec 2017 15:49:20 +0100 Subject: [PATCH 05/11] fix: parsing into float instead of int --- contributor_helper.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contributor_helper.py b/contributor_helper.py index 2b0cd24..0c9a510 100644 --- a/contributor_helper.py +++ b/contributor_helper.py @@ -75,7 +75,7 @@ class Contributor_helper: self.DICO_PNTS_REWARD[categ] = self.default_pnts_per_contribution self.rankMultiplier = self.cfg_org_rank.getfloat('monthlyRanking' ,'rankMultiplier') - self.levelMax = self.cfg_org_rank.getfloat('monthlyRanking' ,'levelMax') + self.levelMax = self.cfg_org_rank.getint('monthlyRanking' ,'levelMax') # REDIS KEYS self.keyDay = KEYDAY From e4d933a55b831cc8c887ddb71acc386584355403 Mon Sep 17 00:00:00 2001 From: Sami Mokaddem Date: Fri, 1 Dec 2017 15:52:50 +0100 Subject: [PATCH 06/11] refacto: removed prints and added return in case phone number can't be parsed --- geo_helper.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/geo_helper.py b/geo_helper.py index ca1c83f..0a7f2da 100644 --- a/geo_helper.py +++ b/geo_helper.py @@ -109,14 +109,15 @@ class Geo_helper: def getCoordFromPhoneAndPublish(self, phoneNumber, categ): try: - print('function accessed') rep = phonenumbers.parse(phoneNumber, None) if not (phonenumbers.is_valid_number(rep) or phonenumbers.is_possible_number(rep)): print("Phone number not valid") + return country_name = geocoder.country_name_for_number(rep, "en") country_code = self.country_to_iso[country_name] if country_code is None: print("Non matching ISO_CODE") + return coord = self.country_code_to_coord[country_code.lower()] # countrycode is in upper 
case coord_dic = {'lat': coord['lat'], 'lon': coord['long']} @@ -139,7 +140,6 @@ class Geo_helper: "cityName": "", "regionCode": country_code, } - print(to_send) self.serv_coord.publish(self.CHANNELDISP, json.dumps(to_send)) except phonenumbers.NumberParseException: print("Can't resolve phone number country") From d8b7734b08496fec058797fb1a74f46cf98ad560 Mon Sep 17 00:00:00 2001 From: Sami Mokaddem Date: Fri, 1 Dec 2017 16:02:28 +0100 Subject: [PATCH 07/11] ui: Restrained number of orgs in fame and prevented datatable to go over the visible screen --- contributor_helper.py | 4 ++-- server.py | 8 ++++---- static/js/contrib.js | 9 +++++---- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/contributor_helper.py b/contributor_helper.py index 0c9a510..6924218 100644 --- a/contributor_helper.py +++ b/contributor_helper.py @@ -458,7 +458,7 @@ class Contributor_helper: dic['epoch'] = epoch return dic - def getTopContributorFromRedis(self, date): + def getTopContributorFromRedis(self, date, maxNum=100): orgDicoPnts = {} for curDate in util.getMonthSpan(date): topNum = 0 # all @@ -480,7 +480,7 @@ class Contributor_helper: data.append(dic) data.sort(key=lambda x: x['pnts'], reverse=True) - return data + return data[:maxNum] def getTop5OvertimeFromRedis(self): data = [] diff --git a/server.py b/server.py index 9611bcc..6866f60 100755 --- a/server.py +++ b/server.py @@ -312,7 +312,7 @@ def eventStreamAwards(): yield 'data: {}\n\n'.format(json.dumps(to_return)) @app.route("/_getTopContributor") -def getTopContributor(suppliedDate=None): +def getTopContributor(suppliedDate=None, maxNum=100): if suppliedDate is None: try: date = datetime.datetime.fromtimestamp(float(request.args.get('date'))) @@ -321,7 +321,7 @@ def getTopContributor(suppliedDate=None): else: date = suppliedDate - data = contributor_helper.getTopContributorFromRedis(date) + data = contributor_helper.getTopContributorFromRedis(date, maxNum=maxNum) return jsonify(data) 
@app.route("/_getFameContributor") @@ -332,7 +332,7 @@ def getFameContributor(): today = datetime.datetime.now() # get previous month date = (datetime.datetime(today.year, today.month, 1) - datetime.timedelta(days=1)) - return getTopContributor(suppliedDate=date) + return getTopContributor(suppliedDate=date, maxNum=10) @app.route("/_getFameQualContributor") def getFameQualContributor(): @@ -342,7 +342,7 @@ def getFameQualContributor(): today = datetime.datetime.now() # get previous month date = (datetime.datetime(today.year, today.month, 1) - datetime.timedelta(days=1)) - return getTopContributor(suppliedDate=date) + return getTopContributor(suppliedDate=date, maxNum=10) @app.route("/_getTop5Overtime") def getTop5Overtime(): diff --git a/static/js/contrib.js b/static/js/contrib.js index 2809911..531f897 100644 --- a/static/js/contrib.js +++ b/static/js/contrib.js @@ -79,8 +79,9 @@ optionDatatable_last.columnDefs = [ { className: "centerCellPicOrgLogo verticalAlign", "targets": [ 5 ] }, { className: "verticalAlign", "targets": [ 6 ] } ] -var optionDatatable_fame = jQuery.extend({}, optionDatatable_light) -optionDatatable_fame.scrollY = '45vh'; +var optionDatatable_fameQuant = jQuery.extend({}, optionDatatable_light) +var optionDatatable_fameQual = jQuery.extend({}, optionDatatable_light) +optionDatatable_fameQual.scrollY = '40vh'; var optionDatatable_Categ = { responsive: true, @@ -590,8 +591,8 @@ $(document).ready(function() { }); datatableTop = $('#topContribTable').DataTable(optionDatatable_top); - datatableFameQuant = $('#fameTableQuantity').DataTable(optionDatatable_fame); - datatableFameQual = $('#fameTableQuality').DataTable(optionDatatable_fame); + datatableFameQuant = $('#fameTableQuantity').DataTable(optionDatatable_fameQuant); + datatableFameQual = $('#fameTableQuality').DataTable(optionDatatable_fameQual); datatableCateg = $('#categTable').DataTable(optionDatatable_Categ); datatableLast = $('#lastTable').DataTable(optionDatatable_last); datatableAwards = 
$('#awardTable').DataTable(optionDatatable_awards); From 7532074417fd23e4e0ee173d7dfb08f4f1e0429d Mon Sep 17 00:00:00 2001 From: Sami Mokaddem Date: Mon, 4 Dec 2017 11:14:25 +0100 Subject: [PATCH 08/11] Refacto: removed useless args and vars --- zmq_dispatcher.py | 14 +++----------- zmq_subscriber.py | 2 +- 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/zmq_dispatcher.py b/zmq_dispatcher.py index 7a17842..aee91ce 100755 --- a/zmq_dispatcher.py +++ b/zmq_dispatcher.py @@ -22,7 +22,6 @@ configfile = os.path.join(os.environ['DASH_CONFIG'], 'config.cfg') cfg = configparser.ConfigParser() cfg.read(configfile) -ZMQ_URL = cfg.get('RedisGlobal', 'zmq_url') CHANNEL = cfg.get('RedisLog', 'channel') LISTNAME = cfg.get('RedisLIST', 'listName') @@ -231,12 +230,7 @@ def process_log(zmq_name, event): print(e) -def main(zmqName, zmqurl, sleeptime): - context = zmq.Context() - socket = context.socket(zmq.SUB) - socket.connect(zmqurl) - socket.setsockopt_string(zmq.SUBSCRIBE, '') - +def main(sleeptime): numMsg = 0 while True: content = serv_list.rpop(LISTNAME) @@ -269,10 +263,8 @@ dico_action = { if __name__ == "__main__": - parser = argparse.ArgumentParser(description='A zmq subscriber. It subscribe to a ZNQ then redispatch it to the misp-dashboard') - parser.add_argument('-n', '--name', required=False, dest='zmqname', help='The ZMQ feed name', default="MISP Standard ZMQ") - parser.add_argument('-u', '--url', required=False, dest='zmqurl', help='The URL to connect to', default=ZMQ_URL) + parser = argparse.ArgumentParser(description='The ZMQ dispatcher. 
It pops from the redis buffer then redispatch it to the correct handlers') parser.add_argument('-s', '--sleep', required=False, dest='sleeptime', type=int, help='The number of second to wait before checking redis list size', default=5) args = parser.parse_args() - main(args.zmqname, args.zmqurl, args.sleeptime) + main(args.sleeptime) diff --git a/zmq_subscriber.py b/zmq_subscriber.py index bb44f84..e722012 100755 --- a/zmq_subscriber.py +++ b/zmq_subscriber.py @@ -50,7 +50,7 @@ def main(zmqName): if __name__ == "__main__": - parser = argparse.ArgumentParser(description='A zmq subscriber. It subscribe to a ZNQ then redispatch it to the misp-dashboard') + parser = argparse.ArgumentParser(description='A zmq subscriber. It subscribes to a ZNQ then redispatch it to the misp-dashboard') parser.add_argument('-n', '--name', required=False, dest='zmqname', help='The ZMQ feed name', default="MISP Standard ZMQ") parser.add_argument('-u', '--url', required=False, dest='zmqurl', help='The URL to connect to', default=ZMQ_URL) args = parser.parse_args() From 43c4895d3be08548f4b9fa4d4c1ebb9e87423339 Mon Sep 17 00:00:00 2001 From: Sami Mokaddem Date: Mon, 4 Dec 2017 11:49:30 +0100 Subject: [PATCH 09/11] UI: Reduced size of panel so that it does not overflow in the page. Changed PB offset from padding to top, reducing overing bug/flickering. Reduced badges size so that the modal fit in the page. 
--- static/css/ranking.css | 6 +++--- static/js/contrib.js | 2 +- templates/contrib.html | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/static/css/ranking.css b/static/css/ranking.css index df7d62d..f0116e6 100644 --- a/static/css/ranking.css +++ b/static/css/ranking.css @@ -17,10 +17,10 @@ } .circleBadgeSmall { - width: 73px; - height: 73px; + width: 57px; + height: 57px; text-align: center; - border-radius: 38px; + border-radius: 28px; background-color: #e1e1e1; border: 1px solid #caccce; vertical-align: middle; diff --git a/static/js/contrib.js b/static/js/contrib.js index 531f897..e0d3754 100644 --- a/static/js/contrib.js +++ b/static/js/contrib.js @@ -81,7 +81,7 @@ optionDatatable_last.columnDefs = [ ] var optionDatatable_fameQuant = jQuery.extend({}, optionDatatable_light) var optionDatatable_fameQual = jQuery.extend({}, optionDatatable_light) -optionDatatable_fameQual.scrollY = '40vh'; +optionDatatable_fameQual.scrollY = '39vh'; var optionDatatable_Categ = { responsive: true, diff --git a/templates/contrib.html b/templates/contrib.html index 4faee41..f54a5c2 100644 --- a/templates/contrib.html +++ b/templates/contrib.html @@ -172,7 +172,7 @@
- +
{{ item[1] }} @@ -273,7 +273,7 @@
-
+
From 25f50fb8df5ec6da93add96042af6a3fe87c1851 Mon Sep 17 00:00:00 2001 From: Sami Mokaddem Date: Mon, 4 Dec 2017 14:01:06 +0100 Subject: [PATCH 10/11] update: updated readme --- README.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/README.md b/README.md index 8fd1f8d..b854bf4 100644 --- a/README.md +++ b/README.md @@ -12,9 +12,16 @@ An experimental dashboard showing live data and statistics from the ZMQ of one o - RedisGlobal -> misp_web_url - RedisMap -> pathMaxMindDB +# Updating by pulling +- Re-launch ```./install_dependencies.sh``` to fetch new required dependencies +- Re-update your configuration file ```config.cfg``` + # Starting the System +- Be sure to have a running redis server + - e.g. ```redis-server -p 6250``` - Activate your virtualenv ```. ./DASHENV/bin/activate``` - Listen to the MISP feed by starting the zmq_subscriber ```./zmq_subscriber.py``` +- Start the dispatcher to process received messages ```./zmq_dispatcher.py``` - Start the Flask server ```./server.py``` - Access the interface at ```http://localhost:8001/``` From 92ccd8517e6ac5ad780a2a0abacc6d634d32a9c9 Mon Sep 17 00:00:00 2001 From: Sami Mokaddem Date: Mon, 4 Dec 2017 14:36:30 +0100 Subject: [PATCH 11/11] update: start_all.sh --- start.sh => start_all.sh | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) rename start.sh => start_all.sh (78%) diff --git a/start.sh b/start_all.sh similarity index 78% rename from start.sh rename to start_all.sh index 7371498..1296c21 100755 --- a/start.sh +++ b/start_all.sh @@ -15,10 +15,13 @@ screenName="Misp-Dashboard" screen -dmS "$screenName" sleep 0.1 echo -e $GREEN"\t* Launching Redis servers"$DEFAULT -screen -S "$screenName" -X screen -t "redis-server" bash -c $redis_dir'redis-server '$conf_dir'6250.conf ; read x' +screen -S "$screenName" -X screen -t "redis-server" bash -c $redis_dir'redis-server '$conf_dir'6250.conf; read x' echo -e $GREEN"\t* Launching zmq subscriber"$DEFAULT screen -S "$screenName" -X screen -t "zmq-subscriber" 
bash -c './zmq_subscriber.py; read x' +echo -e $GREEN"\t* Launching zmq dispatcher"$DEFAULT +screen -S "$screenName" -X screen -t "zmq-dispatcher" bash -c './zmq_dispatcher.py; read x' + echo -e $GREEN"\t* Launching flask server"$DEFAULT screen -S "$screenName" -X screen -t "flask" bash -c './server.py; read x'