mirror of https://github.com/D4-project/d4-core
chg: [server worker2] add worker2 v0.0 + detect type change
parent 711e44d24d
commit be3029721b
@@ -23,7 +23,8 @@ from twisted.protocols.policies import TimeoutMixin
 hmac_reset = bytearray(32)
 hmac_key = b'private key to change'

-accepted_type = [1, 4, 8]
+accepted_type = [1, 2, 4, 8, 254]
+accepted_extended_type = ['ja3-jl']

 timeout_time = 30

@@ -67,6 +68,9 @@ redis_server_metadata.set('server:hmac_default_key', hmac_key)
 redis_server_metadata.delete('server:accepted_type')
 for type in accepted_type:
     redis_server_metadata.sadd('server:accepted_type', type)
+redis_server_metadata.delete('server:accepted_extended_type')
+for type in accepted_extended_type:
+    redis_server_metadata.sadd('server:accepted_extended_type', type)

 class D4_Server(Protocol, TimeoutMixin):

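Note: the added loop must iterate accepted_extended_type (the original hunk iterated accepted_type, which would register the numeric types under the extended set and break the sismember check in the worker below). To verify the registration after startup, a minimal sketch, assuming the metadata Redis instance used elsewhere in this commit (localhost:6380, db 0):

    # Sketch: list the stream types server.py advertises at startup.
    import redis

    redis_server_metadata = redis.StrictRedis(host='localhost', port=6380, db=0)

    accepted = sorted(int(t) for t in redis_server_metadata.smembers('server:accepted_type'))
    extended = sorted(t.decode() for t in redis_server_metadata.smembers('server:accepted_extended_type'))
    print('accepted types:', accepted)    # expected: [1, 2, 4, 8, 254]
    print('extended types:', extended)    # expected: ['ja3-jl']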
@@ -75,6 +79,7 @@ class D4_Server(Protocol, TimeoutMixin):
         self.setTimeout(timeout_time)
         self.session_uuid = str(uuid.uuid4())
         self.data_saved = False
+        self.update_stream_type = True
         self.first_connection = True
         self.ip = None
         self.source_port = None
@@ -138,10 +143,10 @@ class D4_Server(Protocol, TimeoutMixin):
             logger.warning('Incorrect header data size: the server received more data than expected by default, expected={}, received={}, uuid={}, session_uuid={}'.format(data_default_size_limit, data_header['size'], data_header['uuid_header'], self.session_uuid))

         # Worker: Incorrect type
-        if redis_server_stream.sismember('Error:IncorrectType:{}'.format(data_header['type']), self.session_uuid):
+        if redis_server_stream.sismember('Error:IncorrectType', self.session_uuid):
             self.transport.abortConnection()
             redis_server_stream.delete('stream:{}:{}'.format(data_header['type'], self.session_uuid))
-            redis_server_stream.srem('Error:IncorrectType:{}'.format(data_header['type']), self.session_uuid)
+            redis_server_stream.srem('Error:IncorrectType', self.session_uuid)
             logger.warning('Incorrect type={} detected by worker, uuid={}, session_uuid={}'.format(data_header['type'], data_header['uuid_header'], self.session_uuid))

         return data_header
@@ -192,6 +197,23 @@ class D4_Server(Protocol, TimeoutMixin):
                 self.transport.abortConnection()
             else:
                 #self.version = None
+                # check if the stream type changed
+                if self.data_saved:
+                    # type change detected
+                    if self.type != data_header['type']:
+                        # Meta types: a type 2 (meta header) session may switch to type 254
+                        if self.type == 2 and data_header['type'] == 254:
+                            self.update_stream_type = True
+                        # Type Error
+                        else:
+                            logger.warning('Unexpected type change, type={} new type={}, ip={} uuid={} session_uuid={}'.format(self.type, data_header['type'], ip, data_header['uuid_header'], self.session_uuid))
+                            redis_server_metadata.hset('metadata_uuid:{}'.format(data_header['uuid_header']), 'Error', 'Error: Unexpected type change type={}, new type={}'.format(self.type, data_header['type']))
+                            self.transport.abortConnection()
+                # type 254 and no data saved yet: a type 2 packet must be sent first
+                elif data_header['type'] == 254:
+                    logger.warning('a type 2 packet must be sent, ip={} uuid={} type={} session_uuid={}'.format(ip, data_header['uuid_header'], data_header['type'], self.session_uuid))
+                    redis_server_metadata.hset('metadata_uuid:{}'.format(data_header['uuid_header']), 'Error', 'Error: a type 2 packet must be sent, type={}'.format(data_header['type']))
+                    self.transport.abortConnection()
                 self.type = data_header['type']
                 self.uuid = data_header['uuid_header']
                 #active Connection
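(The first warning in the original hunk had five format placeholders but only four arguments; the argument list above is completed to match. The transition rule this hunk enforces can be restated as a standalone predicate; a minimal sketch, where the constants come from the diff and the helper name and asserts are ours:)

    META_TYPE = 2          # meta header (JSON) packet
    EXTENDED_TYPE = 254    # extended-type payload announced by a prior type 2

    def type_change_allowed(previous_type, new_type):
        # Only the meta-header handshake may change type mid-session:
        # a session that started with type 2 may switch to type 254;
        # every other change aborts the connection.
        return previous_type == META_TYPE and new_type == EXTENDED_TYPE

    assert type_change_allowed(2, 254)
    assert not type_change_allowed(1, 254)   # 254 without a prior type 2 is rejected
    assert not type_change_allowed(2, 1)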
@@ -308,14 +330,16 @@ class D4_Server(Protocol, TimeoutMixin):
                 redis_server_metadata.hset('metadata_uuid:{}'.format(data_header['uuid_header']), 'last_seen', data_header['timestamp'])

                 if not self.data_saved:
-                    redis_server_stream.sadd('session_uuid:{}'.format(data_header['type']), self.session_uuid.encode())
-                    redis_server_stream.hset('map-type:session_uuid-uuid:{}'.format(data_header['type']), self.session_uuid, data_header['uuid_header'])
-
                     #UUID IP: ## TODO: use d4 timestamp ?
                     redis_server_metadata.lpush('list_uuid_ip:{}'.format(data_header['uuid_header']), '{}-{}'.format(ip, datetime.datetime.now().strftime("%Y%m%d%H%M%S")))
                     redis_server_metadata.ltrim('list_uuid_ip:{}'.format(data_header['uuid_header']), 0, 15)

                     self.data_saved = True
+
+                if self.update_stream_type:
+                    redis_server_stream.sadd('session_uuid:{}'.format(data_header['type']), self.session_uuid.encode())
+                    redis_server_stream.hset('map-type:session_uuid-uuid:{}'.format(data_header['type']), self.session_uuid, data_header['uuid_header'])
+
+                    self.update_stream_type = False
             else:
                 logger.warning("stream exceeds max entries limit, uuid={}, session_uuid={}, type={}".format(data_header['uuid_header'], self.session_uuid, data_header['type']))
                 ## TODO: FIXME
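Gating the registration on update_stream_type instead of the first save means that after a 2 -> 254 switch the same session is registered under both types, so a worker can be launched for each. A sketch of the resulting bookkeeping, with key names from the diff and uuids generated purely for illustration:

    import uuid
    import redis

    r = redis.StrictRedis(host='localhost', port=6379, db=0)  # stream Redis used in this commit
    session_uuid = str(uuid.uuid4())   # illustrative
    sensor_uuid = str(uuid.uuid4())    # illustrative

    # What the server now does once per stream type, driven by update_stream_type:
    for d4_type in (2, 254):
        r.sadd('session_uuid:{}'.format(d4_type), session_uuid)
        r.hset('map-type:session_uuid-uuid:{}'.format(d4_type), session_uuid, sensor_uuid)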
@@ -10,7 +10,7 @@ import datetime
 import subprocess

 def data_incorrect_format(stream_name, session_uuid, uuid):
-    redis_server_stream.sadd('Error:IncorrectType:{}'.format(type), session_uuid)
+    redis_server_stream.sadd('Error:IncorrectType', session_uuid)
     redis_server_metadata.hset('metadata_uuid:{}'.format(uuid), 'Error', 'Error: Type={}, Incorrect file format'.format(type))
     clean_stream(stream_name, session_uuid)
     print('Incorrect format')
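Both sides of the error handshake now use the single shared set 'Error:IncorrectType', matching the server-side check above. A sketch of the round trip (worker flags, server aborts and clears; the session id is illustrative):

    import redis

    r = redis.StrictRedis(host='localhost', port=6379, db=0)
    session_uuid = 'illustrative-session-uuid'

    # worker side: flag the session when the payload has an incorrect format
    r.sadd('Error:IncorrectType', session_uuid)

    # server side: on the next header from that session, abort and clear the flag
    if r.sismember('Error:IncorrectType', session_uuid):
        r.srem('Error:IncorrectType', session_uuid)
        print('session flagged, connection would be aborted')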
@@ -0,0 +1,159 @@
+#!/usr/bin/env python3
+
+import os
+import sys
+import time
+import gzip
+import redis
+import shutil
+import datetime
+
+import signal
+
+
+class GracefulKiller:
+    kill_now = False
+
+    def __init__(self):
+        signal.signal(signal.SIGINT, self.exit_gracefully)
+        signal.signal(signal.SIGTERM, self.exit_gracefully)
+
+    def exit_gracefully(self, signum, frame):
+        self.kill_now = True
+
+
+def compress_file(file_full_path, session_uuid, i=0):
+    redis_server_stream.set('data_in_process:{}'.format(session_uuid), file_full_path)
+    if i == 0:
+        compressed_filename = '{}.gz'.format(file_full_path)
+    else:
+        compressed_filename = '{}.{}.gz'.format(file_full_path, i)
+    if os.path.isfile(compressed_filename):
+        compress_file(file_full_path, session_uuid, i + 1)
+    else:
+        with open(file_full_path, 'rb') as f_in:
+            with gzip.open(compressed_filename, 'wb') as f_out:
+                shutil.copyfileobj(f_in, f_out)
+        try:
+            os.remove(file_full_path)
+        except FileNotFoundError:
+            pass
+        # save full path in analyzer queue
+        for analyzer_uuid in redis_server_metadata.smembers('analyzer:{}'.format(type)):
+            analyzer_uuid = analyzer_uuid.decode()
+            redis_server_analyzer.lpush('analyzer:{}:{}'.format(type, analyzer_uuid), compressed_filename)
+            redis_server_metadata.hset('analyzer:{}'.format(analyzer_uuid), 'last_updated', time.time())
+            analyser_queue_max_size = redis_server_metadata.hget('analyzer:{}'.format(analyzer_uuid), 'max_size')
+            if analyser_queue_max_size is None:
+                analyser_queue_max_size = analyzer_list_max_default_size
+            redis_server_analyzer.ltrim('analyzer:{}:{}'.format(type, analyzer_uuid), 0, analyser_queue_max_size)
+
+
+host_redis_stream = "localhost"
+port_redis_stream = 6379
+
+host_redis_metadata = "localhost"
+port_redis_metadata = 6380
+
+redis_server_stream = redis.StrictRedis(
+    host=host_redis_stream,
+    port=port_redis_stream,
+    db=0)
+
+redis_server_metadata = redis.StrictRedis(
+    host=host_redis_metadata,
+    port=port_redis_metadata,
+    db=0)
+
+redis_server_analyzer = redis.StrictRedis(
+    host=host_redis_metadata,
+    port=port_redis_metadata,
+    db=2)
+
+type = 1
+sleep_time = 300
+
+analyzer_list_max_default_size = 10000
+
+if __name__ == "__main__":
+    killer = GracefulKiller()
+
+    if len(sys.argv) != 4:
+        print('usage:', 'Worker.py', 'session_uuid', 'directory_data_uuid', 'date')
+        exit(1)
+
+    # TODO: sanitize input
+    session_uuid = sys.argv[1]
+    directory_data_uuid = sys.argv[2]
+    date = sys.argv[3]
+
+    worker_data_directory = os.path.join(directory_data_uuid, date[0:4], date[4:6], date[6:8])
+    full_datetime = datetime.datetime.now().strftime("%Y%m%d%H")
+
+    current_file = None
+    time_change = False
+
+    while True:
+        if killer.kill_now:
+            break
+
+        new_date = datetime.datetime.now().strftime("%Y%m%d")
+
+        # get all directory files
+        all_files = os.listdir(worker_data_directory)
+        not_compressed_file = []
+        # filter: get all not yet compressed files
+        for file in all_files:
+            if file.endswith('.cap'):
+                not_compressed_file.append(os.path.join(worker_data_directory, file))
+
+        if not_compressed_file:
+            ### check time-change (minus one hour) ###
+            new_full_datetime = datetime.datetime.now().strftime("%Y%m%d%H")
+            if new_full_datetime < full_datetime:
+                # sort list by last modification time
+                not_compressed_file.sort(key=os.path.getctime)
+            else:
+                # sort list by name
+                not_compressed_file.sort()
+            ### ###
+
+            # new day
+            if date != new_date:
+                # compress all files
+                for file in not_compressed_file:
+                    if killer.kill_now:
+                        break
+                    compress_file(file, session_uuid)
+                # reset file tracker
+                current_file = None
+                date = new_date
+                # update worker_data_directory
+                worker_data_directory = os.path.join(directory_data_uuid, date[0:4], date[4:6], date[6:8])
+                # restart
+                continue
+
+            # file currently used by tcpdump
+            max_file = not_compressed_file[-1]
+            full_datetime = new_full_datetime
+
+            # Init: set current_file
+            if not current_file:
+                current_file = max_file
+                #print('max_file set: {}'.format(current_file))
+
+            # new file created
+            if max_file != current_file:
+
+                # compress all previous files
+                for file in not_compressed_file:
+                    if file != max_file:
+                        if killer.kill_now:
+                            break
+                        #print('new file: {}'.format(file))
+                        compress_file(file, session_uuid)
+
+                # update current_file tracker
+                current_file = max_file
+
+        if killer.kill_now:
+            break
+
+        time.sleep(sleep_time)
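compress_file never overwrites an existing archive: it probes name.gz, then name.1.gz, name.2.gz, and so on until a free slot is found. The probe in isolation, as a sketch with no Redis or gzip involved (the helper name is ours):

    import os

    def next_compressed_name(file_full_path, i=0):
        # foo.cap -> foo.cap.gz, then foo.cap.1.gz, foo.cap.2.gz, ...
        if i == 0:
            name = '{}.gz'.format(file_full_path)
        else:
            name = '{}.{}.gz'.format(file_full_path, i)
        if os.path.isfile(name):
            return next_compressed_name(file_full_path, i + 1)
        return name

    print(next_compressed_name('/tmp/example.cap'))   # /tmp/example.cap.gz on a clean /tmp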
@@ -0,0 +1,185 @@
+#!/usr/bin/env python3
+
+import os
+import sys
+import time
+import json
+import redis
+
+import datetime
+
+host_redis_stream = "localhost"
+port_redis_stream = 6379
+
+redis_server_stream = redis.StrictRedis(
+    host=host_redis_stream,
+    port=port_redis_stream,
+    db=0)
+
+host_redis_metadata = "localhost"
+port_redis_metadata = 6380
+
+redis_server_metadata = redis.StrictRedis(
+    host=host_redis_metadata,
+    port=port_redis_metadata,
+    db=0)
+
+type_meta_header = 2
+type_defined = 254
+max_buffer_length = 100000
+
+save_to_file = True
+
+def get_save_dir(dir_data_uuid, year, month, day):
+    dir_path = os.path.join(dir_data_uuid, year, month, day)
+    if not os.path.isdir(dir_path):
+        os.makedirs(dir_path)
+    return dir_path
+
+def check_json_file(json_file):
+    # the json object must contain a type field
+    if "type" in json_file:
+        return True
+    else:
+        return False
+
+def on_error(session_uuid, type_error, message):
+    redis_server_stream.sadd('Error:IncorrectType', session_uuid)
+    redis_server_metadata.hset('metadata_uuid:{}'.format(uuid), 'Error', 'Error: Type={}, {}'.format(type_error, message))
+    clean_db(session_uuid)
+    print('Incorrect format')
+    sys.exit(1)
+
+def clean_db(session_uuid):
+    clean_stream(stream_meta_json, type_meta_header, session_uuid)
+    clean_stream(stream_defined, type_defined, session_uuid)
+    redis_server_stream.srem('ended_session', session_uuid)
+    redis_server_stream.srem('working_session_uuid:{}'.format(type_meta_header), session_uuid)
+
+def clean_stream(stream_name, type, session_uuid):
+    redis_server_stream.srem('session_uuid:{}'.format(type), session_uuid)
+    redis_server_stream.hdel('map-type:session_uuid-uuid:{}'.format(type), session_uuid)
+    redis_server_stream.delete(stream_name)
+
+if __name__ == "__main__":
+
+    if len(sys.argv) != 2:
+        print('usage:', 'Worker.py', 'session_uuid')
+        exit(1)
+
+    session_uuid = sys.argv[1]
+    stream_meta_json = 'stream:{}:{}'.format(type_meta_header, session_uuid)
+    stream_defined = 'stream:{}:{}'.format(type_defined, session_uuid)
+
+    id = '0'
+    buffer = b''
+
+    stream_name = stream_meta_json
+    type = type_meta_header
+
+    # track launched worker
+    redis_server_stream.sadd('working_session_uuid:{}'.format(type_meta_header), session_uuid)
+
+    # get uuid
+    res = redis_server_stream.xread({stream_name: id}, count=1)
+    if res:
+        uuid = res[0][1][0][1][b'uuid'].decode()
+        # init file rotation
+        if save_to_file:
+            rotate_file = False
+            time_file = time.time()
+            date_file = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
+            dir_data_uuid = os.path.join('../../data', uuid, str(type))
+            dir_full_path = get_save_dir(dir_data_uuid, date_file[0:4], date_file[4:6], date_file[6:8])
+            filename = '{}-{}-{}-{}-{}.meta_json.txt'.format(uuid, date_file[0:4], date_file[4:6], date_file[6:8], date_file[8:14])
+            save_path = os.path.join(dir_full_path, filename)
+
+        print('---- worker launched, uuid={} session_uuid={}'.format(uuid, session_uuid))
+    else:
+        print('Incorrect Stream, Closing worker: type={} session_uuid={}'.format(type, session_uuid))
+        sys.exit(1)
+
+    full_json = None
+
+    # active session
+    while full_json is None:
+
+        res = redis_server_stream.xread({stream_name: id}, count=1)
+        if res:
+            new_id = res[0][1][0][0].decode()
+            if id != new_id:
+                id = new_id
+                data = res[0][1][0][1]
+
+                if id and data:
+                    # reconstruct data
+                    if buffer != b'':
+                        data[b'message'] = buffer + data[b'message']
+                        buffer = b''
+                    try:
+                        full_json = json.loads(data[b'message'])
+                    except ValueError:
+                        buffer += data[b'message']
+                        # # TODO: filter too big json
+                    redis_server_stream.xdel(stream_name, id)
+
+                    # complete json received
+                    if full_json:
+                        if check_json_file(full_json):
+                            break
+                        # Incorrect Json
+                        else:
+                            on_error(session_uuid, type, 'Incorrect JSON object')
+        else:
+            # end session, no json received
+            if redis_server_stream.sismember('ended_session', session_uuid):
+                clean_db(session_uuid)
+                print('---- Incomplete JSON object, DONE, uuid={} session_uuid={}'.format(uuid, session_uuid))
+                sys.exit(0)
+            else:
+                time.sleep(10)
+
+    # extract/parse JSON
+    extended_type = full_json['type']
+    if not redis_server_metadata.sismember('server:accepted_extended_type', extended_type):
+        error_mess = 'Unsupported extended_type: {}'.format(extended_type)
+        print(error_mess)
+        on_error(session_uuid, type, error_mess)
+
+    # save json on disk
+    if save_to_file:
+        # get new save_path # use first or last received date ???
+        dir_full_path = get_save_dir(dir_data_uuid, date_file[0:4], date_file[4:6], date_file[6:8])
+        filename = '{}-{}-{}-{}-{}.meta_json.txt'.format(uuid, date_file[0:4], date_file[4:6], date_file[6:8], date_file[8:14])
+        save_path = os.path.join(dir_full_path, filename)
+        with open(save_path, 'w') as f:
+            f.write(json.dumps(full_json))
+
+    # change stream_name/type
+    stream_name = stream_defined
+    type = type_defined
+    id = '0'
+    buffer = b''
+
+    # Do the magic on 254 type
+    while True:
+        res = redis_server_stream.xread({stream_name: id}, count=1)
+        if res:
+            new_id = res[0][1][0][0].decode()
+            if id != new_id:
+                id = new_id
+                data = res[0][1][0][1]
+
+                if id and data:
+                    print(data)
+
+        else:
+            # end session, no json received
+            if redis_server_stream.sismember('ended_session', session_uuid):
+                clean_db(session_uuid)
+                print('---- Incomplete JSON object, DONE, uuid={} session_uuid={}'.format(uuid, session_uuid))
+                sys.exit(0)
+            else:
+                time.sleep(10)
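Several fixes were needed to make this new file runnable: bytes have no .format (the buffer is now prepended with +), json.loads was called without an argument, the undefined type_error name is replaced by type, and the dict is serialized with json.dumps before writing. The meta-header loop reassembles a JSON object that may arrive split across several stream entries: chunks accumulate in buffer until the concatenation parses. The same technique in isolation, as a sketch where the example payload and split point are ours:

    import json

    chunks = [b'{"type": "ja3', b'-jl"}']   # one JSON object split across two entries
    buffer = b''
    full_json = None

    for chunk in chunks:
        buffer += chunk
        try:
            full_json = json.loads(buffer)  # bytes accepted on Python 3.6+
            break
        except ValueError:                  # incomplete JSON so far, keep buffering
            pass

    print(full_json)   # {'type': 'ja3-jl'}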
@@ -0,0 +1,37 @@
+#!/usr/bin/env python3
+
+import os
+import sys
+import time
+import redis
+import subprocess
+
+host_redis_stream = "localhost"
+port_redis_stream = 6379
+
+redis_server_stream = redis.StrictRedis(
+    host=host_redis_stream,
+    port=port_redis_stream,
+    db=0)
+type = 2
+
+try:
+    redis_server_stream.ping()
+except redis.exceptions.ConnectionError:
+    print('Error: Redis server {}:{}, ConnectionError'.format(host_redis_stream, port_redis_stream))
+    sys.exit(1)
+
+if __name__ == "__main__":
+    stream_name = 'stream:{}'.format(type)
+    redis_server_stream.delete('working_session_uuid:{}'.format(type))
+
+    while True:
+        for session_uuid in redis_server_stream.smembers('session_uuid:{}'.format(type)):
+            session_uuid = session_uuid.decode()
+            if not redis_server_stream.sismember('working_session_uuid:{}'.format(type), session_uuid):
+
+                process = subprocess.Popen(['./worker.py', session_uuid])
+                print('Launching new worker{} ... session_uuid={}'.format(type, session_uuid))
+
+        #print('.')
+        time.sleep(10)
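The error message in this launcher referenced undefined host_redis/port_redis names; it now uses the stream host and port defined above. Its test is effectively a set difference: sessions registered for type 2 minus sessions that already have a worker (workers add themselves to working_session_uuid:2 on startup, as the meta-json worker above does, otherwise this loop would respawn the same session every 10 seconds). The same check as a one-shot sketch:

    import redis

    r = redis.StrictRedis(host='localhost', port=6379, db=0)
    d4_type = 2

    registered = r.smembers('session_uuid:{}'.format(d4_type))
    working = r.smembers('working_session_uuid:{}'.format(d4_type))
    pending = registered - working   # sessions that still need a worker

    print([s.decode() for s in pending])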