From d1ad7543c92b54f60f316e6de97302231b403be1 Mon Sep 17 00:00:00 2001 From: Sami Mokaddem Date: Wed, 25 Oct 2017 16:21:35 +0200 Subject: [PATCH] Probably fixed zmq bug in categ --- zmq_subscriber.py | 38 ++++++++++++++++++++++---------------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/zmq_subscriber.py b/zmq_subscriber.py index 6948fcf..88a970b 100755 --- a/zmq_subscriber.py +++ b/zmq_subscriber.py @@ -21,7 +21,7 @@ CHANNELDISP = cfg.get('RedisMap', 'channelDisp') CHANNEL_PROC = cfg.get('RedisMap', 'channelProc') PATH_TO_DB = cfg.get('RedisMap', 'pathMaxMindDB') -redis_server = redis.StrictRedis( +serv_log = redis.StrictRedis( host=cfg.get('RedisLog', 'host'), port=cfg.getint('RedisLog', 'port'), db=cfg.getint('RedisLog', 'db')) @@ -29,13 +29,23 @@ serv_coord = redis.StrictRedis( host=cfg.get('RedisMap', 'host'), port=cfg.getint('RedisMap', 'port'), db=cfg.getint('RedisMap', 'db')) +serv_redis_db = redis.StrictRedis( + host=cfg.get('RedisDB', 'host'), + port=cfg.getint('RedisDB', 'port'), + db=cfg.getint('RedisDB', 'db')) reader = geoip2.database.Reader(PATH_TO_DB) def publish_log(zmq_name, name, content): to_send = { 'name': name, 'log': json.dumps(content), 'zmqName': zmq_name } - redis_server.publish(CHANNEL, json.dumps(to_send)) + serv_log.publish(CHANNEL, json.dumps(to_send)) + +def push_to_redis_zset(keyCateg, toAdd): + now = datetime.datetime.now() + today_str = str(now.year)+str(now.month)+str(now.day) + keyname = "{}:{}".format(keyCateg, today_str) + serv_redis_db.zincrby(keyname, toAdd) def ip_to_coord(ip): @@ -54,10 +64,8 @@ def getCoordAndPublish(zmq_name, supposed_ip, categ): coord = rep['coord'] coord_dic = {'lat': coord['lat'], 'lon': coord['lon']} coord_list = [coord['lat'], coord['lon']] - now = datetime.datetime.now() - today_str = str(now.year)+str(now.month)+str(now.day) - keyname = 'GEO_' + today_str - serv_coord.zincrby(keyname, coord_list) + push_to_redis_zset('GEO_COORD', json.dumps(coord_dic)) + push_to_redis_zset('GEO_COUNTRY', rep['full_rep'].country.iso_code) to_send = { "coord": coord, "categ": categ, @@ -85,38 +93,36 @@ def handler_keepalive(zmq_name, jsonevent): publish_log(zmq_name, 'Keepalive', to_push) def handler_event(zmq_name, jsonevent): - #print(jsonevent) #fields: threat_level_id, id, info jsonevent = jsonevent['Event'] #redirect to handler_attribute if 'Attribute' in jsonevent: attributes = jsonevent['Attribute'] - print("+--------- EVENTS -----------+") - print(attributes) - if attributes is list: + if type(attributes) is list: for attr in attributes: handler_attribute(zmq_name, attr) else: - handler_attribute(zmq_name, jsonevent) + handler_attribute(zmq_name, attributes) def handler_attribute(zmq_name, jsonattr): - print("+--------- ATTRIBUTE -----------+") - jsonattr = jsonattr['Attribute'] - print(jsonattr) + # check if jsonattr is an attribute object + if 'Attribute' in jsonattr: + jsonattr = jsonattr['Attribute'] + to_push = [] for field in json.loads(cfg.get('Log', 'fieldname_order')): - print(field) if type(field) is list: to_add = cfg.get('Log', 'char_separator').join([ jsonattr[subField] for subField in field ]) else: to_add = jsonattr[field] to_push.append(to_add) - #try to get coord + #try to get coord from ip if jsonattr['category'] == "Network activity": getCoordAndPublish(zmq_name, jsonattr['value'], jsonattr['category']) + # Push to log publish_log(zmq_name, 'Attribute', to_push)