mirror of https://github.com/CIRCL/AIL-framework
Added new module 'ModuleInformation' which gives real time information on running module
parent
07856f3119
commit
deeebec2f5
|
@ -22,8 +22,7 @@ from pubsublogger import publisher
|
|||
from packages import lib_words
|
||||
import datetime
|
||||
import calendar
|
||||
|
||||
from Helper import Process
|
||||
import ConfigParser
|
||||
|
||||
# Config Variables
|
||||
Refresh_rate = 60*5 #sec
|
||||
|
@ -96,8 +95,14 @@ if __name__ == '__main__':
|
|||
# Script is the default channel used for the modules.
|
||||
publisher.channel = 'Script'
|
||||
|
||||
config_section = 'CurveManageTopSets'
|
||||
p = Process(config_section)
|
||||
configfile = os.path.join(os.environ['AIL_BIN'], 'packages/config.cfg')
|
||||
if not os.path.exists(configfile):
|
||||
raise Exception('Unable to find the configuration file. \
|
||||
Did you set environment variables? \
|
||||
Or activate the virtualenv.')
|
||||
|
||||
cfg = ConfigParser.ConfigParser()
|
||||
cfg.read(configfile)
|
||||
|
||||
server_term = redis.StrictRedis(
|
||||
host=p.config.get("Redis_Level_DB_TermFreq", "host"),
|
||||
|
@ -113,7 +118,6 @@ if __name__ == '__main__':
|
|||
|
||||
while True:
|
||||
# Get one message from the input queue (module only work if linked with a queue)
|
||||
message = p.get_from_set()
|
||||
if message is None:
|
||||
publisher.debug("{} queue is empty, waiting".format(config_section))
|
||||
print 'sleeping'
|
||||
|
|
|
@ -16,6 +16,7 @@ import ConfigParser
|
|||
import os
|
||||
import zmq
|
||||
import time
|
||||
import datetime
|
||||
import json
|
||||
|
||||
|
||||
|
@ -132,7 +133,26 @@ class Process(object):
|
|||
in_set = self.subscriber_name + 'in'
|
||||
self.r_temp.hset('queues', self.subscriber_name,
|
||||
int(self.r_temp.scard(in_set)))
|
||||
return self.r_temp.spop(in_set)
|
||||
message = self.r_temp.spop(in_set)
|
||||
timestamp = int(time.mktime(datetime.datetime.utcnow().timetuple()))
|
||||
dir_name = os.environ['AIL_HOME']+self.config.get('Directories', 'pastes')
|
||||
|
||||
if message is None:
|
||||
return None
|
||||
|
||||
else:
|
||||
try:
|
||||
#path = message[message.index(dir_name)+len(dir_name):message.index(".gz")]
|
||||
path = message.split(".")[-2].split("/")[-1]
|
||||
value = str(timestamp) + ", " + path
|
||||
self.r_temp.set("MODULE_"+self.subscriber_name, value)
|
||||
return message
|
||||
|
||||
except:
|
||||
path = "?"
|
||||
value = str(timestamp) + ", " + path
|
||||
self.r_temp.set("MODULE_"+self.subscriber_name, value)
|
||||
return message
|
||||
|
||||
def populate_set_out(self, msg, channel=None):
|
||||
# multiproc
|
||||
|
|
|
@ -0,0 +1,47 @@
|
|||
#!/usr/bin/env python2
|
||||
# -*-coding:UTF-8 -*
|
||||
|
||||
|
||||
import time
|
||||
import datetime
|
||||
import calendar
|
||||
import redis
|
||||
import os
|
||||
import ConfigParser
|
||||
from prettytable import PrettyTable
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
configfile = os.path.join(os.environ['AIL_BIN'], 'packages/config.cfg')
|
||||
if not os.path.exists(configfile):
|
||||
raise Exception('Unable to find the configuration file. \
|
||||
Did you set environment variables? \
|
||||
Or activate the virtualenv.')
|
||||
|
||||
cfg = ConfigParser.ConfigParser()
|
||||
cfg.read(configfile)
|
||||
|
||||
# REDIS #
|
||||
server = redis.StrictRedis(
|
||||
host=cfg.get("Redis_Queues", "host"),
|
||||
port=cfg.getint("Redis_Queues", "port"),
|
||||
db=cfg.getint("Redis_Queues", "db"))
|
||||
|
||||
while True:
|
||||
|
||||
table = PrettyTable(['#', 'Queue', 'Amount', 'Paste start time', 'Processing time for current paste (H:M:S)', 'Paste hash'], sortby="Processing time for current paste (H:M:S)", reversesort=True)
|
||||
num = 0
|
||||
for queue, card in server.hgetall("queues").iteritems():
|
||||
key = "MODULE_" + queue
|
||||
value = server.get(key)
|
||||
if value is not None:
|
||||
timestamp, path = value.split(", ")
|
||||
if timestamp is not None and path is not None:
|
||||
num += 1
|
||||
startTime_readable = datetime.datetime.utcfromtimestamp(int(timestamp))
|
||||
processed_time_readable = str((datetime.datetime.now() - startTime_readable)).split('.')[0]
|
||||
table.add_row([num, queue, card, startTime_readable, processed_time_readable, path])
|
||||
|
||||
os.system('clear')
|
||||
print table
|
||||
time.sleep(1)
|
Binary file not shown.
Before Width: | Height: | Size: 56 KiB |
Loading…
Reference in New Issue