mirror of https://github.com/D4-project/d4-core
chg: [worker 1] compress files + send full pathname in feeder queues
parent
fe22c69512
commit
08809cf574
|
@ -51,7 +51,7 @@ function launching_redis {
|
|||
|
||||
screen -dmS "Redis_D4"
|
||||
sleep 0.1
|
||||
echo -e $GREEN"\t* Launching D4 Redis ervers"$DEFAULT
|
||||
echo -e $GREEN"\t* Launching D4 Redis Servers"$DEFAULT
|
||||
screen -S "Redis_D4" -X screen -t "6379" bash -c $redis_dir'redis-server '$conf_dir'6379.conf ; read x'
|
||||
sleep 0.1
|
||||
screen -S "Redis_D4" -X screen -t "6380" bash -c $redis_dir'redis-server '$conf_dir'6380.conf ; read x'
|
||||
|
|
|
@ -0,0 +1,156 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import gzip
|
||||
import redis
|
||||
import shutil
|
||||
import datetime
|
||||
|
||||
import signal
|
||||
|
||||
class GracefulKiller:
    """Cooperative-shutdown helper.

    Installing an instance registers handlers for SIGINT and SIGTERM that
    simply flip `kill_now`; the main loop polls that flag and exits at a
    safe point instead of being interrupted mid-operation.
    """

    # Class-level default; each instance flips its own attribute on signal.
    kill_now = False

    def __init__(self):
        # Same handler serves both termination signals.
        for sig in (signal.SIGINT, signal.SIGTERM):
            signal.signal(sig, self.exit_gracefully)

    def exit_gracefully(self, signum, frame):
        # Only record the request; actual teardown happens in the caller.
        self.kill_now = True
|
||||
|
||||
def compress_file(file_full_path, session_uuid,i=0):
|
||||
redis_server_stream.set('data_in_process:{}'.format(session_uuid), file_full_path)
|
||||
if i==0:
|
||||
compressed_filename = '{}.gz'.format(file_full_path)
|
||||
else:
|
||||
compressed_filename = '{}.{}.gz'.format(file_full_path, i)
|
||||
if os.path.isfile(compressed_filename):
|
||||
compress_file(file_full_path, session_uuid, i+1)
|
||||
else:
|
||||
with open(file_full_path, 'rb') as f_in:
|
||||
with gzip.open(compressed_filename, 'wb') as f_out:
|
||||
shutil.copyfileobj(f_in, f_out)
|
||||
os.remove(file_full_path)
|
||||
# save full path in anylyzer queue
|
||||
for analyzer_uuid in redis_server_metadata.smembers('analyzer:{}'.format(type)):
|
||||
analyzer_uuid = analyzer_uuid.decode()
|
||||
redis_server_analyzer.lpush('analyzer:{}:{}'.format(type, analyzer_uuid), compressed_filename)
|
||||
redis_server_metadata.hset('analyzer:{}'.format(analyzer_uuid), 'last_updated', time.time())
|
||||
analyser_queue_max_size = redis_server_metadata.hget('analyzer:{}'.format(analyzer_uuid), 'max_size')
|
||||
if analyser_queue_max_size is None:
|
||||
analyser_queue_max_size = analyzer_list_max_default_size
|
||||
redis_server_analyzer.ltrim('analyzer:{}:{}'.format(type, analyzer_uuid), 0, analyser_queue_max_size)
|
||||
|
||||
|
||||
# --- Redis endpoints -------------------------------------------------------
host_redis_stream = "localhost"
port_redis_stream = 6379

host_redis_metadata = "localhost"
port_redis_metadata = 6380

# Stream instance: holds the 'data_in_process:<session>' bookkeeping keys.
redis_server_stream = redis.StrictRedis(host=host_redis_stream, port=port_redis_stream, db=0)

# Metadata instance (db 0): analyzer registration sets and info hashes.
redis_server_metadata = redis.StrictRedis(host=host_redis_metadata, port=port_redis_metadata, db=0)

# Analyzer queues live on the metadata instance, in db 2.
redis_server_analyzer = redis.StrictRedis(host=host_redis_metadata, port=port_redis_metadata, db=2)

# D4 data type handled by this worker (NOTE: shadows the builtin `type`;
# kept as-is because the whole file keys Redis names off this global).
type = 1
# NOTE(review): `sleep_time` looks unused — the main loop sleeps a literal
# 10 seconds; confirm before removing.
sleep_time = 20

# Queue cap applied when an analyzer has no 'max_size' configured.
analyzer_list_max_default_size = 10000
|
||||
|
||||
if __name__ == "__main__":
    # Install SIGINT/SIGTERM handlers so the polling loop below can exit
    # at a safe point instead of mid-compression.
    killer = GracefulKiller()

    if len(sys.argv) != 4:
        print('usage:', 'Worker.py', 'session_uuid', 'tcpdump', 'date')
        exit(1)

    # TODO sanityse input
    session_uuid = sys.argv[1]         # D4 session this compressor serves
    directory_data_uuid = sys.argv[2]  # capture root for the sensor uuid
    date = sys.argv[3]                 # YYYYMMDD day currently being captured

    # Captures are laid out as <root>/YYYY/MM/DD/.
    worker_data_directory = os.path.join(directory_data_uuid, date[0:4], date[4:6], date[6:8])
    # YYYYMMDDHH snapshot, compared each pass to detect a backwards clock change.
    full_datetime = datetime.datetime.now().strftime("%Y%m%d%H")

    # The newest .cap is assumed to still be open by tcpdump; track it so it
    # is never compressed while being written.
    current_file = None
    time_change = False  # NOTE(review): never read afterwards — appears unused

    while True:
        if killer.kill_now:
            break

        new_date = datetime.datetime.now().strftime("%Y%m%d")

        # get all directory files
        all_files = os.listdir(worker_data_directory)
        not_compressed_file = []
        # filter: get all not compressed files (tcpdump writes plain .cap;
        # compress_file renames them to .gz)
        for file in all_files:
            if file.endswith('.cap'):
                not_compressed_file.append(os.path.join(worker_data_directory, file))

        if not_compressed_file:
            ### check time-change (minus one hour) ###
            new_full_datetime = datetime.datetime.now().strftime("%Y%m%d%H")
            # Lexicographic compare is safe: both strings are zero-padded
            # YYYYMMDDHH, so "new < old" means the clock went backwards (DST).
            if new_full_datetime < full_datetime:
                # sort list, last modified (timestamped names are unreliable
                # across the clock change, so fall back to ctime order)
                not_compressed_file.sort(key=os.path.getctime)
            else:
                # sort list (filenames embed the capture timestamp, so name
                # order equals chronological order)
                not_compressed_file.sort()
            ### ###

            # new day
            if date != new_date:
                # compress all file (including the last one: tcpdump has
                # rolled over to the new day's directory)
                for file in not_compressed_file:
                    if killer.kill_now:
                        break
                    compress_file(file, session_uuid)
                # reset file tracker
                current_file = None
                date = new_date
                # update worker_data_directory
                worker_data_directory = os.path.join(directory_data_uuid, date[0:4], date[4:6], date[6:8])
                # restart
                continue

            # file used by tcpdump (newest in sort order — presumably still
            # open for writing, so it is skipped below)
            max_file = not_compressed_file[-1]
            full_datetime = new_full_datetime

            # Init: set current_file
            if not current_file:
                current_file = max_file
                #print('max_file set: {}'.format(current_file))

            # new file created
            if max_file != current_file:

                # get all previous files (everything except the one tcpdump
                # is writing) and compress them
                for file in not_compressed_file:
                    if file != max_file:
                        if killer.kill_now:
                            break
                        #print('new file: {}'.format(file))
                        compress_file(file, session_uuid)

                # update current_file tracker
                current_file = max_file

        if killer.kill_now:
            break

        # poll interval between directory scans
        time.sleep(10)
|
|
@ -3,10 +3,11 @@
|
|||
import os
|
||||
import sys
|
||||
import time
|
||||
import gzip
|
||||
import redis
|
||||
import subprocess
|
||||
|
||||
import shutil
|
||||
import datetime
|
||||
import subprocess
|
||||
|
||||
def data_incorrect_format(stream_name, session_uuid, uuid):
|
||||
redis_server_stream.sadd('Error:IncorrectType:{}'.format(type), session_uuid)
|
||||
|
@ -22,6 +23,28 @@ def clean_stream(stream_name, session_uuid):
|
|||
redis_server_stream.hdel('map-type:session_uuid-uuid:{}'.format(type), session_uuid)
|
||||
redis_server_stream.delete(stream_name)
|
||||
|
||||
def compress_file(file_full_path, i=0):
|
||||
if i==0:
|
||||
compressed_filename = '{}.gz'.format(file_full_path)
|
||||
else:
|
||||
compressed_filename = '{}.{}.gz'.format(file_full_path, i)
|
||||
if os.path.isfile(compressed_filename):
|
||||
compress_file(file_full_path, i+1)
|
||||
else:
|
||||
with open(file_full_path, 'rb') as f_in:
|
||||
with gzip.open(compressed_filename, 'wb') as f_out:
|
||||
shutil.copyfileobj(f_in, f_out)
|
||||
os.remove(file_full_path)
|
||||
# save full path in anylyzer queue
|
||||
for analyzer_uuid in redis_server_metadata.smembers('analyzer:{}'.format(type)):
|
||||
analyzer_uuid = analyzer_uuid.decode()
|
||||
redis_server_analyzer.lpush('analyzer:{}:{}'.format(type, analyzer_uuid), compressed_filename)
|
||||
redis_server_metadata.hset('analyzer:{}'.format(analyzer_uuid), 'last_updated', time.time())
|
||||
analyser_queue_max_size = redis_server_metadata.hget('analyzer:{}'.format(analyzer_uuid), 'max_size')
|
||||
if analyser_queue_max_size is None:
|
||||
analyser_queue_max_size = analyzer_list_max_default_size
|
||||
redis_server_analyzer.ltrim('analyzer:{}:{}'.format(type, analyzer_uuid), 0, analyser_queue_max_size)
|
||||
|
||||
host_redis_stream = "localhost"
|
||||
port_redis_stream = 6379
|
||||
|
||||
|
@ -38,10 +61,18 @@ redis_server_metadata = redis.StrictRedis(
|
|||
port=port_redis_metadata,
|
||||
db=0)
|
||||
|
||||
redis_server_analyzer = redis.StrictRedis(
|
||||
host=host_redis_metadata,
|
||||
port=port_redis_metadata,
|
||||
db=2)
|
||||
|
||||
type = 1
|
||||
tcp_dump_cycle = '300'
|
||||
tcp_dump_cycle = '10'
|
||||
stream_buffer = 100
|
||||
|
||||
analyzer_list_max_default_size = 10000
|
||||
|
||||
id_to_delete = []
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
@ -59,6 +90,7 @@ if __name__ == "__main__":
|
|||
uuid = res[0][1][0][1][b'uuid'].decode()
|
||||
date = datetime.datetime.now().strftime("%Y%m%d")
|
||||
tcpdump_path = os.path.join('../../data', uuid, str(type))
|
||||
full_tcpdump_path = os.path.join(os.environ['D4_HOME'], 'data', uuid, str(type))
|
||||
rel_path = os.path.join(tcpdump_path, date[0:4], date[4:6], date[6:8])
|
||||
if not os.path.isdir(rel_path):
|
||||
os.makedirs(rel_path)
|
||||
|
@ -72,6 +104,8 @@ if __name__ == "__main__":
|
|||
process = subprocess.Popen(["tcpdump", '-n', '-r', '-', '-G', tcp_dump_cycle, '-w', '{}/%Y/%m/%d/{}-%Y-%m-%d-%H%M%S.cap'.format(tcpdump_path, uuid)], stdin=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
nb_save = 0
|
||||
|
||||
process_compressor = subprocess.Popen(['./file_compressor.py', session_uuid, full_tcpdump_path, date])
|
||||
|
||||
while True:
|
||||
|
||||
res = redis_server_stream.xread({stream_name: id}, count=1)
|
||||
|
@ -108,7 +142,7 @@ if __name__ == "__main__":
|
|||
nb_save = 0
|
||||
|
||||
else:
|
||||
# sucess, all data are saved
|
||||
# success, all data are saved
|
||||
if redis_server_stream.sismember('ended_session', session_uuid):
|
||||
out, err = process.communicate(timeout= 0.5)
|
||||
#print(out)
|
||||
|
@ -117,12 +151,31 @@ if __name__ == "__main__":
|
|||
elif err:
|
||||
print(err)
|
||||
|
||||
# close child
|
||||
try:
|
||||
process_compressor.communicate(timeout= 0.5)
|
||||
except subprocess.TimeoutExpired:
|
||||
process_compressor.kill()
|
||||
### compress all files ###
|
||||
date = datetime.datetime.now().strftime("%Y%m%d")
|
||||
worker_data_directory = os.path.join(full_tcpdump_path, date[0:4], date[4:6], date[6:8])
|
||||
all_files = os.listdir(worker_data_directory)
|
||||
all_files.sort()
|
||||
if all_files:
|
||||
for file in all_files:
|
||||
if file.endswith('.cap'):
|
||||
full_path = os.path.join(worker_data_directory, file)
|
||||
if redis_server_stream.get('data_in_process:{}'.format(session_uuid)) != full_path:
|
||||
compress_file(full_path)
|
||||
### ###
|
||||
|
||||
#print(process.stderr.read())
|
||||
redis_server_stream.srem('ended_session', session_uuid)
|
||||
redis_server_stream.srem('session_uuid:{}'.format(type), session_uuid)
|
||||
redis_server_stream.srem('working_session_uuid:{}'.format(type), session_uuid)
|
||||
redis_server_stream.hdel('map-type:session_uuid-uuid:{}'.format(type), session_uuid)
|
||||
redis_server_stream.delete(stream_name)
|
||||
redis_server_stream.delete('data_in_process:{}'.format(session_uuid))
|
||||
# make sure that tcpdump can save all datas
|
||||
time.sleep(10)
|
||||
print('---- tcpdump DONE, uuid={} session_uuid={}'.format(uuid, session_uuid))
|
||||
|
|
|
@ -115,7 +115,7 @@ if __name__ == "__main__":
|
|||
if len(buffer) < max_buffer_length:
|
||||
buffer += data[b'message']
|
||||
else:
|
||||
print('Error, infinite loop, buffer may length reached')
|
||||
print('Error, infinite loop, max buffer length reached')
|
||||
# force new line
|
||||
buffer += b'{}\n'.format(data[b'message'])
|
||||
|
||||
|
|
Loading…
Reference in New Issue