diff --git a/users_helper.py b/users_helper.py index 80ad79f..ec01615 100644 --- a/users_helper.py +++ b/users_helper.py @@ -73,7 +73,7 @@ class Users_helper: for curDate in util.getXPrevDaysSpan(date, prev_days): log = self.serv_redis_db.zscore(keyname_log.format(self.keyOrgLog, util.getDateStrFormat(curDate)), org) log = 0 if log is None else 1 - contrib = self.serv_redis_db.zscore(keyname_contrib.format(keyContribDay, util.getDateStrFormat(curDate)), org) + contrib = self.serv_redis_db.zscore(keyname_contrib.format(self.keyContribDay, util.getDateStrFormat(curDate)), org) contrib = 0 if contrib is None else 1 data.append([log, contrib]) return data @@ -100,7 +100,7 @@ class Users_helper: def getLoginVSCOntribution(self, date): - keyname = "{}:{}".format(keyContribDay, util.getDateStrFormat(date)) + keyname = "{}:{}".format(self.keyContribDay, util.getDateStrFormat(date)) orgs_contri = self.serv_redis_db.zrange(keyname, 0, -1, desc=True, withscores=False) orgs_contri = [ org.decode('utf8') for org in orgs_contri ] orgs_login = [ org[0] for org in self.getOrgslogin(date, topNum=0) ] @@ -152,7 +152,7 @@ class Users_helper: for curDate in util.getXPrevDaysSpan(date, prev_days): timestamps = self.getUserLogins(curDate) - keyname = "{}:{}".format(keyContribDay, util.getDateStrFormat(curDate)) + keyname = "{}:{}".format(self.keyContribDay, util.getDateStrFormat(curDate)) orgs_contri = self.serv_redis_db.zrange(keyname, 0, -1, desc=True, withscores=False) orgs_contri_num = len(orgs_contri) diff --git a/zmq_dispatcher.py b/zmq_dispatcher.py index a4699c5..48a6f53 100755 --- a/zmq_dispatcher.py +++ b/zmq_dispatcher.py @@ -85,6 +85,7 @@ def handler_keepalive(zmq_name, jsonevent): publish_log(zmq_name, 'Keepalive', to_push) def handler_user(zmq_name, jsondata): + print('sending', 'user') action = jsondata['action'] json_user = jsondata['User'] json_org = jsondata['Organisation'] @@ -225,31 +226,34 @@ def handler_attribute(zmq_name, jsonobj, hasAlreadyBeenContributed=False): ############### def process_log(zmq_name, event): - event = event.decode('utf8') topic, eventdata = event.split(' ', maxsplit=1) jsonevent = json.loads(eventdata) - print(event) try: dico_action[topic](zmq_name, jsonevent) except KeyError as e: print(e) -def main(zmqName): +def main(zmqName, zmqurl, sleeptime): context = zmq.Context() socket = context.socket(zmq.SUB) - socket.connect(ZMQ_URL) + socket.connect(zmqurl) socket.setsockopt_string(zmq.SUBSCRIBE, '') + numMsg = 0 while True: - try: - content = serv_list.rpop(LISTNAME) - the_json = json.loads(content) - zmqName - the_json['zmq_name'] - content = the_json['content'] - process_log(zmqName, content) - except KeyboardInterrupt: - return + content = serv_list.rpop(LISTNAME) + if content is None: + print('Processed', numMsg, 'message(s) since last sleep.') + numMsg = 0 + time.sleep(sleeptime) + continue + content = content.decode('utf8') + the_json = json.loads(content) + zmqName = the_json['zmq_name'] + content = the_json['content'] + process_log(zmqName, content) + numMsg += 1 dico_action = { @@ -271,6 +275,7 @@ if __name__ == "__main__": parser = argparse.ArgumentParser(description='A zmq subscriber. It subscribe to a ZNQ then redispatch it to the misp-dashboard') parser.add_argument('-n', '--name', required=False, dest='zmqname', help='The ZMQ feed name', default="MISP Standard ZMQ") parser.add_argument('-u', '--url', required=False, dest='zmqurl', help='The URL to connect to', default=ZMQ_URL) + parser.add_argument('-s', '--sleep', required=False, dest='sleeptime', type=int, help='The number of second to wait before checking redis list size', default=5) args = parser.parse_args() - main(args.zmqname) + main(args.zmqname, args.zmqurl, args.sleeptime) diff --git a/zmq_subscriber.py b/zmq_subscriber.py index ee7922c..bb44f84 100755 --- a/zmq_subscriber.py +++ b/zmq_subscriber.py @@ -29,8 +29,9 @@ serv_list = redis.StrictRedis( ############### def put_in_redis_list(zmq_name, content): + content = content.decode('utf8') to_add = {'zmq_name': zmq_name, 'content': content} - serv_list.lpush(LISTNAME, json.dumps(content)) + serv_list.lpush(LISTNAME, json.dumps(to_add)) def main(zmqName): context = zmq.Context()