Added number of same module running in ModuleInformation

pull/74/head
Mokaddem 2016-08-25 16:07:47 +02:00
parent 0b5a3e0cd2
commit 78c991539e
2 changed files with 49 additions and 23 deletions

View File

@ -108,6 +108,7 @@ class Process(object):
self.modules = ConfigParser.ConfigParser()
self.modules.read(modulesfile)
self.subscriber_name = conf_section
self.pubsub = None
if self.modules.has_section(conf_section):
self.pubsub = PubSub()
@ -118,6 +119,15 @@ class Process(object):
port=self.config.get('RedisPubSub', 'port'),
db=self.config.get('RedisPubSub', 'db'))
self.moduleNum = 1
for i in range(1, 50):
curr_num = self.r_temp.get("MODULE_"+self.subscriber_name + "_" + str(i))
if curr_num is None:
self.moduleNum = i
break
def populate_set_in(self):
# monoproc
src = self.modules.get(self.subscriber_name, 'subscribe')
@ -142,15 +152,18 @@ class Process(object):
else:
try:
path = message.split(".")[-2].split("/")[-1]
if ".gz" in message:
path = message.split(".")[-2].split("/")[-1]
else:
path = "?"
value = str(timestamp) + ", " + path
self.r_temp.set("MODULE_"+self.subscriber_name, value)
self.r_temp.set("MODULE_"+self.subscriber_name + "_" + str(self.moduleNum), value)
return message
except:
path = "?"
value = str(timestamp) + ", " + path
self.r_temp.set("MODULE_"+self.subscriber_name, value)
self.r_temp.set("MODULE_"+self.subscriber_name + "_" + str(self.moduleNum), value)
return message
def populate_set_out(self, msg, channel=None):

View File

@ -29,6 +29,9 @@ def getPid(module):
else:
return None
def clearRedisModuleInfo():
for k in server.keys("MODULE_*"):
server.delete(k)
def kill_module(module):
print ''
@ -62,6 +65,7 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Show info concerning running modules and log suspected stucked modules. May be use to automatically kill and restart stucked one.')
parser.add_argument('-r', '--refresh', type=int, required=False, default=1, help='Refresh rate')
parser.add_argument('-k', '--autokill', type=int, required=True, default=1, help='Enable auto kill option (1 for TRUE, anything else for FALSE)')
parser.add_argument('-c', '--clear', type=int, required=False, default=1, help='Clear the current module information (Used to clear data from old launched modules)')
args = parser.parse_args()
@ -80,37 +84,46 @@ if __name__ == "__main__":
port=cfg.getint("Redis_Queues", "port"),
db=cfg.getint("Redis_Queues", "db"))
if args.clear == 1:
clearRedisModuleInfo()
while True:
num = 0
curr_range = 50
printarray1 = []
printarray2 = []
for queue, card in server.hgetall("queues").iteritems():
key = "MODULE_" + queue
value = server.get(key)
if value is not None:
timestamp, path = value.split(", ")
if timestamp is not None and path is not None:
num += 1
startTime_readable = datetime.datetime.fromtimestamp(int(timestamp))
processed_time_readable = str((datetime.datetime.now() - startTime_readable)).split('.')[0]
key = "MODULE_" + queue + "_"
for i in range(1, 50):
curr_num = server.get("MODULE_"+ queue + "_" + str(i))
if curr_num is None:
curr_range = i
break
if int(card) > 0:
if int((datetime.datetime.now() - startTime_readable).total_seconds()) > threshold_stucked_module:
log = open(log_filename, 'a')
log.write(json.dumps([queue, card, str(startTime_readable), str(processed_time_readable), path]) + "\n")
if args.autokill == 1:
kill_module(queue)
for moduleNum in range(1, curr_range):
value = server.get(key + str(moduleNum))
if value is not None:
timestamp, path = value.split(", ")
if timestamp is not None and path is not None:
startTime_readable = datetime.datetime.fromtimestamp(int(timestamp))
processed_time_readable = str((datetime.datetime.now() - startTime_readable)).split('.')[0]
printarray1.append([str(num), str(queue), str(card), str(startTime_readable), str(processed_time_readable), str(path)])
if int(card) > 0:
if int((datetime.datetime.now() - startTime_readable).total_seconds()) > threshold_stucked_module:
log = open(log_filename, 'a')
log.write(json.dumps([queue, card, str(startTime_readable), str(processed_time_readable), path]) + "\n")
if args.autokill == 1:
kill_module(queue)
else:
printarray2.append([str(num), str(queue), str(card), str(startTime_readable), str(processed_time_readable), str(path)])
printarray1.append([str(queue), str(moduleNum), str(card), str(startTime_readable), str(processed_time_readable), str(path)])
else:
printarray2.append([str(queue), str(moduleNum), str(card), str(startTime_readable), str(processed_time_readable), str(path)])
printarray1.sort(lambda x,y: cmp(x[4], y[4]), reverse=True)
printarray2.sort(lambda x,y: cmp(x[4], y[4]), reverse=True)
printarray1.insert(0,["#", "Queue", "Amount", "Paste start time", "Processing time for current paste (H:M:S)", "Paste hash"])
printarray2.insert(0,["#", "Queue", "Amount", "Paste start time", "Time since idle (H:M:S)", "Last paste hash"])
printarray1.insert(0,["Queue", "#", "Amount", "Paste start time", "Processing time for current paste (H:M:S)", "Paste hash"])
printarray2.insert(0,["Queue", "#","Amount", "Paste start time", "Time since idle (H:M:S)", "Last paste hash"])
os.system('clear')
t1 = AsciiTable(printarray1, title="Working queues")