From f92dd11f7f2a698ca9fa383a21a401f1a3c46592 Mon Sep 17 00:00:00 2001
From: Sami Mokaddem
Date: Mon, 24 Sep 2018 10:26:15 +0200
Subject: [PATCH] chg+fix: [trending/flask] applied algorithm strategy for other stats + FIXED eventsource caching bug: Data was cached and delivered all at once when reconnecting to the source

---
 helpers/trendings_helper.py |  8 ++--
 server.py                   | 90 ++++++++++++++++++++-----------------
 util.py                     |  2 +-
 3 files changed, 55 insertions(+), 45 deletions(-)

diff --git a/helpers/trendings_helper.py b/helpers/trendings_helper.py
index d385037..61e1479 100644
--- a/helpers/trendings_helper.py
+++ b/helpers/trendings_helper.py
@@ -105,8 +105,8 @@ class Trendings_helper:
             specificLabel = specificLabel.replace('\\n', '\n'); # reset correctly label with their \n (CR) instead of their char value
             return self.getSpecificTrending(self.keyEvent, dateS, dateE, specificLabel)
 
-    def getTrendingCategs(self, dateS, dateE):
-        return self.getGenericTrending(self.keyCateg, dateS, dateE)
+    def getTrendingCategs(self, dateS, dateE, topNum=None):
+        return self.getGenericTrending(self.keyCateg, dateS, dateE, topNum=topNum)
 
     # FIXME: Construct this when getting data
     def getTrendingTags(self, dateS, dateE, topNum=12):
@@ -137,8 +137,8 @@ class Trendings_helper:
             to_ret.append([util.getTimestamp(curDate), { 'sightings': sight, 'false_positive': fp}])
         return to_ret
 
-    def getTrendingDisc(self, dateS, dateE):
-        return self.getGenericTrending(self.keyDisc, dateS, dateE)
+    def getTrendingDisc(self, dateS, dateE, topNum=None):
+        return self.getGenericTrending(self.keyDisc, dateS, dateE, topNum=topNum)
 
     def getTypeaheadData(self, dateS, dateE):
         to_ret = {}
diff --git a/server.py b/server.py
index 6166993..b3dbd6a 100755
--- a/server.py
+++ b/server.py
@@ -1,5 +1,5 @@
 #!/usr/bin/env python3
-from flask import Flask, render_template, request, Response, jsonify
+from flask import Flask, render_template, request, Response, jsonify, stream_with_context
 import json
 import redis
 import random, math
@@ -46,16 +46,6 @@ contributor_helper = contributor_helper.Contributor_helper(serv_redis_db, cfg)
 users_helper = users_helper.Users_helper(serv_redis_db, cfg)
 trendings_helper = trendings_helper.Trendings_helper(serv_redis_db, cfg)
 
-subscriber_log = redis_server_log.pubsub(ignore_subscribe_messages=True)
-subscriber_log.psubscribe(cfg.get('RedisLog', 'channel'))
-subscriber_map = redis_server_map.pubsub(ignore_subscribe_messages=True)
-subscriber_map.psubscribe(cfg.get('RedisMap', 'channelDisp'))
-subscriber_lastContrib = redis_server_log.pubsub(ignore_subscribe_messages=True)
-subscriber_lastContrib.psubscribe(cfg.get('RedisLog', 'channelLastContributor'))
-subscriber_lastAwards = redis_server_log.pubsub(ignore_subscribe_messages=True)
-subscriber_lastAwards.psubscribe(cfg.get('RedisLog', 'channelLastAwards'))
-
-eventNumber = 0
 
 ##########
 ## UTIL ##
@@ -240,14 +230,24 @@ def getLogHead():
     return json.dumps(LogItem('').get_head_row())
 
 def event_stream_log():
-    for msg in subscriber_log.listen():
-        content = msg['data']
-        yield EventMessage(content).to_json()
+    subscriber_log = redis_server_log.pubsub(ignore_subscribe_messages=True)
+    subscriber_log.subscribe(cfg.get('RedisLog', 'channel'))
+    try:
+        for msg in subscriber_log.listen():
+            content = msg['data']
+            yield EventMessage(content).to_json()
+    except GeneratorExit:
+        subscriber_log.unsubscribe()
 
 def event_stream_maps():
-    for msg in subscriber_map.listen():
-        content = msg['data'].decode('utf8')
-        yield 'data: {}\n\n'.format(content)
+    subscriber_map = redis_server_map.pubsub(ignore_subscribe_messages=True)
+    subscriber_map.psubscribe(cfg.get('RedisMap', 'channelDisp'))
+    try:
+        for msg in subscriber_map.listen():
+            content = msg['data'].decode('utf8')
+            yield 'data: {}\n\n'.format(content)
+    except GeneratorExit:
+        subscriber_map.unsubscribe()
 
 ''' GEO '''
 
@@ -298,27 +298,37 @@ def getLastStreamAwards():
     return Response(eventStreamAwards(), mimetype="text/event-stream")
 
 def eventStreamLastContributor():
-    for msg in subscriber_lastContrib.listen():
-        content = msg['data'].decode('utf8')
-        contentJson = json.loads(content)
-        lastContribJson = json.loads(contentJson['log'])
-        org = lastContribJson['org']
-        to_return = contributor_helper.getContributorFromRedis(org)
-        epoch = lastContribJson['epoch']
-        to_return['epoch'] = epoch
-        yield 'data: {}\n\n'.format(json.dumps(to_return))
+    subscriber_lastContrib = redis_server_log.pubsub(ignore_subscribe_messages=True)
+    subscriber_lastContrib.psubscribe(cfg.get('RedisLog', 'channelLastContributor'))
+    try:
+        for msg in subscriber_lastContrib.listen():
+            content = msg['data'].decode('utf8')
+            contentJson = json.loads(content)
+            lastContribJson = json.loads(contentJson['log'])
+            org = lastContribJson['org']
+            to_return = contributor_helper.getContributorFromRedis(org)
+            epoch = lastContribJson['epoch']
+            to_return['epoch'] = epoch
+            yield 'data: {}\n\n'.format(json.dumps(to_return))
+    except GeneratorExit:
+        subscriber_lastContrib.unsubscribe()
 
 def eventStreamAwards():
-    for msg in subscriber_lastAwards.listen():
-        content = msg['data'].decode('utf8')
-        contentJson = json.loads(content)
-        lastAwardJson = json.loads(contentJson['log'])
-        org = lastAwardJson['org']
-        to_return = contributor_helper.getContributorFromRedis(org)
-        epoch = lastAwardJson['epoch']
-        to_return['epoch'] = epoch
-        to_return['award'] = lastAwardJson['award']
-        yield 'data: {}\n\n'.format(json.dumps(to_return))
+    subscriber_lastAwards = redis_server_log.pubsub(ignore_subscribe_messages=True)
+    subscriber_lastAwards.psubscribe(cfg.get('RedisLog', 'channelLastAwards'))
+    try:
+        for msg in subscriber_lastAwards.listen():
+            content = msg['data'].decode('utf8')
+            contentJson = json.loads(content)
+            lastAwardJson = json.loads(contentJson['log'])
+            org = lastAwardJson['org']
+            to_return = contributor_helper.getContributorFromRedis(org)
+            epoch = lastAwardJson['epoch']
+            to_return['epoch'] = epoch
+            to_return['award'] = lastAwardJson['award']
+            yield 'data: {}\n\n'.format(json.dumps(to_return))
+    except GeneratorExit:
+        subscriber_lastAwards.unsubscribe()
 
 @app.route("/_getTopContributor")
 def getTopContributor(suppliedDate=None, maxNum=100):
@@ -475,7 +485,7 @@ def getTrendingEvents():
         dateE = datetime.datetime.now()
 
     specificLabel = request.args.get('specificLabel')
-    data = trendings_helper.getTrendingEvents(dateS, dateE, specificLabel)
+    data = trendings_helper.getTrendingEvents(dateS, dateE, specificLabel, topNum=int(request.args.get('topNum', 10)))
     return jsonify(data)
 
 @app.route("/_getTrendingCategs")
@@ -488,7 +498,7 @@ def getTrendingCategs():
         dateE = datetime.datetime.now()
 
 
-    data = trendings_helper.getTrendingCategs(dateS, dateE)
+    data = trendings_helper.getTrendingCategs(dateS, dateE, topNum=int(request.args.get('topNum', 10)))
     return jsonify(data)
 
 @app.route("/_getTrendingTags")
@@ -501,7 +511,7 @@ def getTrendingTags():
         dateE = datetime.datetime.now()
 
 
-    data = trendings_helper.getTrendingTags(dateS, dateE)
+    data = trendings_helper.getTrendingTags(dateS, dateE, topNum=int(request.args.get('topNum', 10)))
     return jsonify(data)
 
 @app.route("/_getTrendingSightings")
@@ -551,7 +561,7 @@ def getGenericTrendingOvertime():
         dateE = datetime.datetime.now()
 
     choice = request.args.get('choice', 'events')
-    data = trendings_helper.getGenericTrendingOvertime(dateS, dateE, choice)
+    data = trendings_helper.getGenericTrendingOvertime(dateS, dateE, choice=choice)
     return jsonify(data)
 
 if __name__ == '__main__':
diff --git a/util.py b/util.py
index 2938213..6f6d0db 100644
--- a/util.py
+++ b/util.py
@@ -78,7 +78,7 @@ def sortByTrendingScore(toSort, topNum=5):
     scoredLabels = defaultdict(float)
     numDay = len(toSort)
     baseDecay = 1.0
-    decayRate = lambda x: baseDecay*((numDay-x)/numDay)
+    decayRate = lambda x: baseDecay*((numDay-x**2)/numDay)
 
     for i, arr in enumerate(toSort):
         timestamp = arr[0]
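
The eventsource fix above comes down to one pattern: create the Redis pubsub subscription inside the SSE generator, once per connected client, and drop it when the generator is closed. A module-level subscriber keeps accumulating published messages while no client is listening, so a reconnecting browser gets the whole backlog delivered at once. A minimal self-contained sketch of that pattern, assuming a local Redis instance and a placeholder channel name 'example.stream' (neither is taken from the dashboard configuration):

import redis
from flask import Flask, Response, stream_with_context

app = Flask(__name__)
r = redis.StrictRedis(host='localhost', port=6379, db=0)

def event_stream(channel):
    # Per-request subscription: nothing is buffered while no client listens.
    pubsub = r.pubsub(ignore_subscribe_messages=True)
    pubsub.subscribe(channel)
    try:
        for msg in pubsub.listen():
            yield 'data: {}\n\n'.format(msg['data'].decode('utf8'))
    except GeneratorExit:
        # Client went away: release the subscription instead of letting it
        # queue messages for the next reconnect.
        pubsub.unsubscribe()

@app.route('/stream')
def stream():
    return Response(stream_with_context(event_stream('example.stream')),
                    mimetype='text/event-stream')

The hunks above still return the generators directly (see getLastStreamAwards); stream_with_context is only added to the Flask imports here, and the sketch shows one common way it is wrapped around such a generator.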
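
The other half of the change threads an optional topNum through the categories, tags and discussion trendings, read from the topNum query parameter with a default of 10. A hypothetical client call against a locally running dashboard; the host, the port and the epoch-timestamp date format are assumptions for illustration, not taken from this patch:

import time
import requests

# Assumed local deployment; adjust host/port and date handling to your setup.
params = {
    'dateS': int(time.time()) - 7 * 24 * 3600,   # last seven days
    'dateE': int(time.time()),
    'topNum': 5,                                 # overrides the default of 10
}
resp = requests.get('http://localhost:8001/_getTrendingCategs', params=params)
print(resp.json())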