Use a simple alarm (SIGALRM) when the exec timeout is reached (partially fixes #90 too)

Introduce a 60-second timer to ensure that the analysis of a single
input takes less than 60 seconds, using a simple and standard POSIX
signal handler (SIGALRM). If the timeout is reached, the module drops
the current input and moves on to the next one.
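For reference, the whole pattern in a minimal, self-contained sketch;
do_analysis is a hypothetical placeholder for the real per-message work:

    import signal

    class TimeoutException(Exception):
        pass

    def timeout_handler(signum, frame):
        # Invoked when the SIGALRM armed below fires.
        raise TimeoutException

    def do_analysis():
        # Hypothetical stand-in for the real per-message analysis.
        pass

    signal.signal(signal.SIGALRM, timeout_handler)

    signal.alarm(60)      # arm a one-shot 60-second timer
    try:
        do_analysis()
    except TimeoutException:
        pass              # took too long: drop this input, move to the next
    else:
        signal.alarm(0)   # finished in time: disarm the timer

Note that SIGALRM is POSIX-only, is delivered to the main thread, and has
whole-second granularity, which is fine for this coarse safety net.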

This approach fixes the specific issue we currently have with some
inputs on which the sentiment analysis takes too much time. The fix
should later be improved and made more generic:

 - Introduce statistics on the content that times out.
 - Keep a list/queue of the inputs that timed out so they can be processed
   later with a different analysis approach, perhaps by a set of "dirty"
   processes that handle the edge cases without impacting the overall
   processing and analysis.
 - Make the timer configurable per module (at least for this one); a sketch
   of these ideas follows this list.
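
A minimal sketch of how the per-module timer and the timeout bookkeeping
could look, assuming a ConfigParser-style config file and a Redis-like
server object; the option name and the Redis keys are illustrative, not
existing ones:

    import ConfigParser  # Python 2 stdlib name, matching this codebase

    def load_timeout(cfg_path, section, default=60):
        # Per-module 'max_execution_time' (illustrative option name),
        # falling back to the current hard-coded 60 seconds.
        cfg = ConfigParser.ConfigParser()
        cfg.read(cfg_path)
        if cfg.has_option(section, 'max_execution_time'):
            return cfg.getint(section, 'max_execution_time')
        return default

    def record_timeout(server, config_section, message):
        # Count timeouts per module and queue the input for a later pass
        # with a different analysis approach (both keys are illustrative).
        server.incr('timeout_count:' + config_section)
        server.rpush('timeout_queue:' + config_section, message)

record_timeout would be called from the except TimeoutException branch
below instead of only printing, and signal.alarm(load_timeout(...)) would
replace the hard-coded 60.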
pull/110/head
Alexandre Dulaunoy 2017-01-26 07:11:18 +00:00
parent 3b101ea8f5
commit cf903cc212
1 changed file with 27 additions and 11 deletions


@@ -54,9 +54,9 @@ def Analyse(message, server):
         the_time = datetime.time(getattr(the_time, 'hour'), 0, 0)
         combined_datetime = datetime.datetime.combine(the_date, the_time)
         timestamp = calendar.timegm(combined_datetime.timetuple())

         sentences = tokenize.sent_tokenize(p_content.decode('utf-8', 'ignore'))

         if len(sentences) > 0:
             avg_score = {'neg': 0.0, 'neu': 0.0, 'pos': 0.0, 'compoundPos': 0.0, 'compoundNeg': 0.0}
             neg_line = 0
@@ -74,8 +74,8 @@ def Analyse(message, server):
                             pos_line += 1
                     else:
                         avg_score[k] += ss[k]

             for k in avg_score:
                 if k == 'compoundPos':
                     avg_score[k] = avg_score[k] / (pos_line if pos_line > 0 else 1)
@@ -83,15 +83,15 @@ def Analyse(message, server):
                     avg_score[k] = avg_score[k] / (neg_line if neg_line > 0 else 1)
                 else:
                     avg_score[k] = avg_score[k] / len(sentences)

             # In redis-levelDB: {} = set, () = K-V
             # {Provider_set -> provider_i}
             # {Provider_TimestampInHour_i -> UniqID_i}_j
             # (UniqID_i -> PasteValue_i)

             server.sadd('Provider_set', provider)

             provider_timestamp = provider + '_' + str(timestamp)
             server.incr('UniqID')
             UniqID = server.get('UniqID')
@@ -100,7 +100,7 @@ def Analyse(message, server):
             server.set(UniqID, avg_score)
     else:
         print 'Dropped:', p_MimeType


 def isJSON(content):
     try:
@@ -110,6 +110,16 @@ def isJSON(content):
     except Exception,e:
         return False

+import signal
+
+class TimeoutException(Exception):
+    pass
+
+def timeout_handler(signum, frame):
+    raise TimeoutException
+
+signal.signal(signal.SIGALRM, timeout_handler)
+
 if __name__ == '__main__':
     # If you wish to use an other port of channel, do not forget to run a subscriber accordingly (see launch_logs.sh)
     # Port of the redis instance used by pubsublogger
@@ -138,6 +148,12 @@ if __name__ == '__main__':
             publisher.debug("{} queue is empty, waiting".format(config_section))
             time.sleep(1)
             continue

-        Analyse(message, server)
+        signal.alarm(60)
+        try:
+            Analyse(message, server)
+        except TimeoutException:
+            print ("{0} processing timeout".format(message))
+            continue
+        else:
+            signal.alarm(0)