chg+fix: [trending/flask] applied algorithm strategy for other stats + FIXED eventsource caching bug:

Data was cached and delivered all at once when reconnecting to the source
pull/43/merge
Sami Mokaddem 2018-09-24 10:26:15 +02:00
parent a7065c1171
commit f92dd11f7f
3 changed files with 55 additions and 45 deletions

View File

@ -105,8 +105,8 @@ class Trendings_helper:
specificLabel = specificLabel.replace('\\n', '\n'); # reset correctly label with their \n (CR) instead of their char value specificLabel = specificLabel.replace('\\n', '\n'); # reset correctly label with their \n (CR) instead of their char value
return self.getSpecificTrending(self.keyEvent, dateS, dateE, specificLabel) return self.getSpecificTrending(self.keyEvent, dateS, dateE, specificLabel)
def getTrendingCategs(self, dateS, dateE): def getTrendingCategs(self, dateS, dateE, topNum=None):
return self.getGenericTrending(self.keyCateg, dateS, dateE) return self.getGenericTrending(self.keyCateg, dateS, dateE, topNum=topNum)
# FIXME: Construct this when getting data # FIXME: Construct this when getting data
def getTrendingTags(self, dateS, dateE, topNum=12): def getTrendingTags(self, dateS, dateE, topNum=12):
@ -137,8 +137,8 @@ class Trendings_helper:
to_ret.append([util.getTimestamp(curDate), { 'sightings': sight, 'false_positive': fp}]) to_ret.append([util.getTimestamp(curDate), { 'sightings': sight, 'false_positive': fp}])
return to_ret return to_ret
def getTrendingDisc(self, dateS, dateE): def getTrendingDisc(self, dateS, dateE, topNum=None):
return self.getGenericTrending(self.keyDisc, dateS, dateE) return self.getGenericTrending(self.keyDisc, dateS, dateE, topNum=topNum)
def getTypeaheadData(self, dateS, dateE): def getTypeaheadData(self, dateS, dateE):
to_ret = {} to_ret = {}

View File

@ -1,5 +1,5 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from flask import Flask, render_template, request, Response, jsonify from flask import Flask, render_template, request, Response, jsonify, stream_with_context
import json import json
import redis import redis
import random, math import random, math
@ -46,16 +46,6 @@ contributor_helper = contributor_helper.Contributor_helper(serv_redis_db, cfg)
users_helper = users_helper.Users_helper(serv_redis_db, cfg) users_helper = users_helper.Users_helper(serv_redis_db, cfg)
trendings_helper = trendings_helper.Trendings_helper(serv_redis_db, cfg) trendings_helper = trendings_helper.Trendings_helper(serv_redis_db, cfg)
subscriber_log = redis_server_log.pubsub(ignore_subscribe_messages=True)
subscriber_log.psubscribe(cfg.get('RedisLog', 'channel'))
subscriber_map = redis_server_map.pubsub(ignore_subscribe_messages=True)
subscriber_map.psubscribe(cfg.get('RedisMap', 'channelDisp'))
subscriber_lastContrib = redis_server_log.pubsub(ignore_subscribe_messages=True)
subscriber_lastContrib.psubscribe(cfg.get('RedisLog', 'channelLastContributor'))
subscriber_lastAwards = redis_server_log.pubsub(ignore_subscribe_messages=True)
subscriber_lastAwards.psubscribe(cfg.get('RedisLog', 'channelLastAwards'))
eventNumber = 0
########## ##########
## UTIL ## ## UTIL ##
@ -240,14 +230,24 @@ def getLogHead():
return json.dumps(LogItem('').get_head_row()) return json.dumps(LogItem('').get_head_row())
def event_stream_log(): def event_stream_log():
for msg in subscriber_log.listen(): subscriber_log = redis_server_log.pubsub(ignore_subscribe_messages=True)
content = msg['data'] subscriber_log.subscribe(cfg.get('RedisLog', 'channel'))
yield EventMessage(content).to_json() try:
for msg in subscriber_log.listen():
content = msg['data']
yield EventMessage(content).to_json()
except GeneratorExit:
subscriber_log.unsubscribe()
def event_stream_maps(): def event_stream_maps():
for msg in subscriber_map.listen(): subscriber_map = redis_server_map.pubsub(ignore_subscribe_messages=True)
content = msg['data'].decode('utf8') subscriber_map.psubscribe(cfg.get('RedisMap', 'channelDisp'))
yield 'data: {}\n\n'.format(content) try:
for msg in subscriber_map.listen():
content = msg['data'].decode('utf8')
yield 'data: {}\n\n'.format(content)
except GeneratorExit:
subscriber_map.unsubscribe()
''' GEO ''' ''' GEO '''
@ -298,27 +298,37 @@ def getLastStreamAwards():
return Response(eventStreamAwards(), mimetype="text/event-stream") return Response(eventStreamAwards(), mimetype="text/event-stream")
def eventStreamLastContributor(): def eventStreamLastContributor():
for msg in subscriber_lastContrib.listen(): subscriber_lastContrib = redis_server_log.pubsub(ignore_subscribe_messages=True)
content = msg['data'].decode('utf8') subscriber_lastContrib.psubscribe(cfg.get('RedisLog', 'channelLastContributor'))
contentJson = json.loads(content) try:
lastContribJson = json.loads(contentJson['log']) for msg in subscriber_lastContrib.listen():
org = lastContribJson['org'] content = msg['data'].decode('utf8')
to_return = contributor_helper.getContributorFromRedis(org) contentJson = json.loads(content)
epoch = lastContribJson['epoch'] lastContribJson = json.loads(contentJson['log'])
to_return['epoch'] = epoch org = lastContribJson['org']
yield 'data: {}\n\n'.format(json.dumps(to_return)) to_return = contributor_helper.getContributorFromRedis(org)
epoch = lastContribJson['epoch']
to_return['epoch'] = epoch
yield 'data: {}\n\n'.format(json.dumps(to_return))
except GeneratorExit:
subscriber_lastContrib.unsubscribe()
def eventStreamAwards(): def eventStreamAwards():
for msg in subscriber_lastAwards.listen(): subscriber_lastAwards = redis_server_log.pubsub(ignore_subscribe_messages=True)
content = msg['data'].decode('utf8') subscriber_lastAwards.psubscribe(cfg.get('RedisLog', 'channelLastAwards'))
contentJson = json.loads(content) try:
lastAwardJson = json.loads(contentJson['log']) for msg in subscriber_lastAwards.listen():
org = lastAwardJson['org'] content = msg['data'].decode('utf8')
to_return = contributor_helper.getContributorFromRedis(org) contentJson = json.loads(content)
epoch = lastAwardJson['epoch'] lastAwardJson = json.loads(contentJson['log'])
to_return['epoch'] = epoch org = lastAwardJson['org']
to_return['award'] = lastAwardJson['award'] to_return = contributor_helper.getContributorFromRedis(org)
yield 'data: {}\n\n'.format(json.dumps(to_return)) epoch = lastAwardJson['epoch']
to_return['epoch'] = epoch
to_return['award'] = lastAwardJson['award']
yield 'data: {}\n\n'.format(json.dumps(to_return))
except GeneratorExit:
subscriber_lastAwards.unsubscribe()
@app.route("/_getTopContributor") @app.route("/_getTopContributor")
def getTopContributor(suppliedDate=None, maxNum=100): def getTopContributor(suppliedDate=None, maxNum=100):
@ -475,7 +485,7 @@ def getTrendingEvents():
dateE = datetime.datetime.now() dateE = datetime.datetime.now()
specificLabel = request.args.get('specificLabel') specificLabel = request.args.get('specificLabel')
data = trendings_helper.getTrendingEvents(dateS, dateE, specificLabel) data = trendings_helper.getTrendingEvents(dateS, dateE, specificLabel, topNum=int(request.args.get('topNum', 10)))
return jsonify(data) return jsonify(data)
@app.route("/_getTrendingCategs") @app.route("/_getTrendingCategs")
@ -488,7 +498,7 @@ def getTrendingCategs():
dateE = datetime.datetime.now() dateE = datetime.datetime.now()
data = trendings_helper.getTrendingCategs(dateS, dateE) data = trendings_helper.getTrendingCategs(dateS, dateE, topNum=int(request.args.get('topNum', 10)))
return jsonify(data) return jsonify(data)
@app.route("/_getTrendingTags") @app.route("/_getTrendingTags")
@ -501,7 +511,7 @@ def getTrendingTags():
dateE = datetime.datetime.now() dateE = datetime.datetime.now()
data = trendings_helper.getTrendingTags(dateS, dateE) data = trendings_helper.getTrendingTags(dateS, dateE, topNum=int(request.args.get('topNum', 10)))
return jsonify(data) return jsonify(data)
@app.route("/_getTrendingSightings") @app.route("/_getTrendingSightings")
@ -551,7 +561,7 @@ def getGenericTrendingOvertime():
dateE = datetime.datetime.now() dateE = datetime.datetime.now()
choice = request.args.get('choice', 'events') choice = request.args.get('choice', 'events')
data = trendings_helper.getGenericTrendingOvertime(dateS, dateE, choice) data = trendings_helper.getGenericTrendingOvertime(dateS, dateE, choice=choice)
return jsonify(data) return jsonify(data)
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -78,7 +78,7 @@ def sortByTrendingScore(toSort, topNum=5):
scoredLabels = defaultdict(float) scoredLabels = defaultdict(float)
numDay = len(toSort) numDay = len(toSort)
baseDecay = 1.0 baseDecay = 1.0
decayRate = lambda x: baseDecay*((numDay-x)/numDay) decayRate = lambda x: baseDecay*((numDay-x**2)/numDay)
for i, arr in enumerate(toSort): for i, arr in enumerate(toSort):
timestamp = arr[0] timestamp = arr[0]