mirror of https://github.com/D4-project/d4-core
chg: [server worker1] add uuid metadata + fix worker1
parent
8d041682b7
commit
d8f93160dd
|
@ -21,7 +21,7 @@ from twisted.internet.protocol import Protocol
|
||||||
from twisted.protocols.policies import TimeoutMixin
|
from twisted.protocols.policies import TimeoutMixin
|
||||||
|
|
||||||
hmac_reset = bytearray(32)
|
hmac_reset = bytearray(32)
|
||||||
hmac_key = b'private key to change\n'
|
hmac_key = b'private key to change'
|
||||||
|
|
||||||
timeout_time = 30
|
timeout_time = 30
|
||||||
|
|
||||||
|
@ -65,10 +65,10 @@ class Echo(Protocol, TimeoutMixin):
|
||||||
self.resetTimeout()
|
self.resetTimeout()
|
||||||
self.buffer = b''
|
self.buffer = b''
|
||||||
logger.debug('buffer timeout, session_uuid={}'.format(self.session_uuid))
|
logger.debug('buffer timeout, session_uuid={}'.format(self.session_uuid))
|
||||||
#self.transport.abortConnection()
|
|
||||||
|
|
||||||
def connectionLost(self, reason):
|
def connectionLost(self, reason):
|
||||||
redis_server.sadd('ended_session', self.session_uuid)
|
redis_server.sadd('ended_session', self.session_uuid)
|
||||||
|
self.setTimeout(None)
|
||||||
logger.debug('Connection closed: session_uuid={}'.format(self.session_uuid))
|
logger.debug('Connection closed: session_uuid={}'.format(self.session_uuid))
|
||||||
|
|
||||||
def unpack_header(self, data):
|
def unpack_header(self, data):
|
||||||
|
@ -106,6 +106,7 @@ class Echo(Protocol, TimeoutMixin):
|
||||||
if self.is_valid_uuid_v4(uuid_to_check):
|
if self.is_valid_uuid_v4(uuid_to_check):
|
||||||
return True
|
return True
|
||||||
else:
|
else:
|
||||||
|
logger.info('Invalid Header, uuid={}, session_uuid={}'.format(uuid_to_check, self.session_uuid))
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def process_header(self, data, ip, source_port):
|
def process_header(self, data, ip, source_port):
|
||||||
|
@ -142,20 +143,17 @@ class Echo(Protocol, TimeoutMixin):
|
||||||
print('discard data')
|
print('discard data')
|
||||||
print(data_header)
|
print(data_header)
|
||||||
print(data)
|
print(data)
|
||||||
#time.sleep(5)
|
logger.warning('Invalid Header, uuid={}, session_uuid={}'.format(data_header['uuid_header'], self.session_uuid))
|
||||||
#sys.exit(1)
|
|
||||||
else:
|
else:
|
||||||
if len(data) < header_size:
|
if len(data) < header_size:
|
||||||
self.buffer += data
|
self.buffer += data
|
||||||
logger.debug('Not enough data received, the header is incomplete, pushing data to buffer, session_uuid={}, data_received={}'.format(self.session_uuid, len(data)))
|
#logger.debug('Not enough data received, the header is incomplete, pushing data to buffer, session_uuid={}, data_received={}'.format(self.session_uuid, len(data)))
|
||||||
else:
|
else:
|
||||||
|
|
||||||
print('error discard data')
|
print('error discard data')
|
||||||
print(data_header)
|
print(data_header)
|
||||||
print(data)
|
print(data)
|
||||||
logger.warning('Error unpacking header: incorrect format, session_uuid={}'.format(self.session_uuid))
|
logger.warning('Error unpacking header: incorrect format, session_uuid={}'.format(self.session_uuid))
|
||||||
#time.sleep(5)
|
|
||||||
#sys.exit(1)
|
|
||||||
|
|
||||||
# not a header
|
# not a header
|
||||||
else:
|
else:
|
||||||
|
@ -201,6 +199,11 @@ class Echo(Protocol, TimeoutMixin):
|
||||||
redis_server.sadd('daily_uuid:{}'.format(date), data_header['uuid_header'])
|
redis_server.sadd('daily_uuid:{}'.format(date), data_header['uuid_header'])
|
||||||
redis_server.sadd('daily_ip:{}'.format(date), ip)
|
redis_server.sadd('daily_ip:{}'.format(date), ip)
|
||||||
|
|
||||||
|
#
|
||||||
|
if not redis_server.hexists('metadata_uuid:{}'.format(data_header['uuid_header']), 'first_seen'):
|
||||||
|
redis_server.hset('metadata_uuid:{}'.format(data_header['uuid_header']), 'first_seen', data_header['timestamp'])
|
||||||
|
redis_server.hset('metadata_uuid:{}'.format(data_header['uuid_header']), 'last_seen', data_header['timestamp'])
|
||||||
|
|
||||||
if not self.data_saved:
|
if not self.data_saved:
|
||||||
redis_server.sadd('session_uuid:{}'.format(data_header['type']), self.session_uuid.encode())
|
redis_server.sadd('session_uuid:{}'.format(data_header['type']), self.session_uuid.encode())
|
||||||
redis_server.hset('map-type:session_uuid-uuid:{}'.format(data_header['type']), self.session_uuid, data_header['uuid_header'])
|
redis_server.hset('map-type:session_uuid-uuid:{}'.format(data_header['type']), self.session_uuid, data_header['uuid_header'])
|
||||||
|
@ -208,6 +211,7 @@ class Echo(Protocol, TimeoutMixin):
|
||||||
else:
|
else:
|
||||||
print('hmac do not match')
|
print('hmac do not match')
|
||||||
print(data)
|
print(data)
|
||||||
|
logger.debug("HMAC don't match, uuid={}, session_uuid={}".format(data_header['uuid_header'], self.session_uuid))
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -18,7 +18,10 @@ redis_server = redis.StrictRedis(
|
||||||
db=0)
|
db=0)
|
||||||
|
|
||||||
type = 1
|
type = 1
|
||||||
tcp_dump_cycle = '5'
|
tcp_dump_cycle = '300'
|
||||||
|
stream_buffer = 100
|
||||||
|
|
||||||
|
id_to_delete = []
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|
||||||
|
@ -47,14 +50,12 @@ if __name__ == "__main__":
|
||||||
|
|
||||||
#LAUNCH a tcpdump
|
#LAUNCH a tcpdump
|
||||||
process = subprocess.Popen(["tcpdump", '-n', '-r', '-', '-G', tcp_dump_cycle, '-w', '{}/%Y/%m/%d/{}-%Y-%m-%d-%H%M%S.cap'.format(tcpdump_path, uuid)], stdin=subprocess.PIPE)
|
process = subprocess.Popen(["tcpdump", '-n', '-r', '-', '-G', tcp_dump_cycle, '-w', '{}/%Y/%m/%d/{}-%Y-%m-%d-%H%M%S.cap'.format(tcpdump_path, uuid)], stdin=subprocess.PIPE)
|
||||||
#redis_server.xgroup_create('stream:{}:{}'.format(type, session_uuid), 'workers:{}:{}'.format(type, session_uuid))
|
nb_save = 0
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
#print(redis_server.xpending(stream_name, group_name))
|
|
||||||
#redis_server.sadd('working_session_uuid:{}'.format(type), session_uuid)
|
#redis_server.sadd('working_session_uuid:{}'.format(type), session_uuid)
|
||||||
|
|
||||||
res = redis_server.xread({stream_name: id}, count=1)
|
res = redis_server.xread({stream_name: id}, count=1)
|
||||||
#print(res)
|
|
||||||
if res:
|
if res:
|
||||||
new_id = res[0][1][0][0].decode()
|
new_id = res[0][1][0][0].decode()
|
||||||
if id != new_id:
|
if id != new_id:
|
||||||
|
@ -65,7 +66,6 @@ if __name__ == "__main__":
|
||||||
#print(data)
|
#print(data)
|
||||||
new_date = datetime.datetime.now().strftime("%Y%m%d")
|
new_date = datetime.datetime.now().strftime("%Y%m%d")
|
||||||
if new_date != date:
|
if new_date != date:
|
||||||
print('rrr')
|
|
||||||
date= new_date
|
date= new_date
|
||||||
rel_path = os.path.join(tcpdump_path, date[0:4], date[4:6], date[6:8])
|
rel_path = os.path.join(tcpdump_path, date[0:4], date[4:6], date[6:8])
|
||||||
if not os.path.isdir(rel_path):
|
if not os.path.isdir(rel_path):
|
||||||
|
@ -73,12 +73,20 @@ if __name__ == "__main__":
|
||||||
|
|
||||||
try:
|
try:
|
||||||
process.stdin.write(data[b'message'])
|
process.stdin.write(data[b'message'])
|
||||||
|
id_to_delete.append(id)
|
||||||
except:
|
except:
|
||||||
Error_message = process.stderr.read()
|
Error_message = process.stderr.read()
|
||||||
if Error_message == b'tcpdump: unknown file format\n':
|
if Error_message == b'tcpdump: unknown file format\n':
|
||||||
data_incorrect_format(session_uuid)
|
data_incorrect_format(session_uuid)
|
||||||
|
|
||||||
#print(process.stdout.read())
|
#print(process.stdout.read())
|
||||||
|
nb_save += 1
|
||||||
|
|
||||||
|
if nb_save > stream_buffer:
|
||||||
|
for id in id_to_delete:
|
||||||
|
redis_server.xdel(stream_name, id)
|
||||||
|
id_to_delete = []
|
||||||
|
nb_save = 0
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# sucess, all data are saved
|
# sucess, all data are saved
|
||||||
|
|
Loading…
Reference in New Issue