Merge pull request #15 from mokaddem/redisBuffer

Redis buffering and bugfixes
pull/18/head
Alexandre Dulaunoy 2017-12-04 14:57:58 +01:00 committed by GitHub
commit fd82ef8a2c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 345 additions and 258 deletions

View File

@ -12,9 +12,16 @@ An experimental dashboard showing live data and statistics from the ZMQ of one o
- RedisGlobal -> misp_web_url
- RedisMap -> pathMaxMindDB
# Updating by pulling
- Re-launch ```./install_dependencies.sh``` to fetch new required dependencies
- Re-update your configuration file ```config.cfg```
# Starting the System
- Be sure to have a running redis server
- e.g. ```redis-server -p 6250```
- Activate your virtualenv ```. ./DASHENV/bin/activate```
- Listen to the MISP feed by starting the zmq_subscriber ```./zmq_subscriber.py```
- Start the dispatcher to process received messages ```./zmq_dispatcher.py```
- Start the Flask server ```./server.py```
- Access the interface at ```http://localhost:8001/```

View File

@ -37,6 +37,10 @@ misp_web_url = http://localhost
#zmq_url=tcp://192.168.56.50:50000
zmq_url=tcp://localhost:50000
[RedisLIST]
db=3
listName=bufferList
[RedisLog]
db=0
channel=1

View File

@ -1,18 +1,30 @@
import util
from util import getZrange
import math, random
import time
import os
import configparser
import json
import datetime
import redis
import util
import users_helper
KEYDAY = "CONTRIB_DAY" # To be used by other module
class Contributor_helper:
def __init__(self, serv_redis_db, cfg):
self.serv_redis_db = serv_redis_db
self.serv_log = redis.StrictRedis(
host=cfg.get('RedisGlobal', 'host'),
port=cfg.getint('RedisGlobal', 'port'),
db=cfg.getint('RedisLog', 'db'))
self.cfg = cfg
self.cfg_org_rank = configparser.ConfigParser()
self.cfg_org_rank.read(os.path.join(os.environ['DASH_CONFIG'], 'ranking.cfg'))
self.CHANNEL_LASTAWARDS = cfg.get('RedisLog', 'channelLastAwards')
self.CHANNEL_LASTCONTRIB = cfg.get('RedisLog', 'channelLastContributor')
self.users_helper = users_helper.Users_helper(serv_redis_db, cfg)
#honorBadge
self.honorBadgeNum = len(self.cfg_org_rank.options('HonorBadge'))
@ -63,10 +75,10 @@ class Contributor_helper:
self.DICO_PNTS_REWARD[categ] = self.default_pnts_per_contribution
self.rankMultiplier = self.cfg_org_rank.getfloat('monthlyRanking' ,'rankMultiplier')
self.levelMax = self.cfg_org_rank.getfloat('monthlyRanking' ,'levelMax')
self.levelMax = self.cfg_org_rank.getint('monthlyRanking' ,'levelMax')
# REDIS KEYS
self.keyDay = "CONTRIB_DAY"
self.keyDay = KEYDAY
self.keyCateg = "CONTRIB_CATEG"
self.keyLastContrib = "CONTRIB_LAST"
self.keyAllOrg = "CONTRIB_ALL_ORG"
@ -86,7 +98,7 @@ class Contributor_helper:
def publish_log(self, zmq_name, name, content, channel=""):
to_send = { 'name': name, 'log': json.dumps(content), 'zmqName': zmq_name }
serv_log.publish(channel, json.dumps(to_send))
self.serv_log.publish(channel, json.dumps(to_send))
''' HANDLER '''
#pntMultiplier if one contribution rewards more than others. (e.g. shighting may gives more points than editing)
@ -100,36 +112,36 @@ class Contributor_helper:
pnts_to_add = self.default_pnts_per_contribution
# if there is a contribution, there is a login (even if ti comes from the API)
users_helper.add_user_login(nowSec, org)
self.users_helper.add_user_login(nowSec, org)
# is a valid contribution
if categ is not None:
try:
pnts_to_add = self.DICO_PNTS_REWARD[noSpaceLower(categ)]
pnts_to_add = self.DICO_PNTS_REWARD[util.noSpaceLower(categ)]
except KeyError:
pnts_to_add = self.default_pnts_per_contribution
pnts_to_add *= pntMultiplier
util.push_to_redis_zset(self.serv_redis_db, self.keyDay, org, count=pnts_to_add)
#CONTRIB_CATEG retain the contribution per category, not the point earned in this categ
util.push_to_redis_zset(self.serv_redis_db, self.keyCateg, org, count=1, endSubkey=':'+noSpaceLower(categ))
self.publish_log(zmq_name, 'CONTRIBUTION', {'org': org, 'categ': categ, 'action': action, 'epoch': nowSec }, channel=CHANNEL_LASTCONTRIB)
util.push_to_redis_zset(self.serv_redis_db, self.keyCateg, org, count=1, endSubkey=':'+util.noSpaceLower(categ))
self.publish_log(zmq_name, 'CONTRIBUTION', {'org': org, 'categ': categ, 'action': action, 'epoch': nowSec }, channel=self.CHANNEL_LASTCONTRIB)
else:
categ = ""
serv_redis_db.sadd(self.keyAllOrg, org)
self.serv_redis_db.sadd(self.keyAllOrg, org)
keyname = "{}:{}".format(self.keyLastContrib, util.getDateStrFormat(now))
serv_redis_db.zadd(keyname, nowSec, org)
serv_redis_db.expire(keyname, ONE_DAY*7) #expire after 7 day
self.serv_redis_db.zadd(keyname, nowSec, org)
self.serv_redis_db.expire(keyname, util.ONE_DAY*7) #expire after 7 day
awards_given = self.updateOrgContributionRank(org, pnts_to_add, action, contribType, eventTime=datetime.datetime.now(), isLabeled=isLabeled, categ=noSpaceLower(categ))
awards_given = self.updateOrgContributionRank(org, pnts_to_add, action, contribType, eventTime=datetime.datetime.now(), isLabeled=isLabeled, categ=util.noSpaceLower(categ))
for award in awards_given:
# update awards given
keyname = "{}:{}".format(self.keyLastAward, util.getDateStrFormat(now))
serv_redis_db.zadd(keyname, nowSec, json.dumps({'org': org, 'award': award, 'epoch': nowSec }))
serv_redis_db.expire(keyname, ONE_DAY*7) #expire after 7 day
self.serv_redis_db.zadd(keyname, nowSec, json.dumps({'org': org, 'award': award, 'epoch': nowSec }))
self.serv_redis_db.expire(keyname, util.ONE_DAY*7) #expire after 7 day
# publish
self.publish_log(zmq_name, 'CONTRIBUTION', {'org': org, 'award': award, 'epoch': nowSec }, channel=self.CHANNEL_LASTAWARDS)
@ -446,7 +458,7 @@ class Contributor_helper:
dic['epoch'] = epoch
return dic
def getTopContributorFromRedis(self, date):
def getTopContributorFromRedis(self, date, maxNum=100):
orgDicoPnts = {}
for curDate in util.getMonthSpan(date):
topNum = 0 # all
@ -468,7 +480,7 @@ class Contributor_helper:
data.append(dic)
data.sort(key=lambda x: x['pnts'], reverse=True)
return data
return data[:maxNum]
def getTop5OvertimeFromRedis(self):
data = []

View File

@ -109,14 +109,15 @@ class Geo_helper:
def getCoordFromPhoneAndPublish(self, phoneNumber, categ):
try:
print('function accessed')
rep = phonenumbers.parse(phoneNumber, None)
if not (phonenumbers.is_valid_number(rep) or phonenumbers.is_possible_number(rep)):
print("Phone number not valid")
return
country_name = geocoder.country_name_for_number(rep, "en")
country_code = self.country_to_iso[country_name]
if country_code is None:
print("Non matching ISO_CODE")
return
coord = self.country_code_to_coord[country_code.lower()] # countrycode is in upper case
coord_dic = {'lat': coord['lat'], 'lon': coord['long']}
@ -139,7 +140,6 @@ class Geo_helper:
"cityName": "",
"regionCode": country_code,
}
print(to_send)
self.serv_coord.publish(self.CHANNELDISP, json.dumps(to_send))
except phonenumbers.NumberParseException:
print("Can't resolve phone number country")

View File

@ -312,7 +312,7 @@ def eventStreamAwards():
yield 'data: {}\n\n'.format(json.dumps(to_return))
@app.route("/_getTopContributor")
def getTopContributor(suppliedDate=None):
def getTopContributor(suppliedDate=None, maxNum=100):
if suppliedDate is None:
try:
date = datetime.datetime.fromtimestamp(float(request.args.get('date')))
@ -321,7 +321,7 @@ def getTopContributor(suppliedDate=None):
else:
date = suppliedDate
data = contributor_helper.getTopContributorFromRedis(date)
data = contributor_helper.getTopContributorFromRedis(date, maxNum=maxNum)
return jsonify(data)
@app.route("/_getFameContributor")
@ -332,7 +332,7 @@ def getFameContributor():
today = datetime.datetime.now()
# get previous month
date = (datetime.datetime(today.year, today.month, 1) - datetime.timedelta(days=1))
return getTopContributor(suppliedDate=date)
return getTopContributor(suppliedDate=date, maxNum=10)
@app.route("/_getFameQualContributor")
def getFameQualContributor():
@ -342,7 +342,7 @@ def getFameQualContributor():
today = datetime.datetime.now()
# get previous month
date = (datetime.datetime(today.year, today.month, 1) - datetime.timedelta(days=1))
return getTopContributor(suppliedDate=date)
return getTopContributor(suppliedDate=date, maxNum=10)
@app.route("/_getTop5Overtime")
def getTop5Overtime():

View File

@ -15,10 +15,13 @@ screenName="Misp-Dashboard"
screen -dmS "$screenName"
sleep 0.1
echo -e $GREEN"\t* Launching Redis servers"$DEFAULT
screen -S "$screenName" -X screen -t "redis-server" bash -c $redis_dir'redis-server '$conf_dir'6250.conf ; read x'
screen -S "$screenName" -X screen -t "redis-server" bash -c $redis_dir'redis-server '$conf_dir'6250.conf; read x'
echo -e $GREEN"\t* Launching zmq subscriber"$DEFAULT
screen -S "$screenName" -X screen -t "zmq-subscriber" bash -c './zmq_subscriber.py; read x'
echo -e $GREEN"\t* Launching zmq dispatcher"$DEFAULT
screen -S "$screenName" -X screen -t "zmq-dispatcher" bash -c './zmq_dispatcher.py; read x'
echo -e $GREEN"\t* Launching flask server"$DEFAULT
screen -S "$screenName" -X screen -t "flask" bash -c './server.py; read x'

View File

@ -17,10 +17,10 @@
}
.circleBadgeSmall {
width: 73px;
height: 73px;
width: 57px;
height: 57px;
text-align: center;
border-radius: 38px;
border-radius: 28px;
background-color: #e1e1e1;
border: 1px solid #caccce;
vertical-align: middle;

View File

@ -79,8 +79,9 @@ optionDatatable_last.columnDefs = [
{ className: "centerCellPicOrgLogo verticalAlign", "targets": [ 5 ] },
{ className: "verticalAlign", "targets": [ 6 ] }
]
var optionDatatable_fame = jQuery.extend({}, optionDatatable_light)
optionDatatable_fame.scrollY = '45vh';
var optionDatatable_fameQuant = jQuery.extend({}, optionDatatable_light)
var optionDatatable_fameQual = jQuery.extend({}, optionDatatable_light)
optionDatatable_fameQual.scrollY = '39vh';
var optionDatatable_Categ = {
responsive: true,
@ -590,8 +591,8 @@ $(document).ready(function() {
});
datatableTop = $('#topContribTable').DataTable(optionDatatable_top);
datatableFameQuant = $('#fameTableQuantity').DataTable(optionDatatable_fame);
datatableFameQual = $('#fameTableQuality').DataTable(optionDatatable_fame);
datatableFameQuant = $('#fameTableQuantity').DataTable(optionDatatable_fameQuant);
datatableFameQual = $('#fameTableQuality').DataTable(optionDatatable_fameQual);
datatableCateg = $('#categTable').DataTable(optionDatatable_Categ);
datatableLast = $('#lastTable').DataTable(optionDatatable_last);
datatableAwards = $('#awardTable').DataTable(optionDatatable_awards);

View File

@ -172,7 +172,7 @@
<tr>
<td>
<div id="divBadge_{{ loop.index }}" class="circleBadgeSmall circlBadgeNotAcquired">
<img height='64px' width='64px' class="" style="margin-top: 3px;" src="{{ url_for('static', filename='pics/MISPHonorableIcons/1.svg')[:-5]}}{{ item[0] }}.svg" type='image/svg' style="margin: auto;"</img>
<img height='48px' width='48' class="" style="margin-top: 3px;" src="{{ url_for('static', filename='pics/MISPHonorableIcons/1.svg')[:-5]}}{{ item[0] }}.svg" type='image/svg' style="margin: auto;"</img>
</div>
</td>
<td style="padding-left: 15px;">{{ item[1] }}</td>
@ -273,7 +273,7 @@
<object id='orgContributionRank' height=32 width=64 class="centerInBtn"></object>
<strong id="orgText" class="centerInBtn"></strong>
<div id="orgRankDiv" class='textTopHeader' style="padding-top: 0px; position: relative; width: 40px; height: 40px;"></div>
<div class='' style="float: left; padding: 10px;">
<div class='' style="float: left; top: 10px; position: relative;">
<div class="progress" style=''>
<div id="progressBarDiv" class="progress-bar progress-bar-striped" role="progressbar" style="width:0%">
</div>

View File

@ -14,8 +14,7 @@ class Users_helper:
self.keyTimestamp = "LOGIN_TIMESTAMP"
self.keyTimestampSet = "LOGIN_TIMESTAMPSET"
self.keyOrgLog = "LOGIN_ORG"
contrib_helper = contributor_helper.Contributor_helper(serv_redis_db, cfg)
self.keyContribDay = contrib_helper.keyDay # Key to get monthly contribution
self.keyContribDay = contributor_helper.KEYDAY # Key to get monthly contribution
def addTemporary(self, org, timestamp):
timestampDate = datetime.datetime.fromtimestamp(float(timestamp))
@ -73,7 +72,7 @@ class Users_helper:
for curDate in util.getXPrevDaysSpan(date, prev_days):
log = self.serv_redis_db.zscore(keyname_log.format(self.keyOrgLog, util.getDateStrFormat(curDate)), org)
log = 0 if log is None else 1
contrib = self.serv_redis_db.zscore(keyname_contrib.format(keyContribDay, util.getDateStrFormat(curDate)), org)
contrib = self.serv_redis_db.zscore(keyname_contrib.format(self.keyContribDay, util.getDateStrFormat(curDate)), org)
contrib = 0 if contrib is None else 1
data.append([log, contrib])
return data
@ -100,7 +99,7 @@ class Users_helper:
def getLoginVSCOntribution(self, date):
keyname = "{}:{}".format(keyContribDay, util.getDateStrFormat(date))
keyname = "{}:{}".format(self.keyContribDay, util.getDateStrFormat(date))
orgs_contri = self.serv_redis_db.zrange(keyname, 0, -1, desc=True, withscores=False)
orgs_contri = [ org.decode('utf8') for org in orgs_contri ]
orgs_login = [ org[0] for org in self.getOrgslogin(date, topNum=0) ]
@ -152,7 +151,7 @@ class Users_helper:
for curDate in util.getXPrevDaysSpan(date, prev_days):
timestamps = self.getUserLogins(curDate)
keyname = "{}:{}".format(keyContribDay, util.getDateStrFormat(curDate))
keyname = "{}:{}".format(self.keyContribDay, util.getDateStrFormat(curDate))
orgs_contri = self.serv_redis_db.zrange(keyname, 0, -1, desc=True, withscores=False)
orgs_contri_num = len(orgs_contri)

View File

@ -9,9 +9,12 @@ def getZrange(serv_redis_db, keyCateg, date, topNum, endSubkey=""):
data = [ [record[0].decode('utf8'), record[1]] for record in data ]
return data
def noSpaceLower(text):
return text.lower().replace(' ', '_')
def push_to_redis_zset(serv_redis_db, mainKey, toAdd, endSubkey="", count=1):
now = datetime.datetime.now()
today_str = util.getDateStrFormat(now)
today_str = getDateStrFormat(now)
keyname = "{}:{}{}".format(mainKey, today_str, endSubkey)
serv_redis_db.zincrby(keyname, toAdd, count)

270
zmq_dispatcher.py Executable file
View File

@ -0,0 +1,270 @@
#!/usr/bin/env python3.5
import time, datetime
import copy
from pprint import pprint
import zmq
import redis
import random
import configparser
import argparse
import os
import sys
import json
import util
import geo_helper
import contributor_helper
import users_helper
import trendings_helper
configfile = os.path.join(os.environ['DASH_CONFIG'], 'config.cfg')
cfg = configparser.ConfigParser()
cfg.read(configfile)
CHANNEL = cfg.get('RedisLog', 'channel')
LISTNAME = cfg.get('RedisLIST', 'listName')
serv_log = redis.StrictRedis(
host=cfg.get('RedisGlobal', 'host'),
port=cfg.getint('RedisGlobal', 'port'),
db=cfg.getint('RedisLog', 'db'))
serv_redis_db = redis.StrictRedis(
host=cfg.get('RedisGlobal', 'host'),
port=cfg.getint('RedisGlobal', 'port'),
db=cfg.getint('RedisDB', 'db'))
serv_list = redis.StrictRedis(
host=cfg.get('RedisGlobal', 'host'),
port=cfg.getint('RedisGlobal', 'port'),
db=cfg.getint('RedisLIST', 'db'))
geo_helper = geo_helper.Geo_helper(serv_redis_db, cfg)
contributor_helper = contributor_helper.Contributor_helper(serv_redis_db, cfg)
users_helper = users_helper.Users_helper(serv_redis_db, cfg)
trendings_helper = trendings_helper.Trendings_helper(serv_redis_db, cfg)
def publish_log(zmq_name, name, content, channel=CHANNEL):
to_send = { 'name': name, 'log': json.dumps(content), 'zmqName': zmq_name }
serv_log.publish(channel, json.dumps(to_send))
def getFields(obj, fields):
jsonWalker = fields.split('.')
itemToExplore = obj
lastName = ""
try:
for i in jsonWalker:
itemToExplore = itemToExplore[i]
lastName = i
if type(itemToExplore) is list:
return { 'name': lastName , 'data': itemToExplore }
else:
return itemToExplore
except KeyError as e:
return ""
##############
## HANDLERS ##
##############
def handler_log(zmq_name, jsonevent):
print('sending', 'log')
return
def handler_dispatcher(zmq_name, jsonObj):
if "Event" in jsonObj:
handler_event(zmq_name, jsonObj)
def handler_keepalive(zmq_name, jsonevent):
print('sending', 'keepalive')
to_push = [ jsonevent['uptime'] ]
publish_log(zmq_name, 'Keepalive', to_push)
def handler_user(zmq_name, jsondata):
print('sending', 'user')
action = jsondata['action']
json_user = jsondata['User']
json_org = jsondata['Organisation']
org = json_org['name']
if action == 'login': #only consider user login
timestamp = int(time.time())
users_helper.add_user_login(timestamp, org)
else:
pass
def handler_conversation(zmq_name, jsonevent):
try: #only consider POST, not THREAD
jsonpost = jsonevent['Post']
except KeyError:
return
print('sending' ,'Post')
org = jsonpost['org_name']
categ = None
action = 'add'
eventName = 'no name or id yet...'
contributor_helper.handleContribution(zmq_name, org,
'Discussion',
None,
action,
isLabeled=False)
# add Discussion
nowSec = int(time.time())
trendings_helper.addTrendingDisc(eventName, nowSec)
def handler_object(zmq_name, jsondata):
print('obj')
return
def handler_sighting(zmq_name, jsondata):
print('sending' ,'sighting')
jsonsight = jsondata['Sighting']
org = jsonsight['Event']['Orgc']['name']
categ = jsonsight['Attribute']['category']
action = jsondata.get('action', None)
contributor_helper.handleContribution(zmq_name, org, 'Sighting', categ, action, pntMultiplier=2)
handler_attribute(zmq_name, jsonsight, hasAlreadyBeenContributed=True)
timestamp = jsonsight.get('date_sighting', None)
if jsonsight['type'] == "0": # sightings
trendings_helper.addSightings(timestamp)
elif jsonsight['type'] == "1": # false positive
trendings_helper.addFalsePositive(timestamp)
def handler_event(zmq_name, jsonobj):
#fields: threat_level_id, id, info
jsonevent = jsonobj['Event']
#Add trending
eventName = jsonevent['info']
timestamp = jsonevent['timestamp']
trendings_helper.addTrendingEvent(eventName, timestamp)
tags = []
for tag in jsonobj.get('EventTag', []):
try:
tags.append(tag['Tag'])
except KeyError:
pass
trendings_helper.addTrendingTags(tags, timestamp)
#redirect to handler_attribute
if 'Attribute' in jsonevent:
attributes = jsonevent['Attribute']
if type(attributes) is list:
for attr in attributes:
jsoncopy = copy.deepcopy(jsonobj)
jsoncopy['Attribute'] = attr
handler_attribute(zmq_name, jsoncopy)
else:
handler_attribute(zmq_name, attributes)
action = jsonobj.get('action', None)
eventLabeled = len(jsonobj.get('EventTag', [])) > 0
org = jsonobj.get('Orgc', {}).get('name', None)
if org is not None:
contributor_helper.handleContribution(zmq_name, org,
'Event',
None,
action,
isLabeled=eventLabeled)
def handler_attribute(zmq_name, jsonobj, hasAlreadyBeenContributed=False):
# check if jsonattr is an attribute object
if 'Attribute' in jsonobj:
jsonattr = jsonobj['Attribute']
#Add trending
categName = jsonattr['category']
timestamp = jsonattr.get('timestamp', int(time.time()))
trendings_helper.addTrendingCateg(categName, timestamp)
tags = []
for tag in jsonattr.get('Tag', []):
try:
tags.append(tag['Tag'])
except KeyError:
pass
trendings_helper.addTrendingTags(tags, timestamp)
to_push = []
for field in json.loads(cfg.get('Log', 'fieldname_order')):
if type(field) is list:
to_join = []
for subField in field:
to_join.append(getFields(jsonobj, subField))
to_add = cfg.get('Log', 'char_separator').join(to_join)
else:
to_add = getFields(jsonobj, field)
to_push.append(to_add)
#try to get coord from ip
if jsonattr['category'] == "Network activity":
geo_helper.getCoordFromIpAndPublish(jsonattr['value'], jsonattr['category'])
#try to get coord from ip
if jsonattr['type'] == "phone-number":
geo_helper.getCoordFromPhoneAndPublish(jsonattr['value'], jsonattr['category'])
if not hasAlreadyBeenContributed:
eventLabeled = len(jsonobj.get('EventTag', [])) > 0
action = jsonobj.get('action', None)
contributor_helper.handleContribution(zmq_name, jsonobj['Event']['Orgc']['name'],
'Attribute',
jsonattr['category'],
action,
isLabeled=eventLabeled)
# Push to log
publish_log(zmq_name, 'Attribute', to_push)
###############
## MAIN LOOP ##
###############
def process_log(zmq_name, event):
topic, eventdata = event.split(' ', maxsplit=1)
jsonevent = json.loads(eventdata)
try:
dico_action[topic](zmq_name, jsonevent)
except KeyError as e:
print(e)
def main(sleeptime):
numMsg = 0
while True:
content = serv_list.rpop(LISTNAME)
if content is None:
print('Processed', numMsg, 'message(s) since last sleep.')
numMsg = 0
time.sleep(sleeptime)
continue
content = content.decode('utf8')
the_json = json.loads(content)
zmqName = the_json['zmq_name']
content = the_json['content']
process_log(zmqName, content)
numMsg += 1
dico_action = {
"misp_json": handler_dispatcher,
"misp_json_event": handler_event,
"misp_json_self": handler_keepalive,
"misp_json_attribute": handler_attribute,
"misp_json_object": handler_object,
"misp_json_sighting": handler_sighting,
"misp_json_organisation": handler_log,
"misp_json_user": handler_user,
"misp_json_conversation": handler_conversation,
"misp_json_object_reference": handler_log,
}
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='The ZMQ dispatcher. It pops from the redis buffer then redispatch it to the correct handlers')
parser.add_argument('-s', '--sleep', required=False, dest='sleeptime', type=int, help='The number of second to wait before checking redis list size', default=5)
args = parser.parse_args()
main(args.sleeptime)

View File

@ -1,234 +1,37 @@
#!/usr/bin/env python3.5
import time, datetime
import copy
from pprint import pprint
import zmq
import redis
import random
import configparser
import argparse
import os
import sys
import json
import util
import geo_helper
import contributor_helper
import users_helper
import trendings_helper
configfile = os.path.join(os.environ['DASH_CONFIG'], 'config.cfg')
cfg = configparser.ConfigParser()
cfg.read(configfile)
ZMQ_URL = cfg.get('RedisGlobal', 'zmq_url')
CHANNEL = cfg.get('RedisLog', 'channel')
LISTNAME = cfg.get('RedisLIST', 'listName')
serv_log = redis.StrictRedis(
serv_list = redis.StrictRedis(
host=cfg.get('RedisGlobal', 'host'),
port=cfg.getint('RedisGlobal', 'port'),
db=cfg.getint('RedisLog', 'db'))
serv_redis_db = redis.StrictRedis(
host=cfg.get('RedisGlobal', 'host'),
port=cfg.getint('RedisGlobal', 'port'),
db=cfg.getint('RedisDB', 'db'))
geo_helper = geo_helper.Geo_helper(serv_redis_db, cfg)
contributor_helper = contributor_helper.Contributor_helper(serv_redis_db, cfg)
users_helper = users_helper.Users_helper(serv_redis_db, cfg)
trendings_helper = trendings_helper.Trendings_helper(serv_redis_db, cfg)
def publish_log(zmq_name, name, content, channel=CHANNEL):
to_send = { 'name': name, 'log': json.dumps(content), 'zmqName': zmq_name }
serv_log.publish(channel, json.dumps(to_send))
def getFields(obj, fields):
jsonWalker = fields.split('.')
itemToExplore = obj
lastName = ""
try:
for i in jsonWalker:
itemToExplore = itemToExplore[i]
lastName = i
if type(itemToExplore) is list:
return { 'name': lastName , 'data': itemToExplore }
else:
return itemToExplore
except KeyError as e:
return ""
def noSpaceLower(text):
return text.lower().replace(' ', '_')
##############
## HANDLERS ##
##############
def handler_log(zmq_name, jsonevent):
print('sending', 'log')
return
def handler_dispatcher(zmq_name, jsonObj):
if "Event" in jsonObj:
handler_event(zmq_name, jsonObj)
def handler_keepalive(zmq_name, jsonevent):
print('sending', 'keepalive')
to_push = [ jsonevent['uptime'] ]
publish_log(zmq_name, 'Keepalive', to_push)
def handler_user(zmq_name, jsondata):
action = jsondata['action']
json_user = jsondata['User']
json_org = jsondata['Organisation']
org = json_org['name']
if action == 'login': #only consider user login
timestamp = int(time.time())
users_helper.add_user_login(timestamp, org)
else:
pass
def handler_conversation(zmq_name, jsonevent):
try: #only consider POST, not THREAD
jsonpost = jsonevent['Post']
except KeyError:
return
print('sending' ,'Post')
org = jsonpost['org_name']
categ = None
action = 'add'
eventName = 'no name or id yet...'
contributor_helper.handleContribution(zmq_name, org,
'Discussion',
None,
action,
isLabeled=False)
# add Discussion
nowSec = int(time.time())
trendings_helper.addTrendingDisc(eventName, nowSec)
def handler_object(zmq_name, jsondata):
print('obj')
return
def handler_sighting(zmq_name, jsondata):
print('sending' ,'sighting')
jsonsight = jsondata['Sighting']
org = jsonsight['Event']['Orgc']['name']
categ = jsonsight['Attribute']['category']
action = jsondata.get('action', None)
contributor_helper.handleContribution(zmq_name, org, 'Sighting', categ, action, pntMultiplier=2)
handler_attribute(zmq_name, jsonsight, hasAlreadyBeenContributed=True)
timestamp = jsonsight.get('date_sighting', None)
if jsonsight['type'] == "0": # sightings
trendings_helper.addSightings(timestamp)
elif jsonsight['type'] == "1": # false positive
trendings_helper.addFalsePositive(timestamp)
def handler_event(zmq_name, jsonobj):
#fields: threat_level_id, id, info
jsonevent = jsonobj['Event']
#Add trending
eventName = jsonevent['info']
timestamp = jsonevent['timestamp']
trendings_helper.addTrendingEvent(eventName, timestamp)
tags = []
for tag in jsonobj.get('EventTag', []):
try:
tags.append(tag['Tag'])
except KeyError:
pass
trendings_helper.addTrendingTags(tags, timestamp)
#redirect to handler_attribute
if 'Attribute' in jsonevent:
attributes = jsonevent['Attribute']
if type(attributes) is list:
for attr in attributes:
jsoncopy = copy.deepcopy(jsonobj)
jsoncopy['Attribute'] = attr
handler_attribute(zmq_name, jsoncopy)
else:
handler_attribute(zmq_name, attributes)
action = jsonobj.get('action', None)
eventLabeled = len(jsonobj.get('EventTag', [])) > 0
org = jsonobj.get('Orgc', {}).get('name', None)
if org is not None:
contributor_helper.handleContribution(zmq_name, org,
'Event',
None,
action,
isLabeled=eventLabeled)
def handler_attribute(zmq_name, jsonobj, hasAlreadyBeenContributed=False):
# check if jsonattr is an attribute object
if 'Attribute' in jsonobj:
jsonattr = jsonobj['Attribute']
#Add trending
categName = jsonattr['category']
timestamp = jsonattr.get('timestamp', int(time.time()))
trendings_helper.addTrendingCateg(categName, timestamp)
tags = []
for tag in jsonattr.get('Tag', []):
try:
tags.append(tag['Tag'])
except KeyError:
pass
trendings_helper.addTrendingTags(tags, timestamp)
to_push = []
for field in json.loads(cfg.get('Log', 'fieldname_order')):
if type(field) is list:
to_join = []
for subField in field:
to_join.append(getFields(jsonobj, subField))
to_add = cfg.get('Log', 'char_separator').join(to_join)
else:
to_add = getFields(jsonobj, field)
to_push.append(to_add)
#try to get coord from ip
if jsonattr['category'] == "Network activity":
geo_helper.getCoordFromIpAndPublish(jsonattr['value'], jsonattr['category'])
#try to get coord from ip
if jsonattr['type'] == "phone-number":
geo_helper.getCoordFromPhoneAndPublish(jsonattr['value'], jsonattr['category'])
if not hasAlreadyBeenContributed:
eventLabeled = len(jsonobj.get('EventTag', [])) > 0
action = jsonobj.get('action', None)
contributor_helper.handleContribution(zmq_name, jsonobj['Event']['Orgc']['name'],
'Attribute',
jsonattr['category'],
action,
isLabeled=eventLabeled)
# Push to log
publish_log(zmq_name, 'Attribute', to_push)
db=cfg.getint('RedisLIST', 'db'))
###############
## MAIN LOOP ##
###############
def process_log(zmq_name, event):
event = event.decode('utf8')
topic, eventdata = event.split(' ', maxsplit=1)
jsonevent = json.loads(eventdata)
print(event)
try:
dico_action[topic](zmq_name, jsonevent)
except KeyError as e:
print(e)
def put_in_redis_list(zmq_name, content):
content = content.decode('utf8')
to_add = {'zmq_name': zmq_name, 'content': content}
serv_list.lpush(LISTNAME, json.dumps(to_add))
def main(zmqName):
context = zmq.Context()
@ -240,29 +43,14 @@ def main(zmqName):
try:
content = socket.recv()
content.replace(b'\n', b'') # remove \n...
zmq_name = zmqName
process_log(zmq_name, content)
put_in_redis_list(zmqName, content)
except KeyboardInterrupt:
return
dico_action = {
"misp_json": handler_dispatcher,
"misp_json_event": handler_event,
"misp_json_self": handler_keepalive,
"misp_json_attribute": handler_attribute,
"misp_json_object": handler_object,
"misp_json_sighting": handler_sighting,
"misp_json_organisation": handler_log,
"misp_json_user": handler_user,
"misp_json_conversation": handler_conversation,
"misp_json_object_reference": handler_log,
}
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='A zmq subscriber. It subscribe to a ZNQ then redispatch it to the misp-dashboard')
parser = argparse.ArgumentParser(description='A zmq subscriber. It subscribes to a ZNQ then redispatch it to the misp-dashboard')
parser.add_argument('-n', '--name', required=False, dest='zmqname', help='The ZMQ feed name', default="MISP Standard ZMQ")
parser.add_argument('-u', '--url', required=False, dest='zmqurl', help='The URL to connect to', default=ZMQ_URL)
args = parser.parse_args()