mirror of https://github.com/D4-project/d4-core
use dynamic Parent/Child class
parent
9c17d74d7d
commit
3ad297799e
|
@ -0,0 +1,133 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import json
|
||||
import redis
|
||||
|
||||
DEFAULT_FILE_EXTENSION = 'txt'
|
||||
DEFAULT_FILE_SEPARATOR = b'\n'
|
||||
ROTATION_SAVE_CYCLE = 10 # seconds
|
||||
TYPE = 254
|
||||
|
||||
class MetaTypesDefault:
|
||||
|
||||
def __init__(self, uuid, json_file):
|
||||
self.session_uuid = uuid
|
||||
self.type_name = json_file['type']
|
||||
self.parse_json(json_file)
|
||||
print('end default init')
|
||||
|
||||
def test(self):
|
||||
print(self.session_uuid)
|
||||
|
||||
######## JSON PARSER ########
|
||||
def parse_json(self, uuid, json_file):
|
||||
self.uuid = uuid
|
||||
self.save_file_on_disk = True
|
||||
self.is_file_rotation = True
|
||||
self.file_separator = b'\n'
|
||||
self.filename = b'{}.txt'.format(self.type_name)
|
||||
|
||||
######## PROCESS FUNCTIONS ########
|
||||
def process_data(self, data):
|
||||
# save data on disk
|
||||
if self.is_file_saved_on_disk():
|
||||
self.save_data_to_file(data)
|
||||
|
||||
print('end process_data')
|
||||
|
||||
######## CORE FUNCTIONS ########
|
||||
|
||||
def check_json_file(self, json_file):
|
||||
# the json object must contain a type field
|
||||
if "type" in json_file:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def save_json_file(self, json_file):
|
||||
self.last_time_saved = time.time() #time_file
|
||||
self.last_saved_date = datetime.datetime.now().strftime("%Y%m%d%H%M%S") #date_file
|
||||
|
||||
save_path = os.path.join(self.get_save_dir(file_extention='json'), self.get_save_dir())
|
||||
with open(save_path, 'w') as f:
|
||||
f.write(json.dumps(full_json))
|
||||
|
||||
def save_data_to_file(self, data):
|
||||
if self.is_file_rotation():
|
||||
self.save_rotate_file(data)
|
||||
|
||||
|
||||
def save_rotate_file(self, data):
|
||||
new_date = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
|
||||
# check if a new file rotation is needed # # TODO: change ROTATION_SAVE_CYCLE
|
||||
if ( new_date[0:8] != self.get_last_time_saved()[0:8] ) or ( time.time() - self.get_last_time_saved > ROTATION_SAVE_CYCLE ):
|
||||
date_file = new_date
|
||||
self.set_last_saved_date(new_date)
|
||||
self.set_rotate_file(True)
|
||||
|
||||
|
||||
def save_same_directory(self, data):
|
||||
pass
|
||||
|
||||
######## GET FUNCTIONS ########
|
||||
|
||||
def get_type_name(self):
|
||||
return self.type_name
|
||||
|
||||
def get_file_separator(self):
|
||||
return self.file_separator
|
||||
|
||||
def get_filename(self, file_extention=None):
|
||||
if file_extention is None:
|
||||
file_extention = DEFAULT_FILE_EXTENSION
|
||||
# File Rotation, : data/<uuid>/254/<year>/<month>/<day>/
|
||||
if self.is_file_rotation():
|
||||
return '{}-{}-{}-{}-{}.{}'.format(self.uuid, self.get_last_saved_year(), self.get_last_saved_month(), self.get_last_saved_day(), self.get_last_saved_hour_minute(), file_extention)
|
||||
|
||||
# todo save save_dir ???
|
||||
def get_save_dir(self):
|
||||
# File Rotation, save data in directory: data/<uuid>/254/<year>/<month>/<day>/
|
||||
if self.is_file_rotation():
|
||||
data_directory_uuid_type = os.path.join('../../data', self.uuid, str(TYPE))
|
||||
return os.path.join(data_directory_uuid_type, self.get_last_saved_year(), self.get_last_saved_month(), self.get_last_saved_day() , self.type_name)
|
||||
|
||||
# # TODO: save global type dir ???
|
||||
if self.is_file_saved_on_disk():
|
||||
pass
|
||||
|
||||
def is_file_saved_on_disk(self):
|
||||
return self.save_file_on_disk
|
||||
|
||||
def is_file_rotation(self):
|
||||
return self.is_file_rotation
|
||||
|
||||
def get_last_time_saved(self):
|
||||
return self.last_time_saved
|
||||
|
||||
def get_last_saved_date(self):
|
||||
return self.last_saved_date
|
||||
|
||||
def get_last_saved_year(self):
|
||||
return self.last_saved_date[0:4]
|
||||
|
||||
def get_last_saved_month(self):
|
||||
return self.last_saved_date[4:6]
|
||||
|
||||
def get_last_saved_year(self):
|
||||
return self.last_saved_date[6:8]
|
||||
|
||||
def get_last_saved_hour_minute(self):
|
||||
return self.last_saved_date[8:14]
|
||||
|
||||
######## SET FUNCTIONS ########
|
||||
|
||||
def set_rotate_file(self, boolean_value):
|
||||
self.file_rotation = boolean_value
|
||||
|
||||
def set_last_saved_date(self, date):
|
||||
self.get_last_saved_date = date
|
||||
|
||||
##############
|
|
@ -0,0 +1,17 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import json
|
||||
import redis
|
||||
|
||||
class TypeHandler(MetaTypesDefault:
|
||||
|
||||
def __init__(self, json_file):
|
||||
super().__init__(json_file)
|
||||
print('init_spec')
|
||||
|
||||
def test2(self):
|
||||
print('ja3-jl type')
|
||||
print(self.session_uuid)
|
|
@ -8,6 +8,8 @@ import redis
|
|||
|
||||
import datetime
|
||||
|
||||
from meta_types_modules import MetaTypesDefault
|
||||
|
||||
host_redis_stream = "localhost"
|
||||
port_redis_stream = 6379
|
||||
|
||||
|
@ -30,48 +32,20 @@ max_buffer_length = 100000
|
|||
rotation_save_cycle = 10 #seconds
|
||||
|
||||
json_file_name = 'meta_json.json'
|
||||
extended_type_name = None # # TODO: use default or json['file_type']
|
||||
|
||||
save_to_file = True
|
||||
|
||||
def get_dir_data_uuid(uuid, type):
|
||||
return os.path.join('../../data', uuid, str(type))
|
||||
|
||||
def get_save_dir(dir_data_uuid, year, month, day, extended_type=None):
|
||||
dir_path = os.path.join(dir_data_uuid, year, month, day)
|
||||
if extended_type:
|
||||
dir_path = os.path.join(dir_path, extended_type)
|
||||
if not os.path.isdir(dir_path):
|
||||
os.makedirs(dir_path)
|
||||
return dir_path
|
||||
|
||||
def check_json_file(json_file):
|
||||
# the json object must contain a type field
|
||||
if "type" in json_file:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def on_error(session_uuid, type_error, message):
|
||||
redis_server_stream.sadd('Error:IncorrectType', session_uuid)
|
||||
redis_server_metadata.hset('metadata_uuid:{}'.format(uuid), 'Error', 'Error: Type={}, {}'.format(type_error, message))
|
||||
clean_db(session_uuid)
|
||||
print('Incorrect format')
|
||||
sys.exit(1)
|
||||
|
||||
def clean_db(session_uuid):
|
||||
clean_stream(stream_meta_json, type_meta_header, session_uuid)
|
||||
clean_stream(stream_defined, type_defined, session_uuid)
|
||||
redis_server_stream.srem('ended_session', session_uuid)
|
||||
redis_server_stream.srem('working_session_uuid:{}'.format(type_meta_header), session_uuid)
|
||||
|
||||
def clean_stream(stream_name, type, session_uuid):
|
||||
redis_server_stream.srem('session_uuid:{}'.format(type), session_uuid)
|
||||
redis_server_stream.hdel('map-type:session_uuid-uuid:{}'.format(type), session_uuid)
|
||||
redis_server_stream.delete(stream_name)
|
||||
def get_class( package_class ):
|
||||
parts = package_class.split('.')
|
||||
module = ".".join(parts[:-1])
|
||||
mod = __import__( module )
|
||||
for comp in parts[1:]:
|
||||
mod = getattr(mod, comp)
|
||||
return mod
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
|
||||
###################################################3
|
||||
|
||||
if len(sys.argv) != 2:
|
||||
print('usage:', 'Worker.py', 'session_uuid')
|
||||
exit(1)
|
||||
|
@ -151,22 +125,24 @@ if __name__ == "__main__":
|
|||
clean_db(session_uuid)
|
||||
sys.exit(1)
|
||||
|
||||
file_separator = b'\n' ## TODO: map all file separator or extract from json
|
||||
extended_type_name = '{}.txt'.format(extended_type) # # TODO: create default or extract from JSON
|
||||
|
||||
#### Handle Specific MetaTypes ####
|
||||
# Use Specific Handler defined
|
||||
if os.path.isdir(os.path.join('meta_types_modules', extended_type)):
|
||||
class_type_handler = get_class('meta_types_modules.{}.{}.TypeHandler'.format(extended_type, extended_type))
|
||||
type_handler = class_type_handler(uuid, full_json)
|
||||
# Use Standard Handler
|
||||
else:
|
||||
type_handler = MetaTypesDefault.MetaTypesDefault(uuid, full_json)
|
||||
|
||||
#file_separator = type_handler.get_file_separator(self)
|
||||
#extended_type_name = type_handler.get_file_name()
|
||||
|
||||
# save json on disk
|
||||
if save_to_file:
|
||||
rotate_file = False
|
||||
time_file = time.time()
|
||||
date_file = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
|
||||
# get new save_path #use first or last received date ???
|
||||
dir_data_uuid = get_dir_data_uuid(uuid, type_defined)
|
||||
dir_full_path = get_save_dir(dir_data_uuid, date_file[0:4], date_file[4:6], date_file[6:8], extended_type=extended_type)
|
||||
filename = '{}-{}-{}-{}-{}.{}'.format(uuid, date_file[0:4], date_file[4:6], date_file[6:8], date_file[8:14], json_file_name)
|
||||
save_path = os.path.join(dir_full_path, filename)
|
||||
with open(save_path, 'w') as f:
|
||||
f.write(json.dumps(full_json))
|
||||
# get extended_type save_path
|
||||
type_handler.save_json_file(json)
|
||||
|
||||
|
||||
# get extended_type save_path #############################################################################################################
|
||||
filename = '{}-{}-{}-{}-{}.{}'.format(uuid, date_file[0:4], date_file[4:6], date_file[6:8], date_file[8:14], extended_type_name)
|
||||
save_path = os.path.join(dir_full_path, filename)
|
||||
|
||||
|
@ -186,13 +162,14 @@ if __name__ == "__main__":
|
|||
data = res[0][1][0][1]
|
||||
|
||||
if id and data:
|
||||
|
||||
type_handler.process_data(data)
|
||||
|
||||
########################
|
||||
# save data on disk
|
||||
if save_to_file:
|
||||
new_date = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
|
||||
# check if a new rotation is needed
|
||||
if ( new_date[0:8] != date_file[0:8] ) or ( time.time() - time_file > rotation_save_cycle ):
|
||||
date_file = new_date
|
||||
rotate_file = True
|
||||
|
||||
|
||||
|
||||
# file rotation
|
||||
if rotate_file and file_separator in data[b'message']:
|
||||
|
@ -217,6 +194,9 @@ if __name__ == "__main__":
|
|||
with open(save_path, 'ab') as f:
|
||||
f.write(data[b'message'])
|
||||
|
||||
#######
|
||||
|
||||
|
||||
redis_server_stream.xdel(stream_name, id)
|
||||
|
||||
else:
|
||||
|
|
Loading…
Reference in New Issue