fix: redis buffer seems to work

pull/15/head
Sami Mokaddem 2017-11-30 16:04:03 +01:00
parent 850b686e76
commit 868ba9f80b
3 changed files with 23 additions and 17 deletions

View File

@ -73,7 +73,7 @@ class Users_helper:
for curDate in util.getXPrevDaysSpan(date, prev_days): for curDate in util.getXPrevDaysSpan(date, prev_days):
log = self.serv_redis_db.zscore(keyname_log.format(self.keyOrgLog, util.getDateStrFormat(curDate)), org) log = self.serv_redis_db.zscore(keyname_log.format(self.keyOrgLog, util.getDateStrFormat(curDate)), org)
log = 0 if log is None else 1 log = 0 if log is None else 1
contrib = self.serv_redis_db.zscore(keyname_contrib.format(keyContribDay, util.getDateStrFormat(curDate)), org) contrib = self.serv_redis_db.zscore(keyname_contrib.format(self.keyContribDay, util.getDateStrFormat(curDate)), org)
contrib = 0 if contrib is None else 1 contrib = 0 if contrib is None else 1
data.append([log, contrib]) data.append([log, contrib])
return data return data
@ -100,7 +100,7 @@ class Users_helper:
def getLoginVSCOntribution(self, date): def getLoginVSCOntribution(self, date):
keyname = "{}:{}".format(keyContribDay, util.getDateStrFormat(date)) keyname = "{}:{}".format(self.keyContribDay, util.getDateStrFormat(date))
orgs_contri = self.serv_redis_db.zrange(keyname, 0, -1, desc=True, withscores=False) orgs_contri = self.serv_redis_db.zrange(keyname, 0, -1, desc=True, withscores=False)
orgs_contri = [ org.decode('utf8') for org in orgs_contri ] orgs_contri = [ org.decode('utf8') for org in orgs_contri ]
orgs_login = [ org[0] for org in self.getOrgslogin(date, topNum=0) ] orgs_login = [ org[0] for org in self.getOrgslogin(date, topNum=0) ]
@ -152,7 +152,7 @@ class Users_helper:
for curDate in util.getXPrevDaysSpan(date, prev_days): for curDate in util.getXPrevDaysSpan(date, prev_days):
timestamps = self.getUserLogins(curDate) timestamps = self.getUserLogins(curDate)
keyname = "{}:{}".format(keyContribDay, util.getDateStrFormat(curDate)) keyname = "{}:{}".format(self.keyContribDay, util.getDateStrFormat(curDate))
orgs_contri = self.serv_redis_db.zrange(keyname, 0, -1, desc=True, withscores=False) orgs_contri = self.serv_redis_db.zrange(keyname, 0, -1, desc=True, withscores=False)
orgs_contri_num = len(orgs_contri) orgs_contri_num = len(orgs_contri)

View File

@ -85,6 +85,7 @@ def handler_keepalive(zmq_name, jsonevent):
publish_log(zmq_name, 'Keepalive', to_push) publish_log(zmq_name, 'Keepalive', to_push)
def handler_user(zmq_name, jsondata): def handler_user(zmq_name, jsondata):
print('sending', 'user')
action = jsondata['action'] action = jsondata['action']
json_user = jsondata['User'] json_user = jsondata['User']
json_org = jsondata['Organisation'] json_org = jsondata['Organisation']
@ -225,31 +226,34 @@ def handler_attribute(zmq_name, jsonobj, hasAlreadyBeenContributed=False):
############### ###############
def process_log(zmq_name, event): def process_log(zmq_name, event):
event = event.decode('utf8')
topic, eventdata = event.split(' ', maxsplit=1) topic, eventdata = event.split(' ', maxsplit=1)
jsonevent = json.loads(eventdata) jsonevent = json.loads(eventdata)
print(event)
try: try:
dico_action[topic](zmq_name, jsonevent) dico_action[topic](zmq_name, jsonevent)
except KeyError as e: except KeyError as e:
print(e) print(e)
def main(zmqName): def main(zmqName, zmqurl, sleeptime):
context = zmq.Context() context = zmq.Context()
socket = context.socket(zmq.SUB) socket = context.socket(zmq.SUB)
socket.connect(ZMQ_URL) socket.connect(zmqurl)
socket.setsockopt_string(zmq.SUBSCRIBE, '') socket.setsockopt_string(zmq.SUBSCRIBE, '')
numMsg = 0
while True: while True:
try:
content = serv_list.rpop(LISTNAME) content = serv_list.rpop(LISTNAME)
if content is None:
print('Processed', numMsg, 'message(s) since last sleep.')
numMsg = 0
time.sleep(sleeptime)
continue
content = content.decode('utf8')
the_json = json.loads(content) the_json = json.loads(content)
zmqName - the_json['zmq_name'] zmqName = the_json['zmq_name']
content = the_json['content'] content = the_json['content']
process_log(zmqName, content) process_log(zmqName, content)
except KeyboardInterrupt: numMsg += 1
return
dico_action = { dico_action = {
@ -271,6 +275,7 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser(description='A zmq subscriber. It subscribe to a ZNQ then redispatch it to the misp-dashboard') parser = argparse.ArgumentParser(description='A zmq subscriber. It subscribe to a ZNQ then redispatch it to the misp-dashboard')
parser.add_argument('-n', '--name', required=False, dest='zmqname', help='The ZMQ feed name', default="MISP Standard ZMQ") parser.add_argument('-n', '--name', required=False, dest='zmqname', help='The ZMQ feed name', default="MISP Standard ZMQ")
parser.add_argument('-u', '--url', required=False, dest='zmqurl', help='The URL to connect to', default=ZMQ_URL) parser.add_argument('-u', '--url', required=False, dest='zmqurl', help='The URL to connect to', default=ZMQ_URL)
parser.add_argument('-s', '--sleep', required=False, dest='sleeptime', type=int, help='The number of second to wait before checking redis list size', default=5)
args = parser.parse_args() args = parser.parse_args()
main(args.zmqname) main(args.zmqname, args.zmqurl, args.sleeptime)

View File

@ -29,8 +29,9 @@ serv_list = redis.StrictRedis(
############### ###############
def put_in_redis_list(zmq_name, content): def put_in_redis_list(zmq_name, content):
content = content.decode('utf8')
to_add = {'zmq_name': zmq_name, 'content': content} to_add = {'zmq_name': zmq_name, 'content': content}
serv_list.lpush(LISTNAME, json.dumps(content)) serv_list.lpush(LISTNAME, json.dumps(to_add))
def main(zmqName): def main(zmqName):
context = zmq.Context() context = zmq.Context()