#!/usr/bin/env python3

import argparse
import configparser
import copy
import datetime
import json
import logging
import os
import random
import sys
import time

import redis
import zmq

import util
import updates
from helpers import (contributor_helper, geo_helper, live_helper,
                     trendings_helper, users_helper)


configfile = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'config/config.cfg')
cfg = configparser.ConfigParser()
cfg.read(configfile)
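
# A minimal sketch of the expected config.cfg, assuming only the options
# read in this file; section/option names come from the cfg.get() calls
# below, the values are illustrative:
#
#   [Log]
#   directory=log
#   dispatcher_filename=zmq_dispatcher.log
#
#   [RedisGlobal]
#   host=localhost
#   port=6250
#
#   [RedisLog]
#   db=1
#
#   [RedisDB]
#   db=2
#
#   [RedisLIST]
#   db=3
#   listName=bufferList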

logDir = cfg.get('Log', 'directory')
logfilename = cfg.get('Log', 'dispatcher_filename')
logPath = os.path.join(logDir, logfilename)
if not os.path.exists(logDir):
    os.makedirs(logDir)

try:
    logging.basicConfig(filename=logPath, filemode='a', level=logging.INFO)
except PermissionError as error:
    print(error)
    print("Please fix the above and try again.")
    sys.exit(126)
logger = logging.getLogger('zmq_dispatcher')

LISTNAME = cfg.get('RedisLIST', 'listName')

serv_log = redis.StrictRedis(
    host=cfg.get('RedisGlobal', 'host'),
    port=cfg.getint('RedisGlobal', 'port'),
    db=cfg.getint('RedisLog', 'db'),
    decode_responses=True)
serv_redis_db = redis.StrictRedis(
    host=cfg.get('RedisGlobal', 'host'),
    port=cfg.getint('RedisGlobal', 'port'),
    db=cfg.getint('RedisDB', 'db'),
    decode_responses=True)
serv_list = redis.StrictRedis(
    host=cfg.get('RedisGlobal', 'host'),
    port=cfg.getint('RedisGlobal', 'port'),
    db=cfg.getint('RedisLIST', 'db'),
    decode_responses=True)

live_helper = live_helper.Live_helper(serv_redis_db, cfg)
geo_helper = geo_helper.Geo_helper(serv_redis_db, cfg)
contributor_helper = contributor_helper.Contributor_helper(serv_redis_db, cfg)
users_helper = users_helper.Users_helper(serv_redis_db, cfg)
trendings_helper = trendings_helper.Trendings_helper(serv_redis_db, cfg)

##############
## HANDLERS ##
##############

def handler_skip(zmq_name, jsonevent):
    logger.info('Log not processed')
    return


def handler_audit(zmq_name, jsondata):
    action = jsondata.get('action', None)
    jsonlog = jsondata.get('Log', None)
    if action is None or jsonlog is None:
        return

    # only consider login operations
    if action == 'log':  # the audit entry is related to a log
        logAction = jsonlog.get('action', None)
        if logAction == 'login':  # only consider user logins
            timestamp = int(time.time())
            email = jsonlog.get('email', '')
            org = jsonlog.get('org', '')
            users_helper.add_user_login(timestamp, org, email)
    else:
        pass
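
# A minimal sketch of a login audit message, assuming the keys read above;
# values are illustrative:
#   {"action": "log", "Log": {"action": "login",
#                             "email": "user@example.com", "org": "ORGNAME"}}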


def handler_dispatcher(zmq_name, jsonObj):
    if "Event" in jsonObj:
        handler_event(zmq_name, jsonObj)


def handler_keepalive(zmq_name, jsonevent):
    logger.info('Handling keepalive')
    to_push = [jsonevent['uptime']]
    live_helper.publish_log(zmq_name, 'Keepalive', to_push)


# Logins are no longer pushed by `misp_json_user`, but by `misp_json_audit`
def handler_user(zmq_name, jsondata):
    logger.info('Handling user')
    action = jsondata['action']
    json_user = jsondata['User']
    json_org = jsondata['Organisation']
    org = json_org['name']
    if action == 'edit':  # user edits are currently ignored
        pass
    else:
        pass


def handler_conversation(zmq_name, jsonevent):
    logger.info('Handling conversation')
    try:  # only consider POST, not THREAD
        jsonpost = jsonevent['Post']
    except KeyError as e:
        logger.error('Error in handler_conversation: {}'.format(e))
        return
    org = jsonpost['org_name']
    categ = None
    action = 'add'
    eventName = 'no name or id yet...'
    contributor_helper.handleContribution(zmq_name, org,
                                          'Discussion',
                                          None,
                                          action,
                                          isLabeled=False)
    # add Discussion
    nowSec = int(time.time())
    trendings_helper.addTrendingDisc(eventName, nowSec)


def handler_object(zmq_name, jsondata):
    logger.info('Handling object')
    # check if the data contains a MISP Object
    if 'Object' in jsondata:
        jsonobj = jsondata['Object']
        # keep a copy of the object without its attributes
        soleObject = copy.deepcopy(jsonobj)
        del soleObject['Attribute']
        # re-dispatch every attribute of the object individually
        for jsonattr in jsonobj['Attribute']:
            jsonattrcpy = copy.deepcopy(jsonobj)
            jsonattrcpy['Event'] = jsondata['Event']
            jsonattrcpy['Attribute'] = jsonattr
            handler_attribute(zmq_name, jsonattrcpy, False, parentObject=soleObject)
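
# Design note: object attributes are fanned out one at a time through
# handler_attribute, with the parent object (stripped of its attributes)
# passed along as parentObject so each attribute keeps its object context.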


def handler_sighting(zmq_name, jsondata):
    logger.info('Handling sighting')
    jsonsight = jsondata['Sighting']
    org = jsonsight['Event']['Orgc']['name']
    categ = jsonsight['Attribute']['category']
    action = jsondata.get('action', None)
    contributor_helper.handleContribution(zmq_name, org, 'Sighting', categ, action, pntMultiplier=2)
    handler_attribute(zmq_name, jsonsight, hasAlreadyBeenContributed=True)

    timestamp = jsonsight.get('date_sighting', None)
    if jsonsight['type'] == "0":  # sighting
        trendings_helper.addSightings(timestamp)
    elif jsonsight['type'] == "1":  # false positive
        trendings_helper.addFalsePositive(timestamp)
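
# Note: MISP also defines sighting type "2" (expiration) besides the two
# handled above (assumption based on MISP sighting semantics); such
# sightings simply fall through without updating the trendings.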


def handler_event(zmq_name, jsonobj):
    logger.info('Handling event')
    # fields: threat_level_id, id, info
    jsonevent = jsonobj['Event']

    # add trending
    eventName = jsonevent['info']
    timestamp = jsonevent['timestamp']
    trendings_helper.addTrendingEvent(eventName, timestamp)
    tags = []
    for tag in jsonevent.get('Tag', []):
        tags.append(tag)
    trendings_helper.addTrendingTags(tags, timestamp)

    # redirect attributes to handler_attribute
    if 'Attribute' in jsonevent:
        attributes = jsonevent['Attribute']
        if isinstance(attributes, list):
            for attr in attributes:
                jsoncopy = copy.deepcopy(jsonobj)
                jsoncopy['Attribute'] = attr
                handler_attribute(zmq_name, jsoncopy)
        else:
            handler_attribute(zmq_name, attributes)

    # redirect objects to handler_object
    if 'Object' in jsonevent:
        objects = jsonevent['Object']
        if isinstance(objects, list):
            for obj in objects:
                jsoncopy = copy.deepcopy(jsonobj)
                jsoncopy['Object'] = obj
                handler_object(zmq_name, jsoncopy)
        else:
            handler_object(zmq_name, objects)

    action = jsonobj.get('action', None)
    eventLabeled = len(jsonobj.get('EventTag', [])) > 0
    org = jsonobj.get('Orgc', {}).get('name', None)
    if org is not None:
        contributor_helper.handleContribution(zmq_name, org,
                                              'Event',
                                              None,
                                              action,
                                              isLabeled=eventLabeled)


def handler_attribute(zmq_name, jsonobj, hasAlreadyBeenContributed=False, parentObject=False):
    logger.info('Handling attribute')
    # check if the attribute data is wrapped in an 'Attribute' key
    if 'Attribute' in jsonobj:
        jsonattr = jsonobj['Attribute']
    else:
        jsonattr = jsonobj
    attributeType = 'Attribute' if jsonattr['object_id'] == '0' else 'ObjectAttribute'

    # add trending
    categName = jsonattr['category']
    timestamp = jsonattr.get('timestamp', int(time.time()))
    trendings_helper.addTrendingCateg(categName, timestamp)
    tags = []
    for tag in jsonattr.get('Tag', []):
        tags.append(tag)
    trendings_helper.addTrendingTags(tags, timestamp)

    # try to get coordinates from the IP
    if jsonattr['category'] == "Network activity":
        geo_helper.getCoordFromIpAndPublish(jsonattr['value'], jsonattr['category'])
    # try to get coordinates from the phone number
    if jsonattr['type'] == "phone-number":
        geo_helper.getCoordFromPhoneAndPublish(jsonattr['value'], jsonattr['category'])

    if not hasAlreadyBeenContributed:
        eventLabeled = len(jsonobj.get('EventTag', [])) > 0
        action = jsonobj.get('action', None)
        contributor_helper.handleContribution(zmq_name, jsonobj['Event']['Orgc']['name'],
                                              attributeType,
                                              jsonattr['category'],
                                              action,
                                              isLabeled=eventLabeled)
    # push to the live log
    live_helper.publish_log(zmq_name, attributeType, jsonobj)


def handler_diagnostic_tool(zmq_name, jsonobj):
    try:
        res = time.time() - float(jsonobj['content'])
    except Exception as e:
        logger.error(e)
        return  # without a valid timestamp there is no latency to report
    serv_list.set('diagnostic_tool_response', str(res))
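
# A minimal sketch of a diagnostic message, assuming 'content' carries the
# epoch timestamp at which the probe was sent:
#   {"content": "1560500000.0"}
# The measured delay is written to the 'diagnostic_tool_response' key so the
# diagnostic tool can read it back from redis.
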
###############
## MAIN LOOP ##
###############

def process_log(zmq_name, event):
    topic, eventdata = event.split(' ', maxsplit=1)
    jsonevent = json.loads(eventdata)
    try:
        dico_action[topic](zmq_name, jsonevent)
    except KeyError as e:
        logger.error(e)
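
# A minimal sketch of an entry popped from redis, assuming the
# "topic<space>payload" format implied by the split above:
#   misp_json_event {"Event": {"info": "...", "timestamp": "..."}}
# Unknown topics raise a KeyError on dico_action and are only logged.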


def main(sleeptime):
    updates.check_for_updates()
    numMsg = 0
    while True:
        content = serv_list.rpop(LISTNAME)
        if content is None:
            log_text = 'Processed {} message(s) since last sleep.'.format(numMsg)
            logger.info(log_text)
            numMsg = 0
            time.sleep(sleeptime)
            continue
        the_json = json.loads(content)
        zmqName = the_json['zmq_name']
        content = the_json['content']
        process_log(zmqName, content)
        numMsg += 1
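
# A minimal sketch of a buffered item, assuming the feeder process wraps each
# raw ZMQ message in a JSON dict (names illustrative):
#   {"zmq_name": "misp_instance_1", "content": "misp_json_event {...}"}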


dico_action = {
    "misp_json": handler_dispatcher,
    "misp_json_event": handler_event,
    "misp_json_self": handler_keepalive,
    "misp_json_attribute": handler_attribute,
    "misp_json_object": handler_object,
    "misp_json_sighting": handler_sighting,
    "misp_json_organisation": handler_skip,
    "misp_json_user": handler_user,
    "misp_json_conversation": handler_conversation,
    "misp_json_object_reference": handler_skip,
    "misp_json_audit": handler_audit,
    "diagnostic_channel": handler_diagnostic_tool
}
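
# The keys above are assumed to mirror the topic prefixes emitted by the MISP
# ZMQ plugin; organisation and object_reference notifications are received
# but deliberately routed to handler_skip.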


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='The ZMQ dispatcher. It pops messages from the redis buffer, then redispatches them to the correct handlers')
    parser.add_argument('-s', '--sleep', required=False, dest='sleeptime', type=int, help='The number of seconds to wait before checking the redis list size', default=1)
    args = parser.parse_args()

    try:
        main(args.sleeptime)
    except (redis.exceptions.ResponseError, KeyboardInterrupt) as error:
        print(error)