chg: [server] add worker 8, push data to redis + save in disk

pull/8/head
Terrtia 2019-02-01 11:18:05 +01:00
parent 98ab29ae9f
commit d12e8f4a91
No known key found for this signature in database
GPG Key ID: 1E1B1F50D84613D0
4 changed files with 195 additions and 1 deletions

View File

@ -76,6 +76,8 @@ function launching_workers {
sleep 0.1
screen -S "Workers_D4" -X screen -t "4_workers_manager" bash -c "cd ${D4_HOME}/workers/workers_4; ./workers_manager.py; read x"
sleep 0.1
screen -S "Workers_D4" -X screen -t "8_workers_manager" bash -c "cd ${D4_HOME}/workers/workers_8; ./workers_manager.py; read x"
sleep 0.1
}
function shutting_down_redis {

View File

@ -23,7 +23,7 @@ from twisted.protocols.policies import TimeoutMixin
hmac_reset = bytearray(32)
hmac_key = b'private key to change'
accepted_type = [1, 4]
accepted_type = [1, 4, 8]
timeout_time = 30

View File

@ -0,0 +1,155 @@
#!/usr/bin/env python3
import os
import sys
import time
import redis
import datetime
def data_incorrect_format(session_uuid):
print('Incorrect format')
sys.exit(1)
host_redis_stream = "localhost"
port_redis_stream = 6379
redis_server_stream = redis.StrictRedis(
host=host_redis_stream,
port=port_redis_stream,
db=0)
host_redis_metadata = "localhost"
port_redis_metadata = 6380
redis_server_metadata = redis.StrictRedis(
host=host_redis_metadata,
port=port_redis_metadata,
db=0)
redis_server_analyzer = redis.StrictRedis(
host=host_redis_metadata,
port=port_redis_metadata,
db=2)
type = 8
rotation_save_cycle = 10 #seconds
save_to_file = True
def get_save_dir(dir_data_uuid, year, month, day):
dir_path = os.path.join(dir_data_uuid, year, month, day)
if not os.path.isdir(dir_path):
os.makedirs(dir_path)
return dir_path
if __name__ == "__main__":
if len(sys.argv) != 2:
print('usage:', 'Worker.py', 'session_uuid')
exit(1)
session_uuid = sys.argv[1]
stream_name = 'stream:{}:{}'.format(type, session_uuid)
id = '0'
buffer = None
# track launched worker
redis_server_stream.sadd('working_session_uuid:{}'.format(type), session_uuid)
# get uuid
res = redis_server_stream.xread({stream_name: id}, count=1)
if res:
uuid = res[0][1][0][1][b'uuid'].decode()
# init file rotation
if save_to_file:
rotate_file = False
time_file = time.time()
date_file = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
dir_data_uuid = os.path.join('../../data', uuid, str(type))
dir_full_path = get_save_dir(dir_data_uuid, date_file[0:4], date_file[4:6], date_file[6:8])
filename = '{}-{}-{}-{}-{}.passivedns.txt'.format(uuid, date_file[0:4], date_file[4:6], date_file[6:8], date_file[8:14])
save_path = os.path.join(dir_full_path, filename)
print('---- worker launched, uuid={} session_uuid={}'.format(uuid, session_uuid))
else:
print('Incorrect Stream, Closing worker: type={} session_uuid={}'.format(type, session_uuid))
sys.exit(1)
while True:
res = redis_server_stream.xread({stream_name: id}, count=1)
if res:
new_id = res[0][1][0][0].decode()
if id != new_id:
id = new_id
data = res[0][1][0][1]
if id and data:
# reconstruct data
if buffer:
data[b'message'] = '{}{}'.format(buffer, data[b'message'])
buffer = None
# send data to redis
# new line in received data
if b'\n' in data[b'message']:
all_line = data[b'message'].split(b'\n')
for line in all_line[:-1]:
for analyzer_uuid in redis_server_metadata.smembers('analyser:{}'.format(type)):
analyzer_uuid = analyzer_uuid.decode()
redis_server_analyzer.lpush('analyzer:{}:{}'.format(type, analyzer_uuid), line)
redis_server_metadata.hset('analyser:{}'.format(analyzer_uuid), 'last_updated', time.time())
# keep incomplete line
if all_line[-1] != b'':
buffer += data[b'message']
else:
buffer += data[b'message']
# save data on disk
if save_to_file:
new_date = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
# check if a new rotation is needed
if ( new_date[0:8] != date_file[0:8] ) or ( time.time() - time_file > rotation_save_cycle ):
date_file = new_date
rotate_file = True
# file rotation
if rotate_file and b'\n' in data[b'message']:
end_file, start_new_file = data[b'message'].rsplit(b'\n', maxsplit=1)
# save end of file
with open(save_path, 'ab') as f:
f.write(end_file)
# get new save_path
dir_full_path = get_save_dir(dir_data_uuid, date_file[0:4], date_file[4:6], date_file[6:8])
filename = '{}-{}-{}-{}-{}.passivedns.txt'.format(uuid, date_file[0:4], date_file[4:6], date_file[6:8], date_file[8:14])
save_path = os.path.join(dir_full_path, filename)
# save start of new file
if start_new_file != b'':
with open(save_path, 'ab') as f:
f.write(start_new_file)
# end of rotation
rotate_file = False
time_file = time.time()
else:
with open(save_path, 'ab') as f:
f.write(data[b'message'])
redis_server_stream.xdel(stream_name, id)
else:
# sucess, all data are saved
if redis_server_stream.sismember('ended_session', session_uuid):
redis_server_stream.srem('ended_session', session_uuid)
redis_server_stream.srem('session_uuid:{}'.format(type), session_uuid)
redis_server_stream.srem('working_session_uuid:{}'.format(type), session_uuid)
redis_server_stream.hdel('map-type:session_uuid-uuid:{}'.format(type), session_uuid)
redis_server_stream.delete(stream_name)
print('---- passivedns DONE, uuid={} session_uuid={}'.format(uuid, session_uuid))
sys.exit(0)
else:
time.sleep(10)

View File

@ -0,0 +1,37 @@
#!/usr/bin/env python3
import os
import sys
import time
import redis
import subprocess
host_redis_stream = "localhost"
port_redis_stream = 6379
redis_server_stream = redis.StrictRedis(
host=host_redis_stream,
port=port_redis_stream,
db=0)
type = 8
try:
redis_server_stream.ping()
except redis.exceptions.ConnectionError:
print('Error: Redis server {}:{}, ConnectionError'.format(host_redis, port_redis))
sys.exit(1)
if __name__ == "__main__":
stream_name = 'stream:{}'.format(type)
redis_server_stream.delete('working_session_uuid:{}'.format(type))
while True:
for session_uuid in redis_server_stream.smembers('session_uuid:{}'.format(type)):
session_uuid = session_uuid.decode()
if not redis_server_stream.sismember('working_session_uuid:{}'.format(type), session_uuid):
process = subprocess.Popen(['./worker.py', session_uuid])
print('Launching new worker{} ... session_uuid={}'.format(type, session_uuid))
#print('.')
time.sleep(10)