diff --git a/config/config.cfg.default b/config/config.cfg.default index cdb6eed..ff2bbaa 100644 --- a/config/config.cfg.default +++ b/config/config.cfg.default @@ -51,7 +51,6 @@ channelLastAwards = lastAwards [RedisMap] db=1 -channelProc=CoordToProcess channelDisp=PicToDisplay pathMaxMindDB=./data/GeoLite2-City_20171003/GeoLite2-City.mmdb diff --git a/geo_helper.py b/geo_helper.py new file mode 100644 index 0000000..e8ff2ab --- /dev/null +++ b/geo_helper.py @@ -0,0 +1,131 @@ +import math, random +import os +import json +import datetime, time +import redis +from collections import OrderedDict +import geoip2.database + +import util + +class Geo_helper: + def __init__(self, serv_redis_db, cfg): + self.serv_redis_db = serv_redis_db + self.cfg = cfg + self.serv_coord = redis.StrictRedis( + host=cfg.get('RedisGlobal', 'host'), + port=cfg.getint('RedisGlobal', 'port'), + db=cfg.getint('RedisMap', 'db')) + + self.keyCategCoord = "GEO_COORD" + self.keyCategCountry = "GEO_COUNTRY" + self.keyCategRad = "GEO_RAD" + self.PATH_TO_DB = cfg.get('RedisMap', 'pathMaxMindDB') + self.CHANNELDISP = cfg.get('RedisMap', 'channelDisp') + + self.reader = geoip2.database.Reader(self.PATH_TO_DB) + + ''' GET ''' + def getTopCoord(self, date): + topNum = 6 # default Num + data = util.getZrange(self.serv_redis_db, self.keyCategCoord, date, topNum) + return data + + def getHitMap(self, date): + topNum = 0 # all + data = util.getZrange(self.serv_redis_db, self.keyCategCountry, date, topNum) + return data + + def getCoordsByRadius(self, dateStart, dateEnd, centerLat, centerLon, radius): + dico_coord = {} + to_return = [] + delta = dateEnd - dateStart + for i in range(delta.days+1): + correctDatetime = dateStart + datetime.timedelta(days=i) + date_str = util.getDateStrFormat(correctDatetime) + keyname = "{}:{}".format(self.keyCategRad, date_str) + res = self.serv_redis_db.georadius(keyname, centerLon, centerLat, radius, unit='km', withcoord=True) + + #sum up really close coord + for data, coord in res: + flag_added = False + coord = [coord[0], coord[1]] + #list all coord + for dicoCoordStr in dico_coord.keys(): + dicoCoord = json.loads(dicoCoordStr) + #if curCoord close to coord + if self.isCloseTo(dicoCoord, coord): + #add data to dico coord + dico_coord[dicoCoordStr].append(data) + flag_added = True + break + # coord not in dic + if not flag_added: + dico_coord[str(coord)] = [data] + + for dicoCoord, array in dico_coord.items(): + dicoCoord = json.loads(dicoCoord) + to_return.append([array, dicoCoord]) + return to_return + + ''' ADD ''' + def getCoordAndPublish(self, supposed_ip, categ): + try: + rep = self.ip_to_coord(supposed_ip) + coord = rep['coord'] + coord_dic = {'lat': coord['lat'], 'lon': coord['lon']} + ordDic = OrderedDict() #keep fields with the same layout in redis + ordDic['lat'] = coord_dic['lat'] + ordDic['lon'] = coord_dic['lon'] + coord_list = [coord['lat'], coord['lon']] + self.push_to_redis_zset(self.keyCategCoord, json.dumps(ordDic)) + self.push_to_redis_zset(self.keyCategCountry, rep['full_rep'].country.iso_code) + ordDic = OrderedDict() #keep fields with the same layout in redis + ordDic['categ'] = categ + ordDic['value'] = supposed_ip + self.push_to_redis_geo(self.keyCategRad, coord['lon'], coord['lat'], json.dumps(ordDic)) + to_send = { + "coord": coord, + "categ": categ, + "value": supposed_ip, + "country": rep['full_rep'].country.name, + "specifName": rep['full_rep'].subdivisions.most_specific.name, + "cityName": rep['full_rep'].city.name, + "regionCode": rep['full_rep'].country.iso_code, + } + self.serv_coord.publish(self.CHANNELDISP, json.dumps(to_send)) + except ValueError: + print("can't resolve ip") + except geoip2.errors.AddressNotFoundError: + print("Address not in Database") + + ''' UTIL ''' + def push_to_redis_geo(self, keyCateg, lon, lat, content): + now = datetime.datetime.now() + today_str = util.getDateStrFormat(now) + keyname = "{}:{}".format(keyCateg, today_str) + self.serv_redis_db.geoadd(keyname, lon, lat, content) + def push_to_redis_zset(self, keyCateg, toAdd, endSubkey="", count=1): + now = datetime.datetime.now() + today_str = util.getDateStrFormat(now) + keyname = "{}:{}{}".format(keyCateg, today_str, endSubkey) + self.serv_redis_db.zincrby(keyname, toAdd, count) + + + def ip_to_coord(self, ip): + resp = self.reader.city(ip) + lat = float(resp.location.latitude) + lon = float(resp.location.longitude) + # 0.0001 correspond to ~10m + # Cast the float so that it has the correct float format + lat_corrected = float("{:.4f}".format(lat)) + lon_corrected = float("{:.4f}".format(lon)) + return { 'coord': {'lat': lat_corrected, 'lon': lon_corrected}, 'full_rep': resp } + + def isCloseTo(self, coord1, coord2): + clusterMeter = self.cfg.getfloat('GEO' ,'clusteringDistance') + clusterThres = math.pow(10, len(str(abs(clusterMeter)))-7) #map meter to coord threshold (~ big approx) + if abs(float(coord1[0]) - float(coord2[0])) <= clusterThres: + if abs(float(coord1[1]) - float(coord2[1])) <= clusterThres: + return True + return False diff --git a/server.py b/server.py index 546031f..9611bcc 100755 --- a/server.py +++ b/server.py @@ -10,6 +10,7 @@ import datetime import os import util +import geo_helper import contributor_helper import users_helper import trendings_helper @@ -33,6 +34,7 @@ serv_redis_db = redis.StrictRedis( port=cfg.getint('RedisGlobal', 'port'), db=cfg.getint('RedisDB', 'db')) +geo_helper = geo_helper.Geo_helper(serv_redis_db, cfg) contributor_helper = contributor_helper.Contributor_helper(serv_redis_db, cfg) users_helper = users_helper.Users_helper(serv_redis_db, cfg) trendings_helper = trendings_helper.Trendings_helper(serv_redis_db, cfg) @@ -110,14 +112,6 @@ class EventMessage(): to_ret = { 'log': self.feed, 'feedName': self.feedName, 'zmqName': self.zmqName } return 'data: {}\n\n'.format(json.dumps(to_ret)) -''' GENERAL ''' -def getZrange(keyCateg, date, topNum, endSubkey=""): - date_str = util.getDateStrFormat(date) - keyname = "{}:{}{}".format(keyCateg, date_str, endSubkey) - data = serv_redis_db.zrange(keyname, 0, topNum-1, desc=True, withscores=True) - data = [ [record[0].decode('utf8'), record[1]] for record in data ] - return data - ########### ## ROUTE ## ########### @@ -254,9 +248,7 @@ def getTopCoord(): date = datetime.datetime.fromtimestamp(float(request.args.get('date'))) except: date = datetime.datetime.now() - keyCateg = "GEO_COORD" - topNum = 6 # default Num - data = getZrange(keyCateg, date, topNum) + data = geo_helper.getTopCoord(date) return jsonify(data) @app.route("/_getHitMap") @@ -265,23 +257,11 @@ def getHitMap(): date = datetime.datetime.fromtimestamp(float(request.args.get('date'))) except: date = datetime.datetime.now() - keyCateg = "GEO_COUNTRY" - topNum = 0 # all - data = getZrange(keyCateg, date, topNum) + data = geo_helper.getHitMap(date) return jsonify(data) -def isCloseTo(coord1, coord2): - clusterMeter = cfg.getfloat('GEO' ,'clusteringDistance') - clusterThres = math.pow(10, len(str(abs(clusterMeter)))-7) #map meter to coord threshold (~ big approx) - if abs(float(coord1[0]) - float(coord2[0])) <= clusterThres: - if abs(float(coord1[1]) - float(coord2[1])) <= clusterThres: - return True - return False - @app.route("/_getCoordsByRadius") def getCoordsByRadius(): - dico_coord = {} - to_return = [] try: dateStart = datetime.datetime.fromtimestamp(float(request.args.get('dateStart'))) dateEnd = datetime.datetime.fromtimestamp(float(request.args.get('dateEnd'))) @@ -289,38 +269,10 @@ def getCoordsByRadius(): centerLon = request.args.get('centerLon') radius = int(math.ceil(float(request.args.get('radius')))) except: - return jsonify(to_return) + return jsonify([]) - delta = dateEnd - dateStart - for i in range(delta.days+1): - correctDatetime = dateStart + datetime.timedelta(days=i) - date_str = util.getDateStrFormat(correctDatetime) - keyCateg = 'GEO_RAD' - keyname = "{}:{}".format(keyCateg, date_str) - res = serv_redis_db.georadius(keyname, centerLon, centerLat, radius, unit='km', withcoord=True) - - #sum up really close coord - for data, coord in res: - flag_added = False - coord = [coord[0], coord[1]] - #list all coord - for dicoCoordStr in dico_coord.keys(): - dicoCoord = json.loads(dicoCoordStr) - #if curCoord close to coord - if isCloseTo(dicoCoord, coord): - #add data to dico coord - dico_coord[dicoCoordStr].append(data) - flag_added = True - break - # coord not in dic - if not flag_added: - dico_coord[str(coord)] = [data] - - for dicoCoord, array in dico_coord.items(): - dicoCoord = json.loads(dicoCoord) - to_return.append([array, dicoCoord]) - - return jsonify(to_return) + data = geo_helper.getCoordsByRadius(dateStart, dateEnd, centerLat, centerLon, radius) + return jsonify(data) ''' CONTRIB ''' diff --git a/util.py b/util.py index c22e539..5f937e6 100644 --- a/util.py +++ b/util.py @@ -2,6 +2,13 @@ import datetime, time ONE_DAY = 60*60*24 +def getZrange(serv_redis_db, keyCateg, date, topNum, endSubkey=""): + date_str = getDateStrFormat(date) + keyname = "{}:{}{}".format(keyCateg, date_str, endSubkey) + data = serv_redis_db.zrange(keyname, 0, topNum-1, desc=True, withscores=True) + data = [ [record[0].decode('utf8'), record[1]] for record in data ] + return data + def getMonthSpan(date): ds = datetime.datetime(date.year, date.month, 1) dyear = 1 if ds.month+1 > 12 else 0 diff --git a/zmq_subscriber.py b/zmq_subscriber.py index 39fae6f..81c6383 100755 --- a/zmq_subscriber.py +++ b/zmq_subscriber.py @@ -2,7 +2,6 @@ import time, datetime import copy -from collections import OrderedDict from pprint import pprint import zmq import redis @@ -12,9 +11,9 @@ import argparse import os import sys import json -import geoip2.database import util +import geo_helper import contributor_helper import users_helper import trendings_helper @@ -28,10 +27,6 @@ ZMQ_URL = cfg.get('RedisGlobal', 'zmq_url') CHANNEL = cfg.get('RedisLog', 'channel') CHANNEL_LASTCONTRIB = cfg.get('RedisLog', 'channelLastContributor') CHANNEL_LASTAWARDS = cfg.get('RedisLog', 'channelLastAwards') -CHANNELDISP = cfg.get('RedisMap', 'channelDisp') -CHANNEL_PROC = cfg.get('RedisMap', 'channelProc') -PATH_TO_DB = cfg.get('RedisMap', 'pathMaxMindDB') - DEFAULT_PNTS_REWARD = cfg.get('CONTRIB', 'default_pnts_per_contribution') categories_in_datatable = json.loads(cfg.get('CONTRIB', 'categories_in_datatable')) @@ -39,26 +34,21 @@ DICO_PNTS_REWARD = {} temp = json.loads(cfg.get('CONTRIB', 'pnts_per_contribution')) for categ, pnts in temp: DICO_PNTS_REWARD[categ] = pnts -MAX_NUMBER_OF_LAST_CONTRIBUTOR = cfg.getint('CONTRIB', 'max_number_of_last_contributor') serv_log = redis.StrictRedis( host=cfg.get('RedisGlobal', 'host'), port=cfg.getint('RedisGlobal', 'port'), db=cfg.getint('RedisLog', 'db')) -serv_coord = redis.StrictRedis( - host=cfg.get('RedisGlobal', 'host'), - port=cfg.getint('RedisGlobal', 'port'), - db=cfg.getint('RedisMap', 'db')) serv_redis_db = redis.StrictRedis( host=cfg.get('RedisGlobal', 'host'), port=cfg.getint('RedisGlobal', 'port'), db=cfg.getint('RedisDB', 'db')) +geo_helper = geo_helper.Geo_helper(serv_redis_db, cfg) contributor_helper = contributor_helper.Contributor_helper(serv_redis_db, cfg) users_helper = users_helper.Users_helper(serv_redis_db, cfg) trendings_helper = trendings_helper.Trendings_helper(serv_redis_db, cfg) -reader = geoip2.database.Reader(PATH_TO_DB) def publish_log(zmq_name, name, content, channel=CHANNEL): to_send = { 'name': name, 'log': json.dumps(content), 'zmqName': zmq_name } @@ -70,52 +60,6 @@ def push_to_redis_zset(keyCateg, toAdd, endSubkey="", count=1): keyname = "{}:{}{}".format(keyCateg, today_str, endSubkey) serv_redis_db.zincrby(keyname, toAdd, count) -def push_to_redis_geo(keyCateg, lon, lat, content): - now = datetime.datetime.now() - today_str = util.getDateStrFormat(now) - keyname = "{}:{}".format(keyCateg, today_str) - serv_redis_db.geoadd(keyname, lon, lat, content) - -def ip_to_coord(ip): - resp = reader.city(ip) - lat = float(resp.location.latitude) - lon = float(resp.location.longitude) - # 0.0001 correspond to ~10m - # Cast the float so that it has the correct float format - lat_corrected = float("{:.4f}".format(lat)) - lon_corrected = float("{:.4f}".format(lon)) - return { 'coord': {'lat': lat_corrected, 'lon': lon_corrected}, 'full_rep': resp } - -def getCoordAndPublish(zmq_name, supposed_ip, categ): - try: - rep = ip_to_coord(supposed_ip) - coord = rep['coord'] - coord_dic = {'lat': coord['lat'], 'lon': coord['lon']} - ordDic = OrderedDict() #keep fields with the same layout in redis - ordDic['lat'] = coord_dic['lat'] - ordDic['lon'] = coord_dic['lon'] - coord_list = [coord['lat'], coord['lon']] - push_to_redis_zset('GEO_COORD', json.dumps(ordDic)) - push_to_redis_zset('GEO_COUNTRY', rep['full_rep'].country.iso_code) - ordDic = OrderedDict() #keep fields with the same layout in redis - ordDic['categ'] = categ - ordDic['value'] = supposed_ip - push_to_redis_geo('GEO_RAD', coord['lon'], coord['lat'], json.dumps(ordDic)) - to_send = { - "coord": coord, - "categ": categ, - "value": supposed_ip, - "country": rep['full_rep'].country.name, - "specifName": rep['full_rep'].subdivisions.most_specific.name, - "cityName": rep['full_rep'].city.name, - "regionCode": rep['full_rep'].country.iso_code, - } - serv_coord.publish(CHANNELDISP, json.dumps(to_send)) - except ValueError: - print("can't resolve ip") - except geoip2.errors.AddressNotFoundError: - print("Address not in Database") - def getFields(obj, fields): jsonWalker = fields.split('.') itemToExplore = obj @@ -332,7 +276,7 @@ def handler_attribute(zmq_name, jsonobj, hasAlreadyBeenContributed=False): #try to get coord from ip if jsonattr['category'] == "Network activity": - getCoordAndPublish(zmq_name, jsonattr['value'], jsonattr['category']) + geo_helper.getCoordAndPublish(jsonattr['value'], jsonattr['category']) if not hasAlreadyBeenContributed: try: @@ -374,10 +318,13 @@ def main(zmqName): socket.setsockopt_string(zmq.SUBSCRIBE, '') while True: - content = socket.recv() - content.replace(b'\n', b'') # remove \n... - zmq_name = zmqName - process_log(zmq_name, content) + try: + content = socket.recv() + content.replace(b'\n', b'') # remove \n... + zmq_name = zmqName + process_log(zmq_name, content) + except KeyboardInterrupt: + return dico_action = {