mirror of https://github.com/D4-project/d4-core
fix: [server] fix extended-types connection (allow concurrent 2/254) + fix extended types metadata + save connected types/extended in DB
parent
e6d98d2dbc
commit
bfc75e0db8
|
@ -24,7 +24,21 @@ sensor registrations, management of decoding protocols and dispatching to adequa
|
|||
| server:accepted_type | **accepted type** |
|
||||
| server:accepted_extended_type | **accepted extended type** |
|
||||
|
||||
###### Connection Manager
|
||||
| Set Key | Value |
|
||||
| --- | --- |
|
||||
| active_connection | **uuid** |
|
||||
| | |
|
||||
| active_connection:**type** | **uuid** |
|
||||
| active_connection_extended_type:**uuid** | **extended type** |
|
||||
| | |
|
||||
| active_uuid_type2:**uuid** | **session uuid** |
|
||||
| | |
|
||||
| map:active_connection-uuid-session_uuid:**uuid** | **session uuid** |
|
||||
|
||||
| Set Key | Field | Value |
|
||||
| --- | --- | --- |
|
||||
| map:session-uuid_active_extended_type | **session_uuid** | **extended_type** |
|
||||
|
||||
### Stats
|
||||
| Zset Key | Field | Value |
|
||||
|
@ -66,6 +80,7 @@ sensor registrations, management of decoding protocols and dispatching to adequa
|
|||
| Set Key | Value |
|
||||
| --- | --- |
|
||||
| all_types_by_uuid:**uuid** | **type** |
|
||||
| all_extended_types_by_uuid:**uuid** | **type** |
|
||||
|
||||
### analyzers
|
||||
###### metadata
|
||||
|
|
|
@ -136,15 +136,28 @@ class D4_Server(Protocol, TimeoutMixin):
|
|||
def connectionLost(self, reason):
|
||||
redis_server_stream.sadd('ended_session', self.session_uuid)
|
||||
self.setTimeout(None)
|
||||
|
||||
if not self.duplicate:
|
||||
if self.type == 254:
|
||||
type = 2
|
||||
if self.type == 254 or self.type == 2:
|
||||
redis_server_stream.srem('active_uuid_type{}:{}'.format(self.type, self.uuid), self.session_uuid)
|
||||
if not redis_server_stream.exists('active_uuid_type{}:{}'.format(self.type, self.uuid)):
|
||||
redis_server_stream.srem('active_connection:{}'.format(self.type), self.uuid)
|
||||
redis_server_stream.srem('active_connection_by_uuid:{}'.format(self.uuid), self.type)
|
||||
# clean extended type
|
||||
current_extended_type = redis_server_stream.hget('map:session-uuid_active_extended_type', self.session_uuid)
|
||||
if current_extended_type:
|
||||
redis_server_stream.hdel('map:session-uuid_active_extended_type', self.session_uuid)
|
||||
redis_server_stream.srem('active_connection_extended_type:{}'.format(self.uuid), current_extended_type)
|
||||
|
||||
else:
|
||||
type = self.type
|
||||
redis_server_stream.srem('active_connection:{}'.format(type), '{}:{}'.format(self.ip, self.uuid))
|
||||
redis_server_stream.srem('active_connection', '{}'.format(self.uuid))
|
||||
redis_server_stream.srem('active_connection:{}'.format(self.type), self.uuid)
|
||||
redis_server_stream.srem('active_connection_by_uuid:{}'.format(self.uuid), self.type)
|
||||
|
||||
if self.uuid:
|
||||
redis_server_stream.srem('map:active_connection-uuid-session_uuid:{}'.format(self.uuid), self.session_uuid)
|
||||
if not redis_server_stream.exists('active_connection_by_uuid:{}'.format(self.uuid)):
|
||||
redis_server_stream.srem('active_connection', self.uuid)
|
||||
|
||||
logger.debug('Connection closed: session_uuid={}'.format(self.session_uuid))
|
||||
dict_all_connection.pop(self.session_uuid)
|
||||
|
||||
|
@ -237,30 +250,44 @@ class D4_Server(Protocol, TimeoutMixin):
|
|||
# auto kill connection # TODO: map type
|
||||
if self.first_connection:
|
||||
self.first_connection = False
|
||||
if redis_server_stream.sismember('active_connection:{}'.format(data_header['type']), '{}:{}'.format(ip, data_header['uuid_header'])):
|
||||
if data_header['type'] == 2:
|
||||
redis_server_stream.sadd('active_uuid_type2:{}'.format(data_header['uuid_header']), self.session_uuid)
|
||||
|
||||
# type 254, check if previous type 2 saved
|
||||
elif data_header['type'] == 254:
|
||||
logger.warning('a type 2 packet must be sent, ip={} uuid={} type={} session_uuid={}'.format(ip, data_header['uuid_header'], data_header['type'], self.session_uuid))
|
||||
redis_server_metadata.hset('metadata_uuid:{}'.format(data_header['uuid_header']), 'Error', 'Error: a type 2 packet must be sent, type={}'.format(data_header['type']))
|
||||
self.duplicate = True
|
||||
self.transport.abortConnection()
|
||||
return 1
|
||||
|
||||
# accept only one type/by uuid (except for type 2/254)
|
||||
elif redis_server_stream.sismember('active_connection:{}'.format(data_header['type']), '{}'.format(data_header['uuid_header'])):
|
||||
# same IP-type for an UUID
|
||||
logger.warning('is using the same UUID for one type, ip={} uuid={} type={} session_uuid={}'.format(ip, data_header['uuid_header'], data_header['type'], self.session_uuid))
|
||||
redis_server_metadata.hset('metadata_uuid:{}'.format(data_header['uuid_header']), 'Error', 'Error: This UUID is using the same UUID for one type={}'.format(data_header['type']))
|
||||
self.duplicate = True
|
||||
self.transport.abortConnection()
|
||||
return 1
|
||||
else:
|
||||
#self.version = None
|
||||
# type 254, check if previous type 2 saved
|
||||
if data_header['type'] == 254:
|
||||
logger.warning('a type 2 packet must be sent, ip={} uuid={} type={} session_uuid={}'.format(ip, data_header['uuid_header'], data_header['type'], self.session_uuid))
|
||||
redis_server_metadata.hset('metadata_uuid:{}'.format(data_header['uuid_header']), 'Error', 'Error: a type 2 packet must be sent, type={}'.format(data_header['type']))
|
||||
self.duplicate = True
|
||||
self.transport.abortConnection()
|
||||
return 1
|
||||
|
||||
self.type = data_header['type']
|
||||
self.uuid = data_header['uuid_header']
|
||||
|
||||
# worker entry point: map type:session_uuid
|
||||
redis_server_stream.sadd('session_uuid:{}'.format(data_header['type']), self.session_uuid.encode())
|
||||
|
||||
## save active connection ##
|
||||
#active Connection
|
||||
redis_server_stream.sadd('active_connection:{}'.format(self.type), '{}:{}'.format(ip, self.uuid))
|
||||
redis_server_stream.sadd('active_connection', '{}'.format(self.uuid))
|
||||
redis_server_stream.sadd('active_connection:{}'.format(self.type), self.uuid)
|
||||
redis_server_stream.sadd('active_connection_by_uuid:{}'.format(self.uuid), self.type)
|
||||
redis_server_stream.sadd('active_connection', self.uuid)
|
||||
# map session_uuid/uuid
|
||||
redis_server_stream.sadd('map:active_connection-uuid-session_uuid:{}'.format(self.uuid), self.session_uuid)
|
||||
|
||||
# map all type by uuid ## TODO: # FIXME: put me in workers ??????
|
||||
redis_server_metadata.sadd('all_types_by_uuid:{}'.format(data_header['uuid_header']), data_header['type'])
|
||||
## ##
|
||||
|
||||
# check if type change
|
||||
if self.data_saved:
|
||||
# type change detected
|
||||
|
@ -269,6 +296,30 @@ class D4_Server(Protocol, TimeoutMixin):
|
|||
if self.type == 2 and data_header['type'] == 254:
|
||||
self.update_stream_type = True
|
||||
self.type = data_header['type']
|
||||
#redis_server_stream.hdel('map-type:session_uuid-uuid:2', self.session_uuid) # # TODO: to remove / refractor
|
||||
redis_server_stream.srem('active_uuid_type2:{}'.format(self.uuid), self.session_uuid)
|
||||
|
||||
# remove type 2 connection
|
||||
if not redis_server_stream.exists('active_uuid_type2:{}'.format(self.uuid)):
|
||||
redis_server_stream.srem('active_connection:2', self.uuid)
|
||||
redis_server_stream.srem('active_connection_by_uuid:{}'.format(self.uuid), 2)
|
||||
|
||||
## save active connection ##
|
||||
#active Connection
|
||||
redis_server_stream.sadd('active_connection:{}'.format(self.type), self.uuid)
|
||||
redis_server_stream.sadd('active_connection_by_uuid:{}'.format(self.uuid), self.type)
|
||||
redis_server_stream.sadd('active_connection', self.uuid)
|
||||
|
||||
redis_server_stream.sadd('active_uuid_type254:{}'.format(self.uuid), self.session_uuid)
|
||||
|
||||
# map all type by uuid ## TODO: # FIXME: put me in workers ??????
|
||||
redis_server_metadata.sadd('all_types_by_uuid:{}'.format(data_header['uuid_header']), data_header['type'])
|
||||
## ##
|
||||
|
||||
|
||||
#redis_server_stream.hset('map-type:session_uuid-uuid:{}'.format(data_header['type']), self.session_uuid, data_header['uuid_header'])
|
||||
|
||||
|
||||
# Type Error
|
||||
else:
|
||||
logger.warning('Unexpected type change, type={} new type={}, ip={} uuid={} session_uuid={}'.format(ip, data_header['uuid_header'], data_header['type'], self.session_uuid))
|
||||
|
@ -408,9 +459,6 @@ class D4_Server(Protocol, TimeoutMixin):
|
|||
|
||||
self.data_saved = True
|
||||
if self.update_stream_type:
|
||||
redis_server_stream.sadd('session_uuid:{}'.format(data_header['type']), self.session_uuid.encode())
|
||||
redis_server_stream.hset('map-type:session_uuid-uuid:{}'.format(data_header['type']), self.session_uuid, data_header['uuid_header'])
|
||||
redis_server_metadata.sadd('all_types_by_uuid:{}'.format(data_header['uuid_header']), data_header['type'])
|
||||
|
||||
if not redis_server_metadata.hexists('metadata_type_by_uuid:{}:{}'.format(data_header['uuid_header'], data_header['type']), 'first_seen'):
|
||||
redis_server_metadata.hset('metadata_type_by_uuid:{}:{}'.format(data_header['uuid_header'], data_header['type']), 'first_seen', data_header['timestamp'])
|
||||
|
|
|
@ -261,7 +261,17 @@ def sensors_status():
|
|||
description = redis_server_metadata.hget('metadata_uuid:{}'.format(result), 'description')
|
||||
if not description:
|
||||
description = ''
|
||||
l_uuid_types = list(redis_server_metadata.smembers('all_types_by_uuid:{}'.format(result)))
|
||||
l_uuid_types = redis_server_metadata.smembers('all_types_by_uuid:{}'.format(result))
|
||||
for type in l_uuid_types:
|
||||
if redis_server_stream.sismember('active_connection:{}'.format(type), result):
|
||||
print('connected: {}'.format(type))
|
||||
if '254' in l_uuid_types:
|
||||
extended_type = redis_server_metadata.smembers('all_extended_types_by_uuid:{}'.format(result))
|
||||
for extended in extended_type:
|
||||
if redis_server_stream.sismember('active_connection_extended_type:{}'.format(result), extended):
|
||||
print('connected: {}'.format(extended))
|
||||
l_uuid_types.update(extended_type)
|
||||
l_uuid_types = list(l_uuid_types)
|
||||
l_uuid_types.sort()
|
||||
if redis_server_metadata.sismember('blacklist_ip_by_uuid', result):
|
||||
Error = "All IP using this UUID are Blacklisted"
|
||||
|
|
|
@ -60,10 +60,16 @@ def clean_db(session_uuid):
|
|||
clean_stream(stream_defined, type_defined, session_uuid)
|
||||
redis_server_stream.srem('ended_session', session_uuid)
|
||||
redis_server_stream.srem('working_session_uuid:{}'.format(type_meta_header), session_uuid)
|
||||
# clean extended type (used)
|
||||
redis_server_stream.hdel('map:session-uuid_active_extended_type', session_uuid)
|
||||
try:
|
||||
redis_server_stream.srem('active_connection_extended_type:{}'.format(uuid), extended_type)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
||||
def clean_stream(stream_name, type, session_uuid):
|
||||
redis_server_stream.srem('session_uuid:{}'.format(type), session_uuid)
|
||||
redis_server_stream.hdel('map-type:session_uuid-uuid:{}'.format(type), session_uuid)
|
||||
#redis_server_stream.hdel('map-type:session_uuid-uuid:{}'.format(type), session_uuid)
|
||||
redis_server_stream.delete(stream_name)
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
@ -151,6 +157,10 @@ if __name__ == "__main__":
|
|||
clean_db(session_uuid)
|
||||
sys.exit(1)
|
||||
|
||||
# create active_connection for extended type
|
||||
redis_server_stream.sadd('active_connection_extended_type:{}'.format(uuid), extended_type)
|
||||
|
||||
redis_server_stream.hset('map:session-uuid_active_extended_type', session_uuid, extended_type)
|
||||
|
||||
#### Handle Specific MetaTypes ####
|
||||
# Use Specific Handler defined
|
||||
|
@ -174,11 +184,8 @@ if __name__ == "__main__":
|
|||
buffer = b''
|
||||
type_handler.test()
|
||||
|
||||
# create active_connection for extended type
|
||||
#redis_server_stream.sadd('active_connection_extended_type:{}', '{}'.format(self.uuid))
|
||||
|
||||
# update uuid: extended type list
|
||||
#redis_server_metadata.sadd('all_extended_types_by_uuid:{}'.format(uuid), extended_type)
|
||||
redis_server_metadata.sadd('all_extended_types_by_uuid:{}'.format(uuid), extended_type)
|
||||
|
||||
# update metadata extended type
|
||||
time_val = int(time.time())
|
||||
|
|
Loading…
Reference in New Issue