Merge pull request #8 from D4-project/metatypes

Use a parent class and child classes for type 254 handling. Use a module (child class) to change the type handler's behaviour.
visu-type
Thirion Aurélien 2019-03-11 11:57:58 +01:00 committed by GitHub
commit e24b9f576b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 248 additions and 62 deletions

View File

@ -0,0 +1,197 @@
#!/usr/bin/env python3
import os
import sys
import time
import json
import redis
import datetime
# Extension given to data files when get_filename() is called without one.
DEFAULT_FILE_EXTENSION = 'txt'
# Default record separator; MetaTypesDefault.parse_json currently hard-codes
# the same value instead of reading this constant.
DEFAULT_FILE_SEPARATOR = b'\n'
# Elapsed time after which save_rotate_file() asks for a new file.
ROTATION_SAVE_CYCLE = 5 # seconds
# Numeric type used as a directory component by get_save_dir().
TYPE = 254
class MetaTypesDefault:
    """Default handler for type 254 data announced by a JSON header.

    The handler keeps the session ``uuid`` and the ``type`` name read from the
    JSON header and appends the received bytes to files on disk.  In rotation
    mode (the only mode implemented here) a new file is started when the day
    changes or when more than ``ROTATION_SAVE_CYCLE`` seconds have elapsed;
    the stream is cut at the last file separator of the incoming chunk.

    Subclasses can override individual methods to change this behaviour.
    """

    def __init__(self, uuid, json_file):
        """Store the identifiers and load the default settings.

        :param uuid: identifier used in directory and file names.
        :param json_file: parsed JSON header; must contain a ``type`` key.
        :raises KeyError: if ``json_file`` has no ``type`` key.
        """
        self.uuid = uuid
        self.type_name = json_file['type']
        self.save_path = None
        # Start the rotation clock right away so that the getters and
        # save_rotate_file() work even when save_json_file() was never called.
        self.last_time_saved = int(time.time())
        self.last_saved_date = self._timestamp()
        self.parse_json(json_file)

    def test(self):
        """Print the name of the handler class in use."""
        print('class: MetaTypesDefault')

    ######## JSON PARSER ########

    def parse_json(self, json_file):
        """Initialise the handler settings.

        The default implementation ignores the content of ``json_file`` and
        applies fixed defaults.
        """
        self.save_file_on_disk = True
        self.file_rotation_mode = True
        self.file_rotation = False
        self.file_separator = b'\n'
        self.filename = b''.join([self.type_name.encode(), b'.txt'])

    ######## PROCESS FUNCTIONS ########

    def process_data(self, data):
        """Handle one chunk of received bytes (saved on disk when enabled)."""
        # save data on disk
        if self.is_file_saved_on_disk():
            self.save_data_to_file(data)

    ######## CORE FUNCTIONS ########

    def check_json_file(self, json_file):
        """Return True when the JSON object contains the mandatory ``type`` field."""
        return "type" in json_file

    # # TODO: update for non rotate_file mode
    def save_json_file(self, json_file):
        """Write the JSON header to disk and prepare the data file path.

        The rotation clock is restarted, the header is written to a ``.json``
        file, and ``save_path`` is then pointed at the data file that shares
        the same timestamp.
        """
        self.set_last_time_saved(time.time())  # time_file
        self.set_last_saved_date(self._timestamp())  # date_file
        # update save path
        self.set_save_path(os.path.join(self.get_save_dir(), self.get_filename(file_extention='json')))
        # save json
        with open(self.get_save_path(), 'w') as f:
            f.write(json.dumps(json_file))
        # update save path for 254 files type
        self.set_save_path(os.path.join(self.get_save_dir(), self.get_filename()))

    def save_data_to_file(self, data):
        """Append ``data`` to the current file; only rotation mode is implemented."""
        if self.is_file_rotation_mode():
            self.save_rotate_file(data)

    def save_rotate_file(self, data):
        """Append ``data`` to the current file, rotating the file when due.

        A rotation is requested when the day changed or when more than
        ``ROTATION_SAVE_CYCLE`` seconds elapsed since the last one.  It is
        carried out on the first chunk containing the file separator:
        everything before the last separator goes to the old file, the rest
        opens the new one.  Until then chunks keep going to the old file.

        :param data: bytes to append.
        """
        # First write without a prior save_json_file(): create the save path
        # before anything tries to open it.
        if self.get_save_path() is None:
            self._start_new_file()

        if not self.get_file_rotation():
            new_date = self._timestamp()
            # check if a new file rotation is needed # # TODO: change ROTATION_SAVE_CYCLE
            day_changed = new_date[0:8] != self.get_last_saved_date()[0:8]
            cycle_elapsed = int(time.time()) - self.get_last_time_saved() > ROTATION_SAVE_CYCLE
            if day_changed or cycle_elapsed:
                self.set_rotate_file(True)

        if self.get_file_rotation() and self.get_file_separator() in data:
            # NOTE: rsplit() drops the separator itself, so the closed file
            # does not end with it.
            end_file, start_new_file = data.rsplit(self.get_file_separator(), maxsplit=1)
            # save end of file
            with open(self.get_save_path(), 'ab') as f:
                f.write(end_file)
            # set last saved date/time and update save path
            self._start_new_file()
            # save start of new file
            if start_new_file:
                with open(self.get_save_path(), 'ab') as f:
                    f.write(start_new_file)
            # end of rotation
            self.set_rotate_file(False)
        else:
            # no rotation pending, or still waiting for a file separator
            with open(self.get_save_path(), 'ab') as f:
                f.write(data)

    def save_same_directory(self, data):
        """Placeholder; not implemented."""
        pass

    @staticmethod
    def _timestamp():
        """Return the current local time formatted as ``YYYYmmddHHMMSS``."""
        return datetime.datetime.now().strftime("%Y%m%d%H%M%S")

    def _start_new_file(self):
        """Restart the rotation clock and point ``save_path`` at a new data file."""
        self.set_last_time_saved(time.time())
        self.set_last_saved_date(self._timestamp())
        self.set_save_path(os.path.join(self.get_save_dir(), self.get_filename()))

    ######## GET FUNCTIONS ########

    def get_type_name(self):
        """Return the ``type`` value read from the JSON header."""
        return self.type_name

    def get_file_separator(self):
        """Return the byte sequence at which files may be cut."""
        return self.file_separator

    def get_uuid(self):
        """Return the uuid given at construction."""
        return self.uuid

    def get_filename(self, file_extention=None):
        """Return ``<uuid>-<year>-<month>-<day>-<HHMMSS>.<extension>``.

        :param file_extention: extension without the dot; defaults to
            ``DEFAULT_FILE_EXTENSION``.
        :return: the file name, or None when rotation mode is disabled
            (that mode is not implemented yet).
        """
        if file_extention is None:
            file_extention = DEFAULT_FILE_EXTENSION
        # File Rotation, : data/<uuid>/254/<year>/<month>/<day>/
        if self.is_file_rotation_mode():
            return '{}-{}-{}-{}-{}.{}'.format(
                self.uuid,
                self.get_last_saved_year(),
                self.get_last_saved_month(),
                self.get_last_saved_day(),
                self.get_last_saved_hour_minute(),
                file_extention,
            )
        return None

    def get_save_dir(self):
        """Return the directory in which the current files are stored.

        :return: ``../../data/<uuid>/<TYPE>/<year>/<month>/<day>/<type_name>``
            in rotation mode, otherwise None (not implemented yet).
        """
        # File Rotation, save data in directory: data/<uuid>/254/<year>/<month>/<day>/
        if self.is_file_rotation_mode():
            data_directory_uuid_type = os.path.join('../../data', self.get_uuid(), str(TYPE))
            return os.path.join(
                data_directory_uuid_type,
                self.get_last_saved_year(),
                self.get_last_saved_month(),
                self.get_last_saved_day(),
                self.type_name,
            )
        # # TODO: save global type dir ???
        return None

    def get_save_path(self):
        """Return the path of the file currently written to, or None if unset."""
        return self.save_path

    def is_file_saved_on_disk(self):
        """Return True when received data must be written to disk."""
        return bool(self.save_file_on_disk)

    def is_file_rotation_mode(self):
        """Return True when files are rotated over time."""
        return bool(self.file_rotation_mode)

    def get_file_rotation(self):
        """Return True while a rotation is pending."""
        return self.file_rotation

    def get_last_time_saved(self):
        """Return the epoch time (whole seconds) at which the rotation clock was last restarted."""
        return self.last_time_saved

    def get_last_saved_date(self):
        """Return the ``YYYYmmddHHMMSS`` stamp used to name the current file."""
        return self.last_saved_date

    def get_last_saved_year(self):
        """Return the ``YYYY`` part of the last saved date."""
        return self.last_saved_date[0:4]

    def get_last_saved_month(self):
        """Return the ``mm`` part of the last saved date."""
        return self.last_saved_date[4:6]

    def get_last_saved_day(self):
        """Return the ``dd`` part of the last saved date."""
        return self.last_saved_date[6:8]

    def get_last_saved_hour_minute(self):
        """Return the ``HHMMSS`` part of the last saved date (seconds included)."""
        return self.last_saved_date[8:14]

    ######## SET FUNCTIONS ########

    def set_rotate_file(self, boolean_value):
        """Mark (True) or clear (False) a pending rotation."""
        self.file_rotation = boolean_value

    def set_last_time_saved(self, value_time):
        """Store ``value_time`` truncated to whole seconds."""
        self.last_time_saved = int(value_time)

    def set_last_saved_date(self, date):
        """Store the ``YYYYmmddHHMMSS`` stamp used for file and directory names."""
        self.last_saved_date = date

    def set_save_path(self, save_path):
        """Remember ``save_path`` and make sure its parent directory exists."""
        dir_path = os.path.dirname(save_path)
        # A bare file name has no directory part; os.makedirs('') would raise.
        if dir_path:
            # exist_ok avoids failing when the directory appears between a
            # check and the creation.
            os.makedirs(dir_path, exist_ok=True)
        self.save_path = save_path
##############

View File

@ -0,0 +1,18 @@
#!/usr/bin/env python3
import os
import sys
import time
import json
import redis
from meta_types_modules.MetaTypesDefault import MetaTypesDefault
class TypeHandler(MetaTypesDefault):
    """Handler for the ``ja3-jl`` type; reuses the default behaviour."""

    def __init__(self, uuid, json_file):
        """Initialise the parent handler, then print a trace message."""
        super().__init__(uuid=uuid, json_file=json_file)
        print('init_spec')

    def test(self):
        """Print the name of the handler class in use."""
        print('Class: ja3-jl')

View File

@ -8,6 +8,8 @@ import redis
import datetime
from meta_types_modules import MetaTypesDefault
host_redis_stream = "localhost"
port_redis_stream = 6379
@ -30,22 +32,16 @@ max_buffer_length = 100000
rotation_save_cycle = 10 #seconds
json_file_name = 'meta_json.json'
extended_type_name = None # # TODO: use default or json['file_type']
save_to_file = True
def get_class(package_class):
    """Import and return the object named by a dotted path.

    :param package_class: string such as ``"package.module.ClassName"``; the
        part before the last dot is imported as a module and the last
        component is looked up on it.
    :return: the attribute found on the imported module.
    :raises ValueError: if ``package_class`` has no module part.
    :raises ImportError: if the module cannot be imported.
    :raises AttributeError: if the module has no such attribute.
    """
    # Imported locally so the function does not depend on the file's import block.
    import importlib

    module_name, _, attribute = package_class.rpartition('.')
    if not module_name:
        raise ValueError(
            'expected a dotted path "<module>.<name>", got {!r}'.format(package_class)
        )
    # import_module() returns the innermost module directly, so no manual
    # walk from the top-level package is needed.
    return getattr(importlib.import_module(module_name), attribute)
def get_dir_data_uuid(uuid, type):
    """Return the data directory for ``uuid`` and ``type``: ``../../data/<uuid>/<type>``."""
    type_component = str(type)
    path_parts = ('../../data', uuid, type_component)
    return os.path.join(*path_parts)
def get_save_dir(dir_data_uuid, year, month, day, extended_type=None):
    """Build the dated save directory, create it if needed, and return it.

    :param dir_data_uuid: base directory.
    :param year: year component, as a string.
    :param month: month component, as a string.
    :param day: day component, as a string.
    :param extended_type: optional extra sub-directory appended after the day.
    :return: ``<dir_data_uuid>/<year>/<month>/<day>[/<extended_type>]``.
    """
    dir_path = os.path.join(dir_data_uuid, year, month, day)
    if extended_type:
        dir_path = os.path.join(dir_path, extended_type)
    # exist_ok replaces the isdir() check, which could race with another
    # process creating the same directory.
    os.makedirs(dir_path, exist_ok=True)
    return dir_path
def check_json_file(json_file):
def check_default_json_file(json_file):
# the json object must contain a type field
if "type" in json_file:
return True
@ -72,6 +68,9 @@ def clean_stream(stream_name, type, session_uuid):
if __name__ == "__main__":
###################################################3
if len(sys.argv) != 2:
print('usage:', 'Worker.py', 'session_uuid')
exit(1)
@ -129,7 +128,8 @@ if __name__ == "__main__":
# complete json received
if full_json:
print(full_json)
if check_json_file(full_json):
if check_default_json_file(full_json):
# end type 2 processing
break
# Incorrect Json
else:
@ -151,24 +151,21 @@ if __name__ == "__main__":
clean_db(session_uuid)
sys.exit(1)
file_separator = b'\n' ## TODO: map all file separator or extract from json
extended_type_name = '{}.txt'.format(extended_type) # # TODO: create default or extract from JSON
#### Handle Specific MetaTypes ####
# Use Specific Handler defined
if os.path.isdir(os.path.join('meta_types_modules', extended_type)):
class_type_handler = get_class('meta_types_modules.{}.{}.TypeHandler'.format(extended_type, extended_type))
type_handler = class_type_handler(uuid, full_json)
# Use Standard Handler
else:
type_handler = MetaTypesDefault.MetaTypesDefault(uuid, full_json)
#file_separator = type_handler.get_file_separator(self)
#extended_type_name = type_handler.get_file_name()
# save json on disk
if save_to_file:
rotate_file = False
time_file = time.time()
date_file = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
# get new save_path #use first or last received date ???
dir_data_uuid = get_dir_data_uuid(uuid, type_defined)
dir_full_path = get_save_dir(dir_data_uuid, date_file[0:4], date_file[4:6], date_file[6:8], extended_type=extended_type)
filename = '{}-{}-{}-{}-{}.{}'.format(uuid, date_file[0:4], date_file[4:6], date_file[6:8], date_file[8:14], json_file_name)
save_path = os.path.join(dir_full_path, filename)
with open(save_path, 'w') as f:
f.write(json.dumps(full_json))
# get extended_type save_path
filename = '{}-{}-{}-{}-{}.{}'.format(uuid, date_file[0:4], date_file[4:6], date_file[6:8], date_file[8:14], extended_type_name)
save_path = os.path.join(dir_full_path, filename)
type_handler.save_json_file(full_json)
# change stream_name/type
stream_name = stream_defined
@ -176,6 +173,8 @@ if __name__ == "__main__":
id = 0
buffer = b''
type_handler.test()
# handle 254 type
while True:
res = redis_server_stream.xread({stream_name: id}, count=1)
@ -186,37 +185,9 @@ if __name__ == "__main__":
data = res[0][1][0][1]
if id and data:
# save data on disk
if save_to_file:
new_date = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
# check if a new rotation is needed
if ( new_date[0:8] != date_file[0:8] ) or ( time.time() - time_file > rotation_save_cycle ):
date_file = new_date
rotate_file = True
# file rotation
if rotate_file and file_separator in data[b'message']:
end_file, start_new_file = data[b'message'].rsplit(file_separator, maxsplit=1)
# save end of file
with open(save_path, 'ab') as f:
f.write(end_file)
# get new save_path
dir_full_path = get_save_dir(dir_data_uuid, date_file[0:4], date_file[4:6], date_file[6:8], extended_type=extended_type)
filename = '{}-{}-{}-{}-{}.{}'.format(uuid, date_file[0:4], date_file[4:6], date_file[6:8], date_file[8:14], extended_type_name)
save_path = os.path.join(dir_full_path, filename)
# save start of new file
if start_new_file != b'':
with open(save_path, 'ab') as f:
f.write(start_new_file)
# end of rotation
rotate_file = False
time_file = time.time()
else:
with open(save_path, 'ab') as f:
f.write(data[b'message'])
# process 254 data type
type_handler.process_data(data[b'message'])
# remove data from redis stream
redis_server_stream.xdel(stream_name, id)
else: