2018-03-07 18:13:17 +01:00
#!/usr/bin/env python3
2017-11-30 08:17:53 +01:00
import time , datetime
import copy
2017-12-04 16:44:44 +01:00
import logging
2017-11-30 08:17:53 +01:00
import zmq
import redis
import random
import configparser
import argparse
import os
import sys
import json
import util
2017-12-05 09:56:32 +01:00
from helpers import geo_helper
from helpers import contributor_helper
from helpers import users_helper
from helpers import trendings_helper
2018-09-27 09:38:39 +02:00
from helpers import live_helper
2017-11-30 08:17:53 +01:00
2018-03-31 12:36:17 +02:00
# Load the configuration file located relative to this script's real path.
configfile = os.path.join(os.path.dirname(os.path.realpath(__file__)), ' config/config.cfg ')
cfg = configparser.ConfigParser()
cfg.read(configfile)

# Set up file logging; directory and file name both come from the config.
logDir = cfg.get(' Log ', ' directory ')
logfilename = cfg.get(' Log ', ' filename ')
logPath = os.path.join(logDir, logfilename)
# exist_ok avoids the check-then-create race of testing os.path.exists first.
os.makedirs(logDir, exist_ok=True)
logging.basicConfig(filename=logPath, filemode=' a ', level=logging.INFO)
logger = logging.getLogger(' zmq_dispatcher ')
2017-12-04 16:44:44 +01:00
2017-11-30 08:17:53 +01:00
# Name of the Redis list that main() pops messages from.
LISTNAME = cfg.get(' RedisLIST ', ' listName ')


def _redis_client(db_section):
    """Build a StrictRedis client on the global host/port for one config section's db."""
    return redis.StrictRedis(
        host=cfg.get(' RedisGlobal ', ' host '),
        port=cfg.getint(' RedisGlobal ', ' port '),
        db=cfg.getint(db_section, ' db '),
    )


# One client per configured database; all share the same host and port.
serv_log = _redis_client(' RedisLog ')
serv_redis_db = _redis_client(' RedisDB ')
serv_list = _redis_client(' RedisLIST ')

# Each helper module name is rebound to an instance of its helper class,
# constructed with the shared database client and the parsed configuration.
live_helper = live_helper.Live_helper(serv_redis_db, cfg)
geo_helper = geo_helper.Geo_helper(serv_redis_db, cfg)
contributor_helper = contributor_helper.Contributor_helper(serv_redis_db, cfg)
users_helper = users_helper.Users_helper(serv_redis_db, cfg)
trendings_helper = trendings_helper.Trendings_helper(serv_redis_db, cfg)
##############
## HANDLERS ##
##############
2018-10-01 13:28:27 +02:00
def handler_skip(zmq_name, jsonevent):
    """Ignore a message: log that it was not processed and return ``None``.

    Neither ``zmq_name`` nor ``jsonevent`` is inspected.
    """
    logger.info(' Log not processed ')
2018-10-01 13:28:27 +02:00
def handler_audit(zmq_name, jsondata):
    """Record a user login carried by an audit message.

    Returns ``None`` without doing anything unless the message has both an
    action and a log entry, the action designates a log, and the log entry's
    own action designates a login. In that case the login is registered
    through ``users_helper.add_user_login`` with the current time.

    ``zmq_name`` is accepted for signature parity with the other handlers
    and is not used.
    """
    audit_action = jsondata.get(' action ', None)
    audit_log = jsondata.get(' Log ', None)
    if audit_action is None or audit_log is None:
        return

    # Only audits related to a log are considered.
    if audit_action != ' log ':
        return
    # Among those, only user logins are considered.
    if audit_log.get(' action ', None) != ' login ':
        return

    login_time = int(time.time())
    user_email = audit_log.get(' email ', ' ')
    user_org = audit_log.get(' org ', ' ')
    users_helper.add_user_login(login_time, user_org, user_email)
2017-11-30 08:17:53 +01:00
def handler_dispatcher(zmq_name, jsonObj):
    """Forward a generic message to ``handler_event`` when it carries an event.

    Messages without the event key are dropped silently.
    """
    if " Event " not in jsonObj:
        return
    handler_event(zmq_name, jsonObj)
def handler_keepalive(zmq_name, jsonevent):
    """Publish a keepalive log entry containing the message's uptime value.

    Raises ``KeyError`` when the message has no uptime entry.
    """
    logger.info(' Handling keepalive ')
    uptime = jsonevent[' uptime ']
    live_helper.publish_log(zmq_name, ' Keepalive ', [uptime])
2017-11-30 08:17:53 +01:00
2018-10-01 13:28:27 +02:00
# Logins are no longer pushed by `misp_json_user`, but by `misp_json_audit`
2017-11-30 08:17:53 +01:00
def handler_user(zmq_name, jsondata):
    """Log that a user message arrived and read its fields; nothing is stored.

    The lookups are kept because a message lacking any of these keys raises
    ``KeyError``, exactly as before. Whatever the action value is, no further
    processing happens.
    """
    logger.info(' Handling user ')
    _action = jsondata[' action ']
    _user = jsondata[' User ']
    _organisation = jsondata[' Organisation ']
    _org_name = _organisation[' name ']
    # No action (including an edit) triggers any processing here.
def handler_conversation(zmq_name, jsonevent):
    """Credit a discussion contribution for a post and add a trending discussion.

    Messages without a post entry (e.g. threads) are logged as an error and
    ignored. Raises ``KeyError`` if the post has no organisation name.
    """
    logger.info(' Handling conversation ')
    try:
        # Only posts are considered, not threads.
        post = jsonevent[' Post ']
    except KeyError as e:
        logger.error(' Error in handler_conversation: {} '.format(e))
        return

    contributor_helper.handleContribution(
        zmq_name,
        post[' org_name '],
        ' Discussion ',
        None,
        ' add ',
        isLabeled=False,
    )

    # Add the discussion to the trendings under a placeholder name.
    now = int(time.time())
    trendings_helper.addTrendingDisc(' no name or id yet... ', now)
def handler_object(zmq_name, jsondata):
    """Log that an object message arrived; the payload is not processed."""
    logger.info(' Handling object ')
def handler_sighting(zmq_name, jsondata):
    """Process a sighting message.

    Steps, in order:
      1. credit a sighting contribution (point multiplier 2) to the
         organisation named under the sighting's event;
      2. hand the sighting to ``handler_attribute`` with the contribution
         already accounted for;
      3. record the sighting date as a sighting or a false positive,
         depending on the sighting type; other types are ignored.

    Raises ``KeyError`` when a required entry is missing from the message.
    """
    logger.info(' Handling sighting ')
    sighting = jsondata[' Sighting ']
    org_name = sighting[' Event '][' Orgc '][' name ']
    category = sighting[' Attribute '][' category ']
    contributor_helper.handleContribution(
        zmq_name,
        org_name,
        ' Sighting ',
        category,
        jsondata.get(' action ', None),
        pntMultiplier=2,
    )
    handler_attribute(zmq_name, sighting, hasAlreadyBeenContributed=True)

    sighted_at = sighting.get(' date_sighting ', None)
    sighting_type = sighting[' type ']
    if sighting_type == " 0 ":
        # Regular sighting.
        trendings_helper.addSightings(sighted_at)
    elif sighting_type == " 1 ":
        # False positive.
        trendings_helper.addFalsePositive(sighted_at)
def handler_event(zmq_name, jsonobj):
    """Process an event message.

    Adds the event and its tags to the trendings, forwards every attribute
    of the event to ``handler_attribute``, and credits an event contribution
    when an organisation name is available.

    Args:
        zmq_name: name of the ZMQ source, passed through to the helpers.
        jsonobj: decoded message; must contain the event entry, which in
            turn must provide its info and timestamp.

    Raises:
        KeyError: when the event entry, its info or its timestamp is missing.
    """
    logger.info(' Handling event ')
    jsonevent = jsonobj[' Event ']

    # Add trending
    eventName = jsonevent[' info ']
    timestamp = jsonevent[' timestamp ']
    trendings_helper.addTrendingEvent(eventName, timestamp)

    event_tags = jsonobj.get(' EventTag ', [])
    # Entries without a tag payload are skipped.
    tags = [tag[' Tag '] for tag in event_tags if ' Tag ' in tag]
    trendings_helper.addTrendingTags(tags, timestamp)

    # Redirect to handler_attribute
    if ' Attribute ' in jsonevent:
        attributes = jsonevent[' Attribute ']
        # A single attribute used to be forwarded bare, without the message
        # envelope handler_attribute reads from; treat it as a one-item list
        # so it gets wrapped like every other attribute.
        if not isinstance(attributes, list):
            attributes = [attributes]
        for attr in attributes:
            # Each attribute gets its own copy of the whole message.
            jsoncopy = copy.deepcopy(jsonobj)
            jsoncopy[' Attribute '] = attr
            handler_attribute(zmq_name, jsoncopy)

    action = jsonobj.get(' action ', None)
    eventLabeled = len(event_tags) > 0
    # NOTE(review): the organisation is read from the top level of the
    # message, whereas handler_attribute reads it from under the event
    # entry -- confirm which location the producer actually fills.
    org = jsonobj.get(' Orgc ', {}).get(' name ', None)
    if org is not None:
        contributor_helper.handleContribution(
            zmq_name,
            org,
            ' Event ',
            None,
            action,
            isLabeled=eventLabeled,
        )
def handler_attribute(zmq_name, jsonobj, hasAlreadyBeenContributed=False):
    """Process an attribute message.

    Adds the attribute's category and tags to the trendings, triggers the
    geo lookups that apply to it, optionally credits an attribute
    contribution, and finally publishes the message to the live log.

    Args:
        zmq_name: name of the ZMQ source, passed through to the helpers.
        jsonobj: decoded message wrapping the attribute.
        hasAlreadyBeenContributed: when true, the contribution step is
            skipped because the caller already accounted for it.

    Raises:
        KeyError: when a required entry of the attribute (or, for the
            contribution step, of the enclosing event) is missing.
    """
    logger.info(' Handling attribute ')

    # Without an attribute payload there is nothing to process.
    if ' Attribute ' not in jsonobj:
        logger.warning('Message carries no attribute payload; skipping')
        return
    jsonattr = jsonobj[' Attribute ']

    # Add trending
    category = jsonattr[' category ']
    timestamp = jsonattr.get(' timestamp ', int(time.time()))
    trendings_helper.addTrendingCateg(category, timestamp)
    tags = list(jsonattr.get(' Tag ', []))
    trendings_helper.addTrendingTags(tags, timestamp)

    # Try to get coordinates from the IP
    if category == " Network activity ":
        geo_helper.getCoordFromIpAndPublish(jsonattr[' value '], category)

    # Try to get coordinates from the phone number
    if jsonattr[' type '] == " phone-number ":
        geo_helper.getCoordFromPhoneAndPublish(jsonattr[' value '], category)

    if not hasAlreadyBeenContributed:
        eventLabeled = len(jsonobj.get(' EventTag ', [])) > 0
        action = jsonobj.get(' action ', None)
        contributor_helper.handleContribution(
            zmq_name,
            jsonobj[' Event '][' Orgc '][' name '],
            ' Attribute ',
            category,
            action,
            isLabeled=eventLabeled,
        )

    # Push to log
    live_helper.publish_log(zmq_name, ' Attribute ', jsonobj)
2017-11-30 08:17:53 +01:00
###############
## MAIN LOOP ##
###############
def process_log(zmq_name, event):
    """Decode one raw message and hand it to the handler of its topic.

    ``event`` is a string made of a topic, a separator and a JSON payload.
    The handler is looked up in ``dico_action`` and called with
    ``zmq_name`` and the decoded payload.

    Problems are logged instead of propagated so that a single bad message
    cannot stop the dispatcher loop:
      * a message without separator or with invalid JSON is discarded;
      * a topic with no registered handler is reported as such;
      * a ``KeyError`` raised inside a handler is logged with its traceback.
    """
    try:
        topic, eventdata = event.split(' ', maxsplit=1)
        jsonevent = json.loads(eventdata)
    except ValueError:
        # Covers both a missing separator and a JSON decoding failure.
        logger.exception('Discarding malformed message from %s', zmq_name)
        return

    handler = dico_action.get(topic)
    if handler is None:
        logger.error('No handler registered for topic %r', topic)
        return

    try:
        handler(zmq_name, jsonevent)
    except KeyError:
        # The payload lacked a field the handler requires.
        logger.exception('Missing key while handling topic %r', topic)
2017-11-30 08:17:53 +01:00
2017-12-04 11:14:25 +01:00
def main(sleeptime):
    """Pop messages from the Redis list forever and dispatch each one.

    Each popped item is UTF-8 decoded and parsed as JSON, and its content
    is passed to ``process_log`` together with the source name. When the
    list is empty, the number of messages handled since the previous pause
    is logged at debug level, the counter is reset, and the loop sleeps for
    ``sleeptime`` seconds.

    This function never returns.
    """
    processed = 0
    while True:
        raw = serv_list.rpop(LISTNAME)
        if raw is not None:
            message = json.loads(raw.decode(' utf8 '))
            process_log(message[' zmq_name '], message[' content '])
            processed += 1
            continue

        # Nothing left to pop: report, reset the counter and wait.
        logger.debug(' Processed {} message(s) since last sleep. '.format(processed))
        processed = 0
        time.sleep(sleeptime)
2017-11-30 08:17:53 +01:00
# Dispatch table used by process_log: message topic -> handler function.
dico_action = {
    # Events, attributes and related data
    " misp_json ": handler_dispatcher,
    " misp_json_event ": handler_event,
    " misp_json_self ": handler_keepalive,
    " misp_json_attribute ": handler_attribute,
    " misp_json_object ": handler_object,
    " misp_json_sighting ": handler_sighting,
    " misp_json_organisation ": handler_skip,
    # Users and discussions
    " misp_json_user ": handler_user,
    " misp_json_conversation ": handler_conversation,
    " misp_json_object_reference ": handler_skip,
    " misp_json_audit ": handler_audit,
}
if __name__ == " __main__ ":
    # Command-line entry: read the polling interval, then run the dispatcher loop.
    arg_parser = argparse.ArgumentParser(
        description=' The ZMQ dispatcher. It pops from the redis buffer then redispatch it to the correct handlers ',
    )
    arg_parser.add_argument(
        ' -s ',
        ' --sleep ',
        required=False,
        dest=' sleeptime ',
        type=int,
        help=' The number of second to wait before checking redis list size ',
        default=5,
    )
    cli_args = arg_parser.parse_args()
    main(cli_args.sleeptime)