mirror of https://github.com/D4-project/d4-core
chg: [worker 254] use module class
parent
3ad297799e
commit
9b310f7498
|
@ -5,30 +5,31 @@ import sys
|
||||||
import time
|
import time
|
||||||
import json
|
import json
|
||||||
import redis
|
import redis
|
||||||
|
import datetime
|
||||||
|
|
||||||
DEFAULT_FILE_EXTENSION = 'txt'
|
DEFAULT_FILE_EXTENSION = 'txt'
|
||||||
DEFAULT_FILE_SEPARATOR = b'\n'
|
DEFAULT_FILE_SEPARATOR = b'\n'
|
||||||
ROTATION_SAVE_CYCLE = 10 # seconds
|
ROTATION_SAVE_CYCLE = 5 # seconds
|
||||||
TYPE = 254
|
TYPE = 254
|
||||||
|
|
||||||
class MetaTypesDefault:
|
class MetaTypesDefault:
|
||||||
|
|
||||||
def __init__(self, uuid, json_file):
|
def __init__(self, uuid, json_file):
|
||||||
self.session_uuid = uuid
|
self.uuid = uuid
|
||||||
self.type_name = json_file['type']
|
self.type_name = json_file['type']
|
||||||
|
self.save_path = None
|
||||||
self.parse_json(json_file)
|
self.parse_json(json_file)
|
||||||
print('end default init')
|
|
||||||
|
|
||||||
def test(self):
|
def test(self):
|
||||||
print(self.session_uuid)
|
print('class: MetaTypesDefault')
|
||||||
|
|
||||||
######## JSON PARSER ########
|
######## JSON PARSER ########
|
||||||
def parse_json(self, uuid, json_file):
|
def parse_json(self, json_file):
|
||||||
self.uuid = uuid
|
|
||||||
self.save_file_on_disk = True
|
self.save_file_on_disk = True
|
||||||
self.is_file_rotation = True
|
self.file_rotation_mode = True
|
||||||
|
self.file_rotation = False
|
||||||
self.file_separator = b'\n'
|
self.file_separator = b'\n'
|
||||||
self.filename = b'{}.txt'.format(self.type_name)
|
self.filename = b''.join([self.type_name.encode(), b'.txt'])
|
||||||
|
|
||||||
######## PROCESS FUNCTIONS ########
|
######## PROCESS FUNCTIONS ########
|
||||||
def process_data(self, data):
|
def process_data(self, data):
|
||||||
|
@ -36,8 +37,6 @@ class MetaTypesDefault:
|
||||||
if self.is_file_saved_on_disk():
|
if self.is_file_saved_on_disk():
|
||||||
self.save_data_to_file(data)
|
self.save_data_to_file(data)
|
||||||
|
|
||||||
print('end process_data')
|
|
||||||
|
|
||||||
######## CORE FUNCTIONS ########
|
######## CORE FUNCTIONS ########
|
||||||
|
|
||||||
def check_json_file(self, json_file):
|
def check_json_file(self, json_file):
|
||||||
|
@ -47,27 +46,68 @@ class MetaTypesDefault:
|
||||||
else:
|
else:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
# # TODO: update for non rotate_file mode
|
||||||
def save_json_file(self, json_file):
|
def save_json_file(self, json_file):
|
||||||
self.last_time_saved = time.time() #time_file
|
self.set_last_time_saved(time.time()) #time_file
|
||||||
self.last_saved_date = datetime.datetime.now().strftime("%Y%m%d%H%M%S") #date_file
|
self.set_last_saved_date(datetime.datetime.now().strftime("%Y%m%d%H%M%S")) #date_file
|
||||||
|
# update save path
|
||||||
save_path = os.path.join(self.get_save_dir(file_extention='json'), self.get_save_dir())
|
self.set_save_path( os.path.join(self.get_save_dir(), self.get_filename(file_extention='json')) )
|
||||||
with open(save_path, 'w') as f:
|
# save json
|
||||||
f.write(json.dumps(full_json))
|
with open(self.get_save_path(), 'w') as f:
|
||||||
|
f.write(json.dumps(json_file))
|
||||||
|
# update save path for 254 files type
|
||||||
|
self.set_save_path( os.path.join(self.get_save_dir(), self.get_filename()) )
|
||||||
|
|
||||||
def save_data_to_file(self, data):
|
def save_data_to_file(self, data):
|
||||||
if self.is_file_rotation():
|
if self.is_file_rotation_mode():
|
||||||
self.save_rotate_file(data)
|
self.save_rotate_file(data)
|
||||||
|
|
||||||
|
|
||||||
def save_rotate_file(self, data):
|
def save_rotate_file(self, data):
|
||||||
|
if not self.get_file_rotation():
|
||||||
new_date = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
|
new_date = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
|
||||||
# check if a new file rotation is needed # # TODO: change ROTATION_SAVE_CYCLE
|
# check if a new file rotation is needed # # TODO: change ROTATION_SAVE_CYCLE
|
||||||
if ( new_date[0:8] != self.get_last_time_saved()[0:8] ) or ( time.time() - self.get_last_time_saved > ROTATION_SAVE_CYCLE ):
|
if ( new_date[0:8] != self.get_last_saved_date()[0:8] ) or ( int(time.time()) - self.get_last_time_saved() > ROTATION_SAVE_CYCLE ):
|
||||||
date_file = new_date
|
|
||||||
self.set_last_saved_date(new_date)
|
|
||||||
self.set_rotate_file(True)
|
self.set_rotate_file(True)
|
||||||
|
|
||||||
|
# rotate file
|
||||||
|
if self.get_file_rotation():
|
||||||
|
# init save path
|
||||||
|
if self.get_save_path() is None:
|
||||||
|
self.set_last_time_saved(time.time())
|
||||||
|
self.set_last_saved_date(datetime.datetime.now().strftime("%Y%m%d%H%M%S"))
|
||||||
|
# update save path
|
||||||
|
self.set_save_path( os.path.join(self.get_save_dir(), self.get_filename()) )
|
||||||
|
|
||||||
|
# rotate file
|
||||||
|
if self.get_file_separator() in data:
|
||||||
|
end_file, start_new_file = data.rsplit(self.get_file_separator(), maxsplit=1)
|
||||||
|
# save end of file
|
||||||
|
with open(self.get_save_path(), 'ab') as f:
|
||||||
|
f.write(end_file)
|
||||||
|
|
||||||
|
# set last saved date/time
|
||||||
|
self.set_last_time_saved(time.time())
|
||||||
|
self.set_last_saved_date(datetime.datetime.now().strftime("%Y%m%d%H%M%S"))
|
||||||
|
# update save path
|
||||||
|
self.set_save_path( os.path.join(self.get_save_dir(), self.get_filename()) )
|
||||||
|
|
||||||
|
# save start of new file
|
||||||
|
if start_new_file != b'':
|
||||||
|
with open(self.get_save_path(), 'ab') as f:
|
||||||
|
f.write(start_new_file)
|
||||||
|
# end of rotation
|
||||||
|
self.set_rotate_file(False)
|
||||||
|
|
||||||
|
# wait file separator
|
||||||
|
else:
|
||||||
|
with open(self.get_save_path(), 'ab') as f:
|
||||||
|
f.write(data)
|
||||||
|
else:
|
||||||
|
# save file
|
||||||
|
with open(self.get_save_path(), 'ab') as f:
|
||||||
|
f.write(data)
|
||||||
|
|
||||||
|
|
||||||
def save_same_directory(self, data):
|
def save_same_directory(self, data):
|
||||||
pass
|
pass
|
||||||
|
@ -80,29 +120,43 @@ class MetaTypesDefault:
|
||||||
def get_file_separator(self):
|
def get_file_separator(self):
|
||||||
return self.file_separator
|
return self.file_separator
|
||||||
|
|
||||||
|
def get_uuid(self):
|
||||||
|
return self.uuid
|
||||||
|
|
||||||
def get_filename(self, file_extention=None):
|
def get_filename(self, file_extention=None):
|
||||||
if file_extention is None:
|
if file_extention is None:
|
||||||
file_extention = DEFAULT_FILE_EXTENSION
|
file_extention = DEFAULT_FILE_EXTENSION
|
||||||
# File Rotation, : data/<uuid>/254/<year>/<month>/<day>/
|
# File Rotation, : data/<uuid>/254/<year>/<month>/<day>/
|
||||||
if self.is_file_rotation():
|
if self.is_file_rotation_mode():
|
||||||
return '{}-{}-{}-{}-{}.{}'.format(self.uuid, self.get_last_saved_year(), self.get_last_saved_month(), self.get_last_saved_day(), self.get_last_saved_hour_minute(), file_extention)
|
return '{}-{}-{}-{}-{}.{}'.format(self.uuid, self.get_last_saved_year(), self.get_last_saved_month(), self.get_last_saved_day(), self.get_last_saved_hour_minute(), file_extention)
|
||||||
|
|
||||||
# todo save save_dir ???
|
|
||||||
def get_save_dir(self):
|
def get_save_dir(self):
|
||||||
# File Rotation, save data in directory: data/<uuid>/254/<year>/<month>/<day>/
|
# File Rotation, save data in directory: data/<uuid>/254/<year>/<month>/<day>/
|
||||||
if self.is_file_rotation():
|
if self.is_file_rotation_mode():
|
||||||
data_directory_uuid_type = os.path.join('../../data', self.uuid, str(TYPE))
|
data_directory_uuid_type = os.path.join('../../data', self.get_uuid(), str(TYPE))
|
||||||
return os.path.join(data_directory_uuid_type, self.get_last_saved_year(), self.get_last_saved_month(), self.get_last_saved_day() , self.type_name)
|
return os.path.join(data_directory_uuid_type, self.get_last_saved_year(), self.get_last_saved_month(), self.get_last_saved_day() , self.type_name)
|
||||||
|
|
||||||
# # TODO: save global type dir ???
|
# # TODO: save global type dir ???
|
||||||
if self.is_file_saved_on_disk():
|
if self.is_file_saved_on_disk():
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def is_file_saved_on_disk(self):
|
def get_save_path(self):
|
||||||
return self.save_file_on_disk
|
return self.save_path
|
||||||
|
|
||||||
def is_file_rotation(self):
|
def is_file_saved_on_disk(self):
|
||||||
return self.is_file_rotation
|
if self.save_file_on_disk:
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
|
||||||
|
def is_file_rotation_mode(self):
|
||||||
|
if self.file_rotation_mode:
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
|
||||||
|
def get_file_rotation(self):
|
||||||
|
return self.file_rotation
|
||||||
|
|
||||||
def get_last_time_saved(self):
|
def get_last_time_saved(self):
|
||||||
return self.last_time_saved
|
return self.last_time_saved
|
||||||
|
@ -116,7 +170,7 @@ class MetaTypesDefault:
|
||||||
def get_last_saved_month(self):
|
def get_last_saved_month(self):
|
||||||
return self.last_saved_date[4:6]
|
return self.last_saved_date[4:6]
|
||||||
|
|
||||||
def get_last_saved_year(self):
|
def get_last_saved_day(self):
|
||||||
return self.last_saved_date[6:8]
|
return self.last_saved_date[6:8]
|
||||||
|
|
||||||
def get_last_saved_hour_minute(self):
|
def get_last_saved_hour_minute(self):
|
||||||
|
@ -127,7 +181,17 @@ class MetaTypesDefault:
|
||||||
def set_rotate_file(self, boolean_value):
|
def set_rotate_file(self, boolean_value):
|
||||||
self.file_rotation = boolean_value
|
self.file_rotation = boolean_value
|
||||||
|
|
||||||
|
def set_last_time_saved(self, value_time):
|
||||||
|
self.last_time_saved = int(value_time)
|
||||||
|
|
||||||
def set_last_saved_date(self, date):
|
def set_last_saved_date(self, date):
|
||||||
self.get_last_saved_date = date
|
self.last_saved_date = date
|
||||||
|
|
||||||
|
def set_save_path(self, save_path):
|
||||||
|
# # TODO: create directory
|
||||||
|
dir_path = os.path.dirname(save_path)
|
||||||
|
if not os.path.isdir(dir_path):
|
||||||
|
os.makedirs(dir_path)
|
||||||
|
self.save_path = save_path
|
||||||
|
|
||||||
##############
|
##############
|
||||||
|
|
|
@ -6,12 +6,13 @@ import time
|
||||||
import json
|
import json
|
||||||
import redis
|
import redis
|
||||||
|
|
||||||
class TypeHandler(MetaTypesDefault:
|
from meta_types_modules.MetaTypesDefault import MetaTypesDefault
|
||||||
|
|
||||||
def __init__(self, json_file):
|
class TypeHandler(MetaTypesDefault):
|
||||||
super().__init__(json_file)
|
|
||||||
|
def __init__(self, uuid, json_file):
|
||||||
|
super().__init__(uuid, json_file)
|
||||||
print('init_spec')
|
print('init_spec')
|
||||||
|
|
||||||
def test2(self):
|
def test(self):
|
||||||
print('ja3-jl type')
|
print('Class: ja3-jl')
|
||||||
print(self.session_uuid)
|
|
||||||
|
|
|
@ -41,6 +41,31 @@ def get_class( package_class ):
|
||||||
mod = getattr(mod, comp)
|
mod = getattr(mod, comp)
|
||||||
return mod
|
return mod
|
||||||
|
|
||||||
|
def check_default_json_file(json_file):
|
||||||
|
# the json object must contain a type field
|
||||||
|
if "type" in json_file:
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
|
||||||
|
def on_error(session_uuid, type_error, message):
|
||||||
|
redis_server_stream.sadd('Error:IncorrectType', session_uuid)
|
||||||
|
redis_server_metadata.hset('metadata_uuid:{}'.format(uuid), 'Error', 'Error: Type={}, {}'.format(type_error, message))
|
||||||
|
clean_db(session_uuid)
|
||||||
|
print('Incorrect format')
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
def clean_db(session_uuid):
|
||||||
|
clean_stream(stream_meta_json, type_meta_header, session_uuid)
|
||||||
|
clean_stream(stream_defined, type_defined, session_uuid)
|
||||||
|
redis_server_stream.srem('ended_session', session_uuid)
|
||||||
|
redis_server_stream.srem('working_session_uuid:{}'.format(type_meta_header), session_uuid)
|
||||||
|
|
||||||
|
def clean_stream(stream_name, type, session_uuid):
|
||||||
|
redis_server_stream.srem('session_uuid:{}'.format(type), session_uuid)
|
||||||
|
redis_server_stream.hdel('map-type:session_uuid-uuid:{}'.format(type), session_uuid)
|
||||||
|
redis_server_stream.delete(stream_name)
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|
||||||
|
|
||||||
|
@ -103,7 +128,8 @@ if __name__ == "__main__":
|
||||||
# complete json received
|
# complete json received
|
||||||
if full_json:
|
if full_json:
|
||||||
print(full_json)
|
print(full_json)
|
||||||
if check_json_file(full_json):
|
if check_default_json_file(full_json):
|
||||||
|
# end type 2 processing
|
||||||
break
|
break
|
||||||
# Incorrect Json
|
# Incorrect Json
|
||||||
else:
|
else:
|
||||||
|
@ -139,12 +165,7 @@ if __name__ == "__main__":
|
||||||
#extended_type_name = type_handler.get_file_name()
|
#extended_type_name = type_handler.get_file_name()
|
||||||
|
|
||||||
# save json on disk
|
# save json on disk
|
||||||
type_handler.save_json_file(json)
|
type_handler.save_json_file(full_json)
|
||||||
|
|
||||||
|
|
||||||
# get extended_type save_path #############################################################################################################
|
|
||||||
filename = '{}-{}-{}-{}-{}.{}'.format(uuid, date_file[0:4], date_file[4:6], date_file[6:8], date_file[8:14], extended_type_name)
|
|
||||||
save_path = os.path.join(dir_full_path, filename)
|
|
||||||
|
|
||||||
# change stream_name/type
|
# change stream_name/type
|
||||||
stream_name = stream_defined
|
stream_name = stream_defined
|
||||||
|
@ -152,6 +173,8 @@ if __name__ == "__main__":
|
||||||
id = 0
|
id = 0
|
||||||
buffer = b''
|
buffer = b''
|
||||||
|
|
||||||
|
type_handler.test()
|
||||||
|
|
||||||
# handle 254 type
|
# handle 254 type
|
||||||
while True:
|
while True:
|
||||||
res = redis_server_stream.xread({stream_name: id}, count=1)
|
res = redis_server_stream.xread({stream_name: id}, count=1)
|
||||||
|
@ -162,41 +185,9 @@ if __name__ == "__main__":
|
||||||
data = res[0][1][0][1]
|
data = res[0][1][0][1]
|
||||||
|
|
||||||
if id and data:
|
if id and data:
|
||||||
|
# process 254 data type
|
||||||
type_handler.process_data(data)
|
type_handler.process_data(data[b'message'])
|
||||||
|
# remove data from redis stream
|
||||||
########################
|
|
||||||
# save data on disk
|
|
||||||
if save_to_file:
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# file rotation
|
|
||||||
if rotate_file and file_separator in data[b'message']:
|
|
||||||
end_file, start_new_file = data[b'message'].rsplit(file_separator, maxsplit=1)
|
|
||||||
# save end of file
|
|
||||||
with open(save_path, 'ab') as f:
|
|
||||||
f.write(end_file)
|
|
||||||
|
|
||||||
# get new save_path
|
|
||||||
dir_full_path = get_save_dir(dir_data_uuid, date_file[0:4], date_file[4:6], date_file[6:8], extended_type=extended_type)
|
|
||||||
filename = '{}-{}-{}-{}-{}.{}'.format(uuid, date_file[0:4], date_file[4:6], date_file[6:8], date_file[8:14], extended_type_name)
|
|
||||||
save_path = os.path.join(dir_full_path, filename)
|
|
||||||
|
|
||||||
# save start of new file
|
|
||||||
if start_new_file != b'':
|
|
||||||
with open(save_path, 'ab') as f:
|
|
||||||
f.write(start_new_file)
|
|
||||||
# end of rotation
|
|
||||||
rotate_file = False
|
|
||||||
time_file = time.time()
|
|
||||||
else:
|
|
||||||
with open(save_path, 'ab') as f:
|
|
||||||
f.write(data[b'message'])
|
|
||||||
|
|
||||||
#######
|
|
||||||
|
|
||||||
|
|
||||||
redis_server_stream.xdel(stream_name, id)
|
redis_server_stream.xdel(stream_name, id)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
|
Loading…
Reference in New Issue